ingest_csv.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. #!/usr/bin/env python3
  2. import pandas as pd
  3. import myloginpath
  4. import urllib.parse
  5. from sqlalchemy import create_engine, text
  6. from sqlalchemy.types import VARCHAR, DOUBLE
  7. from sqlalchemy.dialects.mysql import LONGTEXT
  8. import os
  9. import sys
  10. from snmp_notifier import notifier
  11. def get_loader_engine():
  12. try:
  13. import os
  14. db_host = os.environ.get('DB_HOST')
  15. db_user = os.environ.get('DB_USER')
  16. db_pass = os.environ.get('DB_PASS')
  17. if db_host and db_user and db_pass:
  18. password = urllib.parse.quote_plus(db_pass)
  19. conn_str = f"mysql+pymysql://{db_user}:{password}@{db_host}/food_db?charset=utf8mb4"
  20. return create_engine(conn_str)
  21. conf = myloginpath.parse('app_loader')
  22. user = conf.get('user')
  23. password = urllib.parse.quote_plus(conf.get('password'))
  24. host = conf.get('host', '127.0.0.1')
  25. database = 'food_db'
  26. conn_str = f"mysql+pymysql://{user}:{password}@{host}/{database}?charset=utf8mb4"
  27. return create_engine(conn_str)
  28. except Exception as e:
  29. print(f"❌ Failed to parse myloginpath or create engine: {e}")
  30. sys.exit(1)
  31. def ingest_file(filename, engine):
  32. if not os.path.exists(filename):
  33. print(f"File {filename} not found locally.")
  34. return False
  35. print(f"\n🚀 Found {filename}! Starting grouped vertical partition ingestion...")
  36. chunk_size = 10000
  37. total_processed = 0
  38. # Define the groupings
  39. groups = {
  40. 'products_core': ['code', 'product_name', 'generic_name', 'brands', 'ingredients_text'],
  41. 'products_allergens': ['code', 'allergens'],
  42. 'products_macros': ['code', 'energy-kcal_100g', 'proteins_100g', 'fat_100g', 'carbohydrates_100g', 'sugars_100g', 'fiber_100g', 'sodium_100g', 'salt_100g', 'cholesterol_100g'],
  43. 'products_vitamins': ['code', 'vitamin-a_100g', 'vitamin-b1_100g', 'vitamin-b2_100g', 'vitamin-pp_100g', 'vitamin-b6_100g', 'vitamin-b9_100g', 'vitamin-b12_100g', 'vitamin-c_100g', 'vitamin-d_100g', 'vitamin-e_100g', 'vitamin-k_100g'],
  44. 'products_minerals': ['code', 'calcium_100g', 'iron_100g', 'magnesium_100g', 'potassium_100g', 'zinc_100g']
  45. }
  46. # Pre-calculate what to read
  47. all_required_cols = list(set([col for cols in groups.values() for col in cols]))
  48. for chunk in pd.read_csv(filename, sep='\t', dtype=str, chunksize=chunk_size, on_bad_lines='skip', low_memory=False, encoding='utf-8'):
  49. try:
  50. # Drop rows with missing codes
  51. if 'code' not in chunk.columns:
  52. continue
  53. df = chunk.dropna(subset=['code']).drop_duplicates(subset=['code']).copy()
  54. # Ensure all required columns exist in the chunk (fill with None if missing)
  55. for col in all_required_cols:
  56. if col not in df.columns:
  57. df[col] = None
  58. for table_name, columns in groups.items():
  59. slice_df = df[columns].copy()
  60. # Cast datatypes: core and allergens are TEXT, others are DOUBLE
  61. if table_name in ['products_core', 'products_allergens']:
  62. sql_dtypes = {col: LONGTEXT() for col in columns if col != 'code'}
  63. sql_dtypes['code'] = VARCHAR(255)
  64. else:
  65. # Convert to numeric (double) safely
  66. for col in columns:
  67. if col != 'code':
  68. slice_df[col] = pd.to_numeric(slice_df[col], errors='coerce')
  69. sql_dtypes = {col: DOUBLE() for col in columns if col != 'code'}
  70. sql_dtypes['code'] = VARCHAR(255)
  71. # Write to temp table
  72. temp_name = f"temp_{table_name}"
  73. slice_df.to_sql(temp_name, con=engine, if_exists='replace', index=False, dtype=sql_dtypes)
  74. # UPSERT into final table with Primary Key enforcement
  75. with engine.begin() as conn:
  76. # Ensure temp table has a primary key on code so LIKE copies it, or alter it later
  77. conn.execute(text(f"ALTER TABLE {temp_name} ADD PRIMARY KEY (code);"))
  78. conn.execute(text(f"CREATE TABLE IF NOT EXISTS {table_name} LIKE {temp_name}"))
  79. cols_str = ", ".join([f"`{c}`" for c in columns])
  80. # Generate ON DUPLICATE KEY UPDATE clause with COALESCE to fill nulls
  81. update_cols = ", ".join([f"`{c}` = COALESCE(`{table_name}`.`{c}`, VALUES(`{c}`))" for c in columns if c != 'code'])
  82. if update_cols:
  83. upsert_query = f"INSERT INTO {table_name} ({cols_str}) SELECT {cols_str} FROM {temp_name} ON DUPLICATE KEY UPDATE {update_cols}"
  84. else:
  85. upsert_query = f"INSERT IGNORE INTO {table_name} ({cols_str}) SELECT {cols_str} FROM {temp_name}"
  86. conn.execute(text(upsert_query))
  87. conn.execute(text(f"DROP TABLE IF EXISTS {temp_name}"))
  88. total_processed += len(df)
  89. print(f" Successfully appended {total_processed} rows into grouped tables...", end="\r")
  90. if total_processed % 50000 == 0:
  91. notifier.send_alert(f"Ingestion Milestone: {total_processed} rows processed")
  92. except BaseException as e:
  93. notifier.send_alert(f"Ingestion Exception: {str(e)}")
  94. print(f"\n [Warning] Chunk skipped due to error: {e}")
  95. notifier.send_alert(f"Ingestion Finished: {filename}")
  96. print(f"\n✅ Finished importing {filename}.")
  97. return True
  98. if __name__ == "__main__":
  99. print("Initiating OpenFoodFacts Grouped Vertical Ingestion Process...")
  100. engine = get_loader_engine()
  101. processed_en = ingest_file('data/en.openfoodfacts.org.products.csv', engine)
  102. processed_fr = ingest_file('data/fr.openfoodfacts.org.products.csv', engine)
  103. if not processed_en and not processed_fr:
  104. print("\n❌ Could not find CSVs.")
  105. else:
  106. print("\n🎉 Full database reload complete! Ready for AI RAG.")