Refactor database pipeline to unified schema bypassing row size limits

lanfr144 2 weeks ago
parent
commit
cb8b0b645e
2 changed files with 62 additions and 69 deletions
  1. ingest_csv.py (+39, -68)
  2. setup_db.py (+23, -1)

+ 39 - 68
ingest_csv.py

@@ -25,86 +25,59 @@ def ingest_file(filename, engine):
         print(f"File {filename} not found locally.")
         print(f"File {filename} not found locally.")
         return False
         return False
         
         
-    print(f"\n🚀 Found {filename}! Starting extreme batch ingestion...")
+    print(f"\n🚀 Found {filename}! Starting extreme batch ingestion into unified table...")
-    chunk_size = 5000 
+    chunk_size = 10000 
     total_processed = 0

-    # Read dynamically without filtering. Setting low_memory=False to let pandas parse column types flexibly
-    # Forced utf-8 encoding to prevent French accent corruption on Windows OS defaults
+    required_columns = [
+        'code', 'product_name', 'generic_name', 'brands', 'allergens', 'ingredients_text',
+        'proteins_100g', 'fat_100g', 'carbohydrates_100g', 'sugars_100g', 'sodium_100g', 
+        'energy-kcal_100g', 'vitamin-c_100g', 'iron_100g', 'calcium_100g'
+    ]
+
     for chunk in pd.read_csv(filename, sep='\t', dtype=str, chunksize=chunk_size, on_bad_lines='skip', low_memory=False, encoding='utf-8'):
         try:
-            # Drop duplicates by code natively
-            if 'code' in chunk.columns:
-                df = chunk.drop_duplicates(subset=['code'])
-            else:
-                df = chunk
-            # Eliminate completely empty columns to save storage
-            df.dropna(axis=1, how='all', inplace=True)
+            # Filter to only the columns that actually exist in this chunk and are in required_columns
+            available_cols = [c for c in required_columns if c in chunk.columns]
+            df = chunk[available_cols].copy()
+            
+            if 'code' not in df.columns:
+                continue
+
+            # Drop missing codes and local duplicates
+            df.dropna(subset=['code'], inplace=True)
+            df.drop_duplicates(subset=['code'], inplace=True)
-            # Segment the dataframe into chunks of 50 columns each to bypass InnoDB constraints
-            cols = list(df.columns)
-            if 'code' in cols: cols.remove('code')
+            # Ensure all required columns exist in the dataframe (fill missing with None)
+            for col in required_columns:
+                if col not in df.columns:
+                    df[col] = None
+                    
+            # Reorder columns to exactly match the target table schema
+            df = df[required_columns]
-            p_chunk_size = 4 # Extreme safe size for TEXT columns to stay under 8126 byte row limit
-            chunks = [cols[i:i + p_chunk_size] for i in range(0, len(cols), p_chunk_size)]
+            # Write chunk to a temporary table
+            df.to_sql('temp_products', con=engine, if_exists='replace', index=False)
+            
+            # Use INSERT IGNORE to append to the main table, skipping any global duplicate codes
+            with engine.begin() as connection:
+                connection.execute(text("INSERT IGNORE INTO products SELECT * FROM temp_products"))
-            for i, col_chunk in enumerate(chunks):
-                table_name = f'products_{i+1}'
-                df_slice = df[['code'] + col_chunk].copy()
-                df_slice.to_sql(table_name, con=engine, if_exists='append', index=False)
-
             total_processed += len(df)
-            print(f"   Successfully appended {total_processed} rows (Dynamic schema)...", end="\r")
+            print(f"   Successfully appended {total_processed} rows into unified schema...", end="\r")
         except BaseException as e:
-            if "Duplicate entry" in str(e):
-                pass
-            else:
-                 print(f"\n   [Warning] Chunk skipped due to error: {e}")
+            print(f"\n   [Warning] Chunk skipped due to error: {e}")
+            
+    # Cleanup temp table
+    with engine.begin() as connection:
+        connection.execute(text("DROP TABLE IF EXISTS temp_products"))

     print(f"\n✅ Finished importing {filename}.")
     return True

-def create_indexes(engine):
-    # Determine how many tables were actually created
-    num_tables = 0
-    with engine.connect() as conn:
-        res = conn.execute(text("SHOW TABLES LIKE 'products_%'"))
-        num_tables = len(res.fetchall())
-
-    print(f"\n🛠️ Creating performance indexes on {num_tables} partition tables...")
-    try:
-        with engine.begin() as connection:
-            # Enforce Primary Keys on ALL partitions
-            for i in range(1, num_tables + 1):
-                try:
-                    connection.execute(text(f"ALTER TABLE products_{i} MODIFY code VARCHAR(50);"))
-                    connection.execute(text(f"ALTER TABLE products_{i} ADD PRIMARY KEY (code);"))
-                except: pass
-
-            print("  Building Global MySQL VIEW...")
-            view_sql = f"CREATE VIEW products AS SELECT p1.* "
-            joins = []
-            for i in range(2, num_tables + 1):
-                # Get columns for this table except 'code'
-                cols_res = connection.execute(text(f"SHOW COLUMNS FROM products_{i}"))
-                table_cols = [c[0] for c in cols_res.fetchall() if c[0] != 'code']
-                if table_cols:
-                    view_sql += ", " + ", ".join([f"p{i}.`{c}`" for c in table_cols])
-                joins.append(f"LEFT JOIN products_{i} p{i} ON p1.code = p{i}.code")
-            
-            view_sql += " FROM products_1 p1 " + " ".join(joins)
-            
-            try:
-                connection.execute(text(view_sql))
-            except Exception as ev:
-                print(f"  Warning: View creation failed: {ev}")
-        print("✅ Indexing Complete!")
-    except Exception as e:
-        print(f"❌ Indexing encountered an issue: {e}")
-
 if __name__ == "__main__":
-    print("Initiating OpenFoodFacts CSV Ingestion Process...")
+    print("Initiating OpenFoodFacts CSV Unified Ingestion Process...")
     engine = get_loader_engine()

     processed_en = ingest_file('en.openfoodfacts.org.products.csv', engine)
@@ -114,6 +87,4 @@ if __name__ == "__main__":
         print("\n❌ Could not find either 'en.openfoodfacts.org.products.csv' or 'fr.openfoodfacts.org.products.csv'.")
         print("\n❌ Could not find either 'en.openfoodfacts.org.products.csv' or 'fr.openfoodfacts.org.products.csv'.")
         print("Please download them directly into the root folder and run this script again.")
         print("Please download them directly into the root folder and run this script again.")
     else:
     else:
-        # Build indexes now that all data is appended!
-        create_indexes(engine)
-        print("\n🎉 Full database reload and indexing complete! Ready for AI RAG.")
+        print("\n🎉 Full database reload complete! Ready for AI RAG.")
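Note: the new ingest path stages each pandas chunk in a scratch table and lets MySQL's INSERT IGNORE drop rows whose code already exists, so cross-file deduplication happens inside the database instead of in Python. A minimal standalone sketch of that pattern, assuming a reachable MySQL instance (the DSN and the two-column demo tables are placeholders, not the repo's real schema):

    # Sketch of the staging-table dedup pattern; DSN and tables are illustrative.
    import pandas as pd
    from sqlalchemy import create_engine, text

    engine = create_engine("mysql+pymysql://user:pass@localhost/food_db")  # placeholder DSN

    with engine.begin() as conn:
        conn.execute(text("""
            CREATE TABLE IF NOT EXISTS products_demo (
                code VARCHAR(50) PRIMARY KEY,
                product_name TEXT NULL
            ) ENGINE=InnoDB
        """))

    def upsert_chunk(df: pd.DataFrame) -> None:
        # 'replace' recreates the scratch table per chunk, so no stale rows linger.
        # Column order in df must match the target table, which is why the real
        # script reorders every chunk to required_columns before writing.
        df.to_sql("temp_demo", con=engine, if_exists="replace", index=False)
        with engine.begin() as conn:
            # IGNORE downgrades duplicate-key errors on the primary key to
            # warnings, so the rest of the batch still lands.
            conn.execute(text("INSERT IGNORE INTO products_demo SELECT * FROM temp_demo"))

    upsert_chunk(pd.DataFrame({"code": ["001"], "product_name": ["Oat bar"]}))
    # '001' is silently skipped on the second call; only '002' is added.
    upsert_chunk(pd.DataFrame({"code": ["001", "002"], "product_name": ["Oat bar", "Rye crisp"]}))

Keeping the dedup in SQL avoids per-row existence checks from Python and lets each chunk go in as a single set-based statement.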

+ 23 - 1
setup_db.py

@@ -125,10 +125,32 @@ def run_db_setup():
     ) ENGINE=InnoDB;
     """)

-    # 4. Products Table (Dynamic Drop for partitioned logic)
+    # 4. Products Table (Unified)
     for i in range(1, 101): # Drop up to 100 partitions just in case
         cursor.execute(f"DROP TABLE IF EXISTS food_db.products_{i};")
     cursor.execute("DROP VIEW IF EXISTS food_db.products;")
     cursor.execute("DROP VIEW IF EXISTS food_db.products;")
+    cursor.execute("DROP TABLE IF EXISTS food_db.products;")
+    
+    cursor.execute("""
+    CREATE TABLE IF NOT EXISTS food_db.products (
+        code VARCHAR(50) PRIMARY KEY,
+        product_name TEXT NULL,
+        generic_name TEXT NULL,
+        brands TEXT NULL,
+        allergens TEXT NULL,
+        ingredients_text TEXT NULL,
+        proteins_100g DOUBLE NULL,
+        fat_100g DOUBLE NULL,
+        carbohydrates_100g DOUBLE NULL,
+        sugars_100g DOUBLE NULL,
+        sodium_100g DOUBLE NULL,
+        `energy-kcal_100g` DOUBLE NULL,
+        `vitamin-c_100g` DOUBLE NULL,
+        iron_100g DOUBLE NULL,
+        calcium_100g DOUBLE NULL,
+        FULLTEXT idx_search (product_name, ingredients_text)
+    ) ENGINE=InnoDB;
+    """)

     # Table Context Grants (PoLP)
     # The authenticated app process can handle credentials and now read/write custom plates!
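With the unified table in place, the FULLTEXT index idx_search over (product_name, ingredients_text) supports relevance-ranked text search (InnoDB has supported FULLTEXT indexes since MySQL 5.6). A sketch of how a consumer of this schema might query it; the DSN and search terms are placeholders:

    # Sketch: querying the FULLTEXT index created by run_db_setup() above.
    from sqlalchemy import create_engine, text

    engine = create_engine("mysql+pymysql://user:pass@localhost/food_db")  # placeholder DSN

    # MATCH() must name the same columns, in the same order, as idx_search.
    query = text("""
        SELECT code, product_name,
               MATCH(product_name, ingredients_text)
                   AGAINST(:terms IN NATURAL LANGUAGE MODE) AS score
        FROM products
        WHERE MATCH(product_name, ingredients_text)
              AGAINST(:terms IN NATURAL LANGUAGE MODE)
        ORDER BY score DESC
        LIMIT 10
    """)

    with engine.connect() as conn:
        for row in conn.execute(query, {"terms": "dark chocolate hazelnut"}):
            print(row.code, row.product_name, round(row.score, 3))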