import sqlite3 import os import logging import secrets from datetime import datetime, timedelta from typing import Optional, Dict, Any logger = logging.getLogger(__name__) # Locate db correctly in the same directory DB_PATH = os.path.join(os.path.dirname(__file__), "localfood.db") def get_db_connection(): conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn def create_tables(): """Initialize the SQLite database with required tables""" try: conn = get_db_connection() cursor = conn.cursor() # Create users table securely locally cursor.execute(''' CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, password_hash TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # Create sessions table for database-backed tokens cursor.execute(''' CREATE TABLE IF NOT EXISTS sessions ( token TEXT PRIMARY KEY, user_id INTEGER NOT NULL, expires_at TIMESTAMP NOT NULL, FOREIGN KEY (user_id) REFERENCES users (id) ) ''') conn.commit() conn.close() logger.info("Database and tables initialized successfully.") except Exception as e: logger.error(f"Error initializing database: {e}") raise def get_user_by_username(username: str) -> Optional[Dict[str, Any]]: """Retrieve user dictionary if they exist""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM users WHERE username = ?", (username,)) row = cursor.fetchone() conn.close() return dict(row) if row else None except Exception as e: logger.error(f"Database error fetching user: {e}") return None def create_user(username: str, password_hash: str) -> Optional[int]: """Creates a user securely. Returns user_id if successful, None if username exists.""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute( "INSERT INTO users (username, password_hash) VALUES (?, ?)", (username, password_hash) ) user_id = cursor.lastrowid conn.commit() conn.close() return user_id except sqlite3.IntegrityError: return None except Exception as e: logger.error(f"Database error during user creation: {e}") raise def create_session(user_id: int) -> str: """Create a secure 32-character session token in the DB valid for 24h""" token = secrets.token_urlsafe(32) expires_at = datetime.now() + timedelta(hours=24) try: conn = get_db_connection() cursor = conn.cursor() cursor.execute( "INSERT INTO sessions (token, user_id, expires_at) VALUES (?, ?, ?)", (token, user_id, expires_at) ) conn.commit() conn.close() return token except Exception as e: logger.error(f"Error creating session: {e}") raise def get_user_from_token(token: str) -> Optional[Dict[str, Any]]: """Verify a session token and return the associated user data if valid and not expired""" try: conn = get_db_connection() cursor = conn.cursor() # Find user if token exists and hasn't expired cursor.execute(''' SELECT users.* FROM users JOIN sessions ON users.id = sessions.user_id WHERE sessions.token = ? AND sessions.expires_at > ? ''', (token, datetime.now())) row = cursor.fetchone() conn.close() return dict(row) if row else None except Exception as e: logger.error(f"Database error verifying token: {e}") return None def delete_session(token: str): """Securely remove a session token when the user logs out""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute("DELETE FROM sessions WHERE token = ?", (token,)) conn.commit() conn.close() except Exception as e: logger.error(f"Error deleting session: {e}")