API Reference
polars_hist_db
Top-level package. Enables Polars string cache on import.
- `enable_debug_mode()` — enables debug mode for Polars DataFrame display
Configuration (polars_hist_db.config)
Objects for accessing the configuration structure instantiated from YAML.
PolarsHistDbConfig
Root configuration object.
PolarsHistDbConfig.from_yaml(
filename: str,
table_configs_path: str = "table_configs",
datasets_path: str = "datasets",
db_config_path: str = "db",
) -> PolarsHistDbConfig

| Attribute | Type | Description |
|---|---|---|
| tables | TableConfigs | Collection of table configurations |
| db_config | DbEngineConfig | Database connection settings |
| datasets | DatasetsConfig | Dataset definitions |
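A minimal loading sketch (the `config.yaml` path is a placeholder; the attribute access mirrors the table above):

```python
from polars_hist_db.config import PolarsHistDbConfig

# Parse the YAML document; the *_path arguments default to the
# "table_configs", "datasets" and "db" sections noted above.
config = PolarsHistDbConfig.from_yaml("config.yaml")

print(config.tables.names())      # tables defined in the configuration
print(config.db_config.hostname)  # database connection settings
```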
DbEngineConfig
Database connection and engine settings.
| Field | Type | Default | Description |
|---|---|---|---|
| hostname | str | — | Database host |
| backend | str | "mariadb" | Database backend |
| port | int | 3306 | Connection port |
| username | str | — | Database user |
| password | str | — | Database password |
| ssl_config | SslConfig | None | Optional SSL configuration |
| pool_size | int | 3 | Connection pool size |
| max_overflow | int | 2 | Max overflow connections |
- `get_engine() -> Engine` — returns a cached SQLAlchemy engine
- `dispose()` — disposes the cached engine
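A sketch of the engine lifecycle, assuming a configuration loaded as in the previous example:

```python
from polars_hist_db.config import PolarsHistDbConfig

config = PolarsHistDbConfig.from_yaml("config.yaml")

# get_engine() caches the SQLAlchemy engine, so repeated calls are cheap.
engine = config.db_config.get_engine()

with engine.connect() as connection:
    ...  # run queries or the core operations described below

# Dispose of the cached engine once all work is finished.
config.db_config.dispose()
```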
TableConfig
Defines a single database table.
| Field | Type | Default | Description |
|---|---|---|---|
| name | str | — | Table name |
| schema | str | — | Database schema |
| columns | List[TableColumnConfig] | — | Column definitions |
| primary_keys | List[str] | — | Primary key columns |
| foreign_keys | List[ForeignKeyConfig] | [] | Foreign key constraints |
| is_temporal | bool | False | Whether table uses system versioning |
| forbid_drop_table | bool | False | Prevent table drops |
- `from_yaml(file_path: str) -> TableConfig`
- `from_dataframe(df, table_schema, table_name, primary_keys, default_categorical_length) -> TableConfig`
- `columns_df() -> pl.DataFrame`
- `table_names() -> List[str]`
- `dtypes() -> Mapping[str, pl.DataType]`
- `build_sqlalchemy_columns(is_delta_table: bool) -> List[Column]`
- `to_df() -> pl.DataFrame`
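As a sketch, a table configuration can be derived from an existing Polars frame via `from_dataframe` (the schema/table names and the `default_categorical_length` value below are illustrative):

```python
import polars as pl
from polars_hist_db.config import TableConfig

df = pl.DataFrame({"id": [1, 2], "name": ["a", "b"]})

# Infer column definitions from the frame's dtypes.
tc = TableConfig.from_dataframe(
    df,
    table_schema="analytics",
    table_name="items",
    primary_keys=["id"],
    default_categorical_length=64,
)
print(tc.dtypes())  # column name -> Polars dtype
```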
TableColumnConfig
Defines a single column.
| Field | Type | Default | Description |
|---|---|---|---|
| table | str | — | Owning table name |
| name | str | — | Column name |
| data_type | str | — | SQL data type |
| default_value | str | None | Default value |
| autoincrement | bool | False | Auto-increment |
| nullable | bool | True | Allow nulls |
| unique_constraint | List[str] | [] | Unique constraints |
TableConfigs
Collection of TableConfig objects.
- `__getitem__(name: str) -> TableConfig` — lookup by table name
- `names() -> List[str]`
- `schemas() -> List[str]`
- `from_yamls(*file_path: str) -> TableConfigs`
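A small sketch of combining and querying table configurations (the YAML file names and the "orders" table are placeholders):

```python
from polars_hist_db.config import TableConfigs

# Merge table definitions from several YAML files into one collection.
tcs = TableConfigs.from_yamls("tables_core.yaml", "tables_extra.yaml")

print(tcs.names())       # all configured table names
orders = tcs["orders"]   # __getitem__ lookup by table name
print(orders.primary_keys)
```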
DatasetConfig
Defines a data source and its ingestion pipeline.
| Field | Type | Default | Description |
|---|---|---|---|
| name | str | — | Dataset name |
| delta_table_schema | str | — | Target schema |
| input_config | InputConfig | — | Input source configuration |
| pipeline | Pipeline | — | Ingestion pipeline definition |
| scrape_limit | int | -1 | Max items to scrape (-1 = unlimited) |
| time_partition | TimePartition | None | Optional time partitioning |
| null_values | Sequence[str] | None | Strings treated as null |
| delta_config | DeltaConfig | — | Delta/upsert behaviour |
DeltaConfig
Controls temporal upsert behaviour.
| Field | Type | Default | Description |
|---|---|---|---|
| drop_unchanged_rows | bool | False | Skip rows that haven’t changed |
| on_duplicate_key | Literal["error", "take_last", "take_first"] | "error" | Duplicate handling strategy |
| prefill_nulls_with_default | bool | False | Fill nulls with defaults |
| row_finality | Literal["disabled", "dropout", "manual"] | "disabled" | Handling of disappeared rows |
| is_temporary_table | bool | True | Use temporary staging table |
TransformFnRegistry
Singleton registry for column transformation functions.
- `register_function(name: str, fn: TransformFnSignature, allow_overwrite: bool = False)`
- `delete_function(name: str)`
- `call_function(name: str, df: pl.DataFrame, result_col: str, args: List[Any]) -> pl.DataFrame`
- `list_functions() -> List[str]`

Built-in transforms: `null_if_gte`, `apply_type_casts`, `combine_columns`, `map_to_true`, `parse_date`

`TransformFnSignature = Callable[[pl.DataFrame, str, List[Any]], pl.DataFrame]`
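A sketch of registering and invoking a custom transform (the `scale` function and its arguments are hypothetical, and obtaining the singleton via the constructor is an assumption; the calls follow the signatures above):

```python
import polars as pl
from polars_hist_db.config import TransformFnRegistry

# Matches TransformFnSignature: (df, result_col, args) -> df.
def scale(df: pl.DataFrame, result_col: str, args) -> pl.DataFrame:
    source_col, factor = args
    return df.with_columns((pl.col(source_col) * factor).alias(result_col))

registry = TransformFnRegistry()
registry.register_function("scale", scale)

df = pl.DataFrame({"price_raw": [1.0, 2.5]})
df = registry.call_function("scale", df, "price", ["price_raw", 100])
```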
IngestFnRegistry
Singleton registry for ingestion functions.
- `register_function(name: str, fn: IngestFnSignature, allow_overwrite: bool = False)`
- `delete_function(name: str)`
- `call_function(payload: Any, ts: datetime, name: str, args: Dict[str, Any]) -> pl.DataFrame`
- `list_functions() -> List[str]`

`IngestFnSignature = Callable[[Any, datetime, Dict[str, Any]], pl.DataFrame]`
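A sketch of a custom ingestion function (the JSON parsing and the `received_at` column are illustrative; only the registry API above is documented):

```python
from datetime import datetime
from typing import Any, Dict

import polars as pl
from polars_hist_db.config import IngestFnRegistry

# Matches IngestFnSignature: (payload, ts, args) -> DataFrame.
def parse_json_payload(payload: Any, ts: datetime, args: Dict[str, Any]) -> pl.DataFrame:
    df = pl.read_json(payload)
    return df.with_columns(pl.lit(ts).alias("received_at"))

registry = IngestFnRegistry()
registry.register_function("parse_json_payload", parse_json_payload)
```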
Input Sources
Input configuration is polymorphic via InputConfig.from_dict(config).
DsvCrawlerInputConfig — file-based ingestion via search paths.
JetStreamInputConfig — NATS JetStream ingestion with fields:
- `jetstream: JetStreamConfig`
- `nats: NatsConfig`
- `payload_ingest: JetstreamIngestConfig`
- `run_until: Literal["empty", "forever"]`
Core Operations (polars_hist_db.core)
DbOps
Low-level database operations.
- `DbOps(connection: Connection)`
- `db_create(table_schema: str)` — create schema if not exists
- `execute_sqlalchemy(description, statement, parameters=None, disable_foreign_key_checks=False, disable_keys=None) -> CursorResult`
- `get_all_variables(filter: str = None) -> pl.DataFrame`
- `get_system_versioning_time() -> datetime`
- `set_system_versioning_time(t: datetime | None)`
- `enable_engine_logging(level: int)` — static
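A brief sketch (the engine comes from `DbEngineConfig.get_engine()` as above; the schema name is a placeholder):

```python
from polars_hist_db.config import PolarsHistDbConfig
from polars_hist_db.core import DbOps

engine = PolarsHistDbConfig.from_yaml("config.yaml").db_config.get_engine()

with engine.begin() as connection:
    ops = DbOps(connection)
    ops.db_create("analytics")  # create the schema if it does not exist
    print(ops.get_system_versioning_time())
```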
TableOps
Table-level metadata and operations.
- `TableOps(table_schema: str, table_name: str, connection: Connection)`
- `enable_system_versioning(partition_interval: str = "1 YEAR")`
- `get_table_metadata(autoload_metadata: bool = True) -> Table`
- `row_count() -> int`
- `get_column_intersection(column_selection: Sequence[str] | None) -> ColumnCollection`
- `has_all_columns(search_col_names: Sequence[str]) -> bool`
- `get_primary_keys(tbl, remap={}, include_temporal=False) -> Sequence[Column]`
- `is_temporal_table() -> bool`
- `table_exists() -> bool`
DataframeOps
DataFrame ↔︎ database transfer operations.
- `DataframeOps(connection: Connection)`
- `from_table(table_schema, table_name, time_hint=None) -> pl.DataFrame`
- `from_selectable(query, schema_overrides=None) -> pl.DataFrame`
- `from_raw_sql(query: str, schema_overrides=None) -> pl.DataFrame`
- `table_create(table_schema, table_name, df, primary_keys, tbl_for_types=None, is_temporary_table=False)`
- `table_insert(df, table_schema, table_name, uniqueness_col_set, prefill_nulls_with_default, clear_table_first=False) -> int`
- `table_update(df, table_schema, table_name, primary_keys_override=None)`
- `table_upsert_temporal(df, table_schema, table_name, delta_config, update_time=None, src_tgt_colname_map={})`
- `table_query(table_schema, table_name, query_df, column_selection, time_hint=TimeHint(mode="none")) -> pl.DataFrame`
- `table_delete_rows(df, table_schema, table_name) -> int`
- `table_delete_rows_temporal(df, table_schema, table_name, update_time=None) -> int`
- `fill_nulls_with_defaults(df, default_values) -> pl.DataFrame` — static
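A round-trip sketch under the same assumptions (engine from the loaded config; schema, table, and argument values are placeholders):

```python
import polars as pl

from polars_hist_db.config import PolarsHistDbConfig
from polars_hist_db.core import DataframeOps

engine = PolarsHistDbConfig.from_yaml("config.yaml").db_config.get_engine()
df = pl.DataFrame({"id": [1, 2], "name": ["a", "b"]})

with engine.begin() as connection:
    dfo = DataframeOps(connection)

    # Create a table shaped like the frame, insert rows, then read back.
    dfo.table_create("analytics", "items", df, primary_keys=["id"])
    dfo.table_insert(
        df, "analytics", "items",
        uniqueness_col_set=["id"],
        prefill_nulls_with_default=False,
    )
    out = dfo.from_table("analytics", "items")
```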
DeltaTableOps
Manages staging (delta) tables for temporal upserts.
- `DeltaTableOps(table_schema: str, table_name: str, delta_config: DeltaConfig, connection: Connection)`
- `table_config(column_definitions: List[TableColumnConfig]) -> TableConfig`
- `upsert(target_table, update_time=None, is_main_table=True, source_columns=None, src_tgt_colname_map={}) -> Tuple[int, int, int]`
TableConfigOps
Creates and introspects database tables from configuration.
- `TableConfigOps(connection: Connection)`
- `create_all(tcs: TableConfigs)`
- `create(table_config, column_selection=None, is_delta_table=False, is_temporary_table=False) -> Table`
- `from_table(table_schema: str, table_name: str) -> TableConfig`
TimeHint
Constructs temporal query hints for system-versioned tables.
| Field | Type | Default | Description |
|---|---|---|---|
| mode | Literal["none", "all", "asof", "span"] | "none" | Query mode |
| all | bool | False | Include all historical rows |
| asof_utc | datetime | None | Point-in-time query |
| history_span | timedelta | None | History window |
- `build() -> str | None` — build SQL time hint clause
- `apply(query: Select, tbl) -> Select` — apply hint to a query
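A point-in-time query sketch (it assumes `TimeHint` accepts keyword arguments mirroring the fields above, plus an engine and table as in the earlier examples):

```python
from datetime import datetime, timezone

from polars_hist_db.config import PolarsHistDbConfig
from polars_hist_db.core import DataframeOps, TimeHint

engine = PolarsHistDbConfig.from_yaml("config.yaml").db_config.get_engine()

# Read the system-versioned table as it looked on 2024-01-01.
hint = TimeHint(mode="asof", asof_utc=datetime(2024, 1, 1, tzinfo=timezone.utc))

with engine.begin() as connection:
    snapshot = DataframeOps(connection).from_table("analytics", "items", time_hint=hint)
```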
AuditOps
Tracks data source ingestion for deduplication.
- `AuditOps(schema: str)`
- `create(connection) -> Table`
- `drop(connection)`
- `purge(target_table_name, connection) -> int`
- `purge_after_timestamp(target_table_name, timestamp, interval, connection) -> int`
- `prevalidate_new_items(target_table_name, new_data_source_items, connection)`
- `filter_items(data_source_items, data_source_col_name, data_source_ts_col_name, target_table_name, connection) -> pl.DataFrame`
- `get_latest_entry(connection, asof_timestamp=None) -> pl.DataFrame`
- `record_entry(target_table_name, data_source, data_source_type, data_source_ts, connection)`
AuditLogTracker
Singleton that tracks table update timestamps.
- `update_last_known_update(table_key: str, timestamp: datetime)`
- `set_table_update_callback(cb: TableUpdateCallback)`
- `clear_updates()`
- `async check_for_updates(epoch_ms, schemas, connection)`
Dataset (polars_hist_db.dataset)
async run_datasets(
config: PolarsHistDbConfig,
engine: Engine,
dataset_name: str | None = None,
debug_capture_output: List[Tuple[datetime, pl.DataFrame]] | None = None,
)

Runs the ingestion pipeline for all datasets, or only for the dataset named by `dataset_name`.
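A sketch of driving a run (the dataset name and YAML path are placeholders):

```python
import asyncio

from polars_hist_db.config import PolarsHistDbConfig
from polars_hist_db.dataset import run_datasets

config = PolarsHistDbConfig.from_yaml("config.yaml")
engine = config.db_config.get_engine()

# Omit dataset_name to run every configured dataset.
asyncio.run(run_datasets(config, engine, dataset_name="prices"))
```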
Data Loading (polars_hist_db.loaders)
load_typed_dsv
load_typed_dsv(
file_or_bytes: Path | bytes,
column_configs: Sequence[IngestionColumnConfig],
schema_overrides: Mapping[str, pl.DataType] = {},
delimiter: str | None = None,
null_values: Sequence[str] | None = None,
) -> pl.DataFrame

Loads a delimiter-separated file with typed columns and transformation support.
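A call sketch; the `column_configs` sequence would normally come from a dataset's pipeline configuration, so it is left as an assumed value here, and the file name is a placeholder:

```python
from pathlib import Path

from polars_hist_db.loaders import load_typed_dsv

# Assumed: a Sequence[IngestionColumnConfig] taken from the dataset config.
column_configs = ...

df = load_typed_dsv(
    Path("prices_2024-01-01.csv"),
    column_configs,
    delimiter=",",
    null_values=["", "NA"],
)
```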
find_files
find_files(search_paths: pl.DataFrame) -> pl.DataFrame

Searches directories for matching files based on search path configuration.
convert_zipped_csvs_to_parquet
convert_zipped_csvs_to_parquet(
folder: str,
schema: Mapping[str, pl.DataType],
remove_original: bool,
) -> None

Converts zipped CSV files to Parquet format.
Type Converters (polars_hist_db.types)
PolarsType
Converts between SQL types and Polars DataType.
- `from_sql(sql_type: str) -> pl.DataType`
- `from_table(tbl: Table) -> Mapping[str, str]`
- `get_dataframe_schema_from_sqltext(sql_statement, connection) -> Dict[str, pl.DataType]`
- `get_dataframe_schema_from_selectable(selectable) -> Dict[str, pl.DataType]`
- `apply_dtype_to_column(df, col, target_type) -> pl.DataFrame`
- `apply_schema_to_dataframe(df, **schema_overrides) -> pl.DataFrame`
- `cast_str_to_cat(df, ignore_cols=None) -> pl.DataFrame`
SQLType
Converts Polars types to SQL type strings.
from_polars(pl_dtype: pl.DataType, default_varchar_length: int = 255) -> str
SQLAlchemyType
Converts SQL type strings to SQLAlchemy types.
from_sql(sql_type: str) -> TypeEngine
Utilities (polars_hist_db.utils)
- `Clock` — singleton for tracking operation timings and ETAs
- `compare_dataframes(lhs, rhs, on, cmp_cols=None, suffixes=("_lhs", "_rhs", "_diff"))` — diff two dataframes
- `to_ipc_b64(df, compression=None) -> bytes` — serialize a dataframe to base64 IPC
- `from_ipc_b64(payload, use_zlib=False) -> pl.DataFrame` — deserialize from base64 IPC
- `recursive_flatten(df) -> pl.DataFrame` — recursively flatten struct columns
- `NonRetryableException` — exception that should not be retried
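A serialization round-trip sketch using the helpers above (the import path is assumed to be the package's `utils` module):

```python
import polars as pl

from polars_hist_db.utils import from_ipc_b64, to_ipc_b64

df = pl.DataFrame({"id": [1, 2], "value": [3.5, 4.5]})

# Serialize to base64-encoded Arrow IPC bytes and back.
payload = to_ipc_b64(df)
restored = from_ipc_b64(payload)
assert restored.equals(df)
```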
CLI Scripts
scrape-dataset
scrape-dataset --config <config.yaml> --dataset <name>

Runs the ingestion pipeline for a dataset.
drop-dataset-tables
drop-dataset-tables --config <config.yaml> --dataset <name>

Drops all tables associated with a dataset.