| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859 |
- # $Id$
- # $Author$
- # $log$
- #ident "@(#)LocalFoodAI:app.py:$Format:%D:%ci:%cN:%h$"
- import streamlit as st
- import pymysql
- import bcrypt
- import random
- import string
- import time
- import os
- import pandas as pd
- from snmp_notifier import notifier
- from unit_converter import UnitConverter
- from fpdf import FPDF
- import myloginpath
- import ollama
- import bcrypt
- import requests
- import string
- import random
- import smtplib
- from email.message import EmailMessage
- import pandas as pd
- from unit_converter import UnitConverter
- from snmp_notifier import notifier
- import time
- import threading
- def pull_model_bg():
- try: ollama.pull('llama3.2:1b')
- except: pass
- threading.Thread(target=pull_model_bg, daemon=True).start()
- def local_web_search(query: str) -> str:
- try:
- req = requests.get(f'http://127.0.0.1:8080/search', params={'q': query, 'format': 'json'})
- if req.status_code == 200:
- data = req.json()
- results = data.get('results', [])
- if not results: return f"No results found on the web for '{query}'."
- snippets = [f"Source: {r.get('url')}\nContent: {r.get('content')}" for r in results[:3]]
- return "\n\n".join(snippets)
- return "Search engine returned an error."
- except Exception as e: return f"Local search engine unreachable: {e}"
- search_tool_schema = {
- 'type': 'function',
- 'function': {
- 'name': 'local_web_search',
- 'description': 'Search the internet for info not in DB.',
- 'parameters': {'type': 'object', 'properties': {'query': {'type': 'string'}}, 'required': ['query']},
- },
- }
- def search_nutrition_db(query: str, user_eav=None) -> str:
- conn = get_db_connection('app_reader')
- if not conn: return "Database connection failed."
- try:
- with conn.cursor() as cursor:
- # Dynamically build strictly-enforced clinical SQL filters
- clinical_filters = ""
- if user_eav:
- for p in user_eav:
- name = p['name'].lower()
- val = p['value'].lower()
- if name in ['condition', 'illness']:
- if val == 'diabetes': clinical_filters += " AND m.sugars_100g < 5.0"
- elif 'kidney' in val: clinical_filters += " AND m.proteins_100g < 15.0"
- elif 'hypertension' in val: clinical_filters += " AND m.sodium_100g < 0.2"
- elif name in ['diet', 'religious', 'preference']:
- if val == 'kosher': clinical_filters += " AND c.ingredients_text NOT LIKE '%pork%' AND c.ingredients_text NOT LIKE '%shellfish%'"
- elif val == 'halal': clinical_filters += " AND c.ingredients_text NOT LIKE '%pork%' AND c.ingredients_text NOT LIKE '%wine%' AND c.ingredients_text NOT LIKE '%alcohol%'"
- elif val in ['christian', 'good friday', 'ash wednesday']: clinical_filters += " AND c.ingredients_text NOT LIKE '%meat%' AND c.ingredients_text NOT LIKE '%beef%' AND c.ingredients_text NOT LIKE '%chicken%' AND c.ingredients_text NOT LIKE '%pork%'"
- sql = f"""
- SELECT c.code, c.product_name, m.proteins_100g, m.fat_100g, m.carbohydrates_100g, m.sugars_100g
- FROM food_db.products_core c
- LEFT JOIN food_db.products_macros m ON c.code = m.code
- WHERE MATCH(c.product_name, c.ingredients_text) AGAINST(%s IN BOOLEAN MODE)
- AND c.product_name IS NOT NULL AND c.product_name != '' AND c.product_name != 'None'
- {clinical_filters}
- LIMIT 15
- """
- bool_query = " ".join([f"+{w}" for w in query.split()])
- cursor.execute(sql, (bool_query,))
- results = cursor.fetchall()
- if not results: return f"No database records found for '{query}'."
-
- snippets = []
- for r in results:
- snippets.append(f"- {r['product_name']}: Protein {r['proteins_100g']}g, Fat {r['fat_100g']}g, Carbs {r['carbohydrates_100g']}g, Sugars {r['sugars_100g']}g (per 100g)")
- return "\n".join(snippets)
- except Exception as e:
- return f"Database query failed: {e}"
- finally:
- conn.close()
- db_search_tool_schema = {
- 'type': 'function',
- 'function': {
- 'name': 'search_nutrition_db',
- 'description': 'Search the local medical nutrition database for product macros and ingredients. ALWAYS prioritize this over web search.',
- 'parameters': {'type': 'object', 'properties': {'query': {'type': 'string', 'description': 'The product or food name to search for (e.g. apple, chicken, bread)'}}, 'required': ['query']},
- },
- }
- def get_db_connection(login_path):
- try:
- import os
- db_host = os.environ.get('DB_HOST')
- # Check if environment variables exist for this login path
- db_user = os.environ.get(f'{login_path.upper()}_USER') or os.environ.get('DB_USER')
- db_pass = os.environ.get(f'{login_path.upper()}_PASS') or os.environ.get('DB_PASS')
- if db_host and db_user and db_pass:
- return pymysql.connect(
- host=db_host,
- user=db_user,
- password=db_pass,
- database='food_db',
- cursorclass=pymysql.cursors.DictCursor
- )
-
- conf = myloginpath.parse(login_path)
- if not conf or not conf.get('user'):
- st.error(f"⚠️ MySQL configuration missing for `{login_path}`. If you are testing locally on Windows, this app must be run on the Ubuntu server where `mysql_config_editor` is properly configured.")
- return None
-
- return pymysql.connect(
- host=conf.get('host', '127.0.0.1'),
- user=conf.get('user'),
- password=conf.get('password'),
- database='food_db',
- cursorclass=pymysql.cursors.DictCursor
- )
- except Exception as e:
- st.error(f"Connection Failed: {e}")
- return None
- def verify_login(username, password):
- conn = get_db_connection('app_auth')
- if not conn: return False
- with conn.cursor() as cursor:
- cursor.execute("SELECT password_hash FROM users WHERE username = %s LIMIT 1", (username,))
- result = cursor.fetchone()
- conn.close()
- if result: return bcrypt.checkpw(password.encode('utf-8'), result['password_hash'].encode('utf-8'))
- return False
- def get_user_id(username):
- conn = get_db_connection('app_auth')
- if not conn: return None
- with conn.cursor() as cursor:
- cursor.execute("SELECT id FROM users WHERE username = %s LIMIT 1", (username,))
- result = cursor.fetchone()
- conn.close()
- return result['id'] if result else None
- def get_eav_profile(username):
- uid = get_user_id(username)
- if not uid: return []
- conn = get_db_connection('app_auth')
- with conn.cursor() as cursor:
- cursor.execute("SELECT id, illness_health_condition_diet_dislikes_name as name, illness_health_condition_diet_dislikes_value as value FROM user_health_profiles WHERE user_id = %s", (uid,))
- res = cursor.fetchall()
- conn.close()
- return res
- def get_user_limit(username):
- conn = get_db_connection('app_auth')
- if not conn: return "50"
- with conn.cursor() as cursor:
- cursor.execute("SELECT search_limit FROM users WHERE username = %s LIMIT 1", (username,))
- result = cursor.fetchone()
- conn.close()
- return result['search_limit'] if (result and result['search_limit']) else "50"
- def register_user(username, password, email):
- conn = get_db_connection('app_auth')
- if not conn: return False
- hashed = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
- try:
- with conn.cursor() as cursor:
- cursor.execute("INSERT INTO users (username, password_hash, email) VALUES (%s, %s, %s)", (username, hashed, email))
- conn.commit()
- conn.close()
- send_email(email, "Welcome to Local Food AI", f"Hello {username}, your account was securely created!", to_name=username.title())
- return True
- except pymysql.err.IntegrityError:
- return False
- def send_email(to_email, subject, body, to_name="User"):
- msg = EmailMessage()
- msg.set_content(body)
- msg['Subject'] = subject
- msg['From'] = '"Clinical Food AI System" <security@localfoodai.com>'
- msg['To'] = f'"{to_name}" <{to_email}>'
-
- for attempt in range(5):
- try:
- s = smtplib.SMTP('localhost', 25)
- s.send_message(msg)
- s.quit()
- return True
- except Exception as e:
- if attempt == 4:
- return f"SMTP Delivery Failed: {str(e)}"
- time.sleep(2)
- return "Unknown Error Occurred"
- def reset_password(username, email):
- conn = get_db_connection('app_auth')
- if not conn: return False
- with conn.cursor() as cursor:
- cursor.execute("SELECT id, email FROM users WHERE username = %s", (username,))
- user = cursor.fetchone()
- if user and user['email'] == email:
- new_pass = ''.join(random.choices(string.ascii_letters + string.digits, k=10))
- hashed = bcrypt.hashpw(new_pass.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
- cursor.execute("UPDATE users SET password_hash = %s WHERE id = %s", (hashed, user['id']))
- conn.commit()
- conn.close()
- status = send_email(email, "Password Reset", f"Your new temporary password is: {new_pass}", to_name=username.title())
- if status is True:
- return True
- return status
- return False
- # UI Theming
- def render_version():
- st.markdown("---")
- st.caption("🚀 Version: v1.3.0")
- st.caption(f"📅 Git ID: $Id$")
- st.set_page_config(page_title="Food AI Explorer", page_icon="🍔", layout="wide")
- st.markdown("""
- <style>
- @import url('https://fonts.googleapis.com/css2?family=Inter:wght@300;400;600&display=swap');
- html, body, [class*="css"] { font-family: 'Inter', sans-serif; background-color: #0b192c; color: #e2e8f0; }
- h1, h2, h3 { color: #38bdf8 !important; font-weight: 600; letter-spacing: 0.5px; }
- div[data-testid="stSidebar"] { background: rgba(11, 25, 44, 0.95) !important; backdrop-filter: blur(10px); border-right: 1px solid #1e293b; }
- .stButton>button { background: linear-gradient(135deg, #0ea5e9, #0284c7); color: white; border: none; border-radius: 6px; }
- .stButton>button:hover { transform: scale(1.02); }
- .stTextInput>div>div>input, .stNumberInput>div>div>input, .stSelectbox>div>div>div { background-color: #0f172a; color: #f8fafc; border: 1px solid #38bdf8; }
- </style>
- """, unsafe_allow_html=True)
- if "authenticated_user" not in st.session_state:
- st.session_state["authenticated_user"] = None
- with st.sidebar:
- st.title("User Portal 🔐")
- render_version()
-
- with st.expander("🛠️ Diagnostic: App Database View"):
- conn = get_db_connection('app_auth')
- if conn:
- with conn.cursor() as c:
- c.execute("DESCRIBE users;")
- st.json(c.fetchall())
- c.execute("SELECT DATABASE(), CURRENT_USER();")
- st.json(c.fetchall())
- conn.close()
-
- if st.session_state["authenticated_user"]:
- st.success(f"Logged in as: {st.session_state['authenticated_user']}")
- if st.button("Logout"):
- st.session_state["authenticated_user"] = None
- st.rerun()
-
- eav_data = get_eav_profile(st.session_state["authenticated_user"])
- uid = get_user_id(st.session_state["authenticated_user"])
- user_lim = get_user_limit(st.session_state["authenticated_user"])
-
- with st.expander("⚙️ Account Preferences"):
- opts = ["10", "20", "50", "100", "All"]
- idx = opts.index(user_lim) if user_lim in opts else 2
- new_lim = st.selectbox("Default Search Limit", opts, index=idx)
- if new_lim != user_lim:
- conn = get_db_connection('app_auth')
- with conn.cursor() as c:
- c.execute("UPDATE users SET search_limit = %s WHERE id = %s", (new_lim, uid))
- conn.commit()
- st.rerun()
- with st.expander("➕ Add Condition / Diet"):
- new_cat = st.selectbox("Category", ["Condition", "Illness", "Diet", "Dislike", "Allergy"])
-
- if new_cat == "Condition":
- new_val = st.selectbox("Value", ["Pregnant", "Breastfeeding", "Low Fat"])
- elif new_cat == "Illness":
- new_val = st.selectbox("Value", ["Diabetes", "Hypertension", "Kidney Disease", "Osteoporosis", "Scurvy", "Anemia"])
- elif new_cat == "Diet":
- new_val = st.selectbox("Value", ["Vegan", "Vegetarian", "Kosher", "Halal", "Christian", "Good Friday", "Ash Wednesday", "Keto", "Paleo"])
- else:
- new_val = st.text_input("Value (e.g. 'peanuts', 'broccoli')").strip()
-
- new_val_clean = new_val.lower()
-
- if st.button("Add to Profile") and new_val_clean and uid:
- conn = get_db_connection('app_auth')
- with conn.cursor() as c:
- c.execute("INSERT INTO user_health_profiles (user_id, illness_health_condition_diet_dislikes_name, illness_health_condition_diet_dislikes_value) VALUES (%s, %s, %s)", (uid, new_cat.lower(), new_val_clean))
- conn.commit()
- st.rerun()
-
- if eav_data:
- st.markdown("#### Active Flags")
- for e in eav_data:
- col1, col2 = st.columns([4, 1])
- col1.info(f"**{e['name']}:** {e['value'].title()}")
- if col2.button("X", key=f"del_eav_{e['id']}"):
- conn = get_db_connection('app_auth')
- with conn.cursor() as c:
- c.execute("DELETE FROM user_health_profiles WHERE id = %s", (e['id'],))
- conn.commit()
- st.rerun()
- else:
- tab1, tab2, tab3 = st.tabs(["Login", "Register", "Reset"])
- with tab1:
- l_user = st.text_input("Username", key="l_user").strip()
- l_pass = st.text_input("Password", type="password", key="l_pass")
- if st.button("Login"):
- if verify_login(l_user, l_pass):
- notifier.send_alert(f"User Login Success: {l_user}")
- st.session_state["authenticated_user"] = l_user
- st.rerun()
- else:
- notifier.send_alert(f"User Login Failed: {l_user}")
- st.error("Invalid login.")
- with tab2:
- r_user = st.text_input("Username", key="r_user")
- r_email = st.text_input("Email Address", key="r_email")
- r_pass = st.text_input("Password", type="password", key="r_pass")
- if st.button("Register"):
- if len(r_pass) < 6: st.error("Password too short.")
- elif register_user(r_user, r_pass, r_email): st.success("Registered safely!")
- else: st.error("Username exists.")
- with tab3:
- f_user = st.text_input("Username", key="f_user")
- f_email = st.text_input("Registered Email", key="f_email")
- if st.button("Send Reset Link"):
- status = reset_password(f_user, f_email)
- if status is True:
- st.success("Password reset emailed.")
- else:
- st.error(f"Failed: {status}")
- if not st.session_state["authenticated_user"]:
- st.title("🍔 Food AI Medical Explorer")
- st.info("Please login to interrogate the Clinical Data.")
- st.stop()
- st.title("🍔 Food AI Clinical Explorer")
- conn_reader = get_db_connection('app_reader')
- tab_chat, tab_explore, tab_plate, tab_planner = st.tabs(["💬 AI Chat", "🔬 Clinical Search", "🍽️ My Plate Builder", "🤖 AI Meal Planner"])
- import re
- with tab_chat:
- c1, c2 = st.columns([4, 1])
- c1.subheader("Chat with the Context")
- if c2.button("🧹 Clear Chat"):
- st.session_state["messages"] = [{"role": "assistant", "content": "How can I help you analyze the food data today?"}]
- st.rerun()
- st.info("""
- ℹ️ **How to use this feature (Examples)**
- **Your active conditions (e.g. Pregnant, Diabetic) are automatically sent to the AI in the background. You do not need to type them out.**
-
- *Examples:*
- 1. "I am pregnant, diabetic, and have kidney problems. Can I eat sushi?"
- 2. "What is a safe snack to stabilize my blood sugar without hurting my kidneys?"
- 3. "Can I drink milk? I need calcium for the baby."
- 4. "Is it safe to eat a large steak for iron?"
- 5. "What foods are strictly forbidden for me?"
- """)
- if "messages" not in st.session_state:
- st.session_state["messages"] = [{"role": "assistant", "content": "How can I help you analyze the food data today?"}]
- # Display chat history, filtering out TOOL_CALLS
- for msg in st.session_state.messages:
- if msg["role"] == "tool": continue
- display_text = re.sub(r'\[TOOL_CALLS\]\s*\[.*?\]', '', msg["content"]).strip()
- if display_text:
- st.chat_message(msg["role"]).write(display_text)
- if prompt := st.chat_input("Ask a clinical question about your food..."):
- st.session_state.messages.append({"role": "user", "content": prompt})
- st.chat_message("user").write(prompt)
-
- user_eav = get_eav_profile(st.session_state["authenticated_user"])
- profile_text = ", ".join([f"{p['name']}: {p['value']}" for p in user_eav]) if user_eav else "None"
-
- sys_prompt = f"""You are a helpful medical data analyst AI.
- Health profile: {profile_text}.
- Act as a specialized clinical dietitian. Provide a direct answer. Skip all thinking, reasoning, and pleasantries.
- Use this database context if relevant to the user's question: {search_nutrition_db(prompt)}
- """
-
- try:
- temp_messages = [{"role": "system", "content": sys_prompt}] + [m for m in st.session_state.messages if m["role"] != "tool"]
- response_stream = ollama.chat(model='llama3.2:1b', messages=temp_messages, stream=True)
-
- with st.chat_message("assistant"):
- ai_reply = st.write_stream(chunk['message']['content'] for chunk in response_stream)
-
- st.session_state.messages.append({"role": "assistant", "content": ai_reply})
- except Exception as e:
- ai_reply = f"Hold on! Engine execution fault: {e}"
- st.session_state.messages.append({"role": "assistant", "content": ai_reply})
- st.chat_message("assistant").write(ai_reply)
- def highlight_medical_warnings(row):
- try:
- val = str(row.get('Medical Warning', ''))
- if '⚠️' in val: return ['background-color: rgba(255, 0, 0, 0.4); color: white;'] * len(row)
- if '💚' in val: return ['background-color: rgba(0, 255, 0, 0.3); color: white;'] * len(row)
- except: pass
- return [''] * len(row)
- with tab_explore:
- st.subheader("Clinical Data Search")
- st.info("""
- ℹ️ **How to use this feature (Examples)**
- **Your active conditions are automatically flagged (⚠️ or 💚) in the search results.**
-
- *Example Searches:*
- 1. `Cereal` *(Checks for high sugar & hidden phosphorus)*
- 2. `Cheese` *(Checks for unpasteurized pregnancy risks & high sodium)*
- 3. `Fruit Juice` *(Checks for high sugar spikes)*
- 4. `Deli Meat` *(Checks for Listeria risk & extreme sodium)*
- 5. `White Rice` *(Safe for kidneys but flags high glycemic index)*
- """)
- sq = st.text_input("Search Product Name or Ingredient")
- cols = st.columns(5)
- min_pro = cols[0].number_input("Min Protein (g)", 0, 1000, 0)
- min_fat = cols[1].number_input("Min Fat (g)", 0, 1000, 0)
- min_carb = cols[2].number_input("Min Carbs (g)", 0, 1000, 0)
- max_sug = cols[3].number_input("Max Sugar (g)", 0, 1000, 1000)
-
- # Load dynamically fetched limit to prevent Pandas Styler crash
- pd.set_option("styler.render.max_elements", 5000000)
- opts = [10, 50, 100, 500, 1000]
-
- user_lim_str = get_user_limit(st.session_state["authenticated_user"])
- user_lim_val = 1000 if user_lim_str == "All" else int(user_lim_str)
- if user_lim_val not in opts: user_lim_val = 50
- idx = opts.index(user_lim_val)
- limit_rc = cols[4].selectbox("Limit Results", opts, index=idx)
-
- if st.button("Search Database") and sq and conn_reader:
- notifier.send_alert(f"Medical DB Search Executed: {sq}")
- with st.spinner("Processing massive clinical query..."):
- try:
- with conn_reader.cursor() as cursor:
- l_str = "" if limit_rc == "All" else f"LIMIT {limit_rc}"
- query = f"""
- SELECT c.code, c.product_name, c.generic_name, c.brands, c.ingredients_text,
- a.allergens,
- m.`energy-kcal_100g`, m.proteins_100g, m.fat_100g, m.carbohydrates_100g, m.sugars_100g, m.fiber_100g, m.sodium_100g, m.salt_100g, m.cholesterol_100g,
- v.`vitamin-a_100g`, v.`vitamin-b1_100g`, v.`vitamin-b2_100g`, v.`vitamin-pp_100g`, v.`vitamin-b6_100g`, v.`vitamin-b9_100g`, v.`vitamin-b12_100g`, v.`vitamin-c_100g`, v.`vitamin-d_100g`, v.`vitamin-e_100g`, v.`vitamin-k_100g`,
- min.calcium_100g, min.iron_100g, min.magnesium_100g, min.potassium_100g, min.zinc_100g
- FROM (
- SELECT code, product_name, generic_name, brands, ingredients_text
- FROM food_db.products_core
- WHERE MATCH(product_name, ingredients_text) AGAINST(%s IN BOOLEAN MODE)
- AND product_name IS NOT NULL AND product_name != '' AND product_name != 'None'
- {l_str}
- ) c
- LEFT JOIN food_db.products_allergens a ON c.code = a.code
- LEFT JOIN food_db.products_macros m ON c.code = m.code
- LEFT JOIN food_db.products_vitamins v ON c.code = v.code
- LEFT JOIN food_db.products_minerals min ON c.code = min.code
- WHERE (m.proteins_100g >= %s OR m.proteins_100g IS NULL)
- AND (m.fat_100g >= %s OR m.fat_100g IS NULL)
- AND (m.carbohydrates_100g >= %s OR m.carbohydrates_100g IS NULL)
- AND (m.sugars_100g <= %s OR m.sugars_100g IS NULL)
- """
- sq_bool = " ".join([f"+{w}" for w in sq.split()])
- start_time = time.time()
- cursor.execute(query, (sq_bool, min_pro, min_fat, min_carb, max_sug))
- results = cursor.fetchall()
- elapsed = time.time() - start_time
- st.caption(f"⏱️ DB Query Executed in {elapsed:.3f} seconds")
-
- if results:
- # Fetch EAV Medical Profile
- eav_profile = get_eav_profile(st.session_state["authenticated_user"])
- df = pd.DataFrame(results)
-
- st.markdown("### 🛠️ Dynamic Column Display")
- default_columns = [
- 'code', 'product_name', 'generic_name', 'brands', 'allergens', 'ingredients_text',
- 'proteins_100g', 'fat_100g', 'carbohydrates_100g', 'sugars_100g', 'sodium_100g', 'energy-kcal_100g',
- 'vitamin-c_100g', 'iron_100g', 'calcium_100g'
- ]
- all_fetched_cols = list(df.columns)
- valid_defaults = [c for c in default_columns if c in all_fetched_cols]
-
- if "selected_columns" not in st.session_state or st.button("Reset Default Columns"):
- st.session_state["selected_columns"] = valid_defaults
- st.rerun()
-
- chosen_cols = st.multiselect("Customize Dataset View", all_fetched_cols, default=st.session_state["selected_columns"], key="multi_cols")
- st.session_state["selected_columns"] = chosen_cols
-
- # Filter dataframe gracefully, but we retain a copy for background analytics
- df_display = df[chosen_cols].copy()
- warnings_col = []
-
- for idx, row in df.iterrows():
- warns = []
- ing_text = str(row['ingredients_text']).lower()
- all_text = str(row['allergens']).lower()
-
- for param in eav_profile:
- cat = param['name'].lower()
- val = param['value']
-
- # Disease Analytics
- if cat == 'illness':
- if val == 'diabetes' and pd.notnull(row.get('sugars_100g')) and float(row['sugars_100g']) > 10.0:
- warns.append("⚠️ High Sugar (Diabetes)")
- if (val == 'hypertension' or val == 'high bp') and pd.notnull(row.get('sodium_100g')) and float(row['sodium_100g']) > 1.5:
- warns.append("⚠️ High Salt (Hypertension)")
- if val == 'scurvy' and pd.notnull(row.get('vitamin-c_100g')) and float(row['vitamin-c_100g']) > 0.005:
- warns.append("💚 High Vitamin C (Scurvy Recommended)")
- if val == 'anemia' and pd.notnull(row.get('iron_100g')) and float(row['iron_100g']) > 0.002:
- warns.append("💚 High Iron (Anemia Recommended)")
-
- # Condition Analytics
- if cat == 'condition':
- if val == 'pregnant':
- if ('cru' in ing_text or 'raw' in ing_text or 'viande crue' in ing_text):
- warns.append("⚠️ Raw Foods (Pregnancy Toxoplasmosis)")
- if pd.notnull(row.get('iron_100g')) and float(row['iron_100g']) > 0.002:
- warns.append("💚 Med-High Iron (Pregnancy Health)")
- if val == 'low fat' and pd.notnull(row.get('fat_100g')) and float(row['fat_100g']) > 20.0:
- warns.append("⚠️ High Fat")
- if val == 'osteoporosis' and pd.notnull(row.get('calcium_100g')) and float(row['calcium_100g']) > 0.1:
- warns.append("💚 High Calcium (Bone Health)")
-
- if eav_data:
- ing_text = str(row.get('ingredients_text', '')).lower()
- all_text = str(row.get('allergens', '')).lower()
- product_name_text = str(row.get('product_name', '')).lower()
-
- for e in eav_data:
- cat = str(e['name']).lower()
- val = str(e['value']).lower()
-
- # Clinical Trace Checks...
- if cat == 'condition' and (val == 'pregnant' or val == 'pregnancy' or val == 'breastfeeding'):
- # Forbidden / High Risk (Toxoplasmosis & Listeria)
- if any(x in ing_text or x in product_name_text for x in ['cru', 'raw', 'viande crue', 'sushi', 'sashimi', 'poisson cru']):
- warns.append("⚠️ Forbidden: Raw Meat/Fish (Toxoplasmosis/Parasite Risk)")
- if any(x in ing_text or x in product_name_text for x in ['lait cru', 'unpasteurized', 'non pasteurisé']):
- warns.append("⚠️ Forbidden: Unpasteurized Dairy (Listeria Risk)")
- if any(x in ing_text or x in product_name_text for x in ['alcool', 'wine', 'alcohol', 'beer']):
- warns.append("⚠️ Forbidden: Contains Alcohol")
-
- # Recommended (Iron & Calcium)
- if float(row.get('iron_100g', 0) or 0) > 0.003:
- warns.append("💚 Recommended: High Iron (Pregnancy Health)")
- if float(row.get('calcium_100g', 0) or 0) > 0.120:
- warns.append("💚 Recommended: High Calcium (Bone Health / Breastfeeding)")
-
- if cat == 'illness' and val == 'osteoporosis':
- if float(row.get('calcium_100g', 0) or 0) < 0.120:
- warns.append("⚠️ Low Calcium (Osteoporosis Risk)")
- else:
- warns.append("💚 Recommended (High Calcium)")
-
- if cat == 'illness' and val == 'scurvy':
- if float(row.get('vitamin-c_100g', 0) or 0) < 0.010:
- warns.append("⚠️ Low Vitamin C (Scurvy Risk)")
- else:
- warns.append("💚 Recommended (High Vitamin C)")
-
- if cat == 'diet' and val in ['vegan', 'vegetarian']:
- if any(x in ing_text for x in ['meat', 'beef', 'chicken', 'fish', 'gelatin', 'whey', 'pork', 'porc', 'poulet']):
- warns.append("⚠️ Contains Animal Products")
- if cat == 'diet' and val == 'halal':
- if any(x in ing_text for x in ['pork', 'pig', 'porc', 'wine', 'alcohol', 'beer', 'vin']):
- warns.append("⚠️ Probable Haram Ingredients (e.g. Pork/Wine)")
-
- if cat in ['dislike', 'allergy']:
- if val in ing_text or val in all_text or val in product_name_text:
- warns.append(f"⚠️ Contains: {val.upper()}")
-
- warnings_col.append(" | ".join(list(set(warns))) if warns else "✅ Safe for Profile")
-
- df_display.insert(0, 'Medical Warning', warnings_col)
- styled_df = df_display.style.apply(highlight_medical_warnings, axis=1)
- st.success(f"Analysed {len(results)} records utilizing dynamic Partitions!")
- st.dataframe(styled_df, use_container_width=True)
-
- if st.button("🤖 Ask AI to Evaluate This Table"):
- with st.spinner("AI is dynamically evaluating these records against your profile..."):
- user_eav = get_eav_profile(st.session_state["authenticated_user"])
- profile_text = ", ".join([f"{p['name']}: {p['value']}" for p in user_eav]) if user_eav else "None"
- eval_prompt = f"The user has this profile: {profile_text}. Evaluate these foods and state which are highly recommended or strictly forbidden: {df_display.to_dict('records')}. Provide a direct answer. Skip all thinking, reasoning, and pleasantries."
- try:
- response_stream = ollama.chat(model='llama3.2:1b', messages=[{'role': 'user', 'content': eval_prompt}], stream=True)
- st.write_stream(chunk['message']['content'] for chunk in response_stream)
- except Exception as e:
- error_msg = str(e).lower()
- if "404" in error_msg or "not found" in error_msg:
- st.warning("⚠️ The AI engine is currently downloading its core models in the background. Please wait a minute and try again!")
- else:
- st.error(f"AI Evaluation Failed: {e}")
- else:
- st.warning("No products found matching those strict terms.")
- except Exception as e: st.error(f"SQL/Pandas Error: {e}")
- with tab_plate:
- st.subheader("🍽️ My Plate Builder")
- st.info("""
- ℹ️ **How to use this feature (Examples & Logic)**
- **Plate Builder Logic:**
- 1. Create a New Plate.
- 2. Search for exact food words (e.g. 'chicken', 'egg').
- 3. Add the food with a specific portion (e.g. '150g').
- 4. The system calculates the combined macros.
- 5. Use the 🗑️ buttons to delete incorrect items or entire plates.
-
- *Example Plates:*
- 1. `150g White Rice` + `50g Chicken Breast` + `100g Green Beans`
- 2. `200g Potatoes` + `100g Tomatoes` + `100g Beef`
- 3. `100g Spinach Salad` + `50g Feta Cheese`
- 4. `200g Lentils` + `100g Quinoa`
- 5. `100g Apple` + `30g Almonds`
- """)
- uid = get_user_id(st.session_state["authenticated_user"])
- conn = get_db_connection('app_auth')
- if conn and uid:
- with conn.cursor() as cursor:
- cursor.execute("SELECT id, plate_name FROM plates WHERE user_id = %s", (uid,))
- plates = cursor.fetchall()
-
- with st.expander("➕ Create a New Plate"):
- new_plate_name = st.text_input("Plate Name")
- if st.button("Create Plate"):
- cursor.execute("INSERT INTO plates (user_id, plate_name) VALUES (%s, %s)", (uid, new_plate_name))
- conn.commit()
- st.session_state["active_plate"] = new_plate_name
- st.rerun()
- if plates:
- colA, colB = st.columns([4, 1])
- plate_names = [p['plate_name'] for p in plates]
- default_idx = plate_names.index(st.session_state["active_plate"]) if "active_plate" in st.session_state and st.session_state["active_plate"] in plate_names else 0
- selected_plate = colA.selectbox("Select Active Plate", plate_names, index=default_idx)
- st.session_state["active_plate"] = selected_plate
- active_p_id = next(p['id'] for p in plates if p['plate_name'] == selected_plate)
-
- if colB.button("🗑️ Delete Plate"):
- cursor.execute("DELETE FROM plates WHERE id = %s", (active_p_id,))
- conn.commit()
- if "active_plate" in st.session_state: del st.session_state["active_plate"]
- st.rerun()
-
- cursor.execute("""
- SELECT i.id, i.product_code, MAX(i.quantity_grams) as quantity_grams, MAX(p.product_name) as product_name, MAX(m.proteins_100g) as proteins_100g, MAX(m.fat_100g) as fat_100g, MAX(m.carbohydrates_100g) as carbohydrates_100g
- FROM plate_items i LEFT JOIN products_core p ON i.product_code = p.code LEFT JOIN products_macros m ON i.product_code = m.code WHERE i.plate_id = %s
- GROUP BY i.id, i.product_code
- """, (active_p_id,))
- items = cursor.fetchall()
- if items:
- for i in items:
- c1, c2 = st.columns([5, 1])
- c1.markdown(f"<li><b>{i['quantity_grams']}g</b> of {i['product_name']} (Pro: {i['proteins_100g'] or 0}g)</li>", unsafe_allow_html=True)
- if c2.button("🗑️", key=f"del_item_{i['id']}"):
- cursor.execute("DELETE FROM plate_items WHERE id = %s", (i['id'],))
- conn.commit()
- st.rerun()
-
- total_pro = sum((float(i['proteins_100g'] or 0) * (float(i['quantity_grams'])/100.0)) for i in items)
- total_fat = sum((float(i['fat_100g'] or 0) * (float(i['quantity_grams'])/100.0)) for i in items)
- total_carb = sum((float(i['carbohydrates_100g'] or 0) * (float(i['quantity_grams'])/100.0)) for i in items)
- st.info(f"**Total Protein:** {total_pro:.1f}g | **Total Fat:** {total_fat:.1f}g | **Total Carbs:** {total_carb:.1f}g")
-
- st.markdown("---")
- st.markdown("#### ➕ Add Food to Plate")
- add_search = st.text_input("Search Exact Product Name (e.g. 'chicken', 'egg')")
-
- col_scope, col_comp = st.columns(2)
- search_scope = col_scope.radio("Search Scope", ["Auto (Cascaded)", "Product Name Only", "Both (Product & Ingredients)", "Ingredients Only"], horizontal=True)
- comp_reqs = col_comp.multiselect("Require Nutrients (Sorts by highest)", ["Iron", "Vitamin C", "Calcium", "Proteins", "Fiber"])
-
- if add_search:
- bool_search = " ".join([f"+{w}" for w in add_search.split()])
- start_time = time.time()
-
- def execute_search(match_col_override=None):
- m_col = "product_name"
- if match_col_override: m_col = match_col_override
- elif "Both" in search_scope: m_col = "product_name, ingredients_text"
- elif "Ingredients" in search_scope: m_col = "ingredients_text"
-
- join_min = "LEFT JOIN food_db.products_minerals min ON c.code = min.code" if any(n in comp_reqs for n in ["Iron", "Calcium"]) else ""
- join_vit = "LEFT JOIN food_db.products_vitamins v ON c.code = v.code" if "Vitamin C" in comp_reqs else ""
-
- r_clauses, o_clauses = [], []
- if "Iron" in comp_reqs: r_clauses.append("min.iron_100g > 0"); o_clauses.append("min.iron_100g DESC")
- if "Vitamin C" in comp_reqs: r_clauses.append("v.`vitamin-c_100g` > 0"); o_clauses.append("v.`vitamin-c_100g` DESC")
- if "Calcium" in comp_reqs: r_clauses.append("min.calcium_100g > 0"); o_clauses.append("min.calcium_100g DESC")
- if "Proteins" in comp_reqs: r_clauses.append("m.proteins_100g > 0"); o_clauses.append("m.proteins_100g DESC")
- if "Fiber" in comp_reqs: r_clauses.append("m.fiber_100g > 0"); o_clauses.append("m.fiber_100g DESC")
-
- wh_comp = " AND " + " AND ".join(r_clauses) if r_clauses else ""
- order_by = "ORDER BY " + ", ".join(o_clauses) if o_clauses else ""
-
- sql = f"""
- SELECT c.code, c.product_name
- FROM (
- SELECT code, product_name
- FROM food_db.products_core
- WHERE MATCH({m_col}) AGAINST(%s IN BOOLEAN MODE)
- AND product_name IS NOT NULL AND product_name != '' AND product_name != 'None'
- LIMIT 100
- ) c
- JOIN food_db.products_macros m ON c.code = m.code
- {join_min}
- {join_vit}
- WHERE m.proteins_100g IS NOT NULL AND m.fat_100g IS NOT NULL AND m.carbohydrates_100g IS NOT NULL
- {wh_comp}
- {order_by}
- LIMIT 15
- """
- cursor.execute(sql, (bool_search,))
- return cursor.fetchall()
- search_res = execute_search()
-
- if not search_res and search_scope == "Auto (Cascaded)":
- st.warning("No product found in names, so I am looking into the ingredients...")
- search_res = execute_search("ingredients_text")
-
- elapsed = time.time() - start_time
- st.caption(f"⏱️ Plate Search Executed in {elapsed:.3f} seconds")
- if search_res:
- options = {f"{r['product_name']} ({r['code']})": r for r in search_res}
- selected_str = st.selectbox("Select Product", list(options.keys()))
- selected_product = options[selected_str]
-
- add_amount_str = st.text_input("Portion Quantity (e.g., '100g', '2 tbsp', '1.5 cups', '1 pinch')", value="100g")
-
- if st.button("Add Item to Plate"):
- # Use UnitConverter to parse
- grams = UnitConverter.parse_and_convert(add_amount_str, product_name=selected_product['product_name'])
- if grams is not None:
- cursor.execute("INSERT INTO plate_items (plate_id, product_code, quantity_grams) VALUES (%s, %s, %s)",
- (active_p_id, selected_product['code'], grams))
- conn.commit()
- st.success(f"Added {grams}g of {selected_product['product_name']}!")
- st.rerun()
- else:
- st.error("Could not parse unit. Please use format like '100g' or '1 cup'.")
- else:
- st.warning("No products found.")
- with tab_planner:
- st.subheader("🤖 AI Meal Planner")
- st.info("""
- ℹ️ **How to use this feature (Examples)**
- **Your active conditions are automatically applied to the generated menu.**
-
- *Example Prompts:*
- 1. "Generate a full day meal plan for me. I am pregnant, diabetic, and have kidney disease."
- 2. "Plan a pregnancy-safe dinner that won't spike my blood sugar."
- 3. "I need a high-iron lunch that is safe for my kidneys."
- 4. "Plan a breakfast without dairy that is kidney-friendly."
- 5. "Give me a 3-day meal prep plan ensuring no raw fish, controlled protein portions, and steady complex carbs."
- """)
- p_col1, p_col2, p_col3 = st.columns(3)
- target_cal = p_col1.number_input("Target Daily Calories (kcal)", 1000, 5000, 2000, 50)
- diet_pref = p_col2.selectbox("Dietary Preference", ["Omnivore", "Vegetarian", "Vegan", "Keto", "Paleo"])
- meal_count = p_col3.slider("Number of Meals", 2, 6, 3)
- extra_notes = st.text_input("Any additional allergies or goals?")
-
- if st.button("Generate Professional Menu"):
- with st.spinner("Executing Lightning-Fast Context RAG..."):
- user_eav = get_eav_profile(st.session_state["authenticated_user"])
- profile_text = ", ".join([f"{p['name']}: {p['value']}" for p in user_eav]) if user_eav else "None"
-
- # Pre-fetch database context directly without using AI tools!
- # Enforce the strict clinical constraints directly via SQL
- db_context = search_nutrition_db(diet_pref, user_eav)
-
- sys_prompt = f"""You are a professional clinical Dietitian planner. Target: {target_cal}kcal over {meal_count} meals.
- Dietary constraint: {diet_pref}. Additional notes: {extra_notes}.
- Health profile: {profile_text}.
-
- CRITICAL INSTRUCTIONS:
- - You MUST formulate the menu using ONLY the following real database items retrieved for you: {db_context}
- - Output the menu beautifully formatted as a Markdown Table.
- - Columns MUST be: | Meal Time | Exact Food | Portion Size | Calories | Protein |
- - Do NOT output JSON. Do NOT use tool calls.
- - Provide a direct answer. Skip all thinking, reasoning, and pleasantries.
- """
-
- temp_messages = [{'role': 'system', 'content': sys_prompt}, {'role': 'user', 'content': 'Generate my meal plan as a markdown table.'}]
-
- # Stream the response instantly!
- try:
- response_stream = ollama.chat(model='llama3.2:1b', messages=temp_messages, stream=True)
- ai_reply = st.write_stream(chunk['message']['content'] for chunk in response_stream)
-
- # PDF Generation
- def generate_pdf(text):
- pdf = FPDF()
- pdf.add_page()
- pdf.set_font("Helvetica", 'B', 16)
- pdf.cell(0, 10, "Strict Clinical Meal Plan", ln=True, align='C')
- pdf.set_font("Helvetica", size=10)
- pdf.ln(5)
-
- for line in text.split('\n'):
- line = line.strip()
- if not line:
- pdf.ln(5)
- continue
-
- if line.startswith('|') and line.endswith('|'):
- if '---' in line: continue
- cols = [col.strip() for col in line.strip('|').split('|')]
- col_width = 190 / max(1, len(cols))
- for col in cols:
- # encode to ignore unicode errors in basic PDF fonts
- clean_col = str(col).encode('latin-1', 'replace').decode('latin-1')
- pdf.cell(col_width, 10, clean_col, border=1)
- pdf.ln()
- else:
- clean_line = str(line).encode('latin-1', 'replace').decode('latin-1')
- pdf.multi_cell(0, 8, clean_line)
-
- return pdf.output()
-
- st.download_button(
- label="📄 Download PDF Export",
- data=generate_pdf(ai_reply),
- file_name="Clinical_Meal_Plan.pdf",
- mime="application/pdf",
- type="primary"
- )
-
- except Exception as e:
- error_msg = str(e).lower()
- if "404" in error_msg or "not found" in error_msg:
- st.warning("⚠️ The AI engine is currently downloading its core models in the background. Please wait a minute and try again!")
- else:
- st.error(f"AI Generation Failed: {e}")
- if conn_reader: conn_reader.close()
|