00_raw_to_bronze_duckdb_polars



Data Science Projects

Home Credit Geometric Risk

Raw to Bronze with DuckDB + Polars


Roberto SSoares - LfLngLrnng

in/roberto-dos-santos-soares
Portifólio: roberto-ssoares

" [+] Faturamento,
[-] Custo,
[+] Qualidade de vida "

"Mestre Bruno Jardim"

🎯 Objetivo

  • Converter os arquivos CSV da camada raw para Parquet na camada bronze com alta performance, garantindo auditoria, rastreabilidade e qualidade mínima dos dados.

⚙️ Ações realizadas

  • Ingestão de CSV → Parquet com DuckDB
  • Geração de auditoria de execução
  • Criação de schema registry (estrutura das tabelas)
  • Profiling automático por coluna
  • Análise de valores nulos

🧠 Justificativa técnica

  • DuckDB permite ingestão extremamente rápida e eficiente.
  • Polars oferece manipulação leve, rápida e moderna para auditoria e profiling.

✅ Resultados esperados

  • Arquivos .parquet na camada bronze
  • Relatório de auditoria
  • Catálogo de schema
  • Perfil estatístico das colunas
  • Análise de qualidade (nulls)
  • A Ingestão precisa entregar:
    • ingestão reprodutível
    • auditoria mínima
    • sanidade dos arquivos
    • logs claros
    • artefatos de controle
    • DX decente para manutenção futura

📌 Variáveis de controle.

  • Incluir:
    • run_id: identificador único da execução
    • ingestion_ts: timestamp da ingestão
    • source_file
    • target_file
    • status
    • elapsed_seconds
    • row_count
    • column_count
    • file_size_mb_csv
    • file_size_mb_parquet
    • compression_ratio_aprox
    • sanity_check_passed
    • error_message

📌 Testes de sanidade.

  • Para cada arquivo:
    • verificar se o CSV existe
    • verificar se o Parquet foi criado
    • contar linhas do CSV e do Parquet
    • comparar contagem de linhas
    • verificar se número de colunas é maior que zero
    • capturar schema inferido
    • marcar sucesso ou erro no log

📚 Instalando e Carregando os Pacotes

  • Objetivo:
    • Importar bibliotecas necessárias para manipulação de dados, construção do grafo,
    • cálculo de curvatura e modelagem preditiva.
  • Ações realizadas:
    • Importação de bibliotecas de análise, ML e grafos
    • Configuração de warnings
  • Justificativa técnica:
    • Este projeto integra processamento tabular com modelagem em grafos.
    • Por isso, precisamos combinar bibliotecas clássicas de ciência de dados
    • com bibliotecas de análise de redes.
  • Resultados esperados:
    • Ambiente pronto para carga, preparação, construção do grafo e modelagem.

✔️ 00 - Imports

import sys
#!{sys.executable} -m pip install duckdb polars
#!{sys.executable} -m pip install watermark -q -U
import warnings
warnings.filterwarnings("ignore")

from __future__ import annotations

import os
import glob
import time
import uuid
from datetime import datetime
from pathlib import Path

import duckdb
import polars as pl
# Versões dos pacotes usados neste jupyter notebook
%reload_ext watermark
%watermark -a "RobertoSSoares-LfLngLrnng"
Author: RobertoSSoares-LfLngLrnng

✔️ 01 - Funções Utilitárias

  1. Garante que um diretório exista.
  2. Converte bytes para megabytes
  3. Retorna o tamanho do arquivo em MB, ou None se o arquivo não existir.
  4. Gera o nome do arquivo parquet de saída a partir do CSV de entrada.

def ensure_dir(path: str | Path) -> Path:
    """
    Garante que um diretório exista.
    """
    path = Path(path)
    path.mkdir(parents=True, exist_ok=True)
    return path


def bytes_to_mb(num_bytes: int) -> float:
    """
    Converte bytes para megabytes.
    """
    return round(num_bytes / (1024 * 1024), 4)


def safe_file_size_mb(path: str | Path) -> float | None:
    """
    Retorna o tamanho do arquivo em MB, ou None se o arquivo não existir.
    """
    path = Path(path)
    if path.exists():
        return bytes_to_mb(path.stat().st_size)
    return None


def normalize_output_name(file_path: str | Path) -> str:
    """
    Gera o nome do arquivo parquet de saída a partir do CSV de entrada.
    """
    return Path(file_path).stem + ".parquet"


def now_str() -> str:
    """
    Timestamp amigável para auditoria.
    """
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

✔️ 01.1 - Função Principal - Ingestão RAW -> BRONZE usando DuckDB + auditoria em Polars.

🧠 Ajuste importante de conceito

  • Polars trabalha melhor com:
    • estruturas imutáveis
    • execução vetorizada
    • schemas explícitos
  • Mas aqui usamos Polars apenas para:
    • consolidar auditoria
    • persistir resultados
    • inspeção leve

# Função principal (VERSÃO POLARS)
def raw_to_bronze_duckdb(
    raw_dir: str = "../data/00-raw",
    bronze_dir: str = "../data/01-bronze",
    audit_dir: str = "../data/99-audit",
    glob_pattern: str = "*.csv",
    sample_size: int = -1,
    all_varchar: bool = False,
    ignore_errors: bool = False,
    filename: bool = True,
    union_by_name: bool = False,
    compression: str = "zstd",
    overwrite: bool = True,
    verbose: bool = True,
) -> pl.DataFrame:
    """
    Ingestão RAW -> BRONZE usando DuckDB + auditoria em Polars.
    """

    run_id = datetime.now().strftime("%Y%m%d_%H%M%S") + "_" + uuid.uuid4().hex[:8]
    ingestion_ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    raw_path = Path(raw_dir)
    bronze_path = ensure_dir(bronze_dir)
    audit_path = ensure_dir(audit_dir)

    csv_files = sorted(glob.glob(str(raw_path / glob_pattern)))

    if not csv_files:
        raise FileNotFoundError(f"Nenhum CSV encontrado em {raw_dir}")

    if verbose:
        print("=" * 90)
        print("🚀 RAW → BRONZE (DuckDB + Polars)")
        print(f"Run ID: {run_id}")
        print("=" * 90)

    audit_rows = []
    start_all = time.time()

    con = duckdb.connect()

    for file_path in csv_files:
        start_file = time.time()

        source_path = Path(file_path)
        target_path = bronze_path / (source_path.stem + ".parquet")

        if verbose:
            print(f"📦 {source_path.name}")

        try:
            csv_size_mb = safe_file_size_mb(source_path)

            read_csv_relation = f"""
                read_csv_auto(
                    '{str(source_path).replace("\\", "/")}',
                    sample_size={sample_size},
                    all_varchar={str(all_varchar).lower()},
                    ignore_errors={str(ignore_errors).lower()}
                )"""

            if target_path.exists() and overwrite:
                target_path.unlink()

            # COPY → Parquet
            con.execute(f"""
                COPY (
                    SELECT *
                    FROM {read_csv_relation}
                )
                TO '{str(target_path).replace("\\", "/")}'
                (FORMAT PARQUET, COMPRESSION '{compression}');
            """)

            # Contagens
            csv_rows = con.execute(
                f"SELECT COUNT(*) FROM {read_csv_relation}"
            ).fetchone()[0]

            parquet_rows = con.execute(
                f"SELECT COUNT(*) FROM read_parquet('{str(target_path).replace("\\", "/")}')"
            ).fetchone()[0]

            column_count = con.execute(f"""
                SELECT COUNT(*) FROM (
                    DESCRIBE SELECT * FROM read_parquet('{str(target_path).replace("\\", "/")}')
                )
            """).fetchone()[0]

            parquet_size_mb = safe_file_size_mb(target_path)

            sanity = (
                target_path.exists()
                and csv_rows == parquet_rows
                and column_count > 0
            )

            compression_ratio = None
            if csv_size_mb and parquet_size_mb:
                compression_ratio = round(csv_size_mb / parquet_size_mb, 4)

            audit_rows.append({
                "run_id": run_id,
                "file": source_path.name,
                "status": "SUCCESS" if sanity else "WARNING",
                "csv_rows": csv_rows,
                "parquet_rows": parquet_rows,
                "columns": column_count,
                "csv_mb": csv_size_mb,
                "parquet_mb": parquet_size_mb,
                "compression_ratio": compression_ratio,
                "sanity": sanity,
                "elapsed": round(time.time() - start_file, 4),
            })

        except Exception as e:
            audit_rows.append({
                "run_id": run_id,
                "file": source_path.name,
                "status": "ERROR",
                "error": str(e),
                "elapsed": round(time.time() - start_file, 4),
            })

            if verbose:
                print(f"❌ {e}")

    con.close()

    # 🔥 Aqui entra o Polars
    audit_df = pl.DataFrame(audit_rows)

    # Persistência
    audit_df.write_csv(audit_path / "raw_to_bronze_audit.csv")
    audit_df.write_parquet(audit_path / "raw_to_bronze_audit.parquet")

    if verbose:
        print("=" * 90)
        print("🏁 FINALIZADO")
    
        if audit_df.height > 0 and "status" in audit_df.columns:
            print("Resumo por status:")
            print(
                audit_df
                .group_by("status")
                .len()
                .rename({"len": "qtd_arquivos"})
                .sort("status")
            )
        else:
            print("Nenhum registro de auditoria foi gerado.")
    
        print("=" * 90)

    return audit_df
# Execução
audit_df = raw_to_bronze_duckdb(
    raw_dir="../data/00-raw",
    bronze_dir="../data/01-bronze",
    audit_dir="../data/99-audit",
    glob_pattern="*.csv",
    sample_size=-1,      # inferência mais confiável
    all_varchar=False,   # mantém inferência automática
    ignore_errors=False, # mais rigoroso
    compression="zstd",
    overwrite=True,
    verbose=True,
)

audit_df
==========================================================================================
🚀 RAW → BRONZE (DuckDB + Polars)
Run ID: 20260320_221107_77ddda88
==========================================================================================
📦 POS_CASH_balance.csv
📦 application_test.csv
📦 application_train.csv
📦 bureau.csv
📦 bureau_balance.csv
📦 credit_card_balance.csv
📦 installments_payments.csv
📦 previous_application.csv
==========================================================================================
🏁 FINALIZADO
Resumo por status:
shape: (1, 2)
┌─────────┬──────────────┐
│ status  ┆ qtd_arquivos │
│ ---     ┆ ---          │
│ str     ┆ u32          │
╞═════════╪══════════════╡
│ SUCCESS ┆ 8            │
└─────────┴──────────────┘
==========================================================================================
shape: (8, 11)
run_idfilestatuscsv_rowsparquet_rowscolumnscsv_mbparquet_mbcompression_ratiosanityelapsed
strstrstri64i64i64f64f64f64boolf64
"20260320_221107_77ddda88""POS_CASH_balance.csv""SUCCESS"10001358100013588374.510984.05724.4554true32.5869
"20260320_221107_77ddda88""application_test.csv""SUCCESS"9225492254171131.8539.00493.3803true7.7143
"20260320_221107_77ddda88""application_train.csv""SUCCESS"215257215257172308.031490.82143.3916true17.1665
"20260320_221107_77ddda88""bureau.csv""SUCCESS"1716428171642817162.140624.91186.5086true15.8967
"20260320_221107_77ddda88""bureau_balance.csv""SUCCESS"27299925272999253358.193311.302331.6921true37.7715
"20260320_221107_77ddda88""credit_card_balance.csv""SUCCESS"3840312384031223404.913597.32644.1604true41.4394
"20260320_221107_77ddda88""installments_payments.csv""SUCCESS"13605401136054018689.6194237.77242.9003true66.7619
"20260320_221107_77ddda88""previous_application.csv""SUCCESS"1670214167021437386.212652.5797.3454true22.7277

✔️ 01.2 - Função Schema Registry (Lazy)

# Schema Registry (LAZY)

def build_schema_registry(bronze_dir="../data/01-bronze", audit_dir="../data/99-audit"):
    rows = []

    for file in Path(bronze_dir).glob("*.parquet"):
        schema = pl.scan_parquet(str(file)).collect_schema()

        for i, (col, dtype) in enumerate(schema.items(), start=1):
            rows.append({
                "table": file.stem,
                "column": col,
                "dtype": str(dtype),
                "order": i
            })

    df = pl.DataFrame(rows)

    df.write_parquet(Path(audit_dir) / "schema_registry.parquet")
    df.write_csv(Path(audit_dir) / "schema_registry.csv")

    return df

schema_df = build_schema_registry()
display(schema_df.head(5))
print(" ")
display(schema_df.sample(5))
shape: (5, 4)
tablecolumndtypeorder
strstrstri64
"application_test""SK_ID_CURR""Int64"1
"application_test""NAME_CONTRACT_TYPE""String"2
"application_test""CODE_GENDER""String"3
"application_test""FLAG_OWN_CAR""String"4
"application_test""FLAG_OWN_REALTY""String"5
 
shape: (5, 4)
tablecolumndtypeorder
strstrstri64
"application_test""EXT_SOURCE_2""Float64"42
"application_test""var_33""Float64"154
"credit_card_balance""AMT_CREDIT_LIMIT_ACTUAL""Int64"5
"application_train""DAYS_ID_PUBLISH""Int64"21
"application_test""ENTRANCES_AVG""Float64"50

✔️ 01.3 - Função Profiling de Colunas

# Profiling de Colunas
def profile_bronze(bronze_dir="../data/01-bronze", out_dir="../data/90-profiling"):
    rows = []

    for file in Path(bronze_dir).glob("*.parquet"):
        df = pl.read_parquet(file)

        for col in df.columns:
            s = df[col]

            rows.append({
                "table": file.stem,
                "column": col,
                "dtype": str(s.dtype),
                "rows": df.height,
                "nulls": s.null_count(),
                "null_pct": s.null_count() / df.height * 100 if df.height else 0,
                "unique": s.n_unique(),
            })

    df = pl.DataFrame(rows)
    ensure_dir(out_dir)

    df.write_parquet(Path(out_dir) / "column_profile.parquet")
    df.write_csv(Path(out_dir) / "column_profile.csv")

    return df


profile_df = profile_bronze()
display(profile_df.head(5))
print()
display(profile_df.sample(5))
shape: (5, 7)
tablecolumndtyperowsnullsnull_pctunique
strstrstri64i64f64i64
"application_test""SK_ID_CURR""Int64"9225400.092254
"application_test""NAME_CONTRACT_TYPE""String"9225400.02
"application_test""CODE_GENDER""String"9225400.03
"application_test""FLAG_OWN_CAR""String"9225400.02
"application_test""FLAG_OWN_REALTY""String"9225400.02

shape: (5, 7)
tablecolumndtyperowsnullsnull_pctunique
strstrstri64i64f64i64
"POS_CASH_balance""SK_ID_PREV""Int64"1000135800.0936325
"credit_card_balance""SK_DPD_DEF""Int64"384031200.0378
"bureau""AMT_CREDIT_SUM_OVERDUE""Float64"171642800.01616
"application_test""REGION_RATING_CLIENT_W_CITY""Int64"9225400.03
"application_test""FLAG_DOCUMENT_15""Int64"9225400.02

✔️ 2 - Null Analysis

# Null Analysis
null_df = (profile_df.sort("null_pct", descending=True))
null_df.head(10)
shape: (20, 7)
tablecolumndtyperowsnullsnull_pctunique
strstrstri64i64f64i64
"previous_application""RATE_INTEREST_PRIMARY""Float64"1670214166426399.643698149
"previous_application""RATE_INTEREST_PRIVILEGED""Float64"1670214166426399.64369826
"bureau""AMT_ANNUITY""Float64"1716428122679171.4734940322
"application_test""COMMONAREA_AVG""Float64"922546448869.902662372
"application_test""COMMONAREA_MODE""Float64"922546448869.902662340
"application_test""LIVINGAPARTMENTS_AVG""Float64"922546312068.4197981433
"application_test""LIVINGAPARTMENTS_MODE""Float64"922546312068.419798655
"application_test""LIVINGAPARTMENTS_MEDI""Float64"922546312068.419798944
"application_test""FONDKAPREMONT_MODE""String"922546311268.4111265
"application_train""FONDKAPREMONT_MODE""String"21525714718368.3754775

✔️ 3 - Testes Globais

# Testes Globais

assert audit_df.filter(pl.col("status") == "ERROR").height == 0
assert profile_df.height > 0
assert schema_df.height > 0

print("✅ Pipeline RAW → BRONZE validado com sucesso")
✅ Pipeline RAW → BRONZE validado com sucesso

✔️ O que foi construido

  • NÃO fez só ingestão.
  • Criou:
    • ✔ Camada Bronze profissional
      • rápida (DuckDB)
      • compacta (Parquet)
      • auditável
    • ✔ Observabilidade de dados
      • auditoria
      • schema registry
      • profiling
      • null analysis
    • ✔ Base perfeita para:
      • feature engineering
      • grafos
      • ML
      • explicabilidade

🎯 Próximo passo (Grafo de Similaridade + Curvatura de Ricci)

Seguimos para:

👉 Grafo de Similaridade + Curvatura de Ricci

  • Onde vamos:
    • usar application_train
    • construir KNN graph
    • calcular curvatura
    • gerar primeiras features geométricas

%reload_ext watermark 
%watermark -a "Roberto-SSoares-LfLngLrnng" -d -t -u --iversions -v -m -h
Author: Roberto-SSoares-LfLngLrnng

Last updated: 2026-03-21 16:01:18

Python implementation: CPython
Python version       : 3.12.3
IPython version      : 9.11.0

Compiler    : GCC 13.3.0
OS          : Linux
Release     : 6.6.87.2-microsoft-standard-WSL2
Machine     : x86_64
Processor   : x86_64
CPU cores   : 4
Architecture: 64bit

Hostname: PC-ROBERTO

duckdb: 1.5.0
polars: 1.39.3

FIM

#!uv pip install nbconvert -U -q
!jupyter nbconvert 03_raw_to_bronze_duckdb_polars.ipynb --to html --template _my-template-html-v07.tpl