In [1]:
import pandas as pd
import sqlite3
import os
from io import StringIO
# =====================================================
# 1. INITIALIZATION & DYNAMIC TICKER DISCOVERY
# =====================================================
qb = QuantBook()
print("Scanning Object Store for available tickers...")

# Derive the ticker universe from the daily-price CSV keys already in the
# Object Store (yield-curve files are handled separately below).
daily_prefix = "Data_Prices/prices_daily_"
found = set()
for store_key in qb.ObjectStore.Keys:
    is_daily_csv = store_key.startswith(daily_prefix) and store_key.endswith(".csv")
    if is_daily_csv and "yield" not in store_key:
        base_name = store_key.split('/')[-1]
        found.add(base_name.replace("prices_daily_", "").replace(".csv", ""))

tickers = sorted(found)
# The Treasury yield-curve pseudo-ticker is always processed, even though
# its files are excluded by the "yield" filter above.
if "USTYCR" not in tickers:
    tickers.append("USTYCR")
print(f"Found {len(tickers)} unique tickers: {tickers}\n")

# Local scratch path for the SQLite file and its final Object Store key.
local_db_path = "temp_master_database.db"
folder_name = "Database"
file_name = "financials_and_prices.db"
object_store_db_key = f"{folder_name}/{file_name}"
# =====================================================
# 2. CREATE LOCAL SQLITE DATABASE & HELPER
# =====================================================
# Open (or create) a scratch SQLite file on local disk; the compiled
# database is uploaded to the Object Store in section 4 and the local
# copy is deleted afterwards.
conn = sqlite3.connect(local_db_path)
cursor = conn.cursor()
print(f"Starting master database compilation for {len(tickers)} tickers...\n")
# HELPER: Map Pandas data types to SQLite data types
def get_sqlite_type(dtype):
    """Map a pandas dtype to the closest SQLite column affinity.

    Integer and boolean dtypes become INTEGER, floating-point dtypes
    become REAL, and everything else (strings, datetimes, objects)
    falls back to TEXT.
    """
    type_name = str(dtype).lower()
    if "float" in type_name:
        return "REAL"
    if "int" in type_name or "bool" in type_name:
        return "INTEGER"
    return "TEXT"
# =====================================================
# 3. MASTER LOOP TO IMPORT ALL CSVS
# =====================================================

def _insert_or_ignore(table, db_conn, keys, data_iter):
    """pandas ``to_sql(method=...)`` callback: INSERT OR IGNORE row-wise.

    FIX: with plain ``if_exists='append'`` a single duplicate primary key
    raised IntegrityError and rolled back the ENTIRE file's insert (the
    old except-comment wrongly claimed only duplicates were skipped).
    OR IGNORE drops just the conflicting rows and keeps the rest.

    Args mirror pandas' contract: `table` is the pandas SQL table wrapper,
    `db_conn` the DB-API cursor, `keys` the column names, `data_iter` an
    iterable of row tuples.
    """
    columns = ", ".join(f'"{k}"' for k in keys)
    placeholders = ", ".join("?" for _ in keys)
    sql = f'INSERT OR IGNORE INTO "{table.name}" ({columns}) VALUES ({placeholders})'
    db_conn.executemany(sql, data_iter)

for ticker in tickers:
    print(f"--- Processing {ticker} ---")
    # The Treasury yield-curve pseudo-ticker has its own file layout.
    if ticker == "USTYCR":
        file_to_table_map = {
            f"Data_Prices/prices_daily_yield_{ticker}.csv": "prices_daily_yield",
            f"Data_Prices/prices_weekly_yield_{ticker}.csv": "prices_weekly_yield",
            f"Data_Prices/prices_monthly_yield_{ticker}.csv": "prices_monthly_yield",
        }
    else:
        file_to_table_map = {
            f"Data_Financials/financials_quarterly_{ticker}.csv": "financials_quarterly",
            f"Data_Prices/prices_daily_{ticker}.csv": "prices_daily",
            f"Data_Prices/prices_weekly_{ticker}.csv": "prices_weekly",
            f"Data_Prices/prices_monthly_{ticker}.csv": "prices_monthly",
        }
    for file_key, table_name in file_to_table_map.items():
        if not qb.ObjectStore.ContainsKey(file_key):
            print(f"Warning: '{file_key}' not found. Skipping.")
            continue
        try:
            csv_string = qb.ObjectStore.Read(file_key)
            df = pd.read_csv(StringIO(csv_string))
            # --- DYNAMIC SCHEMA GENERATION WITH PRIMARY KEYS ---
            # Create the table on first sight, with a composite primary
            # key on its first two columns. (Parameterized query rather
            # than an f-string for the sqlite_master lookup.)
            cursor.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
                (table_name,),
            )
            if not cursor.fetchone():
                cols = df.columns
                if len(cols) >= 2:
                    # First two columns form the composite primary key.
                    pk1, pk2 = cols[0], cols[1]
                    col_defs = [f'"{col}" {get_sqlite_type(df[col].dtype)}' for col in cols]
                    create_table_sql = f'''
                        CREATE TABLE "{table_name}" (
                            {", ".join(col_defs)},
                            PRIMARY KEY ("{pk1}", "{pk2}")
                        )
                    '''
                    cursor.execute(create_table_sql)
                else:
                    # to_sql below will still auto-create the table,
                    # just without a primary-key constraint.
                    print(f"Warning: Not enough columns in {file_key} to create composite primary key.")
            # Append via INSERT OR IGNORE so duplicates are skipped
            # per-row instead of aborting the whole file.
            df.to_sql(table_name, conn, if_exists='append', index=False,
                      method=_insert_or_ignore)
            print(f"Appended {len(df)} rows to '{table_name}' (duplicate keys ignored).")
        except sqlite3.IntegrityError as e:
            # Remaining constraint violations (e.g. NOT NULL) surface here.
            print(f"Skipped duplicate records in {file_key} (Primary Key Constraint): {e}")
        except Exception as e:
            print(f"Error reading/parsing {file_key}: {e}")

conn.commit()
conn.close()
# =====================================================
# 4. EXPORT COMPILED DATABASE TO OBJECT STORE
# =====================================================
print("\nUploading compiled master database to the Object Store...")

# Read the finished SQLite file as raw bytes and persist it remotely.
with open(local_db_path, 'rb') as db_file:
    db_bytes = db_file.read()

if qb.ObjectStore.SaveBytes(object_store_db_key, db_bytes):
    print(f"Success! Master database saved to ObjectStore under: {object_store_db_key}")
else:
    print("Failed to save the database to the Object Store.")

# Remove the local scratch copy now that the upload attempt is done.
if os.path.exists(local_db_path):
    os.remove(local_db_path)