← 記事一覧に戻る
March 22, 2026
読了時間: 5分

アルゴリズミックトレーディングのためのバー種類と集約方法

アルゴリズミックトレーディングのためのバー種類と集約方法
#algotrading
#candles
#market microstructure
#Lopez de Prado
#order flow
#backtesting
#research

Binance、TradingView、その他あらゆる取引所UIで目にしたことのあるローソク足チャートはすべて同じ方法で構築されています:固定された時間窓(1分、5分、1時間)内の取引を集約してOHLCVバーを生成します。これは非常に一般的であるため、ほとんどのトレーダーがそれを疑問に思うことはありません。しかしアルゴリズミックトレーディングにおいては、バーの種類と集約方法は2つの独立した意思決定であり、大部分のシステムはこれらを混同しています。

本記事ではローソク足構成の2つの軸を分離します:どのようなバーを構築するか(17種類)と、どのようにそれらをより高い時間足に集約するか(3つの方法)。この組み合わせにより51通りの構成が可能になり、それぞれがバックテスト、ライブトレーディング、シグナル生成において異なる特性を持ちます。

生の取引データから標準的なローソク足がどのように生成されるかの入門については、Trading Candles Demystifiedをご覧ください。


TL;DR

  • ローソク足構成には2つの独立した軸がある:バーの種類と集約方法
  • 17種類の基本バー:時間、ティック、出来高、ドル、Renko、レンジ、ボラティリティ、Heikin-Ashi、カギ足、新値三本足、P&F、TIB(ティックインバランス)、VIB(出来高インバランス)、ラン、CUSUM、エントロピー、Delta
  • 3つの集約方法:カレンダー整列、ローリングウィンドウ、アダプティブローリング
  • 17 × 3 = 51通りの組み合わせ、それぞれ異なる特性を持つ
  • ほとんどのシステムは1つの組み合わせのみを使用:カレンダー整列の時間バー。残りの50は未活用。
  • 実践的推奨:レイヤーで複数の組み合わせを使用する — シグナルにはローリング時間バー、市場構造にはカレンダー時間バー、ミクロ構造には情報駆動型バー

ローソク足構成の2つの軸

従来の見方では、すべてのバー種類をフラットなリストに並べます:時間バー、ティックバー、出来高バーなど。これは誤解を招きます。実際には2つの直交する選択肢があります:

軸1 — 基本バーの種類(17種類): 新しいバーがいつ閉じるかをどう決定するか?固定時間間隔後?N回の取引後?価格変動後?情報量が変化したとき?これが「1つのバー」の意味を決定します。

軸2 — 集約方法(3つの方法): 基本バーをどのようにより高い時間足のローソク足にまとめるか?カレンダー境界に合わせる(00:00、01:00、…)?最後のN本のバーのローリングウィンドウを使用する?ボラティリティに応じてウィンドウサイズを適応させる?

この2つの軸は独立しています。以下が可能です:

  • カレンダー整列のティックバー — 14:00から14:59の間に閉じたティックバーを1本の1時間足ローソクに集約
  • ローリング出来高バー — いつ閉じたかに関係なく、最後の24本の出来高バーを取得
  • アダプティブDeltaバー — Deltaバーに対してボラティリティ駆動のウィンドウを使用

標準的な「1時間足ローソク」は、この17×3マトリクスの1点に過ぎません:時間バー+カレンダー整列。他のすべての組み合わせは検討に値する代替案です。


1. 時間バー(標準)

カレンダー時間バーの問題 不均一な情報密度:固定的な時間境界は200取引の静かな時間も50,000取引のアナウンス時間も同じように扱う。

デフォルト。固定時間間隔ごとに新しいバーが形成されます:1分、5分、1時間。すべての取引所がこれらをネイティブに提供しています。

特性:

  • アジアセッション(00:00〜08:00 UTC)では、1時間足ローソクに200件の取引しか含まれないかもしれません。Binanceの上場アナウンス中は、同じ時間窓に50,000件の取引が含まれる可能性があります。時間バーは両方を同等に扱います。このようなアクティビティスパイクの検出はボット保護にとって重要です — Anomaly Detection for Trading Botsを参照。
  • すべての市場参加者が同じローソク足の境界を見る — シェリングポイント。これにより時間バーは群集行動の分析に不可欠です。
  • 再起動後の不完全なローソクで計算されたインジケーターは無効な値を生成します。
from datetime import datetime

def time_until_valid_hourly_candle():
    """How long until the first complete hourly candle after restart."""
    now = datetime.utcnow()
    minutes_into_hour = now.minute
    seconds_into_minute = now.second

    wait_seconds = (60 - minutes_into_hour) * 60 - seconds_into_minute
    wait_seconds += 3600

    return wait_seconds

2〜4. アクティビティベースのバー

アクティビティベースのバー ティック、出来高、ドルバー:時計ではなく市場参加によってバー境界を決定する3つの方法。

固定時間間隔ではなく、固定量の市場アクティビティ後にサンプリングします。これにより、時間帯に関係なくほぼ等しい「情報量」を持つバーが生成されます。

2. ティックバー

N回の取引(ティック)ごとに新しいバーが形成されます。高いアクティビティ中はバーが急速に形成されます。静かな期間中は、1本のバーが数時間にわたることがあります。

from collections import deque
from dataclasses import dataclass

@dataclass
class OHLCV:
    timestamp: int
    open: float
    high: float
    low: float
    close: float
    volume: float

class TickBarGenerator:
    """
    Generates a new bar every `threshold` trades.
    Each bar contains equal number of market "opinions".
    """

    def __init__(self, threshold: int = 1000):
        self.threshold = threshold
        self.trades: list[tuple[float, float]] = []  # (price, qty)
        self.bars: list[OHLCV] = []

    def on_trade(self, timestamp: int, price: float, qty: float):
        self.trades.append((price, qty))

        if len(self.trades) >= self.threshold:
            self._close_bar(timestamp)

    def _close_bar(self, timestamp: int):
        prices = [t[0] for t in self.trades]
        volumes = [t[1] for t in self.trades]

        bar = OHLCV(
            timestamp=timestamp,
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        self.bars.append(bar)
        self.trades = []
        return bar

利点: 市場のアクティビティに自然に適応します。ティックバーからのリターンは時間バーのリターンよりも正規分布に近い傾向があり、多くの統計モデルのパフォーマンスを向上させる特性です。

欠点: 生の取引ストリームが必要(すべてのデータプロバイダーから過去データとして利用できるわけではありません)。バーのタイミングは予測不可能 — 「次のバーはXで閉じる」とは言えません。

3. 出来高バー

N単位の約定(暗号資産の場合はコイン数)後に新しいバーが形成されます。ティックバーに似ていますが、取引サイズで重み付けされます — 1回の100 BTC取引は1 BTC取引の100倍貢献します。

class VolumeBarGenerator:
    """
    Generates a new bar every `threshold` units of volume.
    Normalizes for trade size: one large order ≠ one small order.
    """

    def __init__(self, threshold: float = 100.0):
        self.threshold = threshold
        self.accumulated_volume = 0.0
        self.trades: list[tuple[int, float, float]] = []  # (ts, price, qty)
        self.bars: list[OHLCV] = []

    def on_trade(self, timestamp: int, price: float, qty: float):
        self.trades.append((timestamp, price, qty))
        self.accumulated_volume += qty

        if self.accumulated_volume >= self.threshold:
            self._close_bar()

    def _close_bar(self):
        prices = [t[1] for t in self.trades]
        volumes = [t[2] for t in self.trades]

        bar = OHLCV(
            timestamp=self.trades[-1][0],
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        self.bars.append(bar)
        self.accumulated_volume = 0.0
        self.trades = []
        return bar

4. ドルバー

固定の想定元本(USD/USDT建て)が取引された後に新しいバーが形成されます。アクティビティベースのバーの中で最もロバストで、取引回数と価格水準の両方を正規化します。

考えてみてください:ETHが1,000から1,000から4,000に上昇した場合、10,000相当のETHを売却するには10,000相当のETHを売却するには4,000では2.5 ETH必要ですが$1,000では10 ETH必要です。出来高バーはこれらを異なるものとして扱いますが、ドルバーは同じものとして扱います。

class DollarBarGenerator:
    """
    Generates a new bar every `threshold` dollars (USDT) of notional volume.
    Most robust normalization: independent of price level.

    Lopez de Prado (2018) recommends dollar bars as the default
    for most quantitative applications.
    """

    def __init__(self, threshold: float = 1_000_000.0):
        self.threshold = threshold
        self.accumulated_dollars = 0.0
        self.trades: list[tuple[int, float, float]] = []
        self.bars: list[OHLCV] = []

    def on_trade(self, timestamp: int, price: float, qty: float):
        self.trades.append((timestamp, price, qty))
        self.accumulated_dollars += price * qty

        if self.accumulated_dollars >= self.threshold:
            self._close_bar()

    def _close_bar(self):
        prices = [t[1] for t in self.trades]
        volumes = [t[2] for t in self.trades]

        bar = OHLCV(
            timestamp=self.trades[-1][0],
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        self.bars.append(bar)
        self.accumulated_dollars = 0.0
        self.trades = []
        return bar

閾値の選択

アクティビティベースのバーの閾値は、置き換える時間バーと同程度の1日あたりのバー数を生成するように設定すべきです。BinanceのBTCUSDTの場合:

バーの種類 一般的な閾値 〜バー/日 相当する時間足
ティック 1,000取引 〜1,400 〜1分
ティック 50,000取引 〜28 〜1時間
出来高 100 BTC 〜600 〜2-3分
出来高 2,400 BTC 〜25 〜1時間
ドル $1M 〜1,400 〜1分
ドル $50M 〜28 〜1時間

これらの数値は概算であり、市場のレジームによって大きく変動します。ラリーやクラッシュ時には、アクティビティベースのバーは通常の5〜10倍のバーを生成します — まさにそれが目的です。

5〜7. 価格ベースのバー

価格ベースのバー Renkoブリック、レンジバー、ボラティリティバー:価格が意味のある動きをしたときにのみサンプリング。

価格ベースのバーは時間とアクティビティの両方を無視します。価格が指定された量だけ動いたときにのみ新しいバーが形成されます。これにより横ばいのノイズが自然にフィルタリングされ、トレンドが強調されます。

5. Renkoバー

前のブリックの終値から少なくともN単位価格が動いたときに新しいRenko「ブリック」が形成されます。ブリックは常に同じサイズで、トレンド方向のクリーンな視覚的表現を作成します。

class RenkoBarGenerator:
    """
    Generates Renko bricks based on price movement.

    Key property: during sideways movement, no new bricks form.
    During strong trends, bricks form rapidly.
    """

    def __init__(self, brick_size: float = 10.0):
        self.brick_size = brick_size
        self.bricks: list[dict] = []
        self.last_close: float | None = None

    def on_price(self, timestamp: int, price: float, volume: float = 0.0):
        if self.last_close is None:
            self.last_close = price
            return []

        new_bricks = []
        diff = price - self.last_close
        num_bricks = int(abs(diff) / self.brick_size)

        if num_bricks == 0:
            return []

        direction = 1 if diff > 0 else -1

        for i in range(num_bricks):
            brick_open = self.last_close
            brick_close = self.last_close + direction * self.brick_size

            brick = {
                'timestamp': timestamp,
                'open': brick_open,
                'high': max(brick_open, brick_close),
                'low': min(brick_open, brick_close),
                'close': brick_close,
                'volume': volume / num_bricks if num_bricks > 0 else 0,
                'direction': direction,
            }
            new_bricks.append(brick)
            self.last_close = brick_close

        self.bricks.extend(new_bricks)
        return new_bricks

ダイナミックRenkoは固定のブリックサイズの代わりにATR(Average True Range)を使用し、ボラティリティに自動的に適応します。

6. レンジバー

各バーは固定された高値-安値のレンジを持ちます。レンジを超えるとバーが閉じ、新しいバーが始まります。Renkoとは異なり、レンジバーにはヒゲがあり、バー内のボラティリティを表示できます。

class RangeBarGenerator:
    """
    Generates bars with a fixed high-low range.

    Difference from Renko: range bars show the full OHLC within
    the range, not just brick direction. More information-rich.
    """

    def __init__(self, range_size: float = 20.0):
        self.range_size = range_size
        self.current_high: float | None = None
        self.current_low: float | None = None
        self.current_open: float | None = None
        self.current_volume: float = 0.0
        self.current_start_ts: int = 0
        self.bars: list[OHLCV] = []

    def on_trade(self, timestamp: int, price: float, qty: float):
        if self.current_open is None:
            self.current_open = price
            self.current_high = price
            self.current_low = price
            self.current_start_ts = timestamp

        self.current_high = max(self.current_high, price)
        self.current_low = min(self.current_low, price)
        self.current_volume += qty

        if self.current_high - self.current_low >= self.range_size:
            bar = OHLCV(
                timestamp=timestamp,
                open=self.current_open,
                high=self.current_high,
                low=self.current_low,
                close=price,
                volume=self.current_volume,
            )
            self.bars.append(bar)

            self.current_open = price
            self.current_high = price
            self.current_low = price
            self.current_volume = 0.0
            self.current_start_ts = timestamp

            return bar

        return None

RenkoとレンジバーのKey Difference: Renkoは終値のみを追跡して方向を表示し、レンジバーは完全な価格レンジを追跡してバー内の構造を表示します。レンジバーはストップロスやテイクプロフィットのシミュレーションに必要な高値-安値情報を保持するため、一般的にアルゴリズミックトレーディングにはより有用です。

7. ボラティリティバー

バー内のボラティリティが動的な閾値(例えば、最近のATRの倍数)に達したときに新しいバーが形成されます。レンジバー(固定閾値)とは異なり、ボラティリティバーは市場状況に適応します。

class VolatilityBarGenerator:
    """
    Generates bars when intra-bar volatility reaches a threshold.

    Similar to range bars, but the threshold adapts to market conditions
    using a rolling ATR measure. In calm markets, bars need less
    absolute movement to close; in volatile markets, more.
    """

    def __init__(
        self,
        atr_period: int = 14,
        atr_multiplier: float = 1.0,
        initial_threshold: float = 20.0,
    ):
        self.atr_period = atr_period
        self.atr_multiplier = atr_multiplier
        self.threshold = initial_threshold

        self.recent_ranges: list[float] = []
        self.current_open: float | None = None
        self.current_high: float | None = None
        self.current_low: float | None = None
        self.current_volume: float = 0.0
        self.bars: list[OHLCV] = []

    def on_trade(self, timestamp: int, price: float, qty: float):
        if self.current_open is None:
            self.current_open = price
            self.current_high = price
            self.current_low = price

        self.current_high = max(self.current_high, price)
        self.current_low = min(self.current_low, price)
        self.current_volume += qty

        intra_bar_range = self.current_high - self.current_low

        if intra_bar_range >= self.threshold:
            bar = OHLCV(
                timestamp=timestamp,
                open=self.current_open,
                high=self.current_high,
                low=self.current_low,
                close=price,
                volume=self.current_volume,
            )
            self.bars.append(bar)

            self.recent_ranges.append(intra_bar_range)
            if len(self.recent_ranges) > self.atr_period:
                self.recent_ranges = self.recent_ranges[-self.atr_period:]
            if len(self.recent_ranges) >= self.atr_period:
                avg_range = sum(self.recent_ranges) / len(self.recent_ranges)
                self.threshold = avg_range * self.atr_multiplier

            self.current_open = price
            self.current_high = price
            self.current_low = price
            self.current_volume = 0.0
            return bar

        return None

8. Heikin-Ashi(平滑化変換)

Heikin-Ashi変換 Heikin-Ashi:平均化によりノイズの多いローソクを滑らかなトレンドシグナルに変換 — ただし正確な価格情報と引き換えに。

Heikin-Ashi(日本語で「平均足」)はバーの種類ではなく、どの基本バーの種類にも適用できる変換です。現在と前のバーの値を平均化してローソクを平滑化します:

  • HA終値 = (始値 + 高値 + 安値 + 終値) / 4
  • HA始値 = (前のHA始値 + 前のHA終値) / 2
  • HA高値 = max(高値, HA始値, HA終値)
  • HA安値 = min(安値, HA始値, HA終値)

トレンドは同色のローソクの連続として表示され、上昇トレンドでは下ヒゲなし、下降トレンドでは上ヒゲなしとなります。

class HeikinAshiTransformer:
    """
    Transforms standard OHLCV candles into Heikin-Ashi candles.

    Can be applied on top of ANY bar type: time bars, volume bars,
    rolling bars, etc. It's a transformation, not a sampling method.

    WARNING: HA prices are synthetic — they don't represent real
    traded prices. Never use HA close for order placement or
    PnL calculation. Use HA only for signal generation, then
    execute at real prices.
    """

    def __init__(self):
        self.prev_ha_open: float | None = None
        self.prev_ha_close: float | None = None

    def transform(self, candle: OHLCV) -> OHLCV:
        ha_close = (candle.open + candle.high + candle.low + candle.close) / 4

        if self.prev_ha_open is None:
            ha_open = (candle.open + candle.close) / 2
        else:
            ha_open = (self.prev_ha_open + self.prev_ha_close) / 2

        ha_high = max(candle.high, ha_open, ha_close)
        ha_low = min(candle.low, ha_open, ha_close)

        self.prev_ha_open = ha_open
        self.prev_ha_close = ha_close

        return OHLCV(
            timestamp=candle.timestamp,
            open=ha_open,
            high=ha_high,
            low=ha_low,
            close=ha_close,
            volume=candle.volume,
        )

    def transform_series(self, candles: list[OHLCV]) -> list[OHLCV]:
        """Transform an entire series. Resets state first."""
        self.prev_ha_open = None
        self.prev_ha_close = None
        return [self.transform(c) for c in candles]


def ha_trend_signal(ha_candles: list[OHLCV], lookback: int = 3) -> int:
    """
    Simple HA trend signal.

    Returns:
        +1: bullish (N consecutive green HA candles with no lower wick)
        -1: bearish (N consecutive red HA candles with no upper wick)
         0: no clear trend
    """
    if len(ha_candles) < lookback:
        return 0

    recent = ha_candles[-lookback:]

    all_bullish = all(
        c.close > c.open and abs(c.low - min(c.open, c.close)) < 1e-10
        for c in recent
    )

    all_bearish = all(
        c.close < c.open and abs(c.high - max(c.open, c.close)) < 1e-10
        for c in recent
    )

    if all_bullish:
        return 1
    elif all_bearish:
        return -1
    return 0

バックテストにおける重要な注意点: Heikin-Ashiの価格は合成です。バックテストがHA終値をエントリー価格として使用している場合、結果は誤りになります。常にHAはシグナル生成のみに使用し、実際のOHLC価格で約定してください。

HAが有用な場合: クリーンな「ポジション保持」シグナルが必要なトレンドフォロー戦略。任意の基本バーの種類(時間バー、出来高バー、ドルバー)にHAを適用して、誤ったクロスオーバーをフィルタリングします。

HAが有害な場合: 正確な価格水準が必要な戦略 — サポート/レジスタンス、板情報分析、PIQ(Position In Queue)。平均化により正確な価格情報が破壊されます。

9〜11. 日本の反転チャート

日本のチャート手法 カギ足、新値三本足、P&F:価格構造のみに焦点を当てた時間フリーのチャート手法。

これらはRenkoと並ぶ伝統的な日本のチャート手法で、時間を完全に排除し価格構造に焦点を当てます。

9. カギ足

カギ足は、価格が指定された量だけ反転したときに方向が変わる垂直線で構成されます。価格が前の高値を上抜けると線が太くなり(太線=「陽」=需要)、前の安値を下抜けると細くなります(細線=「陰」=供給)。

class KagiChartGenerator:
    """
    Generates Kagi chart lines based on price reversals.

    Unlike Renko (fixed brick size), Kagi tracks the actual magnitude
    of each move and changes line thickness at breakout points.

    Useful for identifying support/resistance breaks and
    supply/demand shifts without time noise.
    """

    def __init__(self, reversal_amount: float = 10.0):
        self.reversal_amount = reversal_amount
        self.lines: list[dict] = []
        self.current_direction: int = 0  # 1=up, -1=down
        self.current_price: float | None = None
        self.extreme_price: float | None = None
        self.prev_high: float | None = None
        self.prev_low: float | None = None
        self.line_type: str = 'yang'  # 'yang' (thick) or 'yin' (thin)

    def on_price(self, timestamp: int, price: float):
        if self.current_price is None:
            self.current_price = price
            self.extreme_price = price
            return None

        if self.current_direction == 0:
            if price - self.current_price >= self.reversal_amount:
                self.current_direction = 1
                self.extreme_price = price
            elif self.current_price - price >= self.reversal_amount:
                self.current_direction = -1
                self.extreme_price = price
            return None

        if self.current_direction == 1:
            if price > self.extreme_price:
                self.extreme_price = price
                if self.prev_high is not None and price > self.prev_high:
                    self.line_type = 'yang'
            elif self.extreme_price - price >= self.reversal_amount:
                line = {
                    'timestamp': timestamp,
                    'start': self.current_price,
                    'end': self.extreme_price,
                    'direction': 'up',
                    'type': self.line_type,
                }
                self.lines.append(line)
                self.prev_high = self.extreme_price
                self.current_price = self.extreme_price
                self.extreme_price = price
                self.current_direction = -1
                if self.prev_low is not None and price < self.prev_low:
                    self.line_type = 'yin'
                return line
        else:
            if price < self.extreme_price:
                self.extreme_price = price
                if self.prev_low is not None and price < self.prev_low:
                    self.line_type = 'yin'
            elif price - self.extreme_price >= self.reversal_amount:
                line = {
                    'timestamp': timestamp,
                    'start': self.current_price,
                    'end': self.extreme_price,
                    'direction': 'down',
                    'type': self.line_type,
                }
                self.lines.append(line)
                self.prev_low = self.extreme_price
                self.current_price = self.extreme_price
                self.extreme_price = price
                self.current_direction = 1
                if self.prev_high is not None and price > self.prev_high:
                    self.line_type = 'yang'
                return line

        return None

10. 新値三本足

新値三本足チャートは、終値が前のN本の線の高値または安値を超えたときにのみ新しい線(ボックス)を描画します(通常N=3)。価格がレンジ内に留まる場合、新しい線は描画されません。

class LineBreakGenerator:
    """
    Generates Line Break bars (Three Line Break by default).

    A new bar is drawn only when the close exceeds the high or low
    of the last N bars. Filters out minor noise by requiring price
    to break through a multi-bar range.

    The 'N' parameter (line_count) controls sensitivity:
    - N=2: more sensitive, more bars, more noise
    - N=3: standard (Three Line Break)
    - N=4+: less sensitive, fewer bars, stronger signals
    """

    def __init__(self, line_count: int = 3):
        self.line_count = line_count
        self.lines: list[dict] = []

    def on_close(self, timestamp: int, close: float) -> dict | None:
        if not self.lines:
            self.lines.append({
                'timestamp': timestamp,
                'open': close,
                'close': close,
                'high': close,
                'low': close,
                'direction': 0,
            })
            return None

        lookback = self.lines[-self.line_count:] if len(self.lines) >= self.line_count else self.lines

        highest = max(l['high'] for l in lookback)
        lowest = min(l['low'] for l in lookback)
        last = self.lines[-1]

        new_line = None

        if close > highest:
            new_line = {
                'timestamp': timestamp,
                'open': last['close'],
                'close': close,
                'high': close,
                'low': last['close'],
                'direction': 1,
            }
        elif close < lowest:
            new_line = {
                'timestamp': timestamp,
                'open': last['close'],
                'close': close,
                'high': last['close'],
                'low': close,
                'direction': -1,
            }

        if new_line:
            self.lines.append(new_line)
            return new_line

        return None

11. ポイント&フィギュアチャート

ポイント&フィギュア(P&F)チャートはX列(上昇価格)とO列(下降価格)を使用します。列の切り替えには通常3ボックスサイズの反転が必要です。ノイズフィルタリングとサポート/レジスタンスの識別において最も古い手法の1つです。

class PointAndFigureGenerator:
    """
    Generates Point & Figure chart data.

    X column: price rising by box_size increments.
    O column: price falling by box_size increments.
    Column switch: requires reversal_boxes * box_size movement
    in the opposite direction.

    Classic setting: box_size based on ATR, reversal_boxes = 3.
    """

    def __init__(self, box_size: float = 10.0, reversal_boxes: int = 3):
        self.box_size = box_size
        self.reversal_boxes = reversal_boxes
        self.reversal_amount = box_size * reversal_boxes

        self.columns: list[dict] = []
        self.current_direction: int = 0
        self.current_top: float | None = None
        self.current_bottom: float | None = None

    def on_price(self, timestamp: int, price: float):
        if self.current_top is None:
            box_price = self._round_to_box(price)
            self.current_top = box_price
            self.current_bottom = box_price
            self.current_direction = 1
            return None

        events = []

        if self.current_direction == 1:
            while price >= self.current_top + self.box_size:
                self.current_top += self.box_size
                events.append(('X', self.current_top, timestamp))

            if price <= self.current_top - self.reversal_amount:
                col = {
                    'type': 'X',
                    'top': self.current_top,
                    'bottom': self.current_bottom,
                    'boxes': int((self.current_top - self.current_bottom) / self.box_size) + 1,
                    'timestamp': timestamp,
                }
                self.columns.append(col)
                self.current_direction = -1
                self.current_top = self.current_top - self.box_size
                self.current_bottom = self._round_to_box(price)
                events.append(('new_column', 'O', timestamp))

        else:
            while price <= self.current_bottom - self.box_size:
                self.current_bottom -= self.box_size
                events.append(('O', self.current_bottom, timestamp))

            if price >= self.current_bottom + self.reversal_amount:
                col = {
                    'type': 'O',
                    'top': self.current_top,
                    'bottom': self.current_bottom,
                    'boxes': int((self.current_top - self.current_bottom) / self.box_size) + 1,
                    'timestamp': timestamp,
                }
                self.columns.append(col)
                self.current_direction = 1
                self.current_bottom = self.current_bottom + self.box_size
                self.current_top = self._round_to_box(price)
                events.append(('new_column', 'X', timestamp))

        return events if events else None

    def _round_to_box(self, price: float) -> float:
        return round(price / self.box_size) * self.box_size

カギ足、新値三本足、P&Fのアルゴリズミックトレーディングにおける位置づけ: 主に長期トレンド検出とサポート/レジスタンスの識別に使用されます。フィルターレイヤーとして — 「カギ足が陰モードのときはロングシグナルを取らない」— マクロ構造にトレードを合わせることで価値を付加します。

12〜14. 情報駆動型バー

情報駆動型バー インバランスバー、ランバー、CUSUMフィルター、エントロピーバー:市場が何かが変わったと教えてくれるときにサンプリング。

最も洗練されたアプローチで、Marcos Lopez de Pradoの Advances in Financial Machine Learning(2018年)に基づいています。核心的な洞察:固定間隔ではなく、市場に新しい情報が到着したときにサンプリングする。

12. TIB(Tick Imbalance Bars)

市場が均衡状態にある場合、買い手主導と売り手主導の取引はほぼ均衡するはずです。インバランスが期待値を超えたとき、何かが変化しています。その瞬間にバーをサンプリングします。

各取引はティックルールを使用して買い手主導(+1)または売り手主導(-1)に分類されます。累積インバランスθを追跡し、|θ|が動的な閾値を超えたときにサンプリングします。

class TickImbalanceBarGenerator:
    """
    Generates bars when the cumulative tick imbalance exceeds
    expected levels — i.e., when "new information" arrives.

    Based on Lopez de Prado (2018), Chapter 2.
    """

    def __init__(
        self,
        expected_ticks_init: int = 1000,
        ewma_window: int = 100,
        min_ticks: int = 100,
        max_ticks: int = 50000,
    ):
        self.expected_ticks_init = expected_ticks_init
        self.ewma_window = ewma_window
        self.min_ticks = min_ticks
        self.max_ticks = max_ticks

        self.theta = 0.0
        self.prev_price: float | None = None
        self.prev_sign = 1
        self.trades: list[tuple[int, float, float]] = []

        self.bar_lengths: list[int] = []
        self.imbalances: list[float] = []
        self.expected_ticks = float(expected_ticks_init)
        self.expected_imbalance = 0.0

        self.bars: list[OHLCV] = []

    def _tick_sign(self, price: float) -> int:
        """Classify trade as buy (+1) or sell (-1) using tick rule."""
        if self.prev_price is None:
            self.prev_price = price
            return 1

        if price > self.prev_price:
            sign = 1
        elif price < self.prev_price:
            sign = -1
        else:
            sign = self.prev_sign

        self.prev_price = price
        self.prev_sign = sign
        return sign

    def on_trade(self, timestamp: int, price: float, qty: float):
        sign = self._tick_sign(price)
        self.theta += sign
        self.trades.append((timestamp, price, qty))

        threshold = self.expected_ticks * abs(self.expected_imbalance)
        if threshold == 0:
            threshold = self.expected_ticks_init * 0.5

        if abs(self.theta) >= threshold and len(self.trades) >= self.min_ticks:
            return self._close_bar()

        if len(self.trades) >= self.max_ticks:
            return self._close_bar()

        return None

    def _close_bar(self):
        prices = [t[1] for t in self.trades]
        volumes = [t[2] for t in self.trades]

        bar = OHLCV(
            timestamp=self.trades[-1][0],
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        self.bars.append(bar)

        self.bar_lengths.append(len(self.trades))
        self.imbalances.append(self.theta / len(self.trades))

        if len(self.bar_lengths) >= 2:
            alpha = 2.0 / (self.ewma_window + 1)
            self.expected_ticks = (
                alpha * self.bar_lengths[-1]
                + (1 - alpha) * self.expected_ticks
            )
            self.expected_ticks = max(
                self.min_ticks,
                min(self.max_ticks, self.expected_ticks)
            )
            self.expected_imbalance = (
                alpha * self.imbalances[-1]
                + (1 - alpha) * self.expected_imbalance
            )

        self.theta = 0.0
        self.trades = []
        return bar

13. VIB(Volume Imbalance Bars)

TIBの拡張:各取引を±1とカウントする代わりに、符号付き出来高で重み付けします。100 BTCの買いは+100、1 BTCの売りは-1に寄与します。多数の小口取引に分割される可能性のある大口のインフォームドオーダーを捕捉します。

class VolumeImbalanceBarGenerator:
    """
    Like TIBs, but uses signed volume instead of signed ticks.

    Captures the insight that a 100-BTC buy signal is 100x more
    informative than a 1-BTC buy signal.
    """

    def __init__(
        self,
        expected_ticks_init: int = 1000,
        ewma_window: int = 100,
    ):
        self.expected_ticks_init = expected_ticks_init
        self.ewma_window = ewma_window

        self.theta = 0.0
        self.prev_price: float | None = None
        self.prev_sign = 1
        self.trades: list[tuple[int, float, float]] = []

        self.bar_lengths: list[int] = []
        self.volume_imbalances: list[float] = []
        self.expected_ticks = float(expected_ticks_init)
        self.expected_vol_imbalance = 0.0

        self.bars: list[OHLCV] = []

    def _tick_sign(self, price: float) -> int:
        if self.prev_price is None:
            self.prev_price = price
            return 1
        if price > self.prev_price:
            sign = 1
        elif price < self.prev_price:
            sign = -1
        else:
            sign = self.prev_sign
        self.prev_price = price
        self.prev_sign = sign
        return sign

    def on_trade(self, timestamp: int, price: float, qty: float):
        sign = self._tick_sign(price)
        self.theta += sign * qty
        self.trades.append((timestamp, price, qty))

        threshold = self.expected_ticks * abs(self.expected_vol_imbalance)
        if threshold == 0:
            threshold = self.expected_ticks_init * 0.5

        if abs(self.theta) >= threshold and len(self.trades) >= 10:
            return self._close_bar()
        return None

    def _close_bar(self):
        prices = [t[1] for t in self.trades]
        volumes = [t[2] for t in self.trades]

        bar = OHLCV(
            timestamp=self.trades[-1][0],
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        self.bars.append(bar)

        self.bar_lengths.append(len(self.trades))
        self.volume_imbalances.append(self.theta / len(self.trades))

        alpha = 2.0 / (self.ewma_window + 1)
        if len(self.bar_lengths) >= 2:
            self.expected_ticks = (
                alpha * self.bar_lengths[-1] + (1 - alpha) * self.expected_ticks
            )
            self.expected_vol_imbalance = (
                alpha * self.volume_imbalances[-1]
                + (1 - alpha) * self.expected_vol_imbalance
            )

        self.theta = 0.0
        self.trades = []
        return bar

爆発問題

インバランスバーの既知の問題:EWMAベースの閾値が正のフィードバックループに入る可能性があります。解決策:min_ticksmax_ticksの境界でクランプすること。


self.expected_ticks = max(
    self.min_ticks,    # Floor: never less than 100 ticks
    min(
        self.max_ticks,  # Ceiling: never more than 50000 ticks
        new_expected_ticks
    )
)

14. ランバー

ランバーは現在の方向性ランの長さ — 買いまたは売りの最長連続シーケンスを追跡します。大口のインフォームドトレーダーが注文を多数の小口取引に分割すると、シーケンスが異常に長くなります。ランバーはこれを検出します。

class TickRunBarGenerator:
    """
    Generates bars when the length of a directional run exceeds expectations.

    Based on Lopez de Prado (2018), Chapter 2.

    Difference from imbalance bars:
    - Imbalance bars track NET imbalance (buys minus sells)
    - Run bars track the MAXIMUM run length (consecutive buys OR sells)
    """

    def __init__(
        self,
        expected_ticks_init: int = 1000,
        ewma_window: int = 100,
        min_ticks: int = 100,
        max_ticks: int = 50000,
    ):
        self.expected_ticks_init = expected_ticks_init
        self.ewma_window = ewma_window
        self.min_ticks = min_ticks
        self.max_ticks = max_ticks

        self.prev_price: float | None = None
        self.prev_sign = 1
        self.trades: list[tuple[int, float, float]] = []

        self.buy_run = 0
        self.sell_run = 0
        self.max_buy_run = 0
        self.max_sell_run = 0

        self.bar_lengths: list[int] = []
        self.max_runs: list[float] = []
        self.expected_ticks = float(expected_ticks_init)
        self.expected_max_run = 0.0

        self.bars: list[OHLCV] = []

    def _tick_sign(self, price: float) -> int:
        if self.prev_price is None:
            self.prev_price = price
            return 1
        if price > self.prev_price:
            sign = 1
        elif price < self.prev_price:
            sign = -1
        else:
            sign = self.prev_sign
        self.prev_price = price
        self.prev_sign = sign
        return sign

    def on_trade(self, timestamp: int, price: float, qty: float):
        sign = self._tick_sign(price)
        self.trades.append((timestamp, price, qty))

        if sign == 1:
            self.buy_run += 1
            self.sell_run = 0
        else:
            self.sell_run += 1
            self.buy_run = 0

        self.max_buy_run = max(self.max_buy_run, self.buy_run)
        self.max_sell_run = max(self.max_sell_run, self.sell_run)

        theta = max(self.max_buy_run, self.max_sell_run)
        threshold = self.expected_ticks * self.expected_max_run if self.expected_max_run > 0 else self.expected_ticks_init * 0.3

        if theta >= threshold and len(self.trades) >= self.min_ticks:
            return self._close_bar()

        if len(self.trades) >= self.max_ticks:
            return self._close_bar()

        return None

    def _close_bar(self):
        prices = [t[1] for t in self.trades]
        volumes = [t[2] for t in self.trades]

        bar = OHLCV(
            timestamp=self.trades[-1][0],
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        self.bars.append(bar)

        max_run = max(self.max_buy_run, self.max_sell_run) / len(self.trades)
        self.bar_lengths.append(len(self.trades))
        self.max_runs.append(max_run)

        alpha = 2.0 / (self.ewma_window + 1)
        if len(self.bar_lengths) >= 2:
            self.expected_ticks = alpha * self.bar_lengths[-1] + (1 - alpha) * self.expected_ticks
            self.expected_ticks = max(self.min_ticks, min(self.max_ticks, self.expected_ticks))
            self.expected_max_run = alpha * self.max_runs[-1] + (1 - alpha) * self.expected_max_run

        self.trades = []
        self.buy_run = 0
        self.sell_run = 0
        self.max_buy_run = 0
        self.max_sell_run = 0

        return bar

ランバーは出来高ランドルランに拡張できます。

15. CUSUMフィルターバー

CUSUMフィルター(累積和)は、累積リターンを追跡することでいつサンプリングするかを決定します。インバランスバー(生の取引データで動作する)とは異なり、CUSUMは既存の1分足OHLCVデータに適用できます — ティックデータは不要です。

class CUSUMFilterBarGenerator:
    """
    Symmetric CUSUM filter for event-based sampling.

    Based on Lopez de Prado (2018), Chapter 2.5.

    Key advantage over Bollinger Bands: CUSUM requires a FULL
    run of threshold magnitude before triggering. Bollinger Bands
    trigger repeatedly when price hovers near the band.

    Can be applied to 1m OHLCV data — no tick data required.
    """

    def __init__(self, threshold: float = 0.01):
        self.threshold = threshold
        self.s_pos = 0.0
        self.s_neg = 0.0
        self.prev_price: float | None = None
        self.buffer: list[OHLCV] = []
        self.bars: list[OHLCV] = []

    def on_candle_1m(self, candle: OHLCV) -> OHLCV | None:
        self.buffer.append(candle)

        if self.prev_price is None:
            self.prev_price = candle.close
            return None

        import math
        log_ret = math.log(candle.close / self.prev_price)
        self.prev_price = candle.close

        self.s_pos = max(0.0, self.s_pos + log_ret)
        self.s_neg = min(0.0, self.s_neg + log_ret)

        triggered = False

        if self.s_pos > self.threshold:
            self.s_pos = 0.0
            triggered = True

        if self.s_neg < -self.threshold:
            self.s_neg = 0.0
            triggered = True

        if triggered and len(self.buffer) >= 2:
            bars = self.buffer
            bar = OHLCV(
                timestamp=bars[-1].timestamp,
                open=bars[0].open,
                high=max(b.high for b in bars),
                low=min(b.low for b in bars),
                close=bars[-1].close,
                volume=sum(b.volume for b in bars),
            )
            self.bars.append(bar)
            self.buffer = []
            return bar

        return None

CUSUM + トリプルバリア法: Lopez de Pradoのフレームワークでは、CUSUMイベントはトリプルバリア法のエントリーポイントとして使用されます — 各イベントがストップロス、テイクプロフィット、有効期限のバリアを持つ取引をトリガーします。このようなイベント駆動型戦略のロバストな検証については、Walk-Forward OptimizationMonte Carlo Bootstrap for Backtestingを参照してください。

16. エントロピーバー

最も理論的に洗練されたアプローチ:バー内の価格系列の情報量(シャノンエントロピー)が閾値を超えたときにサンプリングします。

class EntropyBarGenerator:
    """
    Generates bars when the entropy of intra-bar returns exceeds
    a threshold.

    Based on Shannon's information theory: bars are sampled when
    "new information" arrives, measured as the entropy of the
    return distribution within the current bar.

    This is the most theoretically "pure" information-driven bar.
    """

    def __init__(
        self,
        entropy_threshold: float = 2.0,
        min_trades: int = 50,
        n_bins: int = 10,
    ):
        self.entropy_threshold = entropy_threshold
        self.min_trades = min_trades
        self.n_bins = n_bins
        self.trades: list[tuple[int, float, float]] = []
        self.bars: list[OHLCV] = []

    def on_trade(self, timestamp: int, price: float, qty: float):
        self.trades.append((timestamp, price, qty))

        if len(self.trades) < self.min_trades:
            return None

        entropy = self._compute_entropy()

        if entropy >= self.entropy_threshold:
            return self._close_bar()

        return None

    def _compute_entropy(self) -> float:
        import math

        prices = [t[1] for t in self.trades]
        if len(prices) < 2:
            return 0.0

        returns = [
            math.log(prices[i] / prices[i-1])
            for i in range(1, len(prices))
            if prices[i-1] > 0
        ]

        if not returns:
            return 0.0

        min_r = min(returns)
        max_r = max(returns)

        if max_r == min_r:
            return 0.0

        bin_width = (max_r - min_r) / self.n_bins
        bins = [0] * self.n_bins

        for r in returns:
            idx = min(int((r - min_r) / bin_width), self.n_bins - 1)
            bins[idx] += 1

        total = sum(bins)
        entropy = 0.0
        for count in bins:
            if count > 0:
                p = count / total
                entropy -= p * math.log2(p)

        return entropy

    def _close_bar(self):
        prices = [t[1] for t in self.trades]
        volumes = [t[2] for t in self.trades]

        bar = OHLCV(
            timestamp=self.trades[-1][0],
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        self.bars.append(bar)
        self.trades = []
        return bar

実践的な注意点: エントロピーバーは計算コストが高く、主にリサーチ目的です — しかしMLベースの戦略では、各バーがほぼ等しい「情報」を含むため、より良い統計的特性を持つ特徴量を生成します。

17. Deltaバー(オーダーフロー)

Deltaバーとオーダーフロー 累積Delta:アグレッシブな買い手と売り手の純粋な力をリアルタイムで測定。

Deltaバーは累積Delta — 買い出来高と売り出来高の差分 — に基づいてサンプリングします。インバランスバー(ティックの符号±1を使用する)とは異なり、Deltaバーは実際の出来高加重オーダーフローを使用します。

class DeltaBarGenerator:
    """
    Generates bars based on cumulative order flow delta.

    Delta = Buy Volume - Sell Volume (classified by aggressor side).

    Requires trade-level data with side classification
    (available from Binance aggTrades, Bybit trades, etc.)
    """

    def __init__(self, threshold: float = 500.0):
        self.threshold = threshold
        self.cumulative_delta = 0.0
        self.trades: list[tuple[int, float, float, int]] = []
        self.bars: list[OHLCV] = []

    def on_trade(self, timestamp: int, price: float, qty: float, is_buyer_maker: bool):
        side = -1 if is_buyer_maker else 1
        signed_qty = side * qty

        self.cumulative_delta += signed_qty
        self.trades.append((timestamp, price, qty, side))

        if abs(self.cumulative_delta) >= self.threshold:
            return self._close_bar()

        return None

    def _close_bar(self):
        prices = [t[1] for t in self.trades]
        volumes = [t[2] for t in self.trades]

        bar = OHLCV(
            timestamp=self.trades[-1][0],
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        bar.delta = self.cumulative_delta  # type: ignore
        bar.buy_volume = sum(t[2] for t in self.trades if t[3] == 1)  # type: ignore
        bar.sell_volume = sum(t[2] for t in self.trades if t[3] == -1)  # type: ignore

        self.bars.append(bar)
        self.cumulative_delta = 0.0
        self.trades = []
        return bar

Deltaダイバージェンス: 最も強力なシグナルの1つ — 価格が上昇しているのに累積Deltaがマイナス(売り手がアグレッシブだが価格は上がっている。これはリミット買いの吸収を示す)。Digital Fingerprint: Trader Identificationの記事で説明されている行動フィンガープリンティングアプローチに直接関連しています。Avellaneda-Stoikovモデルを使用するマーケットメーカーにとって、Deltaバーは在庫リスクとアグレッサー圧力のリアルタイムビューを提供します。


ローリングウィンドウ集約 基本バーのサーキュラーバッファ:新しいデータが入り、古いデータが出て、集約されたローソクは常に有効。

集約方法は、基本バーをより高い時間足(HTF)のローソクにどのようにまとめるかを決定します。これはバーの種類とは独立しており — どの基本バーの種類にも任意の集約方法を適用できます。

方法A:カレンダー整列集約

固定のカレンダー境界内にあるすべての基本バーを集約します。「1時間足」ローソクは14:00:00から14:59:59までのすべてのバーをカバーします。

特性:

  • すべての市場参加者が同じ境界を見る — 市場構造分析、サポート/レジスタンス、PIQトリガーに不可欠
  • コールドスタート問題:再起動後の不完全なローソク
  • 時間バーには自然(取引所がネイティブに提供するもの)
  • 非時間バーにも機能:「14:00から15:00の間に閉じたすべての出来高バー」=出来高バーからのカレンダー整列1時間足ローソク

方法B:ローリングウィンドウ集約

最後のN本の確定した基本バーを集約し、新しいバーごとに再計算します。「1時間足」ローリングローソク=最後の60本の確定した1分足時間バー、毎分更新。

アトミック単位は確定した基本バーです。 この設計選択により:

  1. コールドスタートなし。 N本のバー後、ローソクは有効です。不完全なローソクのノイズなし。
  2. バックテストパリティ。 ライブトレーディングがバックテストエンジンと同じアトミック単位を使用すれば、シグナルは同一。
  3. シンプルな検証。 1つのルール:if buffer not full: skip
import numpy as np

class RollingCandleAggregator:
    """
    Produces rolling higher-timeframe candles from closed base bars.

    Works with ANY bar type: time bars, tick bars, volume bars,
    dollar bars, delta bars — anything that produces OHLCV output.

    Example: RollingCandleAggregator(window=60) with 1m time bars
    produces a "1h" candle updated every minute.

    Example: RollingCandleAggregator(window=24) with volume bars
    produces a candle spanning the last 24 volume bars.
    """

    def __init__(self, window: int):
        self.window = window
        self.buffer: deque[OHLCV] = deque(maxlen=window)

    def push(self, bar: OHLCV) -> OHLCV | None:
        """
        Add a closed base bar. Returns aggregated candle
        only when buffer is full (= candle is valid).
        """
        self.buffer.append(bar)

        if len(self.buffer) < self.window:
            return None

        return self._aggregate()

    def _aggregate(self) -> OHLCV:
        bars = list(self.buffer)
        return OHLCV(
            timestamp=bars[-1].timestamp,
            open=bars[0].open,
            high=max(b.high for b in bars),
            low=min(b.low for b in bars),
            close=bars[-1].close,
            volume=sum(b.volume for b in bars),
        )

    @property
    def is_valid(self) -> bool:
        return len(self.buffer) == self.window

位相シフトのトレードオフ: ローリングローソクは:37に開始した場合:37に閉じ、他の人の:00のようにはなりません。群衆に見えるレベルに依存する戦略にとってこれは重要です。解決策:両方を使う — 市場構造にはカレンダー、シグナルにはローリング。

方法C:アダプティブローリング集約

ローリングと同様ですが、ウィンドウサイズが現在のボラティリティに適応します。穏やかな市場→より広いウィンドウ(より多くの平滑化)。ボラタイルな市場→より狭いウィンドウ(より速い反応)。

class AdaptiveRollingAggregator:
    """
    Rolling window where the window size adapts to volatility.

    Works with any base bar type. Uses ATR of recent bars
    as the volatility measure.

    Low volatility → wider window (more smoothing, fewer signals)
    High volatility → narrower window (faster reaction)
    """

    def __init__(
        self,
        base_window: int = 60,
        min_window: int = 15,
        max_window: int = 240,
        atr_period: int = 14,
        atr_base: float | None = None,
    ):
        self.base_window = base_window
        self.min_window = min_window
        self.max_window = max_window
        self.atr_period = atr_period
        self.atr_base = atr_base

        self.all_candles: deque[OHLCV] = deque(maxlen=max_window)
        self.atr_values: deque[float] = deque(maxlen=atr_period * 2)
        self.current_window = base_window

    def push(self, bar: OHLCV) -> OHLCV | None:
        self.all_candles.append(bar)

        tr = bar.high - bar.low
        self.atr_values.append(tr)

        if len(self.atr_values) < self.atr_period:
            return None

        current_atr = sum(list(self.atr_values)[-self.atr_period:]) / self.atr_period

        if self.atr_base is None and len(self.atr_values) >= self.atr_period * 2:
            self.atr_base = sum(self.atr_values) / len(self.atr_values)

        if self.atr_base is None or self.atr_base == 0:
            return None

        vol_ratio = current_atr / self.atr_base
        self.current_window = int(self.base_window / vol_ratio)
        self.current_window = max(self.min_window, min(self.max_window, self.current_window))

        if len(self.all_candles) < self.current_window:
            return None

        bars = list(self.all_candles)[-self.current_window:]
        return OHLCV(
            timestamp=bars[-1].timestamp,
            open=bars[0].open,
            high=max(b.high for b in bars),
            low=min(b.low for b in bars),
            close=bars[-1].close,
            volume=sum(b.volume for b in bars),
        )

すべての基本バーの種類は、すべての集約方法と組み合わせることができます。標準的な組み合わせ(カレンダー時間バー=取引所が提供するもの)もあれば、エキゾチックだが強力なものもあります。

組み合わせの例

基本バーの種類 カレンダー ローリング アダプティブ
時間 標準的な取引所ローソク 常に有効なHTF、コールドスタートなし ボラティリティ適応型時間足
出来高 「今1時間のすべての出来高バー」 最後の24本の出来高バー 穏やかな市場ではより広いウィンドウ
ドル 1時間ドルバー集約 最後のN本のドルバー アダプティブドルウィンドウ
TIB 1時間インバランス集約 最後のN回のインバランスイベント ボラタイルレジームでの高速反応
Delta 1時間のネットオーダーフロー ローリングDeltaスナップショット アダプティブフローウィンドウ
Renko 「今1時間のブリック」 最後のN個のブリック アダプティブブリック数

ハイブリッドエンジン:カレンダー + ローリング

実践では、カレンダーとローリングの集約を同時に使用したいものです。メモリオーバーヘッドは無視できます — シンボルごと、時間足ごとに2つのdequeバッファ。

class HybridCandleEngine:
    """
    Maintains both calendar-aligned and rolling candles
    for any base bar type.

    Calendar candles: for market structure, support/resistance, PIQ.
    Rolling candles: for indicators, signal generation, entries/exits.
    """

    def __init__(self):
        self.rolling = {
            '1h': RollingCandleAggregator(60),
            '4h': RollingCandleAggregator(240),
        }
        self.calendar: dict[str, list[OHLCV]] = {
            '1h': [],
            '4h': [],
        }
        self._calendar_buffer: dict[str, list[OHLCV]] = {
            '1h': [],
            '4h': [],
        }

    def on_bar(self, bar: OHLCV):
        """Process any base bar type — time, volume, tick, delta, etc."""
        rolling_results = {}
        for tf, agg in self.rolling.items():
            rolling_results[tf] = agg.push(bar)

        self._update_calendar(bar)

        return rolling_results

    def _update_calendar(self, bar: OHLCV):
        from datetime import datetime
        ts = datetime.utcfromtimestamp(bar.timestamp)

        for tf, minutes in [('1h', 60), ('4h', 240)]:
            self._calendar_buffer[tf].append(bar)

            total_minutes = ts.hour * 60 + ts.minute
            if (total_minutes + 1) % minutes == 0:
                bars = self._calendar_buffer[tf]
                if bars:
                    agg = OHLCV(
                        timestamp=bars[-1].timestamp,
                        open=bars[0].open,
                        high=max(b.high for b in bars),
                        low=min(b.low for b in bars),
                        close=bars[-1].close,
                        volume=sum(b.volume for b in bars),
                    )
                    self.calendar[tf].append(agg)
                    self._calendar_buffer[tf] = []

時間-出来高ハイブリッド:出来高分割付きカレンダー

特別な集約バリアント:出来高が閾値を超えたときに早期に強制クローズするカレンダー整列ローソク。アクティビティスパイクに適応しながら時間同期を維持します。

class TimeVolumeHybridGenerator:
    """
    Calendar-aligned candles that split when volume spikes.

    Rule: close the candle at the calendar boundary OR when
    accumulated volume exceeds vol_threshold, whichever comes first.

    Works with any base bar type — the volume trigger adds an
    extra split dimension on top of calendar alignment.
    """

    def __init__(
        self,
        interval_minutes: int = 60,
        vol_threshold: float = 5000.0,
    ):
        self.interval_minutes = interval_minutes
        self.vol_threshold = vol_threshold

        self.buffer: list[OHLCV] = []
        self.accumulated_volume = 0.0
        self.bars: list[OHLCV] = []

    def on_bar(self, bar: OHLCV) -> OHLCV | None:
        self.buffer.append(bar)
        self.accumulated_volume += bar.volume

        from datetime import datetime
        ts = datetime.utcfromtimestamp(bar.timestamp)
        total_minutes = ts.hour * 60 + ts.minute
        at_boundary = (total_minutes + 1) % self.interval_minutes == 0

        vol_spike = self.accumulated_volume >= self.vol_threshold

        if at_boundary or vol_spike:
            return self._close_bar(split_reason='volume' if vol_spike else 'time')

        return None

    def _close_bar(self, split_reason: str) -> OHLCV:
        bars = self.buffer
        bar = OHLCV(
            timestamp=bars[-1].timestamp,
            open=bars[0].open,
            high=max(b.high for b in bars),
            low=min(b.low for b in bars),
            close=bars[-1].close,
            volume=sum(b.volume for b in bars),
        )
        bar.split_reason = split_reason  # type: ignore
        bar.num_bars = len(bars)  # type: ignore

        self.bars.append(bar)
        self.buffer = []
        self.accumulated_volume = 0.0
        return bar

実践的集約:カスケードプリロード

カスケード集約 カスケードプリロード:日足を時間足から、時間足を分足から構成 — API制限を回避。

取引所は提供する過去データの量を制限しています。BinanceはRESTリクエストあたり約1000本のローソク、OKXは300本が上限です。ローリング1Dローソク(1440分)が必要な場合、十分な1分足の履歴を常に取得できるとは限りません。WebSocketを介した取引と板情報のリアルタイムストリーミングについては、CCXT Pro WebSocket Methodsを参照してください。

解決策:カスケード集約 — 各深度で利用可能な最高解像度からより高い時間足を構築し、それらを結合します。

Rolling 1W candle:
├── 6 completed 1D candles ← fetch from REST /klines?interval=1d
├── 1 partial day:
│   ├── 23 completed 1h candles ← fetch from REST /klines?interval=1h
│   └── 1 partial hour:
│       └── N completed 1m candles ← fetch from REST /klines?interval=1m
└── Live: each new closed 1m candle updates the entire chain

OHLCV集約は合成可能であるため機能します:1Dローソクの高値は24本の1h高値の最大値であり、それは1440本の1m高値の最大値です。

マルチ取引所の制限

取引所 最大1分足数 最大1時間足数 特記すべき間隔
Binance 1,000 1,000 1分〜1月、フルレンジ
Bybit 1,000 1,000 1〜720、日/週/月
OKX 300 300 1分〜1月(より制限的)
Gate.io 1,000 1,000 10秒〜30日

集約整合性チェック

REST APIからの1時間足ローソクは、60本の1分足から計算したものと一致しない場合があります。常に検証してください:

def validate_aggregation(
    candle_htf: OHLCV,
    candles_ltf: list[OHLCV],
    tolerance_pct: float = 0.001,
) -> dict[str, bool]:
    agg = OHLCV(
        timestamp=candles_ltf[-1].timestamp,
        open=candles_ltf[0].open,
        high=max(c.high for c in candles_ltf),
        low=min(c.low for c in candles_ltf),
        close=candles_ltf[-1].close,
        volume=sum(c.volume for c in candles_ltf),
    )

    def close_enough(a: float, b: float) -> bool:
        if a == 0 and b == 0:
            return True
        return abs(a - b) / max(abs(a), abs(b)) < tolerance_pct

    return {
        'open': close_enough(candle_htf.open, agg.open),
        'high': close_enough(candle_htf.high, agg.high),
        'low': close_enough(candle_htf.low, agg.low),
        'close': close_enough(candle_htf.close, agg.close),
        'volume': close_enough(candle_htf.volume, agg.volume),
    }

検証が一貫して失敗する場合、常に1分足から自分で集約してくださいバックテストパリティのために取引所のHTFローソクを信頼してはいけません。


比較マトリクス

軸1:基本バーの種類

# バーの種類 トリガー ティックデータ必要 最適な用途
1 時間 固定間隔 いいえ 市場構造、群集行動
2 ティック N回の取引 はい ML特徴量、均等意見サンプリング
3 出来高 N単位の取引 はい 正規化されたアクティビティ分析
4 ドル $Nの想定元本 はい クロスアセット比較
5 Renko 価格±N単位 いいえ トレンドフォロー、ノイズフィルタリング
6 レンジ 高値-安値≧N はい ブレイクアウト検出
7 ボラティリティ アダプティブレンジ はい レジーム適応型分析
8 Heikin-Ashi 変換 いいえ トレンド確認(合成価格!)
9 カギ足 価格反転 いいえ 需給構造
10 新値三本足 N線ブレイクアウト いいえ マクロトレンドフィルター
11 P&F ボックス+反転 いいえ サポート/レジスタンスマッピング
12 TIB ティックインバランス はい インフォームドフロー検出
13 VIB 出来高インバランス はい 大口注文検出
14 ラン ラン長 はい 注文分割検出
15 CUSUM 累積リターン いいえ(1分足終値) 構造的ブレイクイベント
16 エントロピー シャノンエントロピー はい MLリサーチ、特徴量純度
17 Delta オーダーフローDelta はい(aggTrades) アグレッサーフロー分析

軸2:集約方法

方法 整列 コールドスタート 位相シフト 最適な用途
カレンダー 壁時計 不完全バーリスク なし(群衆整列) 市場構造、PIQ、S/R
ローリング N本のバー なし(ウォームアップ後) あり(:00からシフト) インジケーター、シグナル
アダプティブ ボラティリティ駆動N ATRキャリブレーション後 あり ボラティリティ適応型戦略

実践的推奨

レイヤードアーキテクチャ 4レイヤーローソクアーキテクチャ:ローリングシグナル、カレンダー構造、ミクロ構造フロー、トレンドフィルター。

バックテストエンジンが1分足OHLCVデータで動作する場合:

  1. ローリング時間バー — 最もシンプルなアップグレード。追加データ不要。コールドスタートを排除。
  2. ハイブリッド(ローリング+カレンダー)時間バー — 市場構造にはカレンダー、シグナルにはローリング。
  3. CUSUMフィルター — 1分足終値で動作、ティックデータ不要。「何かが十分に動いて興味深い。」

ティック/取引データがある場合:

  1. ドルバー+ローリング — クオンタティブファイナンスの文献で推奨されるデフォルト。
  2. VIB+ローリング — インフォームドフローを検出、重要なイベント時により多くサンプリング。
  3. Deltaバー+カレンダー — アグレッサーサイド分類がある場合、誰が市場を動かしているかの最も直接的なビュー。

フィルターとして(任意のベース+集約の組み合わせの上にHeikin-Ashiまたは新値三本足を適用):

  1. ローリング出来高バー上のHeikin-Ashi — アクティビティ正規化データでのクリーンなトレンドシグナル。
  2. 日足カレンダーバー上の新値三本足/カギ足 — マクロトレンドフィルター。

Marketmaker.cc向け — レイヤードアプローチ:

  • レイヤー1(シグナル): インジケーターとエントリー/エグジットシグナルのための時間バーのローリング集約。コールドスタートなし、完全なバックテストパリティ。
  • レイヤー2(市場構造): サポート/レジスタンス、時間足終値分析、PIQトリガーのためのカレンダー整列時間バー。
  • レイヤー3(ミクロ構造): インフォームドフローの検出、注文分割の検出、大きな動きの予測のための、生の取引ストリームからのVIB+Deltaバー。オーダーフローデータでの行動パターン認識については、Digital Fingerprint: Trader Identificationも参照してください。
  • レイヤー4(トレンドフィルター): ローリングバー上のHeikin-Ashi変換、または4時間カレンダー終値上の新値三本足で、シグナルをマクロ方向に合わせる。

結論

ローソク足の構成は1つの選択ではなく、2つの独立した意思決定です:

  1. どのようなバーか? 時間は時計の間隔を捕捉する。アクティビティ(ティック、出来高、ドル)は市場参加を捕捉する。価格(Renko、レンジ、ボラティリティ)は変動を捕捉する。情報(インバランス、ラン、CUSUM、エントロピー)は新しい情報の到着を捕捉する。オーダーフロー(Delta)はアグレッシブな圧力を捕捉する。

  2. より高い時間足にどう集約するか? カレンダーは群衆に合わせる。ローリングはコールドスタートを排除する。アダプティブはボラティリティに反応する。

標準的な「Binanceの1時間足ローソク」は17×3マトリクスの1セルに過ぎません。他の50の組み合わせは、それらを実装する意思のある誰もが利用できます。プロダクションシステムの答えは「意思決定エンジンの各レイヤーに適切な組み合わせを選ぶ」です。

アトミック単位 — 確定した基本バー — が基盤であり続けます。それ以外はすべて集約です。

きめ細かいデータによるバックテスト精度の詳細については、Adaptive Drill-Down: Backtest with Variable Granularityを参照。マルチタイムフレーム戦略におけるインジケーター事前計算の影響については、Aggregated Parquet Cacheを参照。


参考リンク

  1. Lopez de Prado — Advances in Financial Machine Learning (2018)
  2. Easley, Lopez de Prado, O'Hara — The Volume Clock: Insights into the High Frequency Paradigm (2012)
  3. mlfinlab — Python library implementing information-driven bars
  4. Binance — Historical Market Data
  5. Apache Parquet — columnar storage format

Citation

@article{soloviov2026bartypes,
  author = {Soloviov, Eugen},
  title = {Bar Types and Aggregation Methods for Algorithmic Trading},
  year = {2026},
  url = {https://marketmaker.cc/en/blog/post/beyond-time-bars-candle-construction},
  description = {Two-axis classification of candle construction: 17 base bar types × 3 aggregation methods = 51 combinations, with implementation code and practical recommendations for crypto algotrading.}
}
blog.disclaimer

MarketMaker.cc Team

クオンツ・リサーチ&戦略

Telegramで議論する
Newsletter

市場の先を行く

ニュースレターを購読して、独占的なAI取引の洞察、市場分析、プラットフォームの更新情報を受け取りましょう。

プライバシーを尊重します。いつでも配信停止可能です。