1
0

ingest_csv.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. import pandas as pd
  2. import myloginpath
  3. import urllib.parse
  4. from sqlalchemy import create_engine, text
  5. from sqlalchemy.types import VARCHAR, TEXT, DOUBLE
  6. import os
  7. import sys
  8. def get_loader_engine():
  9. try:
  10. conf = myloginpath.parse('app_loader')
  11. user = conf.get('user')
  12. password = urllib.parse.quote_plus(conf.get('password'))
  13. host = conf.get('host', '127.0.0.1')
  14. database = 'food_db'
  15. # Build strict SQLAlchemy PyMySQL string
  16. conn_str = f"mysql+pymysql://{user}:{password}@{host}/{database}?charset=utf8mb4"
  17. return create_engine(conn_str)
  18. except Exception as e:
  19. print(f"❌ Failed to parse myloginpath or create engine: {e}")
  20. sys.exit(1)
  21. def ingest_file(filename, engine):
  22. if not os.path.exists(filename):
  23. print(f"File {filename} not found locally.")
  24. return False
  25. print(f"\n🚀 Found {filename}! Starting extreme batch ingestion for ALL columns...")
  26. chunk_size = 10000
  27. total_processed = 0
  28. is_first_chunk = True
  29. for chunk in pd.read_csv(filename, sep='\t', dtype=str, chunksize=chunk_size, on_bad_lines='skip', low_memory=False, encoding='utf-8'):
  30. try:
  31. df = chunk.copy()
  32. if 'code' not in df.columns:
  33. continue
  34. # Drop missing codes and local duplicates
  35. df.dropna(subset=['code'], inplace=True)
  36. df.drop_duplicates(subset=['code'], inplace=True)
  37. # Map datatypes dynamically to avoid InnoDB row size limits
  38. # Code is VARCHAR(50), everything else is TEXT (strings) or DOUBLE (if we were casting, but we read as str)
  39. # Since we read dtype=str, pandas will default all to TEXT which is perfect for Off-Page storage.
  40. sql_dtypes = {col: TEXT() for col in df.columns}
  41. sql_dtypes['code'] = VARCHAR(50)
  42. if is_first_chunk:
  43. # 1. Initialize the target table with the exact schema from the first chunk
  44. df.head(0).to_sql('products', con=engine, if_exists='replace', index=False, dtype=sql_dtypes)
  45. # 2. Add Primary Key immediately
  46. with engine.begin() as conn:
  47. conn.execute(text("ALTER TABLE products ADD PRIMARY KEY (code);"))
  48. is_first_chunk = False
  49. # Write chunk to a temporary table
  50. df.to_sql('temp_products', con=engine, if_exists='replace', index=False, dtype=sql_dtypes)
  51. # Use INSERT IGNORE to append to the main table, skipping any global duplicate codes
  52. with engine.begin() as connection:
  53. # Ensure columns match by explicitly listing them
  54. cols = ", ".join([f"`{c}`" for c in df.columns])
  55. connection.execute(text(f"INSERT IGNORE INTO products ({cols}) SELECT {cols} FROM temp_products"))
  56. total_processed += len(df)
  57. print(f" Successfully appended {total_processed} rows into unified dynamic schema...", end="\r")
  58. except BaseException as e:
  59. print(f"\n [Warning] Chunk skipped due to error: {e}")
  60. # Cleanup temp table
  61. with engine.begin() as connection:
  62. connection.execute(text("DROP TABLE IF EXISTS temp_products"))
  63. print(f"\n✅ Finished importing {filename}.")
  64. return True
  65. def create_indexes(engine):
  66. print("\n🛠️ Creating performance indexes (FULLTEXT and Standard)...")
  67. try:
  68. with engine.begin() as connection:
  69. # Add Fulltext Search on vital textual fields if they exist
  70. try:
  71. connection.execute(text("ALTER TABLE products ADD FULLTEXT idx_search (product_name, ingredients_text);"))
  72. print(" - Added FULLTEXT index on product_name, ingredients_text")
  73. except Exception as e:
  74. print(f" - Skipped FULLTEXT idx_search: {e}")
  75. try:
  76. connection.execute(text("ALTER TABLE products ADD FULLTEXT idx_allergens (allergens);"))
  77. print(" - Added FULLTEXT index on allergens")
  78. except Exception as e:
  79. print(f" - Skipped FULLTEXT idx_allergens: {e}")
  80. # Standard indexes for fast exact matches
  81. try:
  82. connection.execute(text("ALTER TABLE products ADD INDEX idx_brands (brands(50));"))
  83. print(" - Added INDEX on brands")
  84. except Exception as e:
  85. print(f" - Skipped INDEX idx_brands: {e}")
  86. try:
  87. connection.execute(text("ALTER TABLE products ADD INDEX idx_generic (generic_name(50));"))
  88. print(" - Added INDEX on generic_name")
  89. except Exception as e:
  90. print(f" - Skipped INDEX idx_generic: {e}")
  91. print("✅ Indexing Complete!")
  92. except Exception as e:
  93. print(f"❌ Indexing encountered an issue: {e}")
  94. if __name__ == "__main__":
  95. print("Initiating OpenFoodFacts CSV Unified Dynamic Ingestion Process...")
  96. engine = get_loader_engine()
  97. processed_en = ingest_file('en.openfoodfacts.org.products.csv', engine)
  98. processed_fr = ingest_file('fr.openfoodfacts.org.products.csv', engine)
  99. if not processed_en and not processed_fr:
  100. print("\n❌ Could not find either 'en.openfoodfacts.org.products.csv' or 'fr.openfoodfacts.org.products.csv'.")
  101. print("Please download them directly into the root folder and run this script again.")
  102. else:
  103. create_indexes(engine)
  104. print("\n🎉 Full database reload complete! Ready for AI RAG.")