Skip to content

Architettura Tecnica Data-Lake

Deliverable D2.1.1: Documento Progettazione Tecnica Data-Lake

2.1 Pattern Medallion: Bronze → Silver → Gold

2.1.1 Visione d'Insieme

L'architettura del Data Lake MAPS adotta il pattern Medallion (anche denominato Multi-Hop Architecture), best practice nell'ecosistema Modern Data Stack, che organizza i dati in tre layer di progressivo raffinamento con crescente livello di qualità, pulizia e business-readiness:

FONTI ESTERNE → BRONZE → SILVER → GOLD → APPLICAZIONI

2.1.2 Rationale della Scelta

Per il progetto MAPS, l'architettura Medallion è particolarmente indicata per le seguenti motivazioni:

A. Eterogeneità delle Fonti Dati

  • ~200 dataset da fonti pubbliche diverse (ISTAT, Ministeri, OpenData)
  • Formati multipli: CSV, Excel, PDF, JSON, HTML
  • Qualità variabile: da dataset strutturati a documenti semi-strutturati
  • Necessità: Layer progressivi per standardizzare gradualmente l'eterogeneità

B. Requisiti di Audit e Compliance

  • Dati pubblici ma necessità di tracciabilità per ricerca scientifica
  • GDPR Article 30: documentazione delle attività di trattamento dati
  • Necessità: Bronze layer immutabile come "source of truth" originale

C. Complessità delle Trasformazioni

  • Pipeline multi-step: parsing PDF → validazione → normalizzazione EAV → aggregazioni territoriali
  • Time-series con fusioni/scissioni comunali (2010-2025)
  • Necessità: Separazione logica tra raw ingestion, cleaning e business logic

D. Riutilizzo dei Dati

  • Stesso dataset usato per multiple analisi (demografia, servizi, mobilità)
  • Necessità di evitare re-processing da fonte esterna ogni volta
  • Necessità: Silver layer come cache enterprise validata

E. Performance e Scalabilità

  • Query analitiche su 8.000+ comuni con decine di attributi
  • Spatial operations PostGIS computazionalmente intensive
  • Necessità: Gold layer denormalizzato per fast queries

2.1.3 Data Flow Completo

graph TB
    A[Fonti Esterne
ISTAT, GTFS, Ministeri, OpenData] --> B[Prefect Orchestration
Worker Pools: istat, pdf, analytics] B --> C[Bronze Layer
File System - Raw Files] C --> D[Silver Layer
PostgreSQL - EAV Schema] D --> E[Gold Layer
PostgreSQL + PostGIS] B -.metadata.-> F[OpenMetadata
Internal Governance] E --> F E --> G[DuckDB
Analytics] E --> H[API v2
Web Apps] E --> I[CKAN
Open Data Catalog] I --> J[dati.gov.it
National Portal] style C fill:#cd7f32 style D fill:#c0c0c0 style E fill:#ffd700

2.1.4 Bronze Layer: Raw Data Archive

Ruolo: Archivio immutabile dei dati originali esattamente come scaricati dalla fonte.

Principio chiave: "Never modify, always preserve"

Caratteristiche Tecniche

AspettoSpecifica MAPS
StorageFile system locale /data/bronze/ (server op-linkurious)
FormatoFile originali senza trasformazioni (CSV, XLSX, PDF, JSON, HTML)
Naming convention/data/bronze/{fonte}/{anno}/{dataset}_{timestamp}.{ext}
Retention policyIndefinita (storage cost è basso: ~50GB totali per 200 dataset)
BackupSnapshot giornalieri via backup.sh script
Access patternWrite-once, read-rarely (solo per reprocessing o audit)

Struttura Directory

bash
/data/bronze/
├── istat/
   ├── 2024/
   ├── popolazione_comuni_20240218.csv
   ├── pendolarismo_matrix_20240218.csv
   ├── confini_amministrativi_20240218.geojson
   └── _metadata/
       ├── popolazione_comuni_20240218.json   # Metadata file
       └── checksums.sha256
   └── 2023/
       └── popolazione_comuni_20230315.csv
├── minlavoro/
   └── 2024/
       ├── tabacchi_adm_report_20240218.pdf
       └── _metadata/
           └── tabacchi_adm_report_20240218.json
├── minsalute/
   └── 2023/
       ├── strutture_sanitarie_asl_20231120.xlsx
       └── _metadata/
           └── strutture_sanitarie_asl_20231120.json
└── minambiente/
    └── 2024/
        ├── ato_gas_page_20240115.html          # ← HTML file
        └── _metadata/
            └── ato_gas_page_20240115.json

Metadata Tracking

Ogni file Bronze ha un corrispondente file JSON con metadata:

Esempio: /data/bronze/istat/2024/_metadata/popolazione_comuni_20240218.json

json
{
  "file_path": "/data/bronze/istat/2024/popolazione_comuni_20240218.csv",
  "fonte": "istat",
  "dataset": "popolazione",
  "anno_riferimento": 2024,
  "download_timestamp": "2024-02-18T03:00:15Z",
  "download_url": "https://www.istat.it/storage/cartografia/popolazione_comuni_2024.csv",
  "file_size_bytes": 3355482,
  "file_hash_sha256": "a3f5b8c9d2e1f4a6b7c8d9e0f1a2b3c4d5e6f7a8b9c0d1e2f3a4b5c6d7e8f9a0",
  "mime_type": "text/csv",
  "encoding": "UTF-8",
  "rows_detected": 7901,
  "columns_detected": 12,
  "prefect_flow_run_id": "abc123-456-def-789",
  "ingestion_status": "completed"
}

Database tracking (PostgreSQL):

sql
-- Schema: bronze
CREATE TABLE bronze.ingestion_log (
    id BIGSERIAL PRIMARY KEY,
    fonte VARCHAR(50) NOT NULL,
    dataset VARCHAR(100) NOT NULL,
    anno_rif INTEGER NOT NULL,
    file_path TEXT NOT NULL,
    file_size_bytes BIGINT,
    file_hash_sha256 CHAR(64),
    download_url TEXT,
    download_timestamp TIMESTAMP NOT NULL,
    prefect_flow_run_id UUID,
    status VARCHAR(20) NOT NULL,  -- 'completed', 'failed', 'in_progress'
    error_message TEXT,
    created_at TIMESTAMP DEFAULT NOW(),

    UNIQUE (fonte, dataset, anno_rif, file_hash_sha256)
);

-- Index per query frequenti
CREATE INDEX idx_ingestion_log_fonte_dataset ON bronze.ingestion_log(fonte, dataset);
CREATE INDEX idx_ingestion_log_status ON bronze.ingestion_log(status);
CREATE INDEX idx_ingestion_log_timestamp ON bronze.ingestion_log(download_timestamp DESC);

Garanzie Bronze Layer

Immutabilità:

  • File Bronze non vengono mai modificati dopo scrittura
  • Re-download stesso dataset → nuovo file con timestamp diverso
  • History completa: tutti i download conservati

Idempotenza:

  • Re-esecuzione flow → skip se file con stesso hash già presente
  • Deduplication basata su (fonte, dataset, anno_rif, file_hash)

Disaster Recovery:

  • Bronze è la "golden copy" per rebuild completo
  • Se Silver/Gold si corrompono → reprocess da Bronze
  • Backup script: bash /root/maps-docker/backup.sh

Caso d'Uso: HTML → Structured Data

Scenario reale: Molte fonti pubbliche italiane pubblicano dati come tabelle HTML embedded in pagine web invece di file CSV scaricabili.

Esempio concreto: MinAmbiente pubblica elenco ATO Gas come tabella HTML su pagina web (no CSV/Excel disponibile).

Flusso Bronze-HTML:

sequenceDiagram
    participant S as Source Website
    participant P as Prefect Flow
    participant B as Bronze Layer
    participant SI as Silver Layer

    S->>P: HTTP GET HTML page
    P->>B: Save raw HTML file
    P->>B: Log metadata JSON
    Note over B: HTML preserved unchanged
    P->>P: Parse HTML with BeautifulSoup
    P->>P: Clean & Validate DataFrame
    P->>SI: Load to EAV schema

Vantaggi Pattern Bronze-HTML:

Scenario senza Bronze layer:
  → Script scarica HTML e parsa immediatamente
  → Se parsing fallisce (HTML structure changed) → dati persi
  → Se server web va offline → impossibile reprocessare

Scenario con Bronze layer:
  → HTML salvato in Bronze (immutabile)
  → Parsing fallisce? → Fix parser, reprocess da Bronze
  → Server offline? → Bronze ha copia originale
  → HTML structure cambia? → Version history in Bronze

2.1.5 Silver Layer: Cleaned & Validated Data

Ruolo: Single Source of Truth (SSOT) enterprise - dati puliti, validati, normalizzati.

Principio chiave: "Trust but verify, then store"

Caratteristiche Tecniche

AspettoSpecifica MAPS
StoragePostgreSQL schema silver
FormatoTabelle relazionali normalizzate (EAV schema)
Data modelEntity-Attribute-Value per gestire eterogeneità
RetentionTemporal versioning (valid_from, valid_to) per time-series
BackupSnapshot giornalieri PostgreSQL + WAL archiving
Access patternRead-heavy (queries analitiche), write-moderate (batch ingestion)

Schema EAV (Entity-Attribute-Value)

Rationale: Il progetto MAPS gestisce ~200 dataset con attributi eterogenei (popolazione, servizi, infrastrutture). Un modello EAV offre flessibilità senza continue schema migrations.

erDiagram
    COMUNI_ANAGRAFICA ||--o{ COMUNI_ATTRIBUTI_EAV : has
    FUSIONI_COMUNALI ||--o{ COMUNI_ANAGRAFICA : affects

    COMUNI_ANAGRAFICA {
        varchar codice_istat PK
        varchar denominazione
        varchar codice_regione
        geometry geometria
    }

    COMUNI_ATTRIBUTI_EAV {
        bigserial id PK
        varchar codice_istat FK
        varchar attributo
        text valore
        varchar fonte
        varchar dataset
        integer anno_rif
        date valid_from
        date valid_to
    }

    FUSIONI_COMUNALI {
        serial id PK
        varchar codice_istat_old
        varchar codice_istat_new
        date data_fusione
        varchar tipo_operazione
    }

Schema principale:

sql
-- Schema: silver
CREATE TABLE silver.comuni_attributi_eav (
    -- Primary key
    id BIGSERIAL PRIMARY KEY,

    -- Entity: Comune
    codice_istat VARCHAR(6) NOT NULL REFERENCES gold.comuni_anagrafica(codice_istat),

    -- Attribute: Nome attributo
    attributo VARCHAR(255) NOT NULL,

    -- Value: Valore come testo (type inference in Gold)
    valore TEXT NOT NULL,

    -- Metadata: Provenienza dati
    fonte VARCHAR(50) NOT NULL,        -- 'istat', 'minlavoro', 'minsalute', etc.
    dataset VARCHAR(100) NOT NULL,     -- 'popolazione', 'tabacchi', 'asl', etc.
    anno_rif INTEGER NOT NULL,

    -- Temporal validity
    valid_from DATE NOT NULL DEFAULT CURRENT_DATE,
    valid_to DATE,

    -- Audit trail
    created_at TIMESTAMP NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
    created_by VARCHAR(100),           -- Prefect flow name

    -- Constraints
    CONSTRAINT unique_attribute_version
        UNIQUE (codice_istat, attributo, fonte, dataset, anno_rif),
    CONSTRAINT valid_temporal_range
        CHECK (valid_to IS NULL OR valid_to >= valid_from),
    CONSTRAINT valid_anno_rif
        CHECK (anno_rif BETWEEN 2000 AND 2100)
);

-- Indexes for performance
CREATE INDEX idx_silver_codice ON silver.comuni_attributi_eav(codice_istat);
CREATE INDEX idx_silver_attributo ON silver.comuni_attributi_eav(attributo);
CREATE INDEX idx_silver_fonte_dataset ON silver.comuni_attributi_eav(fonte, dataset);
CREATE INDEX idx_silver_anno ON silver.comuni_attributi_eav(anno_rif);
CREATE INDEX idx_silver_valid ON silver.comuni_attributi_eav(valid_from, valid_to)
    WHERE valid_to IS NULL;  -- Partial index per dati correnti

Lookup Fusioni Comunali:

sql
CREATE TABLE silver.fusioni_comunali (
    id SERIAL PRIMARY KEY,
    codice_istat_old VARCHAR(6),
    codice_istat_new VARCHAR(6),
    nome_comune_old VARCHAR(255),
    nome_comune_new VARCHAR(255),
    data_fusione DATE,
    tipo_operazione VARCHAR(20), -- 'fusione', 'separazione', 'modifica'

    UNIQUE (codice_istat_old, codice_istat_new, data_fusione)
);

Esempio Dati EAV

Comune AGLIÈ (001001) con attributi da multiple fonti:

sql
-- Da ISTAT popolazione
INSERT INTO silver.comuni_attributi_eav VALUES
(1, '001001', 'popolazione_2024', '2635', 'istat', 'popolazione', 2024, '2024-02-18', NULL, ...),
(2, '001001', 'superficie_kmq', '13.98', 'istat', 'popolazione', 2024, '2024-02-18', NULL, ...),
(3, '001001', 'denominazione', 'AGLIÈ', 'istat', 'popolazione', 2024, '2024-02-18', NULL, ...);

-- Da MinLavoro tabacchi (PDF parsed with Docling)
INSERT INTO silver.comuni_attributi_eav VALUES
(4, '001001', 'num_tabacchi', '2', 'minlavoro', 'tabacchi', 2024, '2024-02-18', NULL, ...),
(5, '001001', 'tabacchi_ids', 'TAB001,TAB002', 'minlavoro', 'tabacchi', 2024, '2024-02-18', NULL, ...);

-- Da MinSalute strutture sanitarie
INSERT INTO silver.comuni_attributi_eav VALUES
(6, '001001', 'num_asl', '1', 'minsalute', 'strutture_sanitarie', 2023, '2024-01-15', NULL, ...),
(7, '001001', 'asl_denominazione', 'ASL TO4', 'minsalute', 'strutture_sanitarie', 2023, '2024-01-15', NULL, ...);

-- Da MinAmbiente ATO Gas (parsed from HTML)
INSERT INTO silver.comuni_attributi_eav VALUES
(8, '001001', 'ato_gas_id', 'ATO-PIE-01', 'minambiente', 'ato_gas', 2024, '2024-02-18', NULL, ...),
(9, '001001', 'ato_gas_gestore', 'SMAT S.p.A.', 'minambiente', 'ato_gas', 2024, '2024-02-18', NULL, ...);

Trasformazioni Bronze → Silver

Pipeline Prefect standard:

python
@task(name="bronze-to-silver-transform")
def transform_bronze_to_silver(bronze_file_path: str, fonte: str, dataset: str, anno: int):
    """
    Standard transformation pipeline: Bronze → Silver

    Steps:
    1. Parse: Read Bronze file (CSV/PDF/Excel/HTML)
    2. Clean: Normalize encoding, trim whitespace, fix typos
    3. Validate: Check data quality, enforce business rules
    4. Load: Insert into Silver EAV schema
    """

    # Step 1: Parse
    if bronze_file_path.endswith('.csv'):
        df = pd.read_csv(bronze_file_path)
    elif bronze_file_path.endswith('.pdf'):
        df = extract_pdf_tables(bronze_file_path)  # Docling
    elif bronze_file_path.endswith('.xlsx'):
        df = pd.read_excel(bronze_file_path)
    elif bronze_file_path.endswith('.html'):
        df = parse_html_to_dataframe(bronze_file_path)  # BeautifulSoup

    # Step 2: Clean
    df = clean_dataframe(df)

    # Step 3: Validate with Great Expectations
    validation_results = validate_dataframe(df, fonte, dataset)
    if not validation_results.passed:
        raise ValueError(f"Validation failed: {validation_results.errors}")

    # Step 4: Load to Silver (EAV)
    load_to_silver_eav(df, fonte, dataset, anno)

Temporal Versioning

Use case: Gestire modifiche nel tempo (fusioni/scissioni comunali).

Esempio: Fusione comunale nel 2019 (Comune A + Comune B → Comune C)

sql
-- Prima della fusione (2018)
INSERT INTO silver.comuni_attributi_eav VALUES
(100, '001234', 'popolazione', '1500', 'istat', 'popolazione', 2018, '2018-01-01', '2019-01-01', ...),
(101, '001235', 'popolazione', '800', 'istat', 'popolazione', 2018, '2018-01-01', '2019-01-01', ...);

-- Dopo la fusione (2019+)
INSERT INTO silver.comuni_attributi_eav VALUES
(102, '001236', 'popolazione', '2300', 'istat', 'popolazione', 2019, '2019-01-01', NULL, ...);

-- Query: Popolazione comune 001234 nel 2018
SELECT valore FROM silver.comuni_attributi_eav
WHERE codice_istat = '001234'
  AND attributo = 'popolazione'
  AND '2018-12-31' BETWEEN valid_from AND COALESCE(valid_to, '9999-12-31');
-- Result: 1500

2.1.6 Gold Layer: Business-Ready Analytics

Ruolo: Dati ottimizzati per use case specifici - aggregati, denormalizzati, arricchiti.

Principio chiave: "Optimize for queries, not for storage"

Caratteristiche Tecniche

AspettoSpecifica MAPS
StoragePostgreSQL schema gold + PostGIS extensions
FormatoTabelle denormalizzate (wide tables), spatial geometries
Data modelDomain-specific (comuni, DLS attractors, time-series)
RetentionSnapshot refreshed periodically (daily/weekly)
BackupSnapshot giornalieri (ma rebuiltable da Silver)
Access patternRead-very-heavy (dashboards, APIs, analytics)

Data Mart Principali

A. gold.comuni_aggregati (Wide Table)

Denormalizzazione di tutti attributi comuni per fast queries.

sql
CREATE TABLE gold.comuni_aggregati (
    -- Identificativi
    codice_istat VARCHAR(6) PRIMARY KEY,
    denominazione VARCHAR(255) NOT NULL,
    denominazione_full VARCHAR(255),  -- Con sigla provincia

    -- Gerarchia amministrativa
    codice_regione CHAR(2) NOT NULL,
    denominazione_regione VARCHAR(100) NOT NULL,
    codice_provincia CHAR(3),
    denominazione_provincia VARCHAR(100),
    sigla_provincia CHAR(2),

    -- Attributi demografici (da ISTAT)
    popolazione_2024 INTEGER,
    popolazione_2023 INTEGER,
    popolazione_2022 INTEGER,
    crescita_popolazione_pct NUMERIC(5,2),  -- Calculated: (2024-2023)/2023*100

    -- Attributi geografici
    superficie_kmq NUMERIC(10,2),
    densita_abitanti_kmq NUMERIC(10,2),  -- Calculated: pop/superficie
    altitudine_m INTEGER,
    zona_altimetrica VARCHAR(50),  -- 'montagna', 'collina', 'pianura'

    -- Geometria PostGIS
    geometria GEOMETRY(MultiPolygon, 4326) NOT NULL,
    centroide GEOMETRY(Point, 4326),

    -- Servizi (da Ministeri)
    num_strutture_sanitarie INTEGER DEFAULT 0,
    num_asl INTEGER DEFAULT 0,
    num_scuole_primarie INTEGER DEFAULT 0,
    num_scuole_secondarie INTEGER DEFAULT 0,
    num_tabacchi INTEGER DEFAULT 0,
    num_uffici_postali INTEGER DEFAULT 0,

    -- Infrastrutture (da OpenData)
    ha_stazione_ferroviaria BOOLEAN DEFAULT FALSE,
    ha_casello_autostradale BOOLEAN DEFAULT FALSE,
    ha_aeroporto BOOLEAN DEFAULT FALSE,

    -- Utilities (da MinAmbiente - parsed from HTML)
    ato_gas_id VARCHAR(50),
    ato_gas_denominazione VARCHAR(255),
    ato_gas_gestore VARCHAR(255),
    ato_acqua_id VARCHAR(50),
    ato_rifiuti_id VARCHAR(50),

    -- Attributi DLS (Calculated)
    attractor_level VARCHAR(50),  -- 'metropolitan', 'urban', 'semi-urban', 'rural'
    cluster_dls_id INTEGER,
    isochrone_60min GEOMETRY(MultiPolygon, 4326),

    -- Metadata
    last_updated TIMESTAMP NOT NULL DEFAULT NOW(),
    data_completeness_pct NUMERIC(5,2),  -- % attributi popolati

    -- Constraints
    CONSTRAINT valid_codice CHECK (codice_istat ~ '^\d{6}$')
);

-- Spatial indexes
CREATE INDEX idx_gold_comuni_geometria ON gold.comuni_aggregati USING GIST(geometria);
CREATE INDEX idx_gold_comuni_centroide ON gold.comuni_aggregati USING GIST(centroide);
CREATE INDEX idx_gold_comuni_isochrone ON gold.comuni_aggregati USING GIST(isochrone_60min);

-- Attribute indexes
CREATE INDEX idx_gold_comuni_regione ON gold.comuni_aggregati(codice_regione);
CREATE INDEX idx_gold_comuni_provincia ON gold.comuni_aggregati(codice_provincia);
CREATE INDEX idx_gold_comuni_attractor ON gold.comuni_aggregati(attractor_level);

Esempio query (performance ottimale):

sql
-- Query: Comuni in ATO Gas "ATO-PIE-01" con popolazione > 5000
SELECT
    denominazione,
    popolazione_2024,
    ato_gas_gestore,
    num_tabacchi,
    attractor_level
FROM gold.comuni_aggregati
WHERE ato_gas_id = 'ATO-PIE-01'  -- Da HTML MinAmbiente
  AND popolazione_2024 > 5000
ORDER BY popolazione_2024 DESC;

-- Execution: Index scan su ato_gas_id, no joins, < 10ms

B. gold.dls_attractors (DLS Analysis)

Risultati analisi Daily Life Systems (attrattori territoriali).

sql
CREATE TABLE gold.dls_attractors (
    codice_istat VARCHAR(6) PRIMARY KEY REFERENCES gold.comuni_aggregati(codice_istat),
    denominazione VARCHAR(255) NOT NULL,

    -- Attractor classification
    attractor_level VARCHAR(50) NOT NULL,  -- 'metropolitan', 'urban', 'semi-urban', 'rural'
    attractor_score NUMERIC(5,2),  -- 0-100 score

    -- Service availability (weighted scores)
    servizi_sanitari_score NUMERIC(5,2),
    servizi_educativi_score NUMERIC(5,2),
    servizi_commerciali_score NUMERIC(5,2),
    servizi_trasporti_score NUMERIC(5,2),

    -- Isochrone analysis (60 min travel time)
    isochrone_60min GEOMETRY(MultiPolygon, 4326),
    comuni_raggiungibili_60min INTEGER[],  -- Array codici ISTAT
    popolazione_raggiungibile_60min INTEGER,

    -- Clustering
    cluster_id INTEGER NOT NULL,
    cluster_centroid GEOMETRY(Point, 4326),

    -- Metadata
    calculation_date TIMESTAMP NOT NULL DEFAULT NOW(),

    CONSTRAINT valid_attractor_level CHECK (attractor_level IN ('metropolitan', 'urban', 'semi-urban', 'rural'))
);

C. gold.time_series_popolazione (Temporal Aggregations)

Time-series aggregati per analisi trend.

sql
CREATE TABLE gold.time_series_popolazione (
    regione VARCHAR(100) NOT NULL,
    anno INTEGER NOT NULL,
    popolazione_totale BIGINT NOT NULL,
    popolazione_media_comune INTEGER,
    num_comuni INTEGER,
    densita_media_kmq NUMERIC(10,2),

    -- Calculated metrics
    crescita_assoluta INTEGER,  -- vs anno precedente
    crescita_percentuale NUMERIC(5,2),

    PRIMARY KEY (regione, anno)
);

Trasformazioni Silver → Gold

Materialized views approach:

sql
-- Refresh Gold tables da Silver (scheduled daily)
CREATE OR REPLACE FUNCTION gold.refresh_comuni_aggregati()
RETURNS void AS $$
BEGIN
    -- Truncate e rebuild (snapshot approach)
    TRUNCATE gold.comuni_aggregati;

    -- Pivot EAV → Wide table
    INSERT INTO gold.comuni_aggregati
    SELECT
        c.codice_istat,
        c.denominazione,
        c.geometria,
        ST_Centroid(c.geometria) AS centroide,

        -- Pivot attributi da Silver
        MAX(CASE WHEN e.attributo = 'popolazione_2024' THEN e.valore::INTEGER END) AS popolazione_2024,
        MAX(CASE WHEN e.attributo = 'popolazione_2023' THEN e.valore::INTEGER END) AS popolazione_2023,
        MAX(CASE WHEN e.attributo = 'superficie_kmq' THEN e.valore::NUMERIC END) AS superficie_kmq,

        -- Calculated fields
        (MAX(CASE WHEN e.attributo = 'popolazione_2024' THEN e.valore::NUMERIC END) -
         MAX(CASE WHEN e.attributo = 'popolazione_2023' THEN e.valore::NUMERIC END)) /
         NULLIF(MAX(CASE WHEN e.attributo = 'popolazione_2023' THEN e.valore::NUMERIC END), 0) * 100
            AS crescita_popolazione_pct,

        -- ATO attributes (from HTML parsing)
        MAX(CASE WHEN e.attributo = 'ato_gas_id' THEN e.valore END) AS ato_gas_id,
        MAX(CASE WHEN e.attributo = 'ato_gas_gestore' THEN e.valore END) AS ato_gas_gestore,

        NOW() AS last_updated
    FROM
        gold.comuni_anagrafica c
    LEFT JOIN
        silver.comuni_attributi_eav e ON e.codice_istat = c.codice_istat
            AND e.valid_to IS NULL  -- Solo dati correnti
    GROUP BY
        c.codice_istat, c.denominazione, c.geometria;

    -- Analyze per query optimizer
    ANALYZE gold.comuni_aggregati;
END;
$$ LANGUAGE plpgsql;

-- Schedule refresh (chiamato da Prefect flow daily)
SELECT gold.refresh_comuni_aggregati();

2.1.7 Presentazione Interattiva

Per una visualizzazione interattiva dell'architettura Medallion e del pattern EAV con esempi specifici per MAPS:

📊 Architettura Dati MAPS (presentazione interattiva disponibile nel repository)

La presentazione include:

  • Visualizzazione interattiva dei tre layer (Bronze/Silver/Gold)
  • Comparazione EAV vs schema tradizionale
  • Esempi concreti di trasformazioni per MAPS
  • Gestione fusioni comunali e temporalità

2.1.8 EAV vs Schema Tradizionale: Comparazione

Perché EAV per MAPS?

Il progetto MAPS gestisce ~200 dataset con attributi eterogenei e variabili nel tempo. Comparazione tra modelli:

Schema Tradizionale (Wide Table)

codice_istatpopolazionen_asilihas_ospedalen_scuoleato_gas_id...
0010014523012true??...
00100284202false??...

Problemi:

  • ❌ Colonne nulle per attributi non disponibili (storage sprecato)
  • ❌ Schema migration ad ogni nuovo dataset
  • ❌ Nessuna tracciabilità temporale
  • ❌ Difficile gestire multipli data sources per stesso attributo

Schema EAV (Entity-Attribute-Value)

entityattributevaluevalid_fromsource
001001popolazione452302021-01-01ISTAT
001001n_asili_nido122021-01-01ISTAT
001001has_ospedaletrue2015-01-01MinSalute
001002popolazione84202021-01-01ISTAT
001002cod_pre_fusione0010452017-01-01ISTAT

Vantaggi:

  • ✅ Flessibilità: Aggiungi nuovi attributi senza schema migration
  • ✅ Storage efficiente: Solo attributi presenti sono memorizzati
  • ✅ Temporalità: valid_from/valid_to per time-series
  • ✅ Lineage: source traccia provenienza dati
  • ✅ Multi-source: Stesso attributo da fonti diverse con timestamp

Trade-off:

  • ⚠️ Query più complesse (richiede CASE WHEN pivot in Gold)
  • ⚠️ Performance: Più righe da scannare (mitigato da indici)
  • ⚠️ Type inference: Valori TEXT, cast in Gold layer

Decisione per MAPS: EAV in Silver è il compromesso ottimale tra flessibilità e performance per gestire l'eterogeneità dei 200+ dataset.

2.2 Stack Tecnologico

2.2.1 Componenti Principali

ComponenteTecnologiaVersioneRuolo
Storage primarioPostgreSQL + PostGIS17 + 3.5Master data, operazioni spaziali
OrchestrazionePrefect3.xScheduling, monitoring, lineage ETL
Analytics layerDuckDB1.xQuery analytics (federation da PostgreSQL)
Governance internaOpenMetadata1.xCatalog interno, lineage, data quality, profiling
Catalogo open dataCKAN2.10+Catalogo pubblico open data, DCAT-AP_IT
Data qualityGreat Expectations1.xValidation, anomaly detection
PDF extractionDoclingLatestEstrazione tabelle da PDF (97.9% accuracy)
Object storageMinIO (o GCS)LatestRaw files, PDFs, archivi Excel
graph TB
    subgraph "Data Sources"
        A1[ISTAT CSV]
        A2[Min PDF]
        A3[HTML Tables]
    end

    subgraph "Orchestration"
        B[Prefect
Worker Pools] end subgraph "Storage Layers" C[Bronze
File System] D[Silver
PostgreSQL EAV] E[Gold
PostgreSQL + PostGIS] end subgraph "Internal Governance & Analytics" F[OpenMetadata
Internal Catalog] G[Great Expectations
Quality] H[DuckDB
Analytics] end subgraph "Public Access" I[CKAN
Open Data Catalog] J[dati.gov.it] end A1 --> B A2 --> B A3 --> B B --> C C --> D D --> E D --> G B -.metadata.-> F E --> F E --> H E --> I I --> J

2.2.2 Rationale delle Scelte Tecnologiche

PostgreSQL + PostGIS vs BigQuery/Cloud Data Warehouse

Scelta: PostgreSQL 17 + PostGIS 3.5 (self-hosted)

Motivazioni:

CriterioPostgreSQL + PostGISBigQueryDecisione
Scala datiOttimizzato 10⁴-10⁶ righeOttimizzato petabyte✅ PostgreSQL (scala MAPS: ~10⁴ comuni × ~10³ attributi)
Operazioni spazialiPostGIS = industry standardBigQuery GIS = limitato✅ PostgreSQL (isocrone, buffer, intersezioni native)
CostiPrevedibili (infra fissa)Per-query pricing✅ PostgreSQL (~€2.4k/anno vs €600-6k+/anno imprevedibili)
Vendor lock-inZeroAlto✅ PostgreSQL (principio indipendenza)
Maturità30+ anni~15 anni✅ PostgreSQL (affidabilità rock-solid)
IntegrazioneUniversaleEcosistema GCP✅ PostgreSQL (funziona con ogni tool)

Quando BigQuery avrebbe senso:

  • Volumi dati > 100GB per query
  • Centinaia di utenti concorrenti
  • Zero tolleranza ops (fully managed)
  • Budget per costi analytics €5k+/anno

Nessuna di queste condizioni vale per MAPS.

DuckDB vs BigQuery per Analytics

Scelta: DuckDB (embedded) + PostgreSQL federation

Motivazioni:

CriterioDuckDBBigQueryDecisione
DimensionamentoGB scaleTB/PB scale✅ DuckDB (volumi MAPS: ~50GB)
CostiZero€5/TB query✅ DuckDB (risparmio €600-1,200/anno)
Performancems su GBms su TB✅ DuckDB (query < 10ms sul volume MAPS)
FederationLegge da PostgreSQLRichiede export✅ DuckDB (no ETL aggiuntivo)
PortabilitàFile auto-contenutoVendor lock-in✅ DuckDB (export Parquet portabile)
Developer UXEmbedded PythonAPI REST✅ DuckDB (zero overhead setup)

Prefect vs Airflow/Dagster

Scelta: Prefect 3.x (self-hosted)

Motivazioni:

CriterioPrefectAirflowDagsterDecisione
Setup complexityBassoAltoMedio✅ Prefect
Learning curveGentileRipidaRipida✅ Prefect
Python-nativeCompletoParzialeCompleto✅ Prefect
Time-to-valueRapido (ore)Lento (giorni)Medio✅ Prefect
UI/UXModernaDatataModerna✅ Prefect/Dagster
Overhead opsBassoAltoMedio✅ Prefect

Dettagli: Vedere Appendice A del documento di architettura piattaforma per comparazione completa.

Docling vs Alternative PDF Extraction

Scelta: Docling (IBM TableFormer) con fallback PyMuPDF/pdfplumber

Benchmark accuracy estrazione tabelle:

LibreriaAccuracyLicenseActive DevPandas NativeDecisione
Docling (TableFormer AI)97.9%MIT✅ 2025 (LF AI)✅ Primary
pdfplumber85-90%MIT⚠️ (manual)✅ Fallback
PyMuPDF75-80%AGPL-3.0⚠️✅ Fallback
Camelot73%MIT⚠️ Maintenance
Tabula67.9%MIT⚠️ Maintenance
Azure Doc Intelligence~95%Commercial❌ (vendor lock-in)

Docling vantaggi:

  • +24-30 punti percentuali su Camelot/Tabula
  • MIT license vs AGPL PyMuPDF
  • Active development (LF AI & Data Foundation, 2025)
  • Pandas export nativo (integrazione Great Expectations)
  • Self-hosted (no cloud dependency)

Dettagli: Vedere documento docs/briefing/comparazione-librerie-estrazione-pdf.md nel repository per benchmark completo delle 9 librerie analizzate.

OpenMetadata vs DataHub/Atlan/Atlas

Scelta: OpenMetadata 1.x (self-hosted)

Motivazioni:

CriterioOpenMetadataDataHubAtlanAtlasDecisione
Costo annuale€1.2k (infra)€1.8-2.4k€12-30k (lic.)€3-5k✅ OpenMetadata
Setup1 VM2-3 VMs1 VM11+ VMs✅ OpenMetadata
UI/UXModernaFunzionaleEccellenteDatata✅ OpenMetadata
Stack fitPerfectBuonoBuonoHadoop-only✅ OpenMetadata
Vendor lock-inZeroZeroAltoZero✅ OpenMetadata
Time-to-value1-2 giorni2-4 giorniMedioSettimane✅ OpenMetadata

Dettagli: Vedere Appendice B del documento di architettura piattaforma per comparazione completa.

CKAN per Open Data vs Uso Esteso OpenMetadata

Scelta: CKAN 2.10+ (self-hosted) per catalogo pubblico + OpenMetadata per governance interna

Motivazioni architetturali:

CriterioCKANOpenMetadata estesoDecisione
Target audiencePubblico esternoData operators interni✅ CKAN (separation of concerns)
Standard DCAT-AP_ITNativoNon supportato✅ CKAN (requisito WP6 D6.3)
Integrazione dati.gov.itBuilt-in harvesterRichiede sviluppo custom✅ CKAN (API standard CSW/DCAT)
Portale user-friendlyWeb UI ottimizzata pubblicoUI tecnica data engineers✅ CKAN (esperienza utente)
Rate limiting & API keysNativoLimitato✅ CKAN (controllo accessi pubblici)
Dataset downloadsMulti-formato con previewSolo metadata✅ CKAN (full data access)
SEO e discoverabilityOttimizzato motori ricercaNon ottimizzato✅ CKAN (findability pubblica)
Licensing metadataCompleto (Creative Commons)Basic✅ CKAN (compliance open data)

Separazione delle responsabilità:

graph LR
    subgraph "Internal - Data Operators"
        A[Prefect Pipeline] --> B[OpenMetadata]
        B --> C[Lineage Tracking]
        B --> D[Data Quality Metrics]
        B --> E[Schema Profiling]
    end

    subgraph "Public - External Users"
        F[Gold Layer] --> G[CKAN]
        G --> H[DCAT-AP_IT Metadata]
        G --> I[dati.gov.it Harvester]
        G --> J[Public Downloads]
    end

    F -.selected datasets.-> A

Workflow integrato:

  1. Internal governance (OpenMetadata):

    • Prefect popola automaticamente metadata durante pipeline execution
    • Tracciamento lineage completo: Bronze → Silver → Gold
    • Data quality checks con Great Expectations
    • Schema profiling e anomaly detection
    • Audience: Data engineers, analysts, QA team
  2. Public catalog (CKAN):

    • Sync automatico da Gold layer (solo dataset approvati per pubblicazione)
    • Metadata arricchiti DCAT-AP_IT (licensing, temporal coverage, geographic extent)
    • API pubblica per download programmatico
    • Harvesting automatico verso dati.gov.it
    • Audience: Cittadini, ricercatori, sviluppatori terzi, policy makers

Conformità contrattuale WP6 D6.3:

"Catalogo open data e metadata DCAT-AP_IT [...] Integrazione dati.gov.it [...] Portal open data dedicato"

CKAN soddisfa tutti questi requisiti out-of-the-box, mentre estendere OpenMetadata richiederebbe sviluppo custom significativo (~2-3 mesi/persona) senza garanzie di conformità DCAT-AP_IT.

Costi operativi:

VoceOpenMetadata soloOpenMetadata + CKANDelta
Infra (VM)€1.2k/anno€1.8k/anno+€600/anno
Sviluppo custom€15-20k (DCAT-AP_IT)€0-€15-20k
ManutenzioneAlta (custom code)Bassa (standard stack)Significativo

Conclusione: CKAN è investimento minore (~€600/anno infra) che evita sviluppo custom (€15-20k) e garantisce compliance contrattuale WP6.

2.3 Architettura di Deployment

2.3.1 Deployment su Server op-linkurious

Infrastruttura:

  • Server: root@op-linkurious (8 CPU, 31GB RAM, 621GB disk)
  • Domains: *.maps.deppsviluppo.org (Route53 DNS)
  • Network: Traefik reverse proxy su gw network

Componenti containerizzati (Docker Compose):

yaml
services:
  postgres:
    image: postgis/postgis:17-3.5
    volumes:
      - postgres-data:/var/lib/postgresql/data
      - ./init-scripts:/docker-entrypoint-initdb.d
    environment:
      - POSTGRES_DB=maps_db
      - POSTGRES_USER=maps
    networks:
      - gw
      - maps-internal

  prefect-server:
    image: prefecthq/prefect:3-latest
    command: prefect server start
    volumes:
      - prefect-data:/root/.prefect
    networks:
      - gw
      - maps-internal

  prefect-worker:
    image: prefecthq/prefect:3-latest
    command: prefect worker start --pool default-pool
    volumes:
      - ./flows:/flows
      - ./data/bronze:/data/bronze
    networks:
      - maps-internal

  openmetadata:
    image: openmetadata/server:latest
    depends_on:
      - postgres
    environment:
      - OPENMETADATA_CLUSTER_NAME=maps-cluster
    networks:
      - gw
      - maps-internal

  ckan:
    image: ckan/ckan-base:2.10
    depends_on:
      - postgres
      - redis
    environment:
      - CKAN_SITE_URL=https://opendata.maps.deppsviluppo.org
      - CKAN_SQLALCHEMY_URL=postgresql://ckan_user:${CKAN_DB_PASSWORD}@postgres/ckan_db
      - CKAN_REDIS_URL=redis://redis:6379/1
    volumes:
      - ckan-storage:/var/lib/ckan
    networks:
      - gw
      - maps-internal

  redis:
    image: redis:7-alpine
    networks:
      - maps-internal

volumes:
  postgres-data:
  prefect-data:
  ckan-storage:

networks:
  gw:
    external: true
  maps-internal:
    driver: bridge

2.3.2 Data Flow Completo

flowchart TD
    A[External Sources] --> B{Prefect Orchestration}

    B -->|Download| C[Bronze Layer
File System] C -->|Parse & Clean| D[Silver Layer
PostgreSQL EAV] D -->|Aggregate & Enrich| E[Gold Layer
PostgreSQL + PostGIS] E -->|Internal Catalog| F[OpenMetadata] E -->|Analytics| G[DuckDB] E -->|Serve| H[Web API] E -->|Public Catalog| I[CKAN] F -.->|Lineage| B F -.->|Quality Metrics| D I -->|Harvest| J[dati.gov.it] style C fill:#cd7f32,color:#000 style D fill:#c0c0c0,color:#000 style E fill:#ffd700,color:#000

2.3.3 Security Architecture

Network Isolation:

  • maps-internal: Comunicazione inter-servizi (PostgreSQL, Prefect workers)
  • gw: Traefik reverse proxy per accesso esterno HTTPS

Access Control:

sql
-- PostgreSQL roles
CREATE ROLE maps_writer;
GRANT INSERT, UPDATE ON ALL TABLES IN SCHEMA bronze TO maps_writer;
GRANT INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA silver TO maps_writer;
GRANT SELECT ON ALL TABLES IN SCHEMA gold TO maps_writer;

CREATE ROLE maps_reader;
GRANT SELECT ON ALL TABLES IN SCHEMA gold TO maps_reader;

CREATE ROLE maps_api;
GRANT SELECT ON gold.comuni_aggregati TO maps_api;
GRANT SELECT ON gold.dls_attractors TO maps_api;

Secrets Management:

  • .env file con credenziali (.gitignore)
  • Prefect Secret blocks per API keys
  • PostgreSQL password rotation via pg_passfile

2.4 Data Lineage e Governance

2.4.1 Lineage Tracking con OpenMetadata

Flow completo esempio HTML source:

graph LR
    A[External HTML
MinAmbiente ATO Gas] --> B[Prefect Flow
download_html_page] B --> C[Bronze
ato_gas_page.html] C --> D[Prefect Task
parse_html_to_dataframe] D --> E[Silver
comuni_attributi_eav] E --> F[SQL Function
refresh_comuni_aggregati] F --> G[Gold
comuni_aggregati] G --> H[Metabase Dashboard
Servizi Utilities] style C fill:#cd7f32,color:#000 style E fill:#c0c0c0,color:#000 style G fill:#ffd700,color:#000

OpenMetadata registration:

python
# Register Bronze HTML as external table
tracker.register_table(
    schema="bronze",
    table="minambiente_ato_gas_html",
    columns=[{"name": "html_content", "dataType": "TEXT"}],
    owner="guglielmo.celata@gransassotech.org",
    description="Raw HTML page from MinAmbiente with ATO Gas table",
    tags=["minambiente", "ato_gas", "bronze", "html"]
)

# Add lineage
tracker.add_lineage(
    source_table="bronze.minambiente_ato_gas_html",
    target_table="silver.comuni_attributi_eav",
    description="Parse HTML table with BeautifulSoup, extract ATO data",
    pipeline="minambiente-ato-gas-flow"
)

2.4.2 Data Quality Metrics

Silver layer quality checks (Great Expectations):

python
def log_silver_quality_metrics(df: pd.DataFrame, fonte: str, dataset: str):
    """Log data quality to OpenMetadata dopo ingestion Silver"""

    metrics = {
        "row_count": len(df),
        "null_count": df.isnull().sum().sum(),
        "completeness": 1 - (df.isnull().sum().sum() / df.size),
        "duplicate_count": df.duplicated().sum(),
        "unique_comuni": df['codice_istat'].nunique(),
        "timestamp": datetime.utcnow().isoformat()
    }

    # Log to OpenMetadata
    tracker = get_metadata_tracker()
    tracker.log_data_quality(
        schema="silver",
        table="comuni_attributi_eav",
        metrics=metrics
    )

    # Alert se quality degraded
    if metrics["completeness"] < 0.95:
        send_alert(f"Data completeness low: {metrics['completeness']:.1%}")

2.5 Best Practices Implementate

2.5.1 Idempotenza

Requisito: Re-esecuzione flow non deve generare duplicati o inconsistenze.

Implementazione:

sql
-- Silver: UPSERT con ON CONFLICT
INSERT INTO silver.comuni_attributi_eav
    (codice_istat, attributo, valore, fonte, dataset, anno_rif, valid_from)
VALUES (%s, %s, %s, %s, %s, %s, CURRENT_DATE)
ON CONFLICT (codice_istat, attributo, fonte, dataset, anno_rif)
DO UPDATE SET
    valore = EXCLUDED.valore,
    valid_to = NULL,
    updated_at = CURRENT_TIMESTAMP;

2.5.2 Incremental Loading

Strategia: Silver usa temporal versioning, Gold usa snapshot refresh.

python
# Silver: Incremental append (new records only)
def load_to_silver_incremental(df, fonte, dataset, anno):
    existing_keys = get_existing_keys(fonte, dataset, anno)
    new_records = df[~df['codice_istat'].isin(existing_keys)]
    # Insert solo nuovi record

# Gold: Full refresh periodico (daily snapshot)
def refresh_gold():
    # Truncate + rebuild da Silver
    gold.refresh_comuni_aggregati()

2.5.3 Schema Evolution

Gestione backward compatibility:

sql
-- Aggiunta nuova colonna Gold (non-breaking)
ALTER TABLE gold.comuni_aggregati
ADD COLUMN num_biblioteche INTEGER DEFAULT 0;

-- Update da nuova fonte
UPDATE gold.comuni_aggregati c
SET num_biblioteche = (
    SELECT COUNT(*)
    FROM silver.comuni_attributi_eav e
    WHERE e.codice_istat = c.codice_istat
      AND e.fonte = 'mincultura'
      AND e.dataset = 'biblioteche'
);

-- Alert in OpenMetadata: schema changed

2.6 Metriche e Monitoraggio

2.6.1 KPIs Architettura Medallion

MetricaTargetStrumento
Bronze storage growth< 5GB/annoFile system monitoring
Silver ingestion latency< 5 min per flowPrefect execution logs
Silver data quality> 95% completenessGreat Expectations
Gold refresh time< 30 minPostgreSQL query logs
Gold query performance< 100ms p95pg_stat_statements
Lineage coverage100% flows trackedOpenMetadata API
HTML parsing success rate> 98%Prefect task success rate

2.6.2 Monitoring Dashboard

Query Metabase: Medallion health check

sql
SELECT
    'Bronze' AS layer,
    COUNT(*) AS num_files,
    SUM(file_size_bytes) / 1024 / 1024 / 1024 AS size_gb,
    COUNT(CASE WHEN status = 'failed' THEN 1 END) AS failed_ingestions
FROM bronze.ingestion_log
UNION ALL
SELECT
    'Silver',
    COUNT(DISTINCT (codice_istat, fonte, dataset)),
    pg_total_relation_size('silver.comuni_attributi_eav') / 1024 / 1024 / 1024,
    NULL
FROM silver.comuni_attributi_eav
UNION ALL
SELECT
    'Gold',
    COUNT(*),
    pg_total_relation_size('gold.comuni_aggregati') / 1024 / 1024 / 1024,
    NULL
FROM gold.comuni_aggregati;

Prossimo capitolo: Solution Design Cloud - Architettura deployment dettagliata, resource allocation, networking e security hardening.