Integration Patterns¶
This page shows practical “fetch → load → model” patterns for common warehouse tools, including a small end-to-end example that joins fact rows to codelists.
Warehouse loading basics¶
sdmxflow produces files; it does not load into a warehouse directly.
Typical flow:
- Run `fetch()` into a durable `out_dir`.
- Load `<out_dir>/dataset.csv` into a staging table.
- Load `<out_dir>/codelists/*.csv` into reference tables.
- Model "latest version" using the `last_updated` column.
See Output Artifacts (Contract) for file semantics.
Postgres (COPY)¶
```sql
-- Create a wide staging table that matches the CSV header.
-- (The provider columns vary by dataset, so define this based on the header.)
CREATE TABLE IF NOT EXISTS sdmx_stage_lfsa_egai2d (
    last_updated timestamptz NOT NULL,
    -- provider columns...
    -- geo text,
    -- sex text,
    -- time_period text,
    -- value numeric,
    raw_line text
);

-- COPY requires the server to see the file. In many deployments you will use
-- \copy from psql (client-side) or a file ingestion service.
```
Client-side load (psql \copy):
```bash
psql "$DATABASE_URL" -c "\\copy sdmx_stage_lfsa_egai2d FROM '/data/lfsa_egai2d/dataset.csv' WITH (FORMAT csv, HEADER true)"
```
Note: If you re-load the full append-only file each run, your staging table will accumulate duplicates. Consider loading incrementally by filtering for new `last_updated` values (or truncate+reload staging, then build downstream models for "latest").
DuckDB (read_csv_auto)¶
DuckDB is convenient for local validation and “lightweight warehouse” pipelines:
```python
from pathlib import Path

import duckdb

out_dir = Path("./out/lfsa_egai2d")

con = duckdb.connect()
con.execute(
    """
    CREATE OR REPLACE TABLE facts AS
    SELECT *
    FROM read_csv_auto(?, header=true)
    """,
    [str(out_dir / "dataset.csv")],
)

# Example: select the newest upstream version present in the file.
latest = con.execute("SELECT max(last_updated) FROM facts").fetchone()[0]
print("latest last_updated:", latest)
```
BigQuery / Snowflake / Redshift (generic pattern)¶
For managed warehouses:
- stage the file in object storage (GCS/S3/Azure Blob)
- load into a staging table
- either:
    - model the latest version in SQL (`QUALIFY`/window function), or
    - snapshot by `last_updated`

The key operational decision is whether your staging table is:

- truncate+reload (simple, robust), or
- incremental append (more efficient; requires tracking `last_updated` values already loaded)
dbt pattern (staging + latest view)¶
A common dbt approach:
- define `dataset.csv` as an external source (or load it to a raw table)
- create a staging model that picks the latest version
Pseudo-SQL for “latest”:
```sql
WITH ranked AS (
    SELECT
        *,
        row_number() OVER (
            PARTITION BY /* your natural key columns here */
            ORDER BY last_updated DESC
        ) AS rn
    FROM {{ source('sdmx', 'facts') }}
)
SELECT *
FROM ranked
WHERE rn = 1
```
End-to-end example: fetch → join to a codelist¶
This example is intentionally generic: it demonstrates the mechanics without assuming a specific dataset schema.
```python
from __future__ import annotations

from pathlib import Path

import duckdb

from sdmxflow import SdmxDataset

out_dir = Path("./out/example")

# 1) Fetch artifacts.
ds = SdmxDataset(out_dir=out_dir, source_id="ESTAT", dataset_id="lfsa_egai2d")
ds.fetch()

# 2) Load facts.
con = duckdb.connect()
con.execute(
    "CREATE OR REPLACE TABLE facts AS SELECT * FROM read_csv_auto(?, header=true)",
    [str(out_dir / "dataset.csv")],
)

# 3) Load one codelist CSV (pick one that exists).
#    In a real pipeline you would iterate `out_dir/codelists/*.csv`.
codelist_path = next((out_dir / "codelists").glob("*.csv"))
con.execute(
    "CREATE OR REPLACE TABLE cl AS SELECT * FROM read_csv_auto(?, header=true)",
    [str(codelist_path)],
)
print("codelist file:", codelist_path.name)
print(con.execute("SELECT * FROM cl LIMIT 5").fetchall())

# 4) Join strategy: join facts.<dimension_column> = cl.code
#    The exact dimension column depends on your dataset and the codelist.
```
To make joins deterministic in production, use the column → codelist mapping stored in metadata.json (see Output Artifacts (Contract)).
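Such a mapping can drive the join programmatically instead of guessing column names. A sketch assuming a hypothetical `{"codelists": {"<column>": "<codelist file>"}}` layout for metadata.json; check Output Artifacts (Contract) for the actual schema before relying on this:

```python
import json
from pathlib import Path


def codelist_mapping(out_dir: Path) -> dict[str, str]:
    """Read the column -> codelist-file mapping from metadata.json.

    The {"codelists": {...}} layout used here is an assumption for
    illustration; consult the Output Artifacts (Contract) page for
    the real schema.
    """
    meta = json.loads((out_dir / "metadata.json").read_text())
    return meta["codelists"]
```

Each entry tells you which facts column joins to which codelist's code column, so the join SQL can be generated per column.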
Next:
- See Scheduling & Deployment for orchestration patterns.
- See Provider Support for provider semantics and differences.