Pular para conteúdo

Camada Bronze

Objetivo

A camada Bronze corresponde à primeira materialização tipada da arquitetura medalhão. Os ficheiros CSV provenientes da landing zone são lidos pelo Spark com schema enforcement, enriquecidos com metadados de linhagem temporal e de ficheiro de origem, e persistidos em Delta Lake no MinIO, o que habilita transações ACID, versionamento e consultas de time travel sobre o histórico de versões.

Pipeline

s3a://landing-zone/dbo_Categoria/*.csv
  Spark read (CSV + schema)
  Adiciona colunas de auditoria:
    • _bronze_loaded_at
    • _bronze_source_file
  Spark write (Delta Lake)
s3a://bronze/dbo_Categoria/
    ├── _delta_log/
    │   └── 00000000000000000000.json
    └── part-00000-*.snappy.parquet

Tabelas processadas

Tabela Schema Linhas esperadas
dbo_Categoria id_categoria, nome, descricao 5
dbo_Autor id_autor, nome, nacionalidade, data_nascimento 10
dbo_Livro id_livro, titulo, isbn, ano_publicacao, id_categoria, id_autor 20
dbo_Membro id_membro, nome, email, telefone, data_cadastro 15
dbo_Emprestimo id_emprestimo, id_livro, id_membro, datas, status 30
dbo_Multa id_multa, id_emprestimo, valor, pago, data_geracao 8

Colunas de auditoria

Cada registro na camada Bronze recebe duas colunas adicionais:

Coluna Tipo Descrição
_bronze_loaded_at String (ISO 8601) Timestamp de quando o registro foi ingerido na Bronze
_bronze_source_file String Caminho S3 do arquivo CSV de origem

Como executar

# Certifique-se de que os containers estão no ar
docker compose up -d

# Abra o JupyterLab
uv run jupyter lab notebook/

# Execute o notebook 02
# notebook/02_landing_to_bronze_delta.ipynb

Pré-requisito

Os notebooks 00_setup_sqlserver.ipynb e 01_extracao_sqlserver_landing_zone.ipynb devem ter sido executados com sucesso antes deste.

O bucket bronze no MinIO deve existir antes da gravação Delta: o docker compose (serviço minio-init) e o próprio notebook 02 criam-no idempotentemente (API S3 com boto3). Consulte também Pré-requisitos e configuração.


DML na Bronze — exemplos do dml_bronze.ipynb

Após a carga Delta no MinIO, o notebook dml_bronze.ipynb demonstra mutações sobre o caminho s3a://bronze/dbo_Categoria/ e a leitura do registo transacional do Delta. Os excertos abaixo reproduzem a lógica principal; o caminho e a variável novo_id devem estar coerentes com a execução sequencial das células do notebook.

INSERT (registo sintético de validação)

import pyspark.sql.functions as F

BRONZE_TABLE_PATH = "s3a://bronze/dbo_Categoria/"
df_atual = spark.read.format("delta").load(BRONZE_TABLE_PATH)
proximo_id = df_atual.select(F.max(F.col("id_categoria")).alias("m")).first()["m"]
base_id = int(proximo_id) if proximo_id is not None else 0
novo_id = base_id + 1

linha_nova = spark.range(1).select(
    F.lit(novo_id).cast("int").alias("id_categoria"),
    F.lit("Categoria fictícia (INSERT DML)").alias("nome"),
    F.lit("Registro sintético para validar escrita ACID na Bronze.").alias("descricao"),
    F.date_format(
        F.current_timestamp(), "yyyy-MM-dd'T'HH:mm:ss.SSSSSS"
    ).alias("_bronze_loaded_at"),
    F.lit("dml_bronze.ipynb#INSERT").alias("_bronze_source_file"),
)

linha_nova.write.format("delta").mode("append").save(BRONZE_TABLE_PATH)

UPDATE (correção pontual, id_categoria = 1)

import pyspark.sql.functions as F
from delta.tables import DeltaTable

BRONZE_TABLE_PATH = "s3a://bronze/dbo_Categoria/"
tabela_delta = DeltaTable.forPath(spark, BRONZE_TABLE_PATH)

tabela_delta.update(
    condition="id_categoria = 1",
    set={
        "descricao": F.lit(
            "Descrição revisada na Bronze (UPDATE via DeltaTable)."
        )
    },
)

spark.read.format("delta").load(BRONZE_TABLE_PATH).filter(
    F.col("id_categoria") == F.lit(1)
).select("id_categoria", "nome", "descricao").show(truncate=False)

DELETE (expurgo do identificador sintético)

import pyspark.sql.functions as F
from delta.tables import DeltaTable

BRONZE_TABLE_PATH = "s3a://bronze/dbo_Categoria/"
tabela_delta = DeltaTable.forPath(spark, BRONZE_TABLE_PATH)
tabela_delta.delete(F.col("id_categoria") == F.lit(novo_id))

A variável novo_id corresponde ao identificador criado no bloco INSERT do mesmo fluxo de execução.

Auditoria — history()

import pyspark.sql.functions as F
from delta.tables import DeltaTable

tabela_delta = DeltaTable.forPath(spark, "s3a://bronze/dbo_Categoria/")
tabela_delta.history().select(
    "version",
    "timestamp",
    "operation",
    "operationParameters",
).orderBy(F.col("version").desc()).show(20, truncate=80)

Para a distinção entre tabelas geridas e externas no Spark, e para o efeito de DROP TABLE sobre dados em s3a://, consultar a página Tabelas gerenciadas vs não gerenciadas.