initial commit

This commit is contained in:
Lucas Jensen
2024-05-01 09:19:01 -07:00
commit 5d67c0c2b2
117 changed files with 9917 additions and 0 deletions

View File

@@ -0,0 +1,9 @@
from .events import EventQueries
from .group import GroupQueries
from .musicians import MusicianQueries
from .users import UserQueries
event_queries = EventQueries()
user_queries = UserQueries()
musician_queries = MusicianQueries()
group_queries = GroupQueries()

View File

@@ -0,0 +1,32 @@
from typing import Callable
from app.db.conn import connect_db
class BaseQueries:
from icecream import ic
def __init__(self) -> None:
self.table: str = None # type: ignore
self.connect_db: Callable = connect_db
async def get_all(self) -> list[dict]:
query = f"SELECT * FROM {self.table}"
db = connect_db()
cursor = db.cursor(dictionary=True)
cursor.execute(query)
data = cursor.fetchall()
cursor.close()
db.close()
return data # type: ignore
async def get_one(self, id: int) -> dict | None:
query = f"SELECT * FROM {self.table} WHERE id = %s"
db = self.connect_db()
cursor = db.cursor(dictionary=True)
cursor.execute(query, (id,))
data = cursor.fetchone()
cursor.close()
db.close()
return data # type: ignore

31
server/app/db/conn.py Normal file
View File

@@ -0,0 +1,31 @@
import os
import mysql.connector
from dotenv import load_dotenv
class DBException(Exception):
pass
def connect_db() -> mysql.connector.MySQLConnection:
load_dotenv()
host = os.getenv("DB_HOST")
user = os.getenv("DB_USER")
password = os.getenv("DB_PASSWORD")
database = os.getenv("DB_DATABASE")
if None in [host, user, password, database]:
raise DBException("Missing database credentials")
try:
return mysql.connector.connect(
host=host,
user=user,
password=password,
database=database,
auth_plugin="mysql_native_password",
) # type: ignore
except mysql.connector.Error as err:
raise DBException("Could not connect to database") from err

140
server/app/db/events.py Normal file
View File

@@ -0,0 +1,140 @@
from asyncio import gather
from icecream import ic
from app.db.base_queries import BaseQueries
from app.models.event import (
EVENT_TABLE,
SERIES_TABLE,
Event,
EventSeries,
NewEvent,
NewEventSeries,
)
class EventQueries(BaseQueries):
def __init__(self) -> None:
super().__init__()
self.table = SERIES_TABLE
async def get_one(self, series_id: int) -> list[dict] | None:
query = f"""
SELECT *
FROM {SERIES_TABLE}
INNER JOIN {EVENT_TABLE}
ON {SERIES_TABLE}.series_id = {EVENT_TABLE}.series_id
WHERE {SERIES_TABLE}.series_id = %s
"""
db = self.connect_db()
cursor = db.cursor(dictionary=True)
cursor.execute(query, (series_id,))
data = cursor.fetchall()
cursor.close()
db.close()
return data
async def get_all(self) -> list[dict]:
query = f"""
SELECT *
FROM {SERIES_TABLE}
INNER JOIN {EVENT_TABLE}
ON {SERIES_TABLE}.series_id = {EVENT_TABLE}.series_id
"""
db = self.connect_db()
cursor = db.cursor(dictionary=True)
cursor.execute(query)
data = cursor.fetchall()
cursor.close()
db.close()
return data
async def insert_one_series(self, series: NewEventSeries) -> int:
query = f"INSERT INTO {self.table} (name, description) VALUES (%s, %s)"
db = self.connect_db()
cursor = db.cursor()
cursor.execute(
query,
(
series.name,
series.description,
),
)
inserted_id = cursor.lastrowid
db.commit()
cursor.close()
db.close()
return inserted_id
async def insert_one_event(self, event: NewEvent, series_id: int) -> int:
query = f"INSERT INTO {EVENT_TABLE} (series_id, location, time, ticket_url, map_url) VALUES (%s, %s, %s, %s, %s)"
db = self.connect_db()
cursor = db.cursor()
ticket_url = str(event.ticket_url) if event.ticket_url else None
map_url = str(event.map_url) if event.map_url else None
cursor.execute(
query, (series_id, event.location, event.time, ticket_url, map_url)
)
iserted_id = cursor.lastrowid
db.commit()
cursor.close()
db.close()
return iserted_id
async def delete_events_by_series(self, series: EventSeries) -> None:
query = f"DELETE FROM {EVENT_TABLE} WHERE series_id = %s"
db = self.connect_db()
cursor = db.cursor()
cursor.execute(query, (series.series_id,))
db.commit()
cursor.close()
async def delete_one_series(self, series: EventSeries) -> None:
query = f"DELETE FROM {self.table} WHERE series_id = %s"
db = self.connect_db()
cursor = db.cursor()
cursor.execute(query, (series.series_id,))
db.commit()
cursor.close()
async def update_series_poster(self, series: EventSeries) -> None:
query = f"UPDATE {self.table} SET poster_id = %s WHERE series_id = %s"
db = self.connect_db()
cursor = db.cursor()
cursor.execute(query, (series.poster_id, series.series_id))
db.commit()
cursor.close()
async def replace_event(self, event: Event) -> None:
query = f"""
UPDATE {EVENT_TABLE}
SET location = %s, time = %s, ticket_url = %s, map_url = %s
WHERE event_id = %s
"""
db = self.connect_db()
cursor = db.cursor()
ticket_url = str(event.ticket_url) if event.ticket_url else None
map_url = str(event.map_url) if event.map_url else None
cursor.execute(
query, (event.location, event.time, ticket_url, map_url, event.event_id)
)
db.commit()
cursor.close()
db.close()
async def replace_series(self, series: EventSeries) -> None:
query = f"""
UPDATE {self.table}
SET name = %s, description = %s, poster_id = %s
WHERE series_id = %s
"""
db = self.connect_db()
cursor = db.cursor()
cursor.execute(
query, (series.name, series.description, series.poster_id, series.series_id)
)
db.commit()
cursor.close()
db.close()

36
server/app/db/group.py Normal file
View File

@@ -0,0 +1,36 @@
from app.db.base_queries import BaseQueries
from app.models.group import GROUP_TABLE
class GroupQueries(BaseQueries):
def __init__(self) -> None:
super().__init__()
self.table = GROUP_TABLE
async def get_one(self) -> dict:
query = f"SELECT * FROM {self.table}"
db = self.connect_db()
cursor = db.cursor(dictionary=True)
cursor.execute(query)
data = cursor.fetchone()
cursor.close()
db.close()
if not data:
raise Exception("error retrieving group")
return data
async def get_all(self) -> None:
raise NotImplementedError(
"get_all method not implemented for GroupQueries. There's only one row in the table."
)
async def update_group_bio(self, bio: str) -> None:
db = self.connect_db()
cursor = db.cursor()
query = f"UPDATE {self.table} SET bio = %s WHERE id = 1" # only one row in the table
cursor.execute(query, (bio,))
db.commit()
cursor.close()
db.close()

View File

@@ -0,0 +1,29 @@
from icecream import ic
from app.db.base_queries import BaseQueries
from app.db.conn import connect_db
from app.models.musician import MUSICIAN_TABLE
class MusicianQueries(BaseQueries):
def __init__(self) -> None:
super().__init__()
self.table = MUSICIAN_TABLE
async def update_bio(self, id: int, bio: str) -> None:
db = connect_db()
cursor = db.cursor()
query = f"UPDATE {self.table} SET bio = %s WHERE id = %s"
cursor.execute(query, (bio, id))
db.commit()
cursor.close()
db.close()
async def update_headshot(self, id: int, headshot_id: str) -> None:
db = connect_db()
cursor = db.cursor()
query = f"UPDATE {self.table} SET headshot_id = %s WHERE id = %s"
cursor.execute(query, (headshot_id, id))
db.commit()
cursor.close()
db.close()

42
server/app/db/users.py Normal file
View File

@@ -0,0 +1,42 @@
from app.db.base_queries import BaseQueries
from app.models.user import USER_TABLE
class UserQueries(BaseQueries):
def __init__(self) -> None:
super().__init__()
self.table = USER_TABLE
async def get_one_by_email(self, email: str) -> dict | None:
query = f"SELECT * FROM {self.table} WHERE email = %s"
db = self.connect_db()
cursor = db.cursor(dictionary=True)
cursor.execute(query, (email,))
data = cursor.fetchone()
cursor.close()
db.close()
return data
async def get_one_by_sub(self, sub: str) -> dict | None:
query = f"SELECT * FROM {self.table} WHERE sub = %s"
db = self.connect_db()
cursor = db.cursor(dictionary=True)
cursor.execute(query, (sub,))
data = cursor.fetchone()
cursor.close()
db.close()
if not data:
return None
return data
async def update_sub(self, email: str, sub: str) -> None:
query = f"UPDATE {self.table} SET sub = %s WHERE email = %s"
db = self.connect_db()
cursor = db.cursor()
cursor.execute(query, (sub, email))
db.commit()
cursor.close()
db.close()