Source code for Application.database

import csv
import logging
import queue
import re
import sqlite3
from datetime import datetime
from pathlib import Path

def _datetime_to_epoch(value):
    """Adapt a datetime to whole epoch seconds for storage in SQLite."""
    return int(value.timestamp())


def _epoch_to_datetime(raw):
    """Turn a stored epoch-seconds value back into a datetime."""
    return datetime.fromtimestamp(int(raw))


# datetimes are written as integer epoch seconds; columns declared as
# "timestamp" are decoded back into datetime objects when a connection is
# opened with detect_types=sqlite3.PARSE_DECLTYPES.
sqlite3.register_adapter(datetime, _datetime_to_epoch)
sqlite3.register_converter("timestamp", _epoch_to_datetime)

logger = logging.getLogger("Core.Database")


class GarbageMessListError(Exception):
    """Raised when a row of a mess list csv file cannot be fully parsed."""
def connect() -> sqlite3.Connection:
    """
    Opens a new connection to the database file at ./Data/data.db.

    Declared column types are parsed (PARSE_DECLTYPES) so registered
    converters apply, the same-thread check is disabled, and foreign key
    enforcement is switched on for the connection.

    Returns:
        sqlite3.Connection
    """
    db = sqlite3.connect(
        "./Data/data.db",
        detect_types=sqlite3.PARSE_DECLTYPES,
        check_same_thread=False,
    )
    # SQLite does not enforce foreign keys unless asked to, per connection
    db.execute("pragma foreign_keys = ON")
    return db
class ConnectionPool:
    """
    Allows database connections to be reused.

    Used as a context manager: entering hands out a connection, leaving
    commits or rolls back and returns the connection to the pool.

    Attributes:
        _q: A class-level queue.SimpleQueue that stores the idle
            connections; it is shared by every ConnectionPool instance.
    """

    # it will be easier to make this asynchronous in the future, I guess
    _q = queue.SimpleQueue()

    def __enter__(self) -> sqlite3.Connection:
        """
        Gets a connection from ConnectionPool._q, creating a new connection
        with connect() if the queue is empty.

        Returns:
            sqlite3.Connection
        """
        try:
            self.connection = self._q.get_nowait()
        except queue.Empty:
            self.connection = connect()
        return self.connection

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Commits the changes if there was no error, rolls back if there was
        an error. Puts the connection back into the queue.

        Args:
            exc_type (): Exception class raised inside the with block, or None.
            exc_val (): The exception instance, or None (unused).
            exc_tb (): The traceback, or None (unused).
        """
        if exc_type:
            self.connection.rollback()
        else:
            self.connection.commit()
        self._q.put(self.connection)

    @classmethod
    def close(cls):
        """
        Closes every idle connection currently stored in the pool.

        Connections that are checked out at the time of the call are not in
        the queue and are therefore not closed here.
        """
        # Drain with EAFP: checking empty() and then calling get_nowait() can
        # raise queue.Empty if another thread takes the last connection
        # between the two calls.
        while True:
            try:
                idle = cls._q.get_nowait()
            except queue.Empty:
                break
            idle.close()
[docs] def scan_mess_list(path: Path) -> list[dict[str, str]]: """ Verifies that the csv file is a messand returns which datatype of each column """ # TODO: There needs to be instruction in the docs to turn all NULL hoscodes to ps and all NULL roomnos to 0 (a roomno of 1 does not make sense) with open(path) as file: reader = csv.reader(file) rows = [row for row in reader] pattern = re.compile( r"(?x)(?P<id>\d{4}[a-zA-Z\d]{4}\d{4})|(?P<room>[0-9]{2,4}|0)|(?P<gender>[mMfF]{1})|(?P<name_or_bhawan>^[a-zA-Z_.\-\s]*$)" ) rows = rows[1:] # we are assuming the first row is all headers vals = [] hoscodes = { "ak", "bd", "bg", "cvr", "ds", "ps", "gn", "kr", "ml", "msa", "mr", "rm", "rp", "sk", "sr", "vk", "vy", "rha", "rhb", "rhc", "rhc", "rhd", "rhe", "rhf", } for i, row in enumerate(rows): val = {} for item in row: m = pattern.fullmatch(item) if not m: continue else: match m.lastgroup: case "id": val["idno"] = item case "room": val["roomno"] = item case "gender": val["gender"] = item case "name_or_bhawan": if item.lower() in hoscodes: val["hoscode"] = item else: val["name"] = item val["nick"] = None if len(val.keys()) != 6: logger.error(f"Something is wrong in row {i+2}\n\nScan results: {val}") raise GarbageMessListError else: vals.append(val) logger.debug(val) logger.info(f"Scanned {len(vals)} rows and found no discrepencies") return vals
def read_mess_list(path: Path):
    """
    Populates the database with values from a csv file.

    Each scanned row is inserted into the students table; when the idno
    already exists only its hoscode and roomno are updated.

    Args:
        path: Path to the csv file
    """
    upsert_student = (
        "INSERT INTO students (idno, name, gender, hoscode, roomno, nick) "
        "VALUES(:idno, :name, :gender, :hoscode, :roomno, :nick) "
        "ON CONFLICT(idno) DO UPDATE SET hoscode=:hoscode, roomno=:roomno"
    )
    with ConnectionPool() as pooled:
        students = scan_mess_list(path)
        pooled.executemany(upsert_student, students)
        pooled.commit()
def get_file_name(id):
    """
    Builds the file name skeleton for a student together with their hostel
    code and room number.

    Args:
        id (): The complete ID of the student; matched against students.idno.

    Returns:
        A tuple (skeleton, hoscode, roomno). skeleton has the form
        "hoscode_roomno_{}{:02}_{}_" followed by id[2:4] and id[-4:]; its
        three placeholders are left unfilled for the caller to format.

    Raises:
        TypeError: if no student has this idno, because the empty lookup
            result (None) cannot be unpacked.
    """
    lookup = "SELECT hoscode, roomno FROM students WHERE idno = ?"
    with ConnectionPool() as db:
        row = db.execute(lookup, (id,)).fetchone()
        hoscode, roomno = row
        skeleton = f"{hoscode}_{roomno}_{'{}'}{'{:02}'}_{'{}'}_{id[2:4]}{id[-4:]}"
        return skeleton, hoscode, roomno
def get_all_info(id):
    """
    Fetches the stored details of a single student.

    Args:
        id (): The complete ID of the student; matched against students.idno.

    Returns:
        The row (idno, name, hoscode, roomno), or None if no student has
        this idno.
    """
    lookup = "SELECT idno, name, hoscode, roomno FROM students WHERE idno = ?"
    with ConnectionPool() as db:
        return db.execute(lookup, (id,)).fetchone()
def get_hoscode_roomno_from_short_id(id):
    """
    Finds students whose idno matches a shortened id.

    Basically only used to rename files after something went wrong.

    Args:
        id (): Shortened id. Its first two characters and the remaining
            characters are matched separately, with anything allowed before
            and between them.

    Returns:
        A list of (hoscode, roomno, idno) rows, empty if nothing matches.
    """
    # change this after a thousand years I guess
    # NOTE(review): presumably because the first two characters are a
    # two-digit year - confirm
    like_pattern = f"%{id[:2]}%{id[2:]}"
    with ConnectionPool() as db:
        matches = db.execute(
            "SELECT hoscode, roomno, idno FROM students WHERE idno LIKE ?",
            (like_pattern,),
        ).fetchall()
        return matches
def set_nick(nick, id):
    """
    Stores a nickname for the student with the given idno.

    Args:
        nick (): The nickname to store.
        id (): The complete ID of the student; matched against students.idno.

    Returns:
        True, regardless of whether any row was updated.
    """
    statement = "UPDATE students SET nick = ? WHERE idno = ?"
    with ConnectionPool() as db:
        db.execute(statement, (nick, id))
        return True
def get_nick(id):
    """
    Looks up the nickname of a student.

    Args:
        id (): The complete ID of the student; matched against students.idno.

    Returns:
        A one-element row (nick,), or None if no student has this idno.
    """
    query = "SELECT nick FROM students WHERE idno = ?"
    with ConnectionPool() as db:
        return db.execute(query, (id,)).fetchone()
def get_name(id):
    """
    Looks up the name of a student.

    Args:
        id (): The complete ID of the student; matched against students.idno.

    Returns:
        A one-element row (name,), or None if no student has this idno.
    """
    query = "SELECT name FROM students WHERE idno = ?"
    with ConnectionPool() as db:
        return db.execute(query, (id,)).fetchone()
def get_all_nicks():
    """
    Lists every student that has a nickname set.

    Returns:
        A list of (name, idno, nick) rows for students whose nick is not
        NULL; empty if there are none.
    """
    query = "SELECT name, idno, nick FROM students WHERE nick IS NOT NULL"
    with ConnectionPool() as db:
        nicknamed = db.execute(query)
        return nicknamed.fetchall()
def resolve_id(id_last_four, bhawan, roomno):
    """
    Resolves a complete idno from its ending plus hostel and room.

    Args:
        id_last_four (): Trailing characters the idno must end with.
        bhawan (): Hostel code; compared against students.hoscode.
        roomno (): Room number; compared against students.roomno.

    Returns:
        A one-element row (idno,) for the first match, or None if no student
        matches.
    """
    query = (
        "SELECT idno FROM students WHERE idno LIKE ? AND hoscode = ? AND roomno = ?"
    )
    # leading wildcard: anything may come before the given ending
    ends_with = "%" + id_last_four
    with ConnectionPool() as db:
        return db.execute(query, (ends_with, bhawan, roomno)).fetchone()