Skip to content

Pipeline ETL e Script Documentati ​

Deliverable D2.2: Script ETL (Python/SQL) documentati e testati

Il deliverable D2.2 è costituito dagli script ETL che acquisiscono, trasformano e validano i dati per il Data Lake MAPS. Gli script coprono circa 200 dataset provenienti da ISTAT e da istituzioni pubbliche, normalizzati a granularità comunale o sovracomunale e archiviati nel layer Silver EAV. Questo capitolo fornisce una panoramica dell'architettura delle pipeline e descrive le pipeline implementate. La guida completa per gli sviluppatori — convenzioni, pattern, procedure di deployment e un'implementazione di riferimento — è disponibile nell'Appendice A2 — Manuale operativo per lo sviluppo di pipeline ETL.

Processo di sviluppo delle pipeline ​

Ogni pipeline del Data Lake MAPS segue una struttura standardizzata a quattro flow che separa l'acquisizione dei dati dalla trasformazione, dalla validazione e dalla catalogazione. La separazione è deliberata: se la logica di trasformazione cambia, è sufficiente rieseguire solo il flow di trasformazione senza riscaricare i dati sorgente; se una regola di validazione viene aggiornata, viene avviato solo il flow di qualità. Questa indipendenza rende lo sviluppo iterativo significativamente più rapido e riduce il rischio di reintrodurre errori quando si corregge una singola fase.

Il processo di sviluppo di una nuova pipeline si articola in quattro fasi:

  1. Flow 1 — Ingestion: lo sviluppatore identifica la fonte dati, implementa la logica di download o di scraping e scrive i file grezzi nel Bronze layer. Il flow si considera completo quando almeno un file ben formato è disponibile al percorso Bronze atteso e registrato in bronze.ingestion_log.
  2. Flow 2 — Transform: lo sviluppatore legge i file Bronze, risolve gli identificatori territoriali tramite territory_resolver, mappa le colonne sorgente sugli attributi EAV e scrive in silver.territory_attributes. È il flow più specifico per ogni dataset.
  3. Flow 3 — Data quality: le suite Great Expectations validano sia i file Bronze che i record Silver rispetto ai criteri di accettazione definiti per completezza, accuratezza e tempestività.
  4. Flow 4 — Metadata: la pipeline registra i file Bronze e le tabelle Silver in OpenMetadata, creando un'entry di lineage completa dalla fonte al layer EAV.

Ogni pipeline risiede in una directory dedicata sotto flows/{ente}/{dataset}/ nel repository gst-maps-pipelines, contenente i quattro file flow, un manifesto di deployment prefect.yaml, un requirements.txt e un README.md che descrive la fonte, il formato e la frequenza di aggiornamento. La convenzione di naming, le utility condivise, i pattern di lavoro in team e le procedure di deployment sono descritti in dettaglio nell'Appendice A2.

Pipeline implementate ​

Fondazione territoriale ISTAT ​

La fondazione territoriale è un insieme di quattro pipeline coordinate che devono essere eseguite in sequenza prima che qualsiasi pipeline attributo possa caricare dati in Silver. Tutte le pipeline attributo risolvono gli identificatori territoriali tramite silver.territories; questa tabella — insieme alle tabelle associate di identificatori, nomi, contenimenti e relazioni — è popolata esclusivamente dalla fondazione territoriale.

flows/istat/variazioni-amministrative scarica la storia completa delle variazioni amministrative ISTAT tramite l'API REST SITUAS all'indirizzo situas-servizi.istat.it/publish. SITUAS organizza i dataset per codice pfun; la pipeline richiede dieci dataset che coprono regioni (pfun 107, 106, 108), province (pfun 113, 112, 114) e comuni (pfun 129, 98, 104, 105), ciascuno come serie storica completa dal 17 marzo 1861. Il tipo di parametro varia per dataset: alcuni accettano un intervallo di date (pdatada/pdataa), altri una singola data di riferimento (pdata). La utility condivisa situas_client.py gestisce entrambe le varianti. Le risposte grezze vengono scritte nel Bronze come CSV sotto data/bronze/istat/variazioni-amministrative/.

flows/istat/province-regioni legge i file Bronze delle variazioni e popola silver.territories, silver.territory_identifiers, silver.territory_names e silver.territory_containments per ripartizioni, regioni e province. L'ordine di elaborazione è deliberato: prima le ripartizioni, poi le regioni, poi le province, poi gli eventi di variazione (CS = Costituzione, ES = Estinzione). Gli eventi di variazione che hanno interessato il nome o i confini di una provincia vengono applicati in linea, in modo che i timestamp valid_from e valid_to su ciascun record riflettano il ciclo di vita effettivo dell'ente amministrativo.

flows/istat/comuni è la trasformazione più complessa, articolata in due fasi. La fase 1 utilizza un algoritmo union-find per identificare catene di equivalenza: quando il codice ISTAT di un comune cambia a seguito di un evento AP (Assegnazione Provinciale) o RN (Ridenominazione), i comuni collegati vengono raggruppati sotto un unico territory_id stabile. La chiave stabile è il codice catastale Belfiore (COD_CATASTO), invariante rispetto alle riorganizzazioni provinciali. La fase 2 scrive snapshot annuali di nomi, codici ISTAT e contenimenti per ciascun comune, con valid_from/valid_to derivati dall'evento di variazione corrispondente.

flows/istat/territory-corrections applica un insieme curato di correzioni che non possono essere derivate algoritmicamente dai dati sorgente SITUAS. Utilizza il codice Belfiore come chiave di ricerca ed esegue due operazioni: l'inserimento di nomi alias ADM per i comuni il cui nome ufficiale differisce dall'etichetta del registro amministrativo ADM (consentendo la risoluzione territoriale dai dataset ADM), e la correzione di anomalie di qualità dei dati note nel sorgente SITUAS (valori valid_to non aggiornati, record di contenimento mancanti).

Insieme, queste quattro pipeline stabiliscono un grafo completo e versionato temporalmente delle entità territoriali italiane dall'861 a oggi. Ogni pipeline attributo risolve i propri identificatori sorgente in valori territory_id tramite territory_resolver prima di scrivere in silver.territory_attributes.

ADM Tabaccherie ​

La pipeline ADM Tabaccherie acquisisce l'elenco dei rivenditori di tabacchi autorizzati dal portale web dell'Agenzia delle Dogane e dei Monopoli (ADM). Il portale utilizza un'interfaccia JSF/PrimeFaces che richiede la gestione del token javax.faces.ViewState; la utility html_parser gestisce questa complessità in modo trasparente. Per ciascun comune, la pipeline interroga il portale, analizza la tabella HTML risultante e scrive le pagine HTML grezze nel Bronze sotto data/bronze/agenzie-dogane/tabacchi/.

Il flow di trasformazione risolve gli identificatori comunali tramite territory_resolver utilizzando la sigla provinciale e il nome del comune dalla risposta ADM, quindi scrive un record EAV per attributo (numero di punti vendita, indirizzi) in silver.territory_attributes. Questa pipeline è l'implementazione di riferimento per il pattern attributo service-count utilizzato dagli altri dataset provenienti da ADM.

Le voci aggiuntive delle pipeline saranno aggiunte man mano che ciascuna pipeline raggiunge la produzione.

Data quality framework ​

Suite Great Expectations ​

Ogni pipeline include due suite Great Expectations: una che valida la struttura del file Bronze e una che valida i record Silver. La suite Bronze verifica che il file sorgente grezzo sia ben formato e contenga le colonne attese; la suite Silver verifica che i record EAV siano completi, coerenti e correttamente collegati a silver.territories.

L'esempio seguente mostra la suite Bronze per la pipeline ISTAT variazioni-amministrative:

python
# expectations/istat_variazioni_bronze.py
import great_expectations as gx

suite = gx.core.ExpectationSuite(name="istat_variazioni_bronze")

# Colonne richieste presenti
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_table_columns_to_match_set",
        kwargs={"column_set": ["COD_COM_STORICO", "COD_CATASTO", "COMUNE_IT",
                               "COD_PROV_STORICO", "DATA_INI_EFF", "DATA_FIN_EFF"]}
    )
)

# Il codice catastale non deve essere null (chiave stabile per le catene di equivalenza)
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "COD_CATASTO"}
    )
)

# Formato data
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_match_regex",
        kwargs={
            "column": "DATA_INI_EFF",
            "regex": r"^\d{2}/\d{2}/\d{4}$"
        }
    )
)

La suite Silver valida che i record di silver.territory_attributes scritti da una pipeline attributo siano correttamente collegati:

python
# expectations/territory_attributes_silver.py
import great_expectations as gx

suite = gx.core.ExpectationSuite(name="territory_attributes_silver")

# Nessun territory_id null (ogni record deve risolvere a un territorio noto)
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "territory_id"}
    )
)

# Copertura: almeno il 95% dei comuni attivi deve avere un record
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_table_row_count_to_be_between",
        kwargs={
            "min_value": 7500,  # ~95% dei comuni (7.904 attivi al 2024)
            "max_value": 8500
        }
    )
)

Monitoring e alerting ​

Prefect dashboard ​

Logging ​

I log dei worker sono disponibili nell'interfaccia Prefect per ogni flow run e vengono scritti nel volume mappato data/logs/workers/ sull'host.


[WIP] Questo capitolo sarà completato con:

  • Documentazione completa di tutte le pipeline prioritarie
  • Unit test per ogni task
  • Integration test end-to-end
  • Performance benchmarks