ingest_csv.py 4.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. import pandas as pd
  2. import myloginpath
  3. import urllib.parse
  4. from sqlalchemy import create_engine
  5. import os
  6. import sys
  7. def get_loader_engine():
  8. try:
  9. conf = myloginpath.parse('app_loader')
  10. user = conf.get('user')
  11. password = urllib.parse.quote_plus(conf.get('password'))
  12. host = conf.get('host', '127.0.0.1')
  13. database = 'food_db'
  14. # Build strict SQLAlchemy PyMySQL string
  15. conn_str = f"mysql+pymysql://{user}:{password}@{host}/{database}?charset=utf8mb4"
  16. return create_engine(conn_str)
  17. except Exception as e:
  18. print(f"❌ Failed to parse myloginpath or create engine: {e}")
  19. sys.exit(1)
  20. def ingest_file(filename, engine):
  21. if not os.path.exists(filename):
  22. print(f"File {filename} not found locally.")
  23. return False
  24. print(f"\n🚀 Found {filename}! Starting extreme batch ingestion...")
  25. chunk_size = 5000
  26. total_processed = 0
  27. # Read dynamically without filtering. Setting low_memory=False to let pandas parse column types flexibly
  28. # Forced utf-8 encoding to prevent French accent corruption on Windows OS defaults
  29. for chunk in pd.read_csv(filename, sep='\t', dtype=str, chunksize=chunk_size, on_bad_lines='skip', low_memory=False, encoding='utf-8'):
  30. try:
  31. # Drop duplicates by code natively
  32. if 'code' in chunk.columns:
  33. df = chunk.drop_duplicates(subset=['code'])
  34. else:
  35. df = chunk
  36. df.to_sql('products', con=engine, if_exists='append', index=False)
  37. total_processed += len(df)
  38. print(f" Successfully appended {total_processed} rows (Dynamic schema)...", end="\r")
  39. except BaseException as e:
  40. if "Duplicate entry" in str(e):
  41. pass
  42. else:
  43. print(f"\n [Warning] Chunk skipped due to internal structural error: {e}")
  44. print(f"\n✅ Finished importing {filename}.")
  45. return True
  46. def create_indexes(engine):
  47. print("\n🛠️ Creating performance indexes on newly generated table...")
  48. # B-TREE and FULLTEXT INDEXES created post-ingestion for extreme speed
  49. try:
  50. with engine.begin() as connection:
  51. print(" Building Primary Key on `code`...")
  52. # We must make `code` the primary key if pandas just made it a TEXT field
  53. # But MySQL cannot have a TEXT field as PRIMARY KEY without a length constraint.
  54. # Convert code to VARCHAR(50) first.
  55. connection.execute(urllib.parse.unquote("ALTER TABLE products MODIFY code VARCHAR(50);"))
  56. connection.execute(urllib.parse.unquote("ALTER TABLE products ADD PRIMARY KEY (code);"))
  57. print(" Building Fulltext Indexes...")
  58. connection.execute(urllib.parse.unquote("CREATE FULLTEXT INDEX ft_idx_search ON products(product_name, ingredients_text, brands);"))
  59. print(" Building B-TREE Indexes on core macros...")
  60. # We attempt to index key macros if they exist
  61. macro_cols = ['energy-kcal_100g', 'fat_100g', 'carbohydrates_100g', 'proteins_100g', 'sugars_100g', 'sodium_100g', 'iron_100g', 'calcium_100g', 'vitamin-c_100g']
  62. for col in macro_cols:
  63. # Convert TEXT to DOUBLE for numerical indexing and querying
  64. # We catch errors if the column doesn't exist to be safe
  65. try:
  66. connection.execute(urllib.parse.unquote(f"ALTER TABLE products MODIFY `{col}` DOUBLE;"))
  67. connection.execute(urllib.parse.unquote(f"CREATE INDEX idx_{col.replace('-', '_')} ON products(`{col}`);"))
  68. except:
  69. pass
  70. print("✅ Indexing Complete!")
  71. except Exception as e:
  72. print(f"❌ Indexing encountered an issue: {e}")
  73. if __name__ == "__main__":
  74. print("Initiating OpenFoodFacts CSV Ingestion Process...")
  75. engine = get_loader_engine()
  76. processed_en = ingest_file('en.openfoodfacts.org.products.csv', engine)
  77. processed_fr = ingest_file('fr.openfoodfacts.org.products.csv', engine)
  78. if not processed_en and not processed_fr:
  79. print("\n❌ Could not find either 'en.openfoodfacts.org.products.csv' or 'fr.openfoodfacts.org.products.csv'.")
  80. print("Please download them directly into the root folder and run this script again.")
  81. else:
  82. # Build indexes now that all data is appended!
  83. create_indexes(engine)
  84. print("\n🎉 Full database reload and indexing complete! Ready for AI RAG.")