Skip to content

Appendice A2 โ€” Manuale operativo per lo sviluppo di pipeline ETL โ€‹

Deliverable D2.2 โ€” Appendice: Guida operativa per la costruzione e la manutenzione delle pipeline ETL nel Data Lake MAPS.

Questa guida รจ rivolta agli sviluppatori che entrano nel progetto e devono costruire una nuova pipeline dati o manutenere una esistente. Descrive il flusso di lavoro standard, le convenzioni e i pattern adottati in tutte le pipeline dell'infrastruttura MAPS. L'implementazione di riferimento รจ la pipeline Agenzia Dogane e Monopoli โ€” tabaccherie; il codice sorgente completo รจ disponibile in flows/agenzie-dogane/tabacchi/ nel repository.

1. Quickstart โ€” Ambiente di sviluppo โ€‹

Per avviare l'ambiente di sviluppo locale sono necessari Python 3.11, Docker e Docker Compose. I passi seguenti configurano il database, il server Prefect e l'interprete Python in circa dieci minuti.

1.1 Prerequisiti โ€‹

  • Python 3.11
  • Docker 24+
  • Docker Compose v2

1.2 Clona il repository โ€‹

bash
git clone https://gitlab.openpolis.io/openpolis/gst/gst-maps-pipelines.git
cd gst-maps-pipelines

1.3 Variabili d'ambiente โ€‹

bash
cp .env.example .env

I valori predefiniti in .env.example funzionano senza modifiche per lo sviluppo locale. L'unica variabile che vale la pena verificare รจ BRONZE_BASE_PATH:

# .env.example โ€” commentata, non necessaria in sviluppo locale
# BRONZE_BASE_PATH=/data/bronze

In sviluppo locale i flow calcolano automaticamente il path Bronze come {repo_root}/data/bronze (derivato dalla posizione del file). In produzione, dentro il container prefect-worker, impostare BRONZE_BASE_PATH=/data/bronze โ€” il volume Docker รจ giร  montato in quella posizione.

I flow caricano .env automaticamente all'avvio tramite python-dotenv (incluso in requirements-dev.txt): non รจ necessario esportare le variabili manualmente nella shell prima di eseguire uno script.

1.4 Avvio dei servizi โ€‹

bash
docker compose -f docker-compose.local.yml up -d

Avvia PostgreSQL 17 + PostGIS 3.5, il server Prefect e OpenMetadata (con il suo database dedicato ed Elasticsearch). Lo schema del database MAPS (schemi bronze, silver, gold, tabelle e indici) viene creato automaticamente al primo avvio tramite postgres/init-scripts/01-init-schemas.sql. Verificare che i servizi siano attivi:

bash
docker compose -f docker-compose.local.yml ps

Se si modifica docker-compose.local.yml o .env dopo che i container sono giร  in esecuzione, docker compose restart non rilegge le variabili d'ambiente. Usare invece:

bash
docker compose -f docker-compose.local.yml up -d --force-recreate

1.5 Installazione dipendenze Python โ€‹

bash
python3.11 -m venv .venv
source .venv/bin/activate
pip install -r requirements-dev.txt

Questo installa Prefect, psycopg2, requests, BeautifulSoup e python-dotenv. Il virtualenv va attivato ogni volta che si apre una nuova sessione di terminale (source .venv/bin/activate).

1.6 Configurazione Prefect โ€‹

bash
export PREFECT_API_URL=http://localhost:4200/api
prefect work-pool create default-pool --type process

Il work pool default-pool รจ il pool di esecuzione referenziato nel prefect.yaml di ogni pipeline.

1.7 Verifica โ€‹

Verificare che gli schemi e le tabelle siano stati creati correttamente:

sql
-- schemi attesi: bronze, silver, gold
\dn

-- tabelle attese: bronze.ingestion_log, silver.territory_types,
--   silver.territories, silver.territory_containments,
--   silver.territory_identifiers, silver.territory_names,
--   silver.territory_attributes,
--   silver.service_types, silver.services, silver.service_identifiers,
--   silver.service_attributes, silver.service_categories
\dt bronze.*
\dt silver.*

L'ambiente รจ pronto. Il layer Bronze รจ un volume Docker (bronze-data) montato in /data all'interno dei container; i file grezzi scaricati dai flow saranno visibili in quel volume.

OpenMetadata impiega alcuni minuti per completare la migrazione del database al primo avvio; attendere che il container maps-openmetadata riporti lo stato healthy prima di aprire l'interfaccia.

2. Architettura delle pipeline โ€‹

2.1 Prefect: architettura client/server โ€‹

Prefect segue un'architettura client/server. Il server (container maps-prefect-server) tiene traccia delle esecuzioni, conserva i log e serve la UI su http://localhost:4200. Il worker (container maps-prefect-worker) รจ il processo che esegue materialmente i flow: interroga il server in cerca di lavoro assegnato al suo pool (default-pool) e avvia ogni flow in un subprocess.

Lo sviluppatore interagisce con il server tramite il client Prefect โ€” la CLI e l'SDK Python โ€” la cui destinazione รจ configurata dalla variabile d'ambiente PREFECT_API_URL. Durante lo sviluppo locale il client punta al server nel container; in produzione punterร  al server remoto, senza che il codice dei flow cambi.

Un deployment associa un flow a un pool di worker e ne fissa i parametri di esecuzione (schedule, parametri di default). Una volta registrato, il flow puรฒ essere avviato dalla UI o da CLI senza che lo sviluppatore debba avere il processo attivo localmente.

sequenceDiagram
    participant Dev as Sviluppatore
    participant Server as Prefect Server
    participant Worker as Worker (default-pool)

    Dev->>Server: registra deployment
    Note over Server: salva configurazione flow,
schedule, parametri Dev->>Server: avvia esecuzione (UI o CLI) Server->>Worker: invia work item Worker->>Worker: esegue flow in subprocess Worker-->>Server: aggiorna stato + log

2.2 Pattern Medallion โ€‹

Ogni pipeline MAPS segue il pattern Medallion: i file grezzi vengono scaricati nel Bronze layer, trasformati e validati nel Silver layer, infine aggregati nel Gold layer. Le pipeline sono orchestrate da Prefect e seguono una struttura standard a quattro flow.

Fonte esterna โ†’ Flow 1: Ingestion โ†’ Bronze (filesystem)
             โ†’ Flow 2: Transform  โ†’ Silver (PostgreSQL)
             โ†’ Flow 3: Quality    โ†’ Report di validazione
             โ†’ Flow 4: Metadata   โ†’ Catalogo OpenMetadata

Ogni flow รจ indipendente e puรฒ essere rieseguito senza dover rieseguire i flow precedenti. Questo รจ intenzionale: se cambia la logica di trasformazione, รจ sufficiente rieseguire il Flow 2 senza dover ri-scaricare i dati dalla fonte.

3. Struttura delle directory โ€‹

Ogni pipeline risiede in una sottodirectory dedicata sotto flows/:

flows/
โ”œโ”€โ”€ utils/                       # Moduli condivisi tra tutte le pipeline
โ”‚   โ”œโ”€โ”€ bronze_writer.py         # save_to_bronze(), log_ingestion()
โ”‚   โ”œโ”€โ”€ rate_limiter.py          # rate_limited_request()
โ”‚   โ”œโ”€โ”€ html_parser.py           # parse_table(), extract_form_data()
โ”‚   โ”œโ”€โ”€ situas_client.py         # SituasClient โ€” wrapper API ISTAT SITUAS
โ”‚   โ”œโ”€โ”€ territory_resolver.py    # build_comune_lookup_from_conn(), resolve_comune()
โ”‚   โ””โ”€โ”€ service_writer.py        # upsert_service(), upsert_service_identifier(), ...
โ””โ”€โ”€ {ente}/{dataset}/
    โ”œโ”€โ”€ 01_ingestion_flow.py     # Flow 1: download โ†’ Bronze
    โ”œโ”€โ”€ 02_transform_flow.py     # Flow 2: Bronze โ†’ Silver
    โ”œโ”€โ”€ 03_data_quality_flow.py  # Flow 3: validazione
    โ”œโ”€โ”€ 04_metadata_flow.py      # Flow 4: catalogo OpenMetadata
    โ”œโ”€โ”€ prefect.yaml             # Definizioni dei deployment (tutti e 4 i flow)
    โ”œโ”€โ”€ requirements.txt         # Dipendenze Python
    โ”œโ”€โ”€ README.md                # Documentazione della pipeline
    โ”œโ”€โ”€ docs/
    โ”‚   โ”œโ”€โ”€ DEPLOY_GUIDE.md      # Istruzioni di deployment passo per passo
    โ”‚   โ””โ”€โ”€ KNOWN_ISSUES.md      # Storico dei problemi e relative soluzioni
    โ””โ”€โ”€ tests/
        โ””โ”€โ”€ test_flow.py         # Test unitari

Convenzioni di denominazione:

  • Directory: minuscolo con trattini (agenzie-dogane/tabacchi/)
  • File dei flow: sempre numerati 01_, 02_, 03_, 04_
  • README: obbligatorio, descrive scopo, URL della fonte, formato e frequenza di aggiornamento

4. Pipeline territoriali ISTAT โ€‹

La directory flows/istat/ contiene le pipeline fondazionali che popolano il modello territoriale Silver. Tutte le altre pipeline dati dipendono dalla presenza di questi dati.

PipelineDirectoryFonteCopertura
Province, regioni, ripartizioniflows/istat/province-regioni/SITUAS pfun=64/68/71 + variazioni pfun=106โ€“108, 112โ€“114~107 province, 20 regioni, 5 ripartizioni per anno (2000โ€“oggi)
Comuniflows/istat/comuni/SITUAS pfun=61 + variazioni pfun=129~7.900 comuni per anno (2000โ€“oggi)
Variazioni amministrativeflows/istat/variazioni-amministrative/SITUAS pfun=106โ€“108, 112โ€“114, 98, 104, 105, 129Solo ingestione โ€” scarica la storia completa dal 1861 nel Bronze
Territory correctionsflows/istat/territory-corrections/Dati statici โ€” nomi alias ADM e correzioni di qualitร Flow idempotente; va eseguito dopo che i comuni sono popolati

L'ordine di esecuzione รจ importante. Il processamento delle variazioni รจ integrato nei flow di trasformazione di province-regioni e comuni. I dati Bronze delle variazioni devono essere ingeriti prima. Il flow territory-corrections va eseguito per ultimo, dopo che i comuni sono popolati.

Passo 1 โ€” Ingestione (qualsiasi ordine, eseguibili in parallelo):
  istat-province-regioni โ†’ Flow 1 (ingestion)
  istat-comuni           โ†’ Flow 1 (ingestion)
  istat-variazioni       โ†’ Flow 1 (ingestion)   โ† deve completarsi prima del Passo 2

Passo 2 โ€” Trasformazione (ordine vincolato):
  istat-province-regioni โ†’ Flow 2 (transform)   โ† legge bronze province + bronze variazioni
  istat-comuni           โ†’ Flow 2 (transform)   โ† legge bronze comuni + bronze variazioni + silver province

Passo 3 โ€” Correzioni (dopo che i comuni sono popolati):
  istat-territory-corrections โ†’ Flow 1 (corrections)

4.1 Esecuzione durante lo sviluppo โ€‹

Ogni file di flow accetta --anni come argomento da riga di comando. I flow possono essere eseguiti direttamente via Python (con il virtualenv attivo) o via Docker (raccomandato):

bash
# Opzione A: Python direttamente (virtualenv attivo)

# Passo 1: Ingestione (qualsiasi ordine)
python flows/istat/variazioni-amministrative/01_ingestion_flow.py
python flows/istat/province-regioni/01_ingestion_flow.py
python flows/istat/comuni/01_ingestion_flow.py

# Passo 2: Trasformazione (ordine vincolato โ€” bronze variazioni deve esistere prima)
python flows/istat/province-regioni/02_transform_flow.py
python flows/istat/comuni/02_transform_flow.py

# Passo 3: Correzioni (dopo che i comuni sono popolati)
python flows/istat/territory-corrections/01_corrections_flow.py

# Solo anni specifici
python flows/istat/province-regioni/01_ingestion_flow.py --anni 2024 2025 2026

# Opzione B: Docker (raccomandato)

# Passo 1: Ingestione
docker exec maps-prefect-worker bash -c \
  "cd /flows/istat/variazioni-amministrative && python 01_ingestion_flow.py"
docker exec maps-prefect-worker bash -c \
  "cd /flows/istat/province-regioni && python 01_ingestion_flow.py"
docker exec maps-prefect-worker bash -c \
  "cd /flows/istat/comuni && python 01_ingestion_flow.py"

# Passo 2: Trasformazione (province-regioni prima, poi comuni)
docker exec maps-prefect-worker bash -c \
  "cd /flows/istat/province-regioni && python 02_transform_flow.py"
docker exec maps-prefect-worker bash -c \
  "cd /flows/istat/comuni && python 02_transform_flow.py"

# Passo 3: Correzioni
docker exec maps-prefect-worker bash -c \
  "cd /flows/istat/territory-corrections && python 01_corrections_flow.py"

4.2 Esecuzione via deployment Prefect โ€‹

Tutte le pipeline ISTAT sono registrate sul server Prefect senza schedule automatico; le esecuzioni vengono avviate manualmente. Per registrare o ri-registrare dopo una modifica al prefect.yaml:

bash
docker exec maps-prefect-worker bash -c "cd /flows/istat/province-regioni && prefect deploy --all"
docker exec maps-prefect-worker bash -c "cd /flows/istat/comuni && prefect deploy --all"
docker exec maps-prefect-worker bash -c "cd /flows/istat/variazioni-amministrative && prefect deploy --all"
docker exec maps-prefect-worker bash -c "cd /flows/istat/territory-corrections && prefect deploy --all"

Per avviare una run via CLI (rispettare l'ordine di esecuzione):

bash
export PREFECT_API_URL=http://localhost:4200/api

# Passo 1: Ingestione (qualsiasi ordine)
prefect deployment run 'istat-variazioni-ingestion/istat-variazioni-01-ingestion'
prefect deployment run 'istat-province-regioni-ingestion/istat-pr-01-ingestion' \
  --param anni='[2024,2025,2026]'
prefect deployment run 'istat-comuni-ingestion/istat-comuni-01-ingestion' \
  --param anni='[2024,2025,2026]'

# Passo 2: Trasformazione (attendere il completamento dell'ingestione; province-regioni prima dei comuni)
prefect deployment run 'istat-province-regioni-transform/istat-pr-02-transform' \
  --param anni='[2024,2025,2026]'
prefect deployment run 'istat-comuni-transform/istat-comuni-02-transform' \
  --param anni='[2024,2025,2026]'

# Passo 3: Correzioni (dopo il completamento della trasformazione dei comuni)
prefect deployment run 'istat-territory-corrections/istat-territory-corrections-01'

Tutti i flow ISTAT sono idempotenti: rieseguire una trasformazione su un Bronze invariato produce un output Silver identico.

5. Silver layer โ€‹

5.1 Modello delle entitร  territoriali โ€‹

Il Silver layer adotta un modello a identitร  surrogata stabile per i territori. Ogni territorio ha un id interno che non cambia mai; i codici ISTAT sono memorizzati come identificatori in silver.territory_identifiers. Questo elimina la duplicazione dei record quando i comuni cambiano provincia โ€” il comune rimane una sola riga, con piรน periodi di codice ISTAT.

TabellaScopo
silver.territory_typesRegistro dei tipi di entitร  โ€” pre-popolato con 9 tipi
silver.territoriesRegistro principale: id surrogato stabile, type_code, label, date di ciclo di vita (valid_from/valid_to)
silver.territory_identifiersIdentificatori temporali: codici ISTAT (scheme='istat'), codici catastali, codici fiscali, codici UTS
silver.territory_namesNomi temporali con supporto multilingue (italiano e bilinguismi/locali)
silver.territory_containmentsContenimento gerarchico temporale (comune โ†’ provincia, con date precise per i cambi di provincia)
silver.territory_relationshipsLegami predecessore/successore da eventi variazioni (ES, CS, RN, AQES, CECS)
              โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
              โ”‚        territory_types       โ”‚
              โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
              โ”‚ PK code VARCHAR(30)          โ”‚
              โ”‚    label                     โ”‚
              โ”‚    hierarchy_level           โ”‚
              โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
                             โ”‚ 1
                             โ”‚ (type_code)
                             โ”‚ N
   โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
   โ”‚                      territories                        โ”‚
   โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
   โ”‚ PK id SERIAL                                            โ”‚
   โ”‚ FK type_code        label           subtype             โ”‚
   โ”‚    valid_from DATE  valid_to DATE   end_reason TEXT     โ”‚
   โ””โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
      โ”‚ 1:N      โ”‚ 1:N                          โ”‚ 1:N (member + container)
      โ”‚          โ”‚                              โ”‚
      โ–ผ          โ–ผ                              โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ territory_    โ”‚ โ”‚ territory_    โ”‚  โ”‚    territory_containments       โ”‚
โ”‚ identifiers   โ”‚ โ”‚ names         โ”‚  โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค  โ”‚ FK member_id โ†’ territories      โ”‚
โ”‚FK territory_  โ”‚ โ”‚FK territory_  โ”‚  โ”‚ FK container_id โ†’ territories   โ”‚
โ”‚   id          โ”‚ โ”‚   id          โ”‚  โ”‚    valid_from DATE              โ”‚
โ”‚   scheme      โ”‚ โ”‚   name        โ”‚  โ”‚    valid_to DATE                โ”‚
โ”‚   identifier  โ”‚ โ”‚   language    โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
โ”‚   valid_from  โ”‚ โ”‚   name_type   โ”‚
โ”‚   valid_to    โ”‚ โ”‚   valid_from  โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚   fonte       โ”‚ โ”‚   valid_to    โ”‚  โ”‚    territory_relationships      โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
                                     โ”‚ FK source_id โ†’ territories      โ”‚
                                     โ”‚ FK dest_id โ†’ territories        โ”‚
                                     โ”‚    classification TEXT          โ”‚
                                     โ”‚    valid_from DATE              โ”‚
                                     โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

I territori vengono risolti per codice ISTAT tramite un join su territory_identifiers:

sql
-- Trovare il territorio per il codice ISTAT '092001'
SELECT t.id, t.label FROM silver.territories t
JOIN silver.territory_identifiers ti ON ti.territory_id = t.id
WHERE t.type_code = 'comune' AND ti.scheme = 'istat' AND ti.identifier = '092001';

-- In quale provincia si trovava Arbus il 1ยฐ gennaio 2015?
SELECT p.label FROM silver.territory_containments tc
JOIN silver.territories p ON p.id = tc.container_id AND p.type_code = 'provincia'
JOIN silver.territories t ON t.id = tc.member_id
WHERE t.type_code = 'comune' AND t.label = 'Arbus'
  AND (tc.valid_from IS NULL OR tc.valid_from <= '2015-01-01')
  AND (tc.valid_to IS NULL OR tc.valid_to > '2015-01-01');

valid_from = NULL indica un inizio di validitร  sconosciuto (precedente ai dati disponibili). valid_to = NULL indica che il territorio รจ ancora attivo.

5.2 Tabella EAV โ€‹

Tutte le pipeline scrivono in silver.territory_attributes, la tabella EAV generalizzata che supporta qualsiasi tipo di entitร  territoriale.

sql
INSERT INTO silver.territory_attributes
    (territory_id, type_code, attribute, value, data_type, source, valid_from, valid_to)
VALUES
    (42,  'comune',    'popolazione',   '2635',  'integer', 'ISTAT', '2024-01-01', NULL),
    (107, 'provincia', 'n_tabaccherie', '245',   'integer', 'ADM',   '2024-01-01', '2025-12-31'),
    (8,   'sll',       'addetti',       '15200', 'integer', 'ISTAT', '2021-01-01', NULL);
CampoDescrizioneEsempi
territory_idFK verso silver.territories(id) โ€” surrogato stabile, sopravvive ai cambi di codice ISTATrisolto tramite territory_resolver
type_codeTipo di entitร  (denormalizzato da territories)'comune', 'provincia', 'regione', 'sll', 'slo'
attributeNome dell'attributo'popolazione', 'n_tabaccherie_adm'
valueValore memorizzato come testo'2635'
data_typeSuggerimento per il cast del tipo'integer', 'float', 'string', 'boolean'
sourceOrigine del dato'ISTAT', 'ADM', 'MinSalute'
valid_fromInizio del periodo di validitร '2024-01-01'
valid_toFine del periodo di validitร  โ€” NULL indica il valore correnteNULL

Il pattern ON CONFLICT ... DO UPDATE garantisce la riesecuzione idempotente:

python
execute_values(
    cur,
    """
    INSERT INTO silver.territory_attributes
        (territory_id, type_code, attribute, value,
         data_type, source, valid_from, valid_to)
    VALUES %s
    ON CONFLICT (territory_id, attribute, valid_from)
    DO UPDATE SET value = EXCLUDED.value, updated_at = NOW()
    """,
    eav_records
)

5.3 Modello dei servizi locali โ€‹

silver.territory_attributes memorizza una riga per ogni (territorio, attributo, periodo): รจ ottimizzata per statistiche territoriali aggregate come conteggi di popolazione o numero di farmacie per comune. Non รจ progettata per memorizzare record individuali di punti di interesse.

Diverse fonti pubblicano elenchi di servizi locali individuali (farmacie, ospedali, scuole, tabaccherie e altri). Conservare i record individuali nel Silver offre vantaggi concreti rispetto alla sola scrittura di conteggi aggregati: gli stessi dati grezzi possono essere aggregati per comune, provincia, ASL o SLL senza dover re-ingerire dalla fonte; i record provenienti da piรน dataset possono essere deduplicati tramite service_identifiers usando codici esterni come codice fiscale o codice HSP; attributi specifici del servizio possono essere memorizzati senza modifiche allo schema.

TabellaScopo
silver.service_typesRegistro dei tipi di servizio โ€” pre-popolato con 5 tipi (farmacia, ospedale, scuola, biblioteca, tabaccheria)
silver.servicesUna riga per ogni servizio individuale, con territory_id risolto, date di ciclo di vita e un vincolo di unicitร  per upsert idempotenti
silver.service_identifiersCodici esterni per servizio (codice_fiscale, codice_ministeriale, codice_hsp, ...) โ€” stesso identificatore significa stesso servizio logico tra dataset diversi
silver.service_attributesEAV per metriche specifiche del servizio (posti_letto, num_studenti, ...) โ€” rispecchia territory_attributes ma legato a una riga di servizio
silver.service_categoriesTag tipizzati multivalore (tipo_ospedale: DEA II livello, livello_scuola: primaria, ...)

La scrittura dei servizi da una pipeline avviene tramite gli helper in utils/service_writer.py:

python
from utils.territory_resolver import build_comune_lookup_from_conn, resolve_comune
from utils.service_writer import (
    upsert_service, upsert_service_identifier,
    upsert_service_attribute, upsert_service_category,
)

conn = _get_db_conn()
lookup = build_comune_lookup_from_conn(conn)

with conn.cursor() as cur:
    for record in source_records:
        # 1. Risolvere il territorio
        match = resolve_comune(lookup, sigla=record["provincia"], nome=record["comune"])
        territory_id = match[0] if match else None

        # 2. Upsert della riga servizio
        service_id = upsert_service(
            cur,
            type_code="farmacia",
            name=record["denominazione"],
            address=record["indirizzo"],
            territory_id=territory_id,
            source="AIFA",
            valid_from="2024-01-01",
        )

        # 3. Allegare identificatore esterno (per deduplicazione cross-source)
        upsert_service_identifier(cur, service_id=service_id,
                                  scheme="codice_fiscale", identifier=record["cf"])

        # 4. Memorizzare metrica specifica del servizio
        upsert_service_attribute(cur, service_id=service_id,
                                 attribute="tipo_farmacia", value=record["tipo"],
                                 data_type="text", source="AIFA", valid_from="2024-01-01")

        # 5. Aggiungere categoria
        upsert_service_category(cur, service_id=service_id,
                                scheme="tipo_farmacia", value=record["tipo"])

conn.commit()
conn.close()

Una volta che i record individuali sono nel Silver, i conteggi per territorio sono una semplice query GROUP BY che puรฒ alimentare silver.territory_attributes come statistiche aggregate.

6. La pipeline di riferimento: tabaccherie ADM โ€‹

La pipeline Agenzia Dogane e Monopoli โ€” tabaccherie รจ la pipeline di riferimento perchรฉ รจ la piรน complessa tra quelle sviluppate: richiede scraping HTML di un portale JSF con paginazione Ajax, gestione della sessione, rate limiting e normalizzazione dei codici territoriali. Una pipeline che scarica direttamente un file CSV o XLSX segue lo stesso schema ma con un Flow 1 molto piรน semplice.

6.0 Sviluppo locale โ€‹

Prima di registrare un deployment su Prefect, ogni flow puรฒ essere eseguito direttamente come script Python. Questo รจ il modo normale di lavorare: scrivere il flow, eseguirlo, correggere gli errori, rieseguirlo โ€” senza toccare il server.

Ogni file di flow deve contenere un blocco if __name__ == "__main__" con argparse in modo che i parametri possano essere passati da riga di comando senza modificare il sorgente:

python
# 01_ingestion_flow.py
from prefect import flow

@flow
def ingestion_flow(anno: int = 2025):
    ...

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--anno", type=int, default=2025)
    args = parser.parse_args()
    ingestion_flow(anno=args.anno)

Usare sempre lo stesso metodo di esecuzione all'interno di una run di pipeline. Python scrive i file Bronze in {repo_root}/data/bronze/ sull'host; Docker scrive in /data/bronze/ dentro il container. Se si ingerisce con Python e si trasforma con Docker (o viceversa), il flow di trasformazione non troverร  i file Bronze. In docker-compose.local.yml la directory Bronze รจ montata in bind da ./data, quindi entrambi i metodi condividono lo stesso path โ€” ma solo se BRONZE_BASE_PATH รจ impostato in modo coerente (v. ยง1.3).

Opzione A โ€” Python direttamente (il virtualenv deve essere attivo):

bash
source .venv/bin/activate
cd flows/agenzie-dogane/tabacchi/

python 01_ingestion_flow.py              # usa i parametri predefiniti
python 01_ingestion_flow.py --anno 2024  # override a runtime

Opzione B โ€” Docker (raccomandato โ€” replica esattamente l'ambiente di produzione):

bash
docker exec maps-prefect-worker bash -c \
  "cd /flows/agenzie-dogane/tabacchi && python 01_ingestion_flow.py"

docker exec maps-prefect-worker bash -c \
  "cd /flows/agenzie-dogane/tabacchi && python 01_ingestion_flow.py --anno 2024"

Se PREFECT_API_URL รจ impostata e il server รจ in esecuzione, la run appare nella UI con log e stato. Se il server non รจ disponibile, il flow gira comunque come script Python normale.

L'iterazione tipica รจ:

  1. Scrivere o modificare il flow
  2. Eseguire via Docker (o Python direttamente per modifiche rapide)
  3. Correggere errori, ripetere
  4. Quando funziona, procedere al deployment (ยง8)

Ogni flow puรฒ essere sviluppato e testato indipendentemente dagli altri. Il Flow 2 puรฒ essere testato leggendo un file Bronze giร  presente su disco, senza che il Flow 1 sia terminato.

6.0.1 Boilerplate standard di un flow โ€‹

Ogni file di flow inizia con la stessa intestazione in quattro parti. Copiarla verbatim e adattare l'indice parents[N] e la costante FONTE.

python
import os
import sys
from pathlib import Path

try:
    from dotenv import load_dotenv
    load_dotenv(Path(__file__).resolve().parents[3] / ".env")   # repo root
except ImportError:
    pass

import psycopg2
from psycopg2.extras import execute_values
from prefect import flow, task, get_run_logger

sys.path.insert(0, str(Path(__file__).resolve().parents[2]))    # aggiunge flows/ al path
from utils.bronze_writer import save_to_bronze, log_ingestion   # e altri utils se necessario

FONTE = "nome-ente"                                             # corrisponde alla directory Bronze
_REPO_ROOT = Path(__file__).resolve().parents[3]
BRONZE_BASE = os.getenv("BRONZE_BASE_PATH", str(_REPO_ROOT / "data" / "bronze"))


def _get_db_conn():
    return psycopg2.connect(
        host=os.getenv("POSTGRES_HOST", "localhost"),
        port=int(os.getenv("POSTGRES_PORT", 5432)),
        database=os.getenv("POSTGRES_DB", "maps_db"),
        user=os.getenv("POSTGRES_USER", "maps"),
        password=os.getenv("POSTGRES_PASSWORD", "maps_dev"),
    )

L'indice parents[N] dipende dalla profonditร  della directory: parents[2] da flows/{ente}/{dataset}/ raggiunge flows/ (dove si trova utils/); parents[3] raggiunge la root del repository (dove si trova .env). _get_db_conn() รจ definita localmente in ogni flow, non importata da utils, in modo che ogni script sia autocontenuto.

6.1 Flow 1 โ€” Ingestion (Bronze layer) โ€‹

Il Flow 1 scarica i dati grezzi dalla fonte esterna e li salva nel filesystem Bronze tramite l'utility save_to_bronze. Registra l'operazione in bronze.ingestion_log.

python
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import httpx
from utils.bronze_writer import save_to_bronze, log_ingestion

@task(
    retries=3,
    retry_delay_seconds=[60, 300, 900],
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=24)
)
def download_fonte(anno: int) -> dict:
    url = "https://example.gov.it/data/dataset.xlsx"
    response = httpx.get(url, follow_redirects=True, timeout=300)
    response.raise_for_status()

    file_info = save_to_bronze(
        data=response.content,
        fonte="nome-ente",
        dataset="nome_dataset",
        anno=anno,
        ext="xlsx"
    )
    log_ingestion(file_info, n_record=0, url_source=url)
    return file_info

save_to_bronze crea la directory, scrive i byte grezzi, calcola il checksum SHA256 e restituisce un dizionario con file_path, size_bytes, checksum, fonte, dataset e anno. log_ingestion inserisce la riga corrispondente in bronze.ingestion_log.

Il Bronze layer contiene sempre la fonte grezza esattamente come ricevuta dall'origine: byte HTTP per file CSV, XLSX e PDF; HTML grezzo per le pipeline di scraping. La trasformazione in strutture dati avviene nel Flow 2.

Convenzione del path Bronze: {repo_root}/data/bronze/{ente}/{anno}/{dataset}.{ext} Esempi: /data/bronze/istat/2024/popolazione_residente.xlsx, /data/bronze/minsalute/2024/asl_boundaries.xlsx

Portali JSF/PrimeFaces. Alcuni portali pubblici italiani usano JavaServer Faces con paginazione Ajax. Ogni risposta contiene un token javax.faces.ViewState che deve essere ritrasmesso nella richiesta successiva. Usare una requests.Session per mantenere i cookie, estrarre il ViewState con BeautifulSoup e usare rate_limited_request() da utils/rate_limiter.py:

python
from utils.rate_limiter import rate_limited_request

def _get_viewstate(html: str) -> str:
    from bs4 import BeautifulSoup
    vs = BeautifulSoup(html, "html.parser").find("input", {"name": "javax.faces.ViewState"})
    return vs.get("value", "") if vs else ""

session = requests.Session()
response = rate_limited_request(
    url, method="POST",
    data={
        "javax.faces.ViewState": viewstate,
        "javax.faces.partial.ajax": "true",
        "javax.faces.partial.execute": "formBusca:regione",
        "javax.faces.partial.render": "formBusca:provincia",
        "formBusca:regione": regione_code,
    },
    session=session,
    delay=2,
)

Per un esempio completo vedere flows/agenzie-dogane/tabacchi/01_ingestion_flow.py.

6.2 Flow 2 โ€” Transform (Silver layer) โ€‹

Il Flow 2 legge dal Bronze, normalizza i dati e scrive in silver.territory_attributes. รˆ il flow piรน specifico per ogni dataset.

Scheletro standard:

python
@task
def trasforma_e_carica(file_info: dict, entity_type: str) -> int:
    # 1. Leggere il file sorgente
    if file_info['file_path'].endswith('.xlsx'):
        df = pd.read_excel(file_info['file_path'])
    elif file_info['file_path'].endswith('.csv'):
        df = pd.read_csv(file_info['file_path'], encoding='utf-8-sig', sep=';')

    # 2. Risolvere territory_id e costruire i record EAV
    conn = _get_db_conn()
    lookup = _build_territory_lookup(conn)   # (sigla, label_upper) โ†’ (territory_id, type_code)
    eav_records = []
    for _, row in df.iterrows():
        match = lookup.get((row['SIGLA'].upper(), row['COMUNE'].upper()))
        if not match:
            continue
        territory_id, type_code = match
        eav_records.extend([
            (territory_id, type_code, 'attributo_1', str(row['COL_A']),
             'integer', fonte, f'{anno}-01-01', None),
            (territory_id, type_code, 'attributo_2', str(row['COL_B']),
             'string',  fonte, f'{anno}-01-01', None),
        ])

    # 3. UPSERT nel Silver EAV
    try:
        with conn.cursor() as cur:
            execute_values(cur, """
                INSERT INTO silver.territory_attributes
                    (territory_id, type_code, attribute, value,
                     data_type, source, valid_from, valid_to)
                VALUES %s
                ON CONFLICT (territory_id, attribute, valid_from)
                DO UPDATE SET value = EXCLUDED.value, updated_at = NOW()
            """, eav_records)
        conn.commit()
    finally:
        conn.close()

    return len(eav_records)

Risoluzione del territorio: le pipeline devono risolvere territory_id da silver.territories prima di scrivere in silver.territory_attributes. Usare _build_territory_lookup() (v. ยง6.0.1) o il modulo condiviso territory_resolver.build_comune_lookup_from_conn() per costruire un dizionario (sigla_automobilistica, label_upper) โ†’ (territory_id, type_code). I record il cui comune non puรฒ essere risolto vengono registrati come warning e saltati โ€” comportamento atteso quando le pipeline fondazionali ISTAT non sono ancora state eseguite.

6.3 Flow 3 โ€” Data quality โ€‹

Il Flow 3 valida sia i file Bronze che i record Silver. Usa Great Expectations per la validazione strutturata e produce un report.

Validazione Bronze (verificare il file grezzo prima di fidarsi della trasformazione):

python
import great_expectations as gx

suite = gx.core.ExpectationSuite(name=f"{fonte}_{dataset}")

suite.add_expectation(gx.core.ExpectationConfiguration(
    expectation_type="expect_column_to_exist",
    kwargs={"column": "colonna_codice_entita"}
))
suite.add_expectation(gx.core.ExpectationConfiguration(
    expectation_type="expect_column_values_to_not_be_null",
    kwargs={"column": "colonna_codice_entita", "mostly": 0.99}
))

Validazione Silver (verificare la correttezza dei record EAV):

python
suite.add_expectation(gx.core.ExpectationConfiguration(
    expectation_type="expect_column_values_to_not_be_null",
    kwargs={"column": "territory_id"}
))
suite.add_expectation(gx.core.ExpectationConfiguration(
    expectation_type="expect_column_values_to_be_between",
    kwargs={"column": "valore_cast", "min_value": 0, "max_value": 10_000_000}
))

Strategia di errore: i fallimenti non critici vengono registrati come warning; i fallimenti critici nella validazione Silver bloccano il Flow 4 (catalogo).

6.4 Flow 4 โ€” Catalogo metadata โ€‹

Il Flow 4 registra i file Bronze e la tabella Silver in OpenMetadata, creando una voce di lineage dalla fonte all'EAV.

python
@task
def catalog_bronze_files(bronze_path: str, fonte: str, dataset: str):
    """Scansiona la directory Bronze e registra i file in OpenMetadata."""
    from metadata.ingestion.ometa.ometa_api import OpenMetadata
    from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
        OpenMetadataConnection
    )
    # ... configurazione ingestion OpenMetadata e creazione lineage

Prerequisiti: pip install 'openmetadata-ingestion[datalake]' e variabile d'ambiente OPENMETADATA_JWT_TOKEN impostata (generare dalla UI OpenMetadata โ†’ Settings โ†’ Bots โ†’ ingestion-bot).

7. Utility condivise โ€‹

Sei moduli in flows/utils/ sono disponibili per tutte le pipeline.

utils/bronze_writer.py โ€” scrittura Bronze standardizzata con checksum e log:

python
from utils.bronze_writer import save_to_bronze, log_ingestion

file_info = save_to_bronze(data=response.content, fonte="agenzie-dogane", dataset="tabacchi", anno=2025, ext="html")
log_ingestion(file_info, n_record=0, url_source=url)

utils/rate_limiter.py โ€” rate limiting globale thread-safe per il web scraping:

python
from utils.rate_limiter import rate_limited_request

response = rate_limited_request(url, method="POST", delay=2)

utils/html_parser.py โ€” parsing HTML condiviso:

python
from utils.html_parser import parse_table, extract_form_data

data = parse_table(soup, table_id="results")

utils/situas_client.py โ€” wrapper API ISTAT SITUAS:

python
from utils.situas_client import SituasClient

client = SituasClient()
catalogue = client.get_catalogue()              # lista di 74 dataset
records = client.get_all_records(pfun=64, pdata="01/01/2024")   # paginato
records = client.get_records(pfun=61, pdata="01/01/2024")       # singola pagina

get_all_records() gestisce la paginazione iterando pagine da 500 record. Alcuni pfun โ€” in particolare pfun=61 (comuni) โ€” restituiscono tutti i risultati in un'unica risposta e non supportano la paginazione per offset; usare get_records() direttamente per quelli.

utils/territory_resolver.py โ€” lookup condiviso (sigla, comune) โ†’ territory_id:

python
from utils.territory_resolver import build_comune_lookup_from_conn, resolve_comune

# Costruire il lookup una volta per run (una query DB)
lookup = build_comune_lookup_from_conn(conn)

# Risolvere ogni record โ€” lookup O(1) su dizionario
match = resolve_comune(lookup, sigla="MI", nome="Milano")
if match:
    territory_id, type_code = match

Il lookup cerca sia in silver.territories.label che in silver.territory_names, che include nomi alternativi, forme bilingui e alias ADM. Se resolve_comune restituisce None, il territorio non รจ stato trovato โ€” registrare un warning e continuare.

utils/service_writer.py โ€” helper upsert idempotenti per silver.services e le tabelle satellite:

python
from utils.service_writer import (
    upsert_service,             # โ†’ int (service id)
    upsert_service_identifier,  # allegare un codice esterno
    upsert_service_attribute,   # memorizzare una metrica tipizzata (EAV)
    upsert_service_category,    # allegare un tag categorico
)

Tutte le funzioni accettano un cursore psycopg2 aperto. Il chiamante gestisce la connessione e chiama conn.commit(). Per esempi d'uso vedere ยง5.3.

Usare queste utility invece di reimplementare pattern comuni in ogni pipeline.

8. Deployment con Prefect โ€‹

Un deployment trasforma un flow in una risorsa pianificata e monitorata sul server Prefect. Lo sviluppatore definisce prefect.yaml una volta; le esecuzioni successive avvengono tramite UI o CLI senza toccare il codice.

8.1 Struttura di prefect.yaml โ€‹

Ogni pipeline definisce quattro deployment in prefect.yaml. La sezione pull รจ obbligatoria in tutte le pipeline: imposta la directory di lavoro e installa le dipendenze del flow a runtime. Questo รจ necessario perchรฉ il container prefect-worker usa l'immagine base prefecthq/prefect:3-python3.11, che non pre-installa pacchetti specifici della pipeline come psycopg2-binary.

yaml
name: {ente}-{dataset}
prefect-version: "3.*"

work_pool:
  name: default-pool

pull:
  - prefect.deployments.steps.set_working_directory:
      directory: /flows/{ente}/{dataset}
  - prefect.deployments.steps.pip_install_requirements:
      directory: /flows/{ente}/{dataset}
      requirements_file: requirements.txt

deployments:
  - name: {ente}-{dataset}-01-ingestion      # Flow 1 โ€” download โ†’ Bronze
    entrypoint: 01_ingestion_flow.py:ingestion_flow
    schedules: []                            # nessuno schedule automatico โ€” avviare manualmente
    parameters:
      anno: 2025
    work_pool:
      name: default-pool

  - name: {ente}-{dataset}-02-transform      # Flow 2 โ€” Bronze โ†’ Silver
    entrypoint: 02_transform_flow.py:transform_flow
    schedules: []
    parameters:
      anno: 2025
    work_pool:
      name: default-pool

  - name: {ente}-{dataset}-03-quality        # Flow 3 โ€” monitoraggio qualitร 
    entrypoint: 03_data_quality_flow.py:quality_flow
    schedules: []
    parameters:
      anno: 2025
    work_pool:
      name: default-pool

  - name: {ente}-{dataset}-04-metadata       # Flow 4 โ€” aggiornamento catalogo
    entrypoint: 04_metadata_flow.py:metadata_flow
    schedules: []
    parameters:
      anno: 2025
    work_pool:
      name: default-pool

Il prefisso numerico (01, 02, ...) mantiene i deployment in ordine di esecuzione nella UI di Prefect. schedules: [] rimuove esplicitamente qualsiasi schedule โ€” omettere la chiave non elimina gli schedule precedentemente registrati.

8.2 Registrazione dei deployment โ€‹

I deployment devono essere registrati dall'interno del container prefect-worker, non dalla shell dell'host. Questo perchรฉ Prefect registra il path della directory di lavoro al momento della registrazione e lo usa letteralmente al momento dell'esecuzione. Il worker esegue i flow in /flows/{ente}/{dataset}/ (il mount del volume Docker); registrare dall'host registrerebbe il path dell'host che non esiste dentro il container.

bash
docker exec maps-prefect-worker bash -c \
  "cd /flows/{ente}/{dataset} && prefect deploy --all"

Dopo la registrazione, i deployment sono visibili su http://localhost:4200/deployments.

8.3 Quando re-fare il deploy โ€‹

Il worker Prefect legge il codice dal filesystem ad ogni esecuzione:

  • Modifiche al codice dei flow (logica, bug fix): non richiedono prefect deploy --all. Il worker eseguirร  il file aggiornato alla prossima run.
  • Modifiche a prefect.yaml (nuovi deployment, cambi di schedule, nuovi parametri, sezione pull): richiedono prefect deploy --all eseguito dall'interno del container.
  • Rimozione di uno schedule: impostare schedules: [] in prefect.yaml e ri-registrare. Omettere la chiave non rimuove uno schedule esistente.

8.4 Avviare un deployment โ€‹

Dalla UI โ€” andare su http://localhost:4200/deployments, cliccare un deployment, poi "Run". Il form dei parametri รจ pre-compilato con i valori predefiniti e puรฒ essere modificato prima di ogni run.

Dalla CLI:

bash
export PREFECT_API_URL=http://localhost:4200/api

# Eseguire con parametri predefiniti
prefect deployment run '{flow-name}/{ente}-{dataset}-01-ingestion'

# Override dei parametri a runtime
prefect deployment run '{flow-name}/{ente}-{dataset}-01-ingestion' \
  --param anno=2024

# Elencare tutti i deployment registrati
prefect deployment ls

Il nome CLI segue il pattern {flow-name}/{deployment-name}. Il nome del flow viene dal decoratore @flow(name=...); il nome del deployment da prefect.yaml. Verificare i nomi esatti con prefect deployment ls o dalla UI.

Non avviare un worker Prefect locale (prefect worker start) quando si usa lo stack Docker. Il container maps-prefect-worker fa giร  polling su default-pool. Un worker locale concorrente prenderebbe in carico le run ma fallirebbe perchรฉ i path /flows/ non esistono sull'host.

Monitoraggio: Prefect UI su http://localhost:4200.

9. Flusso di lavoro di squadra โ€‹

Ognuno dei quattro flow puรฒ essere sviluppato e testato indipendentemente. Una suddivisione naturale per un team di due sviluppatori che lavorano sulla stessa pipeline:

Sviluppatore ASviluppatore B
Flow 1: Ingestion (specifico per la fonte, spesso scraping o API)Flow 2: Transform (normalizzazione EAV, risoluzione territorio)
Flow 4: Catalogo metadataFlow 3: Data quality (aspettative GX, rilevamento anomalie)
README.md + prefect.yamltests/test_flow.py

Criterio di passaggio di consegne tra i flow: il Flow 1 รจ considerato completo quando almeno un file Bronze รจ scritto nel path corretto e il suo checksum รจ registrato in bronze.ingestion_log. Il Flow 2 puรฒ iniziare con quel file indipendentemente dal completamento del Flow 1.

Tracciamento su GitLab: aprire un issue per ogni flow. Collegarli all'issue principale della pipeline. Chiudere ogni sub-issue quando il flow raggiunge i criteri di accettazione Bronze/Silver (v. ยง10).

10. Checklist per fase โ€‹

Flow 1 (Ingestion) completo quando: โ€‹

  • File scaricato nel path Bronze corretto (/data/bronze/{ente}/{anno}/)
  • Checksum SHA256 registrato in bronze.ingestion_log
  • Logica di retry attiva (retries=3 con backoff)
  • README.md documenta URL della fonte, formato e frequenza di aggiornamento

Flow 2 (Transform) completo quando: โ€‹

  • Record inseriti in silver.territory_attributes con il type_code corretto
  • ON CONFLICT DO UPDATE attivo (riesecuzioni idempotenti)
  • territory_id risolto per tutti i record (comuni non risolti registrati e contati)
  • Riesecuzione su Bronze invariato produce output Silver identico

Flow 3 (Quality) completo quando: โ€‹

  • Validazione Bronze copre: schema, tasso di null, formati dei valori
  • Validazione Silver copre: presenza di territory_id, range dei valori, campi temporali
  • Report prodotto ad ogni run (pass o fail)
  • Soglia di fallimento definita e documentata

Flow 4 (Metadata) completo quando: โ€‹

  • File Bronze visibili in OpenMetadata
  • Lineage Bronze โ†’ Silver creato
  • Schema estratto e disponibile nell'interfaccia del catalogo

11. Errori frequenti โ€‹

ON CONFLICT fallisce con "no unique constraint": il vincolo UNIQUE su (territory_id, attribute, valid_from) deve essere presente. Verificare con \d silver.territory_attributes in psql.

territory_id รจ NULL / nessun record risolto: le pipeline fondazionali ISTAT (flows/istat/) devono essere eseguite prima di qualsiasi pipeline EAV. Se _build_territory_lookup() restituisce un dizionario vuoto, silver.territories non รจ ancora popolato.

valid_from causa una chiave duplicata: due righe per la stessa entitร /attributo/anno con date valid_from diverse. Usare una data fissa (es. f"{anno}-01-01") invece di CURRENT_DATE per garantire l'idempotenza.

Il Flow 2 legge un file Bronze obsoleto: la cache dei task Prefect potrebbe restituire un risultato in cache. Disabilitare la cache per quel task in quella run, oppure eliminare la voce dalla UI di Prefect.

JWT OpenMetadata scaduto: ruotare il token dalla UI di OpenMetadata (Settings โ†’ Bots) e aggiornare la variabile d'ambiente OPENMETADATA_JWT_TOKEN nel container prefect-worker.

La run del flow va in crash con FileNotFoundError: /flows/{ente}/{dataset}: il deployment รจ stato registrato dalla shell dell'host invece di dall'interno del container. Ri-registrare usando docker exec maps-prefect-worker bash -c "cd /flows/... && prefect deploy --all" (v. ยง8.2).

La run del flow va in crash con ModuleNotFoundError: No module named 'psycopg2': la sezione pull รจ assente o incompleta in prefect.yaml. Assicurarsi che entrambi i passi set_working_directory e pip_install_requirements siano presenti (v. ยง8.1), poi ri-registrare il deployment.

La run รจ pianificata ma non si avvia mai: un worker locale (prefect worker start) รจ in esecuzione insieme al worker Docker e si aggiudica la run, poi va in crash. Arrestare i processi worker locali e lasciare che maps-prefect-worker gestisca l'esecuzione.

get_all_records() solleva JSONDecodeError: il pfun restituisce tutti i record in un'unica risposta e non supporta la paginazione per offset tramite pdatada. Usare get_records() direttamente (v. ยง7 โ€” situas_client.py).