@@ -2,6 +2,7 @@ import pandas as pd
 import myloginpath
 import urllib.parse
 from sqlalchemy import create_engine, text
+from sqlalchemy.types import VARCHAR, TEXT, DOUBLE
 import os
 import sys
 
@@ -25,25 +26,15 @@ def ingest_file(filename, engine):
         print(f"File {filename} not found locally.")
         return False
 
-    print(f"\n🚀 Found {filename}! Starting extreme batch ingestion into unified table...")
+    print(f"\n🚀 Found {filename}! Starting extreme batch ingestion for ALL columns...")
 
     chunk_size = 10000
    total_processed = 0
-
-    required_columns = [
-        'code', 'product_name', 'generic_name', 'brands', 'allergens', 'ingredients_text',
-        'proteins_100g', 'fat_100g', 'carbohydrates_100g', 'sugars_100g', 'sodium_100g', 'salt_100g',
-        'energy-kcal_100g', 'vitamin-a_100g', 'vitamin-d_100g', 'vitamin-e_100g', 'vitamin-k_100g',
-        'vitamin-c_100g', 'vitamin-b1_100g', 'vitamin-b2_100g', 'vitamin-pp_100g', 'vitamin-b6_100g',
-        'vitamin-b9_100g', 'vitamin-b12_100g', 'calcium_100g', 'iron_100g', 'magnesium_100g',
-        'zinc_100g', 'potassium_100g', 'cholesterol_100g', 'fiber_100g'
-    ]
+    is_first_chunk = True
 
     for chunk in pd.read_csv(filename, sep='\t', dtype=str, chunksize=chunk_size, on_bad_lines='skip', low_memory=False, encoding='utf-8'):
         try:
-            # Filter to only the columns that actually exist in this chunk and are in required_columns
-            available_cols = [c for c in required_columns if c in chunk.columns]
-            df = chunk[available_cols].copy()
+            df = chunk.copy()
 
             if 'code' not in df.columns:
                 continue
@@ -52,23 +43,32 @@ def ingest_file(filename, engine):
             df.dropna(subset=['code'], inplace=True)
             df.drop_duplicates(subset=['code'], inplace=True)
 
-            # Ensure all required columns exist in the dataframe (fill missing with None)
-            for col in required_columns:
-                if col not in df.columns:
-                    df[col] = None
-
-            # Reorder columns to exactly match the target table schema
-            df = df[required_columns]
+            # Map datatypes dynamically to avoid InnoDB row size limits:
+            # 'code' is VARCHAR(50) so it can carry the primary key; every other column is TEXT.
+            # (We read with dtype=str, so DOUBLE never applies, and TEXT is stored off-page.)
+            sql_dtypes = {col: TEXT() for col in df.columns}
+            sql_dtypes['code'] = VARCHAR(50)
 
+            if is_first_chunk:
+                # 1. Initialize the target table with the exact schema of the first chunk
+                df.head(0).to_sql('products', con=engine, if_exists='replace', index=False, dtype=sql_dtypes)
+
+                # 2. Add the primary key immediately
+                with engine.begin() as conn:
+                    conn.execute(text("ALTER TABLE products ADD PRIMARY KEY (code);"))
+                is_first_chunk = False
+
             # Write chunk to a temporary table
-            df.to_sql('temp_products', con=engine, if_exists='replace', index=False)
+            df.to_sql('temp_products', con=engine, if_exists='replace', index=False, dtype=sql_dtypes)
 
             # Use INSERT IGNORE to append to the main table, skipping any global duplicate codes
             with engine.begin() as connection:
-                connection.execute(text("INSERT IGNORE INTO products SELECT * FROM temp_products"))
+                # Ensure the column order matches by listing the columns explicitly
+                cols = ", ".join([f"`{c}`" for c in df.columns])
+                connection.execute(text(f"INSERT IGNORE INTO products ({cols}) SELECT {cols} FROM temp_products"))
 
             total_processed += len(df)
-            print(f" Successfully appended {total_processed} rows into unified schema...", end="\r")
+            print(f" Successfully appended {total_processed} rows into unified dynamic schema...", end="\r")
         except BaseException as e:
             print(f"\n [Warning] Chunk skipped due to error: {e}")
 
@@ -79,8 +79,42 @@ def ingest_file(filename, engine):
     print(f"\n✅ Finished importing {filename}.")
     return True
 
+def create_indexes(engine):
+    print("\n🛠️ Creating performance indexes (FULLTEXT and standard)...")
+    try:
+        with engine.begin() as connection:
+            # Add a FULLTEXT search index on the vital textual fields, if they exist
+            try:
+                connection.execute(text("ALTER TABLE products ADD FULLTEXT idx_search (product_name, ingredients_text);"))
+                print(" - Added FULLTEXT index on product_name, ingredients_text")
+            except Exception as e:
+                print(f" - Skipped FULLTEXT idx_search: {e}")
+
+            try:
+                connection.execute(text("ALTER TABLE products ADD FULLTEXT idx_allergens (allergens);"))
+                print(" - Added FULLTEXT index on allergens")
+            except Exception as e:
+                print(f" - Skipped FULLTEXT idx_allergens: {e}")
+
+            # Standard prefix indexes for fast exact matches
+            try:
+                connection.execute(text("ALTER TABLE products ADD INDEX idx_brands (brands(50));"))
+                print(" - Added INDEX on brands")
+            except Exception as e:
+                print(f" - Skipped INDEX idx_brands: {e}")
+
+            try:
+                connection.execute(text("ALTER TABLE products ADD INDEX idx_generic (generic_name(50));"))
+                print(" - Added INDEX on generic_name")
+            except Exception as e:
+                print(f" - Skipped INDEX idx_generic: {e}")
+
+        print("✅ Indexing complete!")
+    except Exception as e:
+        print(f"❌ Indexing encountered an issue: {e}")
+
 if __name__ == "__main__":
-    print("Initiating OpenFoodFacts CSV Unified Ingestion Process...")
+    print("Initiating OpenFoodFacts CSV Unified Dynamic Ingestion Process...")
     engine = get_loader_engine()
 
     processed_en = ingest_file('en.openfoodfacts.org.products.csv', engine)
@@ -90,4 +124,5 @@ if __name__ == "__main__":
         print("\n❌ Could not find either 'en.openfoodfacts.org.products.csv' or 'fr.openfoodfacts.org.products.csv'.")
         print("Please download them directly into the root folder and run this script again.")
     else:
+        create_indexes(engine)
         print("\n🎉 Full database reload complete! Ready for AI RAG.")
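
Note: get_loader_engine() is called by this diff but defined earlier in the file, outside the hunks shown. For readers applying the patch in isolation, a minimal sketch of what it could look like, combining the myloginpath and urllib.parse imports at the top of the script; the login-path name 'client', the database name 'food', and the pymysql driver are assumptions, not part of the diff:

import urllib.parse

import myloginpath
from sqlalchemy import create_engine

def get_loader_engine():
    # Read credentials from ~/.mylogin.cnf; the 'client' login path is an assumption
    conf = myloginpath.parse('client')
    password = urllib.parse.quote_plus(conf['password'])  # escape special characters for the URL
    host = conf.get('host', 'localhost')
    port = conf.get('port', 3306)
    # 'food' is a placeholder database name
    return create_engine(f"mysql+pymysql://{conf['user']}:{password}@{host}:{port}/food")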
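
Usage note: the FULLTEXT indexes created by create_indexes() exist to serve MySQL MATCH ... AGAINST queries, which is what makes the finished table useful for retrieval. A small illustrative lookup against the loaded table, with an arbitrary search phrase:

from sqlalchemy import text

with engine.connect() as conn:
    rows = conn.execute(text(
        "SELECT code, product_name FROM products "
        "WHERE MATCH(product_name, ingredients_text) "
        "AGAINST(:q IN NATURAL LANGUAGE MODE) LIMIT 10"
    ), {"q": "dark chocolate hazelnut"})
    for code, name in rows:
        print(code, name)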