Типы баров и методы агрегации для алготрейдинга
Каждый свечной график, который вы видели на Binance, TradingView или любой другой бирже, построен одинаково: агрегировать сделки за фиксированное временное окно — 1 минуту, 5 минут, 1 час — и сформировать OHLCV-бар. Это настолько привычно, что большинство трейдеров даже не задумываются об альтернативах. Но для алгоритмической торговли выбор типа бара и метода агрегации — это два независимых решения, и большинство систем их смешивают.
В этой статье мы разделяем две оси построения свечей: какой тип бара вы строите (17 типов) и как вы агрегируете их в старшие таймфреймы (3 метода). Комбинация даёт 51 возможную конфигурацию, каждая с различными свойствами для бэктестинга, live-торговли и генерации сигналов.
Об основах превращения сырых сделок в стандартные свечи читайте в статье Как устроены торговые свечи.
Коротко (TL;DR)
- Построение свечей имеет две независимые оси: тип бара и метод агрегации
- 17 базовых типов баров: временные, тиковые, объёмные, долларовые, Renko, диапазонные, волатильностные, Heikin-Ashi, Kagi, Line Break, P&F, тиковый дисбаланс (TIB), объёмный дисбаланс (VIB), бары серий (run), CUSUM, энтропийные, дельта
- 3 метода агрегации: календарная, скользящее окно, адаптивное скользящее окно
- 17 × 3 = 51 возможная комбинация, каждая со своими свойствами
- Большинство систем используют только одну комбинацию: календарные таймбары. Остальные 50 остаются неиспользованными.
- Практическая рекомендация: используйте несколько комбинаций слоями — скользящие таймбары для сигналов, календарные таймбары для рыночной структуры, информационные бары для микроструктуры
Две оси построения свечей
Традиционный подход размещает все типы баров в плоский список: таймбары, тиковые бары, объёмные бары, Renko и т.д. Это вводит в заблуждение. На самом деле есть два ортогональных выбора:
Ось 1 — Базовый тип бара (17 типов): Как вы решаете, когда закрывается новый бар? После фиксированного временного интервала? После N сделок? После ценового движения? Когда меняется информационное содержание? Это определяет, что означает «один бар».
Ось 2 — Метод агрегации (3 метода): Как вы составляете базовые бары в свечи старших таймфреймов? Выравнивание по календарным границам (00:00, 01:00, ...)? Скользящее окно из последних N баров? Адаптивное окно, зависящее от волатильности?
Эти две оси независимы. Вы можете получить:
- Календарные тиковые бары — агрегировать тиковые бары, закрывшиеся между 14:00 и 14:59, в одну часовую свечу
- Скользящие объёмные бары — взять последние 24 объёмных бара независимо от времени их закрытия
- Адаптивные дельта-бары — использовать окно, управляемое волатильностью, поверх дельта-баров
Стандартная «часовая свеча» — это лишь одна точка в матрице 17×3: таймбары + календарное выравнивание. Каждая другая комбинация — альтернатива, достойная рассмотрения.
1. Таймбары (стандартные)
Неравномерная информационная плотность: жёсткие временные границы одинаково обрабатывают тихие часы с 200 сделками и часы объявлений с 50 000 сделок.
Стандартный тип. Новый бар формируется через фиксированный временной интервал: 1 минута, 5 минут, 1 час. Каждая биржа предоставляет их изначально.
Свойства:
- Во время азиатской сессии (00:00–08:00 UTC) часовая свеча может содержать 200 сделок. Во время объявления листинга на Binance в то же окно может попасть 50 000 сделок. Таймбары обрабатывают оба случая одинаково. Обнаружение таких всплесков активности критично для защиты ботов — см. Обнаружение аномалий для торговых ботов.
- Все участники рынка видят одинаковые границы свечей — точка Шеллинга. Это делает таймбары незаменимыми для анализа поведения толпы.
- Индикаторы, рассчитанные на неполных свечах (после перезапуска), дают мусорные значения.
from datetime import datetime
def time_until_valid_hourly_candle():
"""How long until the first complete hourly candle after restart."""
now = datetime.utcnow()
minutes_into_hour = now.minute
seconds_into_minute = now.second
wait_seconds = (60 - minutes_into_hour) * 60 - seconds_into_minute
wait_seconds += 3600
return wait_seconds
2–4. Бары на основе активности
Тиковые, объёмные и долларовые бары: три способа позволить рыночной активности — а не часам — определять границы баров.
Вместо сэмплирования через фиксированные временные интервалы — сэмплировать после фиксированного объёма рыночной активности. Это создаёт бары с примерно одинаковым «информационным содержанием» вне зависимости от времени суток.
2. Тиковые бары
Новый бар формируется после каждых N сделок (тиков). При высокой активности бары формируются быстро. В тихие периоды один бар может охватывать часы.
from collections import deque
from dataclasses import dataclass
@dataclass
class OHLCV:
timestamp: int
open: float
high: float
low: float
close: float
volume: float
class TickBarGenerator:
"""
Generates a new bar every `threshold` trades.
Each bar contains equal number of market "opinions".
"""
def __init__(self, threshold: int = 1000):
self.threshold = threshold
self.trades: list[tuple[float, float]] = [] # (price, qty)
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
self.trades.append((price, qty))
if len(self.trades) >= self.threshold:
self._close_bar(timestamp)
def _close_bar(self, timestamp: int):
prices = [t[0] for t in self.trades]
volumes = [t[1] for t in self.trades]
bar = OHLCV(
timestamp=timestamp,
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.trades = []
return bar
Плюсы: Естественная адаптация к рыночной активности. Доходности тиковых баров ближе к нормальному распределению, чем доходности таймбаров — свойство, улучшающее работу многих статистических моделей.
Минусы: Требует поток сырых сделок (доступен не у всех поставщиков исторических данных). Время закрытия бара непредсказуемо — невозможно сказать «следующий бар закроется в X».
3. Объёмные бары
Новый бар формируется после торговли N контрактов (или монет в криптовалюте). Аналогично тиковым барам, но с учётом размера сделки — одна сделка на 100 BTC вносит в 100 раз больший вклад, чем сделка на 1 BTC.
class VolumeBarGenerator:
"""
Generates a new bar every `threshold` units of volume.
Normalizes for trade size: one large order ≠ one small order.
"""
def __init__(self, threshold: float = 100.0):
self.threshold = threshold
self.accumulated_volume = 0.0
self.trades: list[tuple[int, float, float]] = [] # (ts, price, qty)
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
self.trades.append((timestamp, price, qty))
self.accumulated_volume += qty
if self.accumulated_volume >= self.threshold:
self._close_bar()
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.accumulated_volume = 0.0
self.trades = []
return bar
4. Долларовые бары
Новый бар формируется после обмена фиксированного номинала (в USD/USDT). Наиболее устойчивый из баров на основе активности, поскольку нормализует и количество сделок, и ценовой уровень.
Рассмотрим: если ETH вырос с 4 000, продажа ETH на 4 000, но 10 ETH при $1 000. Объёмные бары обрабатывают эти случаи по-разному; долларовые бары — одинаково.
class DollarBarGenerator:
"""
Generates a new bar every `threshold` dollars (USDT) of notional volume.
Most robust normalization: independent of price level.
Lopez de Prado (2018) recommends dollar bars as the default
for most quantitative applications.
"""
def __init__(self, threshold: float = 1_000_000.0):
self.threshold = threshold
self.accumulated_dollars = 0.0
self.trades: list[tuple[int, float, float]] = []
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
self.trades.append((timestamp, price, qty))
self.accumulated_dollars += price * qty
if self.accumulated_dollars >= self.threshold:
self._close_bar()
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.accumulated_dollars = 0.0
self.trades = []
return bar
Выбор порога
Порог для баров на основе активности должен давать примерно такое же количество баров в день, как и заменяемые таймбары. Для BTCUSDT на Binance:
| Тип бара | Типичный порог | ~Баров/день | Эквивалент TF |
|---|---|---|---|
| Тиковый | 1 000 сделок | ~1 400 | ~1m |
| Тиковый | 50 000 сделок | ~28 | ~1h |
| Объёмный | 100 BTC | ~600 | ~2-3m |
| Объёмный | 2 400 BTC | ~25 | ~1h |
| Долларовый | $1M | ~1 400 | ~1m |
| Долларовый | $50M | ~28 | ~1h |
Эти числа приблизительны и резко меняются с рыночным режимом. Во время ралли или обвала бары на основе активности будут формировать в 5–10 раз больше баров, чем обычно, — что и является их целью.
5–7. Ценовые бары
Кирпичи Renko, диапазонные бары и волатильностные бары: сэмплирование только тогда, когда цена движется достаточно.
Ценовые бары игнорируют и время, и активность. Новый бар формируется только когда цена перемещается на заданную величину. Это естественным образом фильтрует боковой шум и выделяет тренды.
5. Бары Renko
Новый «кирпич» Renko формируется, когда цена закрытия сдвигается как минимум на N единиц от закрытия предыдущего кирпича. Кирпичи всегда одного размера, создавая чистое визуальное представление направления тренда.
class RenkoBarGenerator:
"""
Generates Renko bricks based on price movement.
Key property: during sideways movement, no new bricks form.
During strong trends, bricks form rapidly.
"""
def __init__(self, brick_size: float = 10.0):
self.brick_size = brick_size
self.bricks: list[dict] = []
self.last_close: float | None = None
def on_price(self, timestamp: int, price: float, volume: float = 0.0):
if self.last_close is None:
self.last_close = price
return []
new_bricks = []
diff = price - self.last_close
num_bricks = int(abs(diff) / self.brick_size)
if num_bricks == 0:
return []
direction = 1 if diff > 0 else -1
for i in range(num_bricks):
brick_open = self.last_close
brick_close = self.last_close + direction * self.brick_size
brick = {
'timestamp': timestamp,
'open': brick_open,
'high': max(brick_open, brick_close),
'low': min(brick_open, brick_close),
'close': brick_close,
'volume': volume / num_bricks if num_bricks > 0 else 0,
'direction': direction,
}
new_bricks.append(brick)
self.last_close = brick_close
self.bricks.extend(new_bricks)
return new_bricks
Динамический Renko использует ATR (Average True Range) вместо фиксированного размера кирпича, автоматически адаптируясь к волатильности.
6. Диапазонные бары
Каждый бар имеет фиксированный диапазон high-low. Когда диапазон превышен, бар закрывается и начинается новый. В отличие от Renko, диапазонные бары включают тени и могут показывать внутрибарную волатильность.
class RangeBarGenerator:
"""
Generates bars with a fixed high-low range.
Difference from Renko: range bars show the full OHLC within
the range, not just brick direction. More information-rich.
"""
def __init__(self, range_size: float = 20.0):
self.range_size = range_size
self.current_high: float | None = None
self.current_low: float | None = None
self.current_open: float | None = None
self.current_volume: float = 0.0
self.current_start_ts: int = 0
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
if self.current_open is None:
self.current_open = price
self.current_high = price
self.current_low = price
self.current_start_ts = timestamp
self.current_high = max(self.current_high, price)
self.current_low = min(self.current_low, price)
self.current_volume += qty
if self.current_high - self.current_low >= self.range_size:
bar = OHLCV(
timestamp=timestamp,
open=self.current_open,
high=self.current_high,
low=self.current_low,
close=price,
volume=self.current_volume,
)
self.bars.append(bar)
self.current_open = price
self.current_high = price
self.current_low = price
self.current_volume = 0.0
self.current_start_ts = timestamp
return bar
return None
Ключевое отличие между Renko и диапазонными барами: Renko отслеживает только цены закрытия и показывает направление; диапазонные бары отслеживают полный ценовой диапазон и показывают структуру внутри бара. Диапазонные бары, как правило, более полезны для алгоритмической торговли, так как сохраняют информацию о high-low, необходимую для моделирования стоп-лоссов и тейк-профитов.
7. Волатильностные бары
Новый бар формируется, когда внутрибарная волатильность достигает динамического порога — например, кратного недавнего ATR. В отличие от диапазонных баров (фиксированный порог), волатильностные бары адаптируются к рыночным условиям.
class VolatilityBarGenerator:
"""
Generates bars when intra-bar volatility reaches a threshold.
Similar to range bars, but the threshold adapts to market conditions
using a rolling ATR measure. In calm markets, bars need less
absolute movement to close; in volatile markets, more.
"""
def __init__(
self,
atr_period: int = 14,
atr_multiplier: float = 1.0,
initial_threshold: float = 20.0,
):
self.atr_period = atr_period
self.atr_multiplier = atr_multiplier
self.threshold = initial_threshold
self.recent_ranges: list[float] = []
self.current_open: float | None = None
self.current_high: float | None = None
self.current_low: float | None = None
self.current_volume: float = 0.0
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
if self.current_open is None:
self.current_open = price
self.current_high = price
self.current_low = price
self.current_high = max(self.current_high, price)
self.current_low = min(self.current_low, price)
self.current_volume += qty
intra_bar_range = self.current_high - self.current_low
if intra_bar_range >= self.threshold:
bar = OHLCV(
timestamp=timestamp,
open=self.current_open,
high=self.current_high,
low=self.current_low,
close=price,
volume=self.current_volume,
)
self.bars.append(bar)
self.recent_ranges.append(intra_bar_range)
if len(self.recent_ranges) > self.atr_period:
self.recent_ranges = self.recent_ranges[-self.atr_period:]
if len(self.recent_ranges) >= self.atr_period:
avg_range = sum(self.recent_ranges) / len(self.recent_ranges)
self.threshold = avg_range * self.atr_multiplier
self.current_open = price
self.current_high = price
self.current_low = price
self.current_volume = 0.0
return bar
return None
8. Heikin-Ashi (сглаженная трансформация)
Heikin-Ashi: усреднение преобразует шумные свечи в плавные трендовые сигналы — но ценой потери точной ценовой информации.
Heikin-Ashi (с японского — «средний бар») — это не тип бара, а трансформация, которую можно применить поверх любого базового типа бара. Она сглаживает свечи путём усреднения текущего и предыдущего значений:
- HA Close = (Open + High + Low + Close) / 4
- HA Open = (Предыдущий HA Open + Предыдущий HA Close) / 2
- HA High = max(High, HA Open, HA Close)
- HA Low = min(Low, HA Open, HA Close)
Тренды проявляются как последовательности одноцветных свечей без нижних теней (восходящий тренд) или без верхних теней (нисходящий тренд).
class HeikinAshiTransformer:
"""
Transforms standard OHLCV candles into Heikin-Ashi candles.
Can be applied on top of ANY bar type: time bars, volume bars,
rolling bars, etc. It's a transformation, not a sampling method.
WARNING: HA prices are synthetic — they don't represent real
traded prices. Never use HA close for order placement or
PnL calculation. Use HA only for signal generation, then
execute at real prices.
"""
def __init__(self):
self.prev_ha_open: float | None = None
self.prev_ha_close: float | None = None
def transform(self, candle: OHLCV) -> OHLCV:
ha_close = (candle.open + candle.high + candle.low + candle.close) / 4
if self.prev_ha_open is None:
ha_open = (candle.open + candle.close) / 2
else:
ha_open = (self.prev_ha_open + self.prev_ha_close) / 2
ha_high = max(candle.high, ha_open, ha_close)
ha_low = min(candle.low, ha_open, ha_close)
self.prev_ha_open = ha_open
self.prev_ha_close = ha_close
return OHLCV(
timestamp=candle.timestamp,
open=ha_open,
high=ha_high,
low=ha_low,
close=ha_close,
volume=candle.volume,
)
def transform_series(self, candles: list[OHLCV]) -> list[OHLCV]:
"""Transform an entire series. Resets state first."""
self.prev_ha_open = None
self.prev_ha_close = None
return [self.transform(c) for c in candles]
def ha_trend_signal(ha_candles: list[OHLCV], lookback: int = 3) -> int:
"""
Simple HA trend signal.
Returns:
+1: bullish (N consecutive green HA candles with no lower wick)
-1: bearish (N consecutive red HA candles with no upper wick)
0: no clear trend
"""
if len(ha_candles) < lookback:
return 0
recent = ha_candles[-lookback:]
all_bullish = all(
c.close > c.open and abs(c.low - min(c.open, c.close)) < 1e-10
for c in recent
)
all_bearish = all(
c.close < c.open and abs(c.high - max(c.open, c.close)) < 1e-10
for c in recent
)
if all_bullish:
return 1
elif all_bearish:
return -1
return 0
Критическая оговорка для бэктестинга: Цены Heikin-Ashi — синтетические. Если ваш бэктест использует HA close как цену входа, результаты будут ошибочными. Всегда используйте HA только для генерации сигналов, а исполняйте ордера по реальным ценам OHLC.
Когда HA полезен: Трендовые стратегии, которым нужны чистые сигналы «оставаться в позиции». Применяйте HA поверх любого базового типа бара — таймбаров, объёмных баров, долларовых баров — для фильтрации ложных пересечений.
Когда HA вреден: Любая стратегия, требующая точных ценовых уровней — поддержка/сопротивление, анализ стакана, PIQ (Position In Queue). Усреднение уничтожает точную ценовую информацию.
9–11. Японские графики разворотов
Kagi, Line Break и Point & Figure: методы построения графиков без учёта времени, фокусирующиеся исключительно на ценовой структуре.
Это традиционные японские методы построения графиков (наряду с Renko), которые полностью отбрасывают время и фокусируются на ценовой структуре.
9. Графики Kagi
Графики Kagi состоят из вертикальных линий, которые меняют направление, когда цена разворачивается на заданную величину. Толщина линий меняется, когда цена пробивает предыдущий максимум (толстая = «ян» = спрос) или предыдущий минимум (тонкая = «инь» = предложение).
class KagiChartGenerator:
"""
Generates Kagi chart lines based on price reversals.
Unlike Renko (fixed brick size), Kagi tracks the actual magnitude
of each move and changes line thickness at breakout points.
Useful for identifying support/resistance breaks and
supply/demand shifts without time noise.
"""
def __init__(self, reversal_amount: float = 10.0):
self.reversal_amount = reversal_amount
self.lines: list[dict] = []
self.current_direction: int = 0 # 1=up, -1=down
self.current_price: float | None = None
self.extreme_price: float | None = None
self.prev_high: float | None = None
self.prev_low: float | None = None
self.line_type: str = 'yang' # 'yang' (thick) or 'yin' (thin)
def on_price(self, timestamp: int, price: float):
if self.current_price is None:
self.current_price = price
self.extreme_price = price
return None
if self.current_direction == 0:
if price - self.current_price >= self.reversal_amount:
self.current_direction = 1
self.extreme_price = price
elif self.current_price - price >= self.reversal_amount:
self.current_direction = -1
self.extreme_price = price
return None
if self.current_direction == 1:
if price > self.extreme_price:
self.extreme_price = price
if self.prev_high is not None and price > self.prev_high:
self.line_type = 'yang'
elif self.extreme_price - price >= self.reversal_amount:
line = {
'timestamp': timestamp,
'start': self.current_price,
'end': self.extreme_price,
'direction': 'up',
'type': self.line_type,
}
self.lines.append(line)
self.prev_high = self.extreme_price
self.current_price = self.extreme_price
self.extreme_price = price
self.current_direction = -1
if self.prev_low is not None and price < self.prev_low:
self.line_type = 'yin'
return line
else:
if price < self.extreme_price:
self.extreme_price = price
if self.prev_low is not None and price < self.prev_low:
self.line_type = 'yin'
elif price - self.extreme_price >= self.reversal_amount:
line = {
'timestamp': timestamp,
'start': self.current_price,
'end': self.extreme_price,
'direction': 'down',
'type': self.line_type,
}
self.lines.append(line)
self.prev_low = self.extreme_price
self.current_price = self.extreme_price
self.extreme_price = price
self.current_direction = 1
if self.prev_high is not None and price > self.prev_high:
self.line_type = 'yang'
return line
return None
10. Графики Line Break
Графики Line Break рисуют новую линию (блок) только тогда, когда цена закрытия превышает максимум или минимум предыдущих N линий (обычно 3). Если цена остаётся в пределах диапазона, новая линия не рисуется.
class LineBreakGenerator:
"""
Generates Line Break bars (Three Line Break by default).
A new bar is drawn only when the close exceeds the high or low
of the last N bars. Filters out minor noise by requiring price
to break through a multi-bar range.
The 'N' parameter (line_count) controls sensitivity:
- N=2: more sensitive, more bars, more noise
- N=3: standard (Three Line Break)
- N=4+: less sensitive, fewer bars, stronger signals
"""
def __init__(self, line_count: int = 3):
self.line_count = line_count
self.lines: list[dict] = []
def on_close(self, timestamp: int, close: float) -> dict | None:
if not self.lines:
self.lines.append({
'timestamp': timestamp,
'open': close,
'close': close,
'high': close,
'low': close,
'direction': 0,
})
return None
lookback = self.lines[-self.line_count:] if len(self.lines) >= self.line_count else self.lines
highest = max(l['high'] for l in lookback)
lowest = min(l['low'] for l in lookback)
last = self.lines[-1]
new_line = None
if close > highest:
new_line = {
'timestamp': timestamp,
'open': last['close'],
'close': close,
'high': close,
'low': last['close'],
'direction': 1,
}
elif close < lowest:
new_line = {
'timestamp': timestamp,
'open': last['close'],
'close': close,
'high': last['close'],
'low': close,
'direction': -1,
}
if new_line:
self.lines.append(new_line)
return new_line
return None
11. Графики Point & Figure
Графики Point & Figure (P&F) используют столбцы крестиков X (рост цены) и ноликов O (падение цены). Смена столбца требует разворота обычно на 3 размера клетки. Один из старейших методов фильтрации шума и определения уровней поддержки/сопротивления.
class PointAndFigureGenerator:
"""
Generates Point & Figure chart data.
X column: price rising by box_size increments.
O column: price falling by box_size increments.
Column switch: requires reversal_boxes * box_size movement
in the opposite direction.
Classic setting: box_size based on ATR, reversal_boxes = 3.
"""
def __init__(self, box_size: float = 10.0, reversal_boxes: int = 3):
self.box_size = box_size
self.reversal_boxes = reversal_boxes
self.reversal_amount = box_size * reversal_boxes
self.columns: list[dict] = []
self.current_direction: int = 0
self.current_top: float | None = None
self.current_bottom: float | None = None
def on_price(self, timestamp: int, price: float):
if self.current_top is None:
box_price = self._round_to_box(price)
self.current_top = box_price
self.current_bottom = box_price
self.current_direction = 1
return None
events = []
if self.current_direction == 1:
while price >= self.current_top + self.box_size:
self.current_top += self.box_size
events.append(('X', self.current_top, timestamp))
if price <= self.current_top - self.reversal_amount:
col = {
'type': 'X',
'top': self.current_top,
'bottom': self.current_bottom,
'boxes': int((self.current_top - self.current_bottom) / self.box_size) + 1,
'timestamp': timestamp,
}
self.columns.append(col)
self.current_direction = -1
self.current_top = self.current_top - self.box_size
self.current_bottom = self._round_to_box(price)
events.append(('new_column', 'O', timestamp))
else:
while price <= self.current_bottom - self.box_size:
self.current_bottom -= self.box_size
events.append(('O', self.current_bottom, timestamp))
if price >= self.current_bottom + self.reversal_amount:
col = {
'type': 'O',
'top': self.current_top,
'bottom': self.current_bottom,
'boxes': int((self.current_top - self.current_bottom) / self.box_size) + 1,
'timestamp': timestamp,
}
self.columns.append(col)
self.current_direction = 1
self.current_bottom = self.current_bottom + self.box_size
self.current_top = self._round_to_box(price)
events.append(('new_column', 'X', timestamp))
return events if events else None
def _round_to_box(self, price: float) -> float:
return round(price / self.box_size) * self.box_size
Kagi, Line Break и P&F в алгоритмической торговле: Используются преимущественно для определения долгосрочных трендов и уровней поддержки/сопротивления. Как фильтрующий слой — «не открывай лонги, когда график Kagi в режиме инь» — они добавляют ценность, выравнивая сделки с макроструктурой.
12–14. Информационные бары
Бары дисбаланса, бары серий, CUSUM-фильтры и энтропийные бары: сэмплирование, когда рынок сообщает нам, что что-то изменилось.
Наиболее изощрённый подход из книги Маркоса Лопеса де Прадо Advances in Financial Machine Learning (2018). Ключевая идея: сэмплировать, когда на рынок поступает новая информация, а не через фиксированные интервалы.
12. Бары тикового дисбаланса (TIB)
Если рынок находится в равновесии, сделки, инициированные покупателями и продавцами, должны примерно балансироваться. Когда дисбаланс превышает ожидания, что-то изменилось. Сэмплируем бар в этот момент.
Каждая сделка классифицируется как инициированная покупателем (+1) или продавцом (-1) по правилу тика. Мы отслеживаем кумулятивный дисбаланс θ и сэмплируем, когда |θ| превышает динамический порог.
class TickImbalanceBarGenerator:
"""
Generates bars when the cumulative tick imbalance exceeds
expected levels — i.e., when "new information" arrives.
Based on Lopez de Prado (2018), Chapter 2.
"""
def __init__(
self,
expected_ticks_init: int = 1000,
ewma_window: int = 100,
min_ticks: int = 100,
max_ticks: int = 50000,
):
self.expected_ticks_init = expected_ticks_init
self.ewma_window = ewma_window
self.min_ticks = min_ticks
self.max_ticks = max_ticks
self.theta = 0.0
self.prev_price: float | None = None
self.prev_sign = 1
self.trades: list[tuple[int, float, float]] = []
self.bar_lengths: list[int] = []
self.imbalances: list[float] = []
self.expected_ticks = float(expected_ticks_init)
self.expected_imbalance = 0.0
self.bars: list[OHLCV] = []
def _tick_sign(self, price: float) -> int:
"""Classify trade as buy (+1) or sell (-1) using tick rule."""
if self.prev_price is None:
self.prev_price = price
return 1
if price > self.prev_price:
sign = 1
elif price < self.prev_price:
sign = -1
else:
sign = self.prev_sign
self.prev_price = price
self.prev_sign = sign
return sign
def on_trade(self, timestamp: int, price: float, qty: float):
sign = self._tick_sign(price)
self.theta += sign
self.trades.append((timestamp, price, qty))
threshold = self.expected_ticks * abs(self.expected_imbalance)
if threshold == 0:
threshold = self.expected_ticks_init * 0.5
if abs(self.theta) >= threshold and len(self.trades) >= self.min_ticks:
return self._close_bar()
if len(self.trades) >= self.max_ticks:
return self._close_bar()
return None
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.bar_lengths.append(len(self.trades))
self.imbalances.append(self.theta / len(self.trades))
if len(self.bar_lengths) >= 2:
alpha = 2.0 / (self.ewma_window + 1)
self.expected_ticks = (
alpha * self.bar_lengths[-1]
+ (1 - alpha) * self.expected_ticks
)
self.expected_ticks = max(
self.min_ticks,
min(self.max_ticks, self.expected_ticks)
)
self.expected_imbalance = (
alpha * self.imbalances[-1]
+ (1 - alpha) * self.expected_imbalance
)
self.theta = 0.0
self.trades = []
return bar
13. Бары объёмного дисбаланса (VIB)
Расширение TIB: вместо подсчёта каждой сделки как ±1, взвешиваем подписанным объёмом. Покупка на 100 BTC вносит +100, продажа на 1 BTC вносит -1. Выявляет крупные информированные ордера, которые могут быть разбиты на множество мелких сделок.
class VolumeImbalanceBarGenerator:
"""
Like TIBs, but uses signed volume instead of signed ticks.
Captures the insight that a 100-BTC buy signal is 100x more
informative than a 1-BTC buy signal.
"""
def __init__(
self,
expected_ticks_init: int = 1000,
ewma_window: int = 100,
):
self.expected_ticks_init = expected_ticks_init
self.ewma_window = ewma_window
self.theta = 0.0
self.prev_price: float | None = None
self.prev_sign = 1
self.trades: list[tuple[int, float, float]] = []
self.bar_lengths: list[int] = []
self.volume_imbalances: list[float] = []
self.expected_ticks = float(expected_ticks_init)
self.expected_vol_imbalance = 0.0
self.bars: list[OHLCV] = []
def _tick_sign(self, price: float) -> int:
if self.prev_price is None:
self.prev_price = price
return 1
if price > self.prev_price:
sign = 1
elif price < self.prev_price:
sign = -1
else:
sign = self.prev_sign
self.prev_price = price
self.prev_sign = sign
return sign
def on_trade(self, timestamp: int, price: float, qty: float):
sign = self._tick_sign(price)
self.theta += sign * qty
self.trades.append((timestamp, price, qty))
threshold = self.expected_ticks * abs(self.expected_vol_imbalance)
if threshold == 0:
threshold = self.expected_ticks_init * 0.5
if abs(self.theta) >= threshold and len(self.trades) >= 10:
return self._close_bar()
return None
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.bar_lengths.append(len(self.trades))
self.volume_imbalances.append(self.theta / len(self.trades))
alpha = 2.0 / (self.ewma_window + 1)
if len(self.bar_lengths) >= 2:
self.expected_ticks = (
alpha * self.bar_lengths[-1] + (1 - alpha) * self.expected_ticks
)
self.expected_vol_imbalance = (
alpha * self.volume_imbalances[-1]
+ (1 - alpha) * self.expected_vol_imbalance
)
self.theta = 0.0
self.trades = []
return bar
Проблема взрыва
Известная проблема баров дисбаланса: порог на основе EWMA может войти в петлю положительной обратной связи. Решение: ограничение параметрами min_ticks и max_ticks.
self.expected_ticks = max(
self.min_ticks, # Floor: never less than 100 ticks
min(
self.max_ticks, # Ceiling: never more than 50000 ticks
new_expected_ticks
)
)
14. Бары серий (Run Bars)
Бары серий отслеживают длину текущей направленной серии — самую длинную непрерывную последовательность покупок или продаж. Когда крупный информированный трейдер разбивает ордер на множество мелких сделок, серия становится необычно длинной. Бары серий это обнаруживают.
class TickRunBarGenerator:
"""
Generates bars when the length of a directional run exceeds expectations.
Based on Lopez de Prado (2018), Chapter 2.
Difference from imbalance bars:
- Imbalance bars track NET imbalance (buys minus sells)
- Run bars track the MAXIMUM run length (consecutive buys OR sells)
"""
def __init__(
self,
expected_ticks_init: int = 1000,
ewma_window: int = 100,
min_ticks: int = 100,
max_ticks: int = 50000,
):
self.expected_ticks_init = expected_ticks_init
self.ewma_window = ewma_window
self.min_ticks = min_ticks
self.max_ticks = max_ticks
self.prev_price: float | None = None
self.prev_sign = 1
self.trades: list[tuple[int, float, float]] = []
self.buy_run = 0
self.sell_run = 0
self.max_buy_run = 0
self.max_sell_run = 0
self.bar_lengths: list[int] = []
self.max_runs: list[float] = []
self.expected_ticks = float(expected_ticks_init)
self.expected_max_run = 0.0
self.bars: list[OHLCV] = []
def _tick_sign(self, price: float) -> int:
if self.prev_price is None:
self.prev_price = price
return 1
if price > self.prev_price:
sign = 1
elif price < self.prev_price:
sign = -1
else:
sign = self.prev_sign
self.prev_price = price
self.prev_sign = sign
return sign
def on_trade(self, timestamp: int, price: float, qty: float):
sign = self._tick_sign(price)
self.trades.append((timestamp, price, qty))
if sign == 1:
self.buy_run += 1
self.sell_run = 0
else:
self.sell_run += 1
self.buy_run = 0
self.max_buy_run = max(self.max_buy_run, self.buy_run)
self.max_sell_run = max(self.max_sell_run, self.sell_run)
theta = max(self.max_buy_run, self.max_sell_run)
threshold = self.expected_ticks * self.expected_max_run if self.expected_max_run > 0 else self.expected_ticks_init * 0.3
if theta >= threshold and len(self.trades) >= self.min_ticks:
return self._close_bar()
if len(self.trades) >= self.max_ticks:
return self._close_bar()
return None
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
max_run = max(self.max_buy_run, self.max_sell_run) / len(self.trades)
self.bar_lengths.append(len(self.trades))
self.max_runs.append(max_run)
alpha = 2.0 / (self.ewma_window + 1)
if len(self.bar_lengths) >= 2:
self.expected_ticks = alpha * self.bar_lengths[-1] + (1 - alpha) * self.expected_ticks
self.expected_ticks = max(self.min_ticks, min(self.max_ticks, self.expected_ticks))
self.expected_max_run = alpha * self.max_runs[-1] + (1 - alpha) * self.expected_max_run
self.trades = []
self.buy_run = 0
self.sell_run = 0
self.max_buy_run = 0
self.max_sell_run = 0
return bar
Бары серий могут быть расширены до объёмных серий и долларовых серий.
15. CUSUM-фильтр
CUSUM-фильтр (Cumulative Sum) определяет когда сэмплировать, отслеживая кумулятивные доходности. В отличие от баров дисбаланса (работающих на сырых сделках), CUSUM можно применять к существующим 1m OHLCV-данным — тиковые данные не требуются.
class CUSUMFilterBarGenerator:
"""
Symmetric CUSUM filter for event-based sampling.
Based on Lopez de Prado (2018), Chapter 2.5.
Key advantage over Bollinger Bands: CUSUM requires a FULL
run of threshold magnitude before triggering. Bollinger Bands
trigger repeatedly when price hovers near the band.
Can be applied to 1m OHLCV data — no tick data required.
"""
def __init__(self, threshold: float = 0.01):
self.threshold = threshold
self.s_pos = 0.0
self.s_neg = 0.0
self.prev_price: float | None = None
self.buffer: list[OHLCV] = []
self.bars: list[OHLCV] = []
def on_candle_1m(self, candle: OHLCV) -> OHLCV | None:
self.buffer.append(candle)
if self.prev_price is None:
self.prev_price = candle.close
return None
import math
log_ret = math.log(candle.close / self.prev_price)
self.prev_price = candle.close
self.s_pos = max(0.0, self.s_pos + log_ret)
self.s_neg = min(0.0, self.s_neg + log_ret)
triggered = False
if self.s_pos > self.threshold:
self.s_pos = 0.0
triggered = True
if self.s_neg < -self.threshold:
self.s_neg = 0.0
triggered = True
if triggered and len(self.buffer) >= 2:
bars = self.buffer
bar = OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
self.bars.append(bar)
self.buffer = []
return bar
return None
CUSUM + метод тройного барьера: В фреймворке Лопеса де Прадо события CUSUM используются как точки входа для метода тройного барьера — где каждое событие запускает сделку с барьерами стоп-лосса, тейк-профита и экспирации. Для надёжной валидации таких событийных стратегий см. Walk-Forward оптимизация и Монте-Карло Bootstrap для бэктестинга.
16. Энтропийные бары
Наиболее теоретически элегантный подход: сэмплировать, когда информационное содержание (энтропия Шеннона) внутрибарного ценового ряда превышает порог.
class EntropyBarGenerator:
"""
Generates bars when the entropy of intra-bar returns exceeds
a threshold.
Based on Shannon's information theory: bars are sampled when
"new information" arrives, measured as the entropy of the
return distribution within the current bar.
This is the most theoretically "pure" information-driven bar.
"""
def __init__(
self,
entropy_threshold: float = 2.0,
min_trades: int = 50,
n_bins: int = 10,
):
self.entropy_threshold = entropy_threshold
self.min_trades = min_trades
self.n_bins = n_bins
self.trades: list[tuple[int, float, float]] = []
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
self.trades.append((timestamp, price, qty))
if len(self.trades) < self.min_trades:
return None
entropy = self._compute_entropy()
if entropy >= self.entropy_threshold:
return self._close_bar()
return None
def _compute_entropy(self) -> float:
import math
prices = [t[1] for t in self.trades]
if len(prices) < 2:
return 0.0
returns = [
math.log(prices[i] / prices[i-1])
for i in range(1, len(prices))
if prices[i-1] > 0
]
if not returns:
return 0.0
min_r = min(returns)
max_r = max(returns)
if max_r == min_r:
return 0.0
bin_width = (max_r - min_r) / self.n_bins
bins = [0] * self.n_bins
for r in returns:
idx = min(int((r - min_r) / bin_width), self.n_bins - 1)
bins[idx] += 1
total = sum(bins)
entropy = 0.0
for count in bins:
if count > 0:
p = count / total
entropy -= p * math.log2(p)
return entropy
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.trades = []
return bar
Практическое замечание: Энтропийные бары вычислительно дороги и представляют преимущественно исследовательский интерес — но для ML-стратегий они создают признаки с лучшими статистическими свойствами, поскольку каждый бар содержит приблизительно равное количество «информации».
17. Дельта-бары (поток ордеров)
Кумулятивная дельта: измерение чистой силы агрессивных покупателей против продавцов в реальном времени.
Дельта-бары сэмплируют на основе кумулятивной дельты — текущей разницы между объёмом покупок и объёмом продаж. В отличие от баров дисбаланса (использующих знаки тиков ±1), дельта-бары используют реальный поток ордеров, взвешенный по объёму.
class DeltaBarGenerator:
"""
Generates bars based on cumulative order flow delta.
Delta = Buy Volume - Sell Volume (classified by aggressor side).
Requires trade-level data with side classification
(available from Binance aggTrades, Bybit trades, etc.)
"""
def __init__(self, threshold: float = 500.0):
self.threshold = threshold
self.cumulative_delta = 0.0
self.trades: list[tuple[int, float, float, int]] = []
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float, is_buyer_maker: bool):
side = -1 if is_buyer_maker else 1
signed_qty = side * qty
self.cumulative_delta += signed_qty
self.trades.append((timestamp, price, qty, side))
if abs(self.cumulative_delta) >= self.threshold:
return self._close_bar()
return None
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
bar.delta = self.cumulative_delta # type: ignore
bar.buy_volume = sum(t[2] for t in self.trades if t[3] == 1) # type: ignore
bar.sell_volume = sum(t[2] for t in self.trades if t[3] == -1) # type: ignore
self.bars.append(bar)
self.cumulative_delta = 0.0
self.trades = []
return bar
Дивергенция дельты: Один из самых мощных сигналов — цена растёт, а кумулятивная дельта отрицательна (продавцы агрессивны, но цена всё равно идёт вверх, что указывает на абсорбцию лимитных покупок). Напрямую связано с подходом поведенческого отпечатка, описанным в статье Цифровой отпечаток: идентификация трейдера. Для маркет-мейкеров, использующих модель Авелланеды-Стоикова, дельта-бары предоставляют вид на инвентарный риск и давление агрессоров в реальном времени.
Кольцевой буфер базовых баров: новые данные входят, старые выходят, и агрегированная свеча всегда валидна.
Методы агрегации определяют, как базовые бары компонуются в свечи старших таймфреймов (HTF). Они независимы от типа бара — любой метод агрегации можно применить к любому базовому типу бара.
Метод A: Календарная агрегация
Агрегировать все базовые бары, попадающие в фиксированную календарную границу. «Часовая» свеча охватывает все бары с 14:00:00 по 14:59:59.
Свойства:
- Все участники рынка видят одинаковые границы — обязательно для анализа рыночной структуры, поддержки/сопротивления, PIQ-триггеров
- Проблема холодного старта: неполная свеча после перезапуска
- Естественна для таймбаров (именно это биржи предоставляют изначально)
- Работает и для нетаймовых баров: «все объёмные бары, закрывшиеся между 14:00 и 15:00» = календарная часовая свеча из объёмных баров
Метод B: Скользящее окно
Агрегировать последние N закрытых базовых баров, пересчитывая при каждом новом баре. «Часовая» скользящая свеча = последние 60 закрытых минутных таймбаров, обновляемых каждую минуту.
Атомарная единица — закрытый базовый бар. Этот выбор дизайна обеспечивает:
- Нет холодного старта. После N баров свеча валидна. Никакого шума неполных свечей.
- Паритет с бэктестом. Если live-торговля использует ту же атомарную единицу, что и движок бэктеста, сигналы идентичны.
- Простая валидация. Одно правило:
if buffer not full: skip.
import numpy as np
class RollingCandleAggregator:
"""
Produces rolling higher-timeframe candles from closed base bars.
Works with ANY bar type: time bars, tick bars, volume bars,
dollar bars, delta bars — anything that produces OHLCV output.
Example: RollingCandleAggregator(window=60) with 1m time bars
produces a "1h" candle updated every minute.
Example: RollingCandleAggregator(window=24) with volume bars
produces a candle spanning the last 24 volume bars.
"""
def __init__(self, window: int):
self.window = window
self.buffer: deque[OHLCV] = deque(maxlen=window)
def push(self, bar: OHLCV) -> OHLCV | None:
"""
Add a closed base bar. Returns aggregated candle
only when buffer is full (= candle is valid).
"""
self.buffer.append(bar)
if len(self.buffer) < self.window:
return None
return self._aggregate()
def _aggregate(self) -> OHLCV:
bars = list(self.buffer)
return OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
@property
def is_valid(self) -> bool:
return len(self.buffer) == self.window
Компромисс фазового сдвига: Скользящие свечи закрываются в :37, если вы стартовали в :37, а не в :00, как у всех остальных. Это важно для стратегий, зависящих от уровней, видимых толпой. Решение: используйте оба — календарную агрегацию для рыночной структуры, скользящую для сигналов.
Метод C: Адаптивное скользящее окно
Как скользящее окно, но размер окна адаптируется к текущей волатильности. Спокойные рынки → более широкое окно (больше сглаживания). Волатильные рынки → более узкое окно (быстрее реакция).
class AdaptiveRollingAggregator:
"""
Rolling window where the window size adapts to volatility.
Works with any base bar type. Uses ATR of recent bars
as the volatility measure.
Low volatility → wider window (more smoothing, fewer signals)
High volatility → narrower window (faster reaction)
"""
def __init__(
self,
base_window: int = 60,
min_window: int = 15,
max_window: int = 240,
atr_period: int = 14,
atr_base: float | None = None,
):
self.base_window = base_window
self.min_window = min_window
self.max_window = max_window
self.atr_period = atr_period
self.atr_base = atr_base
self.all_candles: deque[OHLCV] = deque(maxlen=max_window)
self.atr_values: deque[float] = deque(maxlen=atr_period * 2)
self.current_window = base_window
def push(self, bar: OHLCV) -> OHLCV | None:
self.all_candles.append(bar)
tr = bar.high - bar.low
self.atr_values.append(tr)
if len(self.atr_values) < self.atr_period:
return None
current_atr = sum(list(self.atr_values)[-self.atr_period:]) / self.atr_period
if self.atr_base is None and len(self.atr_values) >= self.atr_period * 2:
self.atr_base = sum(self.atr_values) / len(self.atr_values)
if self.atr_base is None or self.atr_base == 0:
return None
vol_ratio = current_atr / self.atr_base
self.current_window = int(self.base_window / vol_ratio)
self.current_window = max(self.min_window, min(self.max_window, self.current_window))
if len(self.all_candles) < self.current_window:
return None
bars = list(self.all_candles)[-self.current_window:]
return OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
Каждый базовый тип бара может быть скомбинирован с любым методом агрегации. Некоторые комбинации стандартны (календарные таймбары = то, что дают биржи), другие экзотичны, но мощны.
Примеры комбинаций
| Базовый тип бара | Календарная | Скользящая | Адаптивная |
|---|---|---|---|
| Таймбары | Стандартные биржевые свечи | Всегда валидный HTF, без холодного старта | Таймфрейм, адаптивный к волатильности |
| Объёмные | «Все объёмные бары за этот час» | Последние 24 объёмных бара | Более широкое окно на спокойных рынках |
| Долларовые | Часовой агрегат долларовых баров | Последние N долларовых баров | Адаптивные долларовые окна |
| TIB | Часовой агрегат дисбаланса | Последние N событий дисбаланса | Быстрая реакция в волатильных режимах |
| Дельта | Часовой чистый поток ордеров | Скользящий снимок дельты | Адаптивное окно потока |
| Renko | «Кирпичи за этот час» | Последние N кирпичей | Адаптивное количество кирпичей |
Гибридный движок: календарная + скользящая
На практике вам нужны обе агрегации одновременно. Затраты памяти минимальны — два буфера deque на таймфрейм на символ.
class HybridCandleEngine:
"""
Maintains both calendar-aligned and rolling candles
for any base bar type.
Calendar candles: for market structure, support/resistance, PIQ.
Rolling candles: for indicators, signal generation, entries/exits.
"""
def __init__(self):
self.rolling = {
'1h': RollingCandleAggregator(60),
'4h': RollingCandleAggregator(240),
}
self.calendar: dict[str, list[OHLCV]] = {
'1h': [],
'4h': [],
}
self._calendar_buffer: dict[str, list[OHLCV]] = {
'1h': [],
'4h': [],
}
def on_bar(self, bar: OHLCV):
"""Process any base bar type — time, volume, tick, delta, etc."""
rolling_results = {}
for tf, agg in self.rolling.items():
rolling_results[tf] = agg.push(bar)
self._update_calendar(bar)
return rolling_results
def _update_calendar(self, bar: OHLCV):
from datetime import datetime
ts = datetime.utcfromtimestamp(bar.timestamp)
for tf, minutes in [('1h', 60), ('4h', 240)]:
self._calendar_buffer[tf].append(bar)
total_minutes = ts.hour * 60 + ts.minute
if (total_minutes + 1) % minutes == 0:
bars = self._calendar_buffer[tf]
if bars:
agg = OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
self.calendar[tf].append(agg)
self._calendar_buffer[tf] = []
Гибрид время-объём: календарная с разбиением по объёму
Особый вариант агрегации: календарные свечи, которые принудительно закрываются раньше, когда объём превышает порог. Сохраняет временную синхронизацию, адаптируясь к всплескам активности.
class TimeVolumeHybridGenerator:
"""
Calendar-aligned candles that split when volume spikes.
Rule: close the candle at the calendar boundary OR when
accumulated volume exceeds vol_threshold, whichever comes first.
Works with any base bar type — the volume trigger adds an
extra split dimension on top of calendar alignment.
"""
def __init__(
self,
interval_minutes: int = 60,
vol_threshold: float = 5000.0,
):
self.interval_minutes = interval_minutes
self.vol_threshold = vol_threshold
self.buffer: list[OHLCV] = []
self.accumulated_volume = 0.0
self.bars: list[OHLCV] = []
def on_bar(self, bar: OHLCV) -> OHLCV | None:
self.buffer.append(bar)
self.accumulated_volume += bar.volume
from datetime import datetime
ts = datetime.utcfromtimestamp(bar.timestamp)
total_minutes = ts.hour * 60 + ts.minute
at_boundary = (total_minutes + 1) % self.interval_minutes == 0
vol_spike = self.accumulated_volume >= self.vol_threshold
if at_boundary or vol_spike:
return self._close_bar(split_reason='volume' if vol_spike else 'time')
return None
def _close_bar(self, split_reason: str) -> OHLCV:
bars = self.buffer
bar = OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
bar.split_reason = split_reason # type: ignore
bar.num_bars = len(bars) # type: ignore
self.bars.append(bar)
self.buffer = []
self.accumulated_volume = 0.0
return bar
Практическая агрегация: каскадная предзагрузка
Каскадная предзагрузка: составление дневных свечей из часовых, а часовых из минутных — обходя лимиты API.
Биржи ограничивают объём исторических данных. Binance выдаёт ~1000 свечей за один REST-запрос, OKX ограничивает до 300. Если вам нужна скользящая 1D-свеча (1440 минут), вы не всегда можете получить достаточно 1m-истории. Для стриминга сделок и стаканов в реальном времени через WebSocket см. CCXT Pro WebSocket методы.
Решение: каскадная агрегация — строить старшие таймфреймы из максимально доступного разрешения на каждом уровне, а затем сшивать их.
Rolling 1W candle:
├── 6 completed 1D candles ← fetch from REST /klines?interval=1d
├── 1 partial day:
│ ├── 23 completed 1h candles ← fetch from REST /klines?interval=1h
│ └── 1 partial hour:
│ └── N completed 1m candles ← fetch from REST /klines?interval=1m
└── Live: each new closed 1m candle updates the entire chain
Это работает, потому что агрегация OHLCV композитна: high дневной свечи — это максимум 24 часовых high, которые являются максимумом 1440 минутных high.
Лимиты бирж
| Биржа | Макс. 1m-свечей | Макс. 1h-свечей | Особые интервалы |
|---|---|---|---|
| Binance | 1 000 | 1 000 | 1m–1M, полный набор |
| Bybit | 1 000 | 1 000 | 1–720, D/W/M |
| OKX | 300 | 300 | 1m–1M (более ограничено) |
| Gate.io | 1 000 | 1 000 | 10s–30d |
Проверка согласованности агрегации
Часовая свеча из REST API может не совпадать с тем, что вы вычислили бы из 60 минутных свечей. Всегда проверяйте:
def validate_aggregation(
candle_htf: OHLCV,
candles_ltf: list[OHLCV],
tolerance_pct: float = 0.001,
) -> dict[str, bool]:
agg = OHLCV(
timestamp=candles_ltf[-1].timestamp,
open=candles_ltf[0].open,
high=max(c.high for c in candles_ltf),
low=min(c.low for c in candles_ltf),
close=candles_ltf[-1].close,
volume=sum(c.volume for c in candles_ltf),
)
def close_enough(a: float, b: float) -> bool:
if a == 0 and b == 0:
return True
return abs(a - b) / max(abs(a), abs(b)) < tolerance_pct
return {
'open': close_enough(candle_htf.open, agg.open),
'high': close_enough(candle_htf.high, agg.high),
'low': close_enough(candle_htf.low, agg.low),
'close': close_enough(candle_htf.close, agg.close),
'volume': close_enough(candle_htf.volume, agg.volume),
}
Если проверка стабильно не проходит, всегда агрегируйте из 1m самостоятельно — никогда не доверяйте HTF-свечам биржи для паритета бэктеста.
Сравнительная матрица
Ось 1: Базовые типы баров
| # | Тип бара | Триггер | Требуются тиковые данные | Лучше всего для |
|---|---|---|---|---|
| 1 | Таймбар | Фиксированный интервал | Нет | Рыночная структура, поведение толпы |
| 2 | Тиковый | N сделок | Да | ML-признаки, равномерная выборка |
| 3 | Объёмный | N единиц объёма | Да | Нормализованный анализ активности |
| 4 | Долларовый | $N номинала | Да | Кросс-активное сравнение |
| 5 | Renko | Цена ± N единиц | Нет | Следование за трендом, фильтрация шума |
| 6 | Диапазонный | High-Low >= N | Да | Обнаружение пробоев |
| 7 | Волатильностный | Адаптивный диапазон | Да | Режимно-адаптивный анализ |
| 8 | Heikin-Ashi | Трансформация | Нет | Подтверждение тренда (синтетические цены!) |
| 9 | Kagi | Разворот цены | Нет | Структура спроса/предложения |
| 10 | Line Break | Пробой N линий | Нет | Фильтр макротренда |
| 11 | Point & Figure | Клетка + разворот | Нет | Карта поддержки/сопротивления |
| 12 | TIB | Тиковый дисбаланс | Да | Обнаружение информированного потока |
| 13 | VIB | Объёмный дисбаланс | Да | Обнаружение крупных ордеров |
| 14 | Run | Длина серии | Да | Обнаружение дробления ордеров |
| 15 | CUSUM | Кумулятивная доходность | Нет (1m closes) | Структурные разрывы |
| 16 | Энтропийный | Энтропия Шеннона | Да | ML-исследования, чистота признаков |
| 17 | Delta | Дельта потока ордеров | Да (aggTrades) | Анализ потока агрессоров |
Ось 2: Методы агрегации
| Метод | Привязка | Холодный старт | Фазовый сдвиг | Лучше всего для |
|---|---|---|---|---|
| Календарная | Часы | Риск неполного бара | Нет (синхронно с толпой) | Рыночная структура, PIQ, S/R |
| Скользящая | N баров | Нет (после прогрева) | Да (сдвинуто от :00) | Индикаторы, сигналы |
| Адаптивная | Волатильность N | После калибровки ATR | Да | Стратегии, адаптивные к волатильности |
Практические рекомендации
Четырёхслойная архитектура свечей: скользящие сигналы, календарная структура, микроструктура потока и трендовые фильтры.
Если ваш движок бэктеста работает на 1m OHLCV-данных:
- Скользящие таймбары — простейшее улучшение. Никаких дополнительных данных. Устраняет холодный старт.
- Гибридные (скользящие + календарные) таймбары — календарные для рыночной структуры, скользящие для сигналов.
- CUSUM-фильтр — работает на 1m closes, без тиковых данных. «Цена сдвинулась достаточно, чтобы это было интересно».
Если у вас есть тиковые/сделочные данные:
- Долларовые бары + скользящие — рекомендуемый стандарт из литературы по количественным финансам.
- Бары объёмного дисбаланса + скользящие — обнаруживают информированный поток, чаще сэмплируют во время значимых событий.
- Дельта-бары + календарные — если есть классификация стороны агрессора, наиболее прямой взгляд на то, кто двигает рынок.
Как фильтры (применяйте Heikin-Ashi или Line Break поверх любой комбинации базовый тип + агрегация):
- Heikin-Ashi поверх скользящих объёмных баров — чистые трендовые сигналы на данных, нормализованных по активности.
- Line Break / Kagi поверх дневных календарных баров — фильтр макротренда.
Для Marketmaker.cc конкретно — многослойный подход:
- Слой 1 (сигналы): Скользящая агрегация таймбаров для индикаторов и сигналов входа/выхода. Без холодного старта, идеальный паритет с бэктестом.
- Слой 2 (рыночная структура): Календарные таймбары для поддержки/сопротивления, анализа часовых закрытий и PIQ-триггеров.
- Слой 3 (микроструктура): Бары объёмного дисбаланса + дельта-бары из сырого потока сделок для обнаружения информированного потока, дробления ордеров и предвосхищения крупных движений. См. также Цифровой отпечаток: идентификация трейдера для распознавания поведенческих паттернов в данных потока ордеров.
- Слой 4 (трендовый фильтр): Трансформация Heikin-Ashi на скользящих барах или Line Break на 4h-календарных закрытиях для удержания сигналов в направлении макротренда.
Заключение
Построение свечей — это не один выбор, а два независимых решения:
-
Какой тип бара? Таймбары фиксируют часовые интервалы. Бары активности (тиковые, объёмные, долларовые) фиксируют рыночное участие. Ценовые (Renko, диапазонные, волатильностные) фиксируют движения. Информационные (дисбаланс, серии, CUSUM, энтропия) фиксируют поступление новой информации. Бары потока ордеров (дельта) фиксируют агрессивное давление.
-
Как агрегировать в старшие таймфреймы? Календарная агрегация синхронизирует с толпой. Скользящая устраняет холодный старт. Адаптивная реагирует на волатильность.
Стандартная «часовая свеча с Binance» — это лишь одна ячейка в матрице 17×3. Остальные 50 комбинаций доступны каждому, кто готов их реализовать. Для продакшн-системы ответ — «подбирайте правильную комбинацию для каждого слоя вашего движка принятия решений».
Атомарная единица — закрытый базовый бар — остаётся фундаментом. Всё остальное — агрегация.
Подробнее о точности бэктестинга с детализированными данными читайте в статье Адаптивный Drill-Down: бэктест с переменной гранулярностью. О влиянии предвычисления индикаторов на мультитаймфреймовые стратегии — в статье Агрегированный Parquet-кэш.
Полезные ссылки
- Lopez de Prado — Advances in Financial Machine Learning (2018)
- Easley, Lopez de Prado, O'Hara — The Volume Clock: Insights into the High Frequency Paradigm (2012)
- mlfinlab — библиотека Python, реализующая информационные бары
- Binance — исторические рыночные данные
- Apache Parquet — колоночный формат хранения
Цитирование
@article{soloviov2026bartypes,
author = {Soloviov, Eugen},
title = {17 × 3: Bar Types and Aggregation Methods for Algorithmic Trading},
year = {2026},
url = {https://marketmaker.cc/en/blog/post/beyond-time-bars-candle-construction},
description = {Two-axis classification of candle construction: 17 base bar types × 3 aggregation methods = 51 combinations, with implementation code and practical recommendations for crypto algotrading.}
}
MarketMaker.cc Team
Количественные исследования и стратегии