mega_seed_usda.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. import sqlite3
  2. import os
  3. import csv
  4. import sys
  5. # Define path to the unzipped SR Legacy data
  6. SR_PATH = "sr28data"
  7. DB_PATH = "localfood.db"
  8. # Nutrient IDs to extract (USDA SR28 IDs)
  9. NUTRIENT_MAP = {
  10. '208': 'calories',
  11. '203': 'protein_g',
  12. '204': 'fat_g',
  13. '205': 'carbs_g',
  14. '291': 'fiber_g',
  15. '269': 'sugar_g',
  16. '307': 'sodium_mg',
  17. '301': 'calcium_mg',
  18. '303': 'iron_mg',
  19. '306': 'potassium_mg',
  20. '318': 'vitamin_a_iu',
  21. '401': 'vitamin_c_mg',
  22. '601': 'cholesterol_mg'
  23. }
  24. def parse_usda_line(line):
  25. """USDA SR Legacy files are ^-delimited with ~ around strings"""
  26. return [item.strip('~') for item in line.strip().split('^')]
  27. def run_seeding():
  28. print("Starting Mega-Seeding from USDA SR Legacy...")
  29. if not os.path.exists(SR_PATH):
  30. print(f"Error: {SR_PATH} directory not found.")
  31. return
  32. # 0. Load Food Groups (ID -> Group Name)
  33. food_groups = {}
  34. print("Reading food groups...")
  35. with open(os.path.join(SR_PATH, "FD_GROUP.txt"), "r", encoding="iso-8859-1") as f:
  36. for line in f:
  37. parts = parse_usda_line(line)
  38. group_id = parts[0]
  39. group_name = parts[1]
  40. food_groups[group_id] = group_name
  41. # 1. Load Food Descriptions (NDB_No -> Name, Group_ID)
  42. food_info = {}
  43. print("Reading food descriptions...")
  44. with open(os.path.join(SR_PATH, "FOOD_DES.txt"), "r", encoding="iso-8859-1") as f:
  45. for line in f:
  46. parts = parse_usda_line(line)
  47. ndb_no = parts[0]
  48. group_id = parts[1]
  49. long_desc = parts[2]
  50. food_info[ndb_no] = {
  51. 'name': long_desc,
  52. 'category': food_groups.get(group_id, "Unknown")
  53. }
  54. # 2. Load Nutrient Data
  55. # Structure: ndb_no -> {nutrient_id: value}
  56. nutrient_data = {}
  57. print("Reading nutrient data (this may take a moment)...")
  58. with open(os.path.join(SR_PATH, "NUT_DATA.txt"), "r", encoding="iso-8859-1") as f:
  59. for line in f:
  60. parts = parse_usda_line(line)
  61. ndb_no = parts[0]
  62. nutr_no = parts[1]
  63. val = float(parts[2]) if parts[2] else 0.0
  64. if nutr_no in NUTRIENT_MAP:
  65. if ndb_no not in nutrient_data:
  66. nutrient_data[ndb_no] = {}
  67. nutrient_data[ndb_no][NUTRIENT_MAP[nutr_no]] = val
  68. # 3. Insert into localfood.db
  69. print(f"Ingesting into {DB_PATH}...")
  70. conn = sqlite3.connect(DB_PATH)
  71. cursor = conn.cursor()
  72. # First, clear existing foods to avoid duplicates if re-running
  73. cursor.execute("DELETE FROM foods")
  74. count = 0
  75. for ndb_no, info in food_info.items():
  76. macros = nutrient_data.get(ndb_no, {})
  77. # Only add if we have at least some nutritional info
  78. if not macros: continue
  79. cursor.execute('''
  80. INSERT INTO foods (
  81. name, category, calories, protein_g, fat_g, carbs_g, fiber_g, sugar_g,
  82. sodium_mg, calcium_mg, iron_mg, potassium_mg, vitamin_a_iu, vitamin_c_mg,
  83. cholesterol_mg, source
  84. ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
  85. ''', (
  86. info['name'],
  87. info['category'],
  88. macros.get('calories', 0.0),
  89. macros.get('protein_g', 0.0),
  90. macros.get('fat_g', 0.0),
  91. macros.get('carbs_g', 0.0),
  92. macros.get('fiber_g', 0.0),
  93. macros.get('sugar_g', 0.0),
  94. macros.get('sodium_mg', 0.0),
  95. macros.get('calcium_mg', 0.0),
  96. macros.get('iron_mg', 0.0),
  97. macros.get('potassium_mg', 0.0),
  98. macros.get('vitamin_a_iu', 0.0),
  99. macros.get('vitamin_c_mg', 0.0),
  100. macros.get('cholesterol_mg', 0.0),
  101. f"USDA-{ndb_no}"
  102. ))
  103. count += 1
  104. if count % 1000 == 0:
  105. print(f"Inserted {count} items...")
  106. conn.commit()
  107. conn.close()
  108. print(f"SUCCESS: Successfully seeded {count} high-quality items into the local database!")
  109. if __name__ == "__main__":
  110. run_seeding()