The management API is exposed through the facade object returned by DataLake.open(...).

from bagelquant_data import DataLake

lake = DataLake.open("data")

The facade exposes:

  • lake.sources
  • lake.datasets
  • lake.update
  • lake.query
  • lake.finance
  • lake.status

Source Management

Register a source adapter:

from bagelquant_data.sources.tushare import TushareSource

lake.sources.register(TushareSource(name="tushare"))

Configure a source:

lake.sources.configure("tushare", token="...")

The Tushare convenience method delegates to the generic configuration method:

lake.sources.configure_tushare(token="...")
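
The delegation between the two calls can be pictured with a small stand-in class. This is a hypothetical sketch, not the library's implementation: SourceRegistrySketch and its _config attribute are invented for illustration, and only the method names configure and configure_tushare come from the text above.

```python
class SourceRegistrySketch:
    """Hypothetical stand-in for lake.sources; not the real class."""

    def __init__(self) -> None:
        # Per-source settings, keyed by source name (assumed storage layout).
        self._config = {}

    def configure(self, name: str, **settings: object) -> None:
        # Generic path: merge the settings into that source's configuration.
        self._config.setdefault(name, {}).update(settings)

    def configure_tushare(self, token: str) -> None:
        # Convenience path: forward to the generic method with a fixed name.
        self.configure("tushare", token=token)


registry = SourceRegistrySketch()
registry.configure_tushare(token="example-token")
```

In this sketch both entry points leave the registry in the same state, which is the reason to delegate rather than duplicate the logic.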

List registered sources:

sources = lake.sources.list()

Get a registered adapter:

tushare = lake.sources.get("tushare")

Test a source connection:

lake.sources.test("tushare")

Remove a source registration:

lake.sources.remove("tushare")

Removing a source registration does not delete canonical data.

Dataset Management

Dataset behavior is declared by DatasetSpec objects or YAML files.

Add a spec object:

from bagelquant_data import DatasetSpec

spec = DatasetSpec(
    name="daily",
    source="custom",
    source_dataset="daily",
    category="market",
    field_mapping={"ts_code": "ts_code", "trade_date": "trade_date"},
    required_columns=("asset_id", "time"),
    primary_key=("asset_id", "time"),
    asset_column="ts_code",
    time_column="trade_date",
    partition_strategy="year_month",
    deduplication="primary_key_last",
    sort_columns=("time", "asset_id"),
)

lake.datasets.add(spec)
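
To make the declarative fields more concrete, the sketch below shows one plausible reading of deduplication="primary_key_last", sort_columns, and partition_strategy="year_month" on plain Python dictionaries. The interpretation is an assumption, since the strategies are not defined here. The helper names dedupe_primary_key_last and year_month_partition, the YYYYMMDD date strings, and the sample rows are all hypothetical.

```python
from collections import defaultdict

PRIMARY_KEY = ("asset_id", "time")
SORT_COLUMNS = ("time", "asset_id")


def dedupe_primary_key_last(rows, primary_key=PRIMARY_KEY):
    """Keep the last row seen for each primary-key tuple (assumed semantics)."""
    latest = {}
    for row in rows:
        latest[tuple(row[col] for col in primary_key)] = row
    return list(latest.values())


def year_month_partition(time_value):
    """Derive a 'YYYY-MM' label from a 'YYYYMMDD' string (assumed format)."""
    return f"{time_value[:4]}-{time_value[4:6]}"


rows = [
    {"asset_id": "000001.SZ", "time": "20240131", "close": 10.0},
    {"asset_id": "000001.SZ", "time": "20240201", "close": 10.5},
    {"asset_id": "000001.SZ", "time": "20240131", "close": 10.2},  # later revision
]

# Deduplicate first, then order by the declared sort columns.
clean = sorted(
    dedupe_primary_key_last(rows),
    key=lambda row: tuple(row[col] for col in SORT_COLUMNS),
)

# Group the cleaned rows by their year-month partition label.
partitions = defaultdict(list)
for row in clean:
    partitions[year_month_partition(row["time"])].append(row)
```

Under these assumptions the revised row for 20240131 replaces the earlier one, and the two surviving rows land in separate monthly partitions.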

Add a YAML spec:

lake.datasets.add_from_yaml(
    "src/bagelquant_data/sources/tushare/datasets/daily.yaml"
)

Get a dataset:

spec = lake.datasets.get("daily", source="tushare")

List datasets:

all_datasets = lake.datasets.list()
tushare_datasets = lake.datasets.list("tushare")

Enable or disable a dataset:

lake.datasets.enable("daily", source="tushare")
lake.datasets.disable("daily", source="tushare")

Remove a dataset registration without deleting data:

lake.datasets.remove("daily", source="tushare")

To delete canonical data as well, pass delete_data=True together with an explicit confirm=True:

lake.datasets.remove(
    "daily",
    source="tushare",
    delete_data=True,
    confirm=True,
)
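
The two-flag pattern can be sketched as a simple guard. This is an illustration only: the function remove_dataset, the in-memory registry and data_store, and the choice to raise ValueError when confirmation is missing are assumptions, not documented behavior of lake.datasets.remove.

```python
def remove_dataset(registry, data_store, key, *, delete_data=False, confirm=False):
    """Hypothetical guard: always unregister, delete data only when confirmed."""
    if delete_data and not confirm:
        # Assumed behavior: refuse rather than silently skip the deletion.
        raise ValueError("delete_data=True requires confirm=True")
    registry.discard(key)
    if delete_data:
        data_store.pop(key, None)


registry = {("tushare", "daily")}
data_store = {("tushare", "daily"): ["2024-01.parquet"]}

# Registration-only removal leaves the stored data in place.
remove_dataset(registry, data_store, ("tushare", "daily"))
data_kept = ("tushare", "daily") in data_store

# Requesting deletion without confirmation is rejected in this sketch.
try:
    remove_dataset(registry, data_store, ("tushare", "daily"), delete_data=True)
    rejected = False
except ValueError:
    rejected = True
```

Requiring a second keyword makes an accidental destructive call much less likely than a single boolean would.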

Status And Inspection

Summary:

summary = lake.status.summary()

Dataset status:

status = lake.status.dataset("income", source="tushare")

Partition manifest:

partitions = lake.status.partitions("income", source="tushare")

Recent ingestion runs:

runs = lake.status.runs(limit=20)

Failed runs:

failures = lake.status.failures(dataset="income", source="tushare")

Files known to the manifest:

files = lake.status.files("income", source="tushare")

Normal status calls read SQLite manifest metadata rather than the data files. They are designed to be cheap and do not need to scan every Parquet file.
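
Why manifest-backed status is cheap can be illustrated with a toy manifest. The schema below is hypothetical, since the real manifest layout is not described here. The table name partitions, its columns, and the helper dataset_status are assumptions made for the sketch.

```python
import sqlite3

# In-memory database standing in for the manifest (hypothetical schema).
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE partitions (
        source TEXT,
        dataset TEXT,
        partition_key TEXT,
        row_count INTEGER,
        path TEXT
    )
    """
)
conn.executemany(
    "INSERT INTO partitions VALUES (?, ?, ?, ?, ?)",
    [
        ("tushare", "income", "2023", 1200, "income/2023.parquet"),
        ("tushare", "income", "2024", 1350, "income/2024.parquet"),
        ("tushare", "daily", "2024-01", 98000, "daily/2024-01.parquet"),
    ],
)


def dataset_status(conn, dataset, source):
    """Aggregate manifest rows only; no Parquet file is opened."""
    count, rows = conn.execute(
        "SELECT COUNT(*), COALESCE(SUM(row_count), 0) FROM partitions "
        "WHERE dataset = ? AND source = ?",
        (dataset, source),
    ).fetchone()
    return {"partitions": count, "rows": rows}


status = dataset_status(conn, "income", "tushare")
```

The query touches a few small metadata rows, so its cost does not grow with the size of the underlying data files.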

Canonical Record Inspection

Use lake.query.records(...) for human inspection:

records = lake.query.records(
    "income",
    source="tushare",
    limit=10,
)

This method is meant for inspection and is not the main research extraction API. Use lake.query.field(...) for single-value panels.
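
The difference between row-oriented records and a single-value panel can be sketched with plain dictionaries. The return shape of lake.query.field(...) is not specified here, so the sketch assumes a panel is a time-by-asset grid holding one field. The helper field_panel, the column names, and the sample values are hypothetical.

```python
# Row-oriented records: every row carries all fields (hypothetical sample).
records = [
    {"asset_id": "A", "time": "20231231", "revenue": 1.0, "net_income": 0.2},
    {"asset_id": "B", "time": "20231231", "revenue": 3.0, "net_income": 0.5},
    {"asset_id": "A", "time": "20240331", "revenue": 1.5, "net_income": 0.3},
]


def field_panel(records, field):
    """Pivot records into {time: {asset_id: value}} for a single field."""
    panel = {}
    for row in records:
        panel.setdefault(row["time"], {})[row["asset_id"]] = row[field]
    return panel


panel = field_panel(records, "revenue")
```

Records suit eyeballing a handful of rows, while the pivoted form aligns one value per asset and date, which is the shape most cross-sectional research code expects.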