Skip to content

Pipeline ETL e Script Documentati

Deliverable D2.2: Script ETL (Python/SQL) documentati e testati

[TODO] Questo capitolo sarà completato durante la fase di sviluppo (Task 2.2, M10-M12)

5.1 Overview Pipeline ETL

Architettura Prefect Multi-Worker-Pools

Le pipeline ETL sono organizzate per worker pool in base ai requisiti computazionali:

prefect/flows/
├── istat/              # istat-pool (lightweight, 2 workers)
│   ├── Dockerfile
│   ├── requirements.txt
│   ├── popolazione_flow.py
│   └── pendolarismo_flow.py
├── pdf-extraction/     # pdf-pool (heavyweight, 1 worker)
│   ├── Dockerfile
│   ├── requirements.txt
│   └── tabacchi_adm_flow.py
└── analytics/          # analytics-pool (medium, 1 worker)
    ├── Dockerfile
    ├── requirements.txt
    └── spatial_aggregation_flow.py

Pattern Comune Pipeline

Tutte le pipeline seguono il pattern sequenziale:

  1. 01_ingestion: Download/scrape → Bronze layer
  2. 02_transform: Parse → Silver layer (EAV)
  3. 03_data_quality: Great Expectations validation
  4. 04_metadata: Update OpenMetadata catalog

5.2 Pipeline ISTAT Popolazione

Scopo

Ingestione dati popolazione residente ISTAT con granularità comunale (2010-2025).

Fonte Dati

Codice Sorgente

[TODO] Link al codice in deployment/prefect/flows/istat/popolazione_flow.py

python
# Skeleton struttura
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
import pandas as pd
from pathlib import Path

@task(retries=3, retry_delay_seconds=60)
def download_popolazione(anno: int, output_path: Path) -> Path:
    """Download CSV popolazione ISTAT per anno specificato"""
    # Implementation...
    pass

@task
def load_to_silver(csv_path: Path, anno: int) -> int:
    """Carica dati in silver.comuni_attributi_eav"""
    # Implementation...
    pass

@task
def validate_data(anno: int) -> dict:
    """Valida completeness e accuracy con Great Expectations"""
    # Implementation...
    pass

@flow(name="istat-popolazione", log_prints=True)
def popolazione_flow(anno: int = 2025):
    """Flow principale ingestion popolazione ISTAT"""
    bronze_path = download_popolazione(anno, Path(f"/data/bronze/istat/{anno}"))
    records = load_to_silver(bronze_path, anno)
    validation = validate_data(anno)
    return {"records": records, "validation": validation}

Deployment

bash
# Build worker image
cd deployment/prefect/flows/istat/
docker build -t maps/worker-istat:latest .

# Deploy flow
prefect deployment build popolazione_flow.py:popolazione_flow \
  --name "ISTAT Popolazione ${ANNO}" \
  --pool istat-pool \
  --cron "0 2 1 * *"  # Ogni 1° del mese alle 2AM

prefect deployment apply popolazione_flow-deployment.yaml

5.3 Pipeline GTFS Trasporto Pubblico

[TODO] Documentazione pipeline GTFS

5.4 Pipeline PDF Extraction (Tabacchi ADM)

[TODO] Documentazione pipeline Docling

5.5 Data Quality Framework

Great Expectations Suite

[TODO] Esempio suite validazione

python
# expectations/istat_popolazione.py
import great_expectations as gx

suite = gx.core.ExpectationSuite(name="istat_popolazione")

# Completeness
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "codice_istat"}
    )
)

# Accuracy
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_match_regex",
        kwargs={
            "column": "codice_istat",
            "regex": r"^\d{6}$"  # 6 cifre
        }
    )
)

# Timeliness
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_table_row_count_to_be_between",
        kwargs={
            "min_value": 7500,  # ~95% comuni (8000 × 0.95)
            "max_value": 8500
        }
    )
)

5.6 Monitoring e Alerting

Prefect Dashboard

Logging

Tutti i log sono centralizzati in /root/maps-docker/logs/workers/.


[WIP] Questo capitolo sarà completato con:

  • Documentazione completa di tutte le pipeline prioritarie
  • Unit test per ogni task
  • Integration test end-to-end
  • Performance benchmarks

Prossimo capitolo: Validazione Pipeline ETL