| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230 |
- import json
- import logging
- import httpx
- import bcrypt
- from contextlib import asynccontextmanager
- from fastapi import FastAPI, HTTPException, Depends, Header
- from database import create_tables, create_user, get_user_by_username, create_session, get_user_from_token, delete_session, search_foods_by_name
- from fastapi.responses import HTMLResponse, StreamingResponse
- from fastapi.staticfiles import StaticFiles
- from pydantic import BaseModel
- from typing import List, Generator, Optional
- logging.basicConfig(level=logging.INFO)
- logger = logging.getLogger(__name__)
- @asynccontextmanager
- async def lifespan(app: FastAPI):
- create_tables()
- yield
- app = FastAPI(title="LocalFoodAI Chat", lifespan=lifespan)
- # Use direct bcrypt for better environment compatibility
- def get_password_hash(password: str):
- # Hash requires bytes
- pwd_bytes = password.encode('utf-8')
- salt = bcrypt.gensalt()
- hashed = bcrypt.hashpw(pwd_bytes, salt)
- return hashed.decode('utf-8')
- def verify_password(plain_password: str, hashed_password: str):
- # bcrypt.checkpw handles verification
- return bcrypt.checkpw(plain_password.encode('utf-8'), hashed_password.encode('utf-8'))
- class UserCreate(BaseModel):
- username: str
- password: str
- class UserLogin(BaseModel):
- username: str
- password: str
- async def get_current_user(authorization: Optional[str] = Header(None)):
- if not authorization or not authorization.startswith("Bearer "):
- raise HTTPException(status_code=401, detail="Authentication required")
-
- token = authorization.split(" ")[1]
- user = get_user_from_token(token)
- if not user:
- raise HTTPException(status_code=401, detail="Invalid or expired session")
- return user
- OLLAMA_URL = "http://localhost:11434/api/chat"
- MODEL_NAME = "llama3.1:8b"
- # Common stopwords to strip before searching the food database
- _STOPWORDS = {
- 'how', 'many', 'much', 'calories', 'does', 'have', 'has', 'is', 'are',
- 'in', 'the', 'a', 'an', 'of', 'for', 'with', 'what', 'tell', 'me',
- 'about', 'nutritional', 'value', 'nutrition', 'macro', 'macros',
- 'protein', 'fat', 'carbs', 'fiber', 'can', 'you', 'i', 'want', 'need',
- 'eat', 'eating', 'food', 'meal', 'diet', 'healthy', 'make', 'cook',
- 'recipe', 'per', '100g', 'gram', 'grams', 'serving'
- }
- def extract_food_context(messages: list) -> str | None:
- """Scan the last user message for food keywords and enrich with local DB data."""
- # Find the last user message
- last_user_msg = None
- for msg in reversed(messages):
- role = msg.get('role', '') if isinstance(msg, dict) else msg.role
- content = msg.get('content', '') if isinstance(msg, dict) else msg.content
- if role == 'user':
- last_user_msg = content
- break
-
- if not last_user_msg:
- return None
-
- # Extract meaningful keywords by removing stopwords
- words = last_user_msg.lower().replace('?', '').replace(',', '').split()
- keywords = [w for w in words if w not in _STOPWORDS and len(w) > 2]
-
- if not keywords:
- return None
-
- # Try each keyword against the local food database, collect unique results
- found_items = {}
- for kw in keywords[:5]: # Limit to first 5 keywords for performance
- results = search_foods_by_name(kw, limit=3)
- for item in results:
- if item['name'] not in found_items:
- found_items[item['name']] = item
- if len(found_items) >= 5:
- break
-
- if not found_items:
- return None
-
- # Build a structured context block for the system prompt
- lines = [
- "[LocalFoodAI Database Context]",
- "The user's question relates to foods found in the local verified nutritional database.",
- "Use ONLY the following data for specific nutritional values (per 100g serving):",
- ""
- ]
- for item in found_items.values():
- line = (
- f"- {item['name']}: {item['calories']} kcal | "
- f"Protein: {item['protein_g']}g | Fat: {item['fat_g']}g | "
- f"Carbs: {item['carbs_g']}g | Fiber: {item['fiber_g']}g | "
- f"Sodium: {item['sodium_mg']}mg"
- )
- lines.append(line)
-
- lines.append("")
- lines.append("Always prioritize this local database data over your training memory for these specific foods.")
- return "\n".join(lines)
- # Mount static files to serve the frontend
- app.mount("/static", StaticFiles(directory="static"), name="static")
- class ChatMessage(BaseModel):
- role: str
- content: str
- class ChatRequest(BaseModel):
- messages: List[ChatMessage]
- @app.get("/", response_class=HTMLResponse)
- async def read_root():
- """Serve the chat interface HTML"""
- try:
- with open("static/index.html", "r", encoding="utf-8") as f:
- return HTMLResponse(content=f.read())
- except FileNotFoundError:
- return HTMLResponse(content="<h1>Welcome to LocalFoodAI</h1><p>static/index.html not found. Please create the frontend.</p>")
- @app.post("/api/register")
- async def register_user(user: UserCreate):
- if len(user.username.strip()) < 3:
- raise HTTPException(status_code=400, detail="Username must be at least 3 characters")
- if len(user.password.strip()) < 6:
- raise HTTPException(status_code=400, detail="Password must be at least 6 characters")
-
- hashed_password = get_password_hash(user.password)
- user_id = create_user(user.username.strip(), hashed_password)
- if not user_id:
- raise HTTPException(status_code=400, detail="Username already exists")
-
- # Auto-login after registration
- token = create_session(user_id)
- return {"message": "User registered successfully", "token": token, "username": user.username.strip()}
- @app.post("/api/login")
- async def login_user(user: UserLogin):
- db_user = get_user_by_username(user.username.strip())
- if not db_user:
- raise HTTPException(status_code=401, detail="Invalid username or password")
-
- if not verify_password(user.password, db_user["password_hash"]):
- raise HTTPException(status_code=401, detail="Invalid username or password")
-
- token = create_session(db_user["id"])
- return {"status": "success", "username": db_user["username"], "token": token}
- @app.post("/api/logout")
- async def logout(authorization: Optional[str] = Header(None)):
- if authorization and authorization.startswith("Bearer "):
- token = authorization.split(" ")[1]
- delete_session(token)
- return {"message": "Logged out successfully"}
- @app.post("/chat")
- async def chat_endpoint(request: ChatRequest, current_user: dict = Depends(get_current_user)):
- """Proxy chat requests to the local Ollama instance with streaming support.
- Automatically enriches prompts with verified local SQLite nutritional data.
- """
- messages = [msg.model_dump() for msg in request.messages]
-
- # --- TG-35: Local SQL RAG Enrichment ---
- db_context = extract_food_context(messages)
- if db_context:
- logger.info(f"[RAG] Injecting local DB context for user '{current_user['username']}'")
- # Prepend as a system message so it acts as grounded knowledge
- messages = [{"role": "system", "content": db_context}] + messages
-
- payload = {
- "model": MODEL_NAME,
- "messages": messages,
- "stream": True # Enable streaming for a better UI experience
- }
-
- async def generate_response():
- try:
- async with httpx.AsyncClient() as client:
- async with client.stream("POST", OLLAMA_URL, json=payload, timeout=120.0) as response:
- if response.status_code != 200:
- error_detail = await response.aread()
- logger.error(f"Error communicating with Ollama: {error_detail}")
- yield f"data: {json.dumps({'error': 'Error communicating with local LLM.'})}\n\n"
- return
- async for line in response.aiter_lines():
- if line:
- data = json.loads(line)
- if "message" in data and "content" in data["message"]:
- content = data["message"]["content"]
- yield f"data: {json.dumps({'content': content})}\n\n"
- if data.get("done"):
- break
- except Exception as e:
- logger.error(f"Unexpected error during stream: {e}")
- yield f"data: {json.dumps({'error': str(e)})}\n\n"
- return StreamingResponse(generate_response(), media_type="text/event-stream")
- @app.get("/api/food/search")
- async def search_food(q: str, current_user: dict = Depends(get_current_user)):
- """API endpoint to search for food items securely using token authentication"""
- if not q or len(q.strip()) < 1:
- return {"results": []}
-
- logger.info(f"User {current_user['username']} searched for [{q}]")
- results = search_foods_by_name(q.strip(), limit=15)
- return {"results": results}
- if __name__ == "__main__":
- import uvicorn
- uvicorn.run("main:app", host="127.0.0.1", port=8000, reload=True)
|