Data Science Projects
Home Credit Geometric Risk
Raw to Bronze with DuckDB + Polars
Data Science Projects
Home Credit Geometric Risk
Raw to Bronze with DuckDB + Polars
Roberto SSoares - LfLngLrnng
in/roberto-dos-santos-soares
Portfólio: roberto-ssoares
" [+] Faturamento,
[-] Custo,
[+] Qualidade de vida "
"Mestre Bruno Jardim"
🎯 Objetivo
⚙️ Ações realizadas
🧠 Justificativa técnica
✅ Resultados esperados
.parquet na camada bronze
- A ingestão precisa entregar:
- ingestão reprodutível
- auditoria mínima
- sanidade dos arquivos
- logs claros
- artefatos de controle
- DX decente para manutenção futura
📌 Variáveis de controle.
📌 Testes de sanidade.
- Objetivo:
- Importar bibliotecas necessárias para manipulação de dados, construção do grafo,
- cálculo de curvatura e modelagem preditiva.
- Ações realizadas:
- Importação de bibliotecas de análise, ML e grafos
- Configuração de warnings
- Justificativa técnica:
- Este projeto integra processamento tabular com modelagem em grafos.
- Por isso, precisamos combinar bibliotecas clássicas de ciência de dados
- com bibliotecas de análise de redes.
- Resultados esperados:
- Ambiente pronto para carga, preparação, construção do grafo e modelagem.
✔️ 00 - Imports
import sys
#!{sys.executable} -m pip install duckdb polars
#!{sys.executable} -m pip install watermark -q -U
# NOTE(review): `from __future__ import annotations` must be the very first
# statement of a module/cell — it was hoisted above the other statements
# (in the original it appeared after `warnings.filterwarnings`, which is a
# SyntaxError when these lines run in the same cell).
from __future__ import annotations

# Standard library
import glob
import os
import time
import uuid
import warnings
from datetime import datetime
from pathlib import Path

# Third-party
import duckdb
import polars as pl

# Silence noisy library warnings for cleaner notebook output.
warnings.filterwarnings("ignore")
# Versões dos pacotes usados neste jupyter notebook
%reload_ext watermark
%watermark -a "RobertoSSoares-LfLngLrnng"
Author: RobertoSSoares-LfLngLrnng
✔️ 01 - Funções Utilitárias
def ensure_dir(path: str | Path) -> Path:
"""
Garante que um diretório exista.
"""
path = Path(path)
path.mkdir(parents=True, exist_ok=True)
return path
def bytes_to_mb(num_bytes: int) -> float:
    """
    Convert a byte count to megabytes (MiB), rounded to 4 decimal places.
    """
    megabytes = num_bytes / (1 << 20)  # 1 << 20 == 1024 * 1024
    return round(megabytes, 4)
def safe_file_size_mb(path: str | Path) -> float | None:
    """
    Return the file size in MB, or None if the file cannot be stat'ed.

    Fix: uses EAFP (`stat()` + handle ``OSError``) instead of
    ``exists()`` followed by ``stat()``, which is racy — the file could
    vanish between the two calls. Behavior for existing/missing files is
    unchanged; unreadable paths also return None, as before (``exists()``
    already returned False on permission errors).
    """
    try:
        return bytes_to_mb(Path(path).stat().st_size)
    except OSError:
        return None
def normalize_output_name(file_path: str | Path) -> str:
"""
Gera o nome do arquivo parquet de saída a partir do CSV de entrada.
"""
return Path(file_path).stem + ".parquet"
def now_str() -> str:
    """
    Human-friendly timestamp for audit logs, formatted YYYY-MM-DD HH:MM:SS.
    """
    stamp = datetime.now()
    return stamp.strftime("%Y-%m-%d %H:%M:%S")
✔️ 01.1 - Função Principal - Ingestão RAW -> BRONZE usando DuckDB + auditoria em Polars.
🧠 Ajuste importante de conceito
# Função principal (VERSÃO POLARS)
def raw_to_bronze_duckdb(
    raw_dir: str = "../data/00-raw",
    bronze_dir: str = "../data/01-bronze",
    audit_dir: str = "../data/99-audit",
    glob_pattern: str = "*.csv",
    sample_size: int = -1,
    all_varchar: bool = False,
    ignore_errors: bool = False,
    filename: bool = True,
    union_by_name: bool = False,
    compression: str = "zstd",
    overwrite: bool = True,
    verbose: bool = True,
) -> pl.DataFrame:
    """
    RAW -> BRONZE ingestion using DuckDB + audit trail in Polars.

    Each CSV in ``raw_dir`` matching ``glob_pattern`` is converted to a
    Parquet file in ``bronze_dir``. One audit row is recorded per input
    file (row counts, sizes, compression ratio, sanity flag, elapsed time)
    and the full audit table is persisted to ``audit_dir`` as both CSV and
    Parquet.

    Parameters
    ----------
    raw_dir, bronze_dir, audit_dir : pipeline directory layout.
    glob_pattern : pattern selecting input files inside ``raw_dir``.
    sample_size : rows DuckDB samples for type inference (-1 = whole file).
    all_varchar : force every column to VARCHAR (disables type inference).
    ignore_errors : let DuckDB skip malformed CSV lines.
    filename, union_by_name : accepted for interface stability.
        NOTE(review): these two flags are NOT forwarded to
        ``read_csv_auto`` — wiring them in would change the output schema
        (``filename=True`` adds a column), so they are left inert here.
    compression : Parquet compression codec.
    overwrite : delete a pre-existing target Parquet before writing.
    verbose : print progress and a status summary to stdout.

    Returns
    -------
    pl.DataFrame
        One audit row per input file (status SUCCESS / WARNING / ERROR).

    Raises
    ------
    FileNotFoundError
        If no file in ``raw_dir`` matches ``glob_pattern``.
    """
    run_id = datetime.now().strftime("%Y%m%d_%H%M%S") + "_" + uuid.uuid4().hex[:8]
    # NOTE(review): `ingestion_ts` and `start_all` are captured but never
    # used below; kept unchanged to avoid altering observable behavior.
    ingestion_ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    raw_path = Path(raw_dir)
    bronze_path = ensure_dir(bronze_dir)
    audit_path = ensure_dir(audit_dir)

    csv_files = sorted(glob.glob(str(raw_path / glob_pattern)))
    if not csv_files:
        raise FileNotFoundError(f"Nenhum CSV encontrado em {raw_dir}")

    if verbose:
        print("=" * 90)
        print("🚀 RAW → BRONZE (DuckDB + Polars)")
        print(f"Run ID: {run_id}")
        print("=" * 90)

    audit_rows = []
    start_all = time.time()

    con = duckdb.connect()
    try:
        for file_path in csv_files:
            start_file = time.time()
            source_path = Path(file_path)
            # Consistency: reuse the shared naming helper instead of
            # rebuilding `stem + ".parquet"` inline.
            target_path = bronze_path / normalize_output_name(source_path)
            if verbose:
                print(f"📦 {source_path.name}")
            try:
                csv_size_mb = safe_file_size_mb(source_path)

                # Forward-slash paths are valid in DuckDB SQL on every OS;
                # as_posix() also avoids nested same-quote f-strings, which
                # only parse on Python 3.12+.
                src_sql = source_path.as_posix()
                tgt_sql = target_path.as_posix()

                read_csv_relation = f"""
                read_csv_auto(
                    '{src_sql}',
                    sample_size={sample_size},
                    all_varchar={str(all_varchar).lower()},
                    ignore_errors={str(ignore_errors).lower()}
                )"""

                if target_path.exists() and overwrite:
                    target_path.unlink()

                # COPY → Parquet
                con.execute(f"""
                COPY (
                    SELECT *
                    FROM {read_csv_relation}
                )
                TO '{tgt_sql}'
                (FORMAT PARQUET, COMPRESSION '{compression}');
                """)

                # Row counts: re-read the CSV vs. the Parquet just written.
                csv_rows = con.execute(
                    f"SELECT COUNT(*) FROM {read_csv_relation}"
                ).fetchone()[0]
                parquet_rows = con.execute(
                    f"SELECT COUNT(*) FROM read_parquet('{tgt_sql}')"
                ).fetchone()[0]
                column_count = con.execute(f"""
                SELECT COUNT(*) FROM (
                    DESCRIBE SELECT * FROM read_parquet('{tgt_sql}')
                )
                """).fetchone()[0]

                parquet_size_mb = safe_file_size_mb(target_path)

                # Minimal sanity: output exists, counts match, >0 columns.
                sanity = (
                    target_path.exists()
                    and csv_rows == parquet_rows
                    and column_count > 0
                )

                compression_ratio = None
                if csv_size_mb and parquet_size_mb:
                    compression_ratio = round(csv_size_mb / parquet_size_mb, 4)

                audit_rows.append({
                    "run_id": run_id,
                    "file": source_path.name,
                    "status": "SUCCESS" if sanity else "WARNING",
                    "csv_rows": csv_rows,
                    "parquet_rows": parquet_rows,
                    "columns": column_count,
                    "csv_mb": csv_size_mb,
                    "parquet_mb": parquet_size_mb,
                    "compression_ratio": compression_ratio,
                    "sanity": sanity,
                    "elapsed": round(time.time() - start_file, 4),
                })
            except Exception as e:
                # Best-effort: one bad file must not abort the whole run.
                audit_rows.append({
                    "run_id": run_id,
                    "file": source_path.name,
                    "status": "ERROR",
                    "error": str(e),
                    "elapsed": round(time.time() - start_file, 4),
                })
                if verbose:
                    print(f"❌ {e}")
    finally:
        # Fix: release the DuckDB connection even if the loop itself raises
        # (previously con.close() was skipped on an unexpected failure).
        con.close()

    # Audit table materialized with Polars
    audit_df = pl.DataFrame(audit_rows)

    # Persist the audit artifacts
    audit_df.write_csv(audit_path / "raw_to_bronze_audit.csv")
    audit_df.write_parquet(audit_path / "raw_to_bronze_audit.parquet")

    if verbose:
        print("=" * 90)
        print("🏁 FINALIZADO")
        if audit_df.height > 0 and "status" in audit_df.columns:
            print("Resumo por status:")
            print(
                audit_df
                .group_by("status")
                .len()
                .rename({"len": "qtd_arquivos"})
                .sort("status")
            )
        else:
            print("Nenhum registro de auditoria foi gerado.")
        print("=" * 90)

    return audit_df
# Execução
# Run the RAW -> BRONZE ingestion with explicit, auditable settings.
audit_df = raw_to_bronze_duckdb(
    raw_dir="../data/00-raw",
    bronze_dir="../data/01-bronze",
    audit_dir="../data/99-audit",
    glob_pattern="*.csv",
    sample_size=-1, # -1 = scan whole file for more reliable type inference
    all_varchar=False, # keep automatic type inference
    ignore_errors=False, # strict: fail on malformed CSV lines
    compression="zstd",
    overwrite=True,
    verbose=True,
)
# Display the audit table (notebook cell output).
audit_df
========================================================================================== 🚀 RAW → BRONZE (DuckDB + Polars) Run ID: 20260320_221107_77ddda88 ========================================================================================== 📦 POS_CASH_balance.csv 📦 application_test.csv 📦 application_train.csv 📦 bureau.csv 📦 bureau_balance.csv 📦 credit_card_balance.csv 📦 installments_payments.csv 📦 previous_application.csv ========================================================================================== 🏁 FINALIZADO Resumo por status: shape: (1, 2) ┌─────────┬──────────────┐ │ status ┆ qtd_arquivos │ │ --- ┆ --- │ │ str ┆ u32 │ ╞═════════╪══════════════╡ │ SUCCESS ┆ 8 │ └─────────┴──────────────┘ ==========================================================================================
| run_id | file | status | csv_rows | parquet_rows | columns | csv_mb | parquet_mb | compression_ratio | sanity | elapsed |
|---|---|---|---|---|---|---|---|---|---|---|
| str | str | str | i64 | i64 | i64 | f64 | f64 | f64 | bool | f64 |
| "20260320_221107_77ddda88" | "POS_CASH_balance.csv" | "SUCCESS" | 10001358 | 10001358 | 8 | 374.5109 | 84.0572 | 4.4554 | true | 32.5869 |
| "20260320_221107_77ddda88" | "application_test.csv" | "SUCCESS" | 92254 | 92254 | 171 | 131.85 | 39.0049 | 3.3803 | true | 7.7143 |
| "20260320_221107_77ddda88" | "application_train.csv" | "SUCCESS" | 215257 | 215257 | 172 | 308.0314 | 90.8214 | 3.3916 | true | 17.1665 |
| "20260320_221107_77ddda88" | "bureau.csv" | "SUCCESS" | 1716428 | 1716428 | 17 | 162.1406 | 24.9118 | 6.5086 | true | 15.8967 |
| "20260320_221107_77ddda88" | "bureau_balance.csv" | "SUCCESS" | 27299925 | 27299925 | 3 | 358.1933 | 11.3023 | 31.6921 | true | 37.7715 |
| "20260320_221107_77ddda88" | "credit_card_balance.csv" | "SUCCESS" | 3840312 | 3840312 | 23 | 404.9135 | 97.3264 | 4.1604 | true | 41.4394 |
| "20260320_221107_77ddda88" | "installments_payments.csv" | "SUCCESS" | 13605401 | 13605401 | 8 | 689.6194 | 237.7724 | 2.9003 | true | 66.7619 |
| "20260320_221107_77ddda88" | "previous_application.csv" | "SUCCESS" | 1670214 | 1670214 | 37 | 386.2126 | 52.579 | 7.3454 | true | 22.7277 |
✔️ 01.2 - Função Schema Registry (Lazy)
# Schema Registry (LAZY)
def build_schema_registry(bronze_dir="../data/01-bronze", audit_dir="../data/99-audit"):
    """
    Record the schema of every bronze Parquet table and persist the
    registry (column name, dtype, ordinal position per table) to
    ``audit_dir`` as both Parquet and CSV.

    Uses ``pl.scan_parquet`` so only file metadata is read — no table
    data is loaded into memory.

    Returns
    -------
    pl.DataFrame with columns: table, column, dtype, order.
    """
    rows = []
    for file in Path(bronze_dir).glob("*.parquet"):
        # Lazy scan: collect_schema() inspects Parquet metadata only.
        schema = pl.scan_parquet(str(file)).collect_schema()
        for i, (col, dtype) in enumerate(schema.items(), start=1):
            rows.append({
                "table": file.stem,
                "column": col,
                "dtype": str(dtype),
                "order": i,
            })
    df = pl.DataFrame(rows)
    # Robustness fix: create the audit dir instead of assuming an earlier
    # step already did (mirrors profile_bronze, which ensures its out_dir).
    out_path = ensure_dir(audit_dir)
    df.write_parquet(out_path / "schema_registry.parquet")
    df.write_csv(out_path / "schema_registry.csv")
    return df
# Build and persist the schema registry, then preview it in the notebook.
schema_df = build_schema_registry()
display(schema_df.head(5))  # deterministic preview: first rows
print(" ")
display(schema_df.sample(5))  # random rows: spot-check table variety
| table | column | dtype | order |
|---|---|---|---|
| str | str | str | i64 |
| "application_test" | "SK_ID_CURR" | "Int64" | 1 |
| "application_test" | "NAME_CONTRACT_TYPE" | "String" | 2 |
| "application_test" | "CODE_GENDER" | "String" | 3 |
| "application_test" | "FLAG_OWN_CAR" | "String" | 4 |
| "application_test" | "FLAG_OWN_REALTY" | "String" | 5 |
| table | column | dtype | order |
|---|---|---|---|
| str | str | str | i64 |
| "application_test" | "EXT_SOURCE_2" | "Float64" | 42 |
| "application_test" | "var_33" | "Float64" | 154 |
| "credit_card_balance" | "AMT_CREDIT_LIMIT_ACTUAL" | "Int64" | 5 |
| "application_train" | "DAYS_ID_PUBLISH" | "Int64" | 21 |
| "application_test" | "ENTRANCES_AVG" | "Float64" | 50 |
✔️ 01.3 - Função Profiling de Colunas
# Profiling de Colunas
def profile_bronze(bronze_dir="../data/01-bronze", out_dir="../data/90-profiling"):
    """
    Column-level profiling of every Parquet table in the bronze layer.

    For each column records: dtype, row count, null count, null
    percentage, and distinct-value count. The profile is persisted to
    ``out_dir`` as both Parquet and CSV.

    Returns
    -------
    pl.DataFrame with columns:
        table, column, dtype, rows, nulls, null_pct, unique.
    """
    rows = []
    for file in Path(bronze_dir).glob("*.parquet"):
        # Fix: use a distinct name for the per-table frame instead of
        # shadowing/reusing `df` for both the table and the final result.
        table_df = pl.read_parquet(file)
        n_rows = table_df.height
        for col in table_df.columns:
            s = table_df[col]
            nulls = s.null_count()  # computed once, used twice below
            rows.append({
                "table": file.stem,
                "column": col,
                "dtype": str(s.dtype),
                "rows": n_rows,
                "nulls": nulls,
                # guard against division by zero for empty tables
                "null_pct": nulls / n_rows * 100 if n_rows else 0,
                "unique": s.n_unique(),
            })
    profile_df = pl.DataFrame(rows)
    ensure_dir(out_dir)
    profile_df.write_parquet(Path(out_dir) / "column_profile.parquet")
    profile_df.write_csv(Path(out_dir) / "column_profile.csv")
    return profile_df
# Profile every bronze table, then preview the result in the notebook.
profile_df = profile_bronze()
display(profile_df.head(5))  # deterministic preview: first rows
print()
display(profile_df.sample(5))  # random rows: spot-check table variety
| table | column | dtype | rows | nulls | null_pct | unique |
|---|---|---|---|---|---|---|
| str | str | str | i64 | i64 | f64 | i64 |
| "application_test" | "SK_ID_CURR" | "Int64" | 92254 | 0 | 0.0 | 92254 |
| "application_test" | "NAME_CONTRACT_TYPE" | "String" | 92254 | 0 | 0.0 | 2 |
| "application_test" | "CODE_GENDER" | "String" | 92254 | 0 | 0.0 | 3 |
| "application_test" | "FLAG_OWN_CAR" | "String" | 92254 | 0 | 0.0 | 2 |
| "application_test" | "FLAG_OWN_REALTY" | "String" | 92254 | 0 | 0.0 | 2 |
| table | column | dtype | rows | nulls | null_pct | unique |
|---|---|---|---|---|---|---|
| str | str | str | i64 | i64 | f64 | i64 |
| "POS_CASH_balance" | "SK_ID_PREV" | "Int64" | 10001358 | 0 | 0.0 | 936325 |
| "credit_card_balance" | "SK_DPD_DEF" | "Int64" | 3840312 | 0 | 0.0 | 378 |
| "bureau" | "AMT_CREDIT_SUM_OVERDUE" | "Float64" | 1716428 | 0 | 0.0 | 1616 |
| "application_test" | "REGION_RATING_CLIENT_W_CITY" | "Int64" | 92254 | 0 | 0.0 | 3 |
| "application_test" | "FLAG_DOCUMENT_15" | "Int64" | 92254 | 0 | 0.0 | 2 |
✔️ 2 - Null Analysis
# Null analysis: columns ranked from most to least missing values.
null_df = profile_df.sort("null_pct", descending=True)
null_df.head(10)
| table | column | dtype | rows | nulls | null_pct | unique |
|---|---|---|---|---|---|---|
| str | str | str | i64 | i64 | f64 | i64 |
| "previous_application" | "RATE_INTEREST_PRIMARY" | "Float64" | 1670214 | 1664263 | 99.643698 | 149 |
| "previous_application" | "RATE_INTEREST_PRIVILEGED" | "Float64" | 1670214 | 1664263 | 99.643698 | 26 |
| "bureau" | "AMT_ANNUITY" | "Float64" | 1716428 | 1226791 | 71.47349 | 40322 |
| "application_test" | "COMMONAREA_AVG" | "Float64" | 92254 | 64488 | 69.90266 | 2372 |
| "application_test" | "COMMONAREA_MODE" | "Float64" | 92254 | 64488 | 69.90266 | 2340 |
| … | … | … | … | … | … | … |
| "application_test" | "LIVINGAPARTMENTS_AVG" | "Float64" | 92254 | 63120 | 68.419798 | 1433 |
| "application_test" | "LIVINGAPARTMENTS_MODE" | "Float64" | 92254 | 63120 | 68.419798 | 655 |
| "application_test" | "LIVINGAPARTMENTS_MEDI" | "Float64" | 92254 | 63120 | 68.419798 | 944 |
| "application_test" | "FONDKAPREMONT_MODE" | "String" | 92254 | 63112 | 68.411126 | 5 |
| "application_train" | "FONDKAPREMONT_MODE" | "String" | 215257 | 147183 | 68.375477 | 5 |
✔️ 3 - Testes Globais
# Global sanity checks — fail with a diagnostic message instead of a bare
# AssertionError (much easier to debug on notebook reruns).
n_errors = audit_df.filter(pl.col("status") == "ERROR").height
assert n_errors == 0, f"{n_errors} file(s) failed ingestion — inspect audit_df"
assert profile_df.height > 0, "column profiling produced no rows"
assert schema_df.height > 0, "schema registry is empty"
print("✅ Pipeline RAW → BRONZE validado com sucesso")
✅ Pipeline RAW → BRONZE validado com sucesso
✔️ O que foi construído
🎯 Próximo passo (Grafo de Similaridade + Curvatura de Ricci)
Seguimos para:
👉 Grafo de Similaridade + Curvatura de Ricci
%reload_ext watermark
%watermark -a "Roberto-SSoares-LfLngLrnng" -d -t -u --iversions -v -m -h
Author: Roberto-SSoares-LfLngLrnng Last updated: 2026-03-21 16:01:18 Python implementation: CPython Python version : 3.12.3 IPython version : 9.11.0 Compiler : GCC 13.3.0 OS : Linux Release : 6.6.87.2-microsoft-standard-WSL2 Machine : x86_64 Processor : x86_64 CPU cores : 4 Architecture: 64bit Hostname: PC-ROBERTO duckdb: 1.5.0 polars: 1.39.3
FIM
#!uv pip install nbconvert -U -q
!jupyter nbconvert 03_raw_to_bronze_duckdb_polars.ipynb --to html --template _my-template-html-v07.tpl