# mega_seed_usda.py
# Seeds the local SQLite food database from the USDA SR Legacy (SR28) data files.
import contextlib
import csv
import os
import sqlite3
import sys
  5. # Define path to the unzipped SR Legacy data
  6. SR_PATH = "sr28data"
  7. DB_PATH = "localfood.db"
  8. # Nutrient IDs to extract (USDA SR28 IDs)
  9. NUTRIENT_MAP = {
  10. '208': 'calories',
  11. '203': 'protein_g',
  12. '204': 'fat_g',
  13. '205': 'carbs_g',
  14. '291': 'fiber_g',
  15. '269': 'sugar_g',
  16. '307': 'sodium_mg'
  17. }
  18. def parse_usda_line(line):
  19. """USDA SR Legacy files are ^-delimited with ~ around strings"""
  20. return [item.strip('~') for item in line.strip().split('^')]
  21. def run_seeding():
  22. print("Starting Mega-Seeding from USDA SR Legacy...")
  23. if not os.path.exists(SR_PATH):
  24. print(f"Error: {SR_PATH} directory not found.")
  25. return
  26. # 0. Load Food Groups (ID -> Group Name)
  27. food_groups = {}
  28. print("Reading food groups...")
  29. with open(os.path.join(SR_PATH, "FD_GROUP.txt"), "r", encoding="iso-8859-1") as f:
  30. for line in f:
  31. parts = parse_usda_line(line)
  32. group_id = parts[0]
  33. group_name = parts[1]
  34. food_groups[group_id] = group_name
  35. # 1. Load Food Descriptions (NDB_No -> Name, Group_ID)
  36. food_info = {}
  37. print("Reading food descriptions...")
  38. with open(os.path.join(SR_PATH, "FOOD_DES.txt"), "r", encoding="iso-8859-1") as f:
  39. for line in f:
  40. parts = parse_usda_line(line)
  41. ndb_no = parts[0]
  42. group_id = parts[1]
  43. long_desc = parts[2]
  44. food_info[ndb_no] = {
  45. 'name': long_desc,
  46. 'category': food_groups.get(group_id, "Unknown")
  47. }
  48. # 2. Load Nutrient Data
  49. # Structure: ndb_no -> {nutrient_id: value}
  50. nutrient_data = {}
  51. print("Reading nutrient data (this may take a moment)...")
  52. with open(os.path.join(SR_PATH, "NUT_DATA.txt"), "r", encoding="iso-8859-1") as f:
  53. for line in f:
  54. parts = parse_usda_line(line)
  55. ndb_no = parts[0]
  56. nutr_no = parts[1]
  57. val = float(parts[2]) if parts[2] else 0.0
  58. if nutr_no in NUTRIENT_MAP:
  59. if ndb_no not in nutrient_data:
  60. nutrient_data[ndb_no] = {}
  61. nutrient_data[ndb_no][NUTRIENT_MAP[nutr_no]] = val
  62. # 3. Insert into localfood.db
  63. print(f"Ingesting into {DB_PATH}...")
  64. conn = sqlite3.connect(DB_PATH)
  65. cursor = conn.cursor()
  66. # First, clear existing foods to avoid duplicates if re-running
  67. cursor.execute("DELETE FROM foods")
  68. count = 0
  69. for ndb_no, info in food_info.items():
  70. macros = nutrient_data.get(ndb_no, {})
  71. # Only add if we have at least some nutritional info
  72. if not macros: continue
  73. cursor.execute('''
  74. INSERT INTO foods (
  75. name, category, calories, protein_g, fat_g, carbs_g, fiber_g, sugar_g, sodium_mg, source
  76. ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
  77. ''', (
  78. info['name'],
  79. info['category'],
  80. macros.get('calories', 0.0),
  81. macros.get('protein_g', 0.0),
  82. macros.get('fat_g', 0.0),
  83. macros.get('carbs_g', 0.0),
  84. macros.get('fiber_g', 0.0),
  85. macros.get('sugar_g', 0.0),
  86. macros.get('sodium_mg', 0.0),
  87. f"USDA-{ndb_no}"
  88. ))
  89. count += 1
  90. if count % 1000 == 0:
  91. print(f"Inserted {count} items...")
  92. conn.commit()
  93. conn.close()
  94. print(f"SUCCESS: Successfully seeded {count} high-quality items into the local database!")
  95. if __name__ == "__main__":
  96. run_seeding()