API Reference

polars_hist_db

Top-level package. Enables the Polars string cache on import.

  • enable_debug_mode() — enables debug mode for Polars dataframe display

Configuration (polars_hist_db.config)

Objects for accessing the configuration structure instantiated from YAML.

PolarsHistDbConfig

Root configuration object.

PolarsHistDbConfig.from_yaml(
    filename: str,
    table_configs_path: str = "table_configs",
    datasets_path: str = "datasets",
    db_config_path: str = "db",
) -> PolarsHistDbConfig
Attribute   Type             Description
tables      TableConfigs     Collection of table configurations
db_config   DbEngineConfig   Database connection settings
datasets    DatasetsConfig   Dataset definitions
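
A minimal loading sketch (the filename and YAML layout are illustrative):

    from polars_hist_db.config import PolarsHistDbConfig

    # Load the root configuration; the path arguments keep their defaults
    # ("table_configs", "datasets", "db")
    config = PolarsHistDbConfig.from_yaml("config.yaml")

    print(config.tables.names())      # table names across all table configs
    print(config.db_config.hostname)  # connection settings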

DbEngineConfig

Database connection and engine settings.

Field          Type        Default     Description
hostname       str                     Database host
backend        str         "mariadb"   Database backend
port           int         3306        Connection port
username       str                     Database user
password       str                     Database password
ssl_config     SslConfig   None        Optional SSL configuration
pool_size      int         3           Connection pool size
max_overflow   int         2           Max overflow connections
  • get_engine() -> Engine — returns a cached SQLAlchemy engine
  • dispose() — disposes the cached engine
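
A usage sketch, continuing from the config loaded above:

    engine = config.db_config.get_engine()  # cached SQLAlchemy engine

    with engine.connect() as connection:
        ...  # pass the connection to the *Ops classes documented below

    config.db_config.dispose()  # release the cached engine when finished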

TableConfig

Defines a single database table.

Field               Type                       Default   Description
name                str                                  Table name
schema              str                                  Database schema
columns             List[TableColumnConfig]              Column definitions
primary_keys        List[str]                            Primary key columns
foreign_keys        List[ForeignKeyConfig]     []        Foreign key constraints
is_temporal         bool                       False     Whether table uses system versioning
forbid_drop_table   bool                       False     Prevent table drops
  • from_yaml(file_path: str) -> TableConfig
  • from_dataframe(df, table_schema, table_name, primary_keys, default_categorical_length) -> TableConfig
  • columns_df() -> pl.DataFrame
  • table_names() -> List[str]
  • dtypes() -> Mapping[str, pl.DataType]
  • build_sqlalchemy_columns(is_delta_table: bool) -> List[Column]
  • to_df() -> pl.DataFrame
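
For example, a config can be derived from an existing frame; a sketch (the argument values are illustrative):

    import polars as pl

    from polars_hist_db.config import TableConfig

    df = pl.DataFrame({"id": [1, 2], "label": ["a", "b"]})

    tc = TableConfig.from_dataframe(
        df,
        table_schema="demo",
        table_name="items",
        primary_keys=["id"],
        default_categorical_length=64,
    )
    print(tc.dtypes())      # Mapping[str, pl.DataType]
    print(tc.columns_df())  # column definitions as a DataFrame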

TableColumnConfig

Defines a single column.

Field               Type        Default   Description
table               str                   Owning table name
name                str                   Column name
data_type           str                   SQL data type
default_value       str         None      Default value
autoincrement       bool        False     Auto-increment
nullable            bool        True      Allow nulls
unique_constraint   List[str]   []        Unique constraints

TableConfigs

Collection of TableConfig objects.

  • __getitem__(name: str) -> TableConfig — lookup by table name
  • names() -> List[str]
  • schemas() -> List[str]
  • from_yamls(*file_path: str) -> TableConfigs
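
A lookup sketch (filenames are illustrative):

    from polars_hist_db.config import TableConfigs

    # Merge table definitions from several YAML files into one collection
    tcs = TableConfigs.from_yamls("tables_a.yaml", "tables_b.yaml")

    print(tcs.names())    # all table names
    print(tcs.schemas())  # all schemas
    items = tcs["items"]  # lookup by table name -> TableConfig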

DatasetConfig

Defines a data source and its ingestion pipeline.

Field                Type            Default   Description
name                 str                       Dataset name
delta_table_schema   str                       Target schema
input_config         InputConfig               Input source configuration
pipeline             Pipeline                  Ingestion pipeline definition
scrape_limit         int             -1        Max items to scrape (-1 = unlimited)
time_partition       TimePartition   None      Optional time partitioning
null_values          Sequence[str]   None      Strings treated as null
delta_config         DeltaConfig               Delta/upsert behaviour

DeltaConfig

Controls temporal upsert behaviour.

Field                        Type                                           Default      Description
drop_unchanged_rows          bool                                           False        Skip rows that haven’t changed
on_duplicate_key             Literal["error", "take_last", "take_first"]   "error"      Duplicate handling strategy
prefill_nulls_with_default   bool                                           False        Fill nulls with defaults
row_finality                 Literal["disabled", "dropout", "manual"]      "disabled"   Handling of disappeared rows
is_temporary_table           bool                                           True         Use temporary staging table

TransformFnRegistry

Singleton registry for column transformation functions.

  • register_function(name: str, fn: TransformFnSignature, allow_overwrite: bool = False)
  • delete_function(name: str)
  • call_function(name: str, df: pl.DataFrame, result_col: str, args: List[Any]) -> pl.DataFrame
  • list_functions() -> List[str]

Built-in transforms: null_if_gte, apply_type_casts, combine_columns, map_to_true, parse_date

TransformFnSignature = Callable[[pl.DataFrame, str, List[Any]], pl.DataFrame]
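
A registration sketch, assuming the singleton is obtained by constructing the class; the args layout ([source_col, factor]) is purely illustrative and would normally come from the pipeline configuration:

    from typing import Any, List

    import polars as pl

    from polars_hist_db.config import TransformFnRegistry

    def scale(df: pl.DataFrame, result_col: str, args: List[Any]) -> pl.DataFrame:
        source_col, factor = args  # illustrative argument layout
        return df.with_columns((pl.col(source_col) * factor).alias(result_col))

    registry = TransformFnRegistry()
    registry.register_function("scale", scale)

    df = registry.call_function(
        "scale", pl.DataFrame({"x": [1.0, 2.0]}), "x_scaled", ["x", 10.0]
    )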

IngestFnRegistry

Singleton registry for ingestion functions.

  • register_function(name: str, fn: IngestFnSignature, allow_overwrite: bool = False)
  • delete_function(name: str)
  • call_function(payload: Any, ts: datetime, name: str, args: Dict[str, Any]) -> pl.DataFrame
  • list_functions() -> List[str]

IngestFnSignature = Callable[[Any, datetime, Dict[str, Any]], pl.DataFrame]
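
A registration sketch, assuming the singleton is obtained by constructing the class; decoding the payload as NDJSON bytes is an illustrative choice:

    import io
    from datetime import datetime, timezone
    from typing import Any, Dict

    import polars as pl

    from polars_hist_db.config import IngestFnRegistry

    def json_lines(payload: Any, ts: datetime, args: Dict[str, Any]) -> pl.DataFrame:
        # Decode a raw message payload (e.g. a JetStream message body)
        return pl.read_ndjson(io.BytesIO(payload))

    registry = IngestFnRegistry()
    registry.register_function("json_lines", json_lines)

    df = registry.call_function(b'{"x": 1}', datetime.now(timezone.utc), "json_lines", {})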

Input Sources

Input configuration is polymorphic via InputConfig.from_dict(config).

DsvCrawlerInputConfig — file-based ingestion via search paths.

JetStreamInputConfig — NATS JetStream ingestion with fields:

  • jetstream: JetStreamConfig
  • nats: NatsConfig
  • payload_ingest: JetstreamIngestConfig
  • run_until: Literal["empty", "forever"]

Core Operations (polars_hist_db.core)

DbOps

Low-level database operations.

DbOps(connection: Connection)
  • db_create(table_schema: str) — create schema if not exists
  • execute_sqlalchemy(description, statement, parameters=None, disable_foreign_key_checks=False, disable_keys=None) -> CursorResult
  • get_all_variables(filter: str | None = None) -> pl.DataFrame
  • get_system_versioning_time() -> datetime
  • set_system_versioning_time(t: datetime | None)
  • enable_engine_logging(level: int) — static
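
A usage sketch, assuming an engine from DbEngineConfig.get_engine() and that filter is matched against variable names:

    from polars_hist_db.core import DbOps

    with engine.connect() as connection:
        ops = DbOps(connection)
        ops.db_create("demo")                    # CREATE SCHEMA IF NOT EXISTS
        print(ops.get_all_variables("version"))  # server variables, filtered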

TableOps

Table-level metadata and operations.

TableOps(table_schema: str, table_name: str, connection: Connection)
  • enable_system_versioning(partition_interval: str = "1 YEAR")
  • get_table_metadata(autoload_metadata: bool = True) -> Table
  • row_count() -> int
  • get_column_intersection(column_selection: Sequence[str] | None) -> ColumnCollection
  • has_all_columns(search_col_names: Sequence[str]) -> bool
  • get_primary_keys(tbl, remap={}, include_temporal=False) -> Sequence[Column]
  • is_temporal_table() -> bool
  • table_exists() -> bool
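
A usage sketch, reusing the engine from above:

    from polars_hist_db.core import TableOps

    with engine.connect() as connection:
        tops = TableOps("demo", "items", connection)
        if tops.table_exists():
            print(tops.row_count(), tops.is_temporal_table())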

DataframeOps

DataFrame ↔︎ database transfer operations.

DataframeOps(connection: Connection)
  • from_table(table_schema, table_name, time_hint=None) -> pl.DataFrame
  • from_selectable(query, schema_overrides=None) -> pl.DataFrame
  • from_raw_sql(query: str, schema_overrides=None) -> pl.DataFrame
  • table_create(table_schema, table_name, df, primary_keys, tbl_for_types=None, is_temporary_table=False)
  • table_insert(df, table_schema, table_name, uniqueness_col_set, prefill_nulls_with_default, clear_table_first=False) -> int
  • table_update(df, table_schema, table_name, primary_keys_override=None)
  • table_upsert_temporal(df, table_schema, table_name, delta_config, update_time=None, src_tgt_colname_map={})
  • table_query(table_schema, table_name, query_df, column_selection, time_hint=TimeHint(mode="none")) -> pl.DataFrame
  • table_delete_rows(df, table_schema, table_name) -> int
  • table_delete_rows_temporal(df, table_schema, table_name, update_time=None) -> int
  • fill_nulls_with_defaults(df, default_values) -> pl.DataFrame — static
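
An upsert sketch; constructing DeltaConfig directly with keyword arguments is an assumption here:

    import polars as pl

    from polars_hist_db.config import DeltaConfig
    from polars_hist_db.core import DataframeOps

    df = pl.DataFrame({"id": [1, 2], "price": [9.5, 7.25]})

    with engine.begin() as connection:  # engine as obtained above
        dfo = DataframeOps(connection)
        dfo.table_upsert_temporal(
            df, "demo", "items",
            delta_config=DeltaConfig(drop_unchanged_rows=True),
        )
        snapshot = dfo.from_table("demo", "items")  # current rows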

DeltaTableOps

Manages staging (delta) tables for temporal upserts.

DeltaTableOps(table_schema: str, table_name: str, delta_config: DeltaConfig, connection: Connection)
  • table_config(column_definitions: List[TableColumnConfig]) -> TableConfig
  • upsert(target_table, update_time=None, is_main_table=True, source_columns=None, src_tgt_colname_map={}) -> Tuple[int, int, int]

TableConfigOps

Creates and introspects database tables from configuration.

TableConfigOps(connection: Connection)
  • create_all(tcs: TableConfigs)
  • create(table_config, column_selection=None, is_delta_table=False, is_temporary_table=False) -> Table
  • from_table(table_schema: str, table_name: str) -> TableConfig

TimeHint

Constructs temporal query hints for system-versioned tables.

Field          Type                                      Default   Description
mode           Literal["none", "all", "asof", "span"]   "none"    Query mode
all            bool                                      False     Include all historical rows
asof_utc       datetime                                  None      Point-in-time query
history_span   timedelta                                 None      History window
  • build() -> str | None — build SQL time hint clause
  • apply(query: Select, tbl) -> Select — apply hint to a query
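
A point-in-time query sketch; keyword construction of TimeHint is an assumption:

    from datetime import datetime, timezone

    from polars_hist_db.core import DataframeOps, TimeHint

    asof = TimeHint(mode="asof", asof_utc=datetime(2024, 1, 1, tzinfo=timezone.utc))

    with engine.connect() as connection:
        df_then = DataframeOps(connection).from_table("demo", "items", time_hint=asof)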

AuditOps

Tracks data source ingestion for deduplication.

AuditOps(schema: str)
  • create(connection) -> Table
  • drop(connection)
  • purge(target_table_name, connection) -> int
  • purge_after_timestamp(target_table_name, timestamp, interval, connection) -> int
  • prevalidate_new_items(target_table_name, new_data_source_items, connection)
  • filter_items(data_source_items, data_source_col_name, data_source_ts_col_name, target_table_name, connection) -> pl.DataFrame
  • get_latest_entry(connection, asof_timestamp=None) -> pl.DataFrame
  • record_entry(target_table_name, data_source, data_source_type, data_source_ts, connection)
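
A usage sketch; the data source name and type values are illustrative:

    from datetime import datetime, timezone

    from polars_hist_db.core import AuditOps

    with engine.begin() as connection:
        audit = AuditOps("demo")
        audit.create(connection)  # ensure the audit table exists
        audit.record_entry(
            "items", "prices_2024.csv", "dsv",
            datetime.now(timezone.utc), connection,
        )
        print(audit.get_latest_entry(connection))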

AuditLogTracker

Singleton that tracks table update timestamps.

  • update_last_known_update(table_key: str, timestamp: datetime)
  • set_table_update_callback(cb: TableUpdateCallback)
  • clear_updates()
  • async check_for_updates(epoch_ms, schemas, connection)

Dataset (polars_hist_db.dataset)

async run_datasets(
    config: PolarsHistDbConfig,
    engine: Engine,
    dataset_name: str | None = None,
    debug_capture_output: List[Tuple[datetime, pl.DataFrame]] | None = None,
)

Runs the ingestion pipeline for every configured dataset, or for a single named dataset when dataset_name is given.
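
A usage sketch (the file and dataset names are illustrative):

    import asyncio

    from polars_hist_db.config import PolarsHistDbConfig
    from polars_hist_db.dataset import run_datasets

    config = PolarsHistDbConfig.from_yaml("config.yaml")
    engine = config.db_config.get_engine()

    # Omit dataset_name to run every configured dataset
    asyncio.run(run_datasets(config, engine, dataset_name="prices"))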

Data Loading (polars_hist_db.loaders)

load_typed_dsv

load_typed_dsv(
    file_or_bytes: Path | bytes,
    column_configs: Sequence[IngestionColumnConfig],
    schema_overrides: Mapping[str, pl.DataType] = {},
    delimiter: str | None = None,
    null_values: Sequence[str] | None = None,
) -> pl.DataFrame

Loads a delimiter-separated file with typed columns and transformation support.

find_files

find_files(search_paths: pl.DataFrame) -> pl.DataFrame

Searches directories for matching files based on search path configuration.

convert_zipped_csvs_to_parquet

convert_zipped_csvs_to_parquet(
    folder: str,
    schema: Mapping[str, pl.DataType],
    remove_original: bool,
) -> None

Converts zipped CSV files to Parquet format.

Type Converters (polars_hist_db.types)

PolarsType

Converts between SQL types and Polars DataType.

  • from_sql(sql_type: str) -> pl.DataType
  • from_table(tbl: Table) -> Mapping[str, str]
  • get_dataframe_schema_from_sqltext(sql_statement, connection) -> Dict[str, pl.DataType]
  • get_dataframe_schema_from_selectable(selectable) -> Dict[str, pl.DataType]
  • apply_dtype_to_column(df, col, target_type) -> pl.DataFrame
  • apply_schema_to_dataframe(df, **schema_overrides) -> pl.DataFrame
  • cast_str_to_cat(df, ignore_cols=None) -> pl.DataFrame

SQLType

Converts Polars types to SQL type strings.

  • from_polars(pl_dtype: pl.DataType, default_varchar_length: int = 255) -> str

SQLAlchemyType

Converts SQL type strings to SQLAlchemy types.

  • from_sql(sql_type: str) -> TypeEngine
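
A conversion sketch covering all three converters; the exact mappings depend on the configured backend:

    import polars as pl

    from polars_hist_db.types import PolarsType, SQLAlchemyType, SQLType

    print(PolarsType.from_sql("BIGINT"))           # -> a Polars integer dtype
    print(SQLType.from_polars(pl.Utf8))            # -> a VARCHAR-style type string
    print(SQLAlchemyType.from_sql("VARCHAR(64)"))  # -> a SQLAlchemy TypeEngine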

Utilities (polars_hist_db.utils)

  • Clock — singleton for tracking operation timings and ETAs
  • compare_dataframes(lhs, rhs, on, cmp_cols=None, suffixes=("_lhs", "_rhs", "_diff")) — diff two dataframes
  • to_ipc_b64(df, compression=None) -> bytes — serialize a dataframe to base64 IPC
  • from_ipc_b64(payload, use_zlib=False) -> pl.DataFrame — deserialize from base64 IPC
  • recursive_flatten(df) -> pl.DataFrame — recursively flatten struct columns
  • NonRetryableException — exception that should not be retried
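
A round-trip sketch for the IPC helpers (import location assumed from the module path above):

    import polars as pl

    from polars_hist_db.utils import from_ipc_b64, to_ipc_b64

    df = pl.DataFrame({"x": [1, 2, 3]})
    payload = to_ipc_b64(df)  # base64-encoded Arrow IPC bytes
    restored = from_ipc_b64(payload)
    assert restored.equals(df)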

CLI Scripts

scrape-dataset

scrape-dataset --config <config.yaml> --dataset <name>

Runs the ingestion pipeline for a dataset.

drop-dataset-tables

drop-dataset-tables --config <config.yaml> --dataset <name>

Drops all tables associated with a dataset.