管理 API 从 DataLake.open(...) 暴露。
from bagelquant_data import DataLake
lake = DataLake.open("data")
门面对象包含:
lake.sourceslake.datasetslake.updatelake.querylake.financelake.status
数据源管理
注册数据源:
from bagelquant_data.sources.tushare import TushareSource
lake.sources.register(TushareSource(name="tushare"))
配置数据源:
lake.sources.configure("tushare", token="...")
Tushare 便捷方法会委托给通用配置方法:
lake.sources.configure_tushare(token="...")
列出已注册数据源:
sources = lake.sources.list()
获取适配器:
tushare = lake.sources.get("tushare")
测试连接:
lake.sources.test("tushare")
移除数据源注册:
lake.sources.remove("tushare")
移除注册不会删除标准数据。
数据集管理
数据集行为由 DatasetSpec 对象或 YAML 文件声明。
添加 spec 对象:
from bagelquant_data import DatasetSpec
spec = DatasetSpec(
name="daily",
source="custom",
source_dataset="daily",
category="market",
field_mapping={"ts_code": "ts_code", "trade_date": "trade_date"},
required_columns=("asset_id", "time"),
primary_key=("asset_id", "time"),
asset_column="ts_code",
time_column="trade_date",
partition_strategy="year_month",
deduplication="primary_key_last",
sort_columns=("time", "asset_id"),
)
lake.datasets.add(spec)
添加 YAML spec:
lake.datasets.add_from_yaml(
"src/bagelquant_data/sources/tushare/datasets/daily.yaml"
)
获取数据集:
spec = lake.datasets.get("daily", source="tushare")
列出数据集:
all_datasets = lake.datasets.list()
tushare_datasets = lake.datasets.list("tushare")
启用或禁用数据集:
lake.datasets.enable("daily", source="tushare")
lake.datasets.disable("daily", source="tushare")
仅移除注册,不删除数据:
lake.datasets.remove("daily", source="tushare")
删除标准数据必须显式确认:
lake.datasets.remove(
"daily",
source="tushare",
delete_data=True,
confirm=True,
)
状态和检查
汇总:
summary = lake.status.summary()
数据集状态:
status = lake.status.dataset("income", source="tushare")
分区 manifest:
partitions = lake.status.partitions("income", source="tushare")
最近摄取 run:
runs = lake.status.runs(limit=20)
失败 run:
failures = lake.status.failures(dataset="income", source="tushare")
manifest 已知文件:
files = lake.status.files("income", source="tushare")
普通状态查询使用 SQLite manifest 元数据,成本较低,不需要扫描所有 Parquet 文件。
标准记录检查
用于人工检查:
records = lake.query.records(
"income",
source="tushare",
limit=10,
)
这不是主要研究提取 API。单值面板应使用 lake.query.field(...)。