Deliverable D2.2 – Appendix: Operational guide for building and maintaining ETL pipelines in the MAPS Data Lake.
This guide is intended for developers joining the project who need to build a new data pipeline or maintain an existing one. It describes the standard workflow, conventions, and patterns used across all pipelines in the MAPS infrastructure. The reference implementation throughout this guide is the Agenzia Dogane e Monopoli – tabaccherie pipeline; the complete source code is available at flows/agenzie-dogane/tabacchi/ in the repository.
To start the local development environment you need Python 3.11, Docker, and Docker Compose. The following steps configure the database, the Prefect server, and the Python interpreter in about ten minutes.
git clone https://gitlab.openpolis.io/openpolis/gst/gst-maps-pipelines.git
cd gst-maps-pipelines
cp .env.example .env

The default values in .env.example work without modification for local development. The one variable worth checking is BRONZE_BASE_PATH:
# .env.example – commented out, not needed in local development
# BRONZE_BASE_PATH=/data/bronze

In local development, flows automatically compute the Bronze path as {repo_root}/data/bronze (derived from the file's location). In production, inside the prefect-worker container, set BRONZE_BASE_PATH=/data/bronze; the Docker volume is already mounted there.
Flows load .env automatically at startup via python-dotenv (included in requirements-dev.txt): there is no need to manually export variables in the shell before running a script.
docker compose -f docker-compose.local.yml up -d

This starts PostgreSQL 17 + PostGIS 3.5, the Prefect server, and OpenMetadata (with its dedicated database and Elasticsearch). The MAPS database schema (bronze, silver, gold schemas, tables, and indexes) is created automatically on first startup via postgres/init-scripts/01-init-schemas.sql. Verify that the services are running:
docker compose -f docker-compose.local.yml ps

If you modify docker-compose.local.yml or .env after the containers are already running, docker compose restart does not re-read environment variables. Use instead:
docker compose -f docker-compose.local.yml up -d --force-recreate

python3.11 -m venv .venv
source .venv/bin/activate
pip install -r requirements-dev.txt

This installs Prefect, psycopg2, requests, BeautifulSoup, and python-dotenv. The virtualenv must be activated each time you open a new terminal session (source .venv/bin/activate).
export PREFECT_API_URL=http://localhost:4200/api
prefect work-pool create default-pool --type process

The default-pool work pool is the execution pool referenced in each pipeline's prefect.yaml.
Log in to OpenMetadata (default credentials admin@open-metadata.org / admin) and connect to the database with psql -h localhost -U maps -d maps_db (password: maps_dev). Verify that the schemas and tables were created correctly:
-- expected schemas: bronze, silver, gold
\dn
-- expected tables: bronze.ingestion_log, silver.territory_types,
-- silver.territories, silver.territory_containments,
-- silver.territory_identifiers, silver.territory_names,
-- silver.territory_attributes,
-- silver.service_types, silver.services, silver.service_identifiers,
-- silver.service_attributes, silver.service_categories
\dt bronze.*
\dt silver.*

The environment is ready. The Bronze layer is a Docker volume (bronze-data) mounted at /data inside the containers; raw files downloaded by flows will be stored in that volume.
OpenMetadata takes a few minutes to complete its database migration on first startup; wait for the maps-openmetadata container to report a healthy status before opening the interface.
Prefect follows a client/server architecture. The server (container maps-prefect-server) tracks flow runs, stores logs, and serves the UI at http://localhost:4200. The worker (container maps-prefect-worker) is the process that actually executes flows: it polls the server for work assigned to its pool (default-pool) and launches each flow in a subprocess.
Developers interact with the server through the Prefect client (the CLI and Python SDK), whose target is configured by the PREFECT_API_URL environment variable. During local development the client points to the server running in Docker; in production it will point to the remote server, with no changes to the flow code itself.
A deployment associates a flow with a worker pool and defines its execution parameters (schedule, default parameter values). Once registered, a flow can be triggered from the UI or CLI without the developer needing a local process running.
sequenceDiagram
participant Dev as Developer
participant Server as Prefect Server
participant Worker as Worker (default-pool)
Dev->>Server: register deployment
Note over Server: stores flow config, schedule, parameters
Dev->>Server: trigger run (UI or CLI)
Server->>Worker: dispatch work item
Worker->>Worker: execute flow subprocess
Worker-->>Server: report state + logs
Every MAPS pipeline follows the Medallion pattern: raw files are downloaded to the Bronze layer, transformed and validated in the Silver layer, and finally aggregated into the Gold layer. Pipelines are orchestrated by Prefect and follow a standard four-flow structure.
External source → Flow 1: Ingestion → Bronze (filesystem)
                → Flow 2: Transform → Silver (PostgreSQL)
                → Flow 3: Quality   → Validation report
                → Flow 4: Metadata  → OpenMetadata catalogue

Each flow is independent and can be re-run without executing the preceding flows. This is deliberate: if the transformation logic changes, only Flow 2 needs to be re-run; there is no need to re-download the source data.
Each pipeline lives in a dedicated subdirectory under flows/:
flows/
├── utils/                        # Shared modules for all pipelines
│   ├── bronze_writer.py          # save_to_bronze(), log_ingestion()
│   ├── rate_limiter.py           # rate_limited_request()
│   ├── html_parser.py            # parse_table(), extract_form_data()
│   ├── situas_client.py          # SituasClient – ISTAT SITUAS API wrapper
│   ├── territory_resolver.py     # build_comune_lookup_from_conn(), resolve_comune()
│   └── service_writer.py         # upsert_service(), upsert_service_identifier(), ...
└── {ente}/{dataset}/
    ├── 01_ingestion_flow.py      # Flow 1: download → Bronze
    ├── 02_transform_flow.py      # Flow 2: Bronze → Silver
    ├── 03_data_quality_flow.py   # Flow 3: validation
    ├── 04_metadata_flow.py       # Flow 4: OpenMetadata catalogue
    ├── prefect.yaml              # Deployment definitions (all 4 flows)
    ├── requirements.txt          # Python dependencies
    ├── README.md                 # Pipeline documentation
    ├── docs/
    │   ├── DEPLOY_GUIDE.md       # Step-by-step deployment instructions
    │   └── KNOWN_ISSUES.md       # History of issues and their solutions
    └── tests/
        └── test_flow.py          # Unit tests

Naming conventions: pipeline directories follow the {ente}/{dataset} pattern (e.g. agenzie-dogane/tabacchi/); flow files carry the numeric prefixes 01_, 02_, 03_, 04_ reflecting execution order.

The flows/istat/ directory contains the foundation pipelines that populate the Silver territorial model. All other data pipelines depend on this data being present.
| Pipeline | Directory | Source | Scope |
|---|---|---|---|
| Province, regioni, ripartizioni | flows/istat/province-regioni/ | SITUAS pfun=64/68/71 + variazioni pfun=106–108, 112–114 | ~107 province, 20 regioni, 5 ripartizioni per year (2000–today) |
| Comuni | flows/istat/comuni/ | SITUAS pfun=61 + variazioni pfun=129 | ~7,900 comuni per year (2000–today) |
| Variazioni amministrative | flows/istat/variazioni-amministrative/ | SITUAS pfun=106–108, 112–114, 98, 104, 105, 129 | Ingestion only; downloads full history from 1861 into Bronze |
| Territory corrections | flows/istat/territory-corrections/ | Static data: ADM alias names and data-quality fixes | Idempotent flow; must run after comuni are populated |
Run order matters. Variazioni processing is integrated into the province-regioni and comuni transform flows. The variazioni Bronze data must be ingested first. The territory-corrections flow must run last, after comuni territories are populated.
Step 1 – Ingestion (any order, can run in parallel):
istat-province-regioni → Flow 1 (ingestion)
istat-comuni → Flow 1 (ingestion)
istat-variazioni → Flow 1 (ingestion) – must complete before Step 2
Step 2 – Transform (strict order):
istat-province-regioni → Flow 2 (transform) – reads province bronze + variazioni bronze
istat-comuni → Flow 2 (transform) – reads comuni bronze + variazioni bronze + province silver
Step 3 – Corrections (after comuni are populated):
istat-territory-corrections → Flow 1 (corrections)

Each flow file accepts --anni as a CLI argument. Flows can be run directly via Python (if the virtualenv is active) or via Docker (recommended):
# Option A: Python directly (venv active)
# Step 1: Ingestion (any order)
python flows/istat/variazioni-amministrative/01_ingestion_flow.py
python flows/istat/province-regioni/01_ingestion_flow.py
python flows/istat/comuni/01_ingestion_flow.py
# Step 2: Transform (strict order – variazioni bronze must exist first)
python flows/istat/province-regioni/02_transform_flow.py
python flows/istat/comuni/02_transform_flow.py
# Step 3: Corrections (after comuni are populated)
python flows/istat/territory-corrections/01_corrections_flow.py
# Specific years only
python flows/istat/province-regioni/01_ingestion_flow.py --anni 2024 2025 2026
# Option B: Docker (recommended)
# Step 1: Ingestion
docker exec maps-prefect-worker bash -c \
"cd /flows/istat/variazioni-amministrative && python 01_ingestion_flow.py"
docker exec maps-prefect-worker bash -c \
"cd /flows/istat/province-regioni && python 01_ingestion_flow.py"
docker exec maps-prefect-worker bash -c \
"cd /flows/istat/comuni && python 01_ingestion_flow.py"
# Step 2: Transform (province-regioni first, then comuni)
docker exec maps-prefect-worker bash -c \
"cd /flows/istat/province-regioni && python 02_transform_flow.py"
docker exec maps-prefect-worker bash -c \
"cd /flows/istat/comuni && python 02_transform_flow.py"
# Step 3: Corrections
docker exec maps-prefect-worker bash -c \
  "cd /flows/istat/territory-corrections && python 01_corrections_flow.py"

All ISTAT pipelines are registered on the Prefect server with no automatic schedule; runs are triggered manually. To register or re-register after changing prefect.yaml:
docker exec maps-prefect-worker bash -c "cd /flows/istat/province-regioni && prefect deploy --all"
docker exec maps-prefect-worker bash -c "cd /flows/istat/comuni && prefect deploy --all"
docker exec maps-prefect-worker bash -c "cd /flows/istat/variazioni-amministrative && prefect deploy --all"
docker exec maps-prefect-worker bash -c "cd /flows/istat/territory-corrections && prefect deploy --all"

To trigger a run via CLI (respect the execution order):
export PREFECT_API_URL=http://localhost:4200/api
# Step 1: Ingestion (any order)
prefect deployment run 'istat-variazioni-ingestion/istat-variazioni-01-ingestion'
prefect deployment run 'istat-province-regioni-ingestion/istat-pr-01-ingestion' \
--param anni='[2024,2025,2026]'
prefect deployment run 'istat-comuni-ingestion/istat-comuni-01-ingestion' \
--param anni='[2024,2025,2026]'
# Step 2: Transform (wait for ingestion; province-regioni before comuni)
prefect deployment run 'istat-province-regioni-transform/istat-pr-02-transform' \
--param anni='[2024,2025,2026]'
prefect deployment run 'istat-comuni-transform/istat-comuni-02-transform' \
--param anni='[2024,2025,2026]'
# Step 3: Corrections (after comuni transform completes)
prefect deployment run 'istat-territory-corrections/istat-territory-corrections-01'

All ISTAT flows are idempotent: re-running a transform on unchanged Bronze produces identical Silver output.
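Idempotency can be spot-checked by fingerprinting the Silver rows before and after a re-run. A minimal sketch (silver_fingerprint is illustrative, not part of the shared utils):

```python
import hashlib

def silver_fingerprint(rows):
    """Order-independent digest of a result set: sort the rows, then
    hash their canonical text form. Equal digests across two transform
    runs on the same Bronze input mean identical Silver output."""
    canonical = "\n".join(repr(r) for r in sorted(rows))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

Fetch the relevant rows (e.g. all territory_attributes written by the pipeline's source) before and after re-running Flow 2, and compare the two digests.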
The Silver layer uses a stable surrogate identity model for territories. Each territory has an internal id that never changes; ISTAT codes are stored as identifiers in silver.territory_identifiers. This eliminates record duplication when comuni change province: the settlement is one row, with multiple ISTAT code periods.
| Table | Purpose |
|---|---|
| silver.territory_types | Lookup of entity types – pre-seeded with 9 types |
| silver.territories | Master registry: stable surrogate id, type_code, label, lifecycle dates (valid_from/valid_to) |
| silver.territory_identifiers | Temporal identifiers: ISTAT codes (scheme='istat'), cadastral codes, fiscal codes, UTS codes |
| silver.territory_names | Temporal names with language support (Italian and bilingual/local) |
| silver.territory_containments | Temporal hierarchical containment (comune → provincia, with valid_from/valid_to for province changes) |
| silver.territory_relationships | Predecessor/successor links from variazioni events (ES, CS, RN, AQES, CECS) |
┌───────────────────────────────┐
│ territory_types               │
├───────────────────────────────┤
│ PK code VARCHAR(30)           │
│ label                         │
│ hierarchy_level               │
└───────────────┬───────────────┘
                │ 1
                │ (type_code)
                │ N
┌───────────────┴──────────────────────────────────────────┐
│ territories                                              │
├──────────────────────────────────────────────────────────┤
│ PK id SERIAL                                             │
│ FK type_code    label    subtype                         │
│ valid_from DATE    valid_to DATE    end_reason TEXT      │
└───┬───────────┬──────────────────────────────┬───────────┘
    │ 1:N       │ 1:N                          │ 1:N (member + container)
    │           │                              │
    ▼           ▼                              ▼
┌───────────────┐ ┌───────────────┐ ┌──────────────────────────────────┐
│ territory_    │ │ territory_    │ │ territory_containments           │
│ identifiers   │ │ names         │ ├──────────────────────────────────┤
├───────────────┤ ├───────────────┤ │ FK member_id    → territories    │
│ FK territory_ │ │ FK territory_ │ │ FK container_id → territories    │
│    id         │ │    id         │ │ valid_from DATE                  │
│ scheme        │ │ name          │ │ valid_to DATE                    │
│ identifier    │ │ language      │ └──────────────────────────────────┘
│ valid_from    │ │ name_type     │
│ valid_to      │ │ valid_from    │ ┌──────────────────────────────────┐
│ fonte         │ │ valid_to      │ │ territory_relationships          │
└───────────────┘ └───────────────┘ ├──────────────────────────────────┤
                                    │ FK source_id → territories       │
                                    │ FK dest_id   → territories       │
                                    │ classification TEXT              │
                                    │ valid_from DATE                  │
                                    └──────────────────────────────────┘

Territories are resolved by ISTAT code via a join on territory_identifiers:
-- Find the territory for ISTAT code '092001'
SELECT t.id, t.label FROM silver.territories t
JOIN silver.territory_identifiers ti ON ti.territory_id = t.id
WHERE t.type_code = 'comune' AND ti.scheme = 'istat' AND ti.identifier = '092001';
-- Which province was Arbus in on 2015-01-01?
SELECT p.label FROM silver.territory_containments tc
JOIN silver.territories p ON p.id = tc.container_id AND p.type_code = 'provincia'
JOIN silver.territories t ON t.id = tc.member_id
WHERE t.type_code = 'comune' AND t.label = 'Arbus'
AND (tc.valid_from IS NULL OR tc.valid_from <= '2015-01-01')
AND (tc.valid_to IS NULL OR tc.valid_to > '2015-01-01');

valid_from = NULL means the start of validity is unknown (predates the available data). valid_to = NULL means the territory is still active.
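These NULL semantics can be mirrored in Python when filtering rows client-side. A sketch (is_valid_on is illustrative, not a shared utility):

```python
from datetime import date
from typing import Optional

def is_valid_on(day: date, valid_from: Optional[date], valid_to: Optional[date]) -> bool:
    """Half-open interval check matching the SQL predicates above:
    NULL valid_from = unknown start (treated as always started),
    NULL valid_to   = still active."""
    if valid_from is not None and valid_from > day:
        return False
    if valid_to is not None and valid_to <= day:
        return False
    return True
```

Note the half-open convention: a containment whose valid_to equals the query date is already closed, matching the SQL's `tc.valid_to > '2015-01-01'`.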
All pipelines write to silver.territory_attributes, the generalised EAV table that supports any type of territorial entity.
INSERT INTO silver.territory_attributes
(territory_id, type_code, attribute, value, data_type, source, valid_from, valid_to)
VALUES
(42, 'comune', 'popolazione', '2635', 'integer', 'ISTAT', '2024-01-01', NULL),
(107, 'provincia', 'n_tabaccherie', '245', 'integer', 'ADM', '2024-01-01', '2025-12-31'),
(8, 'sll', 'addetti', '15200', 'integer', 'ISTAT', '2021-01-01', NULL);

| Field | Description | Example values |
|---|---|---|
| territory_id | FK to silver.territories(id) – stable surrogate, survives ISTAT code changes | resolved via territory_resolver |
| type_code | Entity type (denormalised from territories) | 'comune', 'provincia', 'regione', 'sll', 'slo' |
| attribute | Attribute name | 'popolazione', 'n_tabaccherie_adm' |
| value | Value stored as text | '2635' |
| data_type | Type hint for casting | 'integer', 'float', 'string', 'boolean' |
| source | Data source | 'ISTAT', 'ADM', 'MinSalute' |
| valid_from | Start of validity | '2024-01-01' |
| valid_to | End of validity (NULL means current) | NULL |
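Because value is stored as text, consumers need to cast it back using the data_type hint. A minimal sketch (cast_eav_value is a hypothetical helper, not part of the shared utils):

```python
def cast_eav_value(value: str, data_type: str):
    """Cast a silver.territory_attributes text value using its data_type hint.
    Unknown hints fall back to returning the raw string."""
    if value is None:
        return None
    casters = {
        "integer": int,
        "float": float,
        "boolean": lambda v: v.strip().lower() in ("true", "t", "1"),
        "string": str,
    }
    return casters.get(data_type, str)(value)
```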
The ON CONFLICT ... DO UPDATE pattern handles re-runs safely:
execute_values(
    cur,
    """
    INSERT INTO silver.territory_attributes
        (territory_id, type_code, attribute, value,
         data_type, source, valid_from, valid_to)
    VALUES %s
    ON CONFLICT (territory_id, attribute, valid_from)
    DO UPDATE SET value = EXCLUDED.value, updated_at = NOW()
    """,
    eav_records,
)

silver.territory_attributes stores one row per (territory, attribute, period): it is optimised for aggregated territorial statistics such as population counts or number of pharmacies per comune. It is not designed to store individual point-of-interest records.
Several data sources publish lists of individual local services (pharmacies, hospitals, schools, tobacco shops, and others). Keeping individual records in Silver offers tangible benefits over writing only aggregated counts: the same raw data can be aggregated by comune, province, ASL catchment area, or SLL without re-ingesting from the source; records from multiple datasets can be deduplicated via service_identifiers using external codes such as codice fiscale or codice HSP; and service-specific attributes can be stored without schema changes.
| Table | Purpose |
|---|---|
| silver.service_types | Registry of service types – pre-seeded with 5 types (farmacia, ospedale, scuola, biblioteca, tabaccheria) |
| silver.services | One row per individual service, with resolved territory_id, lifecycle dates, and a uniqueness constraint for idempotent upserts |
| silver.service_identifiers | External codes per service (codice_fiscale, codice_ministeriale, codice_hsp, ...) – same identifier means the same logical service across datasets |
| silver.service_attributes | EAV for service-specific metrics (posti_letto, num_studenti, ...) – mirrors territory_attributes but scoped to a service row |
| silver.service_categories | Multi-valued typed tags (tipo_ospedale: DEA II livello, livello_scuola: primaria, ...) |
Writing services from a pipeline uses the helpers in utils/service_writer.py:
from utils.territory_resolver import build_comune_lookup_from_conn, resolve_comune
from utils.service_writer import (
    upsert_service, upsert_service_identifier,
    upsert_service_attribute, upsert_service_category,
)

conn = _get_db_conn()
lookup = build_comune_lookup_from_conn(conn)
with conn.cursor() as cur:
    for record in source_records:
        # 1. Resolve territory
        match = resolve_comune(lookup, sigla=record["provincia"], nome=record["comune"])
        territory_id = match[0] if match else None
        # 2. Upsert the service row
        service_id = upsert_service(
            cur,
            type_code="farmacia",
            name=record["denominazione"],
            address=record["indirizzo"],
            territory_id=territory_id,
            source="AIFA",
            valid_from="2024-01-01",
        )
        # 3. Attach external identifier (for cross-source deduplication)
        upsert_service_identifier(cur, service_id=service_id,
                                  scheme="codice_fiscale", identifier=record["cf"])
        # 4. Store service-specific metric
        upsert_service_attribute(cur, service_id=service_id,
                                 attribute="tipo_farmacia", value=record["tipo"],
                                 data_type="text", source="AIFA", valid_from="2024-01-01")
        # 5. Tag with category
        upsert_service_category(cur, service_id=service_id,
                                scheme="tipo_farmacia", value=record["tipo"])
conn.commit()
conn.close()

Once individual records are in Silver, counts per territory are a simple GROUP BY query that can feed silver.territory_attributes as aggregated statistics.
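A sketch of that aggregation step (the SQL and the counts_to_eav helper are illustrative, not part of the shared utils; column names follow the schema described above):

```python
# Count services of one type per territory, keeping the type_code
# needed by the territory_attributes EAV row (illustrative SQL).
COUNT_SQL = """
    SELECT s.territory_id, t.type_code, COUNT(*) AS n
    FROM silver.services s
    JOIN silver.territories t ON t.id = s.territory_id
    WHERE s.type_code = %s AND s.territory_id IS NOT NULL
    GROUP BY s.territory_id, t.type_code
"""

def counts_to_eav(rows, attribute, source, valid_from):
    """Turn (territory_id, type_code, n) count rows into
    territory_attributes tuples ready for the usual upsert."""
    return [
        (territory_id, type_code, attribute, str(n),
         "integer", source, valid_from, None)
        for territory_id, type_code, n in rows
    ]
```

The resulting tuples feed the same execute_values ON CONFLICT upsert used elsewhere for silver.territory_attributes.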
The Agenzia Dogane e Monopoli – tabaccherie pipeline is the reference implementation because it is the most complex pipeline in the project: it requires HTML scraping of a JSF portal with Ajax pagination, session management, rate limiting, and territorial code normalisation. A pipeline that downloads a CSV or XLSX file directly follows the same structure but with a much simpler Flow 1.
Before registering a deployment on Prefect, every flow can be run directly as a Python script. This is the normal way to work: write the flow, run it, fix errors, run it again โ without touching the server.
Every flow file must include an if __name__ == "__main__" block with argparse so that parameters can be passed from the command line without editing the source:
# 01_ingestion_flow.py
from prefect import flow

@flow
def ingestion_flow(anno: int = 2025):
    ...

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--anno", type=int, default=2025)
    args = parser.parse_args()
    ingestion_flow(anno=args.anno)

Be consistent: always use the same execution method within a pipeline run. Python writes Bronze files to {repo_root}/data/bronze/ on the host; Docker writes to /data/bronze/ inside the container. If you ingest with Python and transform with Docker (or vice versa), the transform will not find the Bronze files. In docker-compose.local.yml the Bronze directory is bind-mounted from ./data, so both methods share the same path, but only if BRONZE_BASE_PATH is set consistently (see §1.3).
Option A – Python directly (virtualenv must be active):
source .venv/bin/activate
cd flows/agenzie-dogane/tabacchi/
python 01_ingestion_flow.py # uses default parameters
python 01_ingestion_flow.py --anno 2024   # override at runtime

Option B – Docker (recommended: matches the production environment exactly):
docker exec maps-prefect-worker bash -c \
"cd /flows/agenzie-dogane/tabacchi && python 01_ingestion_flow.py"
docker exec maps-prefect-worker bash -c \
  "cd /flows/agenzie-dogane/tabacchi && python 01_ingestion_flow.py --anno 2024"

If PREFECT_API_URL is set and the server is running, the run appears in the UI with logs and status. If the server is not available, the flow runs as a plain Python script with no server dependency.
The typical iteration is: edit the flow code, run the file directly, check the logs and the output, and repeat until the run succeeds.
Each flow can be developed and tested independently. Flow 2 can be tested by reading a Bronze file already present on disk, without Flow 1 having completed.
Every flow file starts with the same four-part header. Copy it verbatim and adjust the parents[N] depth and the FONTE constant.
import os
import sys
from pathlib import Path

try:
    from dotenv import load_dotenv
    load_dotenv(Path(__file__).resolve().parents[3] / ".env")  # repo root
except ImportError:
    pass

import psycopg2
from psycopg2.extras import execute_values
from prefect import flow, task, get_run_logger

sys.path.insert(0, str(Path(__file__).resolve().parents[2]))  # adds flows/ to import path
from utils.bronze_writer import save_to_bronze, log_ingestion  # and other utils as needed

FONTE = "ente-name"  # matches the Bronze directory
_REPO_ROOT = Path(__file__).resolve().parents[3]
BRONZE_BASE = os.getenv("BRONZE_BASE_PATH", str(_REPO_ROOT / "data" / "bronze"))

def _get_db_conn():
    return psycopg2.connect(
        host=os.getenv("POSTGRES_HOST", "localhost"),
        port=int(os.getenv("POSTGRES_PORT", 5432)),
        database=os.getenv("POSTGRES_DB", "maps_db"),
        user=os.getenv("POSTGRES_USER", "maps"),
        password=os.getenv("POSTGRES_PASSWORD", "maps_dev"),
    )

The parents[N] index depends on directory depth: parents[2] from flows/{ente}/{dataset}/ reaches flows/ (where utils/ lives); parents[3] reaches the repo root (where .env lives). _get_db_conn() is defined locally in each flow rather than imported from utils, so each script is self-contained.
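The parents[N] arithmetic can be verified with a throwaway path (hypothetical, for illustration only):

```python
from pathlib import Path

# A hypothetical flow file at the standard depth flows/{ente}/{dataset}/:
flow_file = Path("/repo/flows/agenzie-dogane/tabacchi/01_ingestion_flow.py")

flows_dir = flow_file.parents[2]  # parents[0]=tabacchi, [1]=agenzie-dogane, [2]=flows
repo_root = flow_file.parents[3]  # one level further up: the repo root
```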
Flow 1 downloads raw data from the external source and saves it to the Bronze filesystem via the save_to_bronze utility. It records the operation in bronze.ingestion_log.
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import httpx

from utils.bronze_writer import save_to_bronze, log_ingestion

@task(
    retries=3,
    retry_delay_seconds=[60, 300, 900],
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=24),
)
def download_fonte(anno: int) -> dict:
    url = "https://example.gov.it/data/dataset.xlsx"
    response = httpx.get(url, follow_redirects=True, timeout=300)
    response.raise_for_status()
    file_info = save_to_bronze(
        data=response.content,
        fonte="nome-ente",
        dataset="nome_dataset",
        anno=anno,
        ext="xlsx",
    )
    log_ingestion(file_info, n_record=0, url_source=url)
    return file_info

save_to_bronze creates the directory, writes the raw bytes, computes the SHA256 checksum, and returns a standard dictionary with file_path, size_bytes, checksum, fonte, dataset, and anno. log_ingestion inserts the corresponding row into bronze.ingestion_log.
The Bronze layer always stores the raw source exactly as received: bytes downloaded via HTTP for CSV, XLSX, and PDF files; raw HTML for scraping pipelines. Transformation into data structures happens in Flow 2.
Bronze path convention: {repo_root}/data/bronze/{ente}/{anno}/{dataset}.{ext}. Examples: /data/bronze/istat/2024/popolazione_residente.xlsx, /data/bronze/minsalute/2024/asl_boundaries.xlsx
JSF/PrimeFaces portals. Some Italian public portals use JavaServer Faces with Ajax-driven pagination. Each response contains a javax.faces.ViewState token that must be echoed back in the next request. Use a requests.Session to maintain cookies, extract the ViewState with BeautifulSoup, and use rate_limited_request() from utils/rate_limiter.py:
import requests
from bs4 import BeautifulSoup

from utils.rate_limiter import rate_limited_request

def _get_viewstate(html: str) -> str:
    vs = BeautifulSoup(html, "html.parser").find("input", {"name": "javax.faces.ViewState"})
    return vs.get("value", "") if vs else ""

session = requests.Session()
response = rate_limited_request(
    url, method="POST",
    data={
        "javax.faces.ViewState": viewstate,
        "javax.faces.partial.ajax": "true",
        "javax.faces.partial.execute": "formBusca:regione",
        "javax.faces.partial.render": "formBusca:provincia",
        "formBusca:regione": regione_code,
    },
    session=session,
    delay=2,
)

For a complete working example see flows/agenzie-dogane/tabacchi/01_ingestion_flow.py.
Flow 2 reads from Bronze, normalises the data, and writes to silver.territory_attributes. This is the most dataset-specific flow; the structure of the source file determines most of the logic.
Standard skeleton:
@task
def trasforma_e_carica(file_info: dict, entity_type: str) -> int:
    # 1. Read source file
    if file_info['file_path'].endswith('.xlsx'):
        df = pd.read_excel(file_info['file_path'])
    elif file_info['file_path'].endswith('.csv'):
        df = pd.read_csv(file_info['file_path'], encoding='utf-8-sig', sep=';')

    # 2. Resolve territory_id and build EAV records
    conn = _get_db_conn()
    lookup = _build_territory_lookup(conn)  # (sigla, label_upper) → (territory_id, type_code)
    eav_records = []
    for _, row in df.iterrows():
        match = lookup.get((row['SIGLA'].upper(), row['COMUNE'].upper()))
        if not match:
            continue
        territory_id, type_code = match
        eav_records.extend([
            (territory_id, type_code, 'attributo_1', str(row['COL_A']),
             'integer', fonte, f'{anno}-01-01', None),
            (territory_id, type_code, 'attributo_2', str(row['COL_B']),
             'string', fonte, f'{anno}-01-01', None),
        ])

    # 3. UPSERT into Silver EAV
    try:
        with conn.cursor() as cur:
            execute_values(cur, """
                INSERT INTO silver.territory_attributes
                    (territory_id, type_code, attribute, value,
                     data_type, source, valid_from, valid_to)
                VALUES %s
                ON CONFLICT (territory_id, attribute, valid_from)
                DO UPDATE SET value = EXCLUDED.value, updated_at = NOW()
            """, eav_records)
        conn.commit()
    finally:
        conn.close()
    return len(eav_records)

Territory resolution: pipelines must resolve territory_id from silver.territories before writing to silver.territory_attributes. Use _build_territory_lookup() (see §6.0.1) or the shared territory_resolver.build_comune_lookup_from_conn() to build a (sigla_automobilistica, label_upper) → (territory_id, type_code) dict. Records whose comune cannot be resolved are logged as warnings and skipped; this is expected when the ISTAT foundation pipelines have not yet run.
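The lookup construction itself reduces to a dict comprehension over the query results. A sketch (build_lookup_from_rows is illustrative; the real helpers run the query themselves):

```python
def build_lookup_from_rows(rows):
    """Build the (sigla, label_upper) → (territory_id, type_code) dict
    from rows of (territory_id, type_code, sigla, label)."""
    return {
        (sigla.upper(), label.upper()): (territory_id, type_code)
        for territory_id, type_code, sigla, label in rows
    }
```

Building the dict once per flow run keeps each per-record resolution an O(1) lookup instead of a query.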
Flow 3 validates both the Bronze files and the Silver records. It uses Great Expectations for structured validation and produces a report.
Bronze validation (check the raw file before trusting the transform):
import great_expectations as gx

suite = gx.core.ExpectationSuite(name=f"{fonte}_{dataset}")
suite.add_expectation(gx.core.ExpectationConfiguration(
    expectation_type="expect_column_to_exist",
    kwargs={"column": "colonna_codice_entita"},
))
suite.add_expectation(gx.core.ExpectationConfiguration(
    expectation_type="expect_column_values_to_not_be_null",
    kwargs={"column": "colonna_codice_entita", "mostly": 0.99},
))

Silver validation (verify the EAV records are correct):
suite.add_expectation(gx.core.ExpectationConfiguration(
    expectation_type="expect_column_values_to_not_be_null",
    kwargs={"column": "territory_id"},
))
suite.add_expectation(gx.core.ExpectationConfiguration(
    expectation_type="expect_column_values_to_be_between",
    kwargs={"column": "valore_cast", "min_value": 0, "max_value": 10_000_000},
))

Fail strategy: log warnings for non-critical failures; block Flow 4 (catalogue) if Silver validation fails critically.
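The fail strategy can be expressed as a small gate function (a sketch; the criticality mapping via a `critical` tuple is an assumption, not a project convention):

```python
def gate_flow4(results, critical=("territory_id",)):
    """Partition validation results into warnings and blockers.

    results: (column, success) pairs extracted from the validation
    output. Failures on columns listed in `critical` block Flow 4;
    the rest are only logged as warnings.
    """
    warnings = [col for col, ok in results if not ok and col not in critical]
    blockers = [col for col, ok in results if not ok and col in critical]
    return warnings, blockers
```

If the returned blockers list is non-empty, the quality flow should skip (or fail before) the metadata flow.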
Flow 4 registers the Bronze files and the Silver table in OpenMetadata, creating a lineage entry from source to EAV.
@task
def catalog_bronze_files(bronze_path: str, fonte: str, dataset: str):
    """Scan Bronze directory and register files in OpenMetadata."""
    from metadata.ingestion.ometa.ometa_api import OpenMetadata
    from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
        OpenMetadataConnection
    )
    # ... OpenMetadata ingestion config and lineage creation

Prerequisites: pip install 'openmetadata-ingestion[datalake]' and the OPENMETADATA_JWT_TOKEN environment variable set (generate it from the OpenMetadata UI → Settings → Bots → ingestion-bot).
Six modules in flows/utils/ are available to all pipelines.
utils/bronze_writer.py – standardised Bronze write with checksum and log:
from utils.bronze_writer import save_to_bronze, log_ingestion
file_info = save_to_bronze(data=response.content, fonte="agenzie-dogane", dataset="tabacchi", anno=2025, ext="html")
log_ingestion(file_info, n_record=0, url_source=url)

utils/rate_limiter.py – global thread-safe rate limiting for web scraping:
from utils.rate_limiter import rate_limited_request
response = rate_limited_request(url, method="POST", delay=2)

utils/html_parser.py – shared HTML table parsing:
from utils.html_parser import parse_table, extract_form_data
data = parse_table(soup, table_id="results")

utils/situas_client.py – ISTAT SITUAS API wrapper:
from utils.situas_client import SituasClient
client = SituasClient()
catalogue = client.get_catalogue() # list of 74 datasets
records = client.get_all_records(pfun=64, pdata="01/01/2024") # paginated
records = client.get_records(pfun=61, pdata="01/01/2024")      # single-page

get_all_records() handles pagination by iterating pages of 500 records. Note that some pfuns (notably pfun=61, comuni) return all results in a single response and do not support row-offset pagination; use get_records() directly for those.
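A sketch of the pagination loop inside get_all_records() (assumed behaviour; the HTTP call is abstracted as a fetch_page callable):

```python
def get_all_records_sketch(fetch_page, page_size=500):
    """Iterate fixed-size pages until a short page signals the end.

    fetch_page(offset, limit) stands in for the SITUAS HTTP call and
    must return a list of records for that window."""
    records, offset = [], 0
    while True:
        page = fetch_page(offset, page_size)
        records.extend(page)
        if len(page) < page_size:  # last (partial or empty) page
            break
        offset += page_size
    return records
```

This is why single-response pfuns break the pattern: a server that ignores the offset returns the same full page forever, so those endpoints must go through get_records() instead.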
utils/territory_resolver.py – shared (sigla, comune) → territory_id lookup:
from utils.territory_resolver import build_comune_lookup_from_conn, resolve_comune

# Build the lookup once per flow run (one DB query)
lookup = build_comune_lookup_from_conn(conn)

# Resolve each record – O(1) dict lookup
match = resolve_comune(lookup, sigla="MI", nome="Milano")
if match:
    territory_id, type_code = match

The lookup searches both silver.territories.label and silver.territory_names, which includes alternate names, bilingual forms, and ADM alias names. If resolve_comune returns None, the territory could not be matched; log a warning and continue.
utils/service_writer.py – idempotent upsert helpers for silver.services and its satellite tables:
from utils.service_writer import (
    upsert_service,             # → int (service id)
    upsert_service_identifier,  # attach an external code
    upsert_service_attribute,   # store a typed metric (EAV)
    upsert_service_category,    # attach a categorical tag
)

All functions accept an open psycopg2 cursor. The caller manages the connection and calls conn.commit(). See §5.3 for usage examples.
Use these utilities rather than reimplementing common patterns in each pipeline.
A deployment turns a flow into a scheduled, monitored resource on the Prefect server. The developer defines prefect.yaml once; subsequent executions happen through the UI or CLI without touching the code.
Each pipeline defines four deployments in prefect.yaml. The pull section is required in all pipelines: it sets the working directory and installs flow dependencies at runtime. This is necessary because the prefect-worker container uses the base image prefecthq/prefect:3-python3.11, which does not pre-install pipeline-specific packages such as psycopg2-binary.
name: {ente}-{dataset}
prefect-version: "3.*"

work_pool:
  name: default-pool

pull:
  - prefect.deployments.steps.set_working_directory:
      directory: /flows/{ente}/{dataset}
  - prefect.deployments.steps.pip_install_requirements:
      directory: /flows/{ente}/{dataset}
      requirements_file: requirements.txt

deployments:
  - name: {ente}-{dataset}-01-ingestion   # Flow 1 – download → Bronze
    entrypoint: 01_ingestion_flow.py:ingestion_flow
    schedules: []                         # no automatic schedule – trigger manually
    parameters:
      anno: 2025
    work_pool:
      name: default-pool
  - name: {ente}-{dataset}-02-transform   # Flow 2 – Bronze → Silver
    entrypoint: 02_transform_flow.py:transform_flow
    schedules: []
    parameters:
      anno: 2025
    work_pool:
      name: default-pool
  - name: {ente}-{dataset}-03-quality     # Flow 3 – quality monitoring
    entrypoint: 03_data_quality_flow.py:quality_flow
    schedules: []
    parameters:
      anno: 2025
    work_pool:
      name: default-pool
  - name: {ente}-{dataset}-04-metadata    # Flow 4 – catalogue update
    entrypoint: 04_metadata_flow.py:metadata_flow
    schedules: []
    parameters:
      anno: 2025
    work_pool:
      name: default-pool

The numbered prefix (01, 02, ...) keeps deployments in execution order in the Prefect UI. schedules: [] explicitly removes any schedule; omitting the key does not delete schedules that were previously registered.
Deployments must be registered from inside the prefect-worker container, not from the host shell. This is because Prefect records the working directory path at registration time and uses it verbatim at execution time. The worker executes flows at /flows/{ente}/{dataset}/ (the Docker volume mount); registering from the host would record the host path which does not exist inside the container.
docker exec maps-prefect-worker bash -c \
  "cd /flows/{ente}/{dataset} && prefect deploy --all"

After registration, deployments are visible at http://localhost:4200/deployments.
The Prefect worker reads code from the filesystem on each execution:

- Changes to flow code (.py files): do not require prefect deploy --all. The worker will execute the updated file on the next run.
- prefect.yaml changes (new deployments, schedule changes, new parameters, pull steps): require prefect deploy --all run from inside the container.
- Removing a schedule: set schedules: [] in prefect.yaml and re-register. Omitting the key does not remove an existing schedule.

From the UI – go to http://localhost:4200/deployments, click a deployment, then "Run". The parameter form is pre-filled with defaults and can be edited before each run.
From the CLI:
export PREFECT_API_URL=http://localhost:4200/api
# Run with default parameters
prefect deployment run '{flow-name}/{ente}-{dataset}-01-ingestion'
# Override parameters at runtime
prefect deployment run '{flow-name}/{ente}-{dataset}-01-ingestion' \
--param anno=2024
# List all registered deployments
prefect deployment ls

The CLI name follows the pattern {flow-name}/{deployment-name}. The flow name comes from the @flow(name=...) decorator; the deployment name from prefect.yaml. Check exact names with prefect deployment ls or the UI.
Do not start a local Prefect worker (prefect worker start) when using the Docker stack. The maps-prefect-worker container already polls default-pool. A competing local worker would pick up runs but fail because the /flows/ paths do not exist on the host.
Monitor all runs in the Prefect UI at http://localhost:4200.
Each of the four flows can be developed and tested independently. A natural split for a two-developer team working on the same pipeline:
| Developer A | Developer B |
|---|---|
| Flow 1: Ingestion (source-specific, often scraping or API) | Flow 2: Transform (EAV normalisation, territory resolution) |
| Flow 4: Metadata catalogue | Flow 3: Data quality (GX expectations, outlier detection) |
| README.md + prefect.yaml | tests/test_flow.py |
Handoff criterion between flows: Flow 1 is considered complete when at least one Bronze file is written to the correct path and its checksum is recorded in bronze.ingestion_log. Flow 2 can begin with that file independently of Flow 1 being fully finished.
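The checksum half of that handoff criterion can be sketched as follows. sha256_of is a hypothetical helper, not the project's actual function; the streaming-read pattern is simply the standard way to hash Bronze files without loading them fully into memory:

```python
import hashlib
from pathlib import Path

def sha256_of(path: Path) -> str:
    # Read in 64 KiB chunks so large Bronze files never sit fully in memory
    digest = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1 << 16), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

The hex digest is the kind of value recorded alongside the file path in bronze.ingestion_log, which is what lets Flow 2 detect whether a Bronze file changed between runs.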
Tracking on GitLab: open one issue per flow. Link them to the parent pipeline issue. Close each sub-issue as the flow reaches its Bronze/Silver acceptance criteria (see §10).
Acceptance criteria (§10):

- At least one Bronze file written to the canonical path (/data/bronze/{ente}/{anno}/)
- Checksum of each downloaded file recorded in bronze.ingestion_log
- Download task configured for transient failures (retries=3 with backoff)
- README.md documents source URL, format, and update frequency
- Records written to silver.territory_attributes with the correct type_code
- Upserts use ON CONFLICT DO UPDATE in place (idempotent re-runs)
- territory_id resolved for all records (unresolved comuni logged and counted)
- Quality checks cover territory_id presence, value ranges, temporal fields

ON CONFLICT fails with "no unique constraint": the UNIQUE constraint on (territory_id, attribute, valid_from) must be present. Verify with \d silver.territory_attributes in psql.
territory_id is NULL / no records resolved: the ISTAT foundation pipelines (flows/istat/) must run before any EAV pipeline. If _build_territory_lookup() returns an empty dict, silver.territories is not yet populated.
valid_from causes a duplicate key: two rows for the same entity/attribute/year with different valid_from dates. Use a fixed date (e.g. f"{anno}-01-01") rather than CURRENT_DATE to keep runs idempotent.
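The fixed-date rule can be verified in isolation. This sketch uses SQLite, whose ON CONFLICT ... DO UPDATE upsert mirrors the PostgreSQL syntax used in Silver; the table is a simplified stand-in and the attribute name and values are invented:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE territory_attributes (
        territory_id INTEGER,
        attribute    TEXT,
        valid_from   TEXT,
        value        REAL,
        UNIQUE (territory_id, attribute, valid_from)
    )
""")

def upsert(territory_id, attribute, anno, value):
    # Fixed valid_from per year: re-running the flow updates the row in place
    conn.execute(
        """INSERT INTO territory_attributes VALUES (?, ?, ?, ?)
           ON CONFLICT (territory_id, attribute, valid_from)
           DO UPDATE SET value = excluded.value""",
        (territory_id, attribute, f"{anno}-01-01", value),
    )

upsert(1, "tabaccherie_count", 2025, 41.0)
upsert(1, "tabaccherie_count", 2025, 42.0)  # same year: updates, no duplicate
```

Had the second call used CURRENT_DATE as valid_from, the conflict target would not match on a later day and the insert would either duplicate the year or trip the unique constraint, which is exactly the failure described above.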
Flow 2 reads stale Bronze file: Prefect task caching may return a cached result. Either disable caching on the download task for that run, or delete the cache entry from the Prefect UI.
OpenMetadata JWT expired: rotate the token via the OpenMetadata UI (Settings → Bots) and update the OPENMETADATA_JWT_TOKEN environment variable in the prefect-worker container.
Flow run crashes with FileNotFoundError: /flows/{ente}/{dataset}: the deployment was registered from the host shell instead of from inside the container. Re-register using docker exec maps-prefect-worker bash -c "cd /flows/... && prefect deploy --all" (see §8.2).
Flow run crashes with ModuleNotFoundError: No module named 'psycopg2': the pull section is missing or incomplete in prefect.yaml. Ensure both set_working_directory and pip_install_requirements steps are present (see §8.1), then re-register the deployment.
Flow run is scheduled but never starts: a local worker (prefect worker start) is running alongside the Docker worker and picks up the run, then fails. Stop any local worker processes and let maps-prefect-worker handle execution exclusively.
get_all_records() raises JSONDecodeError: the pfun returns all records in a single response and does not support row-offset pagination via pdatada. Use get_records() directly (see §7, situas_client.py).