"""LocalFoodAI Chat — FastAPI backend.

Serves a static chat frontend, handles username/password auth with bcrypt
and opaque session tokens, and proxies chat requests to a local Ollama
instance. Chat prompts are enriched ("RAG") with rows from a local SQLite
food database before being forwarded to the model.
"""
import json
import logging
from contextlib import asynccontextmanager
from typing import List, Generator, Optional

import bcrypt
import httpx
from fastapi import Depends, FastAPI, Header, HTTPException
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

from database import (
    create_session,
    create_tables,
    create_user,
    delete_session,
    get_user_by_username,
    get_user_from_token,
    search_foods_by_name,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the SQLite schema once at startup; nothing to tear down."""
    create_tables()
    yield


app = FastAPI(title="LocalFoodAI Chat", lifespan=lifespan)


# Use direct bcrypt for better environment compatibility
def get_password_hash(password: str) -> str:
    """Hash *password* with a fresh per-call salt; returns a UTF-8 string."""
    # bcrypt operates on bytes
    pwd_bytes = password.encode('utf-8')
    salt = bcrypt.gensalt()
    hashed = bcrypt.hashpw(pwd_bytes, salt)
    return hashed.decode('utf-8')


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Constant-time check of *plain_password* against a stored bcrypt hash."""
    return bcrypt.checkpw(plain_password.encode('utf-8'),
                          hashed_password.encode('utf-8'))


class UserCreate(BaseModel):
    # Registration payload
    username: str
    password: str


class UserLogin(BaseModel):
    # Login payload
    username: str
    password: str


async def get_current_user(authorization: Optional[str] = Header(None)):
    """FastAPI dependency: resolve the Bearer token to a user dict.

    Raises:
        HTTPException: 401 when the header is absent/malformed or the
            token does not map to a live session.
    """
    if not authorization or not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Authentication required")
    token = authorization.split(" ")[1]
    user = get_user_from_token(token)
    if not user:
        raise HTTPException(status_code=401, detail="Invalid or expired session")
    return user


OLLAMA_URL = "http://localhost:11434/api/chat"
MODEL_NAME = "llama3.1:8b"

# Common stopwords to strip before searching the food database
_STOPWORDS = {
    'how', 'many', 'much', 'calories', 'does', 'have', 'has', 'is', 'are',
    'in', 'the', 'a', 'an', 'of', 'for', 'with', 'what', 'tell', 'me',
    'about', 'nutritional', 'value', 'nutrition', 'macro', 'macros',
    'protein', 'fat', 'carbs', 'fiber', 'can', 'you', 'i', 'want', 'need',
    'eat', 'eating', 'food', 'meal', 'diet', 'healthy', 'make', 'cook',
    'recipe', 'per', '100g', 'gram', 'grams', 'serving',
}

# Maximum number of distinct DB rows injected into the system prompt.
_MAX_CONTEXT_ITEMS = 5


def extract_food_context(messages: list) -> str | None:
    """Scan the last user message for food keywords and enrich with local DB data.

    Args:
        messages: Chat history; each item is either a dict with
            ``role``/``content`` keys or an object with those attributes.

    Returns:
        A formatted context block for the system prompt, or ``None`` when
        there is no user message, no usable keywords, or no DB matches.
    """
    # Find the last user message
    last_user_msg = None
    for msg in reversed(messages):
        role = msg.get('role', '') if isinstance(msg, dict) else msg.role
        content = msg.get('content', '') if isinstance(msg, dict) else msg.content
        if role == 'user':
            last_user_msg = content
            break
    if not last_user_msg:
        return None

    # Extract meaningful keywords by removing stopwords
    words = last_user_msg.lower().replace('?', '').replace(',', '').split()
    keywords = [w for w in words if w not in _STOPWORDS and len(w) > 2]
    if not keywords:
        return None

    # Try each keyword against the local food database, collect unique results.
    found_items = {}
    for kw in keywords[:5]:  # Limit to first 5 keywords for performance
        # BUGFIX: the cap must also stop the *outer* loop — previously each
        # new keyword could push found_items past the intended maximum.
        if len(found_items) >= _MAX_CONTEXT_ITEMS:
            break
        results = search_foods_by_name(kw, limit=3)
        for item in results:
            if item['name'] not in found_items:
                found_items[item['name']] = item
            if len(found_items) >= _MAX_CONTEXT_ITEMS:
                break
    if not found_items:
        return None

    # Build a structured context block for the system prompt
    lines = [
        "[LocalFoodAI Database Context]",
        "The user's question relates to foods found in the local verified nutritional database.",
        "Use ONLY the following data for specific nutritional values (per 100g serving):",
        "",
    ]
    for item in found_items.values():
        line = (
            f"- {item['name']}: {item['calories']} kcal | "
            f"Protein: {item['protein_g']}g | Fat: {item['fat_g']}g | "
            f"Carbs: {item['carbs_g']}g | Fiber: {item['fiber_g']}g | "
            f"Sodium: {item['sodium_mg']}mg"
        )
        lines.append(line)
    lines.append("")
    lines.append("Always prioritize this local database data over your training memory for these specific foods.")
    return "\n".join(lines)


# Mount static files to serve the frontend
app.mount("/static", StaticFiles(directory="static"), name="static")


class ChatMessage(BaseModel):
    # One turn of the conversation ("user" / "assistant" / "system").
    role: str
    content: str


class ChatRequest(BaseModel):
    messages: List[ChatMessage]


@app.get("/", response_class=HTMLResponse)
async def read_root():
    """Serve the chat interface HTML"""
    try:
        with open("static/index.html", "r", encoding="utf-8") as f:
            return HTMLResponse(content=f.read())
    except FileNotFoundError:
        # BUGFIX: a missing frontend is a 404, not a 200.
        # NOTE(review): the original markup around this message was lost in
        # the source formatting; wording preserved, <h1> reconstructed.
        return HTMLResponse(
            content="<h1>static/index.html not found. Please create the frontend.</h1>",
            status_code=404,
        )


@app.post("/api/register")
async def register_user(user: UserCreate):
    """Create an account; auto-logs the new user in and returns a token."""
    if len(user.username.strip()) < 3:
        raise HTTPException(status_code=400, detail="Username must be at least 3 characters")
    if len(user.password.strip()) < 6:
        raise HTTPException(status_code=400, detail="Password must be at least 6 characters")
    hashed_password = get_password_hash(user.password)
    user_id = create_user(user.username.strip(), hashed_password)
    if not user_id:
        raise HTTPException(status_code=400, detail="Username already exists")
    # Auto-login after registration
    token = create_session(user_id)
    return {"message": "User registered successfully", "token": token,
            "username": user.username.strip()}


@app.post("/api/login")
async def login_user(user: UserLogin):
    """Verify credentials and mint a new session token.

    The same 401 detail is used for unknown-user and wrong-password so the
    endpoint does not leak which usernames exist.
    """
    db_user = get_user_by_username(user.username.strip())
    if not db_user:
        raise HTTPException(status_code=401, detail="Invalid username or password")
    if not verify_password(user.password, db_user["password_hash"]):
        raise HTTPException(status_code=401, detail="Invalid username or password")
    token = create_session(db_user["id"])
    return {"status": "success", "username": db_user["username"], "token": token}


@app.post("/api/logout")
async def logout(authorization: Optional[str] = Header(None)):
    """Best-effort session teardown; always reports success."""
    if authorization and authorization.startswith("Bearer "):
        token = authorization.split(" ")[1]
        delete_session(token)
    return {"message": "Logged out successfully"}


@app.post("/chat")
async def chat_endpoint(request: ChatRequest,
                        current_user: dict = Depends(get_current_user)):
    """Proxy chat requests to the local Ollama instance with streaming support.

    Automatically enriches prompts with verified local SQLite nutritional data.
    Responses are re-emitted as Server-Sent Events (`data: {...}\\n\\n`).
    """
    messages = [msg.model_dump() for msg in request.messages]

    # --- TG-35: Local SQL RAG Enrichment ---
    db_context = extract_food_context(messages)
    if db_context:
        logger.info("[RAG] Injecting local DB context for user '%s'",
                    current_user['username'])
        # Prepend as a system message so it acts as grounded knowledge
        messages = [{"role": "system", "content": db_context}] + messages

    payload = {
        "model": MODEL_NAME,
        "messages": messages,
        "stream": True,  # Enable streaming for a better UI experience
    }

    async def generate_response():
        try:
            async with httpx.AsyncClient() as client:
                async with client.stream("POST", OLLAMA_URL, json=payload,
                                         timeout=120.0) as response:
                    if response.status_code != 200:
                        error_detail = await response.aread()
                        logger.error("Error communicating with Ollama: %s", error_detail)
                        yield f"data: {json.dumps({'error': 'Error communicating with local LLM.'})}\n\n"
                        return
                    async for line in response.aiter_lines():
                        if not line:
                            continue
                        # BUGFIX: a single malformed NDJSON line should not
                        # abort the whole stream — skip it instead.
                        try:
                            data = json.loads(line)
                        except json.JSONDecodeError:
                            continue
                        if "message" in data and "content" in data["message"]:
                            content = data["message"]["content"]
                            yield f"data: {json.dumps({'content': content})}\n\n"
                        if data.get("done"):
                            break
        except Exception as e:
            logger.error("Unexpected error during stream: %s", e)
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return StreamingResponse(generate_response(), media_type="text/event-stream")


@app.get("/api/food/search")
async def search_food(q: str, current_user: dict = Depends(get_current_user)):
    """API endpoint to search for food items securely using token authentication"""
    if not q or len(q.strip()) < 1:
        return {"results": []}
    logger.info("User %s searched for [%s]", current_user['username'], q)
    results = search_foods_by_name(q.strip(), limit=15)
    return {"results": results}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run("main:app", host="127.0.0.1", port=8000, reload=True)