The management API is exposed through the facade object returned by DataLake.open(...).
from bagelquant_data import DataLake
lake = DataLake.open("data")
The facade exposes:
lake.sources
lake.datasets
lake.update
lake.query
lake.finance
lake.status
Source Management
Register a source adapter:
from bagelquant_data.sources.tushare import TushareSource
lake.sources.register(TushareSource(name="tushare"))
Configure a source:
lake.sources.configure("tushare", token="...")
The Tushare convenience method delegates to the generic configuration method:
lake.sources.configure_tushare(token="...")
List registered sources:
sources = lake.sources.list()
Get a registered adapter:
tushare = lake.sources.get("tushare")
Test a source connection:
lake.sources.test("tushare")
Remove a source registration:
lake.sources.remove("tushare")
Removing a source registration does not delete canonical data.
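The calls above are typically used together when a source is first set up. The following is a minimal sketch of that sequence, not part of the library: setup_source is a hypothetical helper that only chains register, configure, and test as documented above. Because the return value of lake.sources.test(...) is not described here, the helper passes it through unchanged. A MagicMock stands in for the lake so the sketch runs without bagelquant_data installed.

```python
from unittest.mock import MagicMock  # stand-in for a real lake in the dry run


def setup_source(lake, adapter, name, **config):
    """Register, configure, and test a source in one step.

    Hypothetical helper: it only chains the three documented calls.
    """
    lake.sources.register(adapter)
    lake.sources.configure(name, **config)
    # The return value of test() is not documented, so pass it through as-is.
    return lake.sources.test(name)


# Dry run against a stand-in object instead of a real DataLake.
fake_lake = MagicMock()
setup_source(fake_lake, adapter=object(), name="tushare", token="...")
```

With a real lake, the adapter argument would be something like TushareSource(name="tushare") from the example above.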
Dataset Management
Dataset behavior is declared by DatasetSpec objects or YAML files.
Add a spec object:
from bagelquant_data import DatasetSpec
spec = DatasetSpec(
    name="daily",
    source="custom",
    source_dataset="daily",
    category="market",
    field_mapping={"ts_code": "ts_code", "trade_date": "trade_date"},
    required_columns=("asset_id", "time"),
    primary_key=("asset_id", "time"),
    asset_column="ts_code",
    time_column="trade_date",
    partition_strategy="year_month",
    deduplication="primary_key_last",
    sort_columns=("time", "asset_id"),
)
lake.datasets.add(spec)
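The option names in the spec suggest what happens to rows on ingestion. The sketch below illustrates one plausible reading in plain Python, with no claim that the library works this way internally: "primary_key_last" is assumed to keep the last row seen per primary key, "year_month" is assumed to group rows by calendar month, and time values are assumed to be YYYYMMDD strings. Both helper functions and the sample rows are made up for illustration.

```python
from collections import defaultdict


def dedupe_primary_key_last(rows, primary_key):
    """Keep the last row seen for each primary-key tuple (assumed semantics)."""
    latest = {}
    for row in rows:
        latest[tuple(row[col] for col in primary_key)] = row
    return list(latest.values())


def partition_year_month(rows, time_column="time"):
    """Group rows under a 'YYYY-MM' key derived from a YYYYMMDD string."""
    partitions = defaultdict(list)
    for row in rows:
        stamp = row[time_column]
        partitions[f"{stamp[:4]}-{stamp[4:6]}"].append(row)
    return dict(partitions)


# Made-up rows; the second one repeats the primary key of the first.
rows = [
    {"asset_id": "000001.SZ", "time": "20240131", "close": 10.0},
    {"asset_id": "000001.SZ", "time": "20240131", "close": 10.5},
    {"asset_id": "000001.SZ", "time": "20240201", "close": 10.7},
]

clean = dedupe_primary_key_last(rows, primary_key=("asset_id", "time"))
clean.sort(key=lambda r: (r["time"], r["asset_id"]))  # mirrors sort_columns
parts = partition_year_month(clean)
```

Here clean holds two rows, the duplicate having been replaced by its later version, and parts has one entry per month.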
Add a YAML spec:
lake.datasets.add_from_yaml(
    "src/bagelquant_data/sources/tushare/datasets/daily.yaml"
)
Get a dataset:
spec = lake.datasets.get("daily", source="tushare")
List datasets:
all_datasets = lake.datasets.list()
tushare_datasets = lake.datasets.list("tushare")
Enable or disable a dataset:
lake.datasets.enable("daily", source="tushare")
lake.datasets.disable("daily", source="tushare")
Remove a dataset registration without deleting data:
lake.datasets.remove("daily", source="tushare")
Delete canonical data only with explicit confirmation:
lake.datasets.remove(
    "daily",
    source="tushare",
    delete_data=True,
    confirm=True,
)
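The two-flag requirement is a common guard against accidental data loss. The sketch below shows how such a guard can be structured. It is a hypothetical stand-in, not the library's implementation: remove_dataset, registry, and data_store are invented names, and raising ValueError on an unconfirmed delete is an assumption, since the actual behavior of delete_data=True without confirm=True is not described here.

```python
def remove_dataset(registry, data_store, name, *, delete_data=False, confirm=False):
    """Hypothetical removal routine with a two-flag guard."""
    if delete_data and not confirm:
        # Assumption: an unconfirmed delete request is rejected before any change.
        raise ValueError("delete_data=True requires confirm=True")
    registry.discard(name)  # always drop the registration
    if delete_data:
        data_store.pop(name, None)  # drop stored data only when confirmed


registry = {"daily"}
data_store = {"daily": ["2024-01.parquet"]}

# Registration-only removal: the stored data is left untouched.
remove_dataset(registry, data_store, "daily")
```

Checking the guard before touching the registry means a rejected call leaves both the registration and the data unchanged.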
Status And Inspection
Summary:
summary = lake.status.summary()
Dataset status:
status = lake.status.dataset("income", source="tushare")
Partition manifest:
partitions = lake.status.partitions("income", source="tushare")
Recent ingestion runs:
runs = lake.status.runs(limit=20)
Failed runs:
failures = lake.status.failures(dataset="income", source="tushare")
Files known to the manifest:
files = lake.status.files("income", source="tushare")
Normal status calls read the SQLite manifest metadata rather than the data files, so they stay cheap and do not need to scan every Parquet file.
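To see why a manifest keeps status calls cheap, consider the sketch below. It uses Python's built-in sqlite3 module with an invented schema: the partitions table, its columns, and the dataset_status function are assumptions for illustration and do not reflect the library's real manifest layout. The point is only that counts and totals come from one indexed metadata query, with no Parquet file opened.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE partitions (
        source TEXT, dataset TEXT, partition_key TEXT,
        file_path TEXT, row_count INTEGER
    )
    """
)
# Made-up manifest entries: one row per written partition file.
conn.executemany(
    "INSERT INTO partitions VALUES (?, ?, ?, ?, ?)",
    [
        ("tushare", "income", "2023-12", "income/2023-12.parquet", 120),
        ("tushare", "income", "2024-03", "income/2024-03.parquet", 95),
    ],
)


def dataset_status(conn, dataset, source):
    """Summarize a dataset from manifest rows alone (hypothetical helper)."""
    row = conn.execute(
        "SELECT COUNT(*), COALESCE(SUM(row_count), 0) FROM partitions "
        "WHERE dataset = ? AND source = ?",
        (dataset, source),
    ).fetchone()
    return {"partitions": row[0], "rows": row[1]}


status = dataset_status(conn, "income", "tushare")
```

The trade-off of this design is that the summary is only as accurate as the manifest: files changed outside the ingestion path would not be reflected until the manifest is updated.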
Canonical Record Inspection
Use lake.query.records(...) for human inspection:
records = lake.query.records(
    "income",
    source="tushare",
    limit=10,
)
This is not the main research extraction API. Use lake.query.field(...) for single-value panels.
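The difference between the two shapes can be illustrated in plain Python. In the sketch below, long-format records carry many fields per row, while a single-value panel holds one field indexed by time and asset. The to_panel function, the field names, and the sample values are all made up, and the actual return types of lake.query.records(...) and lake.query.field(...) are not specified here, so this only shows the concept.

```python
def to_panel(records, field, time_key="time", asset_key="asset_id"):
    """Pivot long records into {time: {asset_id: value}} for one field."""
    panel = {}
    for rec in records:
        panel.setdefault(rec[time_key], {})[rec[asset_key]] = rec[field]
    return panel


# Made-up long-format records with two value fields per row.
records = [
    {"asset_id": "A", "time": "20231231", "revenue": 1.0, "n_income": 0.2},
    {"asset_id": "B", "time": "20231231", "revenue": 2.0, "n_income": 0.5},
    {"asset_id": "A", "time": "20240331", "revenue": 1.5, "n_income": 0.3},
]

revenue_panel = to_panel(records, "revenue")
```

Record output suits eyeballing a few rows, whereas the panel shape is what cross-sectional research code usually consumes.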