|
|
@@ -46,11 +46,10 @@ def ingest_file(filename, engine):
|
|
|
cols = list(df.columns)
|
|
|
if 'code' in cols: cols.remove('code')
|
|
|
|
|
|
- chunk_size = 50
|
|
|
- chunks = [cols[i:i + chunk_size] for i in range(0, len(cols), chunk_size)]
|
|
|
+ p_chunk_size = 8 # Safe size for TEXT columns
|
|
|
+ chunks = [cols[i:i + p_chunk_size] for i in range(0, len(cols), p_chunk_size)]
|
|
|
|
|
|
for i, col_chunk in enumerate(chunks):
|
|
|
- # Ensure 'code' maps across every single table
|
|
|
table_name = f'products_{i+1}'
|
|
|
df_slice = df[['code'] + col_chunk].copy()
|
|
|
df_slice.to_sql(table_name, con=engine, if_exists='append', index=False)
|
|
|
@@ -61,37 +60,45 @@ def ingest_file(filename, engine):
|
|
|
if "Duplicate entry" in str(e):
|
|
|
pass
|
|
|
else:
|
|
|
- print(f"\n [Warning] Chunk skipped due to internal structural error: {e}")
|
|
|
+ print(f"\n [Warning] Chunk skipped due to error: {e}")
|
|
|
|
|
|
print(f"\n✅ Finished importing {filename}.")
|
|
|
return True
|
|
|
|
|
|
def create_indexes(engine):
|
|
|
- print("\n🛠️ Creating performance indexes on newly generated table...")
|
|
|
- # B-TREE and FULLTEXT INDEXES created post-ingestion for extreme speed
|
|
|
+ # Determine how many tables were actually created
|
|
|
+ num_tables = 0
|
|
|
+ with engine.connect() as conn:
|
|
|
+ res = conn.execute(text("SHOW TABLES LIKE 'products_%'"))
|
|
|
+ num_tables = len(res.fetchall())
|
|
|
+
|
|
|
+ print(f"\n🛠️ Creating performance indexes on {num_tables} partition tables...")
|
|
|
try:
|
|
|
with engine.begin() as connection:
|
|
|
- print(" Building Core Architecture on Partitions...")
|
|
|
- # Enforce Primary Keys on the first 4 partitions
|
|
|
- for i in range(1, 5):
|
|
|
+ # Enforce Primary Keys on ALL partitions
|
|
|
+ for i in range(1, num_tables + 1):
|
|
|
try:
|
|
|
connection.execute(text(f"ALTER TABLE products_{i} MODIFY code VARCHAR(50);"))
|
|
|
connection.execute(text(f"ALTER TABLE products_{i} ADD PRIMARY KEY (code);"))
|
|
|
except: pass
|
|
|
|
|
|
- print(" Building Dynamic MySQL View...")
|
|
|
- # We build a massive Join View so the app doesn't need to know about the segments
|
|
|
+ print(" Building Global MySQL VIEW...")
|
|
|
+ view_sql = f"CREATE VIEW products AS SELECT p1.* "
|
|
|
+ joins = []
|
|
|
+ for i in range(2, num_tables + 1):
|
|
|
+ # Get columns for this table except 'code'
|
|
|
+ cols_res = connection.execute(text(f"SHOW COLUMNS FROM products_{i}"))
|
|
|
+ table_cols = [c[0] for c in cols_res.fetchall() if c[0] != 'code']
|
|
|
+ if table_cols:
|
|
|
+ view_sql += ", " + ", ".join([f"p{i}.`{c}`" for c in table_cols])
|
|
|
+ joins.append(f"LEFT JOIN products_{i} p{i} ON p1.code = p{i}.code")
|
|
|
+
|
|
|
+ view_sql += " FROM products_1 p1 " + " ".join(joins)
|
|
|
+
|
|
|
try:
|
|
|
- connection.execute(text("""
|
|
|
- CREATE VIEW products AS
|
|
|
- SELECT p1.*,
|
|
|
- p2.energy_100g, p2.`energy-kcal_100g`, p2.proteins_100g, p2.fat_100g, p2.carbohydrates_100g, p2.sugars_100g, p2.salt_100g, p2.sodium_100g, p2.fiber_100g,
|
|
|
- p3.iron_100g, p3.calcium_100g, p3.`vitamin-c_100g`, p3.`vitamin-d_100g`
|
|
|
- FROM products_1 p1
|
|
|
- LEFT JOIN products_2 p2 ON p1.code = p2.code
|
|
|
- LEFT JOIN products_3 p3 ON p1.code = p3.code
|
|
|
- """))
|
|
|
- except: pass
|
|
|
+ connection.execute(text(view_sql))
|
|
|
+ except Exception as ev:
|
|
|
+ print(f" Warning: View creation failed: {ev}")
|
|
|
print("✅ Indexing Complete!")
|
|
|
except Exception as e:
|
|
|
print(f"❌ Indexing encountered an issue: {e}")
|