← 返回文章列表
March 22, 2026
5 分钟阅读

算法交易的K线类型与聚合方法

算法交易的K线类型与聚合方法
#algotrading
#candles
#market microstructure
#Lopez de Prado
#order flow
#backtesting
#research

你在 Binance、TradingView 或任何交易所界面上看到的每一张K线图,构建方式都完全相同:在固定时间窗口内聚合成交——1分钟、5分钟、1小时——然后生成一根 OHLCV K线。这种做法如此普遍,以至于大多数交易者从未质疑过它。但对于算法交易而言,K线类型的选择和聚合方法是两个独立的决策——而大多数系统将二者混为一谈。

本文将K线构建分离为两个轴:构建什么类型的K线(17种类型)以及如何将它们聚合为更高时间框架(3种方法)。两者的组合产生51种可能的配置,每种配置在回测、实盘交易和信号生成中具有不同的特性。

关于原始成交如何转化为标准K线的入门介绍,请参阅 交易K线揭秘


要点速览

  • K线构建有两个独立轴:K线类型和聚合方法
  • 17种基础K线类型:时间、成交笔数(tick)、成交量、美元、砖形(Renko)、范围、波动率、Heikin-Ashi、卡吉(Kagi)、折线突破(Line Break)、点数图(P&F)、成交笔数失衡(TIB)、成交量失衡(VIB)、游程(run)、CUSUM、熵(entropy)、Delta
  • 3种聚合方法:日历对齐、滚动窗口、自适应滚动
  • 17 × 3 = 51 种可能的组合,每种具有不同特性
  • 大多数系统只使用一种组合:日历对齐的时间K线。其余50种尚未被充分利用。
  • 实践建议:分层使用多种组合——滚动时间K线用于信号、日历时间K线用于市场结构、信息驱动K线用于微观结构

K线构建的两个轴

传统视角将所有K线类型排成一个扁平列表:时间K线、tick K线、成交量K线、砖形图等。这是有误导性的。实际上存在两个正交的选择:

轴1——基础K线类型(17种类型): 如何决定一根新K线何时关闭?在固定时间间隔之后?在N笔成交之后?在价格移动之后?当信息含量发生变化时?这决定了"一根K线"的含义。

轴2——聚合方法(3种方法): 如何将基础K线组合成更高时间框架的K线?对齐到日历边界(00:00、01:00、...)?使用最近N根K线的滚动窗口?根据波动率自适应调整窗口大小?

这两个轴是独立的。你可以拥有:

  • 日历对齐的 tick K线 —— 将14:00到14:59之间关闭的 tick K线聚合为一根小时K线
  • 滚动成交量K线 —— 取最近24根成交量K线,不管它们何时关闭
  • 自适应 Delta K线 —— 在 Delta K线上使用波动率驱动的窗口

标准的"1小时K线"只是这个17×3矩阵中的一个点:时间K线 + 日历对齐。其他每一种组合都是值得考虑的替代方案。


1. 时间K线(标准)

日历时间K线问题 信息密度不均:刚性时间边界将200笔成交的平静时段与50,000笔成交的公告时段同等对待。

默认类型。固定时间间隔后形成一根新K线:1分钟、5分钟、1小时。每个交易所都原生提供这些数据。

特性:

  • 在亚洲时段(00:00–08:00 UTC),一根1小时K线可能包含200笔成交。在 Binance 上币公告期间,同一窗口可能包含50,000笔成交。时间K线将两者等同对待。检测此类活跃度峰值对于机器人保护至关重要——参见 交易机器人的异常检测
  • 所有市场参与者看到相同的K线边界——一个谢林焦点。这使得时间K线对于分析群体行为至关重要。
  • 在重启后对不完整K线计算的指标会产生垃圾值。
from datetime import datetime

def time_until_valid_hourly_candle():
    """How long until the first complete hourly candle after restart."""
    now = datetime.utcnow()
    minutes_into_hour = now.minute
    seconds_into_minute = now.second

    wait_seconds = (60 - minutes_into_hour) * 60 - seconds_into_minute
    wait_seconds += 3600

    return wait_seconds

2–4. 基于活跃度的K线

基于活跃度的K线 Tick K线、成交量K线和美元K线:三种让市场参与度——而非时钟——决定K线边界的方法。

不再按固定时间间隔采样,而是在固定量的市场活动之后采样。这会产生信息含量大致相等的K线,不受一天中时段的影响。

2. Tick K线

每N笔成交(tick)后形成一根新K线。在高活跃期,K线快速形成。在平静时期,一根K线可能跨越数小时。

from collections import deque
from dataclasses import dataclass

@dataclass
class OHLCV:
    timestamp: int
    open: float
    high: float
    low: float
    close: float
    volume: float

class TickBarGenerator:
    """
    Generates a new bar every `threshold` trades.
    Each bar contains equal number of market "opinions".
    """

    def __init__(self, threshold: int = 1000):
        self.threshold = threshold
        self.trades: list[tuple[float, float]] = []  # (price, qty)
        self.bars: list[OHLCV] = []

    def on_trade(self, timestamp: int, price: float, qty: float):
        self.trades.append((price, qty))

        if len(self.trades) >= self.threshold:
            self._close_bar(timestamp)

    def _close_bar(self, timestamp: int):
        prices = [t[0] for t in self.trades]
        volumes = [t[1] for t in self.trades]

        bar = OHLCV(
            timestamp=timestamp,
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        self.bars.append(bar)
        self.trades = []
        return bar

优点: 自然适应市场活跃度。tick K线的收益率分布比时间K线的收益率分布更接近正态分布——这一特性可以提高许多统计模型的性能。

缺点: 需要原始成交流(并非所有数据提供商都提供历史 tick 数据)。K线关闭时间不可预测——你无法说"下一根K线将在X时关闭。"

3. 成交量K线

当N个合约(或加密货币中的代币)成交后,形成一根新K线。与 tick K线类似,但按成交规模加权——一笔100 BTC的交易贡献的权重是1 BTC交易的100倍。

class VolumeBarGenerator:
    """
    Generates a new bar every `threshold` units of volume.
    Normalizes for trade size: one large order ≠ one small order.
    """

    def __init__(self, threshold: float = 100.0):
        self.threshold = threshold
        self.accumulated_volume = 0.0
        self.trades: list[tuple[int, float, float]] = []  # (ts, price, qty)
        self.bars: list[OHLCV] = []

    def on_trade(self, timestamp: int, price: float, qty: float):
        self.trades.append((timestamp, price, qty))
        self.accumulated_volume += qty

        if self.accumulated_volume >= self.threshold:
            self._close_bar()

    def _close_bar(self):
        prices = [t[1] for t in self.trades]
        volumes = [t[2] for t in self.trades]

        bar = OHLCV(
            timestamp=self.trades[-1][0],
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        self.bars.append(bar)
        self.accumulated_volume = 0.0
        self.trades = []
        return bar

4. 美元K线

当固定名义价值(以 USD/USDT 计)的交易完成后,形成一根新K线。这是基于活跃度的K线中最稳健的类型,因为它同时对成交笔数和价格水平进行了标准化。

考虑一下:如果 ETH 从 1,000涨到1,000 涨到 4,000,卖出 10,000ETH10,000 的 ETH 在 4,000 时需要 2.5 ETH,但在 $1,000 时需要 10 ETH。成交量K线会对这两种情况区别对待;美元K线则一视同仁。

class DollarBarGenerator:
    """
    Generates a new bar every `threshold` dollars (USDT) of notional volume.
    Most robust normalization: independent of price level.

    Lopez de Prado (2018) recommends dollar bars as the default
    for most quantitative applications.
    """

    def __init__(self, threshold: float = 1_000_000.0):
        self.threshold = threshold
        self.accumulated_dollars = 0.0
        self.trades: list[tuple[int, float, float]] = []
        self.bars: list[OHLCV] = []

    def on_trade(self, timestamp: int, price: float, qty: float):
        self.trades.append((timestamp, price, qty))
        self.accumulated_dollars += price * qty

        if self.accumulated_dollars >= self.threshold:
            self._close_bar()

    def _close_bar(self):
        prices = [t[1] for t in self.trades]
        volumes = [t[2] for t in self.trades]

        bar = OHLCV(
            timestamp=self.trades[-1][0],
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        self.bars.append(bar)
        self.accumulated_dollars = 0.0
        self.trades = []
        return bar

阈值选择

基于活跃度的K线的阈值应设置为每天产生的K线数量与你要替换的时间K线大致相同。以 Binance 上的 BTCUSDT 为例:

K线类型 典型阈值 每日约K线数 等效时间框架
Tick 1,000 笔成交 ~1,400 ~1分钟
Tick 50,000 笔成交 ~28 ~1小时
成交量 100 BTC ~600 ~2-3分钟
成交量 2,400 BTC ~25 ~1小时
美元 $1M ~1,400 ~1分钟
美元 $50M ~28 ~1小时

这些数字是近似值,会随市场状态发生巨大变化。在上涨或崩盘期间,基于活跃度的K线会产生比平常多5-10倍的K线——这正是其核心意义。

5–7. 基于价格的K线

基于价格的K线 砖形图(Renko)、范围K线和波动率K线:仅在价格移动足够显著时才进行采样。

基于价格的K线同时忽略时间和活跃度。只有当价格移动了指定幅度时,才形成一根新K线。这自然过滤了横盘噪音并突出趋势。

5. Renko K线

当收盘价相对前一块砖的收盘价移动至少N个单位时,形成一块新的 Renko "砖"。砖块大小始终相同,为趋势方向创建清晰的视觉表示。

class RenkoBarGenerator:
    """
    Generates Renko bricks based on price movement.

    Key property: during sideways movement, no new bricks form.
    During strong trends, bricks form rapidly.
    """

    def __init__(self, brick_size: float = 10.0):
        self.brick_size = brick_size
        self.bricks: list[dict] = []
        self.last_close: float | None = None

    def on_price(self, timestamp: int, price: float, volume: float = 0.0):
        if self.last_close is None:
            self.last_close = price
            return []

        new_bricks = []
        diff = price - self.last_close
        num_bricks = int(abs(diff) / self.brick_size)

        if num_bricks == 0:
            return []

        direction = 1 if diff > 0 else -1

        for i in range(num_bricks):
            brick_open = self.last_close
            brick_close = self.last_close + direction * self.brick_size

            brick = {
                'timestamp': timestamp,
                'open': brick_open,
                'high': max(brick_open, brick_close),
                'low': min(brick_open, brick_close),
                'close': brick_close,
                'volume': volume / num_bricks if num_bricks > 0 else 0,
                'direction': direction,
            }
            new_bricks.append(brick)
            self.last_close = brick_close

        self.bricks.extend(new_bricks)
        return new_bricks

动态 Renko 使用 ATR(平均真实波幅)代替固定砖块大小,自动适应波动率。

6. 范围K线

每根K线具有固定的最高-最低价范围。当范围被超过时,K线关闭并开始新的一根。与 Renko 不同,范围K线包含影线,可以展示K线内的波动情况。

class RangeBarGenerator:
    """
    Generates bars with a fixed high-low range.

    Difference from Renko: range bars show the full OHLC within
    the range, not just brick direction. More information-rich.
    """

    def __init__(self, range_size: float = 20.0):
        self.range_size = range_size
        self.current_high: float | None = None
        self.current_low: float | None = None
        self.current_open: float | None = None
        self.current_volume: float = 0.0
        self.current_start_ts: int = 0
        self.bars: list[OHLCV] = []

    def on_trade(self, timestamp: int, price: float, qty: float):
        if self.current_open is None:
            self.current_open = price
            self.current_high = price
            self.current_low = price
            self.current_start_ts = timestamp

        self.current_high = max(self.current_high, price)
        self.current_low = min(self.current_low, price)
        self.current_volume += qty

        if self.current_high - self.current_low >= self.range_size:
            bar = OHLCV(
                timestamp=timestamp,
                open=self.current_open,
                high=self.current_high,
                low=self.current_low,
                close=price,
                volume=self.current_volume,
            )
            self.bars.append(bar)

            self.current_open = price
            self.current_high = price
            self.current_low = price
            self.current_volume = 0.0
            self.current_start_ts = timestamp

            return bar

        return None

Renko 和范围K线的关键区别: Renko 仅跟踪收盘价并展示方向;范围K线跟踪完整的价格范围并展示K线内部结构。范围K线通常对算法交易更有用,因为它们保留了止损和止盈模拟所需的最高-最低价信息。

7. 波动率K线

当K线内波动率达到动态阈值时——例如近期 ATR 的倍数——形成一根新K线。与范围K线(固定阈值)不同,波动率K线会适应市场状况。

class VolatilityBarGenerator:
    """
    Generates bars when intra-bar volatility reaches a threshold.

    Similar to range bars, but the threshold adapts to market conditions
    using a rolling ATR measure. In calm markets, bars need less
    absolute movement to close; in volatile markets, more.
    """

    def __init__(
        self,
        atr_period: int = 14,
        atr_multiplier: float = 1.0,
        initial_threshold: float = 20.0,
    ):
        self.atr_period = atr_period
        self.atr_multiplier = atr_multiplier
        self.threshold = initial_threshold

        self.recent_ranges: list[float] = []
        self.current_open: float | None = None
        self.current_high: float | None = None
        self.current_low: float | None = None
        self.current_volume: float = 0.0
        self.bars: list[OHLCV] = []

    def on_trade(self, timestamp: int, price: float, qty: float):
        if self.current_open is None:
            self.current_open = price
            self.current_high = price
            self.current_low = price

        self.current_high = max(self.current_high, price)
        self.current_low = min(self.current_low, price)
        self.current_volume += qty

        intra_bar_range = self.current_high - self.current_low

        if intra_bar_range >= self.threshold:
            bar = OHLCV(
                timestamp=timestamp,
                open=self.current_open,
                high=self.current_high,
                low=self.current_low,
                close=price,
                volume=self.current_volume,
            )
            self.bars.append(bar)

            self.recent_ranges.append(intra_bar_range)
            if len(self.recent_ranges) > self.atr_period:
                self.recent_ranges = self.recent_ranges[-self.atr_period:]
            if len(self.recent_ranges) >= self.atr_period:
                avg_range = sum(self.recent_ranges) / len(self.recent_ranges)
                self.threshold = avg_range * self.atr_multiplier

            self.current_open = price
            self.current_high = price
            self.current_low = price
            self.current_volume = 0.0
            return bar

        return None

8. Heikin-Ashi(平滑变换)

Heikin-Ashi 变换 Heikin-Ashi:均值变换将噪声K线转化为平滑的趋势信号——但代价是丢失精确价格信息。

Heikin-Ashi(日语意为"平均K线")不是一种K线类型——它是一种变换,可以应用于任何基础K线类型之上。它通过对当前和前一根K线的值求平均来平滑K线:

  • HA 收盘价 = (开盘价 + 最高价 + 最低价 + 收盘价) / 4
  • HA 开盘价 = (前一根 HA 开盘价 + 前一根 HA 收盘价) / 2
  • HA 最高价 = max(最高价, HA 开盘价, HA 收盘价)
  • HA 最低价 = min(最低价, HA 开盘价, HA 收盘价)

趋势表现为连续同色K线的序列,无下影线(上升趋势)或无上影线(下降趋势)。

class HeikinAshiTransformer:
    """
    Transforms standard OHLCV candles into Heikin-Ashi candles.

    Can be applied on top of ANY bar type: time bars, volume bars,
    rolling bars, etc. It's a transformation, not a sampling method.

    WARNING: HA prices are synthetic — they don't represent real
    traded prices. Never use HA close for order placement or
    PnL calculation. Use HA only for signal generation, then
    execute at real prices.
    """

    def __init__(self):
        self.prev_ha_open: float | None = None
        self.prev_ha_close: float | None = None

    def transform(self, candle: OHLCV) -> OHLCV:
        ha_close = (candle.open + candle.high + candle.low + candle.close) / 4

        if self.prev_ha_open is None:
            ha_open = (candle.open + candle.close) / 2
        else:
            ha_open = (self.prev_ha_open + self.prev_ha_close) / 2

        ha_high = max(candle.high, ha_open, ha_close)
        ha_low = min(candle.low, ha_open, ha_close)

        self.prev_ha_open = ha_open
        self.prev_ha_close = ha_close

        return OHLCV(
            timestamp=candle.timestamp,
            open=ha_open,
            high=ha_high,
            low=ha_low,
            close=ha_close,
            volume=candle.volume,
        )

    def transform_series(self, candles: list[OHLCV]) -> list[OHLCV]:
        """Transform an entire series. Resets state first."""
        self.prev_ha_open = None
        self.prev_ha_close = None
        return [self.transform(c) for c in candles]

def ha_trend_signal(ha_candles: list[OHLCV], lookback: int = 3) -> int:
    """
    Simple HA trend signal.

    Returns:
        +1: bullish (N consecutive green HA candles with no lower wick)
        -1: bearish (N consecutive red HA candles with no upper wick)
         0: no clear trend
    """
    if len(ha_candles) < lookback:
        return 0

    recent = ha_candles[-lookback:]

    all_bullish = all(
        c.close > c.open and abs(c.low - min(c.open, c.close)) < 1e-10
        for c in recent
    )

    all_bearish = all(
        c.close < c.open and abs(c.high - max(c.open, c.close)) < 1e-10
        for c in recent
    )

    if all_bullish:
        return 1
    elif all_bearish:
        return -1
    return 0

回测中的关键注意事项: Heikin-Ashi 价格是合成的。如果你的回测使用 HA 收盘价作为入场价格,结果将是错误的。始终仅将 HA 用于信号生成,并以真实 OHLC 价格执行交易。

HA 有用的场景: 需要清晰"持仓"信号的趋势跟踪策略。在任何基础K线类型——时间K线、成交量K线、美元K线——之上应用 HA,以过滤虚假交叉信号。

HA 有害的场景: 任何需要精确价格水平的策略——支撑/阻力、订单簿分析、PIQ(队列位置)。均值化会破坏精确的价格信息。

9–11. 日本反转图表

日本图表方法 卡吉图、折线突破图和点数图:完全无时间维度的图表方法,纯粹关注价格结构。

这些是传统的日本图表方法(与 Renko 并列),完全抛弃时间维度,专注于价格结构。

9. 卡吉图(Kagi)

卡吉图由垂直线组成,当价格反转达到指定幅度时改变方向。当价格突破前高时线条变粗(粗线 = "阳" = 需求),当价格跌破前低时线条变细(细线 = "阴" = 供给)。

class KagiChartGenerator:
    """
    Generates Kagi chart lines based on price reversals.

    Unlike Renko (fixed brick size), Kagi tracks the actual magnitude
    of each move and changes line thickness at breakout points.

    Useful for identifying support/resistance breaks and
    supply/demand shifts without time noise.
    """

    def __init__(self, reversal_amount: float = 10.0):
        self.reversal_amount = reversal_amount
        self.lines: list[dict] = []
        self.current_direction: int = 0  # 1=up, -1=down
        self.current_price: float | None = None
        self.extreme_price: float | None = None
        self.prev_high: float | None = None
        self.prev_low: float | None = None
        self.line_type: str = 'yang'  # 'yang' (thick) or 'yin' (thin)

    def on_price(self, timestamp: int, price: float):
        if self.current_price is None:
            self.current_price = price
            self.extreme_price = price
            return None

        if self.current_direction == 0:
            if price - self.current_price >= self.reversal_amount:
                self.current_direction = 1
                self.extreme_price = price
            elif self.current_price - price >= self.reversal_amount:
                self.current_direction = -1
                self.extreme_price = price
            return None

        if self.current_direction == 1:
            if price > self.extreme_price:
                self.extreme_price = price
                if self.prev_high is not None and price > self.prev_high:
                    self.line_type = 'yang'
            elif self.extreme_price - price >= self.reversal_amount:
                line = {
                    'timestamp': timestamp,
                    'start': self.current_price,
                    'end': self.extreme_price,
                    'direction': 'up',
                    'type': self.line_type,
                }
                self.lines.append(line)
                self.prev_high = self.extreme_price
                self.current_price = self.extreme_price
                self.extreme_price = price
                self.current_direction = -1
                if self.prev_low is not None and price < self.prev_low:
                    self.line_type = 'yin'
                return line
        else:
            if price < self.extreme_price:
                self.extreme_price = price
                if self.prev_low is not None and price < self.prev_low:
                    self.line_type = 'yin'
            elif price - self.extreme_price >= self.reversal_amount:
                line = {
                    'timestamp': timestamp,
                    'start': self.current_price,
                    'end': self.extreme_price,
                    'direction': 'down',
                    'type': self.line_type,
                }
                self.lines.append(line)
                self.prev_low = self.extreme_price
                self.current_price = self.extreme_price
                self.extreme_price = price
                self.current_direction = 1
                if self.prev_high is not None and price > self.prev_high:
                    self.line_type = 'yang'
                return line

        return None

10. 折线突破图(Line Break)

折线突破图仅在收盘价超过前N根线(通常为3根)的最高或最低价时才绘制新线(方块)。如果价格保持在该范围内,则不绘制新线。

class LineBreakGenerator:
    """
    Generates Line Break bars (Three Line Break by default).

    A new bar is drawn only when the close exceeds the high or low
    of the last N bars. Filters out minor noise by requiring price
    to break through a multi-bar range.

    The 'N' parameter (line_count) controls sensitivity:
    - N=2: more sensitive, more bars, more noise
    - N=3: standard (Three Line Break)
    - N=4+: less sensitive, fewer bars, stronger signals
    """

    def __init__(self, line_count: int = 3):
        self.line_count = line_count
        self.lines: list[dict] = []

    def on_close(self, timestamp: int, close: float) -> dict | None:
        if not self.lines:
            self.lines.append({
                'timestamp': timestamp,
                'open': close,
                'close': close,
                'high': close,
                'low': close,
                'direction': 0,
            })
            return None

        lookback = self.lines[-self.line_count:] if len(self.lines) >= self.line_count else self.lines

        highest = max(l['high'] for l in lookback)
        lowest = min(l['low'] for l in lookback)
        last = self.lines[-1]

        new_line = None

        if close > highest:
            new_line = {
                'timestamp': timestamp,
                'open': last['close'],
                'close': close,
                'high': close,
                'low': last['close'],
                'direction': 1,
            }
        elif close < lowest:
            new_line = {
                'timestamp': timestamp,
                'open': last['close'],
                'close': close,
                'high': last['close'],
                'low': close,
                'direction': -1,
            }

        if new_line:
            self.lines.append(new_line)
            return new_line

        return None

11. 点数图(Point & Figure)

点数图(P&F)使用 X 列(价格上涨)和 O 列(价格下跌)。列的切换通常需要3个格子大小的反转。这是最古老的噪音过滤和支撑/阻力识别方法之一。

class PointAndFigureGenerator:
    """
    Generates Point & Figure chart data.

    X column: price rising by box_size increments.
    O column: price falling by box_size increments.
    Column switch: requires reversal_boxes * box_size movement
    in the opposite direction.

    Classic setting: box_size based on ATR, reversal_boxes = 3.
    """

    def __init__(self, box_size: float = 10.0, reversal_boxes: int = 3):
        self.box_size = box_size
        self.reversal_boxes = reversal_boxes
        self.reversal_amount = box_size * reversal_boxes

        self.columns: list[dict] = []
        self.current_direction: int = 0
        self.current_top: float | None = None
        self.current_bottom: float | None = None

    def on_price(self, timestamp: int, price: float):
        if self.current_top is None:
            box_price = self._round_to_box(price)
            self.current_top = box_price
            self.current_bottom = box_price
            self.current_direction = 1
            return None

        events = []

        if self.current_direction == 1:
            while price >= self.current_top + self.box_size:
                self.current_top += self.box_size
                events.append(('X', self.current_top, timestamp))

            if price <= self.current_top - self.reversal_amount:
                col = {
                    'type': 'X',
                    'top': self.current_top,
                    'bottom': self.current_bottom,
                    'boxes': int((self.current_top - self.current_bottom) / self.box_size) + 1,
                    'timestamp': timestamp,
                }
                self.columns.append(col)
                self.current_direction = -1
                self.current_top = self.current_top - self.box_size
                self.current_bottom = self._round_to_box(price)
                events.append(('new_column', 'O', timestamp))

        else:
            while price <= self.current_bottom - self.box_size:
                self.current_bottom -= self.box_size
                events.append(('O', self.current_bottom, timestamp))

            if price >= self.current_bottom + self.reversal_amount:
                col = {
                    'type': 'O',
                    'top': self.current_top,
                    'bottom': self.current_bottom,
                    'boxes': int((self.current_top - self.current_bottom) / self.box_size) + 1,
                    'timestamp': timestamp,
                }
                self.columns.append(col)
                self.current_direction = 1
                self.current_bottom = self.current_bottom + self.box_size
                self.current_top = self._round_to_box(price)
                events.append(('new_column', 'X', timestamp))

        return events if events else None

    def _round_to_box(self, price: float) -> float:
        return round(price / self.box_size) * self.box_size

卡吉图、折线突破图和点数图在算法交易中的应用: 主要用于长期趋势检测和支撑/阻力识别。作为过滤层——"当卡吉图处于阴线模式时不发出做多信号"——它们通过将交易与宏观结构对齐来增加价值。

12–14. 信息驱动K线

信息驱动K线 失衡K线、游程K线、CUSUM 过滤器和熵K线:当市场告诉我们某些事情发生了变化时进行采样。

这是最精密的方法,来自 Marcos Lopez de Prado 的《Advances in Financial Machine Learning》(2018)。核心洞察:当新信息到达市场时进行采样,而不是按固定间隔采样。

12. 成交笔数失衡K线(TIB)

如果市场处于均衡状态,买方发起和卖方发起的成交应大致平衡。当失衡超出我们的预期时,说明某些事情发生了变化。在那个时刻采样一根K线。

每笔成交使用 tick 规则分类为买方发起(+1)或卖方发起(-1)。我们跟踪累积失衡 θ,当 |θ| 超过动态阈值时采样。

class TickImbalanceBarGenerator:
    """
    Generates bars when the cumulative tick imbalance exceeds
    expected levels — i.e., when "new information" arrives.

    Based on Lopez de Prado (2018), Chapter 2.
    """

    def __init__(
        self,
        expected_ticks_init: int = 1000,
        ewma_window: int = 100,
        min_ticks: int = 100,
        max_ticks: int = 50000,
    ):
        self.expected_ticks_init = expected_ticks_init
        self.ewma_window = ewma_window
        self.min_ticks = min_ticks
        self.max_ticks = max_ticks

        self.theta = 0.0
        self.prev_price: float | None = None
        self.prev_sign = 1
        self.trades: list[tuple[int, float, float]] = []

        self.bar_lengths: list[int] = []
        self.imbalances: list[float] = []
        self.expected_ticks = float(expected_ticks_init)
        self.expected_imbalance = 0.0

        self.bars: list[OHLCV] = []

    def _tick_sign(self, price: float) -> int:
        """Classify trade as buy (+1) or sell (-1) using tick rule."""
        if self.prev_price is None:
            self.prev_price = price
            return 1

        if price > self.prev_price:
            sign = 1
        elif price < self.prev_price:
            sign = -1
        else:
            sign = self.prev_sign

        self.prev_price = price
        self.prev_sign = sign
        return sign

    def on_trade(self, timestamp: int, price: float, qty: float):
        sign = self._tick_sign(price)
        self.theta += sign
        self.trades.append((timestamp, price, qty))

        threshold = self.expected_ticks * abs(self.expected_imbalance)
        if threshold == 0:
            threshold = self.expected_ticks_init * 0.5

        if abs(self.theta) >= threshold and len(self.trades) >= self.min_ticks:
            return self._close_bar()

        if len(self.trades) >= self.max_ticks:
            return self._close_bar()

        return None

    def _close_bar(self):
        prices = [t[1] for t in self.trades]
        volumes = [t[2] for t in self.trades]

        bar = OHLCV(
            timestamp=self.trades[-1][0],
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        self.bars.append(bar)

        self.bar_lengths.append(len(self.trades))
        self.imbalances.append(self.theta / len(self.trades))

        if len(self.bar_lengths) >= 2:
            alpha = 2.0 / (self.ewma_window + 1)
            self.expected_ticks = (
                alpha * self.bar_lengths[-1]
                + (1 - alpha) * self.expected_ticks
            )
            self.expected_ticks = max(
                self.min_ticks,
                min(self.max_ticks, self.expected_ticks)
            )
            self.expected_imbalance = (
                alpha * self.imbalances[-1]
                + (1 - alpha) * self.expected_imbalance
            )

        self.theta = 0.0
        self.trades = []
        return bar

13. 成交量失衡K线(VIB)

TIB 的扩展:不是将每笔成交计为 ±1,而是按带符号的成交量加权。100 BTC 的买入贡献 +100,1 BTC 的卖出贡献 -1。捕获可能被拆分为多笔小额交易的大额知情订单。

class VolumeImbalanceBarGenerator:
    """
    Like TIBs, but uses signed volume instead of signed ticks.

    Captures the insight that a 100-BTC buy signal is 100x more
    informative than a 1-BTC buy signal.
    """

    def __init__(
        self,
        expected_ticks_init: int = 1000,
        ewma_window: int = 100,
    ):
        self.expected_ticks_init = expected_ticks_init
        self.ewma_window = ewma_window

        self.theta = 0.0
        self.prev_price: float | None = None
        self.prev_sign = 1
        self.trades: list[tuple[int, float, float]] = []

        self.bar_lengths: list[int] = []
        self.volume_imbalances: list[float] = []
        self.expected_ticks = float(expected_ticks_init)
        self.expected_vol_imbalance = 0.0

        self.bars: list[OHLCV] = []

    def _tick_sign(self, price: float) -> int:
        if self.prev_price is None:
            self.prev_price = price
            return 1
        if price > self.prev_price:
            sign = 1
        elif price < self.prev_price:
            sign = -1
        else:
            sign = self.prev_sign
        self.prev_price = price
        self.prev_sign = sign
        return sign

    def on_trade(self, timestamp: int, price: float, qty: float):
        sign = self._tick_sign(price)
        self.theta += sign * qty
        self.trades.append((timestamp, price, qty))

        threshold = self.expected_ticks * abs(self.expected_vol_imbalance)
        if threshold == 0:
            threshold = self.expected_ticks_init * 0.5

        if abs(self.theta) >= threshold and len(self.trades) >= 10:
            return self._close_bar()
        return None

    def _close_bar(self):
        prices = [t[1] for t in self.trades]
        volumes = [t[2] for t in self.trades]

        bar = OHLCV(
            timestamp=self.trades[-1][0],
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        self.bars.append(bar)

        self.bar_lengths.append(len(self.trades))
        self.volume_imbalances.append(self.theta / len(self.trades))

        alpha = 2.0 / (self.ewma_window + 1)
        if len(self.bar_lengths) >= 2:
            self.expected_ticks = (
                alpha * self.bar_lengths[-1] + (1 - alpha) * self.expected_ticks
            )
            self.expected_vol_imbalance = (
                alpha * self.volume_imbalances[-1]
                + (1 - alpha) * self.expected_vol_imbalance
            )

        self.theta = 0.0
        self.trades = []
        return bar

爆炸问题

失衡K线的一个已知问题:基于 EWMA 的阈值可能进入正反馈循环。解决方案:使用 min_ticksmax_ticks 边界进行钳制。


self.expected_ticks = max(
    self.min_ticks,    # Floor: never less than 100 ticks
    min(
        self.max_ticks,  # Ceiling: never more than 50000 ticks
        new_expected_ticks
    )
)

14. 游程K线(Run Bars)

游程K线跟踪当前方向性游程的长度——最长的连续买入或卖出序列。当大型知情交易者将一笔订单拆分为多笔小额交易时,该序列会变得异常长。游程K线可以检测到这一点。

class TickRunBarGenerator:
    """
    Generates bars when the length of a directional run exceeds expectations.

    Based on Lopez de Prado (2018), Chapter 2.

    Difference from imbalance bars:
    - Imbalance bars track NET imbalance (buys minus sells)
    - Run bars track the MAXIMUM run length (consecutive buys OR sells)
    """

    def __init__(
        self,
        expected_ticks_init: int = 1000,
        ewma_window: int = 100,
        min_ticks: int = 100,
        max_ticks: int = 50000,
    ):
        self.expected_ticks_init = expected_ticks_init
        self.ewma_window = ewma_window
        self.min_ticks = min_ticks
        self.max_ticks = max_ticks

        self.prev_price: float | None = None
        self.prev_sign = 1
        self.trades: list[tuple[int, float, float]] = []

        self.buy_run = 0
        self.sell_run = 0
        self.max_buy_run = 0
        self.max_sell_run = 0

        self.bar_lengths: list[int] = []
        self.max_runs: list[float] = []
        self.expected_ticks = float(expected_ticks_init)
        self.expected_max_run = 0.0

        self.bars: list[OHLCV] = []

    def _tick_sign(self, price: float) -> int:
        if self.prev_price is None:
            self.prev_price = price
            return 1
        if price > self.prev_price:
            sign = 1
        elif price < self.prev_price:
            sign = -1
        else:
            sign = self.prev_sign
        self.prev_price = price
        self.prev_sign = sign
        return sign

    def on_trade(self, timestamp: int, price: float, qty: float):
        sign = self._tick_sign(price)
        self.trades.append((timestamp, price, qty))

        if sign == 1:
            self.buy_run += 1
            self.sell_run = 0
        else:
            self.sell_run += 1
            self.buy_run = 0

        self.max_buy_run = max(self.max_buy_run, self.buy_run)
        self.max_sell_run = max(self.max_sell_run, self.sell_run)

        theta = max(self.max_buy_run, self.max_sell_run)
        threshold = self.expected_ticks * self.expected_max_run if self.expected_max_run > 0 else self.expected_ticks_init * 0.3

        if theta >= threshold and len(self.trades) >= self.min_ticks:
            return self._close_bar()

        if len(self.trades) >= self.max_ticks:
            return self._close_bar()

        return None

    def _close_bar(self):
        prices = [t[1] for t in self.trades]
        volumes = [t[2] for t in self.trades]

        bar = OHLCV(
            timestamp=self.trades[-1][0],
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        self.bars.append(bar)

        max_run = max(self.max_buy_run, self.max_sell_run) / len(self.trades)
        self.bar_lengths.append(len(self.trades))
        self.max_runs.append(max_run)

        alpha = 2.0 / (self.ewma_window + 1)
        if len(self.bar_lengths) >= 2:
            self.expected_ticks = alpha * self.bar_lengths[-1] + (1 - alpha) * self.expected_ticks
            self.expected_ticks = max(self.min_ticks, min(self.max_ticks, self.expected_ticks))
            self.expected_max_run = alpha * self.max_runs[-1] + (1 - alpha) * self.expected_max_run

        self.trades = []
        self.buy_run = 0
        self.sell_run = 0
        self.max_buy_run = 0
        self.max_sell_run = 0

        return bar

游程K线可以扩展为成交量游程K线美元游程K线

15. CUSUM 过滤器K线

CUSUM(累积和)过滤器通过跟踪累积收益来确定何时采样。与失衡K线(基于原始成交)不同,CUSUM 可以应用于现有的1分钟 OHLCV 数据——不需要 tick 数据。

class CUSUMFilterBarGenerator:
    """
    Symmetric CUSUM filter for event-based sampling.

    Based on Lopez de Prado (2018), Chapter 2.5.

    Key advantage over Bollinger Bands: CUSUM requires a FULL
    run of threshold magnitude before triggering. Bollinger Bands
    trigger repeatedly when price hovers near the band.

    Can be applied to 1m OHLCV data — no tick data required.
    """

    def __init__(self, threshold: float = 0.01):
        self.threshold = threshold
        self.s_pos = 0.0
        self.s_neg = 0.0
        self.prev_price: float | None = None
        self.buffer: list[OHLCV] = []
        self.bars: list[OHLCV] = []

    def on_candle_1m(self, candle: OHLCV) -> OHLCV | None:
        self.buffer.append(candle)

        if self.prev_price is None:
            self.prev_price = candle.close
            return None

        import math
        log_ret = math.log(candle.close / self.prev_price)
        self.prev_price = candle.close

        self.s_pos = max(0.0, self.s_pos + log_ret)
        self.s_neg = min(0.0, self.s_neg + log_ret)

        triggered = False

        if self.s_pos > self.threshold:
            self.s_pos = 0.0
            triggered = True

        if self.s_neg < -self.threshold:
            self.s_neg = 0.0
            triggered = True

        if triggered and len(self.buffer) >= 2:
            bars = self.buffer
            bar = OHLCV(
                timestamp=bars[-1].timestamp,
                open=bars[0].open,
                high=max(b.high for b in bars),
                low=min(b.low for b in bars),
                close=bars[-1].close,
                volume=sum(b.volume for b in bars),
            )
            self.bars.append(bar)
            self.buffer = []
            return bar

        return None

CUSUM + 三重障碍法(Triple Barrier Method): 在 Lopez de Prado 的框架中,CUSUM 事件被用作三重障碍法的入场点——每个事件触发一笔设有止损、止盈和到期障碍的交易。要对此类事件驱动策略进行稳健验证,请参阅 前向优化蒙特卡洛 Bootstrap 回测

16. 熵K线(Entropy Bars)

理论上最优雅的方法:当K线内价格序列的信息含量(香农熵)超过阈值时进行采样。

class EntropyBarGenerator:
    """
    Generates bars when the entropy of intra-bar returns exceeds
    a threshold.

    Based on Shannon's information theory: bars are sampled when
    "new information" arrives, measured as the entropy of the
    return distribution within the current bar.

    This is the most theoretically "pure" information-driven bar.
    """

    def __init__(
        self,
        entropy_threshold: float = 2.0,
        min_trades: int = 50,
        n_bins: int = 10,
    ):
        self.entropy_threshold = entropy_threshold
        self.min_trades = min_trades
        self.n_bins = n_bins
        self.trades: list[tuple[int, float, float]] = []
        self.bars: list[OHLCV] = []

    def on_trade(self, timestamp: int, price: float, qty: float):
        self.trades.append((timestamp, price, qty))

        if len(self.trades) < self.min_trades:
            return None

        entropy = self._compute_entropy()

        if entropy >= self.entropy_threshold:
            return self._close_bar()

        return None

    def _compute_entropy(self) -> float:
        import math

        prices = [t[1] for t in self.trades]
        if len(prices) < 2:
            return 0.0

        returns = [
            math.log(prices[i] / prices[i-1])
            for i in range(1, len(prices))
            if prices[i-1] > 0
        ]

        if not returns:
            return 0.0

        min_r = min(returns)
        max_r = max(returns)

        if max_r == min_r:
            return 0.0

        bin_width = (max_r - min_r) / self.n_bins
        bins = [0] * self.n_bins

        for r in returns:
            idx = min(int((r - min_r) / bin_width), self.n_bins - 1)
            bins[idx] += 1

        total = sum(bins)
        entropy = 0.0
        for count in bins:
            if count > 0:
                p = count / total
                entropy -= p * math.log2(p)

        return entropy

    def _close_bar(self):
        prices = [t[1] for t in self.trades]
        volumes = [t[2] for t in self.trades]

        bar = OHLCV(
            timestamp=self.trades[-1][0],
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        self.bars.append(bar)
        self.trades = []
        return bar

实践说明: 熵K线计算成本较高,主要用于研究——但对于基于机器学习的策略,它们能产生统计性质更好的特征,因为每根K线包含大致相等的"信息量"。

17. Delta K线(订单流)

Delta K线和订单流 累积 Delta:实时衡量主动买方与卖方的净力量。

Delta K线基于累积 Delta——买入成交量与卖出成交量之间的运行差值——进行采样。与失衡K线(使用 tick 符号 ±1)不同,Delta K线使用实际的成交量加权订单流。

class DeltaBarGenerator:
    """
    Generates bars based on cumulative order flow delta.

    Delta = Buy Volume - Sell Volume (classified by aggressor side).

    Requires trade-level data with side classification
    (available from Binance aggTrades, Bybit trades, etc.)
    """

    def __init__(self, threshold: float = 500.0):
        self.threshold = threshold
        self.cumulative_delta = 0.0
        self.trades: list[tuple[int, float, float, int]] = []
        self.bars: list[OHLCV] = []

    def on_trade(self, timestamp: int, price: float, qty: float, is_buyer_maker: bool):
        side = -1 if is_buyer_maker else 1
        signed_qty = side * qty

        self.cumulative_delta += signed_qty
        self.trades.append((timestamp, price, qty, side))

        if abs(self.cumulative_delta) >= self.threshold:
            return self._close_bar()

        return None

    def _close_bar(self):
        prices = [t[1] for t in self.trades]
        volumes = [t[2] for t in self.trades]

        bar = OHLCV(
            timestamp=self.trades[-1][0],
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        bar.delta = self.cumulative_delta  # type: ignore
        bar.buy_volume = sum(t[2] for t in self.trades if t[3] == 1)  # type: ignore
        bar.sell_volume = sum(t[2] for t in self.trades if t[3] == -1)  # type: ignore

        self.bars.append(bar)
        self.cumulative_delta = 0.0
        self.trades = []
        return bar

Delta 背离: 最强大的信号之一——价格上涨而累积 Delta 为负(卖方积极但价格仍在上涨,表明限价买单在吸收抛压)。这与 数字指纹:交易者识别 文章中描述的行为指纹方法直接相关。对于使用 Avellaneda-Stoikov 模型 的做市商,Delta K线提供了库存风险和主动方压力的实时视图。


滚动窗口聚合 基础K线的循环缓冲区:新数据进入,旧数据退出,聚合K线始终有效。

聚合方法决定了基础K线如何组合成更高时间框架(HTF)的K线。它们独立于K线类型——你可以将任何聚合方法应用于任何基础K线类型。

方法 A:日历对齐聚合

聚合落在固定日历边界内的所有基础K线。"1小时"K线涵盖14:00:00到14:59:59之间的所有K线。

特性:

  • 所有市场参与者看到相同的边界——对市场结构分析、支撑/阻力、PIQ 触发至关重要
  • 冷启动问题:重启后的不完整K线
  • 对时间K线来说很自然(这是交易所原生提供的)
  • 也适用于非时间K线:"14:00到15:00之间关闭的所有成交量K线" = 成交量K线的日历对齐小时K线

方法 B:滚动窗口聚合

聚合最近N根已关闭的基础K线,每当有新K线时重新计算。"1小时"滚动K线 = 最近60根已关闭的1分钟时间K线,每分钟更新一次。

原子单位是已关闭的基础K线。 这一设计选择带来:

  1. 无冷启动。 N根K线之后,聚合K线即为有效。无不完整K线噪音。
  2. 回测一致性。 如果实盘交易使用与回测引擎相同的原子单位,信号是一致的。
  3. 简单验证。 一条规则:if buffer not full: skip
import numpy as np

class RollingCandleAggregator:
    """
    Produces rolling higher-timeframe candles from closed base bars.

    Works with ANY bar type: time bars, tick bars, volume bars,
    dollar bars, delta bars — anything that produces OHLCV output.

    Example: RollingCandleAggregator(window=60) with 1m time bars
    produces a "1h" candle updated every minute.

    Example: RollingCandleAggregator(window=24) with volume bars
    produces a candle spanning the last 24 volume bars.
    """

    def __init__(self, window: int):
        self.window = window
        self.buffer: deque[OHLCV] = deque(maxlen=window)

    def push(self, bar: OHLCV) -> OHLCV | None:
        """
        Add a closed base bar. Returns aggregated candle
        only when buffer is full (= candle is valid).
        """
        self.buffer.append(bar)

        if len(self.buffer) < self.window:
            return None

        return self._aggregate()

    def _aggregate(self) -> OHLCV:
        bars = list(self.buffer)
        return OHLCV(
            timestamp=bars[-1].timestamp,
            open=bars[0].open,
            high=max(b.high for b in bars),
            low=min(b.low for b in bars),
            close=bars[-1].close,
            volume=sum(b.volume for b in bars),
        )

    @property
    def is_valid(self) -> bool:
        return len(self.buffer) == self.window

相位偏移的权衡: 如果你在 :37 开始运行,滚动K线在 :37 关闭,而不是像其他人的K线那样在 :00 关闭。这对于依赖群体可见水平的策略很重要。解决方案:两者并用——日历用于市场结构,滚动用于信号。

方法 C:自适应滚动聚合

与滚动类似,但窗口大小根据当前波动率自适应调整。平静市场 → 更宽的窗口(更多平滑)。波动市场 → 更窄的窗口(更快反应)。

class AdaptiveRollingAggregator:
    """
    Rolling window where the window size adapts to volatility.

    Works with any base bar type. Uses ATR of recent bars
    as the volatility measure.

    Low volatility → wider window (more smoothing, fewer signals)
    High volatility → narrower window (faster reaction)
    """

    def __init__(
        self,
        base_window: int = 60,
        min_window: int = 15,
        max_window: int = 240,
        atr_period: int = 14,
        atr_base: float | None = None,
    ):
        self.base_window = base_window
        self.min_window = min_window
        self.max_window = max_window
        self.atr_period = atr_period
        self.atr_base = atr_base

        self.all_candles: deque[OHLCV] = deque(maxlen=max_window)
        self.atr_values: deque[float] = deque(maxlen=atr_period * 2)
        self.current_window = base_window

    def push(self, bar: OHLCV) -> OHLCV | None:
        self.all_candles.append(bar)

        tr = bar.high - bar.low
        self.atr_values.append(tr)

        if len(self.atr_values) < self.atr_period:
            return None

        current_atr = sum(list(self.atr_values)[-self.atr_period:]) / self.atr_period

        if self.atr_base is None and len(self.atr_values) >= self.atr_period * 2:
            self.atr_base = sum(self.atr_values) / len(self.atr_values)

        if self.atr_base is None or self.atr_base == 0:
            return None

        vol_ratio = current_atr / self.atr_base
        self.current_window = int(self.base_window / vol_ratio)
        self.current_window = max(self.min_window, min(self.max_window, self.current_window))

        if len(self.all_candles) < self.current_window:
            return None

        bars = list(self.all_candles)[-self.current_window:]
        return OHLCV(
            timestamp=bars[-1].timestamp,
            open=bars[0].open,
            high=max(b.high for b in bars),
            low=min(b.low for b in bars),
            close=bars[-1].close,
            volume=sum(b.volume for b in bars),
        )

每种基础K线类型都可以与每种聚合方法组合。有些组合是标准的(日历时间K线 = 交易所提供的默认数据),有些则新颖但强大。

组合示例

基础K线类型 日历 滚动 自适应
时间 标准交易所K线 始终有效的HTF,无冷启动 波动率自适应时间框架
成交量 "本小时所有成交量K线" 最近24根成交量K线 平静市场时更宽的窗口
美元 每小时美元K线聚合 最近N根美元K线 自适应美元窗口
成交笔数失衡 每小时失衡聚合 最近N个失衡事件 波动行情中快速反应
Delta 每小时净订单流 滚动 Delta 快照 自适应流量窗口
Renko "本小时的砖块" 最近N块砖 自适应砖块数量

混合引擎:日历 + 滚动

在实践中,你需要同时维护日历和滚动聚合。内存开销可忽略不计——每个交易对每个时间框架只需两个 deque 缓冲区。

class HybridCandleEngine:
    """
    Maintains both calendar-aligned and rolling candles
    for any base bar type.

    Calendar candles: for market structure, support/resistance, PIQ.
    Rolling candles: for indicators, signal generation, entries/exits.
    """

    def __init__(self):
        self.rolling = {
            '1h': RollingCandleAggregator(60),
            '4h': RollingCandleAggregator(240),
        }
        self.calendar: dict[str, list[OHLCV]] = {
            '1h': [],
            '4h': [],
        }
        self._calendar_buffer: dict[str, list[OHLCV]] = {
            '1h': [],
            '4h': [],
        }

    def on_bar(self, bar: OHLCV):
        """Process any base bar type — time, volume, tick, delta, etc."""
        rolling_results = {}
        for tf, agg in self.rolling.items():
            rolling_results[tf] = agg.push(bar)

        self._update_calendar(bar)

        return rolling_results

    def _update_calendar(self, bar: OHLCV):
        from datetime import datetime
        ts = datetime.utcfromtimestamp(bar.timestamp)

        for tf, minutes in [('1h', 60), ('4h', 240)]:
            self._calendar_buffer[tf].append(bar)

            total_minutes = ts.hour * 60 + ts.minute
            if (total_minutes + 1) % minutes == 0:
                bars = self._calendar_buffer[tf]
                if bars:
                    agg = OHLCV(
                        timestamp=bars[-1].timestamp,
                        open=bars[0].open,
                        high=max(b.high for b in bars),
                        low=min(b.low for b in bars),
                        close=bars[-1].close,
                        volume=sum(b.volume for b in bars),
                    )
                    self.calendar[tf].append(agg)
                    self._calendar_buffer[tf] = []

时间-成交量混合:日历 + 成交量拆分

一种特殊的聚合变体:日历对齐的K线在成交量超过阈值时强制提前关闭。在适应活跃度高峰的同时保持时间同步。

class TimeVolumeHybridGenerator:
    """
    Calendar-aligned candles that split when volume spikes.

    Rule: close the candle at the calendar boundary OR when
    accumulated volume exceeds vol_threshold, whichever comes first.

    Works with any base bar type — the volume trigger adds an
    extra split dimension on top of calendar alignment.
    """

    def __init__(
        self,
        interval_minutes: int = 60,
        vol_threshold: float = 5000.0,
    ):
        self.interval_minutes = interval_minutes
        self.vol_threshold = vol_threshold

        self.buffer: list[OHLCV] = []
        self.accumulated_volume = 0.0
        self.bars: list[OHLCV] = []

    def on_bar(self, bar: OHLCV) -> OHLCV | None:
        self.buffer.append(bar)
        self.accumulated_volume += bar.volume

        from datetime import datetime
        ts = datetime.utcfromtimestamp(bar.timestamp)
        total_minutes = ts.hour * 60 + ts.minute
        at_boundary = (total_minutes + 1) % self.interval_minutes == 0

        vol_spike = self.accumulated_volume >= self.vol_threshold

        if at_boundary or vol_spike:
            return self._close_bar(split_reason='volume' if vol_spike else 'time')

        return None

    def _close_bar(self, split_reason: str) -> OHLCV:
        bars = self.buffer
        bar = OHLCV(
            timestamp=bars[-1].timestamp,
            open=bars[0].open,
            high=max(b.high for b in bars),
            low=min(b.low for b in bars),
            close=bars[-1].close,
            volume=sum(b.volume for b in bars),
        )
        bar.split_reason = split_reason  # type: ignore
        bar.num_bars = len(bars)  # type: ignore

        self.bars.append(bar)
        self.buffer = []
        self.accumulated_volume = 0.0
        return bar

实用聚合:级联预加载

级联聚合 级联预加载:从小时K线组合日K线,从分钟K线组合小时K线——绕过API限制。

交易所限制提供的历史数据量。Binance 每次 REST 请求约给1000根K线,OKX 上限为300根。如果你需要滚动1日K线(1440分钟),你不一定能获取足够的1分钟历史数据。关于通过 WebSocket 实时流式获取成交和订单簿数据,请参阅 CCXT Pro WebSocket 方法

解决方案:级联聚合——从每个深度可用的最高分辨率构建更高时间框架,然后将它们拼接在一起。

Rolling 1W candle:
├── 6 completed 1D candles ← fetch from REST /klines?interval=1d
├── 1 partial day:
│   ├── 23 completed 1h candles ← fetch from REST /klines?interval=1h
│   └── 1 partial hour:
│       └── N completed 1m candles ← fetch from REST /klines?interval=1m
└── Live: each new closed 1m candle updates the entire chain

这之所以可行,是因为 OHLCV 聚合是可组合的:1日K线的最高价是24根1小时最高价的最大值,也是1440根1分钟最高价的最大值。

多交易所限制

交易所 最大1分钟K线数 最大1小时K线数 特殊时间间隔
Binance 1,000 1,000 1分钟–1月,完整范围
Bybit 1,000 1,000 1–720,日/周/月
OKX 300 300 1分钟–1月(限制更严格)
Gate.io 1,000 1,000 10秒–30天

聚合一致性校验

REST API 返回的1小时K线可能与你从60根1分钟K线计算得到的不匹配。务必验证:

def validate_aggregation(
    candle_htf: OHLCV,
    candles_ltf: list[OHLCV],
    tolerance_pct: float = 0.001,
) -> dict[str, bool]:
    agg = OHLCV(
        timestamp=candles_ltf[-1].timestamp,
        open=candles_ltf[0].open,
        high=max(c.high for c in candles_ltf),
        low=min(c.low for c in candles_ltf),
        close=candles_ltf[-1].close,
        volume=sum(c.volume for c in candles_ltf),
    )

    def close_enough(a: float, b: float) -> bool:
        if a == 0 and b == 0:
            return True
        return abs(a - b) / max(abs(a), abs(b)) < tolerance_pct

    return {
        'open': close_enough(candle_htf.open, agg.open),
        'high': close_enough(candle_htf.high, agg.high),
        'low': close_enough(candle_htf.low, agg.low),
        'close': close_enough(candle_htf.close, agg.close),
        'volume': close_enough(candle_htf.volume, agg.volume),
    }

如果验证持续失败,务必自行从1分钟K线聚合——永远不要信任交易所的高时间框架K线来保证回测一致性


对比矩阵

轴1:基础K线类型

# K线类型 触发条件 需要 Tick 数据 最适用于
1 时间 固定间隔 市场结构、群体行为
2 Tick N笔成交 机器学习特征、等信息采样
3 成交量 N个单位成交 标准化活跃度分析
4 美元 $N 名义价值 跨资产比较
5 Renko 价格 ± N 个单位 趋势跟踪、噪音过滤
6 范围 最高-最低 ≥ N 突破检测
7 波动率 自适应范围 状态自适应分析
8 Heikin-Ashi 变换 趋势确认(合成价格!)
9 Kagi 价格反转 供需结构
10 Line Break N线突破 宏观趋势过滤
11 Point & Figure 格子 + 反转 支撑/阻力映射
12 TIB 成交笔数失衡 知情流量检测
13 VIB 成交量失衡 大单检测
14 Run 游程长度 订单拆分检测
15 CUSUM 累积收益 否(1分钟收盘价) 结构性突变事件
16 Entropy 香农熵 机器学习研究、特征纯度
17 Delta 订单流 Delta 是(aggTrades) 主动方流量分析

轴2:聚合方法

方法 对齐方式 冷启动 相位偏移 最适用于
日历 挂钟时间 不完整K线风险 无(与群体对齐) 市场结构、PIQ、支撑/阻力
滚动 N根K线 无(预热后) 有(相对 :00 偏移) 指标、信号
自适应 波动率驱动的N ATR 校准后 波动率自适应策略

实践建议

分层架构 四层K线架构:滚动信号、日历结构、微观结构流量和趋势过滤。

如果你的回测引擎基于1分钟 OHLCV 数据运行:

  1. 滚动时间K线 —— 最简单的升级。无需额外数据。消除冷启动问题。
  2. 混合(滚动 + 日历)时间K线 —— 日历用于市场结构,滚动用于信号。
  3. CUSUM 过滤器 —— 基于1分钟收盘价,无需 tick 数据。"价格移动了足够多,值得关注。"

如果你有 tick/成交数据:

  1. 美元K线 + 滚动 —— 量化金融文献推荐的默认选择。
  2. 成交量失衡K线 + 滚动 —— 检测知情流量,在重要事件期间采样更密集。
  3. Delta K线 + 日历 —— 如果你有主动方分类,这是观察谁在推动市场的最直接视角。

作为过滤器(在任何基础+聚合组合之上应用 Heikin-Ashi 或 Line Break):

  1. Heikin-Ashi 叠加滚动成交量K线 —— 在活跃度标准化数据上获得清晰的趋势信号。
  2. Line Break / Kagi 叠加日级日历K线 —— 宏观趋势过滤器。

针对 Marketmaker.cc 的分层方案:

  • 第1层(信号): 时间K线的滚动聚合,用于指标和入场/出场信号。无冷启动,完美的回测一致性。
  • 第2层(市场结构): 日历对齐的时间K线,用于支撑/阻力、小时收盘价分析和 PIQ 触发
  • 第3层(微观结构): 来自原始成交流的成交量失衡K线 + Delta K线,用于检测知情流量、订单拆分以及预判大行情。另请参阅 数字指纹:交易者识别 了解订单流数据上的行为模式识别。
  • 第4层(趋势过滤): 在滚动K线上应用 Heikin-Ashi 变换,或在4小时日历收盘价上应用 Line Break,以保持信号与宏观方向一致。

结论

K线构建不是单一的选择——而是两个独立的决策:

  1. 什么类型的K线? 时间K线捕获时钟间隔。活跃度K线(tick、成交量、美元)捕获市场参与度。价格K线(Renko、范围、波动率)捕获价格运动。信息K线(失衡、游程、CUSUM、熵)捕获新信息的到达。订单流K线(Delta)捕获主动方压力。

  2. 如何聚合为更高时间框架? 日历对齐与群体同步。滚动消除冷启动。自适应响应波动率变化。

标准的"来自 Binance 的1小时K线"只是17×3矩阵中的一个单元格。其余50种组合对任何愿意实现它们的人开放。对于生产系统,答案是"为决策引擎的每一层选择合适的组合。"

原子单位——已关闭的基础K线——始终是基础。其他一切都是聚合。

关于使用细粒度数据提高回测精度的更多内容,请参阅 自适应下钻:可变粒度回测。关于指标预计算对多时间框架策略的影响,请参阅 聚合 Parquet 缓存


实用链接

  1. Lopez de Prado — Advances in Financial Machine Learning (2018)
  2. Easley, Lopez de Prado, O'Hara — The Volume Clock: Insights into the High Frequency Paradigm (2012)
  3. mlfinlab — 实现信息驱动K线的 Python 库
  4. Binance — 历史市场数据
  5. Apache Parquet — 列式存储格式

Citation

@article{soloviov2026bartypes,
  author = {Soloviov, Eugen},
  title = {17 × 3: Bar Types and Aggregation Methods for Algorithmic Trading},
  year = {2026},
  url = {https://marketmaker.cc/en/blog/post/beyond-time-bars-candle-construction},
  description = {Two-axis classification of candle construction: 17 base bar types × 3 aggregation methods = 51 combinations, with implementation code and practical recommendations for crypto algotrading.}
}
免责声明:本文提供的信息仅用于教育和参考目的,不构成财务、投资或交易建议。加密货币交易涉及重大损失风险。

MarketMaker.cc Team

量化研究与策略

在 Telegram 中讨论
Newsletter

紧跟市场步伐

订阅我们的时事通讯,获取独家 AI 交易见解、市场分析和平台更新。

我们尊重您的隐私。您可以随时退订。