import sqlite3 import os import logging import secrets from datetime import datetime, timedelta from typing import Optional, Dict, Any logger = logging.getLogger(__name__) # Locate db correctly in the same directory DB_PATH = os.path.join(os.path.dirname(__file__), "localfood.db") def get_db_connection(): # Enable higher timeout and disable thread checks for FastAPI async compatibility conn = sqlite3.connect(DB_PATH, timeout=20.0, check_same_thread=False) conn.row_factory = sqlite3.Row # Enable Write-Ahead Log (WAL) mode for simultaneous read/write operations conn.execute('pragma journal_mode=wal') return conn def create_tables(): """Initialize the SQLite database with required tables""" try: conn = get_db_connection() cursor = conn.cursor() # Create users table securely locally cursor.execute(''' CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, password_hash TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # Create sessions table for database-backed tokens cursor.execute(''' CREATE TABLE IF NOT EXISTS sessions ( token TEXT PRIMARY KEY, user_id INTEGER NOT NULL, expires_at TIMESTAMP NOT NULL, FOREIGN KEY (user_id) REFERENCES users (id) ) ''') # Create localized foods table based on Sprint 5 architecture cursor.execute(''' CREATE TABLE IF NOT EXISTS foods ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, category TEXT, base_weight_g REAL DEFAULT 100.0, calories REAL DEFAULT 0.0, protein_g REAL DEFAULT 0.0, fat_g REAL DEFAULT 0.0, carbs_g REAL DEFAULT 0.0, fiber_g REAL DEFAULT 0.0, sugar_g REAL DEFAULT 0.0, sodium_mg REAL DEFAULT 0.0, vitamin_a_iu REAL DEFAULT 0.0, vitamin_c_mg REAL DEFAULT 0.0, calcium_mg REAL DEFAULT 0.0, iron_mg REAL DEFAULT 0.0, potassium_mg REAL DEFAULT 0.0, cholesterol_mg REAL DEFAULT 0.0, source TEXT DEFAULT 'System' ) ''') # Create index for rapid fuzzy search compatibility cursor.execute('CREATE INDEX IF NOT EXISTS idx_food_name ON foods(name COLLATE NOCASE)') conn.commit() conn.close() logger.info("Database and tables initialized successfully.") except Exception as e: logger.error(f"Error initializing database: {e}") raise def get_user_by_username(username: str) -> Optional[Dict[str, Any]]: """Retrieve user dictionary if they exist""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM users WHERE username = ?", (username,)) row = cursor.fetchone() conn.close() return dict(row) if row else None except Exception as e: logger.error(f"Database error fetching user: {e}") return None def create_user(username: str, password_hash: str) -> Optional[int]: """Creates a user securely. Returns user_id if successful, None if username exists.""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute( "INSERT INTO users (username, password_hash) VALUES (?, ?)", (username, password_hash) ) user_id = cursor.lastrowid conn.commit() conn.close() return user_id except sqlite3.IntegrityError: return None except Exception as e: logger.error(f"Database error during user creation: {e}") raise def create_session(user_id: int) -> str: """Create a secure 32-character session token in the DB valid for 24h""" token = secrets.token_urlsafe(32) expires_at = datetime.now() + timedelta(hours=24) try: conn = get_db_connection() cursor = conn.cursor() cursor.execute( "INSERT INTO sessions (token, user_id, expires_at) VALUES (?, ?, ?)", (token, user_id, expires_at) ) conn.commit() conn.close() return token except Exception as e: logger.error(f"Error creating session: {e}") raise def get_user_from_token(token: str) -> Optional[Dict[str, Any]]: """Verify a session token and return the associated user data if valid and not expired""" try: conn = get_db_connection() cursor = conn.cursor() # Find user if token exists and hasn't expired cursor.execute(''' SELECT users.* FROM users JOIN sessions ON users.id = sessions.user_id WHERE sessions.token = ? AND sessions.expires_at > ? ''', (token, datetime.now())) row = cursor.fetchone() conn.close() return dict(row) if row else None except Exception as e: logger.error(f"Database error verifying token: {e}") return None def delete_session(token: str): """Securely remove a session token when the user logs out""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute("DELETE FROM sessions WHERE token = ?", (token,)) conn.commit() conn.close() except Exception as e: logger.error(f"Error deleting session: {e}") def search_foods_by_name(query: str, limit: int = 15) -> list[Dict[str, Any]]: """Securely search for foods matching a string query using fuzzy matching""" try: conn = get_db_connection() cursor = conn.cursor() # SQL Injection safe query utilizing LIKE parameterization # COLLATE NOCASE search inherently supported by index on table creation q = f"%{query}%" cursor.execute("SELECT * FROM foods WHERE name LIKE ? LIMIT ?", (q, limit)) rows = cursor.fetchall() conn.close() return [dict(row) for row in rows] except Exception as e: logger.error(f"Error searching foods: {e}") return []