1
0

ingest_csv.py 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. import pandas as pd
  2. import pymysql
  3. import myloginpath
  4. import os
  5. import sys
  6. def get_loader_connection():
  7. try:
  8. conf = myloginpath.parse('app_loader')
  9. return pymysql.connect(
  10. host=conf.get('host', '127.0.0.1'),
  11. user=conf.get('user'),
  12. password=conf.get('password'),
  13. database='food_db',
  14. local_infile=True
  15. )
  16. except Exception as e:
  17. print(f"❌ Failed to connect to MySQL via app_loader: {e}")
  18. print("Did you run: mysql_config_editor set --login-path=app_loader --host=127.0.0.1 --user=db_loader --password")
  19. sys.exit(1)
  20. def ingest_file(filename, conn):
  21. if not os.path.exists(filename):
  22. return False
  23. print(f"\n🚀 Found {filename}! Starting ingestion pipeline...")
  24. # We read the first few rows to grab the columns our table actually expects.
  25. # (assuming products table matches OpenFoodFacts core schema)
  26. expected_columns = [
  27. "code", "url", "creator", "created_t", "created_datetime", "last_modified_t",
  28. "last_modified_datetime", "product_name", "generic_name", "quantity", "packaging",
  29. "brands", "categories", "origins", "labels", "stores", "countries", "ingredients_text",
  30. "allergens", "traces"
  31. ]
  32. chunk_size = 50000
  33. total_processed = 0
  34. # Using chunking to stream into MySQL efficiently
  35. for chunk in pd.read_csv(filename, sep='\t', dtype=str, chunksize=chunk_size, on_bad_lines='skip'):
  36. # Filter only the columns we mapped
  37. available_cols = [col for col in expected_columns if col in chunk.columns]
  38. df = chunk[available_cols]
  39. # Replace NaN with None so MySQL treats it as NULL
  40. df = df.where(pd.notnull(df), None)
  41. placeholders = ', '.join(['%s'] * len(available_cols))
  42. columns_str = ', '.join([f"`{col}`" for col in available_cols])
  43. # Use INSERT IGNORE to prevent crashing on duplicate primary keys (barcodes)
  44. sql = f"INSERT IGNORE INTO products ({columns_str}) VALUES ({placeholders})"
  45. with conn.cursor() as cursor:
  46. cursor.executemany(sql, df.values.tolist())
  47. conn.commit()
  48. total_processed += len(df)
  49. print(f" Inserted {total_processed} rows...")
  50. print(f"✅ Finished importing {filename}.")
  51. return True
  52. if __name__ == "__main__":
  53. print("Initiating OpenFoodFacts CSV Ingestion Process...")
  54. conn = get_loader_connection()
  55. processed_en = ingest_file('en.openfoodfacts.org.products.csv', conn)
  56. processed_fr = ingest_file('fr.openfoodfacts.org.products.csv', conn)
  57. if not processed_en and not processed_fr:
  58. print("\n❌ Could not find either 'en.openfoodfacts.org.products.csv' or 'fr.openfoodfacts.org.products.csv'.")
  59. print("Please download them directly into the root folder and run this script again.")
  60. conn.close()