@@ -25,42 +25,70 @@ def ingest_file(filename, engine):
        print(f"File {filename} not found locally.")
        return False

-     print(f"\n🚀 Found {filename}! Starting ingestion via SQLAlchemy pipeline...")
- 
-     expected_columns = [
-         "code", "url", "creator", "created_t", "created_datetime", "last_modified_t",
-         "last_modified_datetime", "product_name", "generic_name", "quantity", "packaging",
-         "brands", "categories", "origins", "labels", "stores", "countries", "ingredients_text",
-         "allergens", "traces"
-     ]
+     print(f"\n🚀 Found {filename}! Starting extreme batch ingestion...")

    chunk_size = 5000
    total_processed = 0

-     for chunk in pd.read_csv(filename, sep='\t', dtype=str, chunksize=chunk_size, on_bad_lines='skip'):
-         # Filter explicitly to schema
-         available_cols = [col for col in expected_columns if col in chunk.columns]
-         df = chunk[available_cols]
- 
-         # Pandas to_sql safely transforms NaNs to SQL NULLs internally
+     # Read every column dynamically, without schema filtering; dtype=str loads all fields as text, and low_memory=False parses each chunk in a single pass.
+     for chunk in pd.read_csv(filename, sep='\t', dtype=str, chunksize=chunk_size, on_bad_lines='skip', low_memory=False):
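+         # on_bad_lines='skip' silently drops malformed TSV rows instead of aborting the chunk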
        try:
-             # We use 'append' because the products table already exists with primary keys
-             # To handle duplicate 'code' primary keys effortlessly, we drop duplicates from the dataframe before insert
-             # Or depend on PyMySQL. But pandas natively crashes on dupes unless managed.
-             df = df.drop_duplicates(subset=['code'])
+             # Drop duplicates on 'code' within this chunk before inserting
+             if 'code' in chunk.columns:
+                 df = chunk.drop_duplicates(subset=['code'])
+             else:
+                 df = chunk
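+             # Note: this dedup is chunk-local; duplicate codes arriving in different
+             # chunks will still reach the table (see create_indexes below)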
+ 
            df.to_sql('products', con=engine, if_exists='append', index=False)
            total_processed += len(df)
-             print(f" Successfully appended {total_processed} rows...")
+             print(f" Successfully appended {total_processed} rows (Dynamic schema)...", end="\r")
        except BaseException as e:
-             # If a strict primary key duplicate existed in DB already from a previous chunk, ignore row crashes
            if "Duplicate entry" in str(e):
                pass
            else:
-                 print(f" [Warning] Chunk skipped due to internal structural error: {e}")
+                 print(f"\n [Warning] Chunk skipped due to internal structural error: {e}")

-     print(f"✅ Finished importing {filename}.")
+     print(f"\n✅ Finished importing {filename}.")
    return True

+ def create_indexes(engine):
+     print("\n🛠️ Creating performance indexes on newly generated table...")
+     # B-tree and FULLTEXT indexes are built after ingestion so the bulk inserts stay fast
+     # (requires `from sqlalchemy import text` at the top of the script)
+     try:
+         with engine.begin() as connection:
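+             # NB: MySQL DDL statements commit implicitly, so a failure partway through is not rolled back by engine.begin()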
+             print(" Building Primary Key on `code`...")
+             # We need to promote `code` to the primary key, since pandas created it as a TEXT field,
+             # and MySQL cannot use a TEXT column as a PRIMARY KEY without a length prefix.
+             # Convert code to VARCHAR(50) first.
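+             # Caveat: if duplicate codes from different chunks reached the table,
+             # ADD PRIMARY KEY will fail here and be reported by the except below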
+             connection.execute(text("ALTER TABLE products MODIFY code VARCHAR(50);"))
+             connection.execute(text("ALTER TABLE products ADD PRIMARY KEY (code);"))
+ 
+             print(" Building Fulltext Indexes...")
+             connection.execute(text("CREATE FULLTEXT INDEX ft_idx_search ON products(product_name, ingredients_text, brands);"))
+ 
+             print(" Building B-TREE Indexes on core macros...")
+             # We attempt to index key macros if they exist
+             macro_cols = ['energy-kcal_100g', 'fat_100g', 'carbohydrates_100g', 'proteins_100g']
+             for col in macro_cols:
+                 # Convert TEXT to DOUBLE for numerical indexing and querying
+                 # We catch errors if the column doesn't exist to be safe
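+                 # (under strict SQL mode the MODIFY can also fail on non-numeric values)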
+                 try:
+                     connection.execute(text(f"ALTER TABLE products MODIFY `{col}` DOUBLE;"))
+                     connection.execute(text(f"CREATE INDEX idx_{col.replace('-', '_')} ON products(`{col}`);"))
+                 except Exception:
+                     pass
+         print("✅ Indexing Complete!")
+     except Exception as e:
+         print(f"❌ Indexing encountered an issue: {e}")
+ 
if __name__ == "__main__":
    print("Initiating OpenFoodFacts CSV Ingestion Process...")
    engine = get_loader_engine()
@@ -71,3 +99,7 @@ if __name__ == "__main__":
    if not processed_en and not processed_fr:
        print("\n❌ Could not find either 'en.openfoodfacts.org.products.csv' or 'fr.openfoodfacts.org.products.csv'.")
        print("Please download them directly into the root folder and run this script again.")
+     else:
+         # Build indexes now that all data is appended!
+         create_indexes(engine)
+ print("\n🎉 Full database reload and indexing complete! Ready for AI RAG.")