| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- import sqlite3
- import os
- import csv
- import sys
- # Define path to the unzipped SR Legacy data
- SR_PATH = "sr28data"
- DB_PATH = "localfood.db"
- # Nutrient IDs to extract (USDA SR28 IDs)
- NUTRIENT_MAP = {
- '208': 'calories',
- '203': 'protein_g',
- '204': 'fat_g',
- '205': 'carbs_g',
- '291': 'fiber_g',
- '269': 'sugar_g',
- '307': 'sodium_mg'
- }
- def parse_usda_line(line):
- """USDA SR Legacy files are ^-delimited with ~ around strings"""
- return [item.strip('~') for item in line.strip().split('^')]
- def run_seeding():
- print("Starting Mega-Seeding from USDA SR Legacy...")
-
- if not os.path.exists(SR_PATH):
- print(f"Error: {SR_PATH} directory not found.")
- return
- # 0. Load Food Groups (ID -> Group Name)
- food_groups = {}
- print("Reading food groups...")
- with open(os.path.join(SR_PATH, "FD_GROUP.txt"), "r", encoding="iso-8859-1") as f:
- for line in f:
- parts = parse_usda_line(line)
- group_id = parts[0]
- group_name = parts[1]
- food_groups[group_id] = group_name
- # 1. Load Food Descriptions (NDB_No -> Name, Group_ID)
- food_info = {}
- print("Reading food descriptions...")
- with open(os.path.join(SR_PATH, "FOOD_DES.txt"), "r", encoding="iso-8859-1") as f:
- for line in f:
- parts = parse_usda_line(line)
- ndb_no = parts[0]
- group_id = parts[1]
- long_desc = parts[2]
- food_info[ndb_no] = {
- 'name': long_desc,
- 'category': food_groups.get(group_id, "Unknown")
- }
- # 2. Load Nutrient Data
- # Structure: ndb_no -> {nutrient_id: value}
- nutrient_data = {}
- print("Reading nutrient data (this may take a moment)...")
- with open(os.path.join(SR_PATH, "NUT_DATA.txt"), "r", encoding="iso-8859-1") as f:
- for line in f:
- parts = parse_usda_line(line)
- ndb_no = parts[0]
- nutr_no = parts[1]
- val = float(parts[2]) if parts[2] else 0.0
-
- if nutr_no in NUTRIENT_MAP:
- if ndb_no not in nutrient_data:
- nutrient_data[ndb_no] = {}
- nutrient_data[ndb_no][NUTRIENT_MAP[nutr_no]] = val
- # 3. Insert into localfood.db
- print(f"Ingesting into {DB_PATH}...")
- conn = sqlite3.connect(DB_PATH)
- cursor = conn.cursor()
-
- # First, clear existing foods to avoid duplicates if re-running
- cursor.execute("DELETE FROM foods")
-
- count = 0
- for ndb_no, info in food_info.items():
- macros = nutrient_data.get(ndb_no, {})
-
- # Only add if we have at least some nutritional info
- if not macros: continue
-
- cursor.execute('''
- INSERT INTO foods (
- name, category, calories, protein_g, fat_g, carbs_g, fiber_g, sugar_g, sodium_mg, source
- ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
- ''', (
- info['name'],
- info['category'],
- macros.get('calories', 0.0),
- macros.get('protein_g', 0.0),
- macros.get('fat_g', 0.0),
- macros.get('carbs_g', 0.0),
- macros.get('fiber_g', 0.0),
- macros.get('sugar_g', 0.0),
- macros.get('sodium_mg', 0.0),
- f"USDA-{ndb_no}"
- ))
- count += 1
- if count % 1000 == 0:
- print(f"Inserted {count} items...")
- conn.commit()
- conn.close()
- print(f"SUCCESS: Successfully seeded {count} high-quality items into the local database!")
- if __name__ == "__main__":
- run_seeding()
|