1
0

ingest_csv.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. import pandas as pd
  2. import myloginpath
  3. import urllib.parse
  4. from sqlalchemy import create_engine, text
  5. import os
  6. import sys
  7. def get_loader_engine():
  8. try:
  9. conf = myloginpath.parse('app_loader')
  10. user = conf.get('user')
  11. password = urllib.parse.quote_plus(conf.get('password'))
  12. host = conf.get('host', '127.0.0.1')
  13. database = 'food_db'
  14. # Build strict SQLAlchemy PyMySQL string
  15. conn_str = f"mysql+pymysql://{user}:{password}@{host}/{database}?charset=utf8mb4"
  16. return create_engine(conn_str)
  17. except Exception as e:
  18. print(f"❌ Failed to parse myloginpath or create engine: {e}")
  19. sys.exit(1)
  20. def ingest_file(filename, engine):
  21. if not os.path.exists(filename):
  22. print(f"File {filename} not found locally.")
  23. return False
  24. print(f"\n🚀 Found {filename}! Starting extreme batch ingestion...")
  25. chunk_size = 5000
  26. total_processed = 0
  27. # Read dynamically without filtering. Setting low_memory=False to let pandas parse column types flexibly
  28. # Forced utf-8 encoding to prevent French accent corruption on Windows OS defaults
  29. for chunk in pd.read_csv(filename, sep='\t', dtype=str, chunksize=chunk_size, on_bad_lines='skip', low_memory=False, encoding='utf-8'):
  30. try:
  31. # Drop duplicates by code natively
  32. if 'code' in chunk.columns:
  33. df = chunk.drop_duplicates(subset=['code'])
  34. else:
  35. df = chunk
  36. # Eliminate completely empty columns to save storage
  37. df.dropna(axis=1, how='all', inplace=True)
  38. # Segment the dataframe into chunks of 50 columns each to bypass InnoDB constraints
  39. cols = list(df.columns)
  40. if 'code' in cols: cols.remove('code')
  41. chunk_size = 50
  42. chunks = [cols[i:i + chunk_size] for i in range(0, len(cols), chunk_size)]
  43. for i, col_chunk in enumerate(chunks):
  44. # Ensure 'code' maps across every single table
  45. table_name = f'products_{i+1}'
  46. df_slice = df[['code'] + col_chunk].copy()
  47. df_slice.to_sql(table_name, con=engine, if_exists='append', index=False)
  48. total_processed += len(df)
  49. print(f" Successfully appended {total_processed} rows (Dynamic schema)...", end="\r")
  50. except BaseException as e:
  51. if "Duplicate entry" in str(e):
  52. pass
  53. else:
  54. print(f"\n [Warning] Chunk skipped due to internal structural error: {e}")
  55. print(f"\n✅ Finished importing {filename}.")
  56. return True
  57. def create_indexes(engine):
  58. print("\n🛠️ Creating performance indexes on newly generated table...")
  59. # B-TREE and FULLTEXT INDEXES created post-ingestion for extreme speed
  60. try:
  61. with engine.begin() as connection:
  62. print(" Building Core Architecture on Partitions...")
  63. # Enforce Primary Keys on the first 4 partitions
  64. for i in range(1, 5):
  65. try:
  66. connection.execute(text(f"ALTER TABLE products_{i} MODIFY code VARCHAR(50);"))
  67. connection.execute(text(f"ALTER TABLE products_{i} ADD PRIMARY KEY (code);"))
  68. except: pass
  69. print(" Building Dynamic MySQL View...")
  70. # We build a massive Join View so the app doesn't need to know about the segments
  71. try:
  72. connection.execute(text("""
  73. CREATE VIEW products AS
  74. SELECT p1.*,
  75. p2.energy_100g, p2.`energy-kcal_100g`, p2.proteins_100g, p2.fat_100g, p2.carbohydrates_100g, p2.sugars_100g, p2.salt_100g, p2.sodium_100g, p2.fiber_100g,
  76. p3.iron_100g, p3.calcium_100g, p3.`vitamin-c_100g`, p3.`vitamin-d_100g`
  77. FROM products_1 p1
  78. LEFT JOIN products_2 p2 ON p1.code = p2.code
  79. LEFT JOIN products_3 p3 ON p1.code = p3.code
  80. """))
  81. except: pass
  82. print("✅ Indexing Complete!")
  83. except Exception as e:
  84. print(f"❌ Indexing encountered an issue: {e}")
  85. if __name__ == "__main__":
  86. print("Initiating OpenFoodFacts CSV Ingestion Process...")
  87. engine = get_loader_engine()
  88. processed_en = ingest_file('en.openfoodfacts.org.products.csv', engine)
  89. processed_fr = ingest_file('fr.openfoodfacts.org.products.csv', engine)
  90. if not processed_en and not processed_fr:
  91. print("\n❌ Could not find either 'en.openfoodfacts.org.products.csv' or 'fr.openfoodfacts.org.products.csv'.")
  92. print("Please download them directly into the root folder and run this script again.")
  93. else:
  94. # Build indexes now that all data is appended!
  95. create_indexes(engine)
  96. print("\n🎉 Full database reload and indexing complete! Ready for AI RAG.")