Deliverable D2.2, Appendix: Operational guide for building and maintaining ETL pipelines in the MAPS Data Lake.
This guide is intended for developers joining the project who need to build a new data pipeline or maintain an existing one. It describes the standard workflow, conventions, and patterns used across all pipelines in the MAPS infrastructure. The reference implementation throughout this guide is the Agenzia Dogane e Monopoli (tabaccherie) pipeline; the complete source code is available at flows/agenzie-dogane/tabacchi/ in the repository.
To start the local development environment you need Python 3.11, Docker, and Docker Compose. The following steps configure the database, the Prefect server, and the Python interpreter in about ten minutes.
git clone https://gitlab.openpolis.io/openpolis/gst/gst-maps-pipelines.git
cd gst-maps-pipelines
cp .env.example .env
The default values in .env.example work without modification for local development. The one variable worth checking is BRONZE_BASE_PATH:
# .env.example: commented out, not needed in local development
# BRONZE_BASE_PATH=/data/bronze
In local development, flows automatically compute the Bronze path as {repo_root}/data/bronze (derived from the file's location). In production, inside the prefect-worker container, set BRONZE_BASE_PATH=/data/bronze; the Docker volume is already mounted there.
Flows load .env automatically at startup via python-dotenv (included in requirements-dev.txt): there is no need to manually export variables in the shell before running a script.
docker compose -f docker-compose.local.yml up -d
This starts PostgreSQL 17 + PostGIS 3.5, the Prefect server, and OpenMetadata (with its dedicated database and Elasticsearch). The MAPS database schema (bronze, silver, gold schemas, tables, and indexes) is created automatically on first startup via postgres/init-scripts/01-init-schemas.sql. Verify that the services are running:
docker compose -f docker-compose.local.yml ps
If you modify docker-compose.local.yml or .env after the containers are already running, docker compose restart does not re-read environment variables. Use instead:
docker compose -f docker-compose.local.yml up -d --force-recreate
python3.11 -m venv .venv
source .venv/bin/activate
pip install -r requirements-dev.txt
This installs Prefect, psycopg2, requests, BeautifulSoup, and python-dotenv. The virtualenv must be activated each time you open a new terminal session (source .venv/bin/activate).
export PREFECT_API_URL=http://localhost:4200/api
prefect work-pool create default-pool --type process
The default-pool work pool is the execution pool referenced in each pipeline's prefect.yaml.
OpenMetadata login: admin@open-metadata.org / admin
PostgreSQL: psql -h localhost -U maps -d maps_db (password: maps_dev)
Verify that the schemas and tables were created correctly:
-- expected schemas: bronze, silver, gold
\dn
-- expected tables: bronze.ingestion_log, silver.territory_types,
-- silver.territories, silver.territory_containments,
-- silver.territory_identifiers, silver.territory_names,
-- silver.territory_attributes,
-- silver.service_types, silver.services, silver.service_identifiers,
-- silver.service_attributes, silver.service_categories
\dt bronze.*
\dt silver.*
The environment is ready. The Bronze layer is a Docker volume (bronze-data) mounted at /data inside the containers; raw files downloaded by flows will be stored in that volume.
OpenMetadata takes a few minutes to complete its database migration on first startup; wait for the maps-openmetadata container to report a healthy status before opening the interface.
Prefect follows a client/server architecture. The server (container maps-prefect-server) tracks flow runs, stores logs, and serves the UI at http://localhost:4200. The worker (container maps-prefect-worker) is the process that actually executes flows: it polls the server for work assigned to its pool (default-pool) and launches each flow in a subprocess.
Developers interact with the server through the Prefect client (the CLI and Python SDK), whose target is configured by the PREFECT_API_URL environment variable. During local development the client points to the server running in Docker; in production it will point to the remote server, with no changes to the flow code itself.
A deployment associates a flow with a worker pool and defines its execution parameters (schedule, default parameter values). Once registered, a flow can be triggered from the UI or CLI without the developer needing a local process running.
sequenceDiagram
participant Dev as Developer
participant Server as Prefect Server
participant Worker as Worker (default-pool)
Dev->>Server: register deployment
Note over Server: stores flow config, schedule, parameters
Dev->>Server: trigger run (UI or CLI)
Server->>Worker: dispatch work item
Worker->>Worker: execute flow subprocess
Worker-->>Server: report state + logs
Every MAPS pipeline follows the Medallion pattern: raw files are downloaded to the Bronze layer, transformed and validated in the Silver layer, and finally exposed in the Gold layer. Pipelines are orchestrated by Prefect and follow a standard four-flow structure; a fifth phase integrates the dataset into the Datastore.
External source → Flow 1: Ingestion → Bronze (filesystem)
                → Flow 2: Transform → Silver EAV (PostgreSQL)
                → Flow 3: Data quality → Quality tag on OpenMetadata
                → Flow 4: Metadata → lineage on OpenMetadata
                → Phase 5: Datastore → catalogue + Gold tables (§6.5)
The four flows are independent and each can be re-run without executing the preceding ones: if the transformation logic changes, only Flow 2 needs to be re-run, with no need to re-download the source data. The fifth phase, Datastore integration, is not a pipeline flow and is carried out by the person responsible for verification (§6.5).
Each pipeline lives in a dedicated subdirectory under flows/:
flows/
├── utils/                              # Shared modules for all pipelines (excerpt)
│   ├── bronze_writer.py                # save_to_bronze(), log_ingestion()
│   ├── bronze_readers.py               # find_latest_anno(), read_bronze_tabular()
│   ├── rate_limiter.py                 # rate_limited_request()
│   ├── html_parser.py                  # parse_table(), extract_form_data()
│   ├── comune_extract.py               # extract (province, comune) from tabular/JSON
│   ├── territory_resolver.py           # build_comune_lookup_from_conn(), resolve_comune()
│   ├── territory_attributes_writer.py  # upsert EAV into silver.territory_attributes
│   ├── service_writer.py               # upsert_service(), upsert_service_identifier(), ...
│   ├── situas_client.py                # SituasClient: ISTAT SITUAS API
│   ├── istat_esploradati.py            # esploradati client (SDMX + onlyFile)
│   ├── istat_aster.py                  # ASTER client
│   ├── om_tag_writer.py                # patch_classification_tag(): Quality tag
│   ├── om_lineage_writer.py            # Bronze → Silver lineage in OpenMetadata
│   └── search_index.py                 # refresh of the catalogue materialised views
└── {ente}/{dataset}/
    ├── 01_ingestion_flow.py            # Flow 1: download → Bronze
    ├── 02_transform_flow.py            # Flow 2: Bronze → Silver
    ├── 03_data_quality_flow.py         # Flow 3: validation
    ├── 04_metadata_flow.py             # Flow 4: OpenMetadata catalogue
    ├── prefect.yaml                    # Deployment definitions (all 4 flows)
    ├── requirements.txt                # Python dependencies
    ├── README.md                       # Pipeline documentation
    ├── docs/
    │   ├── DEPLOY_GUIDE.md             # Step-by-step deployment instructions
    │   └── KNOWN_ISSUES.md             # History of issues and their solutions
    └── tests/
        └── test_flow.py                # Unit tests
Naming conventions:
- Pipeline directories follow the {ente}/{dataset}/ layout (e.g. agenzie-dogane/tabacchi/).
- Flow files carry the numeric prefixes 01_, 02_, 03_, 04_.
The flows/istat/ directory contains the foundation pipelines that populate the Silver territorial model. All other data pipelines depend on this data being present.
| Pipeline | Directory | Source | Scope |
|---|---|---|---|
| Province, regioni, ripartizioni | flows/istat/province-regioni/ | SITUAS pfun=64/68/71 + variazioni pfun=106–108, 112–114 | ~107 province, 20 regioni, 5 ripartizioni per year (2000–today) |
| Comuni | flows/istat/comuni/ | SITUAS pfun=61 + variazioni pfun=129 | ~7,900 comuni per year (2000–today) |
| Variazioni amministrative | flows/istat/variazioni-amministrative/ | SITUAS pfun=106–108, 112–114, 98, 104, 105, 129 | Ingestion only: downloads full history from 1861 into Bronze |
| Territory corrections | flows/istat/territory-corrections/ | Static data: ADM alias names and data-quality fixes | Idempotent flow; must run after comuni are populated |
Run order matters. Variazioni processing is integrated into the province-regioni and comuni transform flows. The variazioni Bronze data must be ingested first. The territory-corrections flow must run last, after comuni territories are populated.
Step 1: Ingestion (any order, can run in parallel):
  istat-province-regioni → Flow 1 (ingestion)
  istat-comuni → Flow 1 (ingestion)
  istat-variazioni → Flow 1 (ingestion), must complete before Step 2
Step 2: Transform (strict order):
  istat-province-regioni → Flow 2 (transform), reads province bronze + variazioni bronze
  istat-comuni → Flow 2 (transform), reads comuni bronze + variazioni bronze + province silver
Step 3: Corrections (after comuni are populated):
  istat-territory-corrections → Flow 1 (corrections)
Each flow file accepts --anni as a CLI argument. Flows can be run directly via Python (if the virtualenv is active) or via Docker (recommended):
# Option A: Python directly (venv active)
# Step 1: Ingestion (any order)
python flows/istat/variazioni-amministrative/01_ingestion_flow.py
python flows/istat/province-regioni/01_ingestion_flow.py
python flows/istat/comuni/01_ingestion_flow.py
# Step 2: Transform (strict order: variazioni bronze must exist first)
python flows/istat/province-regioni/02_transform_flow.py
python flows/istat/comuni/02_transform_flow.py
# Step 3: Corrections (after comuni are populated)
python flows/istat/territory-corrections/01_corrections_flow.py
# Specific years only
python flows/istat/province-regioni/01_ingestion_flow.py --anni 2024 2025 2026
# Option B: Docker (recommended)
# Step 1: Ingestion
docker exec maps-prefect-worker bash -c \
"cd /flows/istat/variazioni-amministrative && python 01_ingestion_flow.py"
docker exec maps-prefect-worker bash -c \
"cd /flows/istat/province-regioni && python 01_ingestion_flow.py"
docker exec maps-prefect-worker bash -c \
"cd /flows/istat/comuni && python 01_ingestion_flow.py"
# Step 2: Transform (province-regioni first, then comuni)
docker exec maps-prefect-worker bash -c \
"cd /flows/istat/province-regioni && python 02_transform_flow.py"
docker exec maps-prefect-worker bash -c \
"cd /flows/istat/comuni && python 02_transform_flow.py"
# Step 3: Corrections
docker exec maps-prefect-worker bash -c \
  "cd /flows/istat/territory-corrections && python 01_corrections_flow.py"
All ISTAT pipelines are registered on the Prefect server with no automatic schedule; runs are triggered manually. To register or re-register after changing prefect.yaml:
docker exec maps-prefect-worker bash -c "cd /flows/istat/province-regioni && prefect deploy --all"
docker exec maps-prefect-worker bash -c "cd /flows/istat/comuni && prefect deploy --all"
docker exec maps-prefect-worker bash -c "cd /flows/istat/variazioni-amministrative && prefect deploy --all"
docker exec maps-prefect-worker bash -c "cd /flows/istat/territory-corrections && prefect deploy --all"
To trigger a run via CLI (respect the execution order):
export PREFECT_API_URL=http://localhost:4200/api
# Step 1: Ingestion (any order)
prefect deployment run 'istat-variazioni-ingestion/istat-variazioni-01-ingestion'
prefect deployment run 'istat-province-regioni-ingestion/istat-pr-01-ingestion' \
--param anni='[2024,2025,2026]'
prefect deployment run 'istat-comuni-ingestion/istat-comuni-01-ingestion' \
--param anni='[2024,2025,2026]'
# Step 2: Transform (wait for ingestion; province-regioni before comuni)
prefect deployment run 'istat-province-regioni-transform/istat-pr-02-transform' \
--param anni='[2024,2025,2026]'
prefect deployment run 'istat-comuni-transform/istat-comuni-02-transform' \
--param anni='[2024,2025,2026]'
# Step 3: Corrections (after comuni transform completes)
prefect deployment run 'istat-territory-corrections/istat-territory-corrections-01'
All ISTAT flows are idempotent: re-running a transform on unchanged Bronze produces identical Silver output.
The Silver layer uses a stable surrogate identity model for territories. Each territory has an internal id that never changes; ISTAT codes are stored as identifiers in silver.territory_identifiers. This eliminates record duplication when comuni change province: the settlement is one row, with multiple ISTAT code periods.
| Table | Purpose |
|---|---|
| silver.territory_types | Lookup of entity types, pre-seeded with 9 types |
| silver.territories | Master registry: stable surrogate id, type_code, label, lifecycle dates (valid_from/valid_to) |
| silver.territory_identifiers | Temporal identifiers: ISTAT codes (scheme='istat'), cadastral codes, fiscal codes, UTS codes |
| silver.territory_names | Temporal names with language support (Italian and bilingual/local) |
| silver.territory_containments | Temporal hierarchical containment (comune → provincia, with valid_from/valid_to for province changes) |
| silver.territory_relationships | Predecessor/successor links from variazioni events (ES, CS, RN, AQES, CECS) |
              ┌───────────────────────────────┐
              │ territory_types               │
              ├───────────────────────────────┤
              │ PK code VARCHAR(30)           │
              │ label                         │
              │ hierarchy_level               │
              └───────────────┬───────────────┘
                              │ 1
                              │ (type_code)
                              │ N
┌─────────────────────────────┴───────────────────────────┐
│ territories                                             │
├─────────────────────────────────────────────────────────┤
│ PK id SERIAL                                            │
│ FK type_code   label   subtype                          │
│ valid_from DATE   valid_to DATE   end_reason TEXT       │
└──┬─────────────────┬──────────────────────┬─────────────┘
   │ 1:N             │ 1:N                  │ 1:N (member + container)
   │                 │                      │
   ▼                 ▼                      ▼
┌───────────────┐ ┌───────────────┐ ┌─────────────────────────────────┐
│ territory_    │ │ territory_    │ │ territory_containments          │
│ identifiers   │ │ names         │ ├─────────────────────────────────┤
├───────────────┤ ├───────────────┤ │ FK member_id → territories      │
│FK territory_  │ │FK territory_  │ │ FK container_id → territories   │
│   id          │ │   id          │ │ valid_from DATE                 │
│ scheme        │ │ name          │ │ valid_to DATE                   │
│ identifier    │ │ language      │ └─────────────────────────────────┘
│ valid_from    │ │ name_type     │
│ valid_to      │ │ valid_from    │ ┌─────────────────────────────────┐
│ fonte         │ │ valid_to      │ │ territory_relationships         │
└───────────────┘ └───────────────┘ ├─────────────────────────────────┤
                                    │ FK source_id → territories      │
                                    │ FK dest_id → territories        │
                                    │ classification TEXT             │
                                    │ valid_from DATE                 │
                                    └─────────────────────────────────┘
Territories are resolved by ISTAT code via a join on territory_identifiers:
-- Find the territory for ISTAT code '092001'
SELECT t.id, t.label FROM silver.territories t
JOIN silver.territory_identifiers ti ON ti.territory_id = t.id
WHERE t.type_code = 'comune' AND ti.scheme = 'istat' AND ti.identifier = '092001';
-- Which province was Arbus in on 2015-01-01?
SELECT p.label FROM silver.territory_containments tc
JOIN silver.territories p ON p.id = tc.container_id AND p.type_code = 'provincia'
JOIN silver.territories t ON t.id = tc.member_id
WHERE t.type_code = 'comune' AND t.label = 'Arbus'
AND (tc.valid_from IS NULL OR tc.valid_from <= '2015-01-01')
AND (tc.valid_to IS NULL OR tc.valid_to > '2015-01-01');
valid_from = NULL means the start of validity is unknown (predates the available data). valid_to = NULL means the territory is still active.
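The NULL semantics above amount to a half-open interval test. The following is a minimal Python sketch of the same predicate, useful when filtering rows already loaded in memory; the function name is_valid_on is hypothetical and not part of the repository.

```python
from datetime import date
from typing import Optional


def is_valid_on(valid_from: Optional[date], valid_to: Optional[date], day: date) -> bool:
    """Mirror the SQL predicate: valid_from <= day < valid_to, with NULLs open-ended.

    valid_from=None: start of validity unknown, treated as already valid.
    valid_to=None: record still active, treated as having no end.
    """
    starts_ok = valid_from is None or valid_from <= day
    ends_ok = valid_to is None or valid_to > day
    return starts_ok and ends_ok
```

Note that valid_to is exclusive: a containment that ends on a given day is no longer valid on that day, which keeps consecutive periods from overlapping.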
All pipelines write to silver.territory_attributes, the generalised EAV table that supports any type of territorial entity.
INSERT INTO silver.territory_attributes
(territory_id, type_code, attribute, value, data_type, source, valid_from, valid_to)
VALUES
(42, 'comune', 'popolazione', '2635', 'integer', 'ISTAT', '2024-01-01', NULL),
(107, 'provincia', 'n_tabaccherie', '245', 'integer', 'ADM', '2024-01-01', '2025-12-31'),
(8, 'sll', 'addetti', '15200', 'integer', 'ISTAT', '2021-01-01', NULL);
| Field | Description | Example values |
|---|---|---|
| territory_id | FK to silver.territories(id): stable surrogate, survives ISTAT code changes | resolved via territory_resolver |
| type_code | Entity type (denormalised from territories) | 'comune', 'provincia', 'regione', 'sll', 'slo' |
| attribute | Attribute name | 'popolazione', 'n_tabaccherie_adm' |
| value | Value stored as text | '2635' |
| data_type | Type hint for casting | 'integer', 'float', 'string', 'boolean' |
| source | Data source | 'ISTAT', 'ADM', 'MinSalute' |
| valid_from | Start of validity | '2024-01-01' |
| valid_to | End of validity; NULL means current | NULL |
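Because value is stored as text, readers have to cast it using the data_type hint. A minimal sketch of such a cast follows; cast_eav_value is a hypothetical helper, and the accepted textual forms of booleans are an assumption, since the guide does not specify how booleans are serialised.

```python
def cast_eav_value(value: str, data_type: str):
    """Cast an EAV text value to a Python object using its data_type hint."""
    if data_type == "integer":
        return int(value)
    if data_type == "float":
        return float(value)
    if data_type == "boolean":
        # Assumption: booleans are written as true/false, t/f, 1/0 or yes/no.
        return value.strip().lower() in {"true", "t", "1", "yes"}
    if data_type == "string":
        return value
    raise ValueError(f"Unknown data_type: {data_type!r}")
```

Raising on an unknown hint, rather than returning the raw text, makes a misspelled data_type visible at read time.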
The ON CONFLICT ... DO UPDATE pattern handles re-runs safely:
execute_values(
cur,
"""
INSERT INTO silver.territory_attributes
(territory_id, type_code, attribute, value,
data_type, source, valid_from, valid_to)
VALUES %s
ON CONFLICT (territory_id, attribute, valid_from)
DO UPDATE SET value = EXCLUDED.value, updated_at = NOW()
""",
eav_records
)
silver.territory_attributes stores one row per (territory, attribute, period): it is optimised for aggregated territorial statistics such as population counts or number of pharmacies per comune. It is not designed to store individual point-of-interest records.
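The idempotency that the ON CONFLICT pattern provides can be checked without a running PostgreSQL. The sketch below uses an in-memory SQLite database as a stand-in (an assumption made only to keep the example self-contained; SQLite 3.24 or later supports the same upsert clause) and a reduced, hypothetical version of the table with just the columns that take part in the conflict key.

```python
import sqlite3

# Reduced stand-in for silver.territory_attributes (hypothetical subset of columns).
DDL = """
CREATE TABLE territory_attributes (
    territory_id INTEGER NOT NULL,
    attribute    TEXT    NOT NULL,
    value        TEXT,
    valid_from   TEXT    NOT NULL,
    UNIQUE (territory_id, attribute, valid_from)
)
"""

UPSERT = """
INSERT INTO territory_attributes (territory_id, attribute, value, valid_from)
VALUES (?, ?, ?, ?)
ON CONFLICT (territory_id, attribute, valid_from)
DO UPDATE SET value = excluded.value
"""


def load(conn, records):
    """Upsert a batch of EAV rows; re-running with the same batch changes nothing."""
    conn.executemany(UPSERT, records)
    conn.commit()


def snapshot(conn):
    """Return all rows in a deterministic order for comparison."""
    return conn.execute(
        "SELECT territory_id, attribute, value, valid_from "
        "FROM territory_attributes ORDER BY territory_id, attribute, valid_from"
    ).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute(DDL)
records = [
    (42, "popolazione", "2635", "2024-01-01"),
    (107, "n_tabaccherie", "245", "2024-01-01"),
]
load(conn, records)
first = snapshot(conn)
load(conn, records)          # second run: same rows, no duplicates
second = snapshot(conn)
load(conn, [(42, "popolazione", "2640", "2024-01-01")])  # corrected value overwrites
third = snapshot(conn)
```

The second load leaves the table unchanged, while the third replaces the value in place; this is the behaviour a re-run of Flow 2 relies on.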
Several data sources publish lists of individual local services (pharmacies, hospitals, schools, tobacco shops, and others). Keeping individual records in Silver offers tangible benefits over writing only aggregated counts: the same raw data can be aggregated by comune, province, ASL catchment area, or SLL without re-ingesting from the source; records from multiple datasets can be deduplicated via service_identifiers using external codes such as codice fiscale or codice HSP; and service-specific attributes can be stored without schema changes.
| Table | Purpose |
|---|---|
| silver.service_types | Registry of service types, pre-seeded with 5 types (farmacia, ospedale, scuola, biblioteca, tabaccheria) |
| silver.services | One row per individual service, with resolved territory_id, lifecycle dates, and a uniqueness constraint for idempotent upserts |
| silver.service_identifiers | External codes per service (codice_fiscale, codice_ministeriale, codice_hsp, ...); the same identifier means the same logical service across datasets |
| silver.service_attributes | EAV for service-specific metrics (posti_letto, num_studenti, ...); mirrors territory_attributes but scoped to a service row |
| silver.service_categories | Multi-valued typed tags (tipo_ospedale: DEA II livello, livello_scuola: primaria, ...) |
Writing services from a pipeline uses the helpers in utils/service_writer.py:
from utils.territory_resolver import build_comune_lookup_from_conn, resolve_comune
from utils.service_writer import (
    upsert_service, upsert_service_identifier,
    upsert_service_attribute, upsert_service_category,
)
conn = _get_db_conn()
lookup = build_comune_lookup_from_conn(conn)
with conn.cursor() as cur:
    for record in source_records:
        # 1. Resolve territory
        match = resolve_comune(lookup, sigla=record["provincia"], nome=record["comune"])
        territory_id = match[0] if match else None
        # 2. Upsert the service row
        service_id = upsert_service(
            cur,
            type_code="farmacia",
            name=record["denominazione"],
            address=record["indirizzo"],
            territory_id=territory_id,
            source="AIFA",
            valid_from="2024-01-01",
        )
        # 3. Attach external identifier (for cross-source deduplication)
        upsert_service_identifier(cur, service_id=service_id,
                                  scheme="codice_fiscale", identifier=record["cf"])
        # 4. Store service-specific metric
        upsert_service_attribute(cur, service_id=service_id,
                                 attribute="tipo_farmacia", value=record["tipo"],
                                 data_type="text", source="AIFA", valid_from="2024-01-01")
        # 5. Tag with category
        upsert_service_category(cur, service_id=service_id,
                                scheme="tipo_farmacia", value=record["tipo"])
conn.commit()
conn.close()
Once individual records are in Silver, counts per territory are a simple GROUP BY query that can feed silver.territory_attributes as aggregated statistics.
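The aggregation step can be sketched in plain Python as the equivalent of that GROUP BY. The helper name count_services_by_territory and the input shape (a list of dicts with a territory_id key) are assumptions for illustration; the output tuples follow the column order used for silver.territory_attributes earlier in this guide, with type_code fixed to 'comune' on the assumption that services were resolved at comune level.

```python
from collections import Counter


def count_services_by_territory(services, attribute, source, valid_from):
    """Turn individual service rows into aggregated EAV tuples, one per territory.

    Services whose territory could not be resolved (territory_id is None) are ignored.
    """
    counts = Counter(
        s["territory_id"] for s in services if s.get("territory_id") is not None
    )
    return [
        (territory_id, "comune", attribute, str(n), "integer", source, valid_from, None)
        for territory_id, n in sorted(counts.items())
    ]
```

For example, three resolved services spread over two territories yield two EAV tuples, ready to be passed to the execute_values upsert.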
The Agenzia Dogane e Monopoli (tabaccherie) pipeline is the reference implementation because it is the most complex pipeline in the project: it requires HTML scraping of a JSF portal with Ajax pagination, session management, rate limiting, and territorial code normalisation. A pipeline that downloads a CSV or XLSX file directly follows the same structure but with a much simpler Flow 1.
Before registering a deployment on Prefect, every flow can be run directly as a Python script. This is the normal way to work: write the flow, run it, fix errors, and run it again, all without touching the server.
Every flow file must include an if __name__ == "__main__" block with argparse so that parameters can be passed from the command line without editing the source:
# 01_ingestion_flow.py
from prefect import flow

@flow
def ingestion_flow(anno: int = 2025):
    ...

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--anno", type=int, default=2025)
    args = parser.parse_args()
    ingestion_flow(anno=args.anno)
Be consistent: always use the same execution method within a pipeline run. Python writes Bronze files to {repo_root}/data/bronze/ on the host; Docker writes to /data/bronze/ inside the container. If you ingest with Python and transform with Docker (or vice versa) and the two paths do not resolve to the same directory, the transform will not find the Bronze files. In docker-compose.local.yml the Bronze directory is bind-mounted from ./data, so both methods share the same files, but only if BRONZE_BASE_PATH is set consistently (see §1.3).
Option A: Python directly (virtualenv must be active):
source .venv/bin/activate
cd flows/agenzie-dogane/tabacchi/
python 01_ingestion_flow.py # uses default parameters
python 01_ingestion_flow.py --anno 2024 # override at runtime
Option B: Docker (recommended; matches the production environment exactly):
docker exec maps-prefect-worker bash -c \
  "cd /flows/agenzie-dogane/tabacchi && python 01_ingestion_flow.py"
docker exec maps-prefect-worker bash -c \
  "cd /flows/agenzie-dogane/tabacchi && python 01_ingestion_flow.py --anno 2024"
If PREFECT_API_URL is set and the server is running, the run appears in the UI with logs and status. If the server is not available, the flow runs as a plain Python script with no server dependency.
The typical iteration is:
Each flow can be developed and tested independently. Flow 2 can be tested by reading a Bronze file already present on disk, without Flow 1 having completed.
Every flow file starts with the same four-part header. Copy it verbatim and adjust the parents[N] depth and the FONTE constant.
import os
import sys
from pathlib import Path

try:
    from dotenv import load_dotenv
    load_dotenv(Path(__file__).resolve().parents[3] / ".env")  # repo root
except ImportError:
    pass

import psycopg2
from psycopg2.extras import execute_values
from prefect import flow, task, get_run_logger

sys.path.insert(0, str(Path(__file__).resolve().parents[2]))  # adds flows/ to import path
from utils.bronze_writer import save_to_bronze, log_ingestion  # and other utils as needed

FONTE = "ente-name"  # matches the Bronze directory
_REPO_ROOT = Path(__file__).resolve().parents[3]
BRONZE_BASE = os.getenv("BRONZE_BASE_PATH", str(_REPO_ROOT / "data" / "bronze"))

def _get_db_conn():
    return psycopg2.connect(
        host=os.getenv("POSTGRES_HOST", "localhost"),
        port=int(os.getenv("POSTGRES_PORT", 5432)),
        database=os.getenv("POSTGRES_DB", "maps_db"),
        user=os.getenv("POSTGRES_USER", "maps"),
        password=os.getenv("POSTGRES_PASSWORD", "maps_dev"),
    )
The parents[N] index depends on directory depth: parents[2] from flows/{ente}/{dataset}/ reaches flows/ (where utils/ lives); parents[3] reaches the repo root (where .env lives). _get_db_conn() is defined locally in each flow rather than imported from utils, so each script is self-contained.
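The parents[N] arithmetic is easy to get wrong when a pipeline sits at a different depth, so it is worth checking once. The sketch below uses PurePosixPath so that it runs without touching the filesystem; the /repo prefix is a hypothetical checkout location.

```python
from pathlib import PurePosixPath

# Hypothetical absolute path of a flow file inside a checkout at /repo.
flow_file = PurePosixPath("/repo/flows/agenzie-dogane/tabacchi/01_ingestion_flow.py")

# parents[0] is the dataset directory, parents[1] the ente directory.
flows_dir = flow_file.parents[2]   # directory that contains utils/
repo_root = flow_file.parents[3]   # directory that contains .env
```

A flow placed one level deeper would need parents[3] and parents[4] respectively, which is why the header asks you to adjust N when copying it.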
Flow 1 downloads raw data from the external source and saves it to the Bronze filesystem via the save_to_bronze utility. It records the operation in bronze.ingestion_log.
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import httpx
from utils.bronze_writer import save_to_bronze, log_ingestion

@task(
    retries=3,
    retry_delay_seconds=[60, 300, 900],
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=24)
)
def download_fonte(anno: int) -> dict:
    url = "https://example.gov.it/data/dataset.xlsx"
    response = httpx.get(url, follow_redirects=True, timeout=300)
    response.raise_for_status()
    file_info = save_to_bronze(
        data=response.content,
        fonte="nome-ente",
        dataset="nome_dataset",
        anno=anno,
        ext="xlsx"
    )
    log_ingestion(file_info, n_record=0, url_source=url)
    return file_info
save_to_bronze creates the directory, writes the raw bytes, computes the SHA256 checksum, and returns a standard dictionary with file_path, size_bytes, checksum, fonte, dataset, and anno. log_ingestion inserts the corresponding row into bronze.ingestion_log.
The Bronze layer always stores the raw source exactly as received: bytes downloaded via HTTP for CSV, XLSX, and PDF files; raw HTML for scraping pipelines. Transformation into data structures happens in Flow 2.
Bronze path convention: {repo_root}/data/bronze/{ente}/{anno}/{dataset}.{ext}
Examples: /data/bronze/istat/2024/popolazione_residente.xlsx, /data/bronze/minsalute/2024/asl_boundaries.xlsx
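To make the contract of the Bronze writer concrete, here is a minimal sketch of a function that follows the path convention and returns the dictionary described above. It is not the project's save_to_bronze (see utils/bronze_writer.py for that); the name save_to_bronze_sketch and the explicit base_path parameter are assumptions for illustration.

```python
import hashlib
import tempfile
from pathlib import Path


def save_to_bronze_sketch(data: bytes, fonte: str, dataset: str,
                          anno: int, ext: str, base_path: str) -> dict:
    """Write raw bytes to {base_path}/{fonte}/{anno}/{dataset}.{ext} and describe the file."""
    target = Path(base_path) / fonte / str(anno) / f"{dataset}.{ext}"
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_bytes(data)  # stored exactly as received, no decoding
    return {
        "file_path": str(target),
        "size_bytes": len(data),
        "checksum": hashlib.sha256(data).hexdigest(),
        "fonte": fonte,
        "dataset": dataset,
        "anno": anno,
    }


# Usage against a throwaway directory instead of the real Bronze volume.
with tempfile.TemporaryDirectory() as tmp:
    info = save_to_bronze_sketch(b"raw bytes", "istat", "popolazione_residente",
                                 2024, "xlsx", tmp)
    written = Path(info["file_path"]).read_bytes()
```

Computing the checksum over the in-memory bytes rather than re-reading the file keeps the recorded hash tied to what the source actually returned.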
JSF/PrimeFaces portals. Some Italian public portals use JavaServer Faces with Ajax-driven pagination. Each response contains a javax.faces.ViewState token that must be echoed back in the next request. Use a requests.Session to maintain cookies, extract the ViewState with BeautifulSoup, and use rate_limited_request() from utils/rate_limiter.py:
import requests
from bs4 import BeautifulSoup
from utils.rate_limiter import rate_limited_request

def _get_viewstate(html: str) -> str:
    """Return the javax.faces.ViewState token from a JSF page, or "" if absent."""
    vs = BeautifulSoup(html, "html.parser").find("input", {"name": "javax.faces.ViewState"})
    return vs.get("value", "") if vs else ""

session = requests.Session()
# viewstate holds the token extracted with _get_viewstate() from the previous response
response = rate_limited_request(
    url, method="POST",
    data={
        "javax.faces.ViewState": viewstate,
        "javax.faces.partial.ajax": "true",
        "javax.faces.partial.execute": "formBusca:regione",
        "javax.faces.partial.render": "formBusca:provincia",
        "formBusca:regione": regione_code,
    },
    session=session,
    delay=2,
)
For a complete working example see flows/agenzie-dogane/tabacchi/01_ingestion_flow.py.
Flow 2 reads from Bronze, normalises the data, and writes to silver.territory_attributes. This is the most dataset-specific flow; the structure of the source file determines most of the logic.
Standard skeleton:
@task
def trasforma_e_carica(file_info: dict, entity_type: str) -> int:
    logger = get_run_logger()
    # fonte and anno come from the dictionary returned by save_to_bronze
    fonte, anno = file_info['fonte'], file_info['anno']
    # 1. Read source file (requires: import pandas as pd)
    if file_info['file_path'].endswith('.xlsx'):
        df = pd.read_excel(file_info['file_path'])
    elif file_info['file_path'].endswith('.csv'):
        df = pd.read_csv(file_info['file_path'], encoding='utf-8-sig', sep=';')
    else:
        raise ValueError(f"Unsupported Bronze format: {file_info['file_path']}")
    # 2. Resolve territory_id and build EAV records
    conn = _get_db_conn()
    try:
        lookup = _build_territory_lookup(conn)  # (sigla, label_upper) → (territory_id, type_code)
        eav_records = []
        for _, row in df.iterrows():
            match = lookup.get((row['SIGLA'].upper(), row['COMUNE'].upper()))
            if not match:
                logger.warning("Unresolved comune: %s (%s)", row['COMUNE'], row['SIGLA'])
                continue
            territory_id, type_code = match
            eav_records.extend([
                (territory_id, type_code, 'attributo_1', str(row['COL_A']),
                 'integer', fonte, f'{anno}-01-01', None),
                (territory_id, type_code, 'attributo_2', str(row['COL_B']),
                 'string', fonte, f'{anno}-01-01', None),
            ])
        # 3. UPSERT into Silver EAV
        with conn.cursor() as cur:
            execute_values(cur, """
                INSERT INTO silver.territory_attributes
                (territory_id, type_code, attribute, value,
                 data_type, source, valid_from, valid_to)
                VALUES %s
                ON CONFLICT (territory_id, attribute, valid_from)
                DO UPDATE SET value = EXCLUDED.value, updated_at = NOW()
            """, eav_records)
        conn.commit()
    finally:
        conn.close()
    return len(eav_records)
Territory resolution: pipelines must resolve territory_id from silver.territories before writing to silver.territory_attributes. Use _build_territory_lookup() (see §6.0.1) or the shared territory_resolver.build_comune_lookup_from_conn() to build a (sigla_automobilistica, label_upper) → (territory_id, type_code) dict. Records whose comune cannot be resolved are logged as warnings and skipped; this is expected when the ISTAT foundation pipelines have not yet run.
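The lookup-and-skip behaviour can be sketched independently of the database. In the example below the function names, the input row shape, and the sample data are all hypothetical; the real helpers live in utils/territory_resolver.py and may normalise names differently.

```python
import logging

logger = logging.getLogger("transform_sketch")


def build_lookup(rows):
    """Build {(SIGLA, LABEL_UPPER): (territory_id, type_code)} from query rows.

    rows: iterable of (territory_id, type_code, sigla, label) tuples (assumed shape).
    """
    return {
        (sigla.upper(), label.upper()): (territory_id, type_code)
        for territory_id, type_code, sigla, label in rows
    }


def resolve_records(lookup, records):
    """Attach a territory_id to each source record; warn about and skip the rest."""
    resolved, skipped = [], 0
    for rec in records:
        key = (rec["sigla"].strip().upper(), rec["comune"].strip().upper())
        match = lookup.get(key)
        if match is None:
            logger.warning("Unresolved comune: %s (%s)", rec["comune"], rec["sigla"])
            skipped += 1
            continue
        resolved.append((match[0], rec))
    return resolved, skipped
```

Returning the skipped count alongside the resolved records lets the flow report coverage, which Flow 3 later checks against the active municipalities.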
Flow 3 validates both the Bronze files and the Silver records with a custom framework (Great Expectations is not used) and publishes the outcome as a Qualità tag on OpenMetadata. The Bronze checks run with DuckDB directly on the raw file; the Silver checks run with SQL queries on the EAV records.
Bronze validation (DuckDB on the raw file): format and extension, minimum size, expected structure and columns, presence of the column identifying the territory, and row count.
import duckdb
n_rows = duckdb.connect().execute(
"SELECT count(*) FROM read_csv_auto(?)", [file_path]
).fetchone()[0]
assert n_rows >= MIN_ROWS, f"Bronze below threshold: {n_rows}"
Silver validation (SQL on the EAV records): existence of the destination table, count of written records, territorial coverage against active municipalities, and referential integrity towards silver.territories.
-- count of records written for the source
SELECT count(*) FROM silver.territory_attributes WHERE source = %(source)s;
-- referential integrity (expected: 0 orphan records)
SELECT count(*)
FROM silver.territory_attributes ta
LEFT JOIN silver.territories t ON t.id = ta.territory_id
WHERE ta.source = %(source)s AND t.id IS NULL;
Publishing the outcome. At the end, the flow writes the Qualità tag on the Bronze entity in OpenMetadata via patch_classification_tag() in utils/om_tag_writer.py. The tag takes four values, two per layer: Qualità.bronze-validato / Qualità.bronze-in-revisione for the checks on the raw file, Qualità.silver-validato / Qualità.silver-in-revisione for the checks on the EAV records. The validato value is written when all the checks of the respective layer pass the thresholds; otherwise the dataset remains in-revisione.
from utils.om_tag_writer import patch_classification_tag
# writes Qualità=silver-validato on the Bronze entity (exact signature in om_tag_writer.py)
patch_classification_tag(fqn_bronze, "Qualità", "silver-validato")
Flow 3 contract: every data quality flow must write the Qualità tag. This is what makes the validation status of each dataset visible in the catalogue and verifiable by the verification engine (deliverable D2.2.1). A critical failure on Silver leaves the dataset in-revisione and blocks Flow 4.
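The rule for choosing the tag value is small enough to isolate in one function. The sketch below is an illustration only: quality_tag_value is a hypothetical name, the checks are assumed to arrive as a dict of check name to pass/fail, and treating an empty set of checks as in-revisione is an assumption rather than documented behaviour.

```python
def quality_tag_value(layer: str, check_results: dict) -> str:
    """Return the Qualità tag value for a layer given its check outcomes.

    layer: "bronze" or "silver".
    check_results: mapping of check name to True (passed) or False (failed).
    """
    if layer not in ("bronze", "silver"):
        raise ValueError(f"Unknown layer: {layer!r}")
    # Assumption: with no checks run, the dataset is not considered validated.
    all_passed = bool(check_results) and all(check_results.values())
    return f"{layer}-{'validato' if all_passed else 'in-revisione'}"
```

The returned string is what would be passed as the third argument of patch_classification_tag in the call shown above.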
Flow 4 registers the Bronze files and the Silver table in OpenMetadata, creating a lineage entry from source to EAV.
@task
def catalog_bronze_files(bronze_path: str, fonte: str, dataset: str):
    """Scan Bronze directory and register files in OpenMetadata."""
    from metadata.ingestion.ometa.ometa_api import OpenMetadata
    from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
        OpenMetadataConnection
    )
    # ... OpenMetadata ingestion config and lineage creation
Prerequisites: pip install 'openmetadata-ingestion[datalake]' and OPENMETADATA_JWT_TOKEN environment variable set (generate from OpenMetadata UI → Settings → Bots → ingestion-bot).
The fifth phase makes the transformed dataset usable through the Datastore. It is not a pipeline flow and is not the responsibility of the developers, but of the person responsible for verification, who carries it out with the scripts in scripts/ and scripts/catalog/: this phase registers the variable definitions and the dimension labels, synchronises tags and descriptions from OpenMetadata, and refreshes the catalogue's materialised views.
Typical procedure (multidimensional esploradati/ASTER dataset):
source .venv/bin/activate
# 1. Generate the fact-table migration + populate cube_dimension_labels and variable_definitions
python scripts/generate_cube_ddl.py "IT1,DF_EXAMPLE_1_0"
# 2. Apply the migration
yoyo apply
# 3. Run Flow 02 (populates silver.fct_<safe_id>) and update statistics
psql -d maps_db -c "ANALYZE silver.fct_<safe_id>;"
# 4. Sync tags and descriptions from OpenMetadata โ datastore.variable_definitions
python scripts/catalog/sync_from_om.py --prod
# 5. Refresh the catalogue's materialised views (searchability + counts)
python scripts/catalog/refresh_search_index.py
For generic EAV sources, variable registration happens through the catalog/datastore-prepare-pipeline pipeline (or the individual deployments sync-variable-defs, sync-from-om, generate-short-names).
Tables and views involved: datastore.variable_definitions (column metadata: display_name, data_type, aggregation_rule, source, dimensions); datastore.cube_dimension_labels (SDMX code → Italian label); datastore.variable_tags (Fonte/Qualità/Ambito tags); the materialised views datastore.dataset_catalog, datastore.dataset_search_index, and datastore.dataset_record_counts, refreshed together by refresh_search_index.py.
Column metadata and aggregation rules. Beyond labels and descriptions, each variable carries an aggregation rule in datastore.variable_definitions.aggregation_rule, which governs reconciliation to the 2025 territories (deliverable D2.3.1). The allowed values are sum, mean, mean_area, last, none. The rule is inferred automatically from heuristics on the attribute's type and name (e.g. *_pct/*_rate attributes → mean; numeric → sum; otherwise none) and must then be completed and validated column by column by the data stewards, because the inference does not always capture the nature of the quantity:
UPDATE datastore.variable_definitions
SET aggregation_rule = 'mean'
WHERE source = 'aster_<safe_id>' AND attribute = 'tasso_disoccupazione';
Remediation of the most common misalignments:
- refresh_search_index.py; check that variable_definitions contains rows for the source.
- ANALYZE on the fact table; check that the attribute names in variable_definitions match those in silver.territory_attributes.
- sync_from_om.py (for ASTER, generate_cube_ddl.py --force may be needed).
- generate_cube_ddl.py.
Verifying phase 5. The dataset must appear in the Datastore catalogue with correct labels and an accurate record count, and it must be possible to generate it as a Gold table. The data stewards perform the Bronze→Gold content verification on the generated table (deliverable D2.3.1).
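The aggregation-rule heuristic described earlier in this phase can be sketched as follows. This is an illustration of the stated rules only (`*_pct`/`*_rate` → mean; numeric → sum; otherwise none); the function name, the set of `data_type` labels treated as numeric, and the order of the checks are assumptions, not the project's actual inference code.

```python
ALLOWED_RULES = {"sum", "mean", "mean_area", "last", "none"}

# Assumed labels for numeric columns; the real data_type values may differ.
NUMERIC_TYPES = {"integer", "numeric", "float"}


def infer_aggregation_rule(attribute: str, data_type: str) -> str:
    """Guess an aggregation rule from the attribute's name and type."""
    name = attribute.lower()
    if name.endswith(("_pct", "_rate")):
        return "mean"   # ratios must not be summed across territories
    if data_type.lower() in NUMERIC_TYPES:
        return "sum"    # counts and amounts add up
    return "none"       # text, codes, dates: no aggregation


# A rate whose name does not follow the suffix convention is misclassified:
print(infer_aggregation_rule("tasso_disoccupazione", "numeric"))  # sum
```

The last line shows why the steward review is needed: under these assumptions an unemployment rate named tasso_disoccupazione is inferred as sum, which is the case the UPDATE statement above corrects to mean.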
The shared modules in flows/utils/ are available to all pipelines; the main ones are described below.
utils/bronze_writer.py - standardised Bronze write with checksum and log:
from utils.bronze_writer import save_to_bronze, log_ingestion
file_info = save_to_bronze(data=response.content, fonte="agenzie-dogane", dataset="tabacchi", anno=2025, ext="html")
log_ingestion(file_info, n_record=0, url_source=url)
utils/rate_limiter.py - global thread-safe rate limiting for web scraping:
from utils.rate_limiter import rate_limited_request
response = rate_limited_request(url, method="POST", delay=2)
utils/html_parser.py - shared HTML table parsing:
from utils.html_parser import parse_table, extract_form_data
data = parse_table(soup, table_id="results")
utils/situas_client.py - ISTAT SITUAS API wrapper:
from utils.situas_client import SituasClient
client = SituasClient()
catalogue = client.get_catalogue() # list of 74 datasets
records = client.get_all_records(pfun=64, pdata="01/01/2024") # paginated
records = client.get_records(pfun=61, pdata="01/01/2024")  # single-page
get_all_records() handles pagination by iterating pages of 500 records. Note that some pfuns, notably pfun=61 (comuni), return all results in a single response and do not support row-offset pagination; use get_records() directly for those.
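The pagination pattern behind get_all_records() can be sketched generically. This is not the SituasClient implementation: `fetch_page(offset, limit)` is a hypothetical callable standing in for one request to a paginated endpoint, and the stop condition (a page shorter than the page size) is an assumption.

```python
from typing import Callable, Iterator

PAGE_SIZE = 500  # page size mentioned above


def iter_all_records(
    fetch_page: Callable[[int, int], list[dict]],
    page_size: int = PAGE_SIZE,
) -> Iterator[dict]:
    """Yield records page by page until a short (or empty) page is returned."""
    offset = 0
    while True:
        page = fetch_page(offset, page_size)
        yield from page
        if len(page) < page_size:
            break  # last page reached
        offset += page_size
```

A loop of this kind relies on the endpoint honouring the row offset. An endpoint that returns everything in a single response does not fit it, which is one way to see why the single-response pfuns are read with get_records() instead.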
utils/territory_resolver.py - shared (sigla, comune) → territory_id lookup:
from utils.territory_resolver import build_comune_lookup_from_conn, resolve_comune
# Build the lookup once per flow run (one DB query)
lookup = build_comune_lookup_from_conn(conn)
# Resolve each record: O(1) dict lookup
match = resolve_comune(lookup, sigla="MI", nome="Milano")
if match:
    territory_id, type_code = match
The lookup searches both silver.territories.label and silver.territory_names, which includes alternate names, bilingual forms, and ADM alias names. If resolve_comune returns None, the territory could not be matched: log a warning and continue.
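The "build once, resolve many" pattern, together with the log-and-continue handling of unmatched comuni, might look like the sketch below. All names here (`normalise`, `build_lookup`, `resolve`, `resolve_all`) and the normalisation rule are hypothetical and kept distinct from the real territory_resolver functions; the row shape and the "COM" type code in the usage example are likewise assumptions.

```python
import logging

logger = logging.getLogger(__name__)


def normalise(name: str) -> str:
    """Case- and whitespace-insensitive key (assumed normalisation)."""
    return " ".join(name.casefold().split())


def build_lookup(rows):
    """rows: iterable of (sigla, name, territory_id, type_code) tuples.

    Alternate and bilingual names are simply extra rows pointing at the
    same territory_id.
    """
    return {(sigla.upper(), normalise(name)): (tid, type_code)
            for sigla, name, tid, type_code in rows}


def resolve(lookup, sigla, nome):
    """Return (territory_id, type_code), or None when there is no match."""
    return lookup.get((sigla.upper(), normalise(nome)))


def resolve_all(lookup, records):
    """Attach territory ids to records; log and count the unresolved ones."""
    resolved, unresolved = [], 0
    for rec in records:
        match = resolve(lookup, rec["sigla"], rec["nome"])
        if match is None:
            logger.warning("Unresolved comune: %s (%s)", rec["nome"], rec["sigla"])
            unresolved += 1
            continue
        territory_id, type_code = match
        resolved.append({**rec, "territory_id": territory_id, "type_code": type_code})
    return resolved, unresolved
```

Returning the unresolved count alongside the resolved records gives the quality flow a number to report instead of a silent drop.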
utils/service_writer.py - idempotent upsert helpers for silver.services and its satellite tables:
from utils.service_writer import (
    upsert_service,             # → int (service id)
    upsert_service_identifier,  # attach an external code
    upsert_service_attribute,   # store a typed metric (EAV)
    upsert_service_category,    # attach a categorical tag
)
All functions accept an open psycopg2 cursor. The caller manages the connection and calls conn.commit(). See §5.3 for usage examples.
Use these utilities rather than reimplementing common patterns in each pipeline.
Also available, among others: bronze_readers (reading and discovery of Bronze files), comune_extract (extraction of province/comune pairs from tabular and JSON sources), territory_attributes_writer (batch upsert into the Silver EAV), istat_esploradati and istat_aster (clients for the ISTAT data explorers), om_tag_writer (Quality tag, §6.3), om_lineage_writer (lineage in OpenMetadata), and search_index (refresh of the catalogue views, §6.5).
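To make concrete what "global thread-safe rate limiting" means for rate_limiter.py above, here is a minimal sketch of the idea. It is not the shared module's code and not a replacement for it: the `RateLimiter` class and its `wait()` method are hypothetical names.

```python
import threading
import time


class RateLimiter:
    """Enforce a minimum delay between calls across all threads."""

    def __init__(self, delay: float):
        self.delay = delay
        self._lock = threading.Lock()
        self._next_allowed = 0.0  # monotonic time of the next free slot

    def wait(self) -> None:
        # Reserve the next slot while holding the lock, then sleep outside
        # it so other threads can queue up their own slots meanwhile.
        with self._lock:
            now = time.monotonic()
            sleep_for = max(0.0, self._next_allowed - now)
            self._next_allowed = max(now, self._next_allowed) + self.delay
        if sleep_for:
            time.sleep(sleep_for)
```

Sharing one limiter instance among all scraping tasks is what makes the limit global: each caller invokes `wait()` immediately before issuing its request.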
A deployment turns a flow into a scheduled, monitored resource on the Prefect server. The developer defines prefect.yaml once; subsequent executions happen through the UI or CLI without touching the code.
Each pipeline defines four deployments in prefect.yaml. The pull section is required in all pipelines: it sets the working directory and installs flow dependencies at runtime. This is necessary because the prefect-worker container uses the base image prefecthq/prefect:3-python3.11, which does not pre-install pipeline-specific packages such as psycopg2-binary.
name: {ente}-{dataset}
prefect-version: "3.*"
work_pool:
  name: default-pool
pull:
  - prefect.deployments.steps.set_working_directory:
      directory: /flows/{ente}/{dataset}
  - prefect.deployments.steps.pip_install_requirements:
      directory: /flows/{ente}/{dataset}
      requirements_file: requirements.txt
deployments:
  - name: {ente}-{dataset}-01-ingestion   # Flow 1: download → Bronze
    entrypoint: 01_ingestion_flow.py:ingestion_flow
    schedules: []   # no automatic schedule, trigger manually
    parameters:
      anno: 2025
    work_pool:
      name: default-pool
  - name: {ente}-{dataset}-02-transform   # Flow 2: Bronze → Silver
    entrypoint: 02_transform_flow.py:transform_flow
    schedules: []
    parameters:
      anno: 2025
    work_pool:
      name: default-pool
  - name: {ente}-{dataset}-03-quality     # Flow 3: quality monitoring
    entrypoint: 03_data_quality_flow.py:quality_flow
    schedules: []
    parameters:
      anno: 2025
    work_pool:
      name: default-pool
  - name: {ente}-{dataset}-04-metadata    # Flow 4: catalogue update
    entrypoint: 04_metadata_flow.py:metadata_flow
    schedules: []
    parameters:
      anno: 2025
    work_pool:
      name: default-pool
The numbered prefix (01, 02, ...) keeps deployments in execution order in the Prefect UI. schedules: [] explicitly removes any schedule; omitting the key does not delete schedules that were previously registered.
Deployments must be registered from inside the prefect-worker container, not from the host shell. Prefect records the working directory path at registration time and uses it verbatim at execution time. The worker executes flows at /flows/{ente}/{dataset}/ (the Docker volume mount); registering from the host would record the host path, which does not exist inside the container.
docker exec maps-prefect-worker bash -c \
  "cd /flows/{ente}/{dataset} && prefect deploy --all"
After registration, deployments are visible at http://localhost:4200/deployments.
The Prefect worker reads code from the filesystem on each execution:
- prefect deploy --all. The worker will execute the updated file on the next run.
- prefect.yaml changes (new deployments, schedule changes, new parameters, pull steps): require prefect deploy --all run from inside the container.
- schedules: [] in prefect.yaml and re-register. Omitting the key does not remove an existing schedule.
From the UI: go to http://localhost:4200/deployments, click a deployment, then "Run". The parameter form is pre-filled with defaults and can be edited before each run.
From the CLI:
export PREFECT_API_URL=http://localhost:4200/api
# Run with default parameters
prefect deployment run '{flow-name}/{ente}-{dataset}-01-ingestion'
# Override parameters at runtime
prefect deployment run '{flow-name}/{ente}-{dataset}-01-ingestion' \
--param anno=2024
# List all registered deployments
prefect deployment ls
The CLI name follows the pattern {flow-name}/{deployment-name}. The flow name comes from the @flow(name=...) decorator; the deployment name from prefect.yaml. Check exact names with prefect deployment ls or the UI.
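When runs are triggered from a script, the naming pattern can be built programmatically. The helpers below are a hypothetical sketch: `deployment_name` and `run_command` are not project functions, the step labels mirror the prefect.yaml template shown earlier, and the flow name used in the example is an assumption to be checked against the @flow(name=...) decorator.

```python
# Step labels as used in the deployment names of the prefect.yaml template.
STEPS = {1: "ingestion", 2: "transform", 3: "quality", 4: "metadata"}


def deployment_name(ente: str, dataset: str, step: int) -> str:
    """Build '{ente}-{dataset}-NN-{label}' for one of the four flows."""
    return f"{ente}-{dataset}-{step:02d}-{STEPS[step]}"


def run_command(flow_name: str, ente: str, dataset: str, step: int, **params) -> list[str]:
    """Return the argv for 'prefect deployment run', with optional overrides."""
    cmd = ["prefect", "deployment", "run",
           f"{flow_name}/{deployment_name(ente, dataset, step)}"]
    for key, value in params.items():
        cmd += ["--param", f"{key}={value}"]
    return cmd


# Hypothetical flow name; check the real one with 'prefect deployment ls'.
print(" ".join(run_command("tabacchi-ingestion", "agenzie-dogane", "tabacchi", 1, anno=2024)))
```

The resulting list can be handed to `subprocess.run(cmd, check=True)` with PREFECT_API_URL set as shown above.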
Do not start a local Prefect worker (prefect worker start) when using the Docker stack. The maps-prefect-worker container already polls default-pool. A competing local worker would pick up runs but fail because the /flows/ paths do not exist on the host.
Monitor: Prefect UI at http://localhost:4200.
Each of the four flows can be developed and tested independently. A natural split for a two-developer team working on the same pipeline:
| Developer A | Developer B |
|---|---|
| Flow 1: Ingestion (source-specific, often scraping or API) | Flow 2: Transform (EAV normalisation, territory resolution) |
| Flow 4: Metadata catalogue | Flow 3: Data quality (DuckDB/SQL checks, Quality tag) |
| README.md + prefect.yaml | tests/test_flow.py |
Handoff criterion between flows: Flow 1 is considered complete when at least one Bronze file is written to the correct path and its checksum is recorded in bronze.ingestion_log. Flow 2 can begin with that file independently of Flow 1 being fully finished.
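The handoff criterion can be checked mechanically before Flow 2 development starts. The sketch below assumes the checksum is a SHA-256 hex digest and that the value logged in bronze.ingestion_log has already been fetched by the caller; both the algorithm and the helper names are assumptions, since the guide does not state them.

```python
import hashlib
from pathlib import Path
from typing import Optional


def sha256_of_file(path: Path, chunk_size: int = 1 << 20) -> str:
    """Hash a file in chunks so large Bronze files are not loaded whole."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def bronze_handoff_ready(path: Path, logged_checksum: Optional[str]) -> bool:
    """True when the file exists and matches the checksum in the log."""
    return (
        path.is_file()
        and logged_checksum is not None
        and sha256_of_file(path) == logged_checksum
    )
```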
Tracking on GitLab: open one issue per flow. Link them to the parent pipeline issue. Close each sub-issue as the flow reaches its Bronze/Silver acceptance criteria (see §10).
- /data/bronze/{ente}/{anno}/)
- bronze.ingestion_log
- retries=3 with backoff)
- README.md documents source URL, format, and update frequency
- silver.territory_attributes with the correct type_code
- ON CONFLICT DO UPDATE in place (idempotent re-runs)
- territory_id resolved for all records (unresolved comuni logged and counted)
- Qualità tag written to OpenMetadata on every run (bronze-/silver-validato or in-revisione)
- datastore.variable_definitions populated for the source, with aggregation_rule reviewed by the data stewards
- refresh_search_index.py): dataset searchable and record count correct
ON CONFLICT fails with "no unique constraint": the UNIQUE constraint on (territory_id, attribute, valid_from) must be present. Verify with \d silver.territory_attributes in psql.
territory_id is NULL / no records resolved: the ISTAT foundation pipelines (flows/istat/) must run before any EAV pipeline. If _build_territory_lookup() returns an empty dict, silver.territories is not yet populated.
valid_from causes a duplicate key: two rows for the same entity/attribute/year with different valid_from dates. Use a fixed date (e.g. f"{anno}-01-01") rather than CURRENT_DATE to keep runs idempotent.
Flow 2 reads stale Bronze file: Prefect task caching may return a cached result. Either disable caching on the download task for that run, or delete the cache entry from the Prefect UI.
OpenMetadata JWT expired: rotate the token via the OpenMetadata UI (Settings → Bots) and update the OPENMETADATA_JWT_TOKEN environment variable in the prefect-worker container.
Flow run crashes with FileNotFoundError: /flows/{ente}/{dataset}: the deployment was registered from the host shell instead of from inside the container. Re-register using docker exec maps-prefect-worker bash -c "cd /flows/... && prefect deploy --all" (see §8.2).
Flow run crashes with ModuleNotFoundError: No module named 'psycopg2': the pull section is missing or incomplete in prefect.yaml. Ensure both set_working_directory and pip_install_requirements steps are present (see §8.1), then re-register the deployment.
Flow run is scheduled but never starts: a local worker (prefect worker start) is running alongside the Docker worker and picks up the run, then fails. Stop any local worker processes and let maps-prefect-worker handle execution exclusively.
get_all_records() raises JSONDecodeError: the pfun returns all records in a single response and does not support row-offset pagination via pdatada. Use get_records() directly (see §7, situas_client.py).
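The valid_from entry above is easiest to see in a small self-contained run. The sketch uses an in-memory SQLite table as a stand-in for silver.territory_attributes (the real table lives in PostgreSQL and has more columns); the `value` column, the `load` helper, and the attribute name are assumptions for illustration.

```python
import sqlite3

DDL = """
CREATE TABLE territory_attributes (
    territory_id INTEGER NOT NULL,
    attribute    TEXT    NOT NULL,
    valid_from   TEXT    NOT NULL,
    value        REAL,
    UNIQUE (territory_id, attribute, valid_from)
)
"""

UPSERT = """
INSERT INTO territory_attributes (territory_id, attribute, valid_from, value)
VALUES (?, ?, ?, ?)
ON CONFLICT (territory_id, attribute, valid_from)
DO UPDATE SET value = excluded.value
"""


def load(conn, territory_id, attribute, anno, value):
    """Upsert one value with a fixed valid_from derived from the year."""
    valid_from = f"{anno}-01-01"  # fixed date, not the date of the run
    conn.execute(UPSERT, (territory_id, attribute, valid_from, value))


conn = sqlite3.connect(":memory:")
conn.execute(DDL)
load(conn, 1, "n_tabaccherie", 2025, 10)
load(conn, 1, "n_tabaccherie", 2025, 12)  # re-run with a corrected value
rows = conn.execute(
    "SELECT valid_from, value FROM territory_attributes"
).fetchall()
print(rows)  # one row: the second run updated the first instead of duplicating it
```

Had valid_from been the run date, the two runs would have produced two rows for the same territory, attribute, and year whenever they fell on different days.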