
Appendix A2 — ETL Pipeline Developer Guide

Deliverable D2.2 — Appendix: Operational guide for building and maintaining ETL pipelines in the MAPS Data Lake.

This guide is intended for developers joining the project who need to build a new data pipeline or maintain an existing one. It describes the standard workflow, conventions, and patterns used across all pipelines in the MAPS infrastructure. The reference implementation throughout this guide is the Agenzia Dogane e Monopoli — tabaccherie pipeline; the complete source code is available at flows/agenzie-dogane/tabacchi/ in the repository.

1. Quickstart — Development environment

To start the local development environment you need Python 3.11, Docker, and Docker Compose. The following steps configure the database, the Prefect server, and the Python interpreter in about ten minutes.

1.1 Prerequisites

  • Python 3.11
  • Docker 24+
  • Docker Compose v2

1.2 Clone the repository

bash
git clone https://gitlab.openpolis.io/openpolis/gst/gst-maps-pipelines.git
cd gst-maps-pipelines

1.3 Environment variables

bash
cp .env.example .env

The default values in .env.example work without modification for local development. The one variable worth checking is BRONZE_BASE_PATH:

# .env.example — commented out, not needed in local development
# BRONZE_BASE_PATH=/data/bronze

In local development, flows automatically compute the Bronze path as {repo_root}/data/bronze (derived from the file's location). In production, inside the prefect-worker container, set BRONZE_BASE_PATH=/data/bronze — the Docker volume is already mounted there.

Flows load .env automatically at startup via python-dotenv (included in requirements-dev.txt): there is no need to manually export variables in the shell before running a script.

1.4 Start services

bash
docker compose -f docker-compose.local.yml up -d

This starts PostgreSQL 17 + PostGIS 3.5, the Prefect server, and OpenMetadata (with its dedicated database and Elasticsearch). The MAPS database schema (bronze, silver, gold schemas, tables, and indexes) is created automatically on first startup via postgres/init-scripts/01-init-schemas.sql. Verify that the services are running:

bash
docker compose -f docker-compose.local.yml ps

If you modify docker-compose.local.yml or .env after the containers are already running, docker compose restart does not re-read environment variables. Use instead:

bash
docker compose -f docker-compose.local.yml up -d --force-recreate

1.5 Install Python dependencies

bash
python3.11 -m venv .venv
source .venv/bin/activate
pip install -r requirements-dev.txt

This installs Prefect, psycopg2, requests, BeautifulSoup, and python-dotenv. The virtualenv must be activated each time you open a new terminal session (source .venv/bin/activate).

1.6 Configure Prefect

bash
export PREFECT_API_URL=http://localhost:4200/api
prefect work-pool create default-pool --type process

The default-pool work pool is the execution pool referenced in each pipeline's prefect.yaml.

1.7 Verify the setup

Verify that the schemas and tables were created correctly:

sql
-- expected schemas: bronze, silver, gold
\dn

-- expected tables: bronze.ingestion_log, silver.territory_types,
--   silver.territories, silver.territory_containments,
--   silver.territory_identifiers, silver.territory_names,
--   silver.territory_attributes,
--   silver.service_types, silver.services, silver.service_identifiers,
--   silver.service_attributes, silver.service_categories
\dt bronze.*
\dt silver.*

The environment is ready. The Bronze layer is a Docker volume (bronze-data) mounted at /data inside the containers; raw files downloaded by flows will be stored in that volume.

OpenMetadata takes a few minutes to complete its database migration on first startup; wait for the maps-openmetadata container to report a healthy status before opening the interface.

2. Pipeline architecture

2.1 Prefect: client/server architecture

Prefect follows a client/server architecture. The server (container maps-prefect-server) tracks flow runs, stores logs, and serves the UI at http://localhost:4200. The worker (container maps-prefect-worker) is the process that actually executes flows: it polls the server for work assigned to its pool (default-pool) and launches each flow in a subprocess.

Developers interact with the server through the Prefect client — the CLI and Python SDK — whose target is configured by the PREFECT_API_URL environment variable. During local development the client points to the server running in Docker; in production it will point to the remote server, with no changes to the flow code itself.

A deployment associates a flow with a work pool and defines its execution parameters (schedule, default parameter values). Once registered, a flow can be triggered from the UI or CLI without the developer needing a local process running.

sequenceDiagram
    participant Dev as Developer
    participant Server as Prefect Server
    participant Worker as Worker (default-pool)

    Dev->>Server: register deployment
    Note over Server: stores flow config,<br/>schedule, parameters
    Dev->>Server: trigger run (UI or CLI)
    Server->>Worker: dispatch work item
    Worker->>Worker: execute flow subprocess
    Worker-->>Server: report state + logs

2.2 Medallion pattern

Every MAPS pipeline follows the Medallion pattern: raw files are downloaded to the Bronze layer, transformed and validated in the Silver layer, and finally aggregated into the Gold layer. Pipelines are orchestrated by Prefect and follow a standard four-flow structure.

External source → Flow 1: Ingestion → Bronze (filesystem)
               → Flow 2: Transform  → Silver (PostgreSQL)
               → Flow 3: Quality    → Validation report
               → Flow 4: Metadata   → OpenMetadata catalogue

Each flow is independent and can be re-run without executing the preceding flows. This is deliberate: if the transformation logic changes, only Flow 2 needs to be re-run; there is no need to re-download the source data.

3. Directory structure

Each pipeline lives in a dedicated subdirectory under flows/:

flows/
├── utils/                       # Shared modules for all pipelines
│   ├── bronze_writer.py         # save_to_bronze(), log_ingestion()
│   ├── rate_limiter.py          # rate_limited_request()
│   ├── html_parser.py           # parse_table(), extract_form_data()
│   ├── situas_client.py         # SituasClient — ISTAT SITUAS API wrapper
│   ├── territory_resolver.py    # build_comune_lookup_from_conn(), resolve_comune()
│   └── service_writer.py        # upsert_service(), upsert_service_identifier(), ...
└── {ente}/{dataset}/
    ├── 01_ingestion_flow.py     # Flow 1: download → Bronze
    ├── 02_transform_flow.py     # Flow 2: Bronze → Silver
    ├── 03_data_quality_flow.py  # Flow 3: validation
    ├── 04_metadata_flow.py      # Flow 4: OpenMetadata catalogue
    ├── prefect.yaml             # Deployment definitions (all 4 flows)
    ├── requirements.txt         # Python dependencies
    ├── README.md                # Pipeline documentation
    ├── docs/
    │   ├── DEPLOY_GUIDE.md      # Step-by-step deployment instructions
    │   └── KNOWN_ISSUES.md      # History of issues and their solutions
    └── tests/
        └── test_flow.py         # Unit tests

Naming conventions:

  • Directories: lowercase with hyphens (agenzie-dogane/tabacchi/)
  • Flow files: always numbered 01_, 02_, 03_, 04_
  • README: mandatory, describes purpose, source URL, format, and update frequency

4. ISTAT territorial pipelines

The flows/istat/ directory contains the foundation pipelines that populate the Silver territorial model. All other data pipelines depend on this data being present.

| Pipeline | Directory | Source | Scope |
|---|---|---|---|
| Province, regioni, ripartizioni | flows/istat/province-regioni/ | SITUAS pfun=64/68/71 + variazioni pfun=106–108, 112–114 | ~107 province, 20 regioni, 5 ripartizioni per year (2000–today) |
| Comuni | flows/istat/comuni/ | SITUAS pfun=61 + variazioni pfun=129 | ~7,900 comuni per year (2000–today) |
| Variazioni amministrative | flows/istat/variazioni-amministrative/ | SITUAS pfun=106–108, 112–114, 98, 104, 105, 129 | Ingestion only — downloads full history from 1861 into Bronze |
| Territory corrections | flows/istat/territory-corrections/ | Static data — ADM alias names and data-quality fixes | Idempotent flow; must run after comuni are populated |

Run order matters. Variazioni processing is integrated into the province-regioni and comuni transform flows. The variazioni Bronze data must be ingested first. The territory-corrections flow must run last, after comuni territories are populated.

Step 1 — Ingestion (any order, can run in parallel):
  istat-province-regioni → Flow 1 (ingestion)
  istat-comuni           → Flow 1 (ingestion)
  istat-variazioni       → Flow 1 (ingestion)   ← must complete before Step 2

Step 2 — Transform (strict order):
  istat-province-regioni → Flow 2 (transform)   ← reads province bronze + variazioni bronze
  istat-comuni           → Flow 2 (transform)   ← reads comuni bronze + variazioni bronze + province silver

Step 3 — Corrections (after comuni are populated):
  istat-territory-corrections → Flow 1 (corrections)

4.1 Running during development

Each flow file accepts --anni as a CLI argument. Flows can be run directly via Python (if the virtualenv is active) or via Docker (recommended):

bash
# Option A: Python directly (venv active)

# Step 1: Ingestion (any order)
python flows/istat/variazioni-amministrative/01_ingestion_flow.py
python flows/istat/province-regioni/01_ingestion_flow.py
python flows/istat/comuni/01_ingestion_flow.py

# Step 2: Transform (strict order — variazioni bronze must exist first)
python flows/istat/province-regioni/02_transform_flow.py
python flows/istat/comuni/02_transform_flow.py

# Step 3: Corrections (after comuni are populated)
python flows/istat/territory-corrections/01_corrections_flow.py

# Specific years only
python flows/istat/province-regioni/01_ingestion_flow.py --anni 2024 2025 2026

# Option B: Docker (recommended)

# Step 1: Ingestion
docker exec maps-prefect-worker bash -c \
  "cd /flows/istat/variazioni-amministrative && python 01_ingestion_flow.py"
docker exec maps-prefect-worker bash -c \
  "cd /flows/istat/province-regioni && python 01_ingestion_flow.py"
docker exec maps-prefect-worker bash -c \
  "cd /flows/istat/comuni && python 01_ingestion_flow.py"

# Step 2: Transform (province-regioni first, then comuni)
docker exec maps-prefect-worker bash -c \
  "cd /flows/istat/province-regioni && python 02_transform_flow.py"
docker exec maps-prefect-worker bash -c \
  "cd /flows/istat/comuni && python 02_transform_flow.py"

# Step 3: Corrections
docker exec maps-prefect-worker bash -c \
  "cd /flows/istat/territory-corrections && python 01_corrections_flow.py"

4.2 Running via Prefect deployments

All ISTAT pipelines are registered on the Prefect server with no automatic schedule; runs are triggered manually. To register or re-register after changing prefect.yaml:

bash
docker exec maps-prefect-worker bash -c "cd /flows/istat/province-regioni && prefect deploy --all"
docker exec maps-prefect-worker bash -c "cd /flows/istat/comuni && prefect deploy --all"
docker exec maps-prefect-worker bash -c "cd /flows/istat/variazioni-amministrative && prefect deploy --all"
docker exec maps-prefect-worker bash -c "cd /flows/istat/territory-corrections && prefect deploy --all"

To trigger a run via CLI (respect the execution order):

bash
export PREFECT_API_URL=http://localhost:4200/api

# Step 1: Ingestion (any order)
prefect deployment run 'istat-variazioni-ingestion/istat-variazioni-01-ingestion'
prefect deployment run 'istat-province-regioni-ingestion/istat-pr-01-ingestion' \
  --param anni='[2024,2025,2026]'
prefect deployment run 'istat-comuni-ingestion/istat-comuni-01-ingestion' \
  --param anni='[2024,2025,2026]'

# Step 2: Transform (wait for ingestion; province-regioni before comuni)
prefect deployment run 'istat-province-regioni-transform/istat-pr-02-transform' \
  --param anni='[2024,2025,2026]'
prefect deployment run 'istat-comuni-transform/istat-comuni-02-transform' \
  --param anni='[2024,2025,2026]'

# Step 3: Corrections (after comuni transform completes)
prefect deployment run 'istat-territory-corrections/istat-territory-corrections-01'

All ISTAT flows are idempotent: re-running a transform on unchanged Bronze produces identical Silver output.

5. Silver layer

5.1 Territorial entity model

The Silver layer uses a stable surrogate identity model for territories. Each territory has an internal id that never changes; ISTAT codes are stored as identifiers in silver.territory_identifiers. This eliminates record duplication when comuni change province: the comune remains a single row, with multiple ISTAT code periods.

| Table | Purpose |
|---|---|
| silver.territory_types | Lookup of entity types — pre-seeded with 9 types |
| silver.territories | Master registry: stable surrogate id, type_code, label, lifecycle dates (valid_from/valid_to) |
| silver.territory_identifiers | Temporal identifiers: ISTAT codes (scheme='istat'), cadastral codes, fiscal codes, UTS codes |
| silver.territory_names | Temporal names with language support (Italian and bilingual/local) |
| silver.territory_containments | Temporal hierarchical containment (comune → provincia, with valid_from/valid_to for province changes) |
| silver.territory_relationships | Predecessor/successor links from variazioni events (ES, CS, RN, AQES, CECS) |

              ┌──────────────────────────────┐
              │        territory_types       │
              ├──────────────────────────────┤
              │ PK code VARCHAR(30)          │
              │    label                     │
              │    hierarchy_level           │
              └──────────────┬───────────────┘
                             │ 1
                             │ (type_code)
                             │ N
   ┌─────────────────────────▼───────────────────────────────┐
   │                      territories                        │
   ├─────────────────────────────────────────────────────────┤
   │ PK id SERIAL                                            │
   │ FK type_code        label           subtype             │
   │    valid_from DATE  valid_to DATE   end_reason TEXT     │
   └──┬─────────────────┬────────────────────┬───────────────┘
      │ 1:N             │ 1:N                │ 1:N (member + container)
      │                 │                    │
      ▼                 ▼                    ▼
┌───────────────┐ ┌───────────────┐  ┌─────────────────────────────────┐
│ territory_    │ │ territory_    │  │    territory_containments       │
│ identifiers   │ │ names         │  ├─────────────────────────────────┤
├───────────────┤ ├───────────────┤  │ FK member_id → territories      │
│FK territory_  │ │FK territory_  │  │ FK container_id → territories   │
│   id          │ │   id          │  │    valid_from DATE              │
│   scheme      │ │   name        │  │    valid_to DATE                │
│   identifier  │ │   language    │  └─────────────────────────────────┘
│   valid_from  │ │   name_type   │
│   valid_to    │ │   valid_from  │  ┌─────────────────────────────────┐
│   fonte       │ │   valid_to    │  │    territory_relationships      │
└───────────────┘ └───────────────┘  ├─────────────────────────────────┤
                                     │ FK source_id → territories      │
                                     │ FK dest_id → territories        │
                                     │    classification TEXT          │
                                     │    valid_from DATE              │
                                     └─────────────────────────────────┘

Territories are resolved by ISTAT code via a join on territory_identifiers:

sql
-- Find the territory for ISTAT code '092001'
SELECT t.id, t.label FROM silver.territories t
JOIN silver.territory_identifiers ti ON ti.territory_id = t.id
WHERE t.type_code = 'comune' AND ti.scheme = 'istat' AND ti.identifier = '092001';

-- Which province was Arbus in on 2015-01-01?
SELECT p.label FROM silver.territory_containments tc
JOIN silver.territories p ON p.id = tc.container_id AND p.type_code = 'provincia'
JOIN silver.territories t ON t.id = tc.member_id
WHERE t.type_code = 'comune' AND t.label = 'Arbus'
  AND (tc.valid_from IS NULL OR tc.valid_from <= '2015-01-01')
  AND (tc.valid_to IS NULL OR tc.valid_to > '2015-01-01');

valid_from = NULL means the start of validity is unknown (predates the available data). valid_to = NULL means the territory is still active.
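These NULL semantics translate directly into a predicate. A minimal Python sketch (the function name is illustrative, not part of the shared utils) mirroring the half-open [valid_from, valid_to) logic used in the SQL above:

```python
from datetime import date

def period_contains(valid_from, valid_to, on_date):
    """True when on_date falls inside the half-open [valid_from, valid_to) window.

    None valid_from: start of validity unknown (predates the available data).
    None valid_to: the row is still active.
    """
    starts_ok = valid_from is None or valid_from <= on_date
    ends_ok = valid_to is None or valid_to > on_date
    return starts_ok and ends_ok

# A containment row open-ended on both sides matches any date
period_contains(None, None, date(2015, 1, 1))              # True
# A row closed on 2016-07-01 no longer matches that very day
period_contains(None, date(2016, 7, 1), date(2016, 7, 1))  # False
```

Note the exclusive upper bound (valid_to > on_date): the day a containment ends already belongs to the successor row, so adjacent periods never overlap.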

5.2 EAV table

All pipelines write to silver.territory_attributes, the generalised EAV table that supports any type of territorial entity.

sql
INSERT INTO silver.territory_attributes
    (territory_id, type_code, attribute, value, data_type, source, valid_from, valid_to)
VALUES
    (42,  'comune',    'popolazione',   '2635',  'integer', 'ISTAT', '2024-01-01', NULL),
    (107, 'provincia', 'n_tabaccherie', '245',   'integer', 'ADM',   '2024-01-01', '2025-12-31'),
    (8,   'sll',       'addetti',       '15200', 'integer', 'ISTAT', '2021-01-01', NULL);

| Field | Description | Example values |
|---|---|---|
| territory_id | FK to silver.territories(id) — stable surrogate, survives ISTAT code changes | resolved via territory_resolver |
| type_code | Entity type (denormalised from territories) | 'comune', 'provincia', 'regione', 'sll', 'slo' |
| attribute | Attribute name | 'popolazione', 'n_tabaccherie_adm' |
| value | Value stored as text | '2635' |
| data_type | Type hint for casting | 'integer', 'float', 'string', 'boolean' |
| source | Data source | 'ISTAT', 'ADM', 'MinSalute' |
| valid_from | Start of validity | '2024-01-01' |
| valid_to | End of validity — NULL means current | NULL |

The ON CONFLICT ... DO UPDATE pattern handles re-runs safely:

python
execute_values(
    cur,
    """
    INSERT INTO silver.territory_attributes
        (territory_id, type_code, attribute, value,
         data_type, source, valid_from, valid_to)
    VALUES %s
    ON CONFLICT (territory_id, attribute, valid_from)
    DO UPDATE SET value = EXCLUDED.value, updated_at = NOW()
    """,
    eav_records
)
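Since value is stored as text with a data_type hint, readers must cast on the way out. A minimal helper for illustration (cast_eav_value is not part of the shared utils):

```python
def cast_eav_value(value: str, data_type: str):
    """Cast a territory_attributes.value (stored as text) using its data_type hint."""
    casters = {
        "integer": int,
        "float": float,
        "boolean": lambda v: v.strip().lower() in ("true", "t", "1"),
        "string": str,
    }
    # Unknown hints fall back to the raw string rather than raising
    return casters.get(data_type, str)(value)

cast_eav_value("2635", "integer")   # 2635
cast_eav_value("15.2", "float")     # 15.2
```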

5.3 Local services model

silver.territory_attributes stores one row per (territory, attribute, period): it is optimised for aggregated territorial statistics such as population counts or number of pharmacies per comune. It is not designed to store individual point-of-interest records.

Several data sources publish lists of individual local services (pharmacies, hospitals, schools, tobacco shops, and others). Keeping individual records in Silver offers tangible benefits over writing only aggregated counts: the same raw data can be aggregated by comune, province, ASL catchment area, or SLL without re-ingesting from the source; records from multiple datasets can be deduplicated via service_identifiers using external codes such as codice fiscale or codice HSP; and service-specific attributes can be stored without schema changes.

| Table | Purpose |
|---|---|
| silver.service_types | Registry of service types — pre-seeded with 5 types (farmacia, ospedale, scuola, biblioteca, tabaccheria) |
| silver.services | One row per individual service, with resolved territory_id, lifecycle dates, and a uniqueness constraint for idempotent upserts |
| silver.service_identifiers | External codes per service (codice_fiscale, codice_ministeriale, codice_hsp, ...) — same identifier means the same logical service across datasets |
| silver.service_attributes | EAV for service-specific metrics (posti_letto, num_studenti, ...) — mirrors territory_attributes but scoped to a service row |
| silver.service_categories | Multi-valued typed tags (tipo_ospedale: DEA II livello, livello_scuola: primaria, ...) |

Writing services from a pipeline uses the helpers in utils/service_writer.py:

python
from utils.territory_resolver import build_comune_lookup_from_conn, resolve_comune
from utils.service_writer import (
    upsert_service, upsert_service_identifier,
    upsert_service_attribute, upsert_service_category,
)

conn = _get_db_conn()
lookup = build_comune_lookup_from_conn(conn)

with conn.cursor() as cur:
    for record in source_records:
        # 1. Resolve territory
        match = resolve_comune(lookup, sigla=record["provincia"], nome=record["comune"])
        territory_id = match[0] if match else None

        # 2. Upsert the service row
        service_id = upsert_service(
            cur,
            type_code="farmacia",
            name=record["denominazione"],
            address=record["indirizzo"],
            territory_id=territory_id,
            source="AIFA",
            valid_from="2024-01-01",
        )

        # 3. Attach external identifier (for cross-source deduplication)
        upsert_service_identifier(cur, service_id=service_id,
                                  scheme="codice_fiscale", identifier=record["cf"])

        # 4. Store service-specific metric
        upsert_service_attribute(cur, service_id=service_id,
                                 attribute="tipo_farmacia", value=record["tipo"],
                                 data_type="text", source="AIFA", valid_from="2024-01-01")

        # 5. Tag with category
        upsert_service_category(cur, service_id=service_id,
                                scheme="tipo_farmacia", value=record["tipo"])

conn.commit()
conn.close()

Once individual records are in Silver, counts per territory are a simple GROUP BY query that can feed silver.territory_attributes as aggregated statistics.
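As a sketch of that aggregation step (the attribute name, source label, and function name below are illustrative, and the ON CONFLICT target assumes the same unique constraint used in §5.2):

```python
# Hypothetical aggregation: count farmacie per comune and upsert the result
# into the territorial EAV. Table and column names follow the tables above.
AGGREGATE_SQL = """
    INSERT INTO silver.territory_attributes
        (territory_id, type_code, attribute, value,
         data_type, source, valid_from, valid_to)
    SELECT s.territory_id, 'comune', 'n_farmacie', COUNT(*)::text,
           'integer', 'AIFA', %(valid_from)s, NULL
    FROM silver.services s
    WHERE s.type_code = 'farmacia' AND s.territory_id IS NOT NULL
    GROUP BY s.territory_id
    ON CONFLICT (territory_id, attribute, valid_from)
    DO UPDATE SET value = EXCLUDED.value, updated_at = NOW()
"""

def write_service_counts(conn, valid_from: str) -> None:
    """Run the count-and-upsert against an open psycopg2 connection."""
    with conn.cursor() as cur:
        cur.execute(AGGREGATE_SQL, {"valid_from": valid_from})
    conn.commit()
```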

6. The reference pipeline: ADM tabaccherie

The Agenzia Dogane e Monopoli — tabaccherie pipeline is the reference implementation because it is the most complex pipeline in the project: it requires HTML scraping of a JSF portal with Ajax pagination, session management, rate limiting, and territorial code normalisation. A pipeline that downloads a CSV or XLSX file directly follows the same structure but with a much simpler Flow 1.

6.0 Local development

Before registering a deployment on Prefect, every flow can be run directly as a Python script. This is the normal way to work: write the flow, run it, fix errors, run it again — without touching the server.

Every flow file must include an if __name__ == "__main__" block with argparse so that parameters can be passed from the command line without editing the source:

python
# 01_ingestion_flow.py
from prefect import flow

@flow
def ingestion_flow(anno: int = 2025):
    ...

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--anno", type=int, default=2025)
    args = parser.parse_args()
    ingestion_flow(anno=args.anno)

Be consistent: always use the same execution method within a pipeline run. Python writes Bronze files to {repo_root}/data/bronze/ on the host; Docker writes to /data/bronze/ inside the container. If you ingest with Python and transform with Docker (or vice versa), the transform will not find the Bronze files. In docker-compose.local.yml the Bronze directory is bind-mounted from ./data, so both methods share the same path — but only if BRONZE_BASE_PATH is set consistently (see §1.3).

Option A — Python directly (virtualenv must be active):

bash
source .venv/bin/activate
cd flows/agenzie-dogane/tabacchi/

python 01_ingestion_flow.py              # uses default parameters
python 01_ingestion_flow.py --anno 2024  # override at runtime

Option B — Docker (recommended: matches the production environment exactly):

bash
docker exec maps-prefect-worker bash -c \
  "cd /flows/agenzie-dogane/tabacchi && python 01_ingestion_flow.py"

docker exec maps-prefect-worker bash -c \
  "cd /flows/agenzie-dogane/tabacchi && python 01_ingestion_flow.py --anno 2024"

If PREFECT_API_URL is set and the server is running, the run appears in the UI with logs and status. If the server is not available, the flow runs as a plain Python script with no server dependency.

The typical iteration is:

  1. Write or modify the flow
  2. Run via Docker (or Python directly for quick edits)
  3. Fix errors, repeat
  4. When working correctly, proceed to deployment (§8)

Each flow can be developed and tested independently. Flow 2 can be tested by reading a Bronze file already present on disk, without Flow 1 having completed.

6.0.1 Standard flow boilerplate

Every flow file starts with the same four-part header. Copy it verbatim and adjust the parents[N] depth and the FONTE constant.

python
import os
import sys
from pathlib import Path

try:
    from dotenv import load_dotenv
    load_dotenv(Path(__file__).resolve().parents[3] / ".env")   # repo root
except ImportError:
    pass

import psycopg2
from psycopg2.extras import execute_values
from prefect import flow, task, get_run_logger

sys.path.insert(0, str(Path(__file__).resolve().parents[2]))    # adds flows/ to import path
from utils.bronze_writer import save_to_bronze, log_ingestion   # and other utils as needed

FONTE = "ente-name"                                             # matches the Bronze directory
_REPO_ROOT = Path(__file__).resolve().parents[3]
BRONZE_BASE = os.getenv("BRONZE_BASE_PATH", str(_REPO_ROOT / "data" / "bronze"))


def _get_db_conn():
    return psycopg2.connect(
        host=os.getenv("POSTGRES_HOST", "localhost"),
        port=int(os.getenv("POSTGRES_PORT", 5432)),
        database=os.getenv("POSTGRES_DB", "maps_db"),
        user=os.getenv("POSTGRES_USER", "maps"),
        password=os.getenv("POSTGRES_PASSWORD", "maps_dev"),
    )

The parents[N] index depends on directory depth: parents[2] from flows/{ente}/{dataset}/ reaches flows/ (where utils/ lives); parents[3] reaches the repo root (where .env lives). _get_db_conn() is defined locally in each flow rather than imported from utils, so each script is self-contained.
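The indexing is easy to misjudge by one; checking it against a throwaway path (the path below is hypothetical) avoids surprises:

```python
from pathlib import Path

# Hypothetical flow file at the standard depth flows/{ente}/{dataset}/
flow_file = Path("/repo/flows/agenzie-dogane/tabacchi/02_transform_flow.py")

print(flow_file.parents[0])  # /repo/flows/agenzie-dogane/tabacchi  (dataset dir)
print(flow_file.parents[2])  # /repo/flows                          (utils/ lives here)
print(flow_file.parents[3])  # /repo                                (.env lives here)
```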

6.1 Flow 1 — Ingestion (Bronze layer)

Flow 1 downloads raw data from the external source and saves it to the Bronze filesystem via the save_to_bronze utility. It records the operation in bronze.ingestion_log.

python
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import httpx
from utils.bronze_writer import save_to_bronze, log_ingestion

@task(
    retries=3,
    retry_delay_seconds=[60, 300, 900],
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=24)
)
def download_fonte(anno: int) -> dict:
    url = "https://example.gov.it/data/dataset.xlsx"
    response = httpx.get(url, follow_redirects=True, timeout=300)
    response.raise_for_status()

    file_info = save_to_bronze(
        data=response.content,
        fonte="nome-ente",
        dataset="nome_dataset",
        anno=anno,
        ext="xlsx"
    )
    log_ingestion(file_info, n_record=0, url_source=url)
    return file_info

save_to_bronze creates the directory, writes the raw bytes, computes the SHA256 checksum, and returns a standard dictionary with file_path, size_bytes, checksum, fonte, dataset, and anno. log_ingestion inserts the corresponding row into bronze.ingestion_log.

The Bronze layer always stores the raw source exactly as received: bytes downloaded via HTTP for CSV, XLSX, and PDF files; raw HTML for scraping pipelines. Transformation into data structures happens in Flow 2.

Bronze path convention: {repo_root}/data/bronze/{ente}/{anno}/{dataset}.{ext}
Examples: /data/bronze/istat/2024/popolazione_residente.xlsx, /data/bronze/minsalute/2024/asl_boundaries.xlsx
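The convention and the checksum step can be approximated as follows; this is a sketch of the behaviour, not the actual utils/bronze_writer.py implementation:

```python
import hashlib
from pathlib import Path

def bronze_path(base: str, fonte: str, anno: int, dataset: str, ext: str) -> Path:
    """Compose the Bronze destination: {base}/{ente}/{anno}/{dataset}.{ext}."""
    return Path(base) / fonte / str(anno) / f"{dataset}.{ext}"

def sha256_checksum(data: bytes) -> str:
    """Checksum recorded in bronze.ingestion_log for change detection."""
    return hashlib.sha256(data).hexdigest()

str(bronze_path("/data/bronze", "istat", 2024, "popolazione_residente", "xlsx"))
# '/data/bronze/istat/2024/popolazione_residente.xlsx'
```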

JSF/PrimeFaces portals. Some Italian public portals use JavaServer Faces with Ajax-driven pagination. Each response contains a javax.faces.ViewState token that must be echoed back in the next request. Use a requests.Session to maintain cookies, extract the ViewState with BeautifulSoup, and use rate_limited_request() from utils/rate_limiter.py:

python
import requests
from utils.rate_limiter import rate_limited_request

def _get_viewstate(html: str) -> str:
    from bs4 import BeautifulSoup
    vs = BeautifulSoup(html, "html.parser").find("input", {"name": "javax.faces.ViewState"})
    return vs.get("value", "") if vs else ""

session = requests.Session()
viewstate = _get_viewstate(session.get(url).text)  # initial GET primes cookies and the first ViewState
response = rate_limited_request(
    url, method="POST",
    data={
        "javax.faces.ViewState": viewstate,
        "javax.faces.partial.ajax": "true",
        "javax.faces.partial.execute": "formBusca:regione",
        "javax.faces.partial.render": "formBusca:provincia",
        "formBusca:regione": regione_code,
    },
    session=session,
    delay=2,
)

For a complete working example see flows/agenzie-dogane/tabacchi/01_ingestion_flow.py.

6.2 Flow 2 — Transform (Silver layer)

Flow 2 reads from Bronze, normalises the data, and writes to silver.territory_attributes. This is the most dataset-specific flow; the structure of the source file determines most of the logic.

Standard skeleton:

python
@task
def trasforma_e_carica(file_info: dict, entity_type: str) -> int:
    # 1. Read source file
    if file_info['file_path'].endswith('.xlsx'):
        df = pd.read_excel(file_info['file_path'])
    elif file_info['file_path'].endswith('.csv'):
        df = pd.read_csv(file_info['file_path'], encoding='utf-8-sig', sep=';')

    # 2. Resolve territory_id and build EAV records
    conn = _get_db_conn()
    lookup = _build_territory_lookup(conn)   # (sigla, label_upper) → (territory_id, type_code)
    eav_records = []
    for _, row in df.iterrows():
        match = lookup.get((row['SIGLA'].upper(), row['COMUNE'].upper()))
        if not match:
            continue
        territory_id, type_code = match
        eav_records.extend([
            (territory_id, type_code, 'attributo_1', str(row['COL_A']),
             'integer', fonte, f'{anno}-01-01', None),
            (territory_id, type_code, 'attributo_2', str(row['COL_B']),
             'string',  fonte, f'{anno}-01-01', None),
        ])

    # 3. UPSERT into Silver EAV
    try:
        with conn.cursor() as cur:
            execute_values(cur, """
                INSERT INTO silver.territory_attributes
                    (territory_id, type_code, attribute, value,
                     data_type, source, valid_from, valid_to)
                VALUES %s
                ON CONFLICT (territory_id, attribute, valid_from)
                DO UPDATE SET value = EXCLUDED.value, updated_at = NOW()
            """, eav_records)
        conn.commit()
    finally:
        conn.close()

    return len(eav_records)

Territory resolution: pipelines must resolve territory_id from silver.territories before writing to silver.territory_attributes. Use _build_territory_lookup() (as in the skeleton above) or the shared territory_resolver.build_comune_lookup_from_conn() to build a (sigla_automobilistica, label_upper) → (territory_id, type_code) dict. Records whose comune cannot be resolved are logged as warnings and skipped — this is expected when the ISTAT foundation pipelines have not yet run.
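Stripped of the database round-trip, the resolution step is a normalised dict probe. A self-contained sketch (the ids and lookup contents are invented):

```python
def resolve(lookup: dict, sigla: str, nome: str):
    """Probe a (SIGLA, LABEL_UPPER) lookup, tolerating case and whitespace noise."""
    return lookup.get((sigla.strip().upper(), nome.strip().upper()))

# Toy lookup shaped like the one _build_territory_lookup() returns
lookup = {("SU", "ARBUS"): (42, "comune")}

resolve(lookup, "su", " Arbus ")   # (42, 'comune')
resolve(lookup, "CA", "Arbus")     # None: record would be logged and skipped
```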

6.3 Flow 3 — Data quality

Flow 3 validates both the Bronze files and the Silver records. It uses Great Expectations for structured validation and produces a report.

Bronze validation (check the raw file before trusting the transform):

python
import great_expectations as gx

# Class-based expectation API (GX 1.x): one expectation object per check
suite = gx.ExpectationSuite(name=f"{fonte}_{dataset}")

suite.add_expectation(
    gx.expectations.ExpectColumnToExist(column="colonna_codice_entita")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(
        column="colonna_codice_entita", mostly=0.99
    )
)

Silver validation (verify the EAV records are correct):

python
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="territory_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="valore_cast", min_value=0, max_value=10_000_000
    )
)

Fail strategy: log warnings for non-critical failures; block Flow 4 (catalogue) if Silver validation fails critically.
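A minimal sketch of this fail strategy (the helper name and the shape of the per-expectation results are illustrative, assuming each result carries an expectation_type and a success flag):

```python
import logging

# Illustrative: which expectation types count as critical for this pipeline
CRITICAL_EXPECTATIONS = {"expect_column_values_to_not_be_null"}

def enforce_fail_strategy(results: list[dict]) -> None:
    """Warn on non-critical failures; raise on critical ones so Prefect
    marks the run as failed and Flow 4 (catalogue) is not triggered."""
    critical_failures = []
    for r in results:
        if r["success"]:
            continue
        if r["expectation_type"] in CRITICAL_EXPECTATIONS:
            critical_failures.append(r["expectation_type"])
        else:
            logging.warning("Non-critical expectation failed: %s", r["expectation_type"])
    if critical_failures:
        raise RuntimeError(f"Critical Silver validation failures: {critical_failures}")
```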

6.4 Flow 4 — Metadata catalogue

Flow 4 registers the Bronze files and the Silver table in OpenMetadata, creating a lineage entry from source to EAV.

python
@task
def catalog_bronze_files(bronze_path: str, fonte: str, dataset: str):
    """Scan Bronze directory and register files in OpenMetadata."""
    from metadata.ingestion.ometa.ometa_api import OpenMetadata
    from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
        OpenMetadataConnection
    )
    # ... OpenMetadata ingestion config and lineage creation

Prerequisites: pip install 'openmetadata-ingestion[datalake]' and OPENMETADATA_JWT_TOKEN environment variable set (generate from OpenMetadata UI → Settings → Bots → ingestion-bot).

7. Shared utilities

Six modules in flows/utils/ are available to all pipelines.

utils/bronze_writer.py — standardised Bronze write with checksum and log:

python
from utils.bronze_writer import save_to_bronze, log_ingestion

file_info = save_to_bronze(data=response.content, fonte="agenzie-dogane", dataset="tabacchi", anno=2025, ext="html")
log_ingestion(file_info, n_record=0, url_source=url)
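The module's source is the reference, but its contract can be sketched roughly as follows (save_to_bronze_sketch is a simplified illustration, assuming the {fonte}/{anno}/ path convention from §10 and a SHA256 checksum recorded for the ingestion log):

```python
import hashlib
from pathlib import Path

def save_to_bronze_sketch(data: bytes, fonte: str, dataset: str, anno: int,
                          ext: str, base_path: str = "data/bronze") -> dict:
    """Illustrative contract: write bytes under {base}/{fonte}/{anno}/
    and return the file_info dict the transform flow consumes."""
    target_dir = Path(base_path) / fonte / str(anno)
    target_dir.mkdir(parents=True, exist_ok=True)
    file_path = target_dir / f"{dataset}.{ext}"
    file_path.write_bytes(data)
    return {
        "file_path": str(file_path),
        "sha256": hashlib.sha256(data).hexdigest(),  # goes to bronze.ingestion_log
        "size_bytes": len(data),
    }
```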

utils/rate_limiter.py — global thread-safe rate limiting for web scraping:

python
from utils.rate_limiter import rate_limited_request

response = rate_limited_request(url, method="POST", delay=2)
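The idea behind the utility is a module-level lock plus a shared timestamp, so concurrent tasks cannot hammer the same source. A minimal sketch (wait_for_slot is illustrative, not the real API):

```python
import threading
import time

_lock = threading.Lock()
_last_request_ts = 0.0

def wait_for_slot(delay: float) -> None:
    """Block until at least `delay` seconds have passed since the last
    request, across all threads sharing this module."""
    global _last_request_ts
    with _lock:
        sleep_for = _last_request_ts + delay - time.monotonic()
        if sleep_for > 0:
            time.sleep(sleep_for)
        _last_request_ts = time.monotonic()
```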

utils/html_parser.py — shared HTML table parsing:

python
from utils.html_parser import parse_table, extract_form_data

data = parse_table(soup, table_id="results")

utils/situas_client.py — ISTAT SITUAS API wrapper:

python
from utils.situas_client import SituasClient

client = SituasClient()
catalogue = client.get_catalogue()              # list of 74 datasets
records = client.get_all_records(pfun=64, pdata="01/01/2024")   # paginated
records = client.get_records(pfun=61, pdata="01/01/2024")       # single-page

get_all_records() handles pagination by iterating pages of 500 records. Note that some pfuns — notably pfun=61 (comuni) — return all results in a single response and do not support row-offset pagination; use get_records() directly for those.
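The pagination logic amounts to the following loop (fetch_page is a hypothetical stand-in for the underlying SITUAS HTTP call; a short page signals the last one):

```python
PAGE_SIZE = 500  # page size used by get_all_records()

def get_all_records_sketch(fetch_page) -> list[dict]:
    """Illustrative pagination loop: fetch_page(offset, limit) -> list of
    records. Stops when a page comes back shorter than PAGE_SIZE."""
    records: list[dict] = []
    offset = 0
    while True:
        page = fetch_page(offset, PAGE_SIZE)
        records.extend(page)
        if len(page) < PAGE_SIZE:   # short page => no more rows
            break
        offset += PAGE_SIZE
    return records
```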

utils/territory_resolver.py — shared (sigla, comune) → territory_id lookup:

python
from utils.territory_resolver import build_comune_lookup_from_conn, resolve_comune

# Build the lookup once per flow run (one DB query)
lookup = build_comune_lookup_from_conn(conn)

# Resolve each record — O(1) dict lookup
match = resolve_comune(lookup, sigla="MI", nome="Milano")
if match:
    territory_id, type_code = match

The lookup searches both silver.territories.label and silver.territory_names, which includes alternate names, bilingual forms, and ADM alias names. If resolve_comune returns None, the territory could not be matched — log a warning and continue.
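The resolution step itself reduces to normalisation plus a dict lookup; an illustrative reduction (the real resolve_comune also draws on the alias tables mentioned above):

```python
def resolve_comune_sketch(lookup: dict, sigla: str, nome: str):
    """Illustrative core of resolve_comune: normalise the key, then an
    O(1) dict lookup. Returns (territory_id, type_code) or None."""
    key = (sigla.strip().upper(), nome.strip().upper())
    return lookup.get(key)
```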

utils/service_writer.py — idempotent upsert helpers for silver.services and its satellite tables:

python
from utils.service_writer import (
    upsert_service,             # → int (service id)
    upsert_service_identifier,  # attach an external code
    upsert_service_attribute,   # store a typed metric (EAV)
    upsert_service_category,    # attach a categorical tag
)

All functions accept an open psycopg2 cursor. The caller manages the connection and calls conn.commit(). See §5.3 for usage examples.
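As a sketch of the idempotent-upsert pattern these helpers share (the column names and conflict key below are illustrative, not the actual silver.services schema):

```python
def build_service_upsert(name: str, territory_id: int) -> tuple[str, tuple]:
    """Illustrative: build the parametrised INSERT ... ON CONFLICT statement
    an upsert helper would hand to cursor.execute(). Re-running it with the
    same key updates in place instead of inserting a duplicate."""
    sql = """
        INSERT INTO silver.services (name, territory_id)
        VALUES (%s, %s)
        ON CONFLICT (name, territory_id)
        DO UPDATE SET updated_at = NOW()
        RETURNING id
    """
    return sql, (name, territory_id)
```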

Use these utilities rather than reimplementing common patterns in each pipeline.

8. Deployment with Prefect

A deployment turns a flow into a scheduled, monitored resource on the Prefect server. The developer defines prefect.yaml once; subsequent executions happen through the UI or CLI without touching the code.

8.1 Structure of prefect.yaml

Each pipeline defines four deployments in prefect.yaml. The pull section is required in all pipelines: it sets the working directory and installs flow dependencies at runtime. This is necessary because the prefect-worker container uses the base image prefecthq/prefect:3-python3.11, which does not pre-install pipeline-specific packages such as psycopg2-binary.

yaml
name: {ente}-{dataset}
prefect-version: "3.*"

work_pool:
  name: default-pool

pull:
  - prefect.deployments.steps.set_working_directory:
      directory: /flows/{ente}/{dataset}
  - prefect.deployments.steps.pip_install_requirements:
      directory: /flows/{ente}/{dataset}
      requirements_file: requirements.txt

deployments:
  - name: {ente}-{dataset}-01-ingestion      # Flow 1 — download → Bronze
    entrypoint: 01_ingestion_flow.py:ingestion_flow
    schedules: []                            # no automatic schedule โ€” trigger manually
    parameters:
      anno: 2025
    work_pool:
      name: default-pool

  - name: {ente}-{dataset}-02-transform      # Flow 2 — Bronze → Silver
    entrypoint: 02_transform_flow.py:transform_flow
    schedules: []
    parameters:
      anno: 2025
    work_pool:
      name: default-pool

  - name: {ente}-{dataset}-03-quality        # Flow 3 — quality monitoring
    entrypoint: 03_data_quality_flow.py:quality_flow
    schedules: []
    parameters:
      anno: 2025
    work_pool:
      name: default-pool

  - name: {ente}-{dataset}-04-metadata       # Flow 4 — catalogue update
    entrypoint: 04_metadata_flow.py:metadata_flow
    schedules: []
    parameters:
      anno: 2025
    work_pool:
      name: default-pool

The numbered prefix (01, 02, ...) keeps deployments in execution order in the Prefect UI. schedules: [] explicitly removes any schedule — omitting the key does not delete schedules that were previously registered.

8.2 Registering deployments

Deployments must be registered from inside the prefect-worker container, not from the host shell. This is because Prefect records the working directory path at registration time and uses it verbatim at execution time. The worker executes flows at /flows/{ente}/{dataset}/ (the Docker volume mount); registering from the host would record the host path which does not exist inside the container.

bash
docker exec maps-prefect-worker bash -c \
  "cd /flows/{ente}/{dataset} && prefect deploy --all"

After registration, deployments are visible at http://localhost:4200/deployments.

8.3 When to redeploy

The Prefect worker reads code from the filesystem on each execution:

  • Flow code changes (logic, bug fixes): do not require prefect deploy --all. The worker will execute the updated file on the next run.
  • prefect.yaml changes (new deployments, schedule changes, new parameters, pull steps): require prefect deploy --all run from inside the container.
  • Removing a schedule: set schedules: [] in prefect.yaml and re-register. Omitting the key does not remove an existing schedule.

8.4 Running a deployment

From the UI — go to http://localhost:4200/deployments, click a deployment, then "Run". The parameter form is pre-filled with defaults and can be edited before each run.

From the CLI:

bash
export PREFECT_API_URL=http://localhost:4200/api

# Run with default parameters
prefect deployment run '{flow-name}/{ente}-{dataset}-01-ingestion'

# Override parameters at runtime
prefect deployment run '{flow-name}/{ente}-{dataset}-01-ingestion' \
  --param anno=2024

# List all registered deployments
prefect deployment ls

The CLI name follows the pattern {flow-name}/{deployment-name}. The flow name comes from the @flow(name=...) decorator; the deployment name from prefect.yaml. Check exact names with prefect deployment ls or the UI.

Do not start a local Prefect worker (prefect worker start) when using the Docker stack. The maps-prefect-worker container already polls default-pool. A competing local worker would pick up runs but fail because the /flows/ paths do not exist on the host.

Monitor: Prefect UI at http://localhost:4200.

9. Team workflow

Each of the four flows can be developed and tested independently. A natural split for a two-developer team working on the same pipeline:

| Developer A | Developer B |
| --- | --- |
| Flow 1: Ingestion (source-specific, often scraping or API) | Flow 2: Transform (EAV normalisation, territory resolution) |
| Flow 4: Metadata catalogue | Flow 3: Data quality (GX expectations, outlier detection) |
| README.md + prefect.yaml | tests/test_flow.py |

Handoff criterion between flows: Flow 1 is considered complete when at least one Bronze file is written to the correct path and its checksum is recorded in bronze.ingestion_log. Flow 2 work can then start from that file, even before Flow 1 is fully finished.

Tracking on GitLab: open one issue per flow. Link them to the parent pipeline issue. Close each sub-issue as the flow reaches its Bronze/Silver acceptance criteria (see ยง10).

10. Per-phase checklists

Flow 1 (Ingestion) complete when:

  • File downloaded to correct Bronze path (/data/bronze/{ente}/{anno}/)
  • SHA256 checksum recorded in bronze.ingestion_log
  • Retry logic in place (retries=3 with backoff)
  • README.md documents source URL, format, and update frequency

Flow 2 (Transform) complete when:

  • Records inserted into silver.territory_attributes with the correct type_code
  • ON CONFLICT DO UPDATE in place (idempotent re-runs)
  • territory_id resolved for all records (unresolved comuni logged and counted)
  • Re-run on unchanged Bronze produces identical Silver output

Flow 3 (Quality) complete when:

  • Bronze validation covers: schema, null rate, value formats
  • Silver validation covers: territory_id presence, value ranges, temporal fields
  • Report produced on every run (pass or fail)
  • Failure threshold defined and documented

Flow 4 (Metadata) complete when:

  • Bronze files visible in OpenMetadata
  • Lineage Bronze → Silver created
  • Schema extracted and available in catalogue interface

11. Common errors

ON CONFLICT fails with "no unique constraint": the UNIQUE constraint on (territory_id, attribute, valid_from) must be present. Verify with \d silver.territory_attributes in psql.

territory_id is NULL / no records resolved: the ISTAT foundation pipelines (flows/istat/) must run before any EAV pipeline. If _build_territory_lookup() returns an empty dict, silver.territories is not yet populated.

valid_from causes a duplicate key: two rows for the same entity/attribute/year with different valid_from dates. Use a fixed date (e.g. f"{anno}-01-01") rather than CURRENT_DATE to keep runs idempotent.

Flow 2 reads stale Bronze file: Prefect task caching may return a cached result. Either disable caching on the download task for that run, or delete the cache entry from the Prefect UI.

OpenMetadata JWT expired: rotate the token via the OpenMetadata UI (Settings → Bots) and update the OPENMETADATA_JWT_TOKEN environment variable in the prefect-worker container.

Flow run crashes with FileNotFoundError: /flows/{ente}/{dataset}: the deployment was registered from the host shell instead of from inside the container. Re-register using docker exec maps-prefect-worker bash -c "cd /flows/... && prefect deploy --all" (see §8.2).

Flow run crashes with ModuleNotFoundError: No module named 'psycopg2': the pull section is missing or incomplete in prefect.yaml. Ensure both set_working_directory and pip_install_requirements steps are present (see §8.1), then re-register the deployment.

Flow run is scheduled but never starts: a local worker (prefect worker start) is running alongside the Docker worker and picks up the run, then fails. Stop any local worker processes and let maps-prefect-worker handle execution exclusively.

get_all_records() raises JSONDecodeError: the pfun returns all records in a single response and does not support row-offset pagination via pdatada. Use get_records() directly (see §7 — situas_client.py).
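A defensive pattern for callers that cannot know in advance whether a pfun paginates (illustrative wrapper, assuming the client surfaces json.JSONDecodeError; fetch_records_sketch is not part of the shared utilities):

```python
import json

def fetch_records_sketch(client, pfun: int, pdata: str) -> list[dict]:
    """Try the paginated call first; fall back to the single-page call
    when the pfun rejects row-offset pagination."""
    try:
        return client.get_all_records(pfun=pfun, pdata=pdata)
    except json.JSONDecodeError:
        return client.get_records(pfun=pfun, pdata=pdata)
```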