๐ฎ๐น Italiano
๐ฎ๐น Italiano
Appearance
๐ฎ๐น Italiano
๐ฎ๐น Italiano
Appearance
Deliverable D2.2 โ Appendice: Guida operativa per la costruzione e la manutenzione delle pipeline ETL nel Data Lake MAPS.
Questa guida รจ rivolta agli sviluppatori che entrano nel progetto e devono costruire una nuova pipeline dati o manutenere una esistente. Descrive il flusso di lavoro standard, le convenzioni e i pattern adottati in tutte le pipeline dell'infrastruttura MAPS. L'implementazione di riferimento รจ la pipeline Agenzia Dogane e Monopoli โ tabaccherie; il codice sorgente completo รจ disponibile in flows/agenzie-dogane/tabacchi/ nel repository.
Per avviare l'ambiente di sviluppo locale sono necessari Python 3.11, Docker e Docker Compose. I passi seguenti configurano il database, il server Prefect e l'interprete Python in circa dieci minuti.
git clone https://gitlab.openpolis.io/openpolis/gst/gst-maps-pipelines.git
cd gst-maps-pipelinescp .env.example .envI valori predefiniti in .env.example funzionano senza modifiche per lo sviluppo locale. L'unica variabile che vale la pena verificare รจ BRONZE_BASE_PATH:
# .env.example โ commentata, non necessaria in sviluppo locale
# BRONZE_BASE_PATH=/data/bronzeIn sviluppo locale i flow calcolano automaticamente il path Bronze come {repo_root}/data/bronze (derivato dalla posizione del file). In produzione, dentro il container prefect-worker, impostare BRONZE_BASE_PATH=/data/bronze โ il volume Docker รจ giร montato in quella posizione.
I flow caricano .env automaticamente all'avvio tramite python-dotenv (incluso in requirements-dev.txt): non รจ necessario esportare le variabili manualmente nella shell prima di eseguire uno script.
docker compose -f docker-compose.local.yml up -dAvvia PostgreSQL 17 + PostGIS 3.5, il server Prefect e OpenMetadata (con il suo database dedicato ed Elasticsearch). Lo schema del database MAPS (schemi bronze, silver, gold, tabelle e indici) viene creato automaticamente al primo avvio tramite postgres/init-scripts/01-init-schemas.sql. Verificare che i servizi siano attivi:
docker compose -f docker-compose.local.yml psSe si modifica docker-compose.local.yml o .env dopo che i container sono giร in esecuzione, docker compose restart non rilegge le variabili d'ambiente. Usare invece:
docker compose -f docker-compose.local.yml up -d --force-recreatepython3.11 -m venv .venv
source .venv/bin/activate
pip install -r requirements-dev.txtQuesto installa Prefect, psycopg2, requests, BeautifulSoup e python-dotenv. Il virtualenv va attivato ogni volta che si apre una nuova sessione di terminale (source .venv/bin/activate).
export PREFECT_API_URL=http://localhost:4200/api
prefect work-pool create default-pool --type processIl work pool default-pool รจ il pool di esecuzione referenziato nel prefect.yaml di ogni pipeline.
admin@open-metadata.org / admin)psql -h localhost -U maps -d maps_db (password: maps_dev)Verificare che gli schemi e le tabelle siano stati creati correttamente:
-- schemi attesi: bronze, silver, gold
\dn
-- tabelle attese: bronze.ingestion_log, silver.territory_types,
-- silver.territories, silver.territory_containments,
-- silver.territory_identifiers, silver.territory_names,
-- silver.territory_attributes,
-- silver.service_types, silver.services, silver.service_identifiers,
-- silver.service_attributes, silver.service_categories
\dt bronze.*
\dt silver.*L'ambiente รจ pronto. Il layer Bronze รจ un volume Docker (bronze-data) montato in /data all'interno dei container; i file grezzi scaricati dai flow saranno visibili in quel volume.
OpenMetadata impiega alcuni minuti per completare la migrazione del database al primo avvio; attendere che il container maps-openmetadata riporti lo stato healthy prima di aprire l'interfaccia.
Prefect segue un'architettura client/server. Il server (container maps-prefect-server) tiene traccia delle esecuzioni, conserva i log e serve la UI su http://localhost:4200. Il worker (container maps-prefect-worker) รจ il processo che esegue materialmente i flow: interroga il server in cerca di lavoro assegnato al suo pool (default-pool) e avvia ogni flow in un subprocess.
Lo sviluppatore interagisce con il server tramite il client Prefect โ la CLI e l'SDK Python โ la cui destinazione รจ configurata dalla variabile d'ambiente PREFECT_API_URL. Durante lo sviluppo locale il client punta al server nel container; in produzione punterร al server remoto, senza che il codice dei flow cambi.
Un deployment associa un flow a un pool di worker e ne fissa i parametri di esecuzione (schedule, parametri di default). Una volta registrato, il flow puรฒ essere avviato dalla UI o da CLI senza che lo sviluppatore debba avere il processo attivo localmente.
sequenceDiagram
participant Dev as Sviluppatore
participant Server as Prefect Server
participant Worker as Worker (default-pool)
Dev->>Server: registra deployment
Note over Server: salva configurazione flow,
schedule, parametri
Dev->>Server: avvia esecuzione (UI o CLI)
Server->>Worker: invia work item
Worker->>Worker: esegue flow in subprocess
Worker-->>Server: aggiorna stato + log
Ogni pipeline MAPS segue il pattern Medallion: i file grezzi vengono scaricati nel Bronze layer, trasformati e validati nel Silver layer, infine aggregati nel Gold layer. Le pipeline sono orchestrate da Prefect e seguono una struttura standard a quattro flow.
Fonte esterna โ Flow 1: Ingestion โ Bronze (filesystem)
โ Flow 2: Transform โ Silver (PostgreSQL)
โ Flow 3: Quality โ Report di validazione
โ Flow 4: Metadata โ Catalogo OpenMetadataOgni flow รจ indipendente e puรฒ essere rieseguito senza dover rieseguire i flow precedenti. Questo รจ intenzionale: se cambia la logica di trasformazione, รจ sufficiente rieseguire il Flow 2 senza dover ri-scaricare i dati dalla fonte.
Ogni pipeline risiede in una sottodirectory dedicata sotto flows/:
flows/
โโโ utils/ # Moduli condivisi tra tutte le pipeline
โ โโโ bronze_writer.py # save_to_bronze(), log_ingestion()
โ โโโ rate_limiter.py # rate_limited_request()
โ โโโ html_parser.py # parse_table(), extract_form_data()
โ โโโ situas_client.py # SituasClient โ wrapper API ISTAT SITUAS
โ โโโ territory_resolver.py # build_comune_lookup_from_conn(), resolve_comune()
โ โโโ service_writer.py # upsert_service(), upsert_service_identifier(), ...
โโโ {ente}/{dataset}/
โโโ 01_ingestion_flow.py # Flow 1: download โ Bronze
โโโ 02_transform_flow.py # Flow 2: Bronze โ Silver
โโโ 03_data_quality_flow.py # Flow 3: validazione
โโโ 04_metadata_flow.py # Flow 4: catalogo OpenMetadata
โโโ prefect.yaml # Definizioni dei deployment (tutti e 4 i flow)
โโโ requirements.txt # Dipendenze Python
โโโ README.md # Documentazione della pipeline
โโโ docs/
โ โโโ DEPLOY_GUIDE.md # Istruzioni di deployment passo per passo
โ โโโ KNOWN_ISSUES.md # Storico dei problemi e relative soluzioni
โโโ tests/
โโโ test_flow.py # Test unitariConvenzioni di denominazione:
agenzie-dogane/tabacchi/)01_, 02_, 03_, 04_La directory flows/istat/ contiene le pipeline fondazionali che popolano il modello territoriale Silver. Tutte le altre pipeline dati dipendono dalla presenza di questi dati.
| Pipeline | Directory | Fonte | Copertura |
|---|---|---|---|
| Province, regioni, ripartizioni | flows/istat/province-regioni/ | SITUAS pfun=64/68/71 + variazioni pfun=106โ108, 112โ114 | ~107 province, 20 regioni, 5 ripartizioni per anno (2000โoggi) |
| Comuni | flows/istat/comuni/ | SITUAS pfun=61 + variazioni pfun=129 | ~7.900 comuni per anno (2000โoggi) |
| Variazioni amministrative | flows/istat/variazioni-amministrative/ | SITUAS pfun=106โ108, 112โ114, 98, 104, 105, 129 | Solo ingestione โ scarica la storia completa dal 1861 nel Bronze |
| Territory corrections | flows/istat/territory-corrections/ | Dati statici โ nomi alias ADM e correzioni di qualitร | Flow idempotente; va eseguito dopo che i comuni sono popolati |
L'ordine di esecuzione รจ importante. Il processamento delle variazioni รจ integrato nei flow di trasformazione di province-regioni e comuni. I dati Bronze delle variazioni devono essere ingeriti prima. Il flow territory-corrections va eseguito per ultimo, dopo che i comuni sono popolati.
Passo 1 โ Ingestione (qualsiasi ordine, eseguibili in parallelo):
istat-province-regioni โ Flow 1 (ingestion)
istat-comuni โ Flow 1 (ingestion)
istat-variazioni โ Flow 1 (ingestion) โ deve completarsi prima del Passo 2
Passo 2 โ Trasformazione (ordine vincolato):
istat-province-regioni โ Flow 2 (transform) โ legge bronze province + bronze variazioni
istat-comuni โ Flow 2 (transform) โ legge bronze comuni + bronze variazioni + silver province
Passo 3 โ Correzioni (dopo che i comuni sono popolati):
istat-territory-corrections โ Flow 1 (corrections)Ogni file di flow accetta --anni come argomento da riga di comando. I flow possono essere eseguiti direttamente via Python (con il virtualenv attivo) o via Docker (raccomandato):
# Opzione A: Python direttamente (virtualenv attivo)
# Passo 1: Ingestione (qualsiasi ordine)
python flows/istat/variazioni-amministrative/01_ingestion_flow.py
python flows/istat/province-regioni/01_ingestion_flow.py
python flows/istat/comuni/01_ingestion_flow.py
# Passo 2: Trasformazione (ordine vincolato โ bronze variazioni deve esistere prima)
python flows/istat/province-regioni/02_transform_flow.py
python flows/istat/comuni/02_transform_flow.py
# Passo 3: Correzioni (dopo che i comuni sono popolati)
python flows/istat/territory-corrections/01_corrections_flow.py
# Solo anni specifici
python flows/istat/province-regioni/01_ingestion_flow.py --anni 2024 2025 2026
# Opzione B: Docker (raccomandato)
# Passo 1: Ingestione
docker exec maps-prefect-worker bash -c \
"cd /flows/istat/variazioni-amministrative && python 01_ingestion_flow.py"
docker exec maps-prefect-worker bash -c \
"cd /flows/istat/province-regioni && python 01_ingestion_flow.py"
docker exec maps-prefect-worker bash -c \
"cd /flows/istat/comuni && python 01_ingestion_flow.py"
# Passo 2: Trasformazione (province-regioni prima, poi comuni)
docker exec maps-prefect-worker bash -c \
"cd /flows/istat/province-regioni && python 02_transform_flow.py"
docker exec maps-prefect-worker bash -c \
"cd /flows/istat/comuni && python 02_transform_flow.py"
# Passo 3: Correzioni
docker exec maps-prefect-worker bash -c \
"cd /flows/istat/territory-corrections && python 01_corrections_flow.py"Tutte le pipeline ISTAT sono registrate sul server Prefect senza schedule automatico; le esecuzioni vengono avviate manualmente. Per registrare o ri-registrare dopo una modifica al prefect.yaml:
docker exec maps-prefect-worker bash -c "cd /flows/istat/province-regioni && prefect deploy --all"
docker exec maps-prefect-worker bash -c "cd /flows/istat/comuni && prefect deploy --all"
docker exec maps-prefect-worker bash -c "cd /flows/istat/variazioni-amministrative && prefect deploy --all"
docker exec maps-prefect-worker bash -c "cd /flows/istat/territory-corrections && prefect deploy --all"Per avviare una run via CLI (rispettare l'ordine di esecuzione):
export PREFECT_API_URL=http://localhost:4200/api
# Passo 1: Ingestione (qualsiasi ordine)
prefect deployment run 'istat-variazioni-ingestion/istat-variazioni-01-ingestion'
prefect deployment run 'istat-province-regioni-ingestion/istat-pr-01-ingestion' \
--param anni='[2024,2025,2026]'
prefect deployment run 'istat-comuni-ingestion/istat-comuni-01-ingestion' \
--param anni='[2024,2025,2026]'
# Passo 2: Trasformazione (attendere il completamento dell'ingestione; province-regioni prima dei comuni)
prefect deployment run 'istat-province-regioni-transform/istat-pr-02-transform' \
--param anni='[2024,2025,2026]'
prefect deployment run 'istat-comuni-transform/istat-comuni-02-transform' \
--param anni='[2024,2025,2026]'
# Passo 3: Correzioni (dopo il completamento della trasformazione dei comuni)
prefect deployment run 'istat-territory-corrections/istat-territory-corrections-01'Tutti i flow ISTAT sono idempotenti: rieseguire una trasformazione su un Bronze invariato produce un output Silver identico.
Il Silver layer adotta un modello a identitร surrogata stabile per i territori. Ogni territorio ha un id interno che non cambia mai; i codici ISTAT sono memorizzati come identificatori in silver.territory_identifiers. Questo elimina la duplicazione dei record quando i comuni cambiano provincia โ il comune rimane una sola riga, con piรน periodi di codice ISTAT.
| Tabella | Scopo |
|---|---|
silver.territory_types | Registro dei tipi di entitร โ pre-popolato con 9 tipi |
silver.territories | Registro principale: id surrogato stabile, type_code, label, date di ciclo di vita (valid_from/valid_to) |
silver.territory_identifiers | Identificatori temporali: codici ISTAT (scheme='istat'), codici catastali, codici fiscali, codici UTS |
silver.territory_names | Nomi temporali con supporto multilingue (italiano e bilinguismi/locali) |
silver.territory_containments | Contenimento gerarchico temporale (comune โ provincia, con date precise per i cambi di provincia) |
silver.territory_relationships | Legami predecessore/successore da eventi variazioni (ES, CS, RN, AQES, CECS) |
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ territory_types โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ PK code VARCHAR(30) โ
โ label โ
โ hierarchy_level โ
โโโโโโโโโโโโโโโโฌโโโโโโโโโโโโโโโโ
โ 1
โ (type_code)
โ N
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโผโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ territories โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ PK id SERIAL โ
โ FK type_code label subtype โ
โ valid_from DATE valid_to DATE end_reason TEXT โ
โโโโฌโโโโโโโโโโโฌโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโฌโโโโโโโโโโโโโโ
โ 1:N โ 1:N โ 1:N (member + container)
โ โ โ
โผ โผ โผ
โโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ territory_ โ โ territory_ โ โ territory_containments โ
โ identifiers โ โ names โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โโโโโโโโโโโโโโโโโค โโโโโโโโโโโโโโโโโค โ FK member_id โ territories โ
โFK territory_ โ โFK territory_ โ โ FK container_id โ territories โ
โ id โ โ id โ โ valid_from DATE โ
โ scheme โ โ name โ โ valid_to DATE โ
โ identifier โ โ language โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ valid_from โ โ name_type โ
โ valid_to โ โ valid_from โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ fonte โ โ valid_to โ โ territory_relationships โ
โโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ FK source_id โ territories โ
โ FK dest_id โ territories โ
โ classification TEXT โ
โ valid_from DATE โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโI territori vengono risolti per codice ISTAT tramite un join su territory_identifiers:
-- Trovare il territorio per il codice ISTAT '092001'
SELECT t.id, t.label FROM silver.territories t
JOIN silver.territory_identifiers ti ON ti.territory_id = t.id
WHERE t.type_code = 'comune' AND ti.scheme = 'istat' AND ti.identifier = '092001';
-- In quale provincia si trovava Arbus il 1ยฐ gennaio 2015?
SELECT p.label FROM silver.territory_containments tc
JOIN silver.territories p ON p.id = tc.container_id AND p.type_code = 'provincia'
JOIN silver.territories t ON t.id = tc.member_id
WHERE t.type_code = 'comune' AND t.label = 'Arbus'
AND (tc.valid_from IS NULL OR tc.valid_from <= '2015-01-01')
AND (tc.valid_to IS NULL OR tc.valid_to > '2015-01-01');valid_from = NULL indica un inizio di validitร sconosciuto (precedente ai dati disponibili). valid_to = NULL indica che il territorio รจ ancora attivo.
Tutte le pipeline scrivono in silver.territory_attributes, la tabella EAV generalizzata che supporta qualsiasi tipo di entitร territoriale.
INSERT INTO silver.territory_attributes
(territory_id, type_code, attribute, value, data_type, source, valid_from, valid_to)
VALUES
(42, 'comune', 'popolazione', '2635', 'integer', 'ISTAT', '2024-01-01', NULL),
(107, 'provincia', 'n_tabaccherie', '245', 'integer', 'ADM', '2024-01-01', '2025-12-31'),
(8, 'sll', 'addetti', '15200', 'integer', 'ISTAT', '2021-01-01', NULL);| Campo | Descrizione | Esempi |
|---|---|---|
territory_id | FK verso silver.territories(id) โ surrogato stabile, sopravvive ai cambi di codice ISTAT | risolto tramite territory_resolver |
type_code | Tipo di entitร (denormalizzato da territories) | 'comune', 'provincia', 'regione', 'sll', 'slo' |
attribute | Nome dell'attributo | 'popolazione', 'n_tabaccherie_adm' |
value | Valore memorizzato come testo | '2635' |
data_type | Suggerimento per il cast del tipo | 'integer', 'float', 'string', 'boolean' |
source | Origine del dato | 'ISTAT', 'ADM', 'MinSalute' |
valid_from | Inizio del periodo di validitร | '2024-01-01' |
valid_to | Fine del periodo di validitร โ NULL indica il valore corrente | NULL |
Il pattern ON CONFLICT ... DO UPDATE garantisce la riesecuzione idempotente:
execute_values(
cur,
"""
INSERT INTO silver.territory_attributes
(territory_id, type_code, attribute, value,
data_type, source, valid_from, valid_to)
VALUES %s
ON CONFLICT (territory_id, attribute, valid_from)
DO UPDATE SET value = EXCLUDED.value, updated_at = NOW()
""",
eav_records
)silver.territory_attributes memorizza una riga per ogni (territorio, attributo, periodo): รจ ottimizzata per statistiche territoriali aggregate come conteggi di popolazione o numero di farmacie per comune. Non รจ progettata per memorizzare record individuali di punti di interesse.
Diverse fonti pubblicano elenchi di servizi locali individuali (farmacie, ospedali, scuole, tabaccherie e altri). Conservare i record individuali nel Silver offre vantaggi concreti rispetto alla sola scrittura di conteggi aggregati: gli stessi dati grezzi possono essere aggregati per comune, provincia, ASL o SLL senza dover re-ingerire dalla fonte; i record provenienti da piรน dataset possono essere deduplicati tramite service_identifiers usando codici esterni come codice fiscale o codice HSP; attributi specifici del servizio possono essere memorizzati senza modifiche allo schema.
| Tabella | Scopo |
|---|---|
silver.service_types | Registro dei tipi di servizio โ pre-popolato con 5 tipi (farmacia, ospedale, scuola, biblioteca, tabaccheria) |
silver.services | Una riga per ogni servizio individuale, con territory_id risolto, date di ciclo di vita e un vincolo di unicitร per upsert idempotenti |
silver.service_identifiers | Codici esterni per servizio (codice_fiscale, codice_ministeriale, codice_hsp, ...) โ stesso identificatore significa stesso servizio logico tra dataset diversi |
silver.service_attributes | EAV per metriche specifiche del servizio (posti_letto, num_studenti, ...) โ rispecchia territory_attributes ma legato a una riga di servizio |
silver.service_categories | Tag tipizzati multivalore (tipo_ospedale: DEA II livello, livello_scuola: primaria, ...) |
La scrittura dei servizi da una pipeline avviene tramite gli helper in utils/service_writer.py:
from utils.territory_resolver import build_comune_lookup_from_conn, resolve_comune
from utils.service_writer import (
upsert_service, upsert_service_identifier,
upsert_service_attribute, upsert_service_category,
)
conn = _get_db_conn()
lookup = build_comune_lookup_from_conn(conn)
with conn.cursor() as cur:
for record in source_records:
# 1. Risolvere il territorio
match = resolve_comune(lookup, sigla=record["provincia"], nome=record["comune"])
territory_id = match[0] if match else None
# 2. Upsert della riga servizio
service_id = upsert_service(
cur,
type_code="farmacia",
name=record["denominazione"],
address=record["indirizzo"],
territory_id=territory_id,
source="AIFA",
valid_from="2024-01-01",
)
# 3. Allegare identificatore esterno (per deduplicazione cross-source)
upsert_service_identifier(cur, service_id=service_id,
scheme="codice_fiscale", identifier=record["cf"])
# 4. Memorizzare metrica specifica del servizio
upsert_service_attribute(cur, service_id=service_id,
attribute="tipo_farmacia", value=record["tipo"],
data_type="text", source="AIFA", valid_from="2024-01-01")
# 5. Aggiungere categoria
upsert_service_category(cur, service_id=service_id,
scheme="tipo_farmacia", value=record["tipo"])
conn.commit()
conn.close()Una volta che i record individuali sono nel Silver, i conteggi per territorio sono una semplice query GROUP BY che puรฒ alimentare silver.territory_attributes come statistiche aggregate.
La pipeline Agenzia Dogane e Monopoli โ tabaccherie รจ la pipeline di riferimento perchรฉ รจ la piรน complessa tra quelle sviluppate: richiede scraping HTML di un portale JSF con paginazione Ajax, gestione della sessione, rate limiting e normalizzazione dei codici territoriali. Una pipeline che scarica direttamente un file CSV o XLSX segue lo stesso schema ma con un Flow 1 molto piรน semplice.
Prima di registrare un deployment su Prefect, ogni flow puรฒ essere eseguito direttamente come script Python. Questo รจ il modo normale di lavorare: scrivere il flow, eseguirlo, correggere gli errori, rieseguirlo โ senza toccare il server.
Ogni file di flow deve contenere un blocco if __name__ == "__main__" con argparse in modo che i parametri possano essere passati da riga di comando senza modificare il sorgente:
# 01_ingestion_flow.py
from prefect import flow
@flow
def ingestion_flow(anno: int = 2025):
...
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--anno", type=int, default=2025)
args = parser.parse_args()
ingestion_flow(anno=args.anno)Usare sempre lo stesso metodo di esecuzione all'interno di una run di pipeline. Python scrive i file Bronze in {repo_root}/data/bronze/ sull'host; Docker scrive in /data/bronze/ dentro il container. Se si ingerisce con Python e si trasforma con Docker (o viceversa), il flow di trasformazione non troverร i file Bronze. In docker-compose.local.yml la directory Bronze รจ montata in bind da ./data, quindi entrambi i metodi condividono lo stesso path โ ma solo se BRONZE_BASE_PATH รจ impostato in modo coerente (v. ยง1.3).
Opzione A โ Python direttamente (il virtualenv deve essere attivo):
source .venv/bin/activate
cd flows/agenzie-dogane/tabacchi/
python 01_ingestion_flow.py # usa i parametri predefiniti
python 01_ingestion_flow.py --anno 2024 # override a runtimeOpzione B โ Docker (raccomandato โ replica esattamente l'ambiente di produzione):
docker exec maps-prefect-worker bash -c \
"cd /flows/agenzie-dogane/tabacchi && python 01_ingestion_flow.py"
docker exec maps-prefect-worker bash -c \
"cd /flows/agenzie-dogane/tabacchi && python 01_ingestion_flow.py --anno 2024"Se PREFECT_API_URL รจ impostata e il server รจ in esecuzione, la run appare nella UI con log e stato. Se il server non รจ disponibile, il flow gira comunque come script Python normale.
L'iterazione tipica รจ:
Ogni flow puรฒ essere sviluppato e testato indipendentemente dagli altri. Il Flow 2 puรฒ essere testato leggendo un file Bronze giร presente su disco, senza che il Flow 1 sia terminato.
Ogni file di flow inizia con la stessa intestazione in quattro parti. Copiarla verbatim e adattare l'indice parents[N] e la costante FONTE.
import os
import sys
from pathlib import Path
try:
from dotenv import load_dotenv
load_dotenv(Path(__file__).resolve().parents[3] / ".env") # repo root
except ImportError:
pass
import psycopg2
from psycopg2.extras import execute_values
from prefect import flow, task, get_run_logger
sys.path.insert(0, str(Path(__file__).resolve().parents[2])) # aggiunge flows/ al path
from utils.bronze_writer import save_to_bronze, log_ingestion # e altri utils se necessario
FONTE = "nome-ente" # corrisponde alla directory Bronze
_REPO_ROOT = Path(__file__).resolve().parents[3]
BRONZE_BASE = os.getenv("BRONZE_BASE_PATH", str(_REPO_ROOT / "data" / "bronze"))
def _get_db_conn():
return psycopg2.connect(
host=os.getenv("POSTGRES_HOST", "localhost"),
port=int(os.getenv("POSTGRES_PORT", 5432)),
database=os.getenv("POSTGRES_DB", "maps_db"),
user=os.getenv("POSTGRES_USER", "maps"),
password=os.getenv("POSTGRES_PASSWORD", "maps_dev"),
)L'indice parents[N] dipende dalla profonditร della directory: parents[2] da flows/{ente}/{dataset}/ raggiunge flows/ (dove si trova utils/); parents[3] raggiunge la root del repository (dove si trova .env). _get_db_conn() รจ definita localmente in ogni flow, non importata da utils, in modo che ogni script sia autocontenuto.
Il Flow 1 scarica i dati grezzi dalla fonte esterna e li salva nel filesystem Bronze tramite l'utility save_to_bronze. Registra l'operazione in bronze.ingestion_log.
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import httpx
from utils.bronze_writer import save_to_bronze, log_ingestion
@task(
retries=3,
retry_delay_seconds=[60, 300, 900],
cache_key_fn=task_input_hash,
cache_expiration=timedelta(hours=24)
)
def download_fonte(anno: int) -> dict:
url = "https://example.gov.it/data/dataset.xlsx"
response = httpx.get(url, follow_redirects=True, timeout=300)
response.raise_for_status()
file_info = save_to_bronze(
data=response.content,
fonte="nome-ente",
dataset="nome_dataset",
anno=anno,
ext="xlsx"
)
log_ingestion(file_info, n_record=0, url_source=url)
return file_infosave_to_bronze crea la directory, scrive i byte grezzi, calcola il checksum SHA256 e restituisce un dizionario con file_path, size_bytes, checksum, fonte, dataset e anno. log_ingestion inserisce la riga corrispondente in bronze.ingestion_log.
Il Bronze layer contiene sempre la fonte grezza esattamente come ricevuta dall'origine: byte HTTP per file CSV, XLSX e PDF; HTML grezzo per le pipeline di scraping. La trasformazione in strutture dati avviene nel Flow 2.
Convenzione del path Bronze: {repo_root}/data/bronze/{ente}/{anno}/{dataset}.{ext} Esempi: /data/bronze/istat/2024/popolazione_residente.xlsx, /data/bronze/minsalute/2024/asl_boundaries.xlsx
Portali JSF/PrimeFaces. Alcuni portali pubblici italiani usano JavaServer Faces con paginazione Ajax. Ogni risposta contiene un token javax.faces.ViewState che deve essere ritrasmesso nella richiesta successiva. Usare una requests.Session per mantenere i cookie, estrarre il ViewState con BeautifulSoup e usare rate_limited_request() da utils/rate_limiter.py:
from utils.rate_limiter import rate_limited_request
def _get_viewstate(html: str) -> str:
from bs4 import BeautifulSoup
vs = BeautifulSoup(html, "html.parser").find("input", {"name": "javax.faces.ViewState"})
return vs.get("value", "") if vs else ""
session = requests.Session()
response = rate_limited_request(
url, method="POST",
data={
"javax.faces.ViewState": viewstate,
"javax.faces.partial.ajax": "true",
"javax.faces.partial.execute": "formBusca:regione",
"javax.faces.partial.render": "formBusca:provincia",
"formBusca:regione": regione_code,
},
session=session,
delay=2,
)Per un esempio completo vedere flows/agenzie-dogane/tabacchi/01_ingestion_flow.py.
Il Flow 2 legge dal Bronze, normalizza i dati e scrive in silver.territory_attributes. ร il flow piรน specifico per ogni dataset.
Scheletro standard:
@task
def trasforma_e_carica(file_info: dict, entity_type: str) -> int:
# 1. Leggere il file sorgente
if file_info['file_path'].endswith('.xlsx'):
df = pd.read_excel(file_info['file_path'])
elif file_info['file_path'].endswith('.csv'):
df = pd.read_csv(file_info['file_path'], encoding='utf-8-sig', sep=';')
# 2. Risolvere territory_id e costruire i record EAV
conn = _get_db_conn()
lookup = _build_territory_lookup(conn) # (sigla, label_upper) โ (territory_id, type_code)
eav_records = []
for _, row in df.iterrows():
match = lookup.get((row['SIGLA'].upper(), row['COMUNE'].upper()))
if not match:
continue
territory_id, type_code = match
eav_records.extend([
(territory_id, type_code, 'attributo_1', str(row['COL_A']),
'integer', fonte, f'{anno}-01-01', None),
(territory_id, type_code, 'attributo_2', str(row['COL_B']),
'string', fonte, f'{anno}-01-01', None),
])
# 3. UPSERT nel Silver EAV
try:
with conn.cursor() as cur:
execute_values(cur, """
INSERT INTO silver.territory_attributes
(territory_id, type_code, attribute, value,
data_type, source, valid_from, valid_to)
VALUES %s
ON CONFLICT (territory_id, attribute, valid_from)
DO UPDATE SET value = EXCLUDED.value, updated_at = NOW()
""", eav_records)
conn.commit()
finally:
conn.close()
return len(eav_records)Risoluzione del territorio: le pipeline devono risolvere territory_id da silver.territories prima di scrivere in silver.territory_attributes. Usare _build_territory_lookup() (v. ยง6.0.1) o il modulo condiviso territory_resolver.build_comune_lookup_from_conn() per costruire un dizionario (sigla_automobilistica, label_upper) โ (territory_id, type_code). I record il cui comune non puรฒ essere risolto vengono registrati come warning e saltati โ comportamento atteso quando le pipeline fondazionali ISTAT non sono ancora state eseguite.
Il Flow 3 valida sia i file Bronze che i record Silver. Usa Great Expectations per la validazione strutturata e produce un report.
Validazione Bronze (verificare il file grezzo prima di fidarsi della trasformazione):
import great_expectations as gx
suite = gx.core.ExpectationSuite(name=f"{fonte}_{dataset}")
suite.add_expectation(gx.core.ExpectationConfiguration(
expectation_type="expect_column_to_exist",
kwargs={"column": "colonna_codice_entita"}
))
suite.add_expectation(gx.core.ExpectationConfiguration(
expectation_type="expect_column_values_to_not_be_null",
kwargs={"column": "colonna_codice_entita", "mostly": 0.99}
))Validazione Silver (verificare la correttezza dei record EAV):
suite.add_expectation(gx.core.ExpectationConfiguration(
expectation_type="expect_column_values_to_not_be_null",
kwargs={"column": "territory_id"}
))
suite.add_expectation(gx.core.ExpectationConfiguration(
expectation_type="expect_column_values_to_be_between",
kwargs={"column": "valore_cast", "min_value": 0, "max_value": 10_000_000}
))Strategia di errore: i fallimenti non critici vengono registrati come warning; i fallimenti critici nella validazione Silver bloccano il Flow 4 (catalogo).
Il Flow 4 registra i file Bronze e la tabella Silver in OpenMetadata, creando una voce di lineage dalla fonte all'EAV.
@task
def catalog_bronze_files(bronze_path: str, fonte: str, dataset: str):
"""Scansiona la directory Bronze e registra i file in OpenMetadata."""
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection
)
# ... configurazione ingestion OpenMetadata e creazione lineagePrerequisiti: pip install 'openmetadata-ingestion[datalake]' e variabile d'ambiente OPENMETADATA_JWT_TOKEN impostata (generare dalla UI OpenMetadata โ Settings โ Bots โ ingestion-bot).
Sei moduli in flows/utils/ sono disponibili per tutte le pipeline.
utils/bronze_writer.py โ scrittura Bronze standardizzata con checksum e log:
from utils.bronze_writer import save_to_bronze, log_ingestion
file_info = save_to_bronze(data=response.content, fonte="agenzie-dogane", dataset="tabacchi", anno=2025, ext="html")
log_ingestion(file_info, n_record=0, url_source=url)utils/rate_limiter.py โ rate limiting globale thread-safe per il web scraping:
from utils.rate_limiter import rate_limited_request
response = rate_limited_request(url, method="POST", delay=2)utils/html_parser.py โ parsing HTML condiviso:
from utils.html_parser import parse_table, extract_form_data
data = parse_table(soup, table_id="results")utils/situas_client.py โ wrapper API ISTAT SITUAS:
from utils.situas_client import SituasClient
client = SituasClient()
catalogue = client.get_catalogue() # lista di 74 dataset
records = client.get_all_records(pfun=64, pdata="01/01/2024") # paginato
records = client.get_records(pfun=61, pdata="01/01/2024") # singola paginaget_all_records() gestisce la paginazione iterando pagine da 500 record. Alcuni pfun โ in particolare pfun=61 (comuni) โ restituiscono tutti i risultati in un'unica risposta e non supportano la paginazione per offset; usare get_records() direttamente per quelli.
utils/territory_resolver.py โ lookup condiviso (sigla, comune) โ territory_id:
from utils.territory_resolver import build_comune_lookup_from_conn, resolve_comune
# Costruire il lookup una volta per run (una query DB)
lookup = build_comune_lookup_from_conn(conn)
# Risolvere ogni record โ lookup O(1) su dizionario
match = resolve_comune(lookup, sigla="MI", nome="Milano")
if match:
territory_id, type_code = matchIl lookup cerca sia in silver.territories.label che in silver.territory_names, che include nomi alternativi, forme bilingui e alias ADM. Se resolve_comune restituisce None, il territorio non รจ stato trovato โ registrare un warning e continuare.
utils/service_writer.py โ helper upsert idempotenti per silver.services e le tabelle satellite:
from utils.service_writer import (
upsert_service, # โ int (service id)
upsert_service_identifier, # allegare un codice esterno
upsert_service_attribute, # memorizzare una metrica tipizzata (EAV)
upsert_service_category, # allegare un tag categorico
)Tutte le funzioni accettano un cursore psycopg2 aperto. Il chiamante gestisce la connessione e chiama conn.commit(). Per esempi d'uso vedere ยง5.3.
Usare queste utility invece di reimplementare pattern comuni in ogni pipeline.
Un deployment trasforma un flow in una risorsa pianificata e monitorata sul server Prefect. Lo sviluppatore definisce prefect.yaml una volta; le esecuzioni successive avvengono tramite UI o CLI senza toccare il codice.
Ogni pipeline definisce quattro deployment in prefect.yaml. La sezione pull รจ obbligatoria in tutte le pipeline: imposta la directory di lavoro e installa le dipendenze del flow a runtime. Questo รจ necessario perchรฉ il container prefect-worker usa l'immagine base prefecthq/prefect:3-python3.11, che non pre-installa pacchetti specifici della pipeline come psycopg2-binary.
name: {ente}-{dataset}
prefect-version: "3.*"
work_pool:
name: default-pool
pull:
- prefect.deployments.steps.set_working_directory:
directory: /flows/{ente}/{dataset}
- prefect.deployments.steps.pip_install_requirements:
directory: /flows/{ente}/{dataset}
requirements_file: requirements.txt
deployments:
- name: {ente}-{dataset}-01-ingestion # Flow 1 โ download โ Bronze
entrypoint: 01_ingestion_flow.py:ingestion_flow
schedules: [] # nessuno schedule automatico โ avviare manualmente
parameters:
anno: 2025
work_pool:
name: default-pool
- name: {ente}-{dataset}-02-transform # Flow 2 โ Bronze โ Silver
entrypoint: 02_transform_flow.py:transform_flow
schedules: []
parameters:
anno: 2025
work_pool:
name: default-pool
- name: {ente}-{dataset}-03-quality # Flow 3 โ monitoraggio qualitร
entrypoint: 03_data_quality_flow.py:quality_flow
schedules: []
parameters:
anno: 2025
work_pool:
name: default-pool
- name: {ente}-{dataset}-04-metadata # Flow 4 โ aggiornamento catalogo
entrypoint: 04_metadata_flow.py:metadata_flow
schedules: []
parameters:
anno: 2025
work_pool:
name: default-poolIl prefisso numerico (01, 02, ...) mantiene i deployment in ordine di esecuzione nella UI di Prefect. schedules: [] rimuove esplicitamente qualsiasi schedule โ omettere la chiave non elimina gli schedule precedentemente registrati.
I deployment devono essere registrati dall'interno del container prefect-worker, non dalla shell dell'host. Questo perchรฉ Prefect registra il path della directory di lavoro al momento della registrazione e lo usa letteralmente al momento dell'esecuzione. Il worker esegue i flow in /flows/{ente}/{dataset}/ (il mount del volume Docker); registrare dall'host registrerebbe il path dell'host che non esiste dentro il container.
docker exec maps-prefect-worker bash -c \
"cd /flows/{ente}/{dataset} && prefect deploy --all"Dopo la registrazione, i deployment sono visibili su http://localhost:4200/deployments.
Il worker Prefect legge il codice dal filesystem ad ogni esecuzione:
prefect deploy --all. Il worker eseguirร il file aggiornato alla prossima run.prefect.yaml (nuovi deployment, cambi di schedule, nuovi parametri, sezione pull): richiedono prefect deploy --all eseguito dall'interno del container.schedules: [] in prefect.yaml e ri-registrare. Omettere la chiave non rimuove uno schedule esistente.Dalla UI โ andare su http://localhost:4200/deployments, cliccare un deployment, poi "Run". Il form dei parametri รจ pre-compilato con i valori predefiniti e puรฒ essere modificato prima di ogni run.
Dalla CLI:
export PREFECT_API_URL=http://localhost:4200/api
# Eseguire con parametri predefiniti
prefect deployment run '{flow-name}/{ente}-{dataset}-01-ingestion'
# Override dei parametri a runtime
prefect deployment run '{flow-name}/{ente}-{dataset}-01-ingestion' \
--param anno=2024
# Elencare tutti i deployment registrati
prefect deployment lsIl nome CLI segue il pattern {flow-name}/{deployment-name}. Il nome del flow viene dal decoratore @flow(name=...); il nome del deployment da prefect.yaml. Verificare i nomi esatti con prefect deployment ls o dalla UI.
Non avviare un worker Prefect locale (prefect worker start) quando si usa lo stack Docker. Il container maps-prefect-worker fa giร polling su default-pool. Un worker locale concorrente prenderebbe in carico le run ma fallirebbe perchรฉ i path /flows/ non esistono sull'host.
Monitoraggio: Prefect UI su http://localhost:4200.
Ognuno dei quattro flow puรฒ essere sviluppato e testato indipendentemente. Una suddivisione naturale per un team di due sviluppatori che lavorano sulla stessa pipeline:
| Sviluppatore A | Sviluppatore B |
|---|---|
| Flow 1: Ingestion (specifico per la fonte, spesso scraping o API) | Flow 2: Transform (normalizzazione EAV, risoluzione territorio) |
| Flow 4: Catalogo metadata | Flow 3: Data quality (aspettative GX, rilevamento anomalie) |
README.md + prefect.yaml | tests/test_flow.py |
Criterio di passaggio di consegne tra i flow: il Flow 1 รจ considerato completo quando almeno un file Bronze รจ scritto nel path corretto e il suo checksum รจ registrato in bronze.ingestion_log. Il Flow 2 puรฒ iniziare con quel file indipendentemente dal completamento del Flow 1.
Tracciamento su GitLab: aprire un issue per ogni flow. Collegarli all'issue principale della pipeline. Chiudere ogni sub-issue quando il flow raggiunge i criteri di accettazione Bronze/Silver (v. ยง10).
/data/bronze/{ente}/{anno}/)bronze.ingestion_logretries=3 con backoff)README.md documenta URL della fonte, formato e frequenza di aggiornamentosilver.territory_attributes con il type_code correttoON CONFLICT DO UPDATE attivo (riesecuzioni idempotenti)territory_id risolto per tutti i record (comuni non risolti registrati e contati)territory_id, range dei valori, campi temporaliON CONFLICT fallisce con "no unique constraint": il vincolo UNIQUE su (territory_id, attribute, valid_from) deve essere presente. Verificare con \d silver.territory_attributes in psql.
territory_id รจ NULL / nessun record risolto: le pipeline fondazionali ISTAT (flows/istat/) devono essere eseguite prima di qualsiasi pipeline EAV. Se _build_territory_lookup() restituisce un dizionario vuoto, silver.territories non รจ ancora popolato.
valid_from causa una chiave duplicata: due righe per la stessa entitร /attributo/anno con date valid_from diverse. Usare una data fissa (es. f"{anno}-01-01") invece di CURRENT_DATE per garantire l'idempotenza.
Il Flow 2 legge un file Bronze obsoleto: la cache dei task Prefect potrebbe restituire un risultato in cache. Disabilitare la cache per quel task in quella run, oppure eliminare la voce dalla UI di Prefect.
JWT OpenMetadata scaduto: ruotare il token dalla UI di OpenMetadata (Settings โ Bots) e aggiornare la variabile d'ambiente OPENMETADATA_JWT_TOKEN nel container prefect-worker.
La run del flow va in crash con FileNotFoundError: /flows/{ente}/{dataset}: il deployment รจ stato registrato dalla shell dell'host invece di dall'interno del container. Ri-registrare usando docker exec maps-prefect-worker bash -c "cd /flows/... && prefect deploy --all" (v. ยง8.2).
La run del flow va in crash con ModuleNotFoundError: No module named 'psycopg2': la sezione pull รจ assente o incompleta in prefect.yaml. Assicurarsi che entrambi i passi set_working_directory e pip_install_requirements siano presenti (v. ยง8.1), poi ri-registrare il deployment.
La run รจ pianificata ma non si avvia mai: un worker locale (prefect worker start) รจ in esecuzione insieme al worker Docker e si aggiudica la run, poi va in crash. Arrestare i processi worker locali e lasciare che maps-prefect-worker gestisca l'esecuzione.
get_all_records() solleva JSONDecodeError: il pfun restituisce tutti i record in un'unica risposta e non supporta la paginazione per offset tramite pdatada. Usare get_records() direttamente (v. ยง7 โ situas_client.py).