← 返回文章列表
March 17, 2026
5 分钟阅读

自适应下钻:从分钟到原始交易的可变粒度回测

自适应下钻:从分钟到原始交易的可变粒度回测
#算法交易
#回测
#parquet
#优化
#粒度
#下钻
#自适应分辨率

分钟级K线是回测的标准粒度。但在一根分钟K线内,价格的波动幅度各不相同:有时仅为0.01%,有时却达到2%。当止损和止盈同时落在一根分钟K线的[最低价, 最高价]范围内时,回测无法知道哪个先被触发。这就是成交歧义(fill ambiguity)问题。

朴素的解决方案是对整个回测使用秒级数据。但两年的数据意味着约6300万根秒级K线,而非约100万根分钟K线。存储空间增加60倍,速度也按比例下降。

自适应下钻解决了这个问题:仅在真正需要的地方使用更细粒度

成交歧义:止损和止盈都落在同一根K线范围内

问题:大K线上的成交歧义

考虑一个具体场景。策略以3000 USDT开多。止损:2970(-1%)。止盈:3060(+2%)。

14:37的分钟K线:

  • 开盘价:3010
  • 最高价:3065
  • 最低价:2965
  • 收盘价:3050

止损(2970)和止盈(3060)都落在[2965, 3065]范围内。哪个先被触发?

可能的结果:

  • 价格先下行 → 触发止损 → 亏损 -1%
  • 价格先上行 → 触发止盈 → 盈利 +2%

单笔交易的差异:3个百分点。若使用10倍杠杆则为30%。对于包含数百笔交易的回测,错误的成交歧义解析会系统性地扭曲结果。

框架的默认处理方式

大多数回测引擎使用以下两种启发式方法之一:

  1. 乐观式: 止盈先触发 → 结果偏高
  2. 悲观式: 止损先触发 → 结果偏低

两种方法都是猜测。真实数据在秒级甚至毫秒级是可获取的,既然可以查看真实数据,就没有理由去猜测。

下钻:四级策略

Adaptive four-level drill-down resolution pyramid

下钻的思路:从分钟级开始,仅在存在歧义时"下钻"到更低级别——基于价格波动或成交量异常。

1级:1m(分钟K线)
  → 如果止损或止盈明确在[最低价, 最高价]范围之外——当场解决
  → 如果两者都在范围内——下钻 ↓

第2级:1s(秒级K线)
  → 加载该分钟的60根秒级K线
  → 逐秒遍历:哪个先被触发?
  → 如果秒级K线有歧义,或 price_move >= min_pct,或 volume >= median_1s * vol_mult——下钻 ↓

第3级:100ms(毫秒级K线)
  → 加载该秒最多10100ms的K线
  → 逐100ms遍历
  → 如果100ms K线有歧义,或 price_move >= min_pct,或 volume >= median_100ms * vol_mult——下钻 ↓

第4级:原始交易(raw trades)
  → 加载该100ms桶中的单笔交易
  → 逐笔解析成交——最大可能的精度

何时不需要下钻

95%的情况下不需要下钻。典型场景:

明确的止损: K线最高价未达到止盈,最低价击穿止损 → 止损触发,无需下钻。

明确的止盈: 最低价未达到止损,最高价突破止盈 → 止盈触发,无需下钻。

都未触发: 两个价位都在范围之外 → 持仓继续。

跳空检测: 下一根K线的开盘价跳过止损或止盈 → 按开盘价成交,无需下钻。

下钻仅在约5%的K线中需要——即两个价位都落在单根K线范围内时。

class AdaptiveFillSimulator:
    """
    四级下钻,用于确定成交顺序。
    """
    def __init__(self, data_loader):
        self.loader = data_loader
        self.cache_1s = {}  # 按月缓存的秒级数据

    def check_fill(self, timestamp, candle_1m, sl_price, tp_price, side):
        """
        检查给定分钟K线上是否触发了止损或止盈。

        Returns: ('sl', fill_price) | ('tp', fill_price) | None
        """
        low, high = candle_1m['low'], candle_1m['high']

        open_price = candle_1m['open']
        if side == 'long':
            if open_price <= sl_price:
                return ('sl', open_price)
            if open_price >= tp_price:
                return ('tp', open_price)
        else:
            if open_price >= sl_price:
                return ('sl', open_price)
            if open_price <= tp_price:
                return ('tp', open_price)

        sl_hit = self._level_hit(sl_price, low, high, side, 'sl')
        tp_hit = self._level_hit(tp_price, low, high, side, 'tp')

        if sl_hit and not tp_hit:
            return ('sl', sl_price)
        if tp_hit and not sl_hit:
            return ('tp', tp_price)
        if not sl_hit and not tp_hit:
            return None

        return self._drill_down_1s(timestamp, sl_price, tp_price, side)

    def _drill_down_1s(self, minute_ts, sl_price, tp_price, side):
        """第2级:逐秒遍历。"""
        bars_1s = self.loader.load_1s_for_minute(minute_ts)

        if bars_1s is None or len(bars_1s) == 0:
            return self._pessimistic_fill(side, sl_price, tp_price)

        for bar in bars_1s:
            sl_hit = self._level_hit(sl_price, bar['low'], bar['high'], side, 'sl')
            tp_hit = self._level_hit(tp_price, bar['low'], bar['high'], side, 'tp')

            if sl_hit and not tp_hit:
                return ('sl', sl_price)
            if tp_hit and not sl_hit:
                return ('tp', tp_price)
            if sl_hit and tp_hit:
                result = self._drill_down_100ms(bar['timestamp'], sl_price, tp_price, side)
                if result:
                    return result

        return self._pessimistic_fill(side, sl_price, tp_price)

    def _pessimistic_fill(self, side, sl_price, tp_price):
        """悲观假设:多头触发止损,空头触发止损。"""
        if side == 'long':
            return ('sl', sl_price)
        else:
            return ('sl', sl_price)

性能

模式 单次成交检查时间 使用场景
1m(无下钻) ~0ms ~95%的情况
1s 下钻 ~5ms(首次访问该月) ~5%的情况
100ms 下钻 ~1ms <0.5%的情况
原始交易下钻 ~0.5ms <0.1%的情况

在两年约400笔交易的回测中,下钻大约被调用20次。总开销——整个回测不到1秒。

自适应数据存储

下钻需要秒级和毫秒级数据。但以最大粒度存储所有数据是不切实际的:

粒度 两年的K线数 Parquet 大小
1m ~105万 ~15 MB
1s ~6300万 ~550 MB/月
100ms ~6.3亿 ~5 GB/月

两年的完整1秒存档约13 GB。100毫秒超过100 GB。全部存储是可以的,但考虑到下钻使用的数据不到1%,这是浪费的。

热点秒检测

Hot-second detection and adaptive storage savings

关键观察:价格显著波动的秒数只占很小的比例。如果某秒内价格变化不到0.1%——就没有必要存储该秒的100ms细分数据。

热点秒检测:在下载和处理数据时,分析每一秒并仅为"热点秒"生成100ms K线——即价格波动超过阈值的秒。

def process_trades_adaptive(
    trades: pd.DataFrame,
    min_price_change_pct: float = 1.0,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    将原始交易数据处理为自适应结构:
    - 所有秒的1秒K线
    - 仅"热点秒"的100ms K线

    Args:
        trades: 包含 [timestamp, price, quantity] 列的 DataFrame
        min_price_change_pct: 下钻到100ms的阈值

    Returns:
        (df_1s, df_100ms_hot) — 秒级K线和热点秒的100ms K线
    """
    trades['second'] = trades['timestamp'].dt.floor('1s')
    df_1s = trades.groupby('second').agg(
        open=('price', 'first'),
        high=('price', 'max'),
        low=('price', 'min'),
        close=('price', 'last'),
        volume=('quantity', 'sum'),
    )

    df_1s['price_change_pct'] = (df_1s['high'] - df_1s['low']) / df_1s['open'] * 100
    hot_seconds = df_1s[df_1s['price_change_pct'] >= min_price_change_pct].index

    hot_trades = trades[trades['second'].isin(hot_seconds)]
    hot_trades['bucket_100ms'] = hot_trades['timestamp'].dt.floor('100ms')

    df_100ms = hot_trades.groupby('bucket_100ms').agg(
        open=('price', 'first'),
        high=('price', 'max'),
        low=('price', 'min'),
        close=('price', 'last'),
        volume=('quantity', 'sum'),
    )

    return df_1s, df_100ms

存储节省

以 ETHUSDT 典型月份为例:

方案 大小 粒度
仅1m ~1 MB 1分钟
全部1s ~550 MB 1秒
全部100ms ~5 GB 100毫秒
自适应 ~600 MB 1s + 仅热点秒的100ms

当阈值 min_price_change_pct = 1.0% 时,热点秒占所有秒的不到1%。它们的100ms数据仅在550 MB秒级数据基础上增加约50 MB——几乎可以忽略不计。

如果秒级数据也采用自适应存储(仅当分钟内波动超过0.1%时),存储量还可再减少3-5倍。

自适应Parquet存储层级:分钟、秒、热点毫秒和交易文件

Parquet 存储结构

data/{SYMBOL}/
├── source.json                # 数据来源:{"exchange": "binance"} 或 {"exchange": "bybit"}
├── stats.json                 # 预计算的成交量中位数:{"median_volume_1s": ..., "median_volume_100ms": ...}
├── klines_1m/
   ├── 2024-01.parquet       # ~1 MB
   ├── 2024-02.parquet
   └── ...
├── klines_1s/
   ├── 2024-01.parquet       # ~550 MB
   └── ...
├── klines_100ms_hot/
   ├── 2024-01.parquet       # ~50 MB(仅热点秒)
   └── ...
├── trades_hot/
   ├── 2024-01.parquet       # 热点100ms桶的原始交易
   └── ...
└── states_1m.parquet          # 预计算的滚动状态缓存(~112 MB)

每个文件包含一个月的数据。秒级、毫秒级数据和原始交易采用延迟加载——仅在下钻请求时才加载。stats.json 文件包含预计算的成交量中位数,用于基于成交量的下钻触发。

针对金融数据的 Parquet 优化

金融数据有其特殊性:时间戳单调递增,价格变化平滑,成交量波动较大。最优配置:

import pyarrow as pa
import pyarrow.parquet as pq

schema = pa.schema([
    pa.field("timestamp", pa.int32()),    # 从 epoch 起的秒数——int32 足够
    pa.field("open",      pa.float32()),
    pa.field("high",      pa.float32()),
    pa.field("low",       pa.float32()),
    pa.field("close",     pa.float32()),
    pa.field("volume",    pa.float32()),
])

column_encodings = {
    "timestamp": "DELTA_BINARY_PACKED",   # 单调整数 → 差分压缩
    "open":      "BYTE_STREAM_SPLIT",     # 浮点数 → 字节流分割
    "high":      "BYTE_STREAM_SPLIT",
    "low":       "BYTE_STREAM_SPLIT",
    "close":     "BYTE_STREAM_SPLIT",
    "volume":    "BYTE_STREAM_SPLIT",
}

def save_optimized_parquet(df, path):
    table = pa.Table.from_pandas(df, schema=schema)
    pq.write_table(
        table, path,
        compression="zstd",
        compression_level=9,
        use_dictionary=False,
        write_statistics=False,
        column_encoding=column_encodings,
    )

为什么使用这些配置:

  • DELTA_BINARY_PACKED 用于时间戳:连续的时间戳之间差值固定(1m为60,1s为1)。差分编码将其压缩到接近零。
  • BYTE_STREAM_SPLIT 用于浮点数:将 float32 的字节分流(所有第一字节在一起,所有第二字节在一起,依此类推)。对于平滑变化的价格,比标准编码压缩率高2-3倍。
  • ZSTD level 9:在可接受的解压速度下实现良好的压缩。
  • float32 代替 float64:对于价格和成交量足够,节省50%内存。

带缓存的延迟加载

下钻请求特定分钟的秒级数据。每次请求都加载一个 parquet 文件太慢。解决方案——按月进行 LRU 缓存的延迟加载。

from functools import lru_cache
import pyarrow.parquet as pq
import pandas as pd

class AdaptiveDataLoader:
    """
    带缓存的延迟加载器:按月加载秒级数据,
    在内存中保留最近 N 个月。
    """
    def __init__(self, symbol: str, data_dir: str = "data", cache_months: int = 2):
        self.symbol = symbol
        self.data_dir = data_dir
        self.cache_months = cache_months
        self._cache_1s: dict[str, pd.DataFrame] = {}

    def load_1s_for_minute(self, minute_ts: pd.Timestamp) -> pd.DataFrame | None:
        """加载特定分钟的1秒数据。"""
        month_key = minute_ts.strftime("%Y-%m")

        if month_key not in self._cache_1s:
            self._load_month_1s(month_key)

        if month_key not in self._cache_1s:
            return None

        df = self._cache_1s[month_key]
        minute_start = minute_ts.floor('1min')
        minute_end = minute_start + pd.Timedelta(minutes=1)

        return df[(df.index >= minute_start) & (df.index < minute_end)]

    def load_100ms_for_second(self, second_ts: pd.Timestamp) -> pd.DataFrame | None:
        """加载热点秒的100ms数据。"""
        month_key = second_ts.strftime("%Y-%m")
        path = f"{self.data_dir}/{self.symbol}/klines_100ms_hot/{month_key}.parquet"

        try:
            df = pd.read_parquet(path)
            second_start = second_ts.floor('1s')
            second_end = second_start + pd.Timedelta(seconds=1)
            return df[(df.index >= second_start) & (df.index < second_end)]
        except FileNotFoundError:
            return None

    def _load_month_1s(self, month_key: str):
        """加载一个月的1秒数据,从缓存中淘汰旧数据。"""
        path = f"{self.data_dir}/{self.symbol}/klines_1s/{month_key}.parquet"
        try:
            df = pd.read_parquet(path)
            df.index = pd.to_datetime(df['timestamp'], unit='s')

            if len(self._cache_1s) >= self.cache_months:
                oldest = min(self._cache_1s.keys())
                del self._cache_1s[oldest]

            self._cache_1s[month_key] = df
        except FileNotFoundError:
            pass

将下钻应用于回测

集成到回测循环中:

def backtest_with_adaptive_fill(
    states: pd.DataFrame,
    strategy_params: dict,
    data_loader: AdaptiveDataLoader,
) -> list:
    """
    使用自适应下钻进行成交模拟的回测。
    """
    fill_sim = AdaptiveFillSimulator(data_loader)
    trades = []
    position = None

    for i in range(len(states)):
        row = states.iloc[i]
        ts = states.index[i]

        candle_1m = {
            'open': row['open'], 'high': row['high'],
            'low': row['low'], 'close': row['close'],
            'timestamp': ts,
        }

        if position is not None:
            fill = fill_sim.check_fill(
                ts, candle_1m,
                position['sl'], position['tp'],
                position['side'],
            )

            if fill is not None:
                fill_type, fill_price = fill
                trades.append({
                    'entry_time': position['entry_time'],
                    'exit_time': ts,
                    'side': position['side'],
                    'entry_price': position['entry_price'],
                    'exit_price': fill_price,
                    'exit_type': fill_type,
                    'drill_down': fill_sim.last_drill_depth,  # 0、1 或 2
                })
                position = None
                continue

        signal = check_entry_signal(row, strategy_params)
        if signal and position is None:
            position = {
                'side': signal['side'],
                'entry_price': row['close'],
                'entry_time': ts,
                'sl': signal['sl'],
                'tp': signal['tp'],
            }

    return trades

与滚动状态缓存的关系

下钻与聚合 parquet 缓存互为补充——它们解决不同的问题:

滚动状态缓存 自适应下钻
目标 正确的高时间框架指标值 精确的止损/止盈执行顺序
作用于 每根1分钟K线 仅在成交歧义时(~5%)
数据 预计算,永久存储 延迟加载,缓存最近月份
影响 入场/出场信号 成交价格和时间

两种方法都消除了在日线级别不可见但对真实回测至关重要的错误。

总结:成交模拟方法对比

方法 精度 速度 存储
OHLC 启发式(乐观/悲观) 即时 仅1m
完整1秒回测 慢(x60) ~550 MB/月
完整100ms回测 很高 非常慢(x600) ~5 GB/月
完整原始交易回测 最高 极慢 ~50 GB/月
自适应下钻(4级) 最高 接近即时 1m + 1s + 热点100ms + 热点交易

下钻以1分钟回测的速度提供了完整1秒回测的精度。关键观察:高粒度并非处处需要——只在决策点需要

成交量突增触发向更细粒度层级下钻

基于成交量的下钻

原始的下钻仅基于价格波动触发——当K线的[最低价, 最高价]范围足够宽以产生成交歧义时。但价格并不是某个K线内发生重要事件的唯一信号。

成交量飙升是同样重要的触发条件。当某秒的成交量达到中位数的500倍时,通常对应着大额市价单、连环爆仓或闪崩。即使K线实体看起来很小,该秒内的实际价格轨迹可能非常剧烈——触及OHLC表示法所隐藏的极值。

下钻条件现在基于或逻辑:显著的价格波动或异常的成交量飙升都会触发向更细粒度的下钻。

def is_hot(bar, median_volume, min_pct=0.1, vol_mult=500):
    """
    判断一根K线是否需要下钻到下一级。
    两个独立触发条件(或逻辑):
      - K线内价格波动 >= min_pct
      - 成交量超过 median * vol_mult
    """
    price_move = (bar['high'] - bar['low']) / bar['open'] * 100
    return price_move >= min_pct or bar['volume'] >= median_volume * vol_mult

这捕获了仅靠价格检测无法发现的场景:一根open=3000、close=3001但成交量是正常值50000倍的K线,可能在毫秒内曾触及2950和3050。没有基于成交量的下钻,回测永远不会更仔细地检查这一秒。

原始交易:第四级

原始的三级层次(1m → 1s → 100ms)仍然留有空白:在单个100ms桶内,多笔交易可能以不同价格成交。对于high=3060和low=2965的桶,我们仍然不知道确切的顺序。

解决方案:下钻到原始交易作为第四级也是最终级别。

1m K线(基础)
  └─> 1s K线      (当 price_move >= min_pct 或 volume >= median_1s * vol_mult)
      └─> 100ms K线    (当检测到热点秒)
          └─> 原始交易     (当100ms显示 price_move >= min_pct 或 volume >= median_100ms * vol_mult)

在原始交易级别,没有歧义——每笔交易都有精确的价格和时间戳。成交被最终确定:

def resolve_from_trades(trades, sl_price, tp_price, side):
    """
    按时间顺序遍历单笔交易。
    第一笔穿越止损或止盈的交易决定成交。
    """
    for trade in trades:
        price = trade['price']
        if side == 'long':
            if price <= sl_price:
                return ('sl', price)
            if price >= tp_price:
                return ('tp', price)
        else:  # short
            if price >= sl_price:
                return ('sl', price)
            if price <= tp_price:
                return ('tp', price)
    return None

原始交易级别极少被调用——不到所有K线的0.1%——但当被调用时,它提供了任何K线近似都无法匹配的真实数据。

每个转换的独立阈值

不同分辨率之间的转换具有不同的特征。1秒内0.1%的价格波动是显著的;100ms桶内同样的0.1%则是极端的。同样,成交量分布在每个时间尺度上也不同。

每个级别转换现在有自己的 min_pctvol_mult 参数:

1s → 100ms:     --min-pct-1s 0.1     --vol-mult-1s 500
100ms → trades:  --min-pct-100ms 0.1  --vol-mult-100ms 500

这允许独立地微调每个转换的灵敏度。实际上,100ms到交易的转换可以使用更严格的阈值,因为加载单个100ms桶的原始交易的成本很小。

@dataclass
class DrillDownConfig:
    min_pct_1s: float = 0.1
    vol_mult_1s: float = 500
    min_pct_100ms: float = 0.1
    vol_mult_100ms: float = 500

持久化中位数统计

基于成交量的下钻需要知道每个时间尺度的成交量中位数。为每次回测实时计算中位数会抵消性能优势。解决方案:预计算一次中位数并缓存

对于每个交易对,1秒和100ms粒度的成交量中位数从历史数据中计算并存储在 stats.json 文件中:

{
  "ETHUSDT": {
    "median_volume_1s": 12.5,
    "median_volume_100ms": 1.8
  },
  "BTCUSDT": {
    "median_volume_1s": 0.45,
    "median_volume_100ms": 0.06
  }
}

统计数据在首次下载数据时为每个交易对计算一次,并在所有后续回测中复用。当数据更新(下载新月份)时,统计数据增量重新计算。

def compute_median_stats(symbol, data_dir):
    """为交易对计算并缓存成交量中位数统计。"""
    stats_path = f"{data_dir}/{symbol}/stats.json"

    all_1s = load_all_months(f"{data_dir}/{symbol}/klines_1s/")
    median_1s = all_1s['volume'].median()

    all_100ms = load_all_months(f"{data_dir}/{symbol}/klines_100ms_hot/")
    median_100ms = all_100ms['volume'].median()

    stats = {
        "median_volume_1s": float(median_1s),
        "median_volume_100ms": float(median_100ms),
    }

    with open(stats_path, 'w') as f:
        json.dump(stats, f, indent=2)

    return stats

多交易所数据流:Binance和Bybit汇聚成统一粒度层级

多交易所支持:Bybit

并非所有交易对都在Binance上可用。对于XAUTUSDT(黄金)等资产,数据必须来自其他交易所。下钻系统现在支持 Bybit 作为替代数据源。

对于Bybit交易对,所有K线级别(1m、1s、100ms)和原始交易都从Bybit的原始交易流构建。过程相同——原始交易在每个时间尺度上聚合为K线——但数据源不同。

data/{SYMBOL}/
├── source.json              # {"exchange": "bybit"} 或 {"exchange": "binance"}
├── klines_1m/
│   └── ...
├── klines_1s/
│   └── ...
├── klines_100ms_hot/
│   └── ...
└── trades_hot/              # 热点100ms桶的原始交易
    └── ...

数据加载器检查 source.json 并使用相应的下载管道。从回测引擎的角度来看,无论数据来源是哪个交易所,数据格式都是相同的——下钻逻辑与交易所无关。

这对于跨交易所策略或仅在特定平台上交易的交易对尤为重要。

结论

自适应下钻是一个简单原则的应用:按数据重要性的比例投入计算资源和存储空间

四个粒度级别:

  1. 1m — 95%K线的基础遍历
  2. 1s — 成交歧义或成交量飙升时的下钻
  3. 100ms — 极端波动或异常成交量热点秒的下钻
  4. 原始交易 — 热点100ms桶的下钻,在单笔交易级别解析成交

四个存储级别:

  1. 全部1m — 完整存档,两年约15 MB
  2. 全部1s — 完整或自适应存档,~550 MB/月
  3. 仅热点100ms — 不到1%的秒,~50 MB/月
  4. 仅热点交易 — 最极端100ms桶的原始交易

两个下钻触发条件(或逻辑):

  • 基于价格:K线价格范围超过 min_pct
  • 基于成交量:K线成交量超过 median * vol_mult

结果:以分钟级速度获得逐笔模拟器的精度。存储空间线性增长而非指数增长。并且支持多个交易所——Binance和Bybit——下钻逻辑与交易所无关。

关于多时间框架策略的预计算缓存,请参阅文章 聚合 Parquet 缓存。关于资金费率在高杠杆下对结果的影响 — 资金费率正在摧毁你的杠杆


参考链接

  1. Apache Parquet — 数据存储格式
  2. Apache Arrow — BYTE_STREAM_SPLIT 编码
  3. Zstandard — 压缩算法
  4. Lopez de Prado — Advances in Financial Machine Learning
  5. Binance — 历史市场数据

引用

@article{soloviov2026adaptivedrilldown,
  author = {Soloviov, Eugen},
  title = {Adaptive Drill-Down: Backtest with Variable Granularity from Minutes to Milliseconds},
  year = {2026},
  url = {https://marketmaker.cc/ru/blog/post/adaptive-resolution-drill-down-backtest},
  description = {自适应数据粒度如何加速回测并节省存储空间:仅在价格显著波动或成交量异常的位置从1分钟下钻到1秒、100毫秒和原始交易。}
}
免责声明:本文提供的信息仅用于教育和参考目的,不构成财务、投资或交易建议。加密货币交易涉及重大损失风险。

MarketMaker.cc Team

量化研究与策略

在 Telegram 中讨论
Newsletter

紧跟市场步伐

订阅我们的时事通讯,获取独家 AI 交易见解、市场分析和平台更新。

我们尊重您的隐私。您可以随时退订。