# ingest_csv.py
import os
import sys

import pandas as pd
import pymysql
import myloginpath


def get_loader_connection():
    try:
        conf = myloginpath.parse('app_loader')
        return pymysql.connect(
            host=conf.get('host', '127.0.0.1'),
            user=conf.get('user'),
            password=conf.get('password'),
            database='food_db'
        )
    except Exception as e:
        print(f"❌ Failed to connect to MySQL via app_loader: {e}")
        print("Did you run: mysql_config_editor set --login-path=app_loader --host=127.0.0.1 --user=db_loader --password")
        sys.exit(1)
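
# A minimal sketch of the `products` table this script assumes; the script
# itself never creates it. Column names mirror expected_columns in
# ingest_file below, the types are illustrative guesses (OpenFoodFacts
# fields are mostly free text), and the barcode primary key on `code` is
# what lets INSERT IGNORE skip duplicates. Adjust types and lengths to fit
# your data before using this.
PRODUCTS_DDL_SKETCH = """
CREATE TABLE IF NOT EXISTS products (
    `code`                   VARCHAR(64) NOT NULL PRIMARY KEY,
    `url`                    TEXT,
    `creator`                TEXT,
    `created_t`              VARCHAR(32),
    `created_datetime`       VARCHAR(32),
    `last_modified_t`        VARCHAR(32),
    `last_modified_datetime` VARCHAR(32),
    `product_name`           TEXT,
    `generic_name`           TEXT,
    `quantity`               TEXT,
    `packaging`              TEXT,
    `brands`                 TEXT,
    `categories`             TEXT,
    `origins`                TEXT,
    `labels`                 TEXT,
    `stores`                 TEXT,
    `countries`              TEXT,
    `ingredients_text`       TEXT,
    `allergens`              TEXT,
    `traces`                 TEXT
)
"""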

def ingest_file(filename, conn):
    if not os.path.exists(filename):
        return False
    print(f"\n🚀 Found {filename}! Starting ingestion pipeline...")
    # Only these columns are loaded; they assume the products table matches
    # the OpenFoodFacts core schema (see PRODUCTS_DDL_SKETCH above).
    expected_columns = [
        "code", "url", "creator", "created_t", "created_datetime", "last_modified_t",
        "last_modified_datetime", "product_name", "generic_name", "quantity", "packaging",
        "brands", "categories", "origins", "labels", "stores", "countries", "ingredients_text",
        "allergens", "traces"
    ]
    # Keep chunks small (1000 rows) to stay under MySQL's 'max_allowed_packet'
    # limit and to avoid PyMySQL holding the whole file in memory.
    chunk_size = 1000
    total_processed = 0
    # Stream the tab-separated export into MySQL one chunk at a time.
    for chunk in pd.read_csv(filename, sep='\t', dtype=str, chunksize=chunk_size, on_bad_lines='skip'):
        # Keep only the expected columns that are actually present in the file.
        available_cols = [col for col in expected_columns if col in chunk.columns]
        df = chunk[available_cols]
        # Replace NaN with None so MySQL stores NULL.
        df = df.where(pd.notnull(df), None)
        placeholders = ', '.join(['%s'] * len(available_cols))
        columns_str = ', '.join([f"`{col}`" for col in available_cols])
        # INSERT IGNORE skips rows whose barcode (primary key) already exists.
        sql = f"INSERT IGNORE INTO products ({columns_str}) VALUES ({placeholders})"
        with conn.cursor() as cursor:
            cursor.executemany(sql, df.values.tolist())
        conn.commit()
        total_processed += len(df)
        print(f" Inserted {total_processed} rows...")
    print(f"✅ Finished importing {filename}.")
    return True
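
# A sketch, not called anywhere above: if inserts still fail on packet size,
# inspect the server limit before raising chunk_size. The helper name is
# illustrative; the SHOW VARIABLES query itself is standard MySQL.
def show_max_allowed_packet(conn):
    with conn.cursor() as cursor:
        cursor.execute("SHOW VARIABLES LIKE 'max_allowed_packet'")
        row = cursor.fetchone()
    # The row comes back as ('max_allowed_packet', '<bytes>').
    return int(row[1]) if row else None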

if __name__ == "__main__":
    print("Initiating OpenFoodFacts CSV Ingestion Process...")
    conn = get_loader_connection()
    processed_en = ingest_file('en.openfoodfacts.org.products.csv', conn)
    processed_fr = ingest_file('fr.openfoodfacts.org.products.csv', conn)
    if not processed_en and not processed_fr:
        print("\n❌ Could not find either 'en.openfoodfacts.org.products.csv' or 'fr.openfoodfacts.org.products.csv'.")
        print("Please download them directly into the root folder and run this script again.")
    conn.close()
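
# Usage sketch: create the login path, download the product export from
# https://world.openfoodfacts.org/data into this folder (export names and
# compression may change over time), then run:
#   mysql_config_editor set --login-path=app_loader --host=127.0.0.1 --user=db_loader --password
#   python ingest_csv.py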