数据源适配器负责获取外部数据,并可选择规划数据源专用请求。新增数据源不应要求修改存储、查询、财务处理或管理模块。
数据源协议
实现 DataSource 协议:
from collections.abc import Iterable, Mapping
from typing import Any
import polars as pl
from bagelquant_data.core.dataset import DatasetSpec
from bagelquant_data.core.request import RequestContext
class MySource:
@property
def name(self) -> str:
return "my_source"
def configure(self, **options: Any) -> None:
...
def test_connection(self) -> None:
...
def fetch(
self,
source_dataset: str,
request: Mapping[str, Any],
) -> pl.DataFrame:
...
def plan_requests(
self,
dataset: DatasetSpec,
context: RequestContext,
) -> Iterable[Mapping[str, Any]]:
...
注册数据源
from bagelquant_data import DataLake
lake = DataLake.open("data")
lake.sources.register(MySource())
lake.sources.configure("my_source", token="...")
lake.sources.test("my_source")
凭证规则
凭证应通过以下方式提供:
- 环境变量
- 运行时配置
- 本地未跟踪配置
- 未来的 secret provider 集成
不要把 secret 写入:
- 数据集 YAML
- Parquet 文件
- 已提交文档
- 已提交 TOML 文件
- SQLite 元数据行
请求规划
plan_requests 接收 DatasetSpec 和 RequestContext。
对 snapshot API,发出一个请求:
yield {"start_date": context.start, "end_date": context.end}
对按资产请求的 API,每个资产发一个请求:
for asset in context.assets or []:
yield {"asset_id": asset}
对分页 API,发出分页参数。核心更新 pipeline 不应知道提供商的分页细节。
获取数据
fetch 返回 Polars DataFrame。如果提供商 SDK 返回 pandas,在适配器内转换:
return pl.from_pandas(response.copy(deep=True))
标准化边界
数据源适配器应尽量保留源字段。标准命名通过数据集 spec 和 normalizer 完成。
不要在适配器里隐藏有经济意义的记录。有效修订、重述、重复预测和多个 point-in-time 记录都应进入标准存储,除非校验证明它们格式错误。
测试建议
推荐测试:
repr不泄露 tokenconfigure不持久化 secrettest_connection抛出有用错误fetch将提供商响应转换为 Polarsplan_requests尊重start、end、assets和source_options