
ETL Pipeline and Documented Scripts

Deliverable D2.2: Documented and tested ETL scripts (Python/SQL)

Deliverable D2.2 consists of the ETL scripts that acquire, transform, and validate data for the MAPS Data Lake. The scripts cover approximately 200 datasets from ISTAT and public institutions, normalised to municipal or supra-municipal granularity and stored in the Silver EAV layer. This chapter provides an overview of the pipeline architecture and describes the implemented pipelines. The complete developer guide — conventions, patterns, deployment procedures, and a reference implementation — is in Appendix A2 — ETL Pipeline Developer Guide.

Pipeline development process

Each pipeline in the MAPS Data Lake follows a standardised four-flow structure that separates data acquisition from transformation, validation, and cataloguing. The separation is deliberate: if the transformation logic changes, only the transform flow needs to be re-run without re-downloading source data; if a validation rule is updated, only the quality flow is triggered. This independence makes iterative development significantly faster and reduces the risk of re-introducing errors when correcting a single stage.

The development process for a new pipeline proceeds through four phases:

  1. Flow 1 — Ingestion: the developer identifies the data source, implements the download or scraping logic, and writes the raw files to the Bronze layer. The flow is considered complete when at least one well-formed file is available at the expected Bronze path and logged in bronze.ingestion_log.
  2. Flow 2 — Transform: the developer reads the Bronze files, resolves territory identifiers via territory_resolver, maps the source columns to EAV attributes, and writes to silver.territory_attributes. This is the most dataset-specific flow.
  3. Flow 3 — Data quality: Great Expectations suites validate both the Bronze files and the Silver records against the acceptance criteria defined for completeness, accuracy, and timeliness.
  4. Flow 4 — Metadata: the pipeline registers Bronze files and Silver tables in OpenMetadata, creating a complete lineage entry from the source to the EAV layer.
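The independence of the four flows can be pictured as stages wired by file-system contracts rather than in-memory hand-offs. The sketch below illustrates the idea with plain functions and hypothetical file contents; in the repository each flow is a separate Prefect flow file:

```python
from pathlib import Path

# Hypothetical Bronze path for an example dataset.
BRONZE = Path("data/bronze/example-ente/example-dataset")

def ingest() -> None:
    """Flow 1: download raw source files into the Bronze layer."""
    BRONZE.mkdir(parents=True, exist_ok=True)
    (BRONZE / "source.csv").write_text("territory;value\nMI;42\n")

def transform() -> list[dict]:
    """Flow 2: read Bronze files and map columns to EAV records.

    Because it reads from disk rather than from Flow 1's memory,
    it can be re-run after a logic change without a new download.
    """
    lines = (BRONZE / "source.csv").read_text().strip().splitlines()[1:]
    records = []
    for line in lines:
        territory, value = line.split(";")
        records.append({"territory": territory,
                        "attribute": "example_count",
                        "value": int(value)})
    return records

ingest()          # run once
records = transform()  # re-runnable independently of ingest()
```

The same contract holds between the transform, quality, and metadata flows: each reads the durable output of the previous stage, never its in-process state.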

Each pipeline lives in a dedicated directory under flows/{ente}/{dataset}/ in the gst-maps-pipelines repository, containing the four flow files, a prefect.yaml deployment manifest, a requirements.txt, and a README.md describing the source, format, and update frequency. The naming convention, shared utilities, team workflow patterns, and deployment procedures are described in detail in Appendix A2.
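A deployment manifest for such a pipeline might look like the following sketch. All names, schedules, and keys here are illustrative; the authoritative conventions and the exact manifest layout used in gst-maps-pipelines are defined in Appendix A2:

```yaml
# flows/example-ente/example-dataset/prefect.yaml (hypothetical example)
name: example-ente-example-dataset
deployments:
  - name: ingestion
    entrypoint: flow_1_ingestion.py:ingest
    schedules:
      - cron: "0 3 1 * *"   # monthly, aligned with the source update frequency
  - name: transform
    entrypoint: flow_2_transform.py:transform
```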

Implemented pipelines

ISTAT territorial foundation

The territorial foundation is a set of four coordinated pipelines that must run in sequence before any attribute pipeline can load data into Silver. All attribute pipelines resolve territory identifiers through silver.territories; this table — and its associated identifier, name, containment, and relationship tables — is populated exclusively by the territorial foundation.

flows/istat/variazioni-amministrative downloads the complete ISTAT administrative change history via the SITUAS REST API at situas-servizi.istat.it/publish. SITUAS organises its datasets by pfun code; the pipeline requests ten datasets covering regioni (pfun 107, 106, 108), province (pfun 113, 112, 114), and comuni (pfun 129, 98, 104, 105), each as a full time series from 17 March 1861. The parameter type differs by dataset: some accept a period range (pdatada/pdataa), others a single reference date (pdata). The shared situas_client.py utility handles both variants. Raw responses are written to Bronze as CSV under data/bronze/istat/variazioni-amministrative/.
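The two parameter variants can be handled with a small dispatch along these lines. This is a sketch: the split into range and single-date datasets shown here is illustrative, and the real situas_client.py utility also manages retries and response decoding:

```python
from urllib.parse import urlencode

BASE_URL = "https://situas-servizi.istat.it/publish"  # SITUAS REST endpoint

# Illustrative split; the authoritative pfun-to-variant mapping
# lives in the pipeline configuration.
RANGE_DATASETS = {107, 113, 129}

def build_query(pfun: int, start: str, end: str) -> str:
    """Build the query string for one SITUAS dataset.

    Range datasets take a period (pdatada/pdataa); the others take
    a single reference date (pdata), here the end of the period.
    """
    if pfun in RANGE_DATASETS:
        params = {"pfun": pfun, "pdatada": start, "pdataa": end}
    else:
        params = {"pfun": pfun, "pdata": end}
    return f"{BASE_URL}?{urlencode(params)}"
```

A caller requests the full time series with, for example, `build_query(107, "17/03/1861", "31/12/2024")`.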

flows/istat/province-regioni reads the variazioni Bronze files and populates silver.territories, silver.territory_identifiers, silver.territory_names, and silver.territory_containments for ripartizioni, regioni, and province. The processing order is deliberate: ripartizioni first, then regioni, then province, then variazioni events (CS = Costituzione, ES = Estinzione). Administrative change events that affected a province's name or boundaries are applied inline, so that valid_from and valid_to timestamps on each record reflect the actual lifecycle of that administrative unit.
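The inline application of lifecycle events can be sketched as follows. The record and event shapes are hypothetical simplifications (the real transform operates on SITAS variazioni columns), but the versioning logic is the same: each event closes the current version's valid_to and, unless the unit is extinguished, opens a new version:

```python
from datetime import date

def apply_events(record: dict, events: list[dict]) -> list[dict]:
    """Expand one territorial unit into temporally versioned rows.

    Each event closes the open version (sets valid_to); any event
    other than an extinction (ES) opens a successor version.
    """
    versions = [dict(record, valid_to=None)]
    for ev in sorted(events, key=lambda e: e["date"]):
        versions[-1]["valid_to"] = ev["date"]
        if ev["type"] != "ES":  # ES = Estinzione: unit ends for good
            versions.append(dict(versions[-1],
                                 name=ev.get("new_name", versions[-1]["name"]),
                                 valid_from=ev["date"],
                                 valid_to=None))
    return versions

# Illustrative example: a province renamed once in its history.
province = {"name": "Forlì", "valid_from": date(1861, 3, 17)}
history = apply_events(province, [
    {"type": "RN", "date": date(1992, 4, 16), "new_name": "Forlì-Cesena"},
])
```

After the call, the first version carries valid_to = 1992-04-16 and the second version the new name with an open valid_to, mirroring the unit's actual lifecycle.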

flows/istat/comuni is the most complex transform, structured in two phases. Phase 1 uses a union-find algorithm to identify equivalence chains: when a municipality's ISTAT code changes due to an AP (Assegnazione Provinciale) or RN (Ridenominazione) event, the linked settlements are grouped under a single stable territory_id. The stable key is the Belfiore cadastral code (COD_CATASTO), which is invariant across province reorganisations. Phase 2 writes annual snapshots of names, ISTAT codes, and containments for each municipality, with valid_from/valid_to derived from the corresponding variazioni event.
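The Phase 1 grouping is standard union-find. The sketch below uses hypothetical ISTAT-code pairs extracted from AP/RN events; in the real flow the pairs come from the SITAS variazioni rows and the resulting groups are keyed by the shared Belfiore code:

```python
def find(parent: dict, x: str) -> str:
    """Return the representative of x, with path halving."""
    while parent.setdefault(x, x) != x:
        parent[x] = parent[parent[x]]
        x = parent[x]
    return x

def union(parent: dict, a: str, b: str) -> None:
    """Merge the groups containing a and b."""
    parent[find(parent, a)] = find(parent, b)

# Each AP/RN event links the old and new ISTAT code of the same
# municipality (hypothetical codes), so the chain collapses into
# one equivalence class mapped to a single stable territory_id.
parent: dict = {}
for old_code, new_code in [("018003", "097003"), ("097003", "097003")]:
    union(parent, old_code, new_code)
```

After processing, `find(parent, "018003") == find(parent, "097003")`, so both historical codes resolve to the same territory chain.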

flows/istat/territory-corrections applies a curated set of corrections that cannot be derived algorithmically from the SITUAS source data. It uses the Belfiore code as the lookup key and performs two tasks: inserting ADM alias names for municipalities whose official label differs from the ADM administrative registry label (enabling territory resolution from ADM-sourced datasets), and patching known data-quality anomalies in the SITUAS source (stale valid_to values, missing containment entries).
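A curated correction can be represented as data keyed by Belfiore code; the row shapes below are hypothetical (the actual curated list and its columns live in the pipeline), but they show the two task kinds side by side:

```python
# Curated corrections, looked up by Belfiore cadastral code
# (hypothetical rows for illustration).
CORRECTIONS = [
    {"belfiore": "A271", "kind": "adm_alias", "adm_name": "ANCONA"},
    {"belfiore": "B354", "kind": "patch_valid_to", "valid_to": None},
]

def corrections_for(belfiore: str) -> list[dict]:
    """Return the curated corrections that apply to one municipality."""
    return [c for c in CORRECTIONS if c["belfiore"] == belfiore]
```

The alias entries feed territory resolution for ADM-sourced datasets; the patch entries overwrite the anomalous SITAS values after the main load.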

Together, these four pipelines establish a complete, temporally versioned graph of Italian territorial entities from 1861 to the present. Every downstream attribute pipeline resolves its source identifiers to territory_id values via territory_resolver before writing to silver.territory_attributes.

ADM Tabaccherie

The ADM Tabaccherie pipeline acquires the list of licensed tobacco retailers from the Agenzia delle Dogane e dei Monopoli (ADM) web portal. The portal uses a JSF/PrimeFaces interface that requires javax.faces.ViewState token management; the html_parser utility handles this transparently. For each municipality, the pipeline queries the portal, parses the HTML result table, and writes raw HTML pages to Bronze under data/bronze/agenzie-dogane/tabacchi/.
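ViewState management amounts to extracting the token from each response and echoing it back on the next POST. A minimal sketch of the extraction step (the real html_parser utility additionally manages session cookies and the remaining JSF form fields):

```python
import re

# JSF embeds the token as a hidden input on every rendered page.
VIEWSTATE_RE = re.compile(
    r'name="javax\.faces\.ViewState"[^>]*value="([^"]+)"'
)

def extract_viewstate(html: str) -> str:
    """Pull the JSF ViewState token out of a PrimeFaces response page."""
    match = VIEWSTATE_RE.search(html)
    if match is None:
        raise ValueError("no javax.faces.ViewState token in page")
    return match.group(1)

page = '<input type="hidden" name="javax.faces.ViewState" value="-123:456" />'
token = extract_viewstate(page)  # "-123:456", sent back on the next request
```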

The transform flow resolves municipal identifiers through territory_resolver using the province abbreviation and municipality name from the ADM response, then writes one EAV record per attribute (outlet count, addresses) to silver.territory_attributes. This pipeline is the reference implementation for the service-count attribute pattern used by other ADM-sourced datasets.
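The service-count attribute pattern can be pictured as fanning one resolved portal row out into one EAV record per attribute. The helper and value names below are hypothetical; the target columns follow the silver.territory_attributes schema described above:

```python
def to_eav(territory_id: int, parsed: dict) -> list[dict]:
    """Fan one parsed ADM row out into one EAV record per attribute."""
    return [
        {"territory_id": territory_id, "attribute": attr, "value": value}
        for attr, value in parsed.items()
    ]

# One municipality resolved by territory_resolver, two attributes out.
rows = to_eav(4201, {"tabaccherie_count": 12,
                     "tabaccherie_addresses": "Via Roma 1; Corso Italia 3"})
```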

Additional pipeline entries will be added as each pipeline reaches production.

Data quality framework

Great Expectations suites

Each pipeline includes two Great Expectations suites: one validating the Bronze file structure and one validating the Silver records. The Bronze suite checks that the raw source file is well-formed and contains the expected columns; the Silver suite checks that the EAV records are complete, consistent, and correctly linked to silver.territories.

The example below shows the Bronze suite for the ISTAT variazioni-amministrative pipeline:

```python
# expectations/istat_variazioni_bronze.py
import great_expectations as gx

suite = gx.core.ExpectationSuite(name="istat_variazioni_bronze")

# Required columns present
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_table_columns_to_match_set",
        kwargs={"column_set": ["COD_COM_STORICO", "COD_CATASTO", "COMUNE_IT",
                               "COD_PROV_STORICO", "DATA_INI_EFF", "DATA_FIN_EFF"]}
    )
)

# Cadastral code never null (stable key for union-find equivalence chains)
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "COD_CATASTO"}
    )
)

# Date format
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_match_regex",
        kwargs={
            "column": "DATA_INI_EFF",
            "regex": r"^\d{2}/\d{2}/\d{4}$"
        }
    )
)
```

The Silver suite validates that silver.territory_attributes records written by an attribute pipeline are correctly linked:

```python
# expectations/territory_attributes_silver.py
import great_expectations as gx

suite = gx.core.ExpectationSuite(name="territory_attributes_silver")

# No null territory_id (every record must resolve to a known territory)
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "territory_id"}
    )
)

# Coverage: at least 95% of active municipalities must have a record
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_table_row_count_to_be_between",
        kwargs={
            "min_value": 7500,  # ~95% of municipalities (7,904 active as of 2024)
            "max_value": 8500
        }
    )
)
```

Monitoring and alerting

Prefect dashboard

Logging

Worker logs are available in the Prefect UI under each flow run and written to the mapped data/logs/workers/ volume on the host.


[WIP] This chapter will be completed with:

  • Complete documentation for all priority pipelines
  • Unit tests for each task
  • End-to-end integration tests
  • Performance benchmarks