Pipeline ETL e Script Documentati
Deliverable D2.2: Script ETL (Python/SQL) documentati e testati
[TODO] Questo capitolo sarà completato durante la fase di sviluppo (Task 2.2, M10-M12)
5.1 Overview Pipeline ETL
Architettura Prefect Multi-Worker-Pools
Le pipeline ETL sono organizzate per worker pool in base ai requisiti computazionali:
prefect/flows/
├── istat/ # istat-pool (lightweight, 2 workers)
│ ├── Dockerfile
│ ├── requirements.txt
│ ├── popolazione_flow.py
│ └── pendolarismo_flow.py
├── pdf-extraction/ # pdf-pool (heavyweight, 1 worker)
│ ├── Dockerfile
│ ├── requirements.txt
│ └── tabacchi_adm_flow.py
└── analytics/ # analytics-pool (medium, 1 worker)
├── Dockerfile
├── requirements.txt
└── spatial_aggregation_flow.pyPattern Comune Pipeline
Tutte le pipeline seguono il pattern sequenziale:
- 01_ingestion: Download/scrape → Bronze layer
- 02_transform: Parse → Silver layer (EAV)
- 03_data_quality: Great Expectations validation
- 04_metadata: Update OpenMetadata catalog
5.2 Pipeline ISTAT Popolazione
Scopo
Ingestione dati popolazione residente ISTAT con granularità comunale (2010-2025).
Fonte Dati
- URL: https://demo.istat.it/popres/
- Formato: CSV
- Frequenza: Annuale
- Dimensione: ~8.000 righe × 50 colonne
Codice Sorgente
[TODO] Link al codice in deployment/prefect/flows/istat/popolazione_flow.py
python
# Skeleton struttura
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
import pandas as pd
from pathlib import Path
@task(retries=3, retry_delay_seconds=60)
def download_popolazione(anno: int, output_path: Path) -> Path:
"""Download CSV popolazione ISTAT per anno specificato"""
# Implementation...
pass
@task
def load_to_silver(csv_path: Path, anno: int) -> int:
"""Carica dati in silver.comuni_attributi_eav"""
# Implementation...
pass
@task
def validate_data(anno: int) -> dict:
"""Valida completeness e accuracy con Great Expectations"""
# Implementation...
pass
@flow(name="istat-popolazione", log_prints=True)
def popolazione_flow(anno: int = 2025):
"""Flow principale ingestion popolazione ISTAT"""
bronze_path = download_popolazione(anno, Path(f"/data/bronze/istat/{anno}"))
records = load_to_silver(bronze_path, anno)
validation = validate_data(anno)
return {"records": records, "validation": validation}Deployment
bash
# Build worker image
cd deployment/prefect/flows/istat/
docker build -t maps/worker-istat:latest .
# Deploy flow
prefect deployment build popolazione_flow.py:popolazione_flow \
--name "ISTAT Popolazione ${ANNO}" \
--pool istat-pool \
--cron "0 2 1 * *" # Ogni 1° del mese alle 2AM
prefect deployment apply popolazione_flow-deployment.yaml5.3 Pipeline GTFS Trasporto Pubblico
[TODO] Documentazione pipeline GTFS
5.4 Pipeline PDF Extraction (Tabacchi ADM)
[TODO] Documentazione pipeline Docling
5.5 Data Quality Framework
Great Expectations Suite
[TODO] Esempio suite validazione
python
# expectations/istat_popolazione.py
import great_expectations as gx
suite = gx.core.ExpectationSuite(name="istat_popolazione")
# Completeness
suite.add_expectation(
gx.core.ExpectationConfiguration(
expectation_type="expect_column_values_to_not_be_null",
kwargs={"column": "codice_istat"}
)
)
# Accuracy
suite.add_expectation(
gx.core.ExpectationConfiguration(
expectation_type="expect_column_values_to_match_regex",
kwargs={
"column": "codice_istat",
"regex": r"^\d{6}$" # 6 cifre
}
)
)
# Timeliness
suite.add_expectation(
gx.core.ExpectationConfiguration(
expectation_type="expect_table_row_count_to_be_between",
kwargs={
"min_value": 7500, # ~95% comuni (8000 × 0.95)
"max_value": 8500
}
)
)5.6 Monitoring e Alerting
Prefect Dashboard
- URL: https://prefect.maps.deppsviluppo.org
- Metrics: Flow runs, task duration, failure rate
- Alerting: Email/Slack su failure
Logging
Tutti i log sono centralizzati in /root/maps-docker/logs/workers/.
[WIP] Questo capitolo sarà completato con:
- Documentazione completa di tutte le pipeline prioritarie
- Unit test per ogni task
- Integration test end-to-end
- Performance benchmarks
Prossimo capitolo: Validazione Pipeline ETL