Tipi di Bar e Metodi di Aggregazione per il Trading Algoritmico
Ogni grafico a candele che hai mai visto su Binance, TradingView o qualsiasi interfaccia di exchange è costruito nello stesso modo: aggrega i trade all'interno di una finestra temporale fissa — 1 minuto, 5 minuti, 1 ora — e produce una bar OHLCV. Questo è così diffuso che la maggior parte dei trader non lo mette mai in discussione. Ma per il trading algoritmico, la scelta del tipo di bar e del metodo di aggregazione sono due decisioni indipendenti — e la maggior parte dei sistemi le confonde.
Questo articolo separa i due assi della costruzione delle candele: che tipo di bar costruisci (17 tipi) e come le aggreghi in timeframe superiori (3 metodi). La combinazione dà 51 possibili configurazioni, ciascuna con proprietà diverse per il backtesting, il trading live e la generazione di segnali.
Per un'introduzione a come i trade grezzi diventano candele standard, vedi Trading Candles Demystified.
TL;DR
- La costruzione delle candele ha due assi indipendenti: tipo di bar e metodo di aggregazione
- 17 tipi base di bar: tempo, tick, volume, dollaro, Renko, range, volatilità, Heikin-Ashi, Kagi, Line Break, P&F, tick imbalance (TIB), volume imbalance (VIB), run, CUSUM, entropia, delta
- 3 metodi di aggregazione: allineato al calendario, rolling window, rolling adattivo
- 17 × 3 = 51 combinazioni possibili, ciascuna con proprietà diverse
- La maggior parte dei sistemi usa solo una combinazione: bar temporali allineate al calendario. Le altre 50 sono inesplorate.
- Raccomandazione pratica: usa più combinazioni in strati — bar temporali rolling per i segnali, bar temporali calendario per la struttura di mercato, bar information-driven per la microstruttura
Due Assi della Costruzione delle Candele
La visione tradizionale mette tutti i tipi di bar in un elenco piatto: bar temporali, bar tick, bar volume, Renko, ecc. Questo è fuorviante. In realtà ci sono due scelte ortogonali:
Asse 1 — Tipo di Bar Base (17 tipi): Come decidi quando una nuova bar si chiude? Dopo un intervallo di tempo fisso? Dopo N trade? Dopo un movimento di prezzo? Quando il contenuto informativo cambia? Questo determina cosa significa "una bar".
Asse 2 — Metodo di Aggregazione (3 metodi): Come componi le bar base in candele di timeframe superiore? Allineate ai confini del calendario (00:00, 01:00, ...)? Usando una rolling window delle ultime N bar? Adattando la dimensione della finestra alla volatilità?
Questi due assi sono indipendenti. Puoi avere:
- Bar tick allineate al calendario — aggrega le bar tick chiuse tra le 14:00 e le 14:59 in una singola candela oraria
- Bar volume rolling — prende le ultime 24 bar volume indipendentemente da quando si sono chiuse
- Bar delta adattive — usa una finestra guidata dalla volatilità sulle bar delta
La "candela di 1 ora" standard è solo un punto in questa matrice 17×3: bar temporali + allineamento al calendario. Ogni altra combinazione è un'alternativa da considerare.
1. Bar Temporali (Standard)
Densità informativa disomogenea: i confini temporali rigidi trattano allo stesso modo le ore tranquille con 200 trade e le ore di annunci con 50.000 trade.
Il default. Una nuova bar si forma dopo un intervallo di tempo fisso: 1 minuto, 5 minuti, 1 ora. Ogni exchange le fornisce nativamente.
Proprietà:
- Durante la sessione asiatica (00:00–08:00 UTC), una candela di 1 ora potrebbe contenere 200 trade. Durante un annuncio di listing su Binance, la stessa finestra potrebbe contenerne 50.000. Le bar temporali trattano entrambi come equivalenti. Rilevare tali picchi di attività è fondamentale per la protezione dei bot — vedi Rilevamento di Anomalie per i Bot di Trading.
- Tutti i partecipanti al mercato vedono gli stessi confini delle candele — un punto di Schelling. Questo rende le bar temporali essenziali per analizzare il comportamento della folla.
- Gli indicatori calcolati su candele parziali (dopo un riavvio) producono valori errati.
from datetime import datetime
def time_until_valid_hourly_candle():
"""Quanto tempo manca alla prima candela oraria completa dopo il riavvio."""
now = datetime.utcnow()
minutes_into_hour = now.minute
seconds_into_minute = now.second
wait_seconds = (60 - minutes_into_hour) * 60 - seconds_into_minute
wait_seconds += 3600
return wait_seconds
2–4. Bar Basate sull'Attività
Bar tick, volume e dollaro: tre modi per lasciare che sia la partecipazione al mercato — non l'orologio — a determinare i confini delle bar.
Invece di campionare a intervalli di tempo fissi, si campiona dopo una quantità fissa di attività di mercato. Questo produce bar con un "contenuto informativo" approssimativamente uguale indipendentemente dall'ora del giorno.
2. Bar Tick
Una nuova bar si forma dopo ogni N trade (tick). Durante l'alta attività, le bar si formano rapidamente. Durante i periodi tranquilli, una singola bar potrebbe estendersi per ore.
from collections import deque
from dataclasses import dataclass
@dataclass
class OHLCV:
timestamp: int
open: float
high: float
low: float
close: float
volume: float
class TickBarGenerator:
"""
Genera una nuova bar ogni `threshold` trade.
Ogni bar contiene un numero uguale di "opinioni" di mercato.
"""
def __init__(self, threshold: int = 1000):
self.threshold = threshold
self.trades: list[tuple[float, float]] = [] # (price, qty)
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
self.trades.append((price, qty))
if len(self.trades) >= self.threshold:
self._close_bar(timestamp)
def _close_bar(self, timestamp: int):
prices = [t[0] for t in self.trades]
volumes = [t[1] for t in self.trades]
bar = OHLCV(
timestamp=timestamp,
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.trades = []
return bar
Pro: Si adatta naturalmente all'attività di mercato. I rendimenti delle bar tick tendono ad essere più vicini a una distribuzione normale rispetto ai rendimenti delle bar temporali — una proprietà che migliora le prestazioni di molti modelli statistici.
Contro: Richiede un flusso grezzo di trade (non disponibile da tutti i fornitori di dati per i dati storici). Il timing delle bar è imprevedibile — non puoi dire "la prossima bar si chiuderà alle X."
3. Bar Volume
Una nuova bar si forma dopo che N contratti (o monete, nel crypto) sono stati scambiati. Simile alle bar tick, ma ponderata per la dimensione del trade — un singolo trade da 100 BTC contribuisce 100 volte di più rispetto a un trade da 1 BTC.
class VolumeBarGenerator:
"""
Genera una nuova bar ogni `threshold` unità di volume.
Normalizza per la dimensione del trade: un ordine grande ≠ un ordine piccolo.
"""
def __init__(self, threshold: float = 100.0):
self.threshold = threshold
self.accumulated_volume = 0.0
self.trades: list[tuple[int, float, float]] = [] # (ts, price, qty)
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
self.trades.append((timestamp, price, qty))
self.accumulated_volume += qty
if self.accumulated_volume >= self.threshold:
self._close_bar()
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.accumulated_volume = 0.0
self.trades = []
return bar
4. Bar Dollaro
Una nuova bar si forma dopo che un valore nozionale fisso (in USD/USDT) è stato scambiato. La più robusta delle bar basate sull'attività perché normalizza sia per il conteggio dei trade che per il livello di prezzo.
Considera: se ETH passa da 4.000, vendere 4.000 ma 10 ETH a $1.000. Le bar volume tratterebbero questi casi in modo diverso; le bar dollaro li trattano allo stesso modo.
class DollarBarGenerator:
"""
Genera una nuova bar ogni `threshold` dollari (USDT) di volume nozionale.
Normalizzazione più robusta: indipendente dal livello di prezzo.
Lopez de Prado (2018) raccomanda le bar dollaro come default
per la maggior parte delle applicazioni quantitative.
"""
def __init__(self, threshold: float = 1_000_000.0):
self.threshold = threshold
self.accumulated_dollars = 0.0
self.trades: list[tuple[int, float, float]] = []
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
self.trades.append((timestamp, price, qty))
self.accumulated_dollars += price * qty
if self.accumulated_dollars >= self.threshold:
self._close_bar()
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.accumulated_dollars = 0.0
self.trades = []
return bar
Scegliere la Soglia
La soglia per le bar basate sull'attività dovrebbe produrre all'incirca lo stesso numero di bar al giorno delle bar temporali che stai sostituendo. Per BTCUSDT su Binance:
| Tipo di Bar | Soglia Tipica | ~Bar/Giorno | TF Equivalente |
|---|---|---|---|
| Tick | 1.000 trade | ~1.400 | ~1m |
| Tick | 50.000 trade | ~28 | ~1h |
| Volume | 100 BTC | ~600 | ~2-3m |
| Volume | 2.400 BTC | ~25 | ~1h |
| Dollaro | $1M | ~1.400 | ~1m |
| Dollaro | $50M | ~28 | ~1h |
Questi numeri sono approssimativi e cambiano drasticamente con il regime di mercato. Durante un rally o un crollo, le bar basate sull'attività producono 5-10 volte più bar del solito — il che è esattamente il punto.
5–7. Bar Basate sul Prezzo
Mattoni Renko, bar range e bar volatilità: campionamento solo quando il prezzo si muove abbastanza da essere significativo.
Le bar basate sul prezzo ignorano sia il tempo che l'attività. Una nuova bar si forma solo quando il prezzo si muove di una quantità specificata. Questo filtra naturalmente il rumore laterale ed evidenzia i trend.
5. Bar Renko
Un nuovo "mattone" Renko si forma quando il prezzo di chiusura si muove di almeno N unità rispetto alla chiusura del mattone precedente. I mattoni hanno sempre la stessa dimensione, creando una rappresentazione visiva pulita della direzione del trend.
class RenkoBarGenerator:
"""
Genera mattoni Renko basati sul movimento di prezzo.
Proprietà chiave: durante il movimento laterale, non si formano nuovi mattoni.
Durante i trend forti, i mattoni si formano rapidamente.
"""
def __init__(self, brick_size: float = 10.0):
self.brick_size = brick_size
self.bricks: list[dict] = []
self.last_close: float | None = None
def on_price(self, timestamp: int, price: float, volume: float = 0.0):
if self.last_close is None:
self.last_close = price
return []
new_bricks = []
diff = price - self.last_close
num_bricks = int(abs(diff) / self.brick_size)
if num_bricks == 0:
return []
direction = 1 if diff > 0 else -1
for i in range(num_bricks):
brick_open = self.last_close
brick_close = self.last_close + direction * self.brick_size
brick = {
'timestamp': timestamp,
'open': brick_open,
'high': max(brick_open, brick_close),
'low': min(brick_open, brick_close),
'close': brick_close,
'volume': volume / num_bricks if num_bricks > 0 else 0,
'direction': direction,
}
new_bricks.append(brick)
self.last_close = brick_close
self.bricks.extend(new_bricks)
return new_bricks
Renko Dinamico usa l'ATR (Average True Range) invece di una dimensione del mattone fissa, adattandosi automaticamente alla volatilità.
6. Bar Range
Ogni bar ha un range fisso alto-basso. Quando il range viene superato, la bar si chiude e ne inizia una nuova. A differenza di Renko, le bar range includono le ombre e possono mostrare la volatilità intra-bar.
class RangeBarGenerator:
"""
Genera bar con un range fisso alto-basso.
Differenza da Renko: le bar range mostrano l'OHLC completo all'interno
del range, non solo la direzione del mattone. Più ricche di informazioni.
"""
def __init__(self, range_size: float = 20.0):
self.range_size = range_size
self.current_high: float | None = None
self.current_low: float | None = None
self.current_open: float | None = None
self.current_volume: float = 0.0
self.current_start_ts: int = 0
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
if self.current_open is None:
self.current_open = price
self.current_high = price
self.current_low = price
self.current_start_ts = timestamp
self.current_high = max(self.current_high, price)
self.current_low = min(self.current_low, price)
self.current_volume += qty
if self.current_high - self.current_low >= self.range_size:
bar = OHLCV(
timestamp=timestamp,
open=self.current_open,
high=self.current_high,
low=self.current_low,
close=price,
volume=self.current_volume,
)
self.bars.append(bar)
self.current_open = price
self.current_high = price
self.current_low = price
self.current_volume = 0.0
self.current_start_ts = timestamp
return bar
return None
Differenza chiave tra Renko e bar Range: Renko traccia solo i prezzi di chiusura e mostra la direzione; le bar range tracciano il range completo del prezzo e mostrano la struttura all'interno della bar. Le bar range sono generalmente più utili per il trading algoritmico perché preservano le informazioni alto-basso necessarie per la simulazione di stop-loss e take-profit.
7. Bar Volatilità
Una nuova bar si forma quando la volatilità intra-bar raggiunge una soglia dinamica — ad esempio, un multiplo dell'ATR recente. A differenza delle bar range (soglia fissa), le bar volatilità si adattano alle condizioni di mercato.
class VolatilityBarGenerator:
"""
Genera bar quando la volatilità intra-bar raggiunge una soglia.
Simile alle bar range, ma la soglia si adatta alle condizioni di mercato
usando una misura ATR rolling. Nei mercati calmi, le bar richiedono meno
movimento assoluto per chiudersi; nei mercati volatili, di più.
"""
def __init__(
self,
atr_period: int = 14,
atr_multiplier: float = 1.0,
initial_threshold: float = 20.0,
):
self.atr_period = atr_period
self.atr_multiplier = atr_multiplier
self.threshold = initial_threshold
self.recent_ranges: list[float] = []
self.current_open: float | None = None
self.current_high: float | None = None
self.current_low: float | None = None
self.current_volume: float = 0.0
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
if self.current_open is None:
self.current_open = price
self.current_high = price
self.current_low = price
self.current_high = max(self.current_high, price)
self.current_low = min(self.current_low, price)
self.current_volume += qty
intra_bar_range = self.current_high - self.current_low
if intra_bar_range >= self.threshold:
bar = OHLCV(
timestamp=timestamp,
open=self.current_open,
high=self.current_high,
low=self.current_low,
close=price,
volume=self.current_volume,
)
self.bars.append(bar)
self.recent_ranges.append(intra_bar_range)
if len(self.recent_ranges) > self.atr_period:
self.recent_ranges = self.recent_ranges[-self.atr_period:]
if len(self.recent_ranges) >= self.atr_period:
avg_range = sum(self.recent_ranges) / len(self.recent_ranges)
self.threshold = avg_range * self.atr_multiplier
self.current_open = price
self.current_high = price
self.current_low = price
self.current_volume = 0.0
return bar
return None
8. Heikin-Ashi (Trasformazione Smussata)
Heikin-Ashi: la media trasforma le candele rumorose in segnali di trend fluidi — ma a costo delle informazioni esatte sul prezzo.
Heikin-Ashi (giapponese per "bar media") non è un tipo di bar — è una trasformazione che può essere applicata sopra qualsiasi tipo di bar base. Smussa le candele mediando i valori della bar corrente e precedente:
- HA Close = (Open + High + Low + Close) / 4
- HA Open = (Previous HA Open + Previous HA Close) / 2
- HA High = max(High, HA Open, HA Close)
- HA Low = min(Low, HA Open, HA Close)
I trend appaiono come sequenze di candele dello stesso colore senza ombre inferiori (uptrend) o senza ombre superiori (downtrend).
class HeikinAshiTransformer:
"""
Trasforma le candele OHLCV standard in candele Heikin-Ashi.
Può essere applicata sopra QUALSIASI tipo di bar: bar temporali, bar volume,
bar rolling, ecc. È una trasformazione, non un metodo di campionamento.
ATTENZIONE: i prezzi HA sono sintetici — non rappresentano prezzi
realmente scambiati. Non usare mai la chiusura HA per il posizionamento
degli ordini o il calcolo del PnL. Usa HA solo per la generazione dei
segnali, poi esegui ai prezzi reali.
"""
def __init__(self):
self.prev_ha_open: float | None = None
self.prev_ha_close: float | None = None
def transform(self, candle: OHLCV) -> OHLCV:
ha_close = (candle.open + candle.high + candle.low + candle.close) / 4
if self.prev_ha_open is None:
ha_open = (candle.open + candle.close) / 2
else:
ha_open = (self.prev_ha_open + self.prev_ha_close) / 2
ha_high = max(candle.high, ha_open, ha_close)
ha_low = min(candle.low, ha_open, ha_close)
self.prev_ha_open = ha_open
self.prev_ha_close = ha_close
return OHLCV(
timestamp=candle.timestamp,
open=ha_open,
high=ha_high,
low=ha_low,
close=ha_close,
volume=candle.volume,
)
def transform_series(self, candles: list[OHLCV]) -> list[OHLCV]:
"""Trasforma un'intera serie. Reimposta prima lo stato."""
self.prev_ha_open = None
self.prev_ha_close = None
return [self.transform(c) for c in candles]
def ha_trend_signal(ha_candles: list[OHLCV], lookback: int = 3) -> int:
"""
Segnale di trend HA semplice.
Restituisce:
+1: rialzista (N candele HA verdi consecutive senza ombra inferiore)
-1: ribassista (N candele HA rosse consecutive senza ombra superiore)
0: nessun trend chiaro
"""
if len(ha_candles) < lookback:
return 0
recent = ha_candles[-lookback:]
all_bullish = all(
c.close > c.open and abs(c.low - min(c.open, c.close)) < 1e-10
for c in recent
)
all_bearish = all(
c.close < c.open and abs(c.high - max(c.open, c.close)) < 1e-10
for c in recent
)
if all_bullish:
return 1
elif all_bearish:
return -1
return 0
Avvertenza critica per il backtesting: I prezzi Heikin-Ashi sono sintetici. Se il tuo backtest usa la chiusura HA come prezzo di ingresso, i risultati saranno sbagliati. Usa sempre HA solo per la generazione di segnali ed esegui ai prezzi OHLC reali.
Quando HA è utile: Strategie trend-following che necessitano di segnali puliti di "rimani dentro". Applica HA sopra qualsiasi tipo di bar base — bar temporali, bar volume, bar dollaro — per filtrare i falsi crossover.
Quando HA è dannoso: Qualsiasi strategia che necessita di livelli di prezzo precisi — supporto/resistenza, analisi del book degli ordini, PIQ (Position In Queue). La media distrugge le informazioni esatte sul prezzo.
9–11. Grafici di Inversione Giapponesi
Kagi, Line Break e Point & Figure: metodi di grafico privi di tempo che si concentrano puramente sulla struttura del prezzo.
Questi sono metodi di grafico giapponesi tradizionali (insieme a Renko) che scartano completamente il tempo e si concentrano sulla struttura del prezzo.
9. Grafici Kagi
I grafici Kagi consistono in linee verticali che cambiano direzione quando il prezzo inverte di una quantità specificata. Le linee cambiano spessore quando il prezzo rompe un precedente massimo (spessa = "yang" = domanda) o un precedente minimo (sottile = "yin" = offerta).
class KagiChartGenerator:
"""
Genera linee di grafico Kagi basate sulle inversioni di prezzo.
A differenza di Renko (dimensione del mattone fissa), Kagi traccia l'ampiezza
effettiva di ogni movimento e cambia lo spessore della linea nei punti di breakout.
Utile per identificare le rotture di supporto/resistenza e
i cambiamenti di domanda/offerta senza il rumore del tempo.
"""
def __init__(self, reversal_amount: float = 10.0):
self.reversal_amount = reversal_amount
self.lines: list[dict] = []
self.current_direction: int = 0 # 1=su, -1=giù
self.current_price: float | None = None
self.extreme_price: float | None = None
self.prev_high: float | None = None
self.prev_low: float | None = None
self.line_type: str = 'yang' # 'yang' (spessa) o 'yin' (sottile)
def on_price(self, timestamp: int, price: float):
if self.current_price is None:
self.current_price = price
self.extreme_price = price
return None
if self.current_direction == 0:
if price - self.current_price >= self.reversal_amount:
self.current_direction = 1
self.extreme_price = price
elif self.current_price - price >= self.reversal_amount:
self.current_direction = -1
self.extreme_price = price
return None
if self.current_direction == 1:
if price > self.extreme_price:
self.extreme_price = price
if self.prev_high is not None and price > self.prev_high:
self.line_type = 'yang'
elif self.extreme_price - price >= self.reversal_amount:
line = {
'timestamp': timestamp,
'start': self.current_price,
'end': self.extreme_price,
'direction': 'up',
'type': self.line_type,
}
self.lines.append(line)
self.prev_high = self.extreme_price
self.current_price = self.extreme_price
self.extreme_price = price
self.current_direction = -1
if self.prev_low is not None and price < self.prev_low:
self.line_type = 'yin'
return line
else:
if price < self.extreme_price:
self.extreme_price = price
if self.prev_low is not None and price < self.prev_low:
self.line_type = 'yin'
elif price - self.extreme_price >= self.reversal_amount:
line = {
'timestamp': timestamp,
'start': self.current_price,
'end': self.extreme_price,
'direction': 'down',
'type': self.line_type,
}
self.lines.append(line)
self.prev_low = self.extreme_price
self.current_price = self.extreme_price
self.extreme_price = price
self.current_direction = 1
if self.prev_high is not None and price > self.prev_high:
self.line_type = 'yang'
return line
return None
10. Grafici Line Break
I grafici Line Break disegnano una nuova linea (box) solo quando il prezzo di chiusura supera il massimo o il minimo delle ultime N linee (tipicamente 3). Non viene disegnata nessuna nuova linea se il prezzo rimane all'interno del range.
class LineBreakGenerator:
"""
Genera bar Line Break (Three Line Break per default).
Una nuova bar viene disegnata solo quando la chiusura supera il massimo o il minimo
delle ultime N bar. Filtra il rumore minore richiedendo che il prezzo
rompa attraverso un range multi-bar.
Il parametro 'N' (line_count) controlla la sensibilità:
- N=2: più sensibile, più bar, più rumore
- N=3: standard (Three Line Break)
- N=4+: meno sensibile, meno bar, segnali più forti
"""
def __init__(self, line_count: int = 3):
self.line_count = line_count
self.lines: list[dict] = []
def on_close(self, timestamp: int, close: float) -> dict | None:
if not self.lines:
self.lines.append({
'timestamp': timestamp,
'open': close,
'close': close,
'high': close,
'low': close,
'direction': 0,
})
return None
lookback = self.lines[-self.line_count:] if len(self.lines) >= self.line_count else self.lines
highest = max(l['high'] for l in lookback)
lowest = min(l['low'] for l in lookback)
last = self.lines[-1]
new_line = None
if close > highest:
new_line = {
'timestamp': timestamp,
'open': last['close'],
'close': close,
'high': close,
'low': last['close'],
'direction': 1,
}
elif close < lowest:
new_line = {
'timestamp': timestamp,
'open': last['close'],
'close': close,
'high': last['close'],
'low': close,
'direction': -1,
}
if new_line:
self.lines.append(new_line)
return new_line
return None
11. Grafici Point & Figure
I grafici Point & Figure (P&F) usano colonne di X (prezzi in salita) e O (prezzi in discesa). Il cambio di colonna richiede un'inversione tipicamente di 3 dimensioni box. Uno dei metodi più antichi per filtrare il rumore e identificare supporto/resistenza.
class PointAndFigureGenerator:
"""
Genera dati per grafici Point & Figure.
Colonna X: prezzo che sale per incrementi di box_size.
Colonna O: prezzo che scende per incrementi di box_size.
Cambio di colonna: richiede il movimento di reversal_boxes * box_size
nella direzione opposta.
Impostazione classica: box_size basato su ATR, reversal_boxes = 3.
"""
def __init__(self, box_size: float = 10.0, reversal_boxes: int = 3):
self.box_size = box_size
self.reversal_boxes = reversal_boxes
self.reversal_amount = box_size * reversal_boxes
self.columns: list[dict] = []
self.current_direction: int = 0
self.current_top: float | None = None
self.current_bottom: float | None = None
def on_price(self, timestamp: int, price: float):
if self.current_top is None:
box_price = self._round_to_box(price)
self.current_top = box_price
self.current_bottom = box_price
self.current_direction = 1
return None
events = []
if self.current_direction == 1:
while price >= self.current_top + self.box_size:
self.current_top += self.box_size
events.append(('X', self.current_top, timestamp))
if price <= self.current_top - self.reversal_amount:
col = {
'type': 'X',
'top': self.current_top,
'bottom': self.current_bottom,
'boxes': int((self.current_top - self.current_bottom) / self.box_size) + 1,
'timestamp': timestamp,
}
self.columns.append(col)
self.current_direction = -1
self.current_top = self.current_top - self.box_size
self.current_bottom = self._round_to_box(price)
events.append(('new_column', 'O', timestamp))
else:
while price <= self.current_bottom - self.box_size:
self.current_bottom -= self.box_size
events.append(('O', self.current_bottom, timestamp))
if price >= self.current_bottom + self.reversal_amount:
col = {
'type': 'O',
'top': self.current_top,
'bottom': self.current_bottom,
'boxes': int((self.current_top - self.current_bottom) / self.box_size) + 1,
'timestamp': timestamp,
}
self.columns.append(col)
self.current_direction = 1
self.current_bottom = self.current_bottom + self.box_size
self.current_top = self._round_to_box(price)
events.append(('new_column', 'X', timestamp))
return events if events else None
def _round_to_box(self, price: float) -> float:
return round(price / self.box_size) * self.box_size
Kagi, Line Break e P&F nel trading algoritmico: Principalmente usati per il rilevamento del trend a lungo termine e l'identificazione di supporto/resistenza. Come livello filtro — "non prendere segnali long quando il grafico Kagi è in modalità yin" — aggiungono valore allineando i trade con la struttura macro.
12–14. Bar Information-Driven
Bar imbalance, bar run, filtri CUSUM e bar entropia: campionamento quando il mercato ci dice che qualcosa è cambiato.
L'approccio più sofisticato, da Advances in Financial Machine Learning di Marcos Lopez de Prado (2018). L'intuizione chiave: campiona quando arrivano nuove informazioni al mercato, non a intervalli fissi.
12. Tick Imbalance Bars (TIB)
Se il mercato è in equilibrio, i trade avviati dagli acquirenti e dai venditori dovrebbero bilanciarsi approssimativamente. Quando lo squilibrio supera le nostre aspettative, qualcosa è cambiato. Campiona una bar in quel momento.
Ogni trade viene classificato come avviato dall'acquirente (+1) o dal venditore (-1) usando la regola del tick. Tracciamo lo squilibrio cumulativo θ e campionamo quando |θ| supera una soglia dinamica.
class TickImbalanceBarGenerator:
"""
Genera bar quando lo squilibrio cumulativo dei tick supera
i livelli attesi — cioè quando arrivano "nuove informazioni".
Basato su Lopez de Prado (2018), Capitolo 2.
"""
def __init__(
self,
expected_ticks_init: int = 1000,
ewma_window: int = 100,
min_ticks: int = 100,
max_ticks: int = 50000,
):
self.expected_ticks_init = expected_ticks_init
self.ewma_window = ewma_window
self.min_ticks = min_ticks
self.max_ticks = max_ticks
self.theta = 0.0
self.prev_price: float | None = None
self.prev_sign = 1
self.trades: list[tuple[int, float, float]] = []
self.bar_lengths: list[int] = []
self.imbalances: list[float] = []
self.expected_ticks = float(expected_ticks_init)
self.expected_imbalance = 0.0
self.bars: list[OHLCV] = []
def _tick_sign(self, price: float) -> int:
"""Classifica il trade come acquisto (+1) o vendita (-1) usando la regola del tick."""
if self.prev_price is None:
self.prev_price = price
return 1
if price > self.prev_price:
sign = 1
elif price < self.prev_price:
sign = -1
else:
sign = self.prev_sign
self.prev_price = price
self.prev_sign = sign
return sign
def on_trade(self, timestamp: int, price: float, qty: float):
sign = self._tick_sign(price)
self.theta += sign
self.trades.append((timestamp, price, qty))
threshold = self.expected_ticks * abs(self.expected_imbalance)
if threshold == 0:
threshold = self.expected_ticks_init * 0.5
if abs(self.theta) >= threshold and len(self.trades) >= self.min_ticks:
return self._close_bar()
if len(self.trades) >= self.max_ticks:
return self._close_bar()
return None
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.bar_lengths.append(len(self.trades))
self.imbalances.append(self.theta / len(self.trades))
if len(self.bar_lengths) >= 2:
alpha = 2.0 / (self.ewma_window + 1)
self.expected_ticks = (
alpha * self.bar_lengths[-1]
+ (1 - alpha) * self.expected_ticks
)
self.expected_ticks = max(
self.min_ticks,
min(self.max_ticks, self.expected_ticks)
)
self.expected_imbalance = (
alpha * self.imbalances[-1]
+ (1 - alpha) * self.expected_imbalance
)
self.theta = 0.0
self.trades = []
return bar
13. Volume Imbalance Bars (VIB)
Estensione dei TIB: invece di contare ogni trade come ±1, si pesa per il volume firmato. Un acquisto da 100 BTC contribuisce +100, una vendita da 1 BTC contribuisce -1. Cattura i grandi ordini informati che potrebbero essere suddivisi in molti piccoli trade.
class VolumeImbalanceBarGenerator:
"""
Come i TIB, ma usa il volume firmato invece dei tick firmati.
Cattura l'intuizione che un segnale di acquisto da 100 BTC è 100 volte più
informativo di un segnale di acquisto da 1 BTC.
"""
def __init__(
self,
expected_ticks_init: int = 1000,
ewma_window: int = 100,
):
self.expected_ticks_init = expected_ticks_init
self.ewma_window = ewma_window
self.theta = 0.0
self.prev_price: float | None = None
self.prev_sign = 1
self.trades: list[tuple[int, float, float]] = []
self.bar_lengths: list[int] = []
self.volume_imbalances: list[float] = []
self.expected_ticks = float(expected_ticks_init)
self.expected_vol_imbalance = 0.0
self.bars: list[OHLCV] = []
def _tick_sign(self, price: float) -> int:
if self.prev_price is None:
self.prev_price = price
return 1
if price > self.prev_price:
sign = 1
elif price < self.prev_price:
sign = -1
else:
sign = self.prev_sign
self.prev_price = price
self.prev_sign = sign
return sign
def on_trade(self, timestamp: int, price: float, qty: float):
sign = self._tick_sign(price)
self.theta += sign * qty
self.trades.append((timestamp, price, qty))
threshold = self.expected_ticks * abs(self.expected_vol_imbalance)
if threshold == 0:
threshold = self.expected_ticks_init * 0.5
if abs(self.theta) >= threshold and len(self.trades) >= 10:
return self._close_bar()
return None
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.bar_lengths.append(len(self.trades))
self.volume_imbalances.append(self.theta / len(self.trades))
alpha = 2.0 / (self.ewma_window + 1)
if len(self.bar_lengths) >= 2:
self.expected_ticks = (
alpha * self.bar_lengths[-1] + (1 - alpha) * self.expected_ticks
)
self.expected_vol_imbalance = (
alpha * self.volume_imbalances[-1]
+ (1 - alpha) * self.expected_vol_imbalance
)
self.theta = 0.0
self.trades = []
return bar
Il Problema dell'Esplosione
Un problema noto con le bar imbalance: la soglia basata su EWMA può entrare in un ciclo di feedback positivo. La soluzione: limitare con i vincoli min_ticks e max_ticks.
self.expected_ticks = max(
self.min_ticks, # Pavimento: mai meno di 100 tick
min(
self.max_ticks, # Soffitto: mai più di 50000 tick
new_expected_ticks
)
)
14. Run Bars
Le run bar tracciano la lunghezza della corsa direzionale corrente — la più lunga sequenza consecutiva di acquisti o vendite. Quando un grande trader informato suddivide un ordine in molti piccoli trade, la sequenza diventa insolitamente lunga. Le run bar rilevano questo.
class TickRunBarGenerator:
"""
Genera bar quando la lunghezza di una corsa direzionale supera le aspettative.
Basato su Lopez de Prado (2018), Capitolo 2.
Differenza dalle bar imbalance:
- Le bar imbalance tracciano lo squilibrio NETTO (acquisti meno vendite)
- Le run bar tracciano la lunghezza MASSIMA della corsa (acquisti OPPURE vendite consecutivi)
"""
def __init__(
self,
expected_ticks_init: int = 1000,
ewma_window: int = 100,
min_ticks: int = 100,
max_ticks: int = 50000,
):
self.expected_ticks_init = expected_ticks_init
self.ewma_window = ewma_window
self.min_ticks = min_ticks
self.max_ticks = max_ticks
self.prev_price: float | None = None
self.prev_sign = 1
self.trades: list[tuple[int, float, float]] = []
self.buy_run = 0
self.sell_run = 0
self.max_buy_run = 0
self.max_sell_run = 0
self.bar_lengths: list[int] = []
self.max_runs: list[float] = []
self.expected_ticks = float(expected_ticks_init)
self.expected_max_run = 0.0
self.bars: list[OHLCV] = []
def _tick_sign(self, price: float) -> int:
if self.prev_price is None:
self.prev_price = price
return 1
if price > self.prev_price:
sign = 1
elif price < self.prev_price:
sign = -1
else:
sign = self.prev_sign
self.prev_price = price
self.prev_sign = sign
return sign
def on_trade(self, timestamp: int, price: float, qty: float):
sign = self._tick_sign(price)
self.trades.append((timestamp, price, qty))
if sign == 1:
self.buy_run += 1
self.sell_run = 0
else:
self.sell_run += 1
self.buy_run = 0
self.max_buy_run = max(self.max_buy_run, self.buy_run)
self.max_sell_run = max(self.max_sell_run, self.sell_run)
theta = max(self.max_buy_run, self.max_sell_run)
threshold = self.expected_ticks * self.expected_max_run if self.expected_max_run > 0 else self.expected_ticks_init * 0.3
if theta >= threshold and len(self.trades) >= self.min_ticks:
return self._close_bar()
if len(self.trades) >= self.max_ticks:
return self._close_bar()
return None
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
max_run = max(self.max_buy_run, self.max_sell_run) / len(self.trades)
self.bar_lengths.append(len(self.trades))
self.max_runs.append(max_run)
alpha = 2.0 / (self.ewma_window + 1)
if len(self.bar_lengths) >= 2:
self.expected_ticks = alpha * self.bar_lengths[-1] + (1 - alpha) * self.expected_ticks
self.expected_ticks = max(self.min_ticks, min(self.max_ticks, self.expected_ticks))
self.expected_max_run = alpha * self.max_runs[-1] + (1 - alpha) * self.expected_max_run
self.trades = []
self.buy_run = 0
self.sell_run = 0
self.max_buy_run = 0
self.max_sell_run = 0
return bar
Le run bar possono essere estese a run di volume e run di dollaro.
15. Bar Filtro CUSUM
Il filtro CUSUM (Cumulative Sum) determina quando campionare tracciando i rendimenti cumulativi. A differenza delle bar imbalance (che lavorano sui trade grezzi), CUSUM può essere applicato ai dati OHLCV 1m esistenti — non sono richiesti dati tick.
class CUSUMFilterBarGenerator:
"""
Filtro CUSUM simmetrico per il campionamento basato su eventi.
Basato su Lopez de Prado (2018), Capitolo 2.5.
Vantaggio chiave rispetto alle Bande di Bollinger: CUSUM richiede
un'intera corsa di ampiezza threshold prima di attivarsi. Le Bande
di Bollinger si attivano ripetutamente quando il prezzo si aggira
vicino alla banda.
Può essere applicato ai dati OHLCV 1m — non sono richiesti dati tick.
"""
def __init__(self, threshold: float = 0.01):
self.threshold = threshold
self.s_pos = 0.0
self.s_neg = 0.0
self.prev_price: float | None = None
self.buffer: list[OHLCV] = []
self.bars: list[OHLCV] = []
def on_candle_1m(self, candle: OHLCV) -> OHLCV | None:
self.buffer.append(candle)
if self.prev_price is None:
self.prev_price = candle.close
return None
import math
log_ret = math.log(candle.close / self.prev_price)
self.prev_price = candle.close
self.s_pos = max(0.0, self.s_pos + log_ret)
self.s_neg = min(0.0, self.s_neg + log_ret)
triggered = False
if self.s_pos > self.threshold:
self.s_pos = 0.0
triggered = True
if self.s_neg < -self.threshold:
self.s_neg = 0.0
triggered = True
if triggered and len(self.buffer) >= 2:
bars = self.buffer
bar = OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
self.bars.append(bar)
self.buffer = []
return bar
return None
CUSUM + Metodo Triple Barrier: Nel framework di Lopez de Prado, gli eventi CUSUM vengono usati come punti di ingresso per il metodo Triple Barrier — dove ogni evento attiva un trade con stop-loss, take-profit e barriere di scadenza. Per una validazione robusta di tali strategie event-driven, vedi Walk-Forward Optimization e Monte Carlo Bootstrap per il Backtesting.
16. Bar Entropia
L'approccio più elegante dal punto di vista teorico: campiona quando il contenuto informativo (entropia di Shannon) della serie di prezzi intra-bar supera una soglia.
class EntropyBarGenerator:
"""
Genera bar quando l'entropia dei rendimenti intra-bar supera
una soglia.
Basato sulla teoria dell'informazione di Shannon: le bar vengono campionate
quando arrivano "nuove informazioni", misurate come l'entropia della
distribuzione dei rendimenti all'interno della bar corrente.
Questa è la bar information-driven più "pura" dal punto di vista teorico.
"""
def __init__(
self,
entropy_threshold: float = 2.0,
min_trades: int = 50,
n_bins: int = 10,
):
self.entropy_threshold = entropy_threshold
self.min_trades = min_trades
self.n_bins = n_bins
self.trades: list[tuple[int, float, float]] = []
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
self.trades.append((timestamp, price, qty))
if len(self.trades) < self.min_trades:
return None
entropy = self._compute_entropy()
if entropy >= self.entropy_threshold:
return self._close_bar()
return None
def _compute_entropy(self) -> float:
import math
prices = [t[1] for t in self.trades]
if len(prices) < 2:
return 0.0
returns = [
math.log(prices[i] / prices[i-1])
for i in range(1, len(prices))
if prices[i-1] > 0
]
if not returns:
return 0.0
min_r = min(returns)
max_r = max(returns)
if max_r == min_r:
return 0.0
bin_width = (max_r - min_r) / self.n_bins
bins = [0] * self.n_bins
for r in returns:
idx = min(int((r - min_r) / bin_width), self.n_bins - 1)
bins[idx] += 1
total = sum(bins)
entropy = 0.0
for count in bins:
if count > 0:
p = count / total
entropy -= p * math.log2(p)
return entropy
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.trades = []
return bar
Nota pratica: Le bar entropia sono computazionalmente costose e sono principalmente di interesse per la ricerca — ma per le strategie basate su ML, producono feature con migliori proprietà statistiche perché ogni bar contiene approssimativamente uguale "informazione."
17. Bar Delta (Order Flow)
Delta cumulativo: misurare la forza netta degli acquirenti aggressivi rispetto ai venditori in tempo reale.
Le bar delta campionano in base al delta cumulativo — la differenza progressiva tra il volume di acquisto e il volume di vendita. A differenza delle bar imbalance (che usano segni di tick ±1), le bar delta usano il flusso di ordini effettivo ponderato per il volume.
class DeltaBarGenerator:
"""
Genera bar basate sul delta cumulativo dell'order flow.
Delta = Volume di Acquisto - Volume di Vendita (classificato per lato aggressore).
Richiede dati a livello di trade con classificazione del lato
(disponibile da Binance aggTrades, Bybit trades, ecc.)
"""
def __init__(self, threshold: float = 500.0):
self.threshold = threshold
self.cumulative_delta = 0.0
self.trades: list[tuple[int, float, float, int]] = []
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float, is_buyer_maker: bool):
side = -1 if is_buyer_maker else 1
signed_qty = side * qty
self.cumulative_delta += signed_qty
self.trades.append((timestamp, price, qty, side))
if abs(self.cumulative_delta) >= self.threshold:
return self._close_bar()
return None
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
bar.delta = self.cumulative_delta # type: ignore
bar.buy_volume = sum(t[2] for t in self.trades if t[3] == 1) # type: ignore
bar.sell_volume = sum(t[2] for t in self.trades if t[3] == -1) # type: ignore
self.bars.append(bar)
self.cumulative_delta = 0.0
self.trades = []
return bar
Divergenza del delta: Uno dei segnali più potenti — il prezzo sale mentre il delta cumulativo è negativo (i venditori sono aggressivi ma il prezzo sale comunque, indicando assorbimento degli acquisti limit). Direttamente rilevante per l'approccio di fingerprinting comportamentale descritto nell'articolo Impronta Digitale: Identificazione del Trader. Per i market maker che usano il modello Avellaneda-Stoikov, le bar delta forniscono una visione in tempo reale del rischio di inventario e della pressione degli aggressori.
Un buffer circolare di bar base: i nuovi dati entrano, i vecchi escono, e la candela aggregata è sempre valida.
I metodi di aggregazione determinano come le bar base vengono composte in candele di timeframe superiore (HTF). Sono indipendenti dal tipo di bar — puoi applicare qualsiasi metodo di aggregazione a qualsiasi tipo di bar base.
Metodo A: Aggregazione Allineata al Calendario
Aggrega tutte le bar base che rientrano in un confine di calendario fisso. La candela "di 1 ora" copre tutte le bar dalle 14:00:00 alle 14:59:59.
Proprietà:
- Tutti i partecipanti al mercato vedono gli stessi confini — essenziale per l'analisi della struttura di mercato, supporto/resistenza, trigger PIQ
- Problema del cold start: candela parziale dopo il riavvio
- Naturale per le bar temporali (è quello che gli exchange forniscono nativamente)
- Funziona anche per le bar non temporali: "tutte le bar volume chiuse tra le 14:00 e le 15:00" = una candela oraria allineata al calendario da bar volume
Metodo B: Aggregazione Rolling Window
Aggrega le ultime N bar base chiuse, ricalcolata ad ogni nuova bar. Una candela rolling "di 1 ora" = le ultime 60 bar temporali di 1 minuto chiuse, aggiornata ogni minuto.
L'unità atomica è la bar base chiusa. Questa scelta di design offre:
- Nessun cold start. Dopo N bar, la candela è valida. Nessun rumore da candela parziale.
- Parità del backtest. Se il trading live usa la stessa unità atomica del motore di backtesting, i segnali sono identici.
- Validazione semplice. Una regola:
if buffer not full: skip.
import numpy as np
class RollingCandleAggregator:
"""
Produce candele rolling di timeframe superiore da bar base chiuse.
Funziona con QUALSIASI tipo di bar: bar temporali, bar tick, bar volume,
bar dollaro, bar delta — tutto ciò che produce output OHLCV.
Esempio: RollingCandleAggregator(window=60) con bar temporali 1m
produce una candela "1h" aggiornata ogni minuto.
Esempio: RollingCandleAggregator(window=24) con bar volume
produce una candela che copre le ultime 24 bar volume.
"""
def __init__(self, window: int):
self.window = window
self.buffer: deque[OHLCV] = deque(maxlen=window)
def push(self, bar: OHLCV) -> OHLCV | None:
"""
Aggiunge una bar base chiusa. Restituisce la candela aggregata
solo quando il buffer è pieno (= la candela è valida).
"""
self.buffer.append(bar)
if len(self.buffer) < self.window:
return None
return self._aggregate()
def _aggregate(self) -> OHLCV:
bars = list(self.buffer)
return OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
@property
def is_valid(self) -> bool:
return len(self.buffer) == self.window
Trade-off dello sfasamento di fase: Le candele rolling si chiudono alle :37 se hai iniziato alle :37, non alle :00 come tutti gli altri. Questo è importante per le strategie che dipendono dai livelli visibili alla folla. La soluzione: usare entrambi — calendario per la struttura di mercato, rolling per i segnali.
Metodo C: Aggregazione Rolling Adattiva
Come rolling, ma la dimensione della finestra si adatta alla volatilità corrente. Mercati calmi → finestra più ampia (più smussamento). Mercati volatili → finestra più stretta (reazione più rapida).
class AdaptiveRollingAggregator:
"""
Rolling window dove la dimensione della finestra si adatta alla volatilità.
Funziona con qualsiasi tipo di bar base. Usa l'ATR delle bar recenti
come misura della volatilità.
Bassa volatilità → finestra più ampia (più smussamento, meno segnali)
Alta volatilità → finestra più stretta (reazione più rapida)
"""
def __init__(
self,
base_window: int = 60,
min_window: int = 15,
max_window: int = 240,
atr_period: int = 14,
atr_base: float | None = None,
):
self.base_window = base_window
self.min_window = min_window
self.max_window = max_window
self.atr_period = atr_period
self.atr_base = atr_base
self.all_candles: deque[OHLCV] = deque(maxlen=max_window)
self.atr_values: deque[float] = deque(maxlen=atr_period * 2)
self.current_window = base_window
def push(self, bar: OHLCV) -> OHLCV | None:
self.all_candles.append(bar)
tr = bar.high - bar.low
self.atr_values.append(tr)
if len(self.atr_values) < self.atr_period:
return None
current_atr = sum(list(self.atr_values)[-self.atr_period:]) / self.atr_period
if self.atr_base is None and len(self.atr_values) >= self.atr_period * 2:
self.atr_base = sum(self.atr_values) / len(self.atr_values)
if self.atr_base is None or self.atr_base == 0:
return None
vol_ratio = current_atr / self.atr_base
self.current_window = int(self.base_window / vol_ratio)
self.current_window = max(self.min_window, min(self.max_window, self.current_window))
if len(self.all_candles) < self.current_window:
return None
bars = list(self.all_candles)[-self.current_window:]
return OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
Ogni tipo di bar base può essere combinato con ogni metodo di aggregazione. Alcune combinazioni sono standard (bar temporali calendar = ciò che gli exchange forniscono), altre sono esotiche ma potenti.
Esempi di Combinazione
| Tipo di Bar Base | Calendario | Rolling | Adattivo |
|---|---|---|---|
| Tempo | Candele standard dell'exchange | HTF sempre valido, nessun cold start | Timeframe vol-adattivo |
| Volume | "Tutte le bar volume di quest'ora" | Ultime 24 bar volume | Finestra più ampia nei mercati calmi |
| Dollaro | Aggregato orario di bar dollaro | Ultime N bar dollaro | Finestre dollaro adattive |
| Tick Imbalance | Aggregato orario di imbalance | Ultimi N eventi di imbalance | Reazione rapida nei regimi volatili |
| Delta | Order flow netto orario | Snapshot delta rolling | Finestra di flusso adattiva |
| Renko | "Mattoni di quest'ora" | Ultimi N mattoni | Conteggio mattoni adattivo |
Motore Ibrido: Calendario + Rolling
In pratica, vuoi sia l'aggregazione calendario che rolling simultaneamente. L'overhead di memoria è trascurabile — due buffer deque per timeframe per simbolo.
class HybridCandleEngine:
"""
Mantiene sia le candele allineate al calendario che le candele rolling
per qualsiasi tipo di bar base.
Candele calendario: per la struttura di mercato, supporto/resistenza, PIQ.
Candele rolling: per gli indicatori, la generazione di segnali, ingressi/uscite.
"""
def __init__(self):
self.rolling = {
'1h': RollingCandleAggregator(60),
'4h': RollingCandleAggregator(240),
}
self.calendar: dict[str, list[OHLCV]] = {
'1h': [],
'4h': [],
}
self._calendar_buffer: dict[str, list[OHLCV]] = {
'1h': [],
'4h': [],
}
def on_bar(self, bar: OHLCV):
"""Processa qualsiasi tipo di bar base — tempo, volume, tick, delta, ecc."""
rolling_results = {}
for tf, agg in self.rolling.items():
rolling_results[tf] = agg.push(bar)
self._update_calendar(bar)
return rolling_results
def _update_calendar(self, bar: OHLCV):
from datetime import datetime
ts = datetime.utcfromtimestamp(bar.timestamp)
for tf, minutes in [('1h', 60), ('4h', 240)]:
self._calendar_buffer[tf].append(bar)
total_minutes = ts.hour * 60 + ts.minute
if (total_minutes + 1) % minutes == 0:
bars = self._calendar_buffer[tf]
if bars:
agg = OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
self.calendar[tf].append(agg)
self._calendar_buffer[tf] = []
Ibrido Tempo-Volume: Calendario con Split di Volume
Una variante speciale di aggregazione: candele allineate al calendario che si chiudono anticipatamente quando il volume supera una soglia. Mantiene la sincronizzazione temporale adattandosi ai picchi di attività.
class TimeVolumeHybridGenerator:
"""
Candele allineate al calendario che si dividono quando il volume aumenta.
Regola: chiudi la candela al confine del calendario OPPURE quando
il volume accumulato supera vol_threshold, a seconda di quale avviene prima.
Funziona con qualsiasi tipo di bar base — il trigger di volume aggiunge una
dimensione di split extra sopra l'allineamento al calendario.
"""
def __init__(
self,
interval_minutes: int = 60,
vol_threshold: float = 5000.0,
):
self.interval_minutes = interval_minutes
self.vol_threshold = vol_threshold
self.buffer: list[OHLCV] = []
self.accumulated_volume = 0.0
self.bars: list[OHLCV] = []
def on_bar(self, bar: OHLCV) -> OHLCV | None:
self.buffer.append(bar)
self.accumulated_volume += bar.volume
from datetime import datetime
ts = datetime.utcfromtimestamp(bar.timestamp)
total_minutes = ts.hour * 60 + ts.minute
at_boundary = (total_minutes + 1) % self.interval_minutes == 0
vol_spike = self.accumulated_volume >= self.vol_threshold
if at_boundary or vol_spike:
return self._close_bar(split_reason='volume' if vol_spike else 'time')
return None
def _close_bar(self, split_reason: str) -> OHLCV:
bars = self.buffer
bar = OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
bar.split_reason = split_reason # type: ignore
bar.num_bars = len(bars) # type: ignore
self.bars.append(bar)
self.buffer = []
self.accumulated_volume = 0.0
return bar
Aggregazione Pratica: Precaricamento a Cascata
Precaricamento a cascata: comporre candele giornaliere dalle orarie, e le orarie dai minuti — bypassando i limiti delle API.
Gli exchange limitano la quantità di dati storici che servono. Binance fornisce ~1000 candele per richiesta REST, OKX ha un limite di 300. Se hai bisogno di una candela rolling 1D (1440 minuti), non puoi sempre ottenere abbastanza storia 1m. Per lo streaming in tempo reale di trade e book degli ordini via WebSocket, vedi Metodi WebSocket CCXT Pro.
La soluzione: aggregazione a cascata — costruisci timeframe superiori dalla risoluzione più alta disponibile a ogni profondità, poi uniscili.
Candela rolling 1W:
├── 6 candele 1D completate ← fetch da REST /klines?interval=1d
├── 1 giorno parziale:
│ ├── 23 candele 1h completate ← fetch da REST /klines?interval=1h
│ └── 1 ora parziale:
│ └── N candele 1m completate ← fetch da REST /klines?interval=1m
└── Live: ogni nuova candela 1m chiusa aggiorna l'intera catena
Questo funziona perché l'aggregazione OHLCV è componibile: il massimo di una candela 1D è il massimo di 24 massimi 1h, che è il massimo di 1440 massimi 1m.
Limiti Multi-Exchange
| Exchange | Max Candele 1m | Max Candele 1h | Intervalli Notevoli |
|---|---|---|---|
| Binance | 1.000 | 1.000 | 1m–1M, gamma completa |
| Bybit | 1.000 | 1.000 | 1–720, D/W/M |
| OKX | 300 | 300 | 1m–1M (più restrittivo) |
| Gate.io | 1.000 | 1.000 | 10s–30d |
Verifica della Consistenza dell'Aggregazione
La candela 1h da un'API REST potrebbe non corrispondere a ciò che calcoleresti da 60 candele 1m. Valida sempre:
def validate_aggregation(
candle_htf: OHLCV,
candles_ltf: list[OHLCV],
tolerance_pct: float = 0.001,
) -> dict[str, bool]:
agg = OHLCV(
timestamp=candles_ltf[-1].timestamp,
open=candles_ltf[0].open,
high=max(c.high for c in candles_ltf),
low=min(c.low for c in candles_ltf),
close=candles_ltf[-1].close,
volume=sum(c.volume for c in candles_ltf),
)
def close_enough(a: float, b: float) -> bool:
if a == 0 and b == 0:
return True
return abs(a - b) / max(abs(a), abs(b)) < tolerance_pct
return {
'open': close_enough(candle_htf.open, agg.open),
'high': close_enough(candle_htf.high, agg.high),
'low': close_enough(candle_htf.low, agg.low),
'close': close_enough(candle_htf.close, agg.close),
'volume': close_enough(candle_htf.volume, agg.volume),
}
Se la validazione fallisce sistematicamente, aggrega sempre da 1m tu stesso — non fidarti mai della candela HTF dell'exchange per la parità del backtest.
Matrice di Confronto
Asse 1: Tipi di Bar Base
| # | Tipo di Bar | Trigger | Dati Tick Richiesti | Migliore Per |
|---|---|---|---|---|
| 1 | Tempo | Intervallo fisso | No | Struttura di mercato, comportamento della folla |
| 2 | Tick | N trade | Sì | Feature ML, campionamento equal-opinion |
| 3 | Volume | N unità scambiate | Sì | Analisi dell'attività normalizzata |
| 4 | Dollaro | $N nozionale | Sì | Confronto cross-asset |
| 5 | Renko | Prezzo ± N unità | No | Trend following, filtraggio rumore |
| 6 | Range | High-Low ≥ N | Sì | Rilevamento breakout |
| 7 | Volatilità | Range adattivo | Sì | Analisi regime-adattiva |
| 8 | Heikin-Ashi | Trasformazione | No | Conferma trend (prezzi sintetici!) |
| 9 | Kagi | Inversione di prezzo | No | Struttura domanda/offerta |
| 10 | Line Break | Breakout N-linee | No | Filtro trend macro |
| 11 | Point & Figure | Box + inversione | No | Mappatura supporto/resistenza |
| 12 | TIB | Tick imbalance | Sì | Rilevamento flusso informato |
| 13 | VIB | Volume imbalance | Sì | Rilevamento ordini grandi |
| 14 | Run | Lunghezza della corsa | Sì | Rilevamento suddivisione ordini |
| 15 | CUSUM | Rendimento cumulativo | No (chiusure 1m) | Eventi di rottura strutturale |
| 16 | Entropia | Entropia di Shannon | Sì | Ricerca ML, purezza delle feature |
| 17 | Delta | Delta order flow | Sì (aggTrades) | Analisi del flusso aggressore |
Asse 2: Metodi di Aggregazione
| Metodo | Allineamento | Cold Start | Sfasamento di Fase | Migliore Per |
|---|---|---|---|---|
| Calendario | Orologio a muro | Rischio bar parziale | Nessuno (allineato alla folla) | Struttura di mercato, PIQ, S/R |
| Rolling | N bar | Nessuno (dopo warmup) | Sì (sfasato da :00) | Indicatori, segnali |
| Adattivo | N guidato dalla volatilità | Dopo calibrazione ATR | Sì | Strategie vol-adattive |
Raccomandazioni Pratiche
Architettura a quattro strati di candele: segnali rolling, struttura calendario, flusso di microstruttura e filtri di trend.
Se il tuo motore di backtest gira su dati OHLCV 1m:
- Bar temporali rolling — aggiornamento più semplice. Zero dati aggiuntivi. Elimina il cold start.
- Bar temporali ibride (rolling + calendario) — calendario per la struttura di mercato, rolling per i segnali.
- Filtro CUSUM — funziona su chiusure 1m, nessun dato tick. "Qualcosa si è mosso abbastanza da essere interessante."
Se hai dati tick/trade:
- Bar dollaro + rolling — default raccomandato dalla letteratura di finanza quantitativa.
- Bar volume imbalance + rolling — rileva il flusso informato, campiona di più durante eventi significativi.
- Bar delta + calendario — se hai la classificazione del lato aggressore, la visione più diretta di chi sta spingendo il mercato.
Come filtri (applica Heikin-Ashi o Line Break sopra qualsiasi combinazione base+aggregazione):
- Heikin-Ashi sulle bar volume rolling — segnali di trend puliti su dati normalizzati per attività.
- Line Break / Kagi sulle bar calendario giornaliere — filtro trend macro.
Per Marketmaker.cc specificamente — un approccio a strati:
- Strato 1 (segnali): Aggregazione rolling di bar temporali per indicatori e segnali di ingresso/uscita. Nessun cold start, perfetta parità backtest.
- Strato 2 (struttura di mercato): Bar temporali allineate al calendario per supporto/resistenza, analisi della chiusura oraria e trigger PIQ.
- Strato 3 (microstruttura): Bar volume imbalance + bar delta dal flusso grezzo di trade per rilevare il flusso informato, la suddivisione degli ordini e anticipare i grandi movimenti. Vedi anche Impronta Digitale: Identificazione del Trader per il riconoscimento di pattern comportamentali sui dati di order flow.
- Strato 4 (filtro trend): Trasformazione Heikin-Ashi sulle bar rolling, o Line Break sulle chiusure calendario 4h, per mantenere i segnali allineati con la direzione macro.
Conclusione
La costruzione delle candele non è una singola scelta — sono due decisioni indipendenti:
-
Che tipo di bar? Il tempo cattura gli intervalli dell'orologio. L'attività (tick, volume, dollaro) cattura la partecipazione al mercato. Il prezzo (Renko, range, volatilità) cattura i movimenti. L'informazione (imbalance, run, CUSUM, entropia) cattura gli arrivi di nuove informazioni. L'order flow (delta) cattura la pressione aggressiva.
-
Come aggregare nei timeframe superiori? Il calendario si allinea con la folla. Il rolling elimina il cold start. L'adattivo reagisce alla volatilità.
La "candela di 1 ora da Binance" standard è solo una cella in una matrice 17×3. Le altre 50 combinazioni sono disponibili per chiunque sia disposto a implementarle. Per un sistema di produzione, la risposta è "scegli la combinazione giusta per ogni strato del tuo motore decisionale."
L'unità atomica — la bar base chiusa — rimane la fondazione. Tutto il resto è aggregazione.
Per ulteriori informazioni sulla precisione del backtest con dati a grana fine, vedi Drill-Down Adattivo: Backtest con Granularità Variabile. Per l'impatto della precomputazione degli indicatori sulle strategie multi-timeframe, vedi Cache Parquet Aggregata.
Link Utili
- Lopez de Prado — Advances in Financial Machine Learning (2018)
- Easley, Lopez de Prado, O'Hara — The Volume Clock: Insights into the High Frequency Paradigm (2012)
- mlfinlab — libreria Python che implementa le bar information-driven
- Binance — Historical Market Data
- Apache Parquet — formato di archiviazione colonnare
Citazione
@article{soloviov2026bartypes,
author = {Soloviov, Eugen},
title = {Bar Types and Aggregation Methods for Algorithmic Trading},
year = {2026},
url = {https://marketmaker.cc/it/blog/post/beyond-time-bars-candle-construction},
description = {Classificazione a due assi della costruzione delle candele: 17 tipi base di bar × 3 metodi di aggregazione = 51 combinazioni, con codice di implementazione e raccomandazioni pratiche per l'algotrading crypto.}
}
Autori
Trading-systems engineer
Trading-systems engineer building bots since 2017: cross-exchange arbitrage (connected up to 30 venues), cointegration-based pairs arbitrage across spot and futures, scalping, news and sentiment-driven strategies, trend algorithms, and portfolio management and balancing algorithms. Also builds sub-millisecond order execution, big-data warehouses, backtesting engines, AI agents, and trading interfaces (incl. open-source profitmaker.cc). Stack: JS/TS, Python, Rust/Zig/Go, DevOps, backend, frontend, architecture.