import sqlite3 import os import logging import secrets from datetime import datetime, timedelta from typing import Optional, Dict, Any logger = logging.getLogger(__name__) # Locate db correctly in the same directory DB_PATH = os.path.join(os.path.dirname(__file__), "localfood.db") def get_db_connection(): # Enable higher timeout and disable thread checks for FastAPI async compatibility conn = sqlite3.connect(DB_PATH, timeout=30.0, check_same_thread=False) conn.row_factory = sqlite3.Row # Enable Write-Ahead Log (WAL) mode for simultaneous read/write operations conn.execute('PRAGMA journal_mode=WAL') conn.execute('PRAGMA synchronous=NORMAL') return conn def create_tables(): """Initialize the SQLite database with required tables""" conn = None try: conn = get_db_connection() cursor = conn.cursor() # Create users table securely locally cursor.execute(''' CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, password_hash TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # Create sessions table for database-backed tokens cursor.execute(''' CREATE TABLE IF NOT EXISTS sessions ( token TEXT PRIMARY KEY, user_id INTEGER NOT NULL, expires_at TIMESTAMP NOT NULL, FOREIGN KEY (user_id) REFERENCES users (id) ) ''') # Create localized foods table based on Sprint 5 architecture cursor.execute(''' CREATE TABLE IF NOT EXISTS foods ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, category TEXT, base_weight_g REAL DEFAULT 100.0, calories REAL DEFAULT 0.0, protein_g REAL DEFAULT 0.0, fat_g REAL DEFAULT 0.0, carbs_g REAL DEFAULT 0.0, fiber_g REAL DEFAULT 0.0, sugar_g REAL DEFAULT 0.0, sodium_mg REAL DEFAULT 0.0, vitamin_a_iu REAL DEFAULT 0.0, vitamin_c_mg REAL DEFAULT 0.0, calcium_mg REAL DEFAULT 0.0, iron_mg REAL DEFAULT 0.0, potassium_mg REAL DEFAULT 0.0, cholesterol_mg REAL DEFAULT 0.0, source TEXT DEFAULT 'System' ) ''') # Create chat history table for Sprint 6 persistence cursor.execute(''' CREATE TABLE IF NOT EXISTS chat_messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, role TEXT NOT NULL, content TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users (id) ) ''') # Create minimal user_profiles table for macro targets (US-07) cursor.execute(''' CREATE TABLE IF NOT EXISTS user_profiles ( user_id INTEGER PRIMARY KEY, target_calories INTEGER DEFAULT 2000, target_protein_g INTEGER DEFAULT 150, target_carbs_g INTEGER DEFAULT 200, target_fat_g INTEGER DEFAULT 65, FOREIGN KEY (user_id) REFERENCES users (id) ) ''') # Create index for rapid fuzzy search compatibility cursor.execute('CREATE INDEX IF NOT EXISTS idx_food_name ON foods(name COLLATE NOCASE)') conn.commit() logger.info("Database and tables initialized successfully.") except Exception as e: logger.error(f"Error initializing database: {e}") raise finally: if conn: conn.close() def get_user_by_username(username: str) -> Optional[Dict[str, Any]]: """Retrieve user dictionary if they exist""" conn = None try: conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM users WHERE username = ?", (username,)) row = cursor.fetchone() return dict(row) if row else None except Exception as e: logger.error(f"Database error fetching user: {e}") return None finally: if conn: conn.close() def create_user(username: str, password_hash: str) -> Optional[int]: """Creates a user securely. Returns user_id if successful, None if username exists.""" conn = None try: conn = get_db_connection() cursor = conn.cursor() cursor.execute( "INSERT INTO users (username, password_hash) VALUES (?, ?)", (username, password_hash) ) user_id = cursor.lastrowid conn.commit() return user_id except sqlite3.IntegrityError: return None except Exception as e: logger.error(f"Database error during user creation: {e}") raise finally: if conn: conn.close() def create_session(user_id: int) -> str: """Create a secure 32-character session token in the DB valid for 7 days""" token = secrets.token_urlsafe(32) expires_at = datetime.now() + timedelta(days=7) conn = None try: conn = get_db_connection() cursor = conn.cursor() cursor.execute( "INSERT INTO sessions (token, user_id, expires_at) VALUES (?, ?, ?)", (token, user_id, expires_at) ) conn.commit() return token except Exception as e: logger.error(f"Error creating session: {e}") raise finally: if conn: conn.close() def get_user_from_token(token: str) -> Optional[Dict[str, Any]]: """Verify a session token and return the associated user data if valid and not expired""" conn = None try: conn = get_db_connection() cursor = conn.cursor() # Find user if token exists and hasn't expired cursor.execute(''' SELECT users.* FROM users JOIN sessions ON users.id = sessions.user_id WHERE sessions.token = ? AND sessions.expires_at > ? ''', (token, datetime.now())) row = cursor.fetchone() return dict(row) if row else None except Exception as e: logger.error(f"Database error verifying token: {e}") return None finally: if conn: conn.close() def delete_session(token: str): """Securely remove a session token when the user logs out""" conn = None try: conn = get_db_connection() cursor = conn.cursor() cursor.execute("DELETE FROM sessions WHERE token = ?", (token,)) conn.commit() except Exception as e: logger.error(f"Error deleting session: {e}") finally: if conn: conn.close() def search_foods_by_name(query: str, limit: int = 15) -> list[Dict[str, Any]]: """Securely search for foods matching a string query with relevance-based ordering""" conn = None try: conn = get_db_connection() cursor = conn.cursor() # SQL Injection safe query utilizing LIKE parameterization # We prioritize: # 1. Items NOT in 'Baby Foods' # 2. Shorter names (usually more fundamental ingredients) # 3. Alphabetical order as a tie-breaker q = f"%{query}%" prefix_match = f"{query}%" cursor.execute(''' SELECT * FROM foods WHERE name LIKE ? ORDER BY CASE WHEN category = 'Baby Foods' THEN 1 ELSE 0 END, CASE WHEN name LIKE ? THEN 0 ELSE 1 END, LENGTH(name) ASC, name ASC LIMIT ? ''', (q, prefix_match, limit)) rows = cursor.fetchall() return [dict(row) for row in rows] except Exception as e: logger.error(f"Error searching foods: {e}") return [] finally: if conn: conn.close() def save_chat_message(user_id: int, role: str, content: str): """Persist a chat message to the database""" conn = None try: conn = get_db_connection() cursor = conn.cursor() cursor.execute( "INSERT INTO chat_messages (user_id, role, content) VALUES (?, ?, ?)", (user_id, role, content) ) conn.commit() except Exception as e: logger.error(f"Error saving chat message: {e}") finally: if conn: conn.close() def get_user_chat_history(user_id: int, limit: int = 50) -> list[Dict[str, Any]]: """Retrieve the most recent chat messages for a user""" conn = None try: conn = get_db_connection() cursor = conn.cursor() # Order by created_at DESC to get recent ones, then reverse for display cursor.execute(''' SELECT role, content FROM chat_messages WHERE user_id = ? ORDER BY created_at ASC LIMIT ? ''', (user_id, limit)) rows = cursor.fetchall() return [dict(row) for row in rows] except Exception as e: logger.error(f"Error fetching chat history: {e}") return [] finally: if conn: conn.close() def get_user_profile(user_id: int) -> Optional[Dict[str, Any]]: """Fetch the user's profile containing macro targets. Inserts defaults if none exists.""" conn = None try: conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM user_profiles WHERE user_id = ?", (user_id,)) row = cursor.fetchone() if not row: # Create a default profile row if one does not exist cursor.execute(''' INSERT INTO user_profiles (user_id) VALUES (?) ''', (user_id,)) conn.commit() cursor.execute("SELECT * FROM user_profiles WHERE user_id = ?", (user_id,)) row = cursor.fetchone() return dict(row) if row else None except Exception as e: logger.error(f"Error fetching user profile: {e}") return None finally: if conn: conn.close()