Ver código fonte

Implement Grouped Vertical Partitioning architecture

lanfr144 2 semanas atrás
pai
commit
3fd4469aff
2 arquivos alterados com 140 adições e 86 exclusões
  1. 46 75
      ingest_csv.py
  2. 94 11
      setup_db.py

+ 46 - 75
ingest_csv.py

@@ -13,8 +13,6 @@ def get_loader_engine():
         password = urllib.parse.quote_plus(conf.get('password'))
         host = conf.get('host', '127.0.0.1')
         database = 'food_db'
-        
-        # Build strict SQLAlchemy PyMySQL string
         conn_str = f"mysql+pymysql://{user}:{password}@{host}/{database}?charset=utf8mb4"
         return create_engine(conn_str)
     except Exception as e:
@@ -26,103 +24,76 @@ def ingest_file(filename, engine):
         print(f"File {filename} not found locally.")
         return False
         
-    print(f"\n🚀 Found {filename}! Starting extreme batch ingestion for ALL columns...")
+    print(f"\n🚀 Found {filename}! Starting grouped vertical partition ingestion...")
     
     chunk_size = 10000 
     total_processed = 0
-    is_first_chunk = True
+
+    # Define the groupings
+    groups = {
+        'products_core': ['code', 'product_name', 'generic_name', 'brands', 'ingredients_text'],
+        'products_allergens': ['code', 'allergens'],
+        'products_macros': ['code', 'energy-kcal_100g', 'proteins_100g', 'fat_100g', 'carbohydrates_100g', 'sugars_100g', 'fiber_100g', 'sodium_100g', 'salt_100g', 'cholesterol_100g'],
+        'products_vitamins': ['code', 'vitamin-a_100g', 'vitamin-b1_100g', 'vitamin-b2_100g', 'vitamin-pp_100g', 'vitamin-b6_100g', 'vitamin-b9_100g', 'vitamin-b12_100g', 'vitamin-c_100g', 'vitamin-d_100g', 'vitamin-e_100g', 'vitamin-k_100g'],
+        'products_minerals': ['code', 'calcium_100g', 'iron_100g', 'magnesium_100g', 'potassium_100g', 'zinc_100g']
+    }
+
+    # Pre-calculate what to read
+    all_required_cols = list(set([col for cols in groups.values() for col in cols]))
 
     for chunk in pd.read_csv(filename, sep='\t', dtype=str, chunksize=chunk_size, on_bad_lines='skip', low_memory=False, encoding='utf-8'):
         try:
-            df = chunk.copy()
-            
-            if 'code' not in df.columns:
+            # Drop rows with missing codes
+            if 'code' not in chunk.columns:
                 continue
-
-            # Drop missing codes and local duplicates
-            df.dropna(subset=['code'], inplace=True)
-            df.drop_duplicates(subset=['code'], inplace=True)
+            df = chunk.dropna(subset=['code']).drop_duplicates(subset=['code']).copy()
             
-            # Map datatypes dynamically to avoid InnoDB row size limits
-            # Code is VARCHAR(50), everything else is TEXT (strings) or DOUBLE (if we were casting, but we read as str)
-            # Since we read dtype=str, pandas will default all to TEXT which is perfect for Off-Page storage.
-            sql_dtypes = {col: TEXT() for col in df.columns}
-            sql_dtypes['code'] = VARCHAR(50)
-            
-            if is_first_chunk:
-                # 1. Initialize the target table with the exact schema from the first chunk
-                df.head(0).to_sql('products', con=engine, if_exists='replace', index=False, dtype=sql_dtypes)
+            # Ensure all required columns exist in the chunk (fill with None if missing)
+            for col in all_required_cols:
+                if col not in df.columns:
+                    df[col] = None
+                    
+            for table_name, columns in groups.items():
+                slice_df = df[columns].copy()
+                
+                # Cast datatypes: core and allergens are TEXT, others are DOUBLE
+                if table_name in ['products_core', 'products_allergens']:
+                    sql_dtypes = {col: TEXT() for col in columns if col != 'code'}
+                    sql_dtypes['code'] = VARCHAR(50)
+                else:
+                    # Convert to numeric (double) safely
+                    for col in columns:
+                        if col != 'code':
+                            slice_df[col] = pd.to_numeric(slice_df[col], errors='coerce')
+                    sql_dtypes = {col: DOUBLE() for col in columns if col != 'code'}
+                    sql_dtypes['code'] = VARCHAR(50)
+
+                # Write to temp table
+                temp_name = f"temp_{table_name}"
+                slice_df.to_sql(temp_name, con=engine, if_exists='replace', index=False, dtype=sql_dtypes)
                 
-                # 2. Add Primary Key immediately
+                # INSERT IGNORE into final table
                 with engine.begin() as conn:
-                    conn.execute(text("ALTER TABLE products ADD PRIMARY KEY (code);"))
-                is_first_chunk = False
+                    cols_str = ", ".join([f"`{c}`" for c in columns])
+                    conn.execute(text(f"INSERT IGNORE INTO {table_name} ({cols_str}) SELECT {cols_str} FROM {temp_name}"))
+                    conn.execute(text(f"DROP TABLE IF EXISTS {temp_name}"))
 
-            # Write chunk to a temporary table
-            df.to_sql('temp_products', con=engine, if_exists='replace', index=False, dtype=sql_dtypes)
-            
-            # Use INSERT IGNORE to append to the main table, skipping any global duplicate codes
-            with engine.begin() as connection:
-                # Ensure columns match by explicitly listing them
-                cols = ", ".join([f"`{c}`" for c in df.columns])
-                connection.execute(text(f"INSERT IGNORE INTO products ({cols}) SELECT {cols} FROM temp_products"))
-            
             total_processed += len(df)
-            print(f"   Successfully appended {total_processed} rows into unified dynamic schema...", end="\r")
+            print(f"   Successfully appended {total_processed} rows into grouped tables...", end="\r")
         except BaseException as e:
             print(f"\n   [Warning] Chunk skipped due to error: {e}")
             
-    # Cleanup temp table
-    with engine.begin() as connection:
-        connection.execute(text("DROP TABLE IF EXISTS temp_products"))
-        
     print(f"\n✅ Finished importing {filename}.")
     return True
 
-def create_indexes(engine):
-    print("\n🛠️ Creating performance indexes (FULLTEXT and Standard)...")
-    try:
-        with engine.begin() as connection:
-            # Add Fulltext Search on vital textual fields if they exist
-            try:
-                connection.execute(text("ALTER TABLE products ADD FULLTEXT idx_search (product_name, ingredients_text);"))
-                print("  - Added FULLTEXT index on product_name, ingredients_text")
-            except Exception as e:
-                print(f"  - Skipped FULLTEXT idx_search: {e}")
-                
-            try:
-                connection.execute(text("ALTER TABLE products ADD FULLTEXT idx_allergens (allergens);"))
-                print("  - Added FULLTEXT index on allergens")
-            except Exception as e:
-                print(f"  - Skipped FULLTEXT idx_allergens: {e}")
-
-            # Standard indexes for fast exact matches
-            try:
-                connection.execute(text("ALTER TABLE products ADD INDEX idx_brands (brands(50));"))
-                print("  - Added INDEX on brands")
-            except Exception as e:
-                print(f"  - Skipped INDEX idx_brands: {e}")
-                
-            try:
-                connection.execute(text("ALTER TABLE products ADD INDEX idx_generic (generic_name(50));"))
-                print("  - Added INDEX on generic_name")
-            except Exception as e:
-                print(f"  - Skipped INDEX idx_generic: {e}")
-
-        print("✅ Indexing Complete!")
-    except Exception as e:
-        print(f"❌ Indexing encountered an issue: {e}")
-
 if __name__ == "__main__":
-    print("Initiating OpenFoodFacts CSV Unified Dynamic Ingestion Process...")
+    print("Initiating OpenFoodFacts Grouped Vertical Ingestion Process...")
     engine = get_loader_engine()
     
     processed_en = ingest_file('en.openfoodfacts.org.products.csv', engine)
     processed_fr = ingest_file('fr.openfoodfacts.org.products.csv', engine)
     
     if not processed_en and not processed_fr:
-        print("\n❌ Could not find either 'en.openfoodfacts.org.products.csv' or 'fr.openfoodfacts.org.products.csv'.")
-        print("Please download them directly into the root folder and run this script again.")
+        print("\n❌ Could not find CSVs.")
     else:
-        create_indexes(engine)
         print("\n🎉 Full database reload complete! Ready for AI RAG.")

+ 94 - 11
setup_db.py

@@ -125,17 +125,100 @@ def run_db_setup():
     ) ENGINE=InnoDB;
     """)
 
-    # The products table is now dynamically generated by ingest_csv.py to support all ~200 columns.
-    
-    # Table Context Grants (PoLP)
-    # The authenticated app process can handle credentials and now read/write custom plates!
-    cursor.execute("GRANT SELECT, INSERT, UPDATE, DELETE ON food_db.users TO 'db_app_auth'@'%';")
-    cursor.execute("GRANT SELECT, INSERT, UPDATE, DELETE ON food_db.user_health_profiles TO 'db_app_auth'@'%';")
-    cursor.execute("GRANT SELECT, INSERT, UPDATE, DELETE ON food_db.plates TO 'db_app_auth'@'%';")
-    cursor.execute("GRANT SELECT, INSERT, UPDATE, DELETE ON food_db.plate_items TO 'db_app_auth'@'%';")
-    
-    # Give the app read privileges on the whole database
-    cursor.execute("GRANT SELECT ON food_db.* TO 'db_app_auth'@'%';")
+    # 4. Products Tables (Grouped Vertical Partitioning)
+    tables = [
+        "food_db.products_core",
+        "food_db.products_allergens",
+        "food_db.products_macros",
+        "food_db.products_vitamins",
+        "food_db.products_minerals"
+    ]
+    cursor.execute("DROP VIEW IF EXISTS food_db.products;")
+    for t in tables:
+        cursor.execute(f"DROP TABLE IF EXISTS {t};")
+        
+    cursor.execute("""
+    CREATE TABLE food_db.products_core (
+        code VARCHAR(50) PRIMARY KEY,
+        product_name TEXT NULL,
+        generic_name TEXT NULL,
+        brands TEXT NULL,
+        ingredients_text TEXT NULL,
+        FULLTEXT idx_search (product_name, ingredients_text),
+        INDEX idx_brands (brands(50)),
+        INDEX idx_generic (generic_name(50))
+    ) ENGINE=InnoDB;
+    """)
+
+    cursor.execute("""
+    CREATE TABLE food_db.products_allergens (
+        code VARCHAR(50) PRIMARY KEY,
+        allergens TEXT NULL,
+        FULLTEXT idx_allergens (allergens),
+        FOREIGN KEY (code) REFERENCES food_db.products_core(code) ON DELETE CASCADE
+    ) ENGINE=InnoDB;
+    """)
+
+    cursor.execute("""
+    CREATE TABLE food_db.products_macros (
+        code VARCHAR(50) PRIMARY KEY,
+        `energy-kcal_100g` DOUBLE NULL,
+        proteins_100g DOUBLE NULL,
+        fat_100g DOUBLE NULL,
+        carbohydrates_100g DOUBLE NULL,
+        sugars_100g DOUBLE NULL,
+        fiber_100g DOUBLE NULL,
+        sodium_100g DOUBLE NULL,
+        salt_100g DOUBLE NULL,
+        cholesterol_100g DOUBLE NULL,
+        FOREIGN KEY (code) REFERENCES food_db.products_core(code) ON DELETE CASCADE
+    ) ENGINE=InnoDB;
+    """)
+
+    cursor.execute("""
+    CREATE TABLE food_db.products_vitamins (
+        code VARCHAR(50) PRIMARY KEY,
+        `vitamin-a_100g` DOUBLE NULL,
+        `vitamin-b1_100g` DOUBLE NULL,
+        `vitamin-b2_100g` DOUBLE NULL,
+        `vitamin-pp_100g` DOUBLE NULL,
+        `vitamin-b6_100g` DOUBLE NULL,
+        `vitamin-b9_100g` DOUBLE NULL,
+        `vitamin-b12_100g` DOUBLE NULL,
+        `vitamin-c_100g` DOUBLE NULL,
+        `vitamin-d_100g` DOUBLE NULL,
+        `vitamin-e_100g` DOUBLE NULL,
+        `vitamin-k_100g` DOUBLE NULL,
+        FOREIGN KEY (code) REFERENCES food_db.products_core(code) ON DELETE CASCADE
+    ) ENGINE=InnoDB;
+    """)
+
+    cursor.execute("""
+    CREATE TABLE food_db.products_minerals (
+        code VARCHAR(50) PRIMARY KEY,
+        calcium_100g DOUBLE NULL,
+        iron_100g DOUBLE NULL,
+        magnesium_100g DOUBLE NULL,
+        potassium_100g DOUBLE NULL,
+        zinc_100g DOUBLE NULL,
+        FOREIGN KEY (code) REFERENCES food_db.products_core(code) ON DELETE CASCADE
+    ) ENGINE=InnoDB;
+    """)
+
+    cursor.execute("""
+    CREATE VIEW food_db.products AS 
+    SELECT 
+        c.code, c.product_name, c.generic_name, c.brands, c.ingredients_text,
+        a.allergens,
+        m.`energy-kcal_100g`, m.proteins_100g, m.fat_100g, m.carbohydrates_100g, m.sugars_100g, m.fiber_100g, m.sodium_100g, m.salt_100g, m.cholesterol_100g,
+        v.`vitamin-a_100g`, v.`vitamin-b1_100g`, v.`vitamin-b2_100g`, v.`vitamin-pp_100g`, v.`vitamin-b6_100g`, v.`vitamin-b9_100g`, v.`vitamin-b12_100g`, v.`vitamin-c_100g`, v.`vitamin-d_100g`, v.`vitamin-e_100g`, v.`vitamin-k_100g`,
+        min.calcium_100g, min.iron_100g, min.magnesium_100g, min.potassium_100g, min.zinc_100g
+    FROM food_db.products_core c
+    LEFT JOIN food_db.products_allergens a ON c.code = a.code
+    LEFT JOIN food_db.products_macros m ON c.code = m.code
+    LEFT JOIN food_db.products_vitamins v ON c.code = v.code
+    LEFT JOIN food_db.products_minerals min ON c.code = min.code;
+    """)
     
     cursor.execute("GRANT SELECT ON food_db.* TO 'db_reader'@'%';")
     cursor.execute("GRANT SELECT, INSERT, UPDATE, DELETE, DROP, CREATE, ALTER, INDEX, CREATE VIEW ON food_db.* TO 'db_loader'@'%';")