enabled syntax highlighting for queries

This commit is contained in:
Lucas Jensen
2024-05-25 18:32:38 -07:00
parent b35e8406a6
commit 32f9a02528
14 changed files with 198 additions and 169 deletions

View File

@@ -1,6 +1,8 @@
from typing import Callable
from icecream import ic
from mysql.connector.connection import MySQLConnection
from mysql.connector.cursor import MySQLCursor
from app.db.conn import connect_db
@@ -15,25 +17,39 @@ class BaseQueries:
def __init__(self) -> None:
self.table: str = None # type: ignore
self.connect_db: Callable = connect_db
self.connect_db: Callable[[], MySQLConnection] = connect_db
def select_all_series(self) -> list[dict]:
query = f"SELECT * FROM {self.table}"
db = connect_db()
cursor = db.cursor(dictionary=True)
def select_all(self) -> list[dict]:
query = f"""-- sql
SELECT * FROM {self.table}
"""
cursor, conn = self.get_cursor_and_conn()
cursor.execute(query)
data = cursor.fetchall()
cursor.close()
db.close()
return data # type: ignore
data: list[dict] = cursor.fetchall() # type: ignore
self.close_cursor_and_conn(cursor, conn)
fake_query = f"""-- sql
select * from dogs
"""
return data
def select_one_by_id(self, id: int) -> dict | None:
query = f"SELECT * FROM {self.table} WHERE id = %s"
db = self.connect_db()
cursor = db.cursor(dictionary=True)
query = f"""-- sql
SELECT * FROM {self.table} WHERE id = %s
"""
cursor, conn = self.get_cursor_and_conn()
cursor.execute(query, (id,))
data = cursor.fetchone()
cursor.close()
db.close()
data: dict | None = cursor.fetchone() # type: ignore
self.close_cursor_and_conn(cursor, conn)
return data # type: ignore
return data
def get_cursor_and_conn(self) -> tuple[MySQLCursor, MySQLConnection]:
conn = self.connect_db()
cursor = conn.cursor(dictionary=True)
return cursor, conn
def close_cursor_and_conn(self, cursor: MySQLCursor, conn: MySQLConnection) -> None:
cursor.close()
conn.close()

View File

@@ -1,5 +1,3 @@
from asyncio import gather
from icecream import ic
from app.constants import EVENT_TABLE, SERIES_TABLE
@@ -15,52 +13,46 @@ class EventQueries(BaseQueries):
def __init__(self) -> None:
super().__init__()
self.table = SERIES_TABLE
def select_one_by_id(self, series_id: int) -> list[dict] | None:
query = f"""
SELECT s.series_id , s.name , s.description , s.poster_id , e.event_id , e.location , e.`time` , e.ticket_url , e.map_url
FROM {SERIES_TABLE} s
LEFT JOIN {EVENT_TABLE} e
ON s.series_id = e.series_id
WHERE s.series_id = %s
"""
db = self.connect_db()
cursor = db.cursor(dictionary=True)
query = f"""-- sql
SELECT s.series_id , s.name , s.description , s.poster_id , e.event_id , e.location , e.`time` , e.ticket_url , e.map_url
FROM {SERIES_TABLE} s
LEFT JOIN {EVENT_TABLE} e
ON s.series_id = e.series_id
WHERE s.series_id = %s
"""
cursor, conn = self.get_cursor_and_conn()
cursor.execute(query, (series_id,))
data = cursor.fetchall()
cursor.close()
db.close()
data: list[dict] = cursor.fetchall() # type: ignore
self.close_cursor_and_conn(cursor, conn)
return data
def select_all_series(self) -> list[dict]:
def select_all(self) -> list[dict]:
"""
Queries for all Series and Event info and returns a list of dictionaries.
Data is gathered with a LEFT JOIN on the Event table to ensure all Series are returned.
A Series with no Events is valid.
"""
query = f"""
query = f"""-- sql
SELECT s.series_id , s.name , s.description , s.poster_id , e.event_id , e.location , e.`time` , e.ticket_url , e.map_url
FROM {SERIES_TABLE} s
LEFT JOIN {EVENT_TABLE} e
ON s.series_id = e.series_id
"""
db = self.connect_db()
cursor = db.cursor(dictionary=True)
cursor, conn = self.get_cursor_and_conn()
cursor.execute(query)
data = cursor.fetchall()
cursor.close()
db.close()
data: list[dict] = cursor.fetchall() # type: ignore
self.close_cursor_and_conn(cursor, conn)
return data
def insert_one_series(self, series: NewEventSeries) -> int:
query = f"""
INSERT INTO {self.table} (name, description)
query = f"""-- sql
INSERT INTO {SERIES_TABLE} (name, description)
VALUES (%s, %s)
"""
db = self.connect_db()
cursor = db.cursor()
cursor, conn = self.get_cursor_and_conn()
cursor.execute(
query,
(
@@ -69,91 +61,87 @@ class EventQueries(BaseQueries):
),
)
inserted_id = cursor.lastrowid
db.commit()
cursor.close()
db.close()
conn.commit()
self.close_cursor_and_conn(cursor, conn)
if inserted_id is None:
raise Exception("insertion error")
return inserted_id
def insert_one_event(self, event: NewEvent, series_id: int) -> int:
query = f"""
query = f"""-- sql
INSERT INTO {EVENT_TABLE} (series_id, location, time, ticket_url, map_url)
VALUES (%s, %s, %s, %s, %s)
"""
db = self.connect_db()
cursor = db.cursor()
cursor, conn = self.get_cursor_and_conn()
ticket_url = str(event.ticket_url) if event.ticket_url else None
map_url = str(event.map_url) if event.map_url else None
cursor.execute(
query, (series_id, event.location, event.time, ticket_url, map_url)
)
iserted_id = cursor.lastrowid
db.commit()
cursor.close()
db.close()
return iserted_id
inserted_id = cursor.lastrowid
conn.commit()
self.close_cursor_and_conn(cursor, conn)
if inserted_id is None:
raise Exception("error inserting event")
return inserted_id
def delete_events_by_series(self, series: EventSeries) -> None:
query = f"""
query = f"""-- sql
DELETE FROM {EVENT_TABLE}
WHERE series_id = %s
"""
db = self.connect_db()
cursor = db.cursor()
cursor, conn = self.get_cursor_and_conn()
cursor.execute(query, (series.series_id,))
db.commit()
cursor.close()
conn.commit()
self.close_cursor_and_conn(cursor, conn)
def delete_one_series(self, series: EventSeries) -> None:
query = f"""
DELETE FROM {self.table}
query = f"""-- sql
DELETE FROM {SERIES_TABLE}
WHERE series_id = %s
"""
db = self.connect_db()
cursor = db.cursor()
cursor, conn = self.get_cursor_and_conn()
cursor.execute(query, (series.series_id,))
db.commit()
cursor.close()
conn.commit()
self.close_cursor_and_conn(cursor, conn)
def update_series_poster(self, series: EventSeries) -> None:
query = f"""
UPDATE {self.table}
query = f"""-- sql
UPDATE {SERIES_TABLE}
SET poster_id = %s
WHERE series_id = %s
"""
db = self.connect_db()
cursor = db.cursor()
cursor, conn = self.get_cursor_and_conn()
cursor.execute(query, (series.poster_id, series.series_id))
db.commit()
cursor.close()
conn.commit()
self.close_cursor_and_conn(cursor, conn)
def replace_event(self, event: Event) -> None:
query = f"""
query = f"""-- sql
UPDATE {EVENT_TABLE}
SET location = %s, time = %s, ticket_url = %s, map_url = %s
WHERE event_id = %s
"""
db = self.connect_db()
cursor = db.cursor()
cursor, conn = self.get_cursor_and_conn()
ticket_url = str(event.ticket_url) if event.ticket_url else None
map_url = str(event.map_url) if event.map_url else None
cursor.execute(
query, (event.location, event.time, ticket_url, map_url, event.event_id)
)
db.commit()
cursor.close()
db.close()
conn.commit()
self.close_cursor_and_conn(cursor, conn)
def replace_series(self, series: EventSeries) -> None:
query = f"""
UPDATE {self.table}
query = f"""-- sql
UPDATE {SERIES_TABLE}
SET name = %s, description = %s, poster_id = %s
WHERE series_id = %s
"""
db = self.connect_db()
cursor = db.cursor()
cursor, conn = self.get_cursor_and_conn()
cursor.execute(
query, (series.name, series.description, series.poster_id, series.series_id)
)
db.commit()
cursor.close()
db.close()
conn.commit()
self.close_cursor_and_conn(cursor, conn)

View File

@@ -8,29 +8,29 @@ class GroupQueries(BaseQueries):
self.table = GROUP_TABLE
def select_one_by_id(self) -> dict:
query = f"SELECT * FROM {self.table}"
db = self.connect_db()
cursor = db.cursor(dictionary=True)
query = f"""-- sql
SELECT * FROM {self.table}
"""
cursor, conn = self.get_cursor_and_conn()
cursor.execute(query)
data = cursor.fetchone()
cursor.close()
db.close()
data: dict = cursor.fetchone() # type: ignore
self.close_cursor_and_conn(cursor, conn)
if not data:
raise Exception("error retrieving group")
return data
def select_all_series(self) -> None:
def select_all(self) -> None:
raise NotImplementedError(
"get_all method not implemented for GroupQueries. There's only one row in the table."
)
def update_group_bio(self, bio: str) -> None:
db = self.connect_db()
cursor = db.cursor()
query = f"UPDATE {self.table} SET bio = %s WHERE id = 1" # only one row in the table
cursor, conn = self.get_cursor_and_conn()
query = f"""-- sql
UPDATE {self.table} SET bio = %s WHERE id = 1
""" # only one row in the table
cursor.execute(query, (bio,))
db.commit()
cursor.close()
db.close()
conn.commit()
self.close_cursor_and_conn(cursor, conn)

View File

@@ -20,7 +20,9 @@ class MusicianQueries(BaseQueries):
"""
db = connect_db()
cursor = db.cursor()
query = f"UPDATE {self.table} SET bio = %s WHERE id = %s"
query = f"""-- sql
UPDATE {self.table} SET bio = %s WHERE id = %s
"""
cursor.execute(query, (bio, musician.id))
db.commit()
cursor.close()
@@ -36,7 +38,9 @@ class MusicianQueries(BaseQueries):
"""
db = connect_db()
cursor = db.cursor()
query = f"UPDATE {self.table} SET headshot_id = %s WHERE id = %s"
query = f"""-- sql
UPDATE {self.table} SET headshot_id = %s WHERE id = %s
"""
cursor.execute(query, (headshot_id, musician.id))
db.commit()
cursor.close()

View File

@@ -3,40 +3,59 @@ from app.db.base_queries import BaseQueries
class UserQueries(BaseQueries):
"""
Used for quering the database for User related data
"""
def __init__(self) -> None:
super().__init__()
self.table = USER_TABLE
def select_one_by_email(self, email: str) -> dict | None:
query = f"SELECT * FROM {self.table} WHERE email = %s"
db = self.connect_db()
cursor = db.cursor(dictionary=True)
"""
Select one user by their email address
:param str email: user email
:return dict | None: a dictionary of found data, or None if no user found
"""
query = f"""-- sql
SELECT * FROM {USER_TABLE} WHERE email = %s
"""
cursor, conn = self.get_cursor_and_conn()
cursor.execute(query, (email,))
data = cursor.fetchone()
cursor.close()
db.close()
data: dict = cursor.fetchone() # type: ignore
self.close_cursor_and_conn(cursor, conn)
return data
def select_one_by_sub(self, sub: str) -> dict | None:
query = f"SELECT * FROM {self.table} WHERE sub = %s"
db = self.connect_db()
cursor = db.cursor(dictionary=True)
cursor.execute(query, (sub,))
data = cursor.fetchone()
cursor.close()
db.close()
"""
Select one user by their unique sub identifier
if not data:
return None
:param str sub: user sub
:return dict | None: a dict of user data
"""
query = f"""-- sql
SELECT * FROM {USER_TABLE} WHERE sub = %s
"""
cursor, conn = self.get_cursor_and_conn()
cursor.execute(query, (sub,))
data: dict = cursor.fetchone() # type: ignore
self.close_cursor_and_conn(cursor, conn)
return data
def update_sub(self, email: str, sub: str) -> None:
query = f"UPDATE {self.table} SET sub = %s WHERE email = %s"
db = self.connect_db()
cursor = db.cursor()
"""
Update user sub. Used when a user logs in for the first time.
:param str email: user email
:param str sub: the new unique sub identifier
"""
query = f"""-- sql
UPDATE {USER_TABLE} SET sub = %s WHERE email = %s
"""
cursor, conn = self.get_cursor_and_conn()
cursor.execute(query, (sub, email))
db.commit()
cursor.close()
db.close()
conn.commit()
self.close_cursor_and_conn(cursor, conn)