# Python API

`DBPort` is the single public import and entry point for all warehouse operations.
## Quick reference

| Method | Purpose |
|---|---|
| `DBPort(agency, dataset_id, ...)` | Create a client instance with credentials and paths |
| `port.schema(ddl_or_path)` | Declare the output table schema |
| `port.load(table, filters, version)` | Load an Iceberg table into DuckDB |
| `port.configure_input(table, filters, version)` | Persist an input declaration without loading data |
| `port.columns.<name>.meta(...)` | Override codelist metadata for a column |
| `port.columns.<name>.attach(table)` | Use a DuckDB table as codelist source |
| `port.execute(sql_or_path)` | Run SQL in DuckDB |
| `port.run(version, mode)` | Execute the configured run hook |
| `port.publish(version, params, mode)` | Write output to the Iceberg warehouse |
| `port.close()` | Release resources |
## Constructor

```python
DBPort(
    agency: str,
    dataset_id: str,
    *,
    catalog_uri: str | None = None,
    catalog_token: str | None = None,
    warehouse: str | None = None,
    s3_endpoint: str | None = None,
    s3_access_key: str | None = None,
    s3_secret_key: str | None = None,
    duckdb_path: str | None = None,
    lock_path: str | None = None,
    model_root: str | None = None,
    load_inputs_on_init: bool = True,
    config_only: bool = False,
)
```
### Parameters

| Parameter | Default | Description |
|---|---|---|
| `agency` | (required) | Agency namespace (e.g. `"wifor"`, `"estat"`) |
| `dataset_id` | (required) | Dataset identifier (e.g. `"emp__regional_trends"`) |
| `catalog_uri` | `None` | Iceberg REST catalog URL. Falls back to `ICEBERG_REST_URI` |
| `catalog_token` | `None` | Bearer token for catalog. Falls back to `ICEBERG_CATALOG_TOKEN` |
| `warehouse` | `None` | Warehouse name. Falls back to `ICEBERG_WAREHOUSE` |
| `s3_endpoint` | `None` | S3-compatible endpoint. Falls back to `S3_ENDPOINT` |
| `s3_access_key` | `None` | S3 access key. Falls back to `AWS_ACCESS_KEY_ID` |
| `s3_secret_key` | `None` | S3 secret key. Falls back to `AWS_SECRET_ACCESS_KEY` |
| `duckdb_path` | `None` | Path to the DuckDB file. Default: `<model_root>/data/<dataset_id>.duckdb` |
| `lock_path` | `None` | Path to `dbport.lock`. Default: repo root (next to `pyproject.toml`) |
| `model_root` | `None` | Model directory for resolving SQL file paths and the default DuckDB location. Default: auto-detected from the calling script's directory |
| `load_inputs_on_init` | `True` | When `True`, inputs previously declared in `dbport.lock` are reloaded into DuckDB on startup. Set to `False` to skip automatic input loading |
| `config_only` | `False` | Lightweight mode; see Full mode vs. `config_only` below |
## Context manager (recommended)

```python
with DBPort(agency="wifor", dataset_id="emp__regional_trends") as port:
    ...
# port.close() is called automatically
```
## Initialization behavior

Creating a `DBPort` instance runs through four phases:

1. **Path resolution** — discovers the model root directory (from the `model_root` kwarg or auto-detected from the calling script), walks up to `pyproject.toml` to find the repo root, and derives the lock path and DuckDB path.
2. **Credential resolution** — merges explicit constructor kwargs with environment variables via `WarehouseCreds`. Explicit kwargs always take precedence.
3. **Adapter wiring** — creates the lock adapter (reads/initializes `dbport.lock`), opens DuckDB, connects to the Iceberg REST catalog, and sets up the in-memory metadata builder.
4. **State sync** — runs four operations, all resilient to errors (logged but never failing initialization):

| Step | What it does | On error |
|---|---|---|
| Auto-detect schema | If no user-declared schema exists in the lock, checks the warehouse for an existing table and imports its schema | Logged at debug level; skipped |
| Sync output table | Creates the output table in DuckDB from the lock file schema (skipped if the table already exists) | Logged at warning level; skipped |
| Update `last_fetched_at` | Writes a timestamp to the warehouse table properties (no new snapshot) | Logged at debug level; skipped |
| Reload inputs | Reloads all inputs declared in `dbport.lock` into DuckDB (only when `load_inputs_on_init=True`) | Per-input errors logged; other inputs still loaded |
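The "resilient to errors" contract can be illustrated with a small sketch (a hypothetical helper, not dbport's implementation): each step runs in its own try/except, a failure is logged and skipped, and the remaining steps still execute.

```python
import logging


def run_state_sync(steps):
    """Run each (name, fn) step; log failures instead of raising.

    Sketch of the documented behavior: a failing step never fails
    initialization, and later steps still run.
    """
    completed = []
    for name, fn in steps:
        try:
            fn()
            completed.append(name)
        except Exception as exc:
            logging.debug("state sync step %r skipped: %s", name, exc)
    return completed
```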
## Full mode vs. `config_only`

| Aspect | Full mode (default) | `config_only=True` |
|---|---|---|
| Credentials | Required (kwargs or env vars) | Not needed |
| DuckDB | Opened; `data/` directory created | Not opened; no directory created |
| Catalog connection | Established | Not established |
| Lock file | Read/initialized | Read/initialized |
| State sync | All four phases run | All skipped |
| `columns.meta()` / `columns.attach()` | Works | Works |
| `schema()`, `load()`, `execute()`, `run()`, `configure_input()`, `publish()` | Works | Raises `RuntimeError` |
| `close()` | Releases DuckDB | No-op |
Use `config_only=True` when you need to manipulate column metadata or lock file state without warehouse access:

```python
with DBPort(agency="wifor", dataset_id="emp__regional_trends", config_only=True) as port:
    port.columns.nuts2024.meta(codelist_id="NUTS2024", codelist_kind="hierarchical")
    port.columns.nuts2024.attach(table="wifor.cl_nuts2024")
```
## Methods

Methods are listed in typical workflow order: declare schema, load inputs, configure columns, transform, run, publish.
### schema()

Declares the output table schema from a DDL string or `.sql` file path.

Parameters:

- `ddl_or_path` (str) — A `CREATE TABLE` DDL string, or a path to a `.sql` file (resolved relative to `model_root`).

Returns: None

Raises:

- `ValueError` — Invalid DDL string.
- `SchemaDriftError` — Local DDL is incompatible with the existing warehouse table.
Examples:

```python
# From a .sql file
port.schema("sql/create_output.sql")

# Inline DDL
port.schema("""
    CREATE OR REPLACE TABLE wifor.emp__regional_trends (
        freq VARCHAR,
        year DATE,
        nuts2024 VARCHAR,
        value DOUBLE
    )
""")
```
The table is created in DuckDB, and the schema (DDL + column list) is persisted to `dbport.lock`. A default codelist entry is initialized for every column.

Call `port.schema()` once, early in the script. Re-running the same DDL is idempotent.
### load()

```python
port.load(
    table_address: str,
    *,
    filters: dict[str, str] | None = None,
    version: str | None = None,
) -> IngestRecord
```
Loads an Iceberg table from the warehouse into DuckDB under its exact original address.
Parameters:

- `table_address` (str) — Fully-qualified table address (e.g. `"estat.nama_10r_3empers"`).
- `filters` (dict[str, str] | None) — Optional equality filters pushed to the Iceberg scan (e.g. `{"wstatus": "EMP"}`). Default: `None` (loads all rows).
- `version` (str | None) — Pin to a specific dataset version (e.g. `"2025-01-01"`). Default: `None` (latest version). Ignored for tables without DBPort metadata.

Returns: `IngestRecord` — a frozen record of the completed ingest, including the snapshot ID, timestamp, row count, and any filters applied.

Raises:

- `RuntimeError` — Called in `config_only` mode.
Examples:

```python
port.load("estat.nama_10r_3empers", filters={"wstatus": "EMP", "nace_r2": "TOTAL"})
port.load("wifor.cl_nuts2024")

# Pin to a specific version
port.load("wifor.emp__regional_trends", version="2025-01-01")
```
**Snapshot-based caching:** if the table's snapshot has not changed and the DuckDB relation already exists, the load is skipped automatically.

**No row cap:** `load()` always fetches the full table. Use `filters` to scope data.
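The caching rule reduces to a one-line predicate (an illustrative sketch; the function and argument names are hypothetical):

```python
def should_skip_load(cached_snapshot_id, current_snapshot_id, relation_exists):
    # Skip the load only when the Iceberg snapshot is unchanged AND the
    # DuckDB relation from a previous load is still present.
    return relation_exists and cached_snapshot_id == current_snapshot_id
```

Either a new snapshot or a missing DuckDB relation forces a fresh load.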
### configure_input()

```python
port.configure_input(
    table_address: str,
    *,
    filters: dict[str, str] | None = None,
    version: str | None = None,
) -> IngestRecord
```
Validates and persists an input declaration to `dbport.lock` without loading data.

Parameters: Same as `load()`.

Returns: `IngestRecord` — the persisted declaration.

Raises:

- `RuntimeError` — Called in `config_only` mode.
This is the configuration-only counterpart of `load()`:
| Method | Validates | Persists to lock | Loads data into DuckDB |
|---|---|---|---|
| `load()` | Yes | Yes | Yes |
| `configure_input()` | Yes | Yes | No |
Use `configure_input()` when you want to declare inputs for the lock file (e.g. during project setup) without requiring warehouse connectivity for the actual data load.
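For instance, mirroring the `load()` calls shown earlier, the same inputs can be declared without fetching any data:

```python
# Persist the input declarations to dbport.lock; no data is loaded
port.configure_input("estat.nama_10r_3empers", filters={"wstatus": "EMP"})
port.configure_input("wifor.cl_nuts2024")
```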
### columns

Attribute-style access to per-column metadata. Changes are persisted to `dbport.lock` immediately.

#### .meta(...) — override codelist metadata

```python
port.columns.<name>.meta(
    codelist_id: str | None = None,
    codelist_kind: str | None = None,
    codelist_type: str | None = None,
    codelist_labels: dict[str, str] | None = None,
) -> ColumnConfig
```
Parameters:

- `codelist_id` (str | None) — Identifier for the codelist. Default: column name.
- `codelist_kind` (str | None) — `"flat"` or `"hierarchical"`. Default: inferred from SQL type.
- `codelist_type` (str | None) — Value type hint. Default: inferred from SQL type.
- `codelist_labels` (dict[str, str] | None) — Human-readable labels per language. Default: `None`.

Returns: `ColumnConfig` (self, for chaining).
Example:

```python
port.columns.nuts2024.meta(
    codelist_id="NUTS2024",
    codelist_kind="hierarchical",
    codelist_labels={"en": "NUTS 2024 Regions"},
)
```
#### .attach(table=...) — use a DuckDB table as codelist source

Parameters:

- `table` (str) — Address of a DuckDB table to use as the codelist source. Should already be loaded via `port.load()`.

Returns: `ColumnConfig` (self, for chaining).
Example:
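A usage sketch, reusing the `wifor.cl_nuts2024` table loaded elsewhere on this page:

```python
# Use the previously loaded DuckDB table as this column's codelist source
port.columns.nuts2024.attach(table="wifor.cl_nuts2024")
```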
On `publish()`, the full table is exported as the codelist for this column.
### execute()

Runs a SQL statement or a `.sql` file in DuckDB.

Parameters:

- `sql_or_path` (str) — Inline SQL string, or a path to a `.sql` file (resolved relative to `model_root`).

Returns: None

Raises:

- `FileNotFoundError` — `.sql` file path does not exist.
- `RuntimeError` — Called in `config_only` mode.
Examples:

```python
port.execute("sql/staging.sql")
port.execute("CREATE TABLE staging.ranked AS SELECT ...")

# Dynamic SQL
sql = template.format(level=2, parent_table="staging.bench_lvl1")
port.execute(sql)
```
### run()

Executes the configured run hook, optionally publishing afterward.

Parameters:

- `version` (str | None) — When provided, `publish(version=version, mode=mode)` is called automatically after the hook completes. Default: `None` (no publish).
- `mode` (str | None) — Publish mode, forwarded to `publish()`. Only used when `version` is set. See `publish()` for valid values.

Returns: None

Raises:

- `ValueError` — Hook file has an unsupported extension (not `.py` or `.sql`).
- `RuntimeError` — Called in `config_only` mode.
Examples:

```python
# Run the hook without publishing
port.run()

# Run the hook and publish
port.run(version="2026-03-09", mode="dry")
```
#### Hook resolution

The run hook is resolved in this order:

1. Explicit hook path set in `dbport.lock` (via CLI `dbp config run-hook`)
2. `main.py` in the model root (if it exists)
3. `sql/main.sql` in the model root (if it exists)
4. Falls back to `main.py` (may not exist; will error on execution)
See Hooks & Execution for the full execution model, trust boundary, and exec vs run vs publish semantics.
#### Hook dispatch by file extension

| Extension | Behavior |
|---|---|
| `.sql` | Executed via `port.execute()` |
| `.py` | Loaded as a Python module with `port` available in scope. If the module defines a top-level `run(port)` function, it is called after the module is loaded |
| Other | Raises `ValueError` |
#### port.run_hook property

Read-only property that returns the resolved hook path as a string, or `None` if no hook can be found. It returns the path that `run()` would execute, without actually running it.
### publish()

```python
port.publish(
    *,
    version: str,
    params: dict[str, str] | None = None,
    mode: str | None = None,
) -> None
```
Writes the output table from DuckDB to the Iceberg warehouse.

Parameters:

- `version` (str) — Version label for this publish (e.g. `"2026-03-09"`). Required.
- `params` (dict[str, str] | None) — Key-value parameters describing this version (e.g. `{"wstatus": "EMP"}`). Default: `None`.
- `mode` (str | None) — Publish mode. Default: `None`.
| `mode` value | Behavior |
|---|---|
| `None` (default) | Idempotent — skip if version already completed; resume from checkpoint if interrupted |
| `"dry"` | Schema validation only — no data written |
| `"refresh"` | Overwrite existing version unconditionally |
Returns: None

Raises:

- `RuntimeError` — `port.schema()` has not been called; or called in `config_only` mode.
- `SchemaDriftError` — Local schema incompatible with warehouse.
Pre-publish checks (in order):

1. **Schema defined** — raises `RuntimeError` if `port.schema()` has not been called
2. **Version idempotency** — if the version is already completed, returns immediately (skipped in `refresh` mode)
3. **Schema drift** — compares local vs warehouse schema; raises `SchemaDriftError` with a diff if incompatible
Example:
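Typical calls, with `version`, `params`, and `mode` as documented above:

```python
port.publish(version="2026-03-09", params={"wstatus": "EMP"})

# Validate the schema only; write nothing
port.publish(version="2026-03-09", mode="dry")
```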
On success:

- Data written to `<agency>.<dataset_id>` in Iceberg
- Codelists auto-generated per column
- `metadata.json` materialized and embedded in table properties
- `VersionRecord` appended to `dbport.lock`
- `created_at` set on first publish; `last_updated_at` updated every time
### close()

Closes the DuckDB connection. Called automatically when using the context manager. In `config_only` mode, this is a no-op.
Parameters: None
Returns: None
## Complete example

```python
from dbport import DBPort

with DBPort(agency="wifor", dataset_id="emp__regional_trends") as port:
    # 1. Declare output schema
    port.schema("sql/create_output.sql")

    # 2. Column metadata
    port.columns.nuts2024.meta(codelist_id="NUTS2024", codelist_kind="hierarchical")
    port.columns.nuts2024.attach(table="wifor.cl_nuts2024")

    # 3. Load inputs from warehouse
    port.load("estat.nama_10r_3empers", filters={"wstatus": "EMP"})
    port.load("wifor.cl_nuts2024")

    # 4. Run SQL transforms
    port.execute("sql/staging.sql")
    port.execute("sql/final_output.sql")

    # 5. Publish to warehouse
    port.publish(version="2026-03-09", params={"wstatus": "EMP"})
```
See also: Python workflow example, CLI workflow example.
## Errors

| Exception | When raised |
|---|---|
| `ValidationError` | Missing required credentials |
| `ValueError` | Invalid DDL string passed to `schema()`; unsupported hook file extension |
| `RuntimeError` | `publish()` called before `schema()`; data method called in `config_only` mode; DuckDB extension unavailable |
| `SchemaDriftError` | Local schema incompatible with warehouse (raised by both `schema()` and `publish()`) |
| `FileNotFoundError` | `.sql` file path does not exist |