ingest_csv.py 3.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. import pandas as pd
  2. import myloginpath
  3. import urllib.parse
  4. from sqlalchemy import create_engine
  5. import os
  6. import sys
  7. def get_loader_engine():
  8. try:
  9. conf = myloginpath.parse('app_loader')
  10. user = conf.get('user')
  11. password = urllib.parse.quote_plus(conf.get('password'))
  12. host = conf.get('host', '127.0.0.1')
  13. database = 'food_db'
  14. # Build strict SQLAlchemy PyMySQL string
  15. conn_str = f"mysql+pymysql://{user}:{password}@{host}/{database}?charset=utf8mb4"
  16. return create_engine(conn_str)
  17. except Exception as e:
  18. print(f"❌ Failed to parse myloginpath or create engine: {e}")
  19. sys.exit(1)
  20. def ingest_file(filename, engine):
  21. if not os.path.exists(filename):
  22. print(f"File {filename} not found locally.")
  23. return False
  24. print(f"\n🚀 Found {filename}! Starting ingestion via SQLAlchemy pipeline...")
  25. expected_columns = [
  26. "code", "url", "creator", "created_t", "created_datetime", "last_modified_t",
  27. "last_modified_datetime", "product_name", "generic_name", "quantity", "packaging",
  28. "brands", "categories", "origins", "labels", "stores", "countries", "ingredients_text",
  29. "allergens", "traces"
  30. ]
  31. chunk_size = 5000
  32. total_processed = 0
  33. for chunk in pd.read_csv(filename, sep='\t', dtype=str, chunksize=chunk_size, on_bad_lines='skip'):
  34. # Filter explicitly to schema
  35. available_cols = [col for col in expected_columns if col in chunk.columns]
  36. df = chunk[available_cols]
  37. # Pandas to_sql safely transforms NaNs to SQL NULLs internally
  38. try:
  39. # We use 'append' because the products table already exists with primary keys
  40. # To handle duplicate 'code' primary keys effortlessly, we drop duplicates from the dataframe before insert
  41. # Or depend on PyMySQL. But pandas natively crashes on dupes unless managed.
  42. df = df.drop_duplicates(subset=['code'])
  43. df.to_sql('products', con=engine, if_exists='append', index=False)
  44. total_processed += len(df)
  45. print(f" Successfully appended {total_processed} rows...")
  46. except BaseException as e:
  47. # If a strict primary key duplicate existed in DB already from a previous chunk, ignore row crashes
  48. if "Duplicate entry" in str(e):
  49. pass
  50. else:
  51. print(f" [Warning] Chunk skipped due to internal structural error: {e}")
  52. print(f"✅ Finished importing {filename}.")
  53. return True
  54. if __name__ == "__main__":
  55. print("Initiating OpenFoodFacts CSV Ingestion Process...")
  56. engine = get_loader_engine()
  57. processed_en = ingest_file('en.openfoodfacts.org.products.csv', engine)
  58. processed_fr = ingest_file('fr.openfoodfacts.org.products.csv', engine)
  59. if not processed_en and not processed_fr:
  60. print("\n❌ Could not find either 'en.openfoodfacts.org.products.csv' or 'fr.openfoodfacts.org.products.csv'.")
  61. print("Please download them directly into the root folder and run this script again.")