Architettura Tecnica Data-Lake
Deliverable D2.1.1: Documento Progettazione Tecnica Data-Lake
2.1 Pattern Medallion: Bronze → Silver → Gold
2.1.1 Visione d'Insieme
L'architettura del Data Lake MAPS adotta il pattern Medallion (anche denominato Multi-Hop Architecture), best practice nell'ecosistema Modern Data Stack, che organizza i dati in tre layer di progressivo raffinamento con crescente livello di qualità, pulizia e business-readiness:
FONTI ESTERNE → BRONZE → SILVER → GOLD → APPLICAZIONI2.1.2 Rationale della Scelta
Per il progetto MAPS, l'architettura Medallion è particolarmente indicata per le seguenti motivazioni:
A. Eterogeneità delle Fonti Dati
- ~200 dataset da fonti pubbliche diverse (ISTAT, Ministeri, OpenData)
- Formati multipli: CSV, Excel, PDF, JSON, HTML
- Qualità variabile: da dataset strutturati a documenti semi-strutturati
- Necessità: Layer progressivi per standardizzare gradualmente l'eterogeneità
B. Requisiti di Audit e Compliance
- Dati pubblici ma necessità di tracciabilità per ricerca scientifica
- GDPR Article 30: documentazione delle attività di trattamento dati
- Necessità: Bronze layer immutabile come "source of truth" originale
C. Complessità delle Trasformazioni
- Pipeline multi-step: parsing PDF → validazione → normalizzazione EAV → aggregazioni territoriali
- Time-series con fusioni/scissioni comunali (2010-2025)
- Necessità: Separazione logica tra raw ingestion, cleaning e business logic
D. Riutilizzo dei Dati
- Stesso dataset usato per multiple analisi (demografia, servizi, mobilità)
- Necessità di evitare re-processing da fonte esterna ogni volta
- Necessità: Silver layer come cache enterprise validata
E. Performance e Scalabilità
- Query analitiche su 8.000+ comuni con decine di attributi
- Spatial operations PostGIS computazionalmente intensive
- Necessità: Gold layer denormalizzato per fast queries
2.1.3 Data Flow Completo
graph TB
A[Fonti Esterne
ISTAT, GTFS, Ministeri, OpenData] --> B[Prefect Orchestration
Worker Pools: istat, pdf, analytics]
B --> C[Bronze Layer
File System - Raw Files]
C --> D[Silver Layer
PostgreSQL - EAV Schema]
D --> E[Gold Layer
PostgreSQL + PostGIS]
B -.metadata.-> F[OpenMetadata
Internal Governance]
E --> F
E --> G[DuckDB
Analytics]
E --> H[API v2
Web Apps]
E --> I[CKAN
Open Data Catalog]
I --> J[dati.gov.it
National Portal]
style C fill:#cd7f32
style D fill:#c0c0c0
style E fill:#ffd700
2.1.4 Bronze Layer: Raw Data Archive
Ruolo: Archivio immutabile dei dati originali esattamente come scaricati dalla fonte.
Principio chiave: "Never modify, always preserve"
Caratteristiche Tecniche
| Aspetto | Specifica MAPS |
|---|---|
| Storage | File system locale /data/bronze/ (server op-linkurious) |
| Formato | File originali senza trasformazioni (CSV, XLSX, PDF, JSON, HTML) |
| Naming convention | /data/bronze/{fonte}/{anno}/{dataset}_{timestamp}.{ext} |
| Retention policy | Indefinita (storage cost è basso: ~50GB totali per 200 dataset) |
| Backup | Snapshot giornalieri via backup.sh script |
| Access pattern | Write-once, read-rarely (solo per reprocessing o audit) |
Struttura Directory
/data/bronze/
├── istat/
│ ├── 2024/
│ │ ├── popolazione_comuni_20240218.csv
│ │ ├── pendolarismo_matrix_20240218.csv
│ │ ├── confini_amministrativi_20240218.geojson
│ │ └── _metadata/
│ │ ├── popolazione_comuni_20240218.json # Metadata file
│ │ └── checksums.sha256
│ └── 2023/
│ └── popolazione_comuni_20230315.csv
├── minlavoro/
│ └── 2024/
│ ├── tabacchi_adm_report_20240218.pdf
│ └── _metadata/
│ └── tabacchi_adm_report_20240218.json
├── minsalute/
│ └── 2023/
│ ├── strutture_sanitarie_asl_20231120.xlsx
│ └── _metadata/
│ └── strutture_sanitarie_asl_20231120.json
└── minambiente/
└── 2024/
├── ato_gas_page_20240115.html # ← HTML file
└── _metadata/
└── ato_gas_page_20240115.jsonMetadata Tracking
Ogni file Bronze ha un corrispondente file JSON con metadata:
Esempio: /data/bronze/istat/2024/_metadata/popolazione_comuni_20240218.json
{
"file_path": "/data/bronze/istat/2024/popolazione_comuni_20240218.csv",
"fonte": "istat",
"dataset": "popolazione",
"anno_riferimento": 2024,
"download_timestamp": "2024-02-18T03:00:15Z",
"download_url": "https://www.istat.it/storage/cartografia/popolazione_comuni_2024.csv",
"file_size_bytes": 3355482,
"file_hash_sha256": "a3f5b8c9d2e1f4a6b7c8d9e0f1a2b3c4d5e6f7a8b9c0d1e2f3a4b5c6d7e8f9a0",
"mime_type": "text/csv",
"encoding": "UTF-8",
"rows_detected": 7901,
"columns_detected": 12,
"prefect_flow_run_id": "abc123-456-def-789",
"ingestion_status": "completed"
}Database tracking (PostgreSQL):
-- Schema: bronze
CREATE TABLE bronze.ingestion_log (
id BIGSERIAL PRIMARY KEY,
fonte VARCHAR(50) NOT NULL,
dataset VARCHAR(100) NOT NULL,
anno_rif INTEGER NOT NULL,
file_path TEXT NOT NULL,
file_size_bytes BIGINT,
file_hash_sha256 CHAR(64),
download_url TEXT,
download_timestamp TIMESTAMP NOT NULL,
prefect_flow_run_id UUID,
status VARCHAR(20) NOT NULL, -- 'completed', 'failed', 'in_progress'
error_message TEXT,
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE (fonte, dataset, anno_rif, file_hash_sha256)
);
-- Index per query frequenti
CREATE INDEX idx_ingestion_log_fonte_dataset ON bronze.ingestion_log(fonte, dataset);
CREATE INDEX idx_ingestion_log_status ON bronze.ingestion_log(status);
CREATE INDEX idx_ingestion_log_timestamp ON bronze.ingestion_log(download_timestamp DESC);Garanzie Bronze Layer
Immutabilità:
- File Bronze non vengono mai modificati dopo scrittura
- Re-download stesso dataset → nuovo file con timestamp diverso
- History completa: tutti i download conservati
Idempotenza:
- Re-esecuzione flow → skip se file con stesso hash già presente
- Deduplication basata su
(fonte, dataset, anno_rif, file_hash)
Disaster Recovery:
- Bronze è la "golden copy" per rebuild completo
- Se Silver/Gold si corrompono → reprocess da Bronze
- Backup script:
bash /root/maps-docker/backup.sh
Caso d'Uso: HTML → Structured Data
Scenario reale: Molte fonti pubbliche italiane pubblicano dati come tabelle HTML embedded in pagine web invece di file CSV scaricabili.
Esempio concreto: MinAmbiente pubblica elenco ATO Gas come tabella HTML su pagina web (no CSV/Excel disponibile).
Flusso Bronze-HTML:
sequenceDiagram
participant S as Source Website
participant P as Prefect Flow
participant B as Bronze Layer
participant SI as Silver Layer
S->>P: HTTP GET HTML page
P->>B: Save raw HTML file
P->>B: Log metadata JSON
Note over B: HTML preserved unchanged
P->>P: Parse HTML with BeautifulSoup
P->>P: Clean & Validate DataFrame
P->>SI: Load to EAV schema
Vantaggi Pattern Bronze-HTML:
Scenario senza Bronze layer:
→ Script scarica HTML e parsa immediatamente
→ Se parsing fallisce (HTML structure changed) → dati persi
→ Se server web va offline → impossibile reprocessare
Scenario con Bronze layer:
→ HTML salvato in Bronze (immutabile)
→ Parsing fallisce? → Fix parser, reprocess da Bronze
→ Server offline? → Bronze ha copia originale
→ HTML structure cambia? → Version history in Bronze2.1.5 Silver Layer: Cleaned & Validated Data
Ruolo: Single Source of Truth (SSOT) enterprise - dati puliti, validati, normalizzati.
Principio chiave: "Trust but verify, then store"
Caratteristiche Tecniche
| Aspetto | Specifica MAPS |
|---|---|
| Storage | PostgreSQL schema silver |
| Formato | Tabelle relazionali normalizzate (EAV schema) |
| Data model | Entity-Attribute-Value per gestire eterogeneità |
| Retention | Temporal versioning (valid_from, valid_to) per time-series |
| Backup | Snapshot giornalieri PostgreSQL + WAL archiving |
| Access pattern | Read-heavy (queries analitiche), write-moderate (batch ingestion) |
Schema EAV (Entity-Attribute-Value)
Rationale: Il progetto MAPS gestisce ~200 dataset con attributi eterogenei (popolazione, servizi, infrastrutture). Un modello EAV offre flessibilità senza continue schema migrations.
erDiagram
COMUNI_ANAGRAFICA ||--o{ COMUNI_ATTRIBUTI_EAV : has
FUSIONI_COMUNALI ||--o{ COMUNI_ANAGRAFICA : affects
COMUNI_ANAGRAFICA {
varchar codice_istat PK
varchar denominazione
varchar codice_regione
geometry geometria
}
COMUNI_ATTRIBUTI_EAV {
bigserial id PK
varchar codice_istat FK
varchar attributo
text valore
varchar fonte
varchar dataset
integer anno_rif
date valid_from
date valid_to
}
FUSIONI_COMUNALI {
serial id PK
varchar codice_istat_old
varchar codice_istat_new
date data_fusione
varchar tipo_operazione
}
Schema principale:
-- Schema: silver
CREATE TABLE silver.comuni_attributi_eav (
-- Primary key
id BIGSERIAL PRIMARY KEY,
-- Entity: Comune
codice_istat VARCHAR(6) NOT NULL REFERENCES gold.comuni_anagrafica(codice_istat),
-- Attribute: Nome attributo
attributo VARCHAR(255) NOT NULL,
-- Value: Valore come testo (type inference in Gold)
valore TEXT NOT NULL,
-- Metadata: Provenienza dati
fonte VARCHAR(50) NOT NULL, -- 'istat', 'minlavoro', 'minsalute', etc.
dataset VARCHAR(100) NOT NULL, -- 'popolazione', 'tabacchi', 'asl', etc.
anno_rif INTEGER NOT NULL,
-- Temporal validity
valid_from DATE NOT NULL DEFAULT CURRENT_DATE,
valid_to DATE,
-- Audit trail
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
created_by VARCHAR(100), -- Prefect flow name
-- Constraints
CONSTRAINT unique_attribute_version
UNIQUE (codice_istat, attributo, fonte, dataset, anno_rif),
CONSTRAINT valid_temporal_range
CHECK (valid_to IS NULL OR valid_to >= valid_from),
CONSTRAINT valid_anno_rif
CHECK (anno_rif BETWEEN 2000 AND 2100)
);
-- Indexes for performance
CREATE INDEX idx_silver_codice ON silver.comuni_attributi_eav(codice_istat);
CREATE INDEX idx_silver_attributo ON silver.comuni_attributi_eav(attributo);
CREATE INDEX idx_silver_fonte_dataset ON silver.comuni_attributi_eav(fonte, dataset);
CREATE INDEX idx_silver_anno ON silver.comuni_attributi_eav(anno_rif);
CREATE INDEX idx_silver_valid ON silver.comuni_attributi_eav(valid_from, valid_to)
WHERE valid_to IS NULL; -- Partial index per dati correntiLookup Fusioni Comunali:
CREATE TABLE silver.fusioni_comunali (
id SERIAL PRIMARY KEY,
codice_istat_old VARCHAR(6),
codice_istat_new VARCHAR(6),
nome_comune_old VARCHAR(255),
nome_comune_new VARCHAR(255),
data_fusione DATE,
tipo_operazione VARCHAR(20), -- 'fusione', 'separazione', 'modifica'
UNIQUE (codice_istat_old, codice_istat_new, data_fusione)
);Esempio Dati EAV
Comune AGLIÈ (001001) con attributi da multiple fonti:
-- Da ISTAT popolazione
INSERT INTO silver.comuni_attributi_eav VALUES
(1, '001001', 'popolazione_2024', '2635', 'istat', 'popolazione', 2024, '2024-02-18', NULL, ...),
(2, '001001', 'superficie_kmq', '13.98', 'istat', 'popolazione', 2024, '2024-02-18', NULL, ...),
(3, '001001', 'denominazione', 'AGLIÈ', 'istat', 'popolazione', 2024, '2024-02-18', NULL, ...);
-- Da MinLavoro tabacchi (PDF parsed with Docling)
INSERT INTO silver.comuni_attributi_eav VALUES
(4, '001001', 'num_tabacchi', '2', 'minlavoro', 'tabacchi', 2024, '2024-02-18', NULL, ...),
(5, '001001', 'tabacchi_ids', 'TAB001,TAB002', 'minlavoro', 'tabacchi', 2024, '2024-02-18', NULL, ...);
-- Da MinSalute strutture sanitarie
INSERT INTO silver.comuni_attributi_eav VALUES
(6, '001001', 'num_asl', '1', 'minsalute', 'strutture_sanitarie', 2023, '2024-01-15', NULL, ...),
(7, '001001', 'asl_denominazione', 'ASL TO4', 'minsalute', 'strutture_sanitarie', 2023, '2024-01-15', NULL, ...);
-- Da MinAmbiente ATO Gas (parsed from HTML)
INSERT INTO silver.comuni_attributi_eav VALUES
(8, '001001', 'ato_gas_id', 'ATO-PIE-01', 'minambiente', 'ato_gas', 2024, '2024-02-18', NULL, ...),
(9, '001001', 'ato_gas_gestore', 'SMAT S.p.A.', 'minambiente', 'ato_gas', 2024, '2024-02-18', NULL, ...);Trasformazioni Bronze → Silver
Pipeline Prefect standard:
@task(name="bronze-to-silver-transform")
def transform_bronze_to_silver(bronze_file_path: str, fonte: str, dataset: str, anno: int):
"""
Standard transformation pipeline: Bronze → Silver
Steps:
1. Parse: Read Bronze file (CSV/PDF/Excel/HTML)
2. Clean: Normalize encoding, trim whitespace, fix typos
3. Validate: Check data quality, enforce business rules
4. Load: Insert into Silver EAV schema
"""
# Step 1: Parse
if bronze_file_path.endswith('.csv'):
df = pd.read_csv(bronze_file_path)
elif bronze_file_path.endswith('.pdf'):
df = extract_pdf_tables(bronze_file_path) # Docling
elif bronze_file_path.endswith('.xlsx'):
df = pd.read_excel(bronze_file_path)
elif bronze_file_path.endswith('.html'):
df = parse_html_to_dataframe(bronze_file_path) # BeautifulSoup
# Step 2: Clean
df = clean_dataframe(df)
# Step 3: Validate with Great Expectations
validation_results = validate_dataframe(df, fonte, dataset)
if not validation_results.passed:
raise ValueError(f"Validation failed: {validation_results.errors}")
# Step 4: Load to Silver (EAV)
load_to_silver_eav(df, fonte, dataset, anno)Temporal Versioning
Use case: Gestire modifiche nel tempo (fusioni/scissioni comunali).
Esempio: Fusione comunale nel 2019 (Comune A + Comune B → Comune C)
-- Prima della fusione (2018)
INSERT INTO silver.comuni_attributi_eav VALUES
(100, '001234', 'popolazione', '1500', 'istat', 'popolazione', 2018, '2018-01-01', '2019-01-01', ...),
(101, '001235', 'popolazione', '800', 'istat', 'popolazione', 2018, '2018-01-01', '2019-01-01', ...);
-- Dopo la fusione (2019+)
INSERT INTO silver.comuni_attributi_eav VALUES
(102, '001236', 'popolazione', '2300', 'istat', 'popolazione', 2019, '2019-01-01', NULL, ...);
-- Query: Popolazione comune 001234 nel 2018
SELECT valore FROM silver.comuni_attributi_eav
WHERE codice_istat = '001234'
AND attributo = 'popolazione'
AND '2018-12-31' BETWEEN valid_from AND COALESCE(valid_to, '9999-12-31');
-- Result: 15002.1.6 Gold Layer: Business-Ready Analytics
Ruolo: Dati ottimizzati per use case specifici - aggregati, denormalizzati, arricchiti.
Principio chiave: "Optimize for queries, not for storage"
Caratteristiche Tecniche
| Aspetto | Specifica MAPS |
|---|---|
| Storage | PostgreSQL schema gold + PostGIS extensions |
| Formato | Tabelle denormalizzate (wide tables), spatial geometries |
| Data model | Domain-specific (comuni, DLS attractors, time-series) |
| Retention | Snapshot refreshed periodically (daily/weekly) |
| Backup | Snapshot giornalieri (ma rebuiltable da Silver) |
| Access pattern | Read-very-heavy (dashboards, APIs, analytics) |
Data Mart Principali
A. gold.comuni_aggregati (Wide Table)
Denormalizzazione di tutti attributi comuni per fast queries.
CREATE TABLE gold.comuni_aggregati (
-- Identificativi
codice_istat VARCHAR(6) PRIMARY KEY,
denominazione VARCHAR(255) NOT NULL,
denominazione_full VARCHAR(255), -- Con sigla provincia
-- Gerarchia amministrativa
codice_regione CHAR(2) NOT NULL,
denominazione_regione VARCHAR(100) NOT NULL,
codice_provincia CHAR(3),
denominazione_provincia VARCHAR(100),
sigla_provincia CHAR(2),
-- Attributi demografici (da ISTAT)
popolazione_2024 INTEGER,
popolazione_2023 INTEGER,
popolazione_2022 INTEGER,
crescita_popolazione_pct NUMERIC(5,2), -- Calculated: (2024-2023)/2023*100
-- Attributi geografici
superficie_kmq NUMERIC(10,2),
densita_abitanti_kmq NUMERIC(10,2), -- Calculated: pop/superficie
altitudine_m INTEGER,
zona_altimetrica VARCHAR(50), -- 'montagna', 'collina', 'pianura'
-- Geometria PostGIS
geometria GEOMETRY(MultiPolygon, 4326) NOT NULL,
centroide GEOMETRY(Point, 4326),
-- Servizi (da Ministeri)
num_strutture_sanitarie INTEGER DEFAULT 0,
num_asl INTEGER DEFAULT 0,
num_scuole_primarie INTEGER DEFAULT 0,
num_scuole_secondarie INTEGER DEFAULT 0,
num_tabacchi INTEGER DEFAULT 0,
num_uffici_postali INTEGER DEFAULT 0,
-- Infrastrutture (da OpenData)
ha_stazione_ferroviaria BOOLEAN DEFAULT FALSE,
ha_casello_autostradale BOOLEAN DEFAULT FALSE,
ha_aeroporto BOOLEAN DEFAULT FALSE,
-- Utilities (da MinAmbiente - parsed from HTML)
ato_gas_id VARCHAR(50),
ato_gas_denominazione VARCHAR(255),
ato_gas_gestore VARCHAR(255),
ato_acqua_id VARCHAR(50),
ato_rifiuti_id VARCHAR(50),
-- Attributi DLS (Calculated)
attractor_level VARCHAR(50), -- 'metropolitan', 'urban', 'semi-urban', 'rural'
cluster_dls_id INTEGER,
isochrone_60min GEOMETRY(MultiPolygon, 4326),
-- Metadata
last_updated TIMESTAMP NOT NULL DEFAULT NOW(),
data_completeness_pct NUMERIC(5,2), -- % attributi popolati
-- Constraints
CONSTRAINT valid_codice CHECK (codice_istat ~ '^\d{6}$')
);
-- Spatial indexes
CREATE INDEX idx_gold_comuni_geometria ON gold.comuni_aggregati USING GIST(geometria);
CREATE INDEX idx_gold_comuni_centroide ON gold.comuni_aggregati USING GIST(centroide);
CREATE INDEX idx_gold_comuni_isochrone ON gold.comuni_aggregati USING GIST(isochrone_60min);
-- Attribute indexes
CREATE INDEX idx_gold_comuni_regione ON gold.comuni_aggregati(codice_regione);
CREATE INDEX idx_gold_comuni_provincia ON gold.comuni_aggregati(codice_provincia);
CREATE INDEX idx_gold_comuni_attractor ON gold.comuni_aggregati(attractor_level);Esempio query (performance ottimale):
-- Query: Comuni in ATO Gas "ATO-PIE-01" con popolazione > 5000
SELECT
denominazione,
popolazione_2024,
ato_gas_gestore,
num_tabacchi,
attractor_level
FROM gold.comuni_aggregati
WHERE ato_gas_id = 'ATO-PIE-01' -- Da HTML MinAmbiente
AND popolazione_2024 > 5000
ORDER BY popolazione_2024 DESC;
-- Execution: Index scan su ato_gas_id, no joins, < 10msB. gold.dls_attractors (DLS Analysis)
Risultati analisi Daily Life Systems (attrattori territoriali).
CREATE TABLE gold.dls_attractors (
codice_istat VARCHAR(6) PRIMARY KEY REFERENCES gold.comuni_aggregati(codice_istat),
denominazione VARCHAR(255) NOT NULL,
-- Attractor classification
attractor_level VARCHAR(50) NOT NULL, -- 'metropolitan', 'urban', 'semi-urban', 'rural'
attractor_score NUMERIC(5,2), -- 0-100 score
-- Service availability (weighted scores)
servizi_sanitari_score NUMERIC(5,2),
servizi_educativi_score NUMERIC(5,2),
servizi_commerciali_score NUMERIC(5,2),
servizi_trasporti_score NUMERIC(5,2),
-- Isochrone analysis (60 min travel time)
isochrone_60min GEOMETRY(MultiPolygon, 4326),
comuni_raggiungibili_60min INTEGER[], -- Array codici ISTAT
popolazione_raggiungibile_60min INTEGER,
-- Clustering
cluster_id INTEGER NOT NULL,
cluster_centroid GEOMETRY(Point, 4326),
-- Metadata
calculation_date TIMESTAMP NOT NULL DEFAULT NOW(),
CONSTRAINT valid_attractor_level CHECK (attractor_level IN ('metropolitan', 'urban', 'semi-urban', 'rural'))
);C. gold.time_series_popolazione (Temporal Aggregations)
Time-series aggregati per analisi trend.
CREATE TABLE gold.time_series_popolazione (
regione VARCHAR(100) NOT NULL,
anno INTEGER NOT NULL,
popolazione_totale BIGINT NOT NULL,
popolazione_media_comune INTEGER,
num_comuni INTEGER,
densita_media_kmq NUMERIC(10,2),
-- Calculated metrics
crescita_assoluta INTEGER, -- vs anno precedente
crescita_percentuale NUMERIC(5,2),
PRIMARY KEY (regione, anno)
);Trasformazioni Silver → Gold
Materialized views approach:
-- Refresh Gold tables da Silver (scheduled daily)
CREATE OR REPLACE FUNCTION gold.refresh_comuni_aggregati()
RETURNS void AS $$
BEGIN
-- Truncate e rebuild (snapshot approach)
TRUNCATE gold.comuni_aggregati;
-- Pivot EAV → Wide table
INSERT INTO gold.comuni_aggregati
SELECT
c.codice_istat,
c.denominazione,
c.geometria,
ST_Centroid(c.geometria) AS centroide,
-- Pivot attributi da Silver
MAX(CASE WHEN e.attributo = 'popolazione_2024' THEN e.valore::INTEGER END) AS popolazione_2024,
MAX(CASE WHEN e.attributo = 'popolazione_2023' THEN e.valore::INTEGER END) AS popolazione_2023,
MAX(CASE WHEN e.attributo = 'superficie_kmq' THEN e.valore::NUMERIC END) AS superficie_kmq,
-- Calculated fields
(MAX(CASE WHEN e.attributo = 'popolazione_2024' THEN e.valore::NUMERIC END) -
MAX(CASE WHEN e.attributo = 'popolazione_2023' THEN e.valore::NUMERIC END)) /
NULLIF(MAX(CASE WHEN e.attributo = 'popolazione_2023' THEN e.valore::NUMERIC END), 0) * 100
AS crescita_popolazione_pct,
-- ATO attributes (from HTML parsing)
MAX(CASE WHEN e.attributo = 'ato_gas_id' THEN e.valore END) AS ato_gas_id,
MAX(CASE WHEN e.attributo = 'ato_gas_gestore' THEN e.valore END) AS ato_gas_gestore,
NOW() AS last_updated
FROM
gold.comuni_anagrafica c
LEFT JOIN
silver.comuni_attributi_eav e ON e.codice_istat = c.codice_istat
AND e.valid_to IS NULL -- Solo dati correnti
GROUP BY
c.codice_istat, c.denominazione, c.geometria;
-- Analyze per query optimizer
ANALYZE gold.comuni_aggregati;
END;
$$ LANGUAGE plpgsql;
-- Schedule refresh (chiamato da Prefect flow daily)
SELECT gold.refresh_comuni_aggregati();2.1.7 Presentazione Interattiva
Per una visualizzazione interattiva dell'architettura Medallion e del pattern EAV con esempi specifici per MAPS:
📊 Architettura Dati MAPS (presentazione interattiva disponibile nel repository)
La presentazione include:
- Visualizzazione interattiva dei tre layer (Bronze/Silver/Gold)
- Comparazione EAV vs schema tradizionale
- Esempi concreti di trasformazioni per MAPS
- Gestione fusioni comunali e temporalità
2.1.8 EAV vs Schema Tradizionale: Comparazione
Perché EAV per MAPS?
Il progetto MAPS gestisce ~200 dataset con attributi eterogenei e variabili nel tempo. Comparazione tra modelli:
Schema Tradizionale (Wide Table)
| codice_istat | popolazione | n_asili | has_ospedale | n_scuole | ato_gas_id | ... |
|---|---|---|---|---|---|---|
| 001001 | 45230 | 12 | true | ? | ? | ... |
| 001002 | 8420 | 2 | false | ? | ? | ... |
Problemi:
- ❌ Colonne nulle per attributi non disponibili (storage sprecato)
- ❌ Schema migration ad ogni nuovo dataset
- ❌ Nessuna tracciabilità temporale
- ❌ Difficile gestire multipli data sources per stesso attributo
Schema EAV (Entity-Attribute-Value)
| entity | attribute | value | valid_from | source |
|---|---|---|---|---|
| 001001 | popolazione | 45230 | 2021-01-01 | ISTAT |
| 001001 | n_asili_nido | 12 | 2021-01-01 | ISTAT |
| 001001 | has_ospedale | true | 2015-01-01 | MinSalute |
| 001002 | popolazione | 8420 | 2021-01-01 | ISTAT |
| 001002 | cod_pre_fusione | 001045 | 2017-01-01 | ISTAT |
Vantaggi:
- ✅ Flessibilità: Aggiungi nuovi attributi senza schema migration
- ✅ Storage efficiente: Solo attributi presenti sono memorizzati
- ✅ Temporalità:
valid_from/valid_toper time-series - ✅ Lineage:
sourcetraccia provenienza dati - ✅ Multi-source: Stesso attributo da fonti diverse con timestamp
Trade-off:
- ⚠️ Query più complesse (richiede
CASE WHENpivot in Gold) - ⚠️ Performance: Più righe da scannare (mitigato da indici)
- ⚠️ Type inference: Valori TEXT, cast in Gold layer
Decisione per MAPS: EAV in Silver è il compromesso ottimale tra flessibilità e performance per gestire l'eterogeneità dei 200+ dataset.
2.2 Stack Tecnologico
2.2.1 Componenti Principali
| Componente | Tecnologia | Versione | Ruolo |
|---|---|---|---|
| Storage primario | PostgreSQL + PostGIS | 17 + 3.5 | Master data, operazioni spaziali |
| Orchestrazione | Prefect | 3.x | Scheduling, monitoring, lineage ETL |
| Analytics layer | DuckDB | 1.x | Query analytics (federation da PostgreSQL) |
| Governance interna | OpenMetadata | 1.x | Catalog interno, lineage, data quality, profiling |
| Catalogo open data | CKAN | 2.10+ | Catalogo pubblico open data, DCAT-AP_IT |
| Data quality | Great Expectations | 1.x | Validation, anomaly detection |
| PDF extraction | Docling | Latest | Estrazione tabelle da PDF (97.9% accuracy) |
| Object storage | MinIO (o GCS) | Latest | Raw files, PDFs, archivi Excel |
graph TB
subgraph "Data Sources"
A1[ISTAT CSV]
A2[Min PDF]
A3[HTML Tables]
end
subgraph "Orchestration"
B[Prefect
Worker Pools]
end
subgraph "Storage Layers"
C[Bronze
File System]
D[Silver
PostgreSQL EAV]
E[Gold
PostgreSQL + PostGIS]
end
subgraph "Internal Governance & Analytics"
F[OpenMetadata
Internal Catalog]
G[Great Expectations
Quality]
H[DuckDB
Analytics]
end
subgraph "Public Access"
I[CKAN
Open Data Catalog]
J[dati.gov.it]
end
A1 --> B
A2 --> B
A3 --> B
B --> C
C --> D
D --> E
D --> G
B -.metadata.-> F
E --> F
E --> H
E --> I
I --> J
2.2.2 Rationale delle Scelte Tecnologiche
PostgreSQL + PostGIS vs BigQuery/Cloud Data Warehouse
Scelta: PostgreSQL 17 + PostGIS 3.5 (self-hosted)
Motivazioni:
| Criterio | PostgreSQL + PostGIS | BigQuery | Decisione |
|---|---|---|---|
| Scala dati | Ottimizzato 10⁴-10⁶ righe | Ottimizzato petabyte | ✅ PostgreSQL (scala MAPS: ~10⁴ comuni × ~10³ attributi) |
| Operazioni spaziali | PostGIS = industry standard | BigQuery GIS = limitato | ✅ PostgreSQL (isocrone, buffer, intersezioni native) |
| Costi | Prevedibili (infra fissa) | Per-query pricing | ✅ PostgreSQL (~€2.4k/anno vs €600-6k+/anno imprevedibili) |
| Vendor lock-in | Zero | Alto | ✅ PostgreSQL (principio indipendenza) |
| Maturità | 30+ anni | ~15 anni | ✅ PostgreSQL (affidabilità rock-solid) |
| Integrazione | Universale | Ecosistema GCP | ✅ PostgreSQL (funziona con ogni tool) |
Quando BigQuery avrebbe senso:
- Volumi dati > 100GB per query
- Centinaia di utenti concorrenti
- Zero tolleranza ops (fully managed)
- Budget per costi analytics €5k+/anno
Nessuna di queste condizioni vale per MAPS.
DuckDB vs BigQuery per Analytics
Scelta: DuckDB (embedded) + PostgreSQL federation
Motivazioni:
| Criterio | DuckDB | BigQuery | Decisione |
|---|---|---|---|
| Dimensionamento | GB scale | TB/PB scale | ✅ DuckDB (volumi MAPS: ~50GB) |
| Costi | Zero | €5/TB query | ✅ DuckDB (risparmio €600-1,200/anno) |
| Performance | ms su GB | ms su TB | ✅ DuckDB (query < 10ms sul volume MAPS) |
| Federation | Legge da PostgreSQL | Richiede export | ✅ DuckDB (no ETL aggiuntivo) |
| Portabilità | File auto-contenuto | Vendor lock-in | ✅ DuckDB (export Parquet portabile) |
| Developer UX | Embedded Python | API REST | ✅ DuckDB (zero overhead setup) |
Prefect vs Airflow/Dagster
Scelta: Prefect 3.x (self-hosted)
Motivazioni:
| Criterio | Prefect | Airflow | Dagster | Decisione |
|---|---|---|---|---|
| Setup complexity | Basso | Alto | Medio | ✅ Prefect |
| Learning curve | Gentile | Ripida | Ripida | ✅ Prefect |
| Python-native | Completo | Parziale | Completo | ✅ Prefect |
| Time-to-value | Rapido (ore) | Lento (giorni) | Medio | ✅ Prefect |
| UI/UX | Moderna | Datata | Moderna | ✅ Prefect/Dagster |
| Overhead ops | Basso | Alto | Medio | ✅ Prefect |
Dettagli: Vedere Appendice A del documento di architettura piattaforma per comparazione completa.
Docling vs Alternative PDF Extraction
Scelta: Docling (IBM TableFormer) con fallback PyMuPDF/pdfplumber
Benchmark accuracy estrazione tabelle:
| Libreria | Accuracy | License | Active Dev | Pandas Native | Decisione |
|---|---|---|---|---|---|
| Docling (TableFormer AI) | 97.9% | MIT | ✅ 2025 (LF AI) | ✅ | ✅ Primary |
| pdfplumber | 85-90% | MIT | ✅ | ⚠️ (manual) | ✅ Fallback |
| PyMuPDF | 75-80% | AGPL-3.0 | ✅ | ⚠️ | ✅ Fallback |
| Camelot | 73% | MIT | ⚠️ Maintenance | ❌ | ❌ |
| Tabula | 67.9% | MIT | ⚠️ Maintenance | ❌ | ❌ |
| Azure Doc Intelligence | ~95% | Commercial | ✅ | ✅ | ❌ (vendor lock-in) |
Docling vantaggi:
- +24-30 punti percentuali su Camelot/Tabula
- MIT license vs AGPL PyMuPDF
- Active development (LF AI & Data Foundation, 2025)
- Pandas export nativo (integrazione Great Expectations)
- Self-hosted (no cloud dependency)
Dettagli: Vedere documento docs/briefing/comparazione-librerie-estrazione-pdf.md nel repository per benchmark completo delle 9 librerie analizzate.
OpenMetadata vs DataHub/Atlan/Atlas
Scelta: OpenMetadata 1.x (self-hosted)
Motivazioni:
| Criterio | OpenMetadata | DataHub | Atlan | Atlas | Decisione |
|---|---|---|---|---|---|
| Costo annuale | €1.2k (infra) | €1.8-2.4k | €12-30k (lic.) | €3-5k | ✅ OpenMetadata |
| Setup | 1 VM | 2-3 VMs | 1 VM | 11+ VMs | ✅ OpenMetadata |
| UI/UX | Moderna | Funzionale | Eccellente | Datata | ✅ OpenMetadata |
| Stack fit | Perfect | Buono | Buono | Hadoop-only | ✅ OpenMetadata |
| Vendor lock-in | Zero | Zero | Alto | Zero | ✅ OpenMetadata |
| Time-to-value | 1-2 giorni | 2-4 giorni | Medio | Settimane | ✅ OpenMetadata |
Dettagli: Vedere Appendice B del documento di architettura piattaforma per comparazione completa.
CKAN per Open Data vs Uso Esteso OpenMetadata
Scelta: CKAN 2.10+ (self-hosted) per catalogo pubblico + OpenMetadata per governance interna
Motivazioni architetturali:
| Criterio | CKAN | OpenMetadata esteso | Decisione |
|---|---|---|---|
| Target audience | Pubblico esterno | Data operators interni | ✅ CKAN (separation of concerns) |
| Standard DCAT-AP_IT | Nativo | Non supportato | ✅ CKAN (requisito WP6 D6.3) |
| Integrazione dati.gov.it | Built-in harvester | Richiede sviluppo custom | ✅ CKAN (API standard CSW/DCAT) |
| Portale user-friendly | Web UI ottimizzata pubblico | UI tecnica data engineers | ✅ CKAN (esperienza utente) |
| Rate limiting & API keys | Nativo | Limitato | ✅ CKAN (controllo accessi pubblici) |
| Dataset downloads | Multi-formato con preview | Solo metadata | ✅ CKAN (full data access) |
| SEO e discoverability | Ottimizzato motori ricerca | Non ottimizzato | ✅ CKAN (findability pubblica) |
| Licensing metadata | Completo (Creative Commons) | Basic | ✅ CKAN (compliance open data) |
Separazione delle responsabilità:
graph LR
subgraph "Internal - Data Operators"
A[Prefect Pipeline] --> B[OpenMetadata]
B --> C[Lineage Tracking]
B --> D[Data Quality Metrics]
B --> E[Schema Profiling]
end
subgraph "Public - External Users"
F[Gold Layer] --> G[CKAN]
G --> H[DCAT-AP_IT Metadata]
G --> I[dati.gov.it Harvester]
G --> J[Public Downloads]
end
F -.selected datasets.-> A
Workflow integrato:
Internal governance (OpenMetadata):
- Prefect popola automaticamente metadata durante pipeline execution
- Tracciamento lineage completo: Bronze → Silver → Gold
- Data quality checks con Great Expectations
- Schema profiling e anomaly detection
- Audience: Data engineers, analysts, QA team
Public catalog (CKAN):
- Sync automatico da Gold layer (solo dataset approvati per pubblicazione)
- Metadata arricchiti DCAT-AP_IT (licensing, temporal coverage, geographic extent)
- API pubblica per download programmatico
- Harvesting automatico verso dati.gov.it
- Audience: Cittadini, ricercatori, sviluppatori terzi, policy makers
Conformità contrattuale WP6 D6.3:
"Catalogo open data e metadata DCAT-AP_IT [...] Integrazione dati.gov.it [...] Portal open data dedicato"
CKAN soddisfa tutti questi requisiti out-of-the-box, mentre estendere OpenMetadata richiederebbe sviluppo custom significativo (~2-3 mesi/persona) senza garanzie di conformità DCAT-AP_IT.
Costi operativi:
| Voce | OpenMetadata solo | OpenMetadata + CKAN | Delta |
|---|---|---|---|
| Infra (VM) | €1.2k/anno | €1.8k/anno | +€600/anno |
| Sviluppo custom | €15-20k (DCAT-AP_IT) | €0 | -€15-20k |
| Manutenzione | Alta (custom code) | Bassa (standard stack) | Significativo |
Conclusione: CKAN è investimento minore (~€600/anno infra) che evita sviluppo custom (€15-20k) e garantisce compliance contrattuale WP6.
2.3 Architettura di Deployment
2.3.1 Deployment su Server op-linkurious
Infrastruttura:
- Server:
root@op-linkurious(8 CPU, 31GB RAM, 621GB disk) - Domains:
*.maps.deppsviluppo.org(Route53 DNS) - Network: Traefik reverse proxy su
gwnetwork
Componenti containerizzati (Docker Compose):
services:
postgres:
image: postgis/postgis:17-3.5
volumes:
- postgres-data:/var/lib/postgresql/data
- ./init-scripts:/docker-entrypoint-initdb.d
environment:
- POSTGRES_DB=maps_db
- POSTGRES_USER=maps
networks:
- gw
- maps-internal
prefect-server:
image: prefecthq/prefect:3-latest
command: prefect server start
volumes:
- prefect-data:/root/.prefect
networks:
- gw
- maps-internal
prefect-worker:
image: prefecthq/prefect:3-latest
command: prefect worker start --pool default-pool
volumes:
- ./flows:/flows
- ./data/bronze:/data/bronze
networks:
- maps-internal
openmetadata:
image: openmetadata/server:latest
depends_on:
- postgres
environment:
- OPENMETADATA_CLUSTER_NAME=maps-cluster
networks:
- gw
- maps-internal
ckan:
image: ckan/ckan-base:2.10
depends_on:
- postgres
- redis
environment:
- CKAN_SITE_URL=https://opendata.maps.deppsviluppo.org
- CKAN_SQLALCHEMY_URL=postgresql://ckan_user:${CKAN_DB_PASSWORD}@postgres/ckan_db
- CKAN_REDIS_URL=redis://redis:6379/1
volumes:
- ckan-storage:/var/lib/ckan
networks:
- gw
- maps-internal
redis:
image: redis:7-alpine
networks:
- maps-internal
volumes:
postgres-data:
prefect-data:
ckan-storage:
networks:
gw:
external: true
maps-internal:
driver: bridge2.3.2 Data Flow Completo
flowchart TD
A[External Sources] --> B{Prefect Orchestration}
B -->|Download| C[Bronze Layer
File System]
C -->|Parse & Clean| D[Silver Layer
PostgreSQL EAV]
D -->|Aggregate & Enrich| E[Gold Layer
PostgreSQL + PostGIS]
E -->|Internal Catalog| F[OpenMetadata]
E -->|Analytics| G[DuckDB]
E -->|Serve| H[Web API]
E -->|Public Catalog| I[CKAN]
F -.->|Lineage| B
F -.->|Quality Metrics| D
I -->|Harvest| J[dati.gov.it]
style C fill:#cd7f32,color:#000
style D fill:#c0c0c0,color:#000
style E fill:#ffd700,color:#000
2.3.3 Security Architecture
Network Isolation:
maps-internal: Comunicazione inter-servizi (PostgreSQL, Prefect workers)gw: Traefik reverse proxy per accesso esterno HTTPS
Access Control:
-- PostgreSQL roles
CREATE ROLE maps_writer;
GRANT INSERT, UPDATE ON ALL TABLES IN SCHEMA bronze TO maps_writer;
GRANT INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA silver TO maps_writer;
GRANT SELECT ON ALL TABLES IN SCHEMA gold TO maps_writer;
CREATE ROLE maps_reader;
GRANT SELECT ON ALL TABLES IN SCHEMA gold TO maps_reader;
CREATE ROLE maps_api;
GRANT SELECT ON gold.comuni_aggregati TO maps_api;
GRANT SELECT ON gold.dls_attractors TO maps_api;Secrets Management:
.envfile con credenziali (.gitignore)- Prefect Secret blocks per API keys
- PostgreSQL password rotation via
pg_passfile
2.4 Data Lineage e Governance
2.4.1 Lineage Tracking con OpenMetadata
Flow completo esempio HTML source:
graph LR
A[External HTML
MinAmbiente ATO Gas] --> B[Prefect Flow
download_html_page]
B --> C[Bronze
ato_gas_page.html]
C --> D[Prefect Task
parse_html_to_dataframe]
D --> E[Silver
comuni_attributi_eav]
E --> F[SQL Function
refresh_comuni_aggregati]
F --> G[Gold
comuni_aggregati]
G --> H[Metabase Dashboard
Servizi Utilities]
style C fill:#cd7f32,color:#000
style E fill:#c0c0c0,color:#000
style G fill:#ffd700,color:#000
OpenMetadata registration:
# Register Bronze HTML as external table
tracker.register_table(
schema="bronze",
table="minambiente_ato_gas_html",
columns=[{"name": "html_content", "dataType": "TEXT"}],
owner="guglielmo.celata@gransassotech.org",
description="Raw HTML page from MinAmbiente with ATO Gas table",
tags=["minambiente", "ato_gas", "bronze", "html"]
)
# Add lineage
tracker.add_lineage(
source_table="bronze.minambiente_ato_gas_html",
target_table="silver.comuni_attributi_eav",
description="Parse HTML table with BeautifulSoup, extract ATO data",
pipeline="minambiente-ato-gas-flow"
)2.4.2 Data Quality Metrics
Silver layer quality checks (Great Expectations):
def log_silver_quality_metrics(df: pd.DataFrame, fonte: str, dataset: str):
"""Log data quality to OpenMetadata dopo ingestion Silver"""
metrics = {
"row_count": len(df),
"null_count": df.isnull().sum().sum(),
"completeness": 1 - (df.isnull().sum().sum() / df.size),
"duplicate_count": df.duplicated().sum(),
"unique_comuni": df['codice_istat'].nunique(),
"timestamp": datetime.utcnow().isoformat()
}
# Log to OpenMetadata
tracker = get_metadata_tracker()
tracker.log_data_quality(
schema="silver",
table="comuni_attributi_eav",
metrics=metrics
)
# Alert se quality degraded
if metrics["completeness"] < 0.95:
send_alert(f"Data completeness low: {metrics['completeness']:.1%}")2.5 Best Practices Implementate
2.5.1 Idempotenza
Requisito: Re-esecuzione flow non deve generare duplicati o inconsistenze.
Implementazione:
-- Silver: UPSERT con ON CONFLICT
INSERT INTO silver.comuni_attributi_eav
(codice_istat, attributo, valore, fonte, dataset, anno_rif, valid_from)
VALUES (%s, %s, %s, %s, %s, %s, CURRENT_DATE)
ON CONFLICT (codice_istat, attributo, fonte, dataset, anno_rif)
DO UPDATE SET
valore = EXCLUDED.valore,
valid_to = NULL,
updated_at = CURRENT_TIMESTAMP;2.5.2 Incremental Loading
Strategia: Silver usa temporal versioning, Gold usa snapshot refresh.
# Silver: Incremental append (new records only)
def load_to_silver_incremental(df, fonte, dataset, anno):
existing_keys = get_existing_keys(fonte, dataset, anno)
new_records = df[~df['codice_istat'].isin(existing_keys)]
# Insert solo nuovi record
# Gold: Full refresh periodico (daily snapshot)
def refresh_gold():
# Truncate + rebuild da Silver
gold.refresh_comuni_aggregati()2.5.3 Schema Evolution
Gestione backward compatibility:
-- Aggiunta nuova colonna Gold (non-breaking)
ALTER TABLE gold.comuni_aggregati
ADD COLUMN num_biblioteche INTEGER DEFAULT 0;
-- Update da nuova fonte
UPDATE gold.comuni_aggregati c
SET num_biblioteche = (
SELECT COUNT(*)
FROM silver.comuni_attributi_eav e
WHERE e.codice_istat = c.codice_istat
AND e.fonte = 'mincultura'
AND e.dataset = 'biblioteche'
);
-- Alert in OpenMetadata: schema changed2.6 Metriche e Monitoraggio
2.6.1 KPIs Architettura Medallion
| Metrica | Target | Strumento |
|---|---|---|
| Bronze storage growth | < 5GB/anno | File system monitoring |
| Silver ingestion latency | < 5 min per flow | Prefect execution logs |
| Silver data quality | > 95% completeness | Great Expectations |
| Gold refresh time | < 30 min | PostgreSQL query logs |
| Gold query performance | < 100ms p95 | pg_stat_statements |
| Lineage coverage | 100% flows tracked | OpenMetadata API |
| HTML parsing success rate | > 98% | Prefect task success rate |
2.6.2 Monitoring Dashboard
Query Metabase: Medallion health check
SELECT
'Bronze' AS layer,
COUNT(*) AS num_files,
SUM(file_size_bytes) / 1024 / 1024 / 1024 AS size_gb,
COUNT(CASE WHEN status = 'failed' THEN 1 END) AS failed_ingestions
FROM bronze.ingestion_log
UNION ALL
SELECT
'Silver',
COUNT(DISTINCT (codice_istat, fonte, dataset)),
pg_total_relation_size('silver.comuni_attributi_eav') / 1024 / 1024 / 1024,
NULL
FROM silver.comuni_attributi_eav
UNION ALL
SELECT
'Gold',
COUNT(*),
pg_total_relation_size('gold.comuni_aggregati') / 1024 / 1024 / 1024,
NULL
FROM gold.comuni_aggregati;Prossimo capitolo: Solution Design Cloud - Architettura deployment dettagliata, resource allocation, networking e security hardening.