| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
- import sqlite3
- import os
- import logging
- import secrets
- from datetime import datetime, timedelta
- from typing import Optional, Dict, Any
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# The SQLite file lives in the same directory as this source file.
_MODULE_DIR = os.path.dirname(__file__)
DB_PATH = os.path.join(_MODULE_DIR, "localfood.db")
def get_db_connection():
    """Open a new SQLite connection to ``DB_PATH``.

    The returned connection is not closed here; the caller must close it.
    """
    connection = sqlite3.connect(DB_PATH)
    # sqlite3.Row allows column access by name and conversion via dict(row).
    connection.row_factory = sqlite3.Row
    return connection
def create_tables():
    """Initialize the SQLite database with the required tables.

    Creates the ``users`` and ``sessions`` tables when they do not exist
    yet (``CREATE TABLE IF NOT EXISTS``), so repeated calls are harmless.

    Raises:
        Exception: Any error raised while connecting or executing the
            statements is logged and then re-raised unchanged.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()

            # Local user accounts; only a password hash is stored.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT UNIQUE NOT NULL,
                    password_hash TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            # Database-backed session tokens.
            # NOTE: SQLite enforces the FOREIGN KEY below only when
            # ``PRAGMA foreign_keys = ON`` is set on the connection.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS sessions (
                    token TEXT PRIMARY KEY,
                    user_id INTEGER NOT NULL,
                    expires_at TIMESTAMP NOT NULL,
                    FOREIGN KEY (user_id) REFERENCES users (id)
                )
            ''')

            conn.commit()
        finally:
            # Close on every path so a failing statement does not leak
            # the connection.
            conn.close()
        logger.info("Database and tables initialized successfully.")
    except Exception as e:
        logger.error("Error initializing database: %s", e)
        raise
def get_user_by_username(username: str) -> Optional[Dict[str, Any]]:
    """Return the ``users`` row for *username* as a dict, or ``None``.

    Args:
        username: Exact username to look up.

    Returns:
        A dict of all columns of the matching row. ``None`` is returned both
        when no such user exists and when the lookup fails; failures are
        logged, not raised.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM users WHERE username = ?", (username,))
            row = cursor.fetchone()
        finally:
            # Close on every path so a failing query does not leak the
            # connection.
            conn.close()
        return dict(row) if row else None
    except Exception as e:
        logger.error("Database error fetching user: %s", e)
        return None
def create_user(username: str, password_hash: str) -> Optional[int]:
    """Insert a new user and return its row id.

    Args:
        username: Name for the new account (``UNIQUE NOT NULL`` in the schema).
        password_hash: Already-hashed password; stored as given.

    Returns:
        The new user's id, or ``None`` when the insert violates an integrity
        constraint (for example, the username is already taken).

    Raises:
        Exception: Any other database error is logged and re-raised.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute(
                "INSERT INTO users (username, password_hash) VALUES (?, ?)",
                (username, password_hash),
            )
            user_id = cursor.lastrowid
            conn.commit()
        finally:
            # Close on every path. When the INSERT fails, closing also
            # discards the uncommitted transaction instead of leaving it
            # open on a leaked connection.
            conn.close()
        return user_id
    except sqlite3.IntegrityError:
        return None
    except Exception as e:
        logger.error("Database error during user creation: %s", e)
        raise
def create_session(user_id: int) -> str:
    """Create and store a session token for *user_id*, valid for 24 hours.

    The token comes from ``secrets.token_urlsafe(32)``: 32 random bytes,
    URL-safe Base64 encoded, which is about 43 characters of text.

    Args:
        user_id: Id of the user the session belongs to.

    Returns:
        The newly created session token.

    Raises:
        Exception: Any database error is logged and re-raised.
    """
    token = secrets.token_urlsafe(32)
    # Naive local time; the expiry check compares against datetime.now() too.
    expires_at = datetime.now() + timedelta(hours=24)

    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute(
                "INSERT INTO sessions (token, user_id, expires_at) VALUES (?, ?, ?)",
                # Bind the timestamp as explicit text. sqlite3's implicit
                # datetime adapter is deprecated since Python 3.12 and
                # produced exactly this ``isoformat(" ")`` string, so the
                # stored value is unchanged.
                (token, user_id, expires_at.isoformat(" ")),
            )
            conn.commit()
        finally:
            # Close on every path so a failing insert does not leak the
            # connection.
            conn.close()
        return token
    except Exception as e:
        logger.error("Error creating session: %s", e)
        raise
def get_user_from_token(token: str) -> Optional[Dict[str, Any]]:
    """Return the user owning *token* if the session exists and is not expired.

    Args:
        token: Session token to verify.

    Returns:
        A dict of all ``users`` columns (including ``password_hash``) for the
        session's user. ``None`` is returned when the token is unknown or
        expired, and also when the lookup fails; failures are logged, not
        raised.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            # Bind "now" as explicit text. sqlite3's implicit datetime
            # adapter is deprecated since Python 3.12 and produced exactly
            # this ``isoformat(" ")`` string, so the comparison is unchanged.
            now_text = datetime.now().isoformat(" ")
            # Match the user only if the token exists and has not expired.
            cursor.execute('''
                SELECT users.* FROM users
                JOIN sessions ON users.id = sessions.user_id
                WHERE sessions.token = ? AND sessions.expires_at > ?
            ''', (token, now_text))
            row = cursor.fetchone()
        finally:
            # Close on every path so a failing query does not leak the
            # connection.
            conn.close()
        return dict(row) if row else None
    except Exception as e:
        logger.error("Database error verifying token: %s", e)
        return None
def delete_session(token: str) -> None:
    """Remove the session row for *token*, e.g. when the user logs out.

    Deleting a token that does not exist is a no-op. This is best-effort:
    database errors are logged and not raised.

    Args:
        token: Session token to delete.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("DELETE FROM sessions WHERE token = ?", (token,))
            conn.commit()
        finally:
            # Close on every path so a failing delete does not leak the
            # connection.
            conn.close()
    except Exception as e:
        logger.error("Error deleting session: %s", e)
|