ingest_csv.py

import os
import sys
import urllib.parse

import myloginpath
import pandas as pd
from sqlalchemy import create_engine, text
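
# Assumed environment (not stated in the original): the third-party imports
# above come from the pandas, myloginpath, SQLAlchemy, and PyMySQL packages,
# e.g. `pip install pandas myloginpath SQLAlchemy PyMySQL`; PyMySQL is needed
# because the connection string below uses the mysql+pymysql dialect.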

def get_loader_engine():
    """Build a SQLAlchemy engine from the 'app_loader' MySQL login path."""
    try:
        conf = myloginpath.parse('app_loader')
        user = conf.get('user')
        # URL-encode the password so special characters survive the DSN.
        password = urllib.parse.quote_plus(conf.get('password'))
        host = conf.get('host', '127.0.0.1')
        database = 'food_db'
        # Build the SQLAlchemy PyMySQL connection string.
        conn_str = f"mysql+pymysql://{user}:{password}@{host}/{database}?charset=utf8mb4"
        return create_engine(conn_str)
    except Exception as e:
        print(f"❌ Failed to parse myloginpath or create engine: {e}")
        sys.exit(1)
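
# The 'app_loader' login path above is an assumption about local setup; it
# would typically be created once with MySQL's config editor, e.g.:
#   mysql_config_editor set --login-path=app_loader \
#       --host=127.0.0.1 --user=<loader_user> --password
# This stores the credentials in ~/.mylogin.cnf, which myloginpath reads.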

def ingest_file(filename, engine):
    if not os.path.exists(filename):
        print(f"File {filename} not found locally.")
        return False
    print(f"\n🚀 Found {filename}! Starting chunked batch ingestion into the unified table...")
    chunk_size = 10000
    total_processed = 0
    required_columns = [
        'code', 'product_name', 'generic_name', 'brands', 'allergens', 'ingredients_text',
        'proteins_100g', 'fat_100g', 'carbohydrates_100g', 'sugars_100g', 'sodium_100g', 'salt_100g',
        'energy-kcal_100g', 'vitamin-a_100g', 'vitamin-d_100g', 'vitamin-e_100g', 'vitamin-k_100g',
        'vitamin-c_100g', 'vitamin-b1_100g', 'vitamin-b2_100g', 'vitamin-pp_100g', 'vitamin-b6_100g',
        'vitamin-b9_100g', 'vitamin-b12_100g', 'calcium_100g', 'iron_100g', 'magnesium_100g',
        'zinc_100g', 'potassium_100g', 'cholesterol_100g', 'fiber_100g'
    ]
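    # The INSERT IGNORE below maps columns by position and only deduplicates
    # if `products` enforces uniqueness on `code`, so the table must match
    # required_columns in order. A minimal sketch of the assumed schema (this
    # script never creates it):
    #   CREATE TABLE products (
    #       code VARCHAR(64) PRIMARY KEY,
    #       product_name TEXT, generic_name TEXT, brands TEXT, allergens TEXT,
    #       ingredients_text TEXT,
    #       -- ...remaining required_columns, in the same order
    #   );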
    for chunk in pd.read_csv(filename, sep='\t', dtype=str, chunksize=chunk_size,
                             on_bad_lines='skip', low_memory=False, encoding='utf-8'):
        try:
            # Keep only the required columns that actually exist in this chunk.
            available_cols = [c for c in required_columns if c in chunk.columns]
            df = chunk[available_cols].copy()
            if 'code' not in df.columns:
                continue
            # Drop missing codes and duplicates within the chunk.
            df.dropna(subset=['code'], inplace=True)
            df.drop_duplicates(subset=['code'], inplace=True)
            # Ensure every required column exists (fill missing with None)...
            for col in required_columns:
                if col not in df.columns:
                    df[col] = None
            # ...and reorder columns to exactly match the target table schema.
            df = df[required_columns]
            # Stage the chunk in a temporary table.
            df.to_sql('temp_products', con=engine, if_exists='replace', index=False)
            # INSERT IGNORE appends to the main table, skipping codes already present.
            with engine.begin() as connection:
                connection.execute(text("INSERT IGNORE INTO products SELECT * FROM temp_products"))
            total_processed += len(df)
            print(f" Appended {total_processed} rows into the unified schema so far...", end="\r")
        except Exception as e:
            # Catch Exception (not BaseException) so Ctrl-C still interrupts the run.
            print(f"\n [Warning] Chunk skipped due to error: {e}")
    # Clean up the staging table.
    with engine.begin() as connection:
        connection.execute(text("DROP TABLE IF EXISTS temp_products"))
    print(f"\n✅ Finished importing {filename}.")
    return True

if __name__ == "__main__":
    print("Initiating OpenFoodFacts CSV Unified Ingestion Process...")
    engine = get_loader_engine()
    processed_en = ingest_file('en.openfoodfacts.org.products.csv', engine)
    processed_fr = ingest_file('fr.openfoodfacts.org.products.csv', engine)
    if not processed_en and not processed_fr:
        print("\n❌ Could not find either 'en.openfoodfacts.org.products.csv' or 'fr.openfoodfacts.org.products.csv'.")
        print("Please download them directly into the root folder and run this script again.")
    else:
        print("\n🎉 Full database reload complete! Ready for AI RAG.")
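
# A possible post-load sanity check (a sketch reusing this script's engine and
# its assumed `products` table; not part of the original run):
#   with get_loader_engine().connect() as conn:
#       count = conn.execute(text("SELECT COUNT(*) FROM products")).scalar()
#       print(f"products now holds {count} rows")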