← Torna agli articoli
March 22, 2026
5 min di lettura

Tipi di Bar e Metodi di Aggregazione per il Trading Algoritmico

Tipi di Bar e Metodi di Aggregazione per il Trading Algoritmico
#algotrading
#candele
#microstruttura di mercato
#Lopez de Prado
#order flow
#backtesting
#ricerca

Ogni grafico a candele che hai mai visto su Binance, TradingView o qualsiasi interfaccia di exchange è costruito nello stesso modo: aggrega i trade all'interno di una finestra temporale fissa — 1 minuto, 5 minuti, 1 ora — e produce una bar OHLCV. Questo è così diffuso che la maggior parte dei trader non lo mette mai in discussione. Ma per il trading algoritmico, la scelta del tipo di bar e del metodo di aggregazione sono due decisioni indipendenti — e la maggior parte dei sistemi le confonde.

Questo articolo separa i due assi della costruzione delle candele: che tipo di bar costruisci (17 tipi) e come le aggreghi in timeframe superiori (3 metodi). La combinazione dà 51 possibili configurazioni, ciascuna con proprietà diverse per il backtesting, il trading live e la generazione di segnali.

Per un'introduzione a come i trade grezzi diventano candele standard, vedi Trading Candles Demystified.


TL;DR

  • La costruzione delle candele ha due assi indipendenti: tipo di bar e metodo di aggregazione
  • 17 tipi base di bar: tempo, tick, volume, dollaro, Renko, range, volatilità, Heikin-Ashi, Kagi, Line Break, P&F, tick imbalance (TIB), volume imbalance (VIB), run, CUSUM, entropia, delta
  • 3 metodi di aggregazione: allineato al calendario, rolling window, rolling adattivo
  • 17 × 3 = 51 combinazioni possibili, ciascuna con proprietà diverse
  • La maggior parte dei sistemi usa solo una combinazione: bar temporali allineate al calendario. Le altre 50 sono inesplorate.
  • Raccomandazione pratica: usa più combinazioni in strati — bar temporali rolling per i segnali, bar temporali calendario per la struttura di mercato, bar information-driven per la microstruttura

Due Assi della Costruzione delle Candele

La visione tradizionale mette tutti i tipi di bar in un elenco piatto: bar temporali, bar tick, bar volume, Renko, ecc. Questo è fuorviante. In realtà ci sono due scelte ortogonali:

Asse 1 — Tipo di Bar Base (17 tipi): Come decidi quando una nuova bar si chiude? Dopo un intervallo di tempo fisso? Dopo N trade? Dopo un movimento di prezzo? Quando il contenuto informativo cambia? Questo determina cosa significa "una bar".

Asse 2 — Metodo di Aggregazione (3 metodi): Come componi le bar base in candele di timeframe superiore? Allineate ai confini del calendario (00:00, 01:00, ...)? Usando una rolling window delle ultime N bar? Adattando la dimensione della finestra alla volatilità?

Questi due assi sono indipendenti. Puoi avere:

  • Bar tick allineate al calendario — aggrega le bar tick chiuse tra le 14:00 e le 14:59 in una singola candela oraria
  • Bar volume rolling — prende le ultime 24 bar volume indipendentemente da quando si sono chiuse
  • Bar delta adattive — usa una finestra guidata dalla volatilità sulle bar delta

La "candela di 1 ora" standard è solo un punto in questa matrice 17×3: bar temporali + allineamento al calendario. Ogni altra combinazione è un'alternativa da considerare.


1. Bar Temporali (Standard)

Problema delle bar temporali a calendario Densità informativa disomogenea: i confini temporali rigidi trattano allo stesso modo le ore tranquille con 200 trade e le ore di annunci con 50.000 trade.

Il default. Una nuova bar si forma dopo un intervallo di tempo fisso: 1 minuto, 5 minuti, 1 ora. Ogni exchange le fornisce nativamente.

Proprietà:

  • Durante la sessione asiatica (00:00–08:00 UTC), una candela di 1 ora potrebbe contenere 200 trade. Durante un annuncio di listing su Binance, la stessa finestra potrebbe contenerne 50.000. Le bar temporali trattano entrambi come equivalenti. Rilevare tali picchi di attività è fondamentale per la protezione dei bot — vedi Rilevamento di Anomalie per i Bot di Trading.
  • Tutti i partecipanti al mercato vedono gli stessi confini delle candele — un punto di Schelling. Questo rende le bar temporali essenziali per analizzare il comportamento della folla.
  • Gli indicatori calcolati su candele parziali (dopo un riavvio) producono valori errati.
from datetime import datetime

def time_until_valid_hourly_candle():
    """Quanto tempo manca alla prima candela oraria completa dopo il riavvio."""
    now = datetime.utcnow()
    minutes_into_hour = now.minute
    seconds_into_minute = now.second

    wait_seconds = (60 - minutes_into_hour) * 60 - seconds_into_minute
    wait_seconds += 3600

    return wait_seconds

2–4. Bar Basate sull'Attività

Bar basate sull'attività Bar tick, volume e dollaro: tre modi per lasciare che sia la partecipazione al mercato — non l'orologio — a determinare i confini delle bar.

Invece di campionare a intervalli di tempo fissi, si campiona dopo una quantità fissa di attività di mercato. Questo produce bar con un "contenuto informativo" approssimativamente uguale indipendentemente dall'ora del giorno.

2. Bar Tick

Una nuova bar si forma dopo ogni N trade (tick). Durante l'alta attività, le bar si formano rapidamente. Durante i periodi tranquilli, una singola bar potrebbe estendersi per ore.

from collections import deque
from dataclasses import dataclass

@dataclass
class OHLCV:
    timestamp: int
    open: float
    high: float
    low: float
    close: float
    volume: float

class TickBarGenerator:
    """
    Genera una nuova bar ogni `threshold` trade.
    Ogni bar contiene un numero uguale di "opinioni" di mercato.
    """

    def __init__(self, threshold: int = 1000):
        self.threshold = threshold
        self.trades: list[tuple[float, float]] = []  # (price, qty)
        self.bars: list[OHLCV] = []

    def on_trade(self, timestamp: int, price: float, qty: float):
        self.trades.append((price, qty))

        if len(self.trades) >= self.threshold:
            self._close_bar(timestamp)

    def _close_bar(self, timestamp: int):
        prices = [t[0] for t in self.trades]
        volumes = [t[1] for t in self.trades]

        bar = OHLCV(
            timestamp=timestamp,
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        self.bars.append(bar)
        self.trades = []
        return bar

Pro: Si adatta naturalmente all'attività di mercato. I rendimenti delle bar tick tendono ad essere più vicini a una distribuzione normale rispetto ai rendimenti delle bar temporali — una proprietà che migliora le prestazioni di molti modelli statistici.

Contro: Richiede un flusso grezzo di trade (non disponibile da tutti i fornitori di dati per i dati storici). Il timing delle bar è imprevedibile — non puoi dire "la prossima bar si chiuderà alle X."

3. Bar Volume

Una nuova bar si forma dopo che N contratti (o monete, nel crypto) sono stati scambiati. Simile alle bar tick, ma ponderata per la dimensione del trade — un singolo trade da 100 BTC contribuisce 100 volte di più rispetto a un trade da 1 BTC.

class VolumeBarGenerator:
    """
    Genera una nuova bar ogni `threshold` unità di volume.
    Normalizza per la dimensione del trade: un ordine grande ≠ un ordine piccolo.
    """

    def __init__(self, threshold: float = 100.0):
        self.threshold = threshold
        self.accumulated_volume = 0.0
        self.trades: list[tuple[int, float, float]] = []  # (ts, price, qty)
        self.bars: list[OHLCV] = []

    def on_trade(self, timestamp: int, price: float, qty: float):
        self.trades.append((timestamp, price, qty))
        self.accumulated_volume += qty

        if self.accumulated_volume >= self.threshold:
            self._close_bar()

    def _close_bar(self):
        prices = [t[1] for t in self.trades]
        volumes = [t[2] for t in self.trades]

        bar = OHLCV(
            timestamp=self.trades[-1][0],
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        self.bars.append(bar)
        self.accumulated_volume = 0.0
        self.trades = []
        return bar

4. Bar Dollaro

Una nuova bar si forma dopo che un valore nozionale fisso (in USD/USDT) è stato scambiato. La più robusta delle bar basate sull'attività perché normalizza sia per il conteggio dei trade che per il livello di prezzo.

Considera: se ETH passa da 1.000a1.000 a 4.000, vendere 10.000diETHrichiede2,5ETHa10.000 di ETH richiede 2,5 ETH a 4.000 ma 10 ETH a $1.000. Le bar volume tratterebbero questi casi in modo diverso; le bar dollaro li trattano allo stesso modo.

class DollarBarGenerator:
    """
    Genera una nuova bar ogni `threshold` dollari (USDT) di volume nozionale.
    Normalizzazione più robusta: indipendente dal livello di prezzo.

    Lopez de Prado (2018) raccomanda le bar dollaro come default
    per la maggior parte delle applicazioni quantitative.
    """

    def __init__(self, threshold: float = 1_000_000.0):
        self.threshold = threshold
        self.accumulated_dollars = 0.0
        self.trades: list[tuple[int, float, float]] = []
        self.bars: list[OHLCV] = []

    def on_trade(self, timestamp: int, price: float, qty: float):
        self.trades.append((timestamp, price, qty))
        self.accumulated_dollars += price * qty

        if self.accumulated_dollars >= self.threshold:
            self._close_bar()

    def _close_bar(self):
        prices = [t[1] for t in self.trades]
        volumes = [t[2] for t in self.trades]

        bar = OHLCV(
            timestamp=self.trades[-1][0],
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        self.bars.append(bar)
        self.accumulated_dollars = 0.0
        self.trades = []
        return bar

Scegliere la Soglia

La soglia per le bar basate sull'attività dovrebbe produrre all'incirca lo stesso numero di bar al giorno delle bar temporali che stai sostituendo. Per BTCUSDT su Binance:

Tipo di Bar Soglia Tipica ~Bar/Giorno TF Equivalente
Tick 1.000 trade ~1.400 ~1m
Tick 50.000 trade ~28 ~1h
Volume 100 BTC ~600 ~2-3m
Volume 2.400 BTC ~25 ~1h
Dollaro $1M ~1.400 ~1m
Dollaro $50M ~28 ~1h

Questi numeri sono approssimativi e cambiano drasticamente con il regime di mercato. Durante un rally o un crollo, le bar basate sull'attività producono 5-10 volte più bar del solito — il che è esattamente il punto.

5–7. Bar Basate sul Prezzo

Bar basate sul prezzo Mattoni Renko, bar range e bar volatilità: campionamento solo quando il prezzo si muove abbastanza da essere significativo.

Le bar basate sul prezzo ignorano sia il tempo che l'attività. Una nuova bar si forma solo quando il prezzo si muove di una quantità specificata. Questo filtra naturalmente il rumore laterale ed evidenzia i trend.

5. Bar Renko

Un nuovo "mattone" Renko si forma quando il prezzo di chiusura si muove di almeno N unità rispetto alla chiusura del mattone precedente. I mattoni hanno sempre la stessa dimensione, creando una rappresentazione visiva pulita della direzione del trend.

class RenkoBarGenerator:
    """
    Genera mattoni Renko basati sul movimento di prezzo.

    Proprietà chiave: durante il movimento laterale, non si formano nuovi mattoni.
    Durante i trend forti, i mattoni si formano rapidamente.
    """

    def __init__(self, brick_size: float = 10.0):
        self.brick_size = brick_size
        self.bricks: list[dict] = []
        self.last_close: float | None = None

    def on_price(self, timestamp: int, price: float, volume: float = 0.0):
        if self.last_close is None:
            self.last_close = price
            return []

        new_bricks = []
        diff = price - self.last_close
        num_bricks = int(abs(diff) / self.brick_size)

        if num_bricks == 0:
            return []

        direction = 1 if diff > 0 else -1

        for i in range(num_bricks):
            brick_open = self.last_close
            brick_close = self.last_close + direction * self.brick_size

            brick = {
                'timestamp': timestamp,
                'open': brick_open,
                'high': max(brick_open, brick_close),
                'low': min(brick_open, brick_close),
                'close': brick_close,
                'volume': volume / num_bricks if num_bricks > 0 else 0,
                'direction': direction,
            }
            new_bricks.append(brick)
            self.last_close = brick_close

        self.bricks.extend(new_bricks)
        return new_bricks

Renko Dinamico usa l'ATR (Average True Range) invece di una dimensione del mattone fissa, adattandosi automaticamente alla volatilità.

6. Bar Range

Ogni bar ha un range fisso alto-basso. Quando il range viene superato, la bar si chiude e ne inizia una nuova. A differenza di Renko, le bar range includono le ombre e possono mostrare la volatilità intra-bar.

class RangeBarGenerator:
    """
    Genera bar con un range fisso alto-basso.

    Differenza da Renko: le bar range mostrano l'OHLC completo all'interno
    del range, non solo la direzione del mattone. Più ricche di informazioni.
    """

    def __init__(self, range_size: float = 20.0):
        self.range_size = range_size
        self.current_high: float | None = None
        self.current_low: float | None = None
        self.current_open: float | None = None
        self.current_volume: float = 0.0
        self.current_start_ts: int = 0
        self.bars: list[OHLCV] = []

    def on_trade(self, timestamp: int, price: float, qty: float):
        if self.current_open is None:
            self.current_open = price
            self.current_high = price
            self.current_low = price
            self.current_start_ts = timestamp

        self.current_high = max(self.current_high, price)
        self.current_low = min(self.current_low, price)
        self.current_volume += qty

        if self.current_high - self.current_low >= self.range_size:
            bar = OHLCV(
                timestamp=timestamp,
                open=self.current_open,
                high=self.current_high,
                low=self.current_low,
                close=price,
                volume=self.current_volume,
            )
            self.bars.append(bar)

            self.current_open = price
            self.current_high = price
            self.current_low = price
            self.current_volume = 0.0
            self.current_start_ts = timestamp

            return bar

        return None

Differenza chiave tra Renko e bar Range: Renko traccia solo i prezzi di chiusura e mostra la direzione; le bar range tracciano il range completo del prezzo e mostrano la struttura all'interno della bar. Le bar range sono generalmente più utili per il trading algoritmico perché preservano le informazioni alto-basso necessarie per la simulazione di stop-loss e take-profit.

7. Bar Volatilità

Una nuova bar si forma quando la volatilità intra-bar raggiunge una soglia dinamica — ad esempio, un multiplo dell'ATR recente. A differenza delle bar range (soglia fissa), le bar volatilità si adattano alle condizioni di mercato.

class VolatilityBarGenerator:
    """
    Genera bar quando la volatilità intra-bar raggiunge una soglia.

    Simile alle bar range, ma la soglia si adatta alle condizioni di mercato
    usando una misura ATR rolling. Nei mercati calmi, le bar richiedono meno
    movimento assoluto per chiudersi; nei mercati volatili, di più.
    """

    def __init__(
        self,
        atr_period: int = 14,
        atr_multiplier: float = 1.0,
        initial_threshold: float = 20.0,
    ):
        self.atr_period = atr_period
        self.atr_multiplier = atr_multiplier
        self.threshold = initial_threshold

        self.recent_ranges: list[float] = []
        self.current_open: float | None = None
        self.current_high: float | None = None
        self.current_low: float | None = None
        self.current_volume: float = 0.0
        self.bars: list[OHLCV] = []

    def on_trade(self, timestamp: int, price: float, qty: float):
        if self.current_open is None:
            self.current_open = price
            self.current_high = price
            self.current_low = price

        self.current_high = max(self.current_high, price)
        self.current_low = min(self.current_low, price)
        self.current_volume += qty

        intra_bar_range = self.current_high - self.current_low

        if intra_bar_range >= self.threshold:
            bar = OHLCV(
                timestamp=timestamp,
                open=self.current_open,
                high=self.current_high,
                low=self.current_low,
                close=price,
                volume=self.current_volume,
            )
            self.bars.append(bar)

            self.recent_ranges.append(intra_bar_range)
            if len(self.recent_ranges) > self.atr_period:
                self.recent_ranges = self.recent_ranges[-self.atr_period:]
            if len(self.recent_ranges) >= self.atr_period:
                avg_range = sum(self.recent_ranges) / len(self.recent_ranges)
                self.threshold = avg_range * self.atr_multiplier

            self.current_open = price
            self.current_high = price
            self.current_low = price
            self.current_volume = 0.0
            return bar

        return None

8. Heikin-Ashi (Trasformazione Smussata)

Trasformazione Heikin-Ashi Heikin-Ashi: la media trasforma le candele rumorose in segnali di trend fluidi — ma a costo delle informazioni esatte sul prezzo.

Heikin-Ashi (giapponese per "bar media") non è un tipo di bar — è una trasformazione che può essere applicata sopra qualsiasi tipo di bar base. Smussa le candele mediando i valori della bar corrente e precedente:

  • HA Close = (Open + High + Low + Close) / 4
  • HA Open = (Previous HA Open + Previous HA Close) / 2
  • HA High = max(High, HA Open, HA Close)
  • HA Low = min(Low, HA Open, HA Close)

I trend appaiono come sequenze di candele dello stesso colore senza ombre inferiori (uptrend) o senza ombre superiori (downtrend).

class HeikinAshiTransformer:
    """
    Trasforma le candele OHLCV standard in candele Heikin-Ashi.

    Può essere applicata sopra QUALSIASI tipo di bar: bar temporali, bar volume,
    bar rolling, ecc. È una trasformazione, non un metodo di campionamento.

    ATTENZIONE: i prezzi HA sono sintetici — non rappresentano prezzi
    realmente scambiati. Non usare mai la chiusura HA per il posizionamento
    degli ordini o il calcolo del PnL. Usa HA solo per la generazione dei
    segnali, poi esegui ai prezzi reali.
    """

    def __init__(self):
        self.prev_ha_open: float | None = None
        self.prev_ha_close: float | None = None

    def transform(self, candle: OHLCV) -> OHLCV:
        ha_close = (candle.open + candle.high + candle.low + candle.close) / 4

        if self.prev_ha_open is None:
            ha_open = (candle.open + candle.close) / 2
        else:
            ha_open = (self.prev_ha_open + self.prev_ha_close) / 2

        ha_high = max(candle.high, ha_open, ha_close)
        ha_low = min(candle.low, ha_open, ha_close)

        self.prev_ha_open = ha_open
        self.prev_ha_close = ha_close

        return OHLCV(
            timestamp=candle.timestamp,
            open=ha_open,
            high=ha_high,
            low=ha_low,
            close=ha_close,
            volume=candle.volume,
        )

    def transform_series(self, candles: list[OHLCV]) -> list[OHLCV]:
        """Trasforma un'intera serie. Reimposta prima lo stato."""
        self.prev_ha_open = None
        self.prev_ha_close = None
        return [self.transform(c) for c in candles]


def ha_trend_signal(ha_candles: list[OHLCV], lookback: int = 3) -> int:
    """
    Segnale di trend HA semplice.

    Restituisce:
        +1: rialzista (N candele HA verdi consecutive senza ombra inferiore)
        -1: ribassista (N candele HA rosse consecutive senza ombra superiore)
         0: nessun trend chiaro
    """
    if len(ha_candles) < lookback:
        return 0

    recent = ha_candles[-lookback:]

    all_bullish = all(
        c.close > c.open and abs(c.low - min(c.open, c.close)) < 1e-10
        for c in recent
    )

    all_bearish = all(
        c.close < c.open and abs(c.high - max(c.open, c.close)) < 1e-10
        for c in recent
    )

    if all_bullish:
        return 1
    elif all_bearish:
        return -1
    return 0

Avvertenza critica per il backtesting: I prezzi Heikin-Ashi sono sintetici. Se il tuo backtest usa la chiusura HA come prezzo di ingresso, i risultati saranno sbagliati. Usa sempre HA solo per la generazione di segnali ed esegui ai prezzi OHLC reali.

Quando HA è utile: Strategie trend-following che necessitano di segnali puliti di "rimani dentro". Applica HA sopra qualsiasi tipo di bar base — bar temporali, bar volume, bar dollaro — per filtrare i falsi crossover.

Quando HA è dannoso: Qualsiasi strategia che necessita di livelli di prezzo precisi — supporto/resistenza, analisi del book degli ordini, PIQ (Position In Queue). La media distrugge le informazioni esatte sul prezzo.

9–11. Grafici di Inversione Giapponesi

Metodi di grafico giapponesi Kagi, Line Break e Point & Figure: metodi di grafico privi di tempo che si concentrano puramente sulla struttura del prezzo.

Questi sono metodi di grafico giapponesi tradizionali (insieme a Renko) che scartano completamente il tempo e si concentrano sulla struttura del prezzo.

9. Grafici Kagi

I grafici Kagi consistono in linee verticali che cambiano direzione quando il prezzo inverte di una quantità specificata. Le linee cambiano spessore quando il prezzo rompe un precedente massimo (spessa = "yang" = domanda) o un precedente minimo (sottile = "yin" = offerta).

class KagiChartGenerator:
    """
    Genera linee di grafico Kagi basate sulle inversioni di prezzo.

    A differenza di Renko (dimensione del mattone fissa), Kagi traccia l'ampiezza
    effettiva di ogni movimento e cambia lo spessore della linea nei punti di breakout.

    Utile per identificare le rotture di supporto/resistenza e
    i cambiamenti di domanda/offerta senza il rumore del tempo.
    """

    def __init__(self, reversal_amount: float = 10.0):
        self.reversal_amount = reversal_amount
        self.lines: list[dict] = []
        self.current_direction: int = 0  # 1=su, -1=giù
        self.current_price: float | None = None
        self.extreme_price: float | None = None
        self.prev_high: float | None = None
        self.prev_low: float | None = None
        self.line_type: str = 'yang'  # 'yang' (spessa) o 'yin' (sottile)

    def on_price(self, timestamp: int, price: float):
        if self.current_price is None:
            self.current_price = price
            self.extreme_price = price
            return None

        if self.current_direction == 0:
            if price - self.current_price >= self.reversal_amount:
                self.current_direction = 1
                self.extreme_price = price
            elif self.current_price - price >= self.reversal_amount:
                self.current_direction = -1
                self.extreme_price = price
            return None

        if self.current_direction == 1:
            if price > self.extreme_price:
                self.extreme_price = price
                if self.prev_high is not None and price > self.prev_high:
                    self.line_type = 'yang'
            elif self.extreme_price - price >= self.reversal_amount:
                line = {
                    'timestamp': timestamp,
                    'start': self.current_price,
                    'end': self.extreme_price,
                    'direction': 'up',
                    'type': self.line_type,
                }
                self.lines.append(line)
                self.prev_high = self.extreme_price
                self.current_price = self.extreme_price
                self.extreme_price = price
                self.current_direction = -1
                if self.prev_low is not None and price < self.prev_low:
                    self.line_type = 'yin'
                return line
        else:
            if price < self.extreme_price:
                self.extreme_price = price
                if self.prev_low is not None and price < self.prev_low:
                    self.line_type = 'yin'
            elif price - self.extreme_price >= self.reversal_amount:
                line = {
                    'timestamp': timestamp,
                    'start': self.current_price,
                    'end': self.extreme_price,
                    'direction': 'down',
                    'type': self.line_type,
                }
                self.lines.append(line)
                self.prev_low = self.extreme_price
                self.current_price = self.extreme_price
                self.extreme_price = price
                self.current_direction = 1
                if self.prev_high is not None and price > self.prev_high:
                    self.line_type = 'yang'
                return line

        return None

10. Grafici Line Break

I grafici Line Break disegnano una nuova linea (box) solo quando il prezzo di chiusura supera il massimo o il minimo delle ultime N linee (tipicamente 3). Non viene disegnata nessuna nuova linea se il prezzo rimane all'interno del range.

class LineBreakGenerator:
    """
    Genera bar Line Break (Three Line Break per default).

    Una nuova bar viene disegnata solo quando la chiusura supera il massimo o il minimo
    delle ultime N bar. Filtra il rumore minore richiedendo che il prezzo
    rompa attraverso un range multi-bar.

    Il parametro 'N' (line_count) controlla la sensibilità:
    - N=2: più sensibile, più bar, più rumore
    - N=3: standard (Three Line Break)
    - N=4+: meno sensibile, meno bar, segnali più forti
    """

    def __init__(self, line_count: int = 3):
        self.line_count = line_count
        self.lines: list[dict] = []

    def on_close(self, timestamp: int, close: float) -> dict | None:
        if not self.lines:
            self.lines.append({
                'timestamp': timestamp,
                'open': close,
                'close': close,
                'high': close,
                'low': close,
                'direction': 0,
            })
            return None

        lookback = self.lines[-self.line_count:] if len(self.lines) >= self.line_count else self.lines

        highest = max(l['high'] for l in lookback)
        lowest = min(l['low'] for l in lookback)
        last = self.lines[-1]

        new_line = None

        if close > highest:
            new_line = {
                'timestamp': timestamp,
                'open': last['close'],
                'close': close,
                'high': close,
                'low': last['close'],
                'direction': 1,
            }
        elif close < lowest:
            new_line = {
                'timestamp': timestamp,
                'open': last['close'],
                'close': close,
                'high': last['close'],
                'low': close,
                'direction': -1,
            }

        if new_line:
            self.lines.append(new_line)
            return new_line

        return None

11. Grafici Point & Figure

I grafici Point & Figure (P&F) usano colonne di X (prezzi in salita) e O (prezzi in discesa). Il cambio di colonna richiede un'inversione tipicamente di 3 dimensioni box. Uno dei metodi più antichi per filtrare il rumore e identificare supporto/resistenza.

class PointAndFigureGenerator:
    """
    Genera dati per grafici Point & Figure.

    Colonna X: prezzo che sale per incrementi di box_size.
    Colonna O: prezzo che scende per incrementi di box_size.
    Cambio di colonna: richiede il movimento di reversal_boxes * box_size
    nella direzione opposta.

    Impostazione classica: box_size basato su ATR, reversal_boxes = 3.
    """

    def __init__(self, box_size: float = 10.0, reversal_boxes: int = 3):
        self.box_size = box_size
        self.reversal_boxes = reversal_boxes
        self.reversal_amount = box_size * reversal_boxes

        self.columns: list[dict] = []
        self.current_direction: int = 0
        self.current_top: float | None = None
        self.current_bottom: float | None = None

    def on_price(self, timestamp: int, price: float):
        if self.current_top is None:
            box_price = self._round_to_box(price)
            self.current_top = box_price
            self.current_bottom = box_price
            self.current_direction = 1
            return None

        events = []

        if self.current_direction == 1:
            while price >= self.current_top + self.box_size:
                self.current_top += self.box_size
                events.append(('X', self.current_top, timestamp))

            if price <= self.current_top - self.reversal_amount:
                col = {
                    'type': 'X',
                    'top': self.current_top,
                    'bottom': self.current_bottom,
                    'boxes': int((self.current_top - self.current_bottom) / self.box_size) + 1,
                    'timestamp': timestamp,
                }
                self.columns.append(col)
                self.current_direction = -1
                self.current_top = self.current_top - self.box_size
                self.current_bottom = self._round_to_box(price)
                events.append(('new_column', 'O', timestamp))

        else:
            while price <= self.current_bottom - self.box_size:
                self.current_bottom -= self.box_size
                events.append(('O', self.current_bottom, timestamp))

            if price >= self.current_bottom + self.reversal_amount:
                col = {
                    'type': 'O',
                    'top': self.current_top,
                    'bottom': self.current_bottom,
                    'boxes': int((self.current_top - self.current_bottom) / self.box_size) + 1,
                    'timestamp': timestamp,
                }
                self.columns.append(col)
                self.current_direction = 1
                self.current_bottom = self.current_bottom + self.box_size
                self.current_top = self._round_to_box(price)
                events.append(('new_column', 'X', timestamp))

        return events if events else None

    def _round_to_box(self, price: float) -> float:
        return round(price / self.box_size) * self.box_size

Kagi, Line Break e P&F nel trading algoritmico: Principalmente usati per il rilevamento del trend a lungo termine e l'identificazione di supporto/resistenza. Come livello filtro — "non prendere segnali long quando il grafico Kagi è in modalità yin" — aggiungono valore allineando i trade con la struttura macro.

12–14. Bar Information-Driven

Bar information-driven Bar imbalance, bar run, filtri CUSUM e bar entropia: campionamento quando il mercato ci dice che qualcosa è cambiato.

L'approccio più sofisticato, da Advances in Financial Machine Learning di Marcos Lopez de Prado (2018). L'intuizione chiave: campiona quando arrivano nuove informazioni al mercato, non a intervalli fissi.

12. Tick Imbalance Bars (TIB)

Se il mercato è in equilibrio, i trade avviati dagli acquirenti e dai venditori dovrebbero bilanciarsi approssimativamente. Quando lo squilibrio supera le nostre aspettative, qualcosa è cambiato. Campiona una bar in quel momento.

Ogni trade viene classificato come avviato dall'acquirente (+1) o dal venditore (-1) usando la regola del tick. Tracciamo lo squilibrio cumulativo θ e campionamo quando |θ| supera una soglia dinamica.

class TickImbalanceBarGenerator:
    """
    Genera bar quando lo squilibrio cumulativo dei tick supera
    i livelli attesi — cioè quando arrivano "nuove informazioni".

    Basato su Lopez de Prado (2018), Capitolo 2.
    """

    def __init__(
        self,
        expected_ticks_init: int = 1000,
        ewma_window: int = 100,
        min_ticks: int = 100,
        max_ticks: int = 50000,
    ):
        self.expected_ticks_init = expected_ticks_init
        self.ewma_window = ewma_window
        self.min_ticks = min_ticks
        self.max_ticks = max_ticks

        self.theta = 0.0
        self.prev_price: float | None = None
        self.prev_sign = 1
        self.trades: list[tuple[int, float, float]] = []

        self.bar_lengths: list[int] = []
        self.imbalances: list[float] = []
        self.expected_ticks = float(expected_ticks_init)
        self.expected_imbalance = 0.0

        self.bars: list[OHLCV] = []

    def _tick_sign(self, price: float) -> int:
        """Classifica il trade come acquisto (+1) o vendita (-1) usando la regola del tick."""
        if self.prev_price is None:
            self.prev_price = price
            return 1

        if price > self.prev_price:
            sign = 1
        elif price < self.prev_price:
            sign = -1
        else:
            sign = self.prev_sign

        self.prev_price = price
        self.prev_sign = sign
        return sign

    def on_trade(self, timestamp: int, price: float, qty: float):
        sign = self._tick_sign(price)
        self.theta += sign
        self.trades.append((timestamp, price, qty))

        threshold = self.expected_ticks * abs(self.expected_imbalance)
        if threshold == 0:
            threshold = self.expected_ticks_init * 0.5

        if abs(self.theta) >= threshold and len(self.trades) >= self.min_ticks:
            return self._close_bar()

        if len(self.trades) >= self.max_ticks:
            return self._close_bar()

        return None

    def _close_bar(self):
        prices = [t[1] for t in self.trades]
        volumes = [t[2] for t in self.trades]

        bar = OHLCV(
            timestamp=self.trades[-1][0],
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        self.bars.append(bar)

        self.bar_lengths.append(len(self.trades))
        self.imbalances.append(self.theta / len(self.trades))

        if len(self.bar_lengths) >= 2:
            alpha = 2.0 / (self.ewma_window + 1)
            self.expected_ticks = (
                alpha * self.bar_lengths[-1]
                + (1 - alpha) * self.expected_ticks
            )
            self.expected_ticks = max(
                self.min_ticks,
                min(self.max_ticks, self.expected_ticks)
            )
            self.expected_imbalance = (
                alpha * self.imbalances[-1]
                + (1 - alpha) * self.expected_imbalance
            )

        self.theta = 0.0
        self.trades = []
        return bar

13. Volume Imbalance Bars (VIB)

Estensione dei TIB: invece di contare ogni trade come ±1, si pesa per il volume firmato. Un acquisto da 100 BTC contribuisce +100, una vendita da 1 BTC contribuisce -1. Cattura i grandi ordini informati che potrebbero essere suddivisi in molti piccoli trade.

class VolumeImbalanceBarGenerator:
    """
    Come i TIB, ma usa il volume firmato invece dei tick firmati.

    Cattura l'intuizione che un segnale di acquisto da 100 BTC è 100 volte più
    informativo di un segnale di acquisto da 1 BTC.
    """

    def __init__(
        self,
        expected_ticks_init: int = 1000,
        ewma_window: int = 100,
    ):
        self.expected_ticks_init = expected_ticks_init
        self.ewma_window = ewma_window

        self.theta = 0.0
        self.prev_price: float | None = None
        self.prev_sign = 1
        self.trades: list[tuple[int, float, float]] = []

        self.bar_lengths: list[int] = []
        self.volume_imbalances: list[float] = []
        self.expected_ticks = float(expected_ticks_init)
        self.expected_vol_imbalance = 0.0

        self.bars: list[OHLCV] = []

    def _tick_sign(self, price: float) -> int:
        if self.prev_price is None:
            self.prev_price = price
            return 1
        if price > self.prev_price:
            sign = 1
        elif price < self.prev_price:
            sign = -1
        else:
            sign = self.prev_sign
        self.prev_price = price
        self.prev_sign = sign
        return sign

    def on_trade(self, timestamp: int, price: float, qty: float):
        sign = self._tick_sign(price)
        self.theta += sign * qty
        self.trades.append((timestamp, price, qty))

        threshold = self.expected_ticks * abs(self.expected_vol_imbalance)
        if threshold == 0:
            threshold = self.expected_ticks_init * 0.5

        if abs(self.theta) >= threshold and len(self.trades) >= 10:
            return self._close_bar()
        return None

    def _close_bar(self):
        prices = [t[1] for t in self.trades]
        volumes = [t[2] for t in self.trades]

        bar = OHLCV(
            timestamp=self.trades[-1][0],
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        self.bars.append(bar)

        self.bar_lengths.append(len(self.trades))
        self.volume_imbalances.append(self.theta / len(self.trades))

        alpha = 2.0 / (self.ewma_window + 1)
        if len(self.bar_lengths) >= 2:
            self.expected_ticks = (
                alpha * self.bar_lengths[-1] + (1 - alpha) * self.expected_ticks
            )
            self.expected_vol_imbalance = (
                alpha * self.volume_imbalances[-1]
                + (1 - alpha) * self.expected_vol_imbalance
            )

        self.theta = 0.0
        self.trades = []
        return bar

Il Problema dell'Esplosione

Un problema noto con le bar imbalance: la soglia basata su EWMA può entrare in un ciclo di feedback positivo. La soluzione: limitare con i vincoli min_ticks e max_ticks.


self.expected_ticks = max(
    self.min_ticks,    # Pavimento: mai meno di 100 tick
    min(
        self.max_ticks,  # Soffitto: mai più di 50000 tick
        new_expected_ticks
    )
)

14. Run Bars

Le run bar tracciano la lunghezza della corsa direzionale corrente — la più lunga sequenza consecutiva di acquisti o vendite. Quando un grande trader informato suddivide un ordine in molti piccoli trade, la sequenza diventa insolitamente lunga. Le run bar rilevano questo.

class TickRunBarGenerator:
    """
    Genera bar quando la lunghezza di una corsa direzionale supera le aspettative.

    Basato su Lopez de Prado (2018), Capitolo 2.

    Differenza dalle bar imbalance:
    - Le bar imbalance tracciano lo squilibrio NETTO (acquisti meno vendite)
    - Le run bar tracciano la lunghezza MASSIMA della corsa (acquisti OPPURE vendite consecutivi)
    """

    def __init__(
        self,
        expected_ticks_init: int = 1000,
        ewma_window: int = 100,
        min_ticks: int = 100,
        max_ticks: int = 50000,
    ):
        self.expected_ticks_init = expected_ticks_init
        self.ewma_window = ewma_window
        self.min_ticks = min_ticks
        self.max_ticks = max_ticks

        self.prev_price: float | None = None
        self.prev_sign = 1
        self.trades: list[tuple[int, float, float]] = []

        self.buy_run = 0
        self.sell_run = 0
        self.max_buy_run = 0
        self.max_sell_run = 0

        self.bar_lengths: list[int] = []
        self.max_runs: list[float] = []
        self.expected_ticks = float(expected_ticks_init)
        self.expected_max_run = 0.0

        self.bars: list[OHLCV] = []

    def _tick_sign(self, price: float) -> int:
        if self.prev_price is None:
            self.prev_price = price
            return 1
        if price > self.prev_price:
            sign = 1
        elif price < self.prev_price:
            sign = -1
        else:
            sign = self.prev_sign
        self.prev_price = price
        self.prev_sign = sign
        return sign

    def on_trade(self, timestamp: int, price: float, qty: float):
        sign = self._tick_sign(price)
        self.trades.append((timestamp, price, qty))

        if sign == 1:
            self.buy_run += 1
            self.sell_run = 0
        else:
            self.sell_run += 1
            self.buy_run = 0

        self.max_buy_run = max(self.max_buy_run, self.buy_run)
        self.max_sell_run = max(self.max_sell_run, self.sell_run)

        theta = max(self.max_buy_run, self.max_sell_run)
        threshold = self.expected_ticks * self.expected_max_run if self.expected_max_run > 0 else self.expected_ticks_init * 0.3

        if theta >= threshold and len(self.trades) >= self.min_ticks:
            return self._close_bar()

        if len(self.trades) >= self.max_ticks:
            return self._close_bar()

        return None

    def _close_bar(self):
        prices = [t[1] for t in self.trades]
        volumes = [t[2] for t in self.trades]

        bar = OHLCV(
            timestamp=self.trades[-1][0],
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        self.bars.append(bar)

        max_run = max(self.max_buy_run, self.max_sell_run) / len(self.trades)
        self.bar_lengths.append(len(self.trades))
        self.max_runs.append(max_run)

        alpha = 2.0 / (self.ewma_window + 1)
        if len(self.bar_lengths) >= 2:
            self.expected_ticks = alpha * self.bar_lengths[-1] + (1 - alpha) * self.expected_ticks
            self.expected_ticks = max(self.min_ticks, min(self.max_ticks, self.expected_ticks))
            self.expected_max_run = alpha * self.max_runs[-1] + (1 - alpha) * self.expected_max_run

        self.trades = []
        self.buy_run = 0
        self.sell_run = 0
        self.max_buy_run = 0
        self.max_sell_run = 0

        return bar

Le run bar possono essere estese a run di volume e run di dollaro.

15. Bar Filtro CUSUM

Il filtro CUSUM (Cumulative Sum) determina quando campionare tracciando i rendimenti cumulativi. A differenza delle bar imbalance (che lavorano sui trade grezzi), CUSUM può essere applicato ai dati OHLCV 1m esistenti — non sono richiesti dati tick.

class CUSUMFilterBarGenerator:
    """
    Filtro CUSUM simmetrico per il campionamento basato su eventi.

    Basato su Lopez de Prado (2018), Capitolo 2.5.

    Vantaggio chiave rispetto alle Bande di Bollinger: CUSUM richiede
    un'intera corsa di ampiezza threshold prima di attivarsi. Le Bande
    di Bollinger si attivano ripetutamente quando il prezzo si aggira
    vicino alla banda.

    Può essere applicato ai dati OHLCV 1m — non sono richiesti dati tick.
    """

    def __init__(self, threshold: float = 0.01):
        self.threshold = threshold
        self.s_pos = 0.0
        self.s_neg = 0.0
        self.prev_price: float | None = None
        self.buffer: list[OHLCV] = []
        self.bars: list[OHLCV] = []

    def on_candle_1m(self, candle: OHLCV) -> OHLCV | None:
        self.buffer.append(candle)

        if self.prev_price is None:
            self.prev_price = candle.close
            return None

        import math
        log_ret = math.log(candle.close / self.prev_price)
        self.prev_price = candle.close

        self.s_pos = max(0.0, self.s_pos + log_ret)
        self.s_neg = min(0.0, self.s_neg + log_ret)

        triggered = False

        if self.s_pos > self.threshold:
            self.s_pos = 0.0
            triggered = True

        if self.s_neg < -self.threshold:
            self.s_neg = 0.0
            triggered = True

        if triggered and len(self.buffer) >= 2:
            bars = self.buffer
            bar = OHLCV(
                timestamp=bars[-1].timestamp,
                open=bars[0].open,
                high=max(b.high for b in bars),
                low=min(b.low for b in bars),
                close=bars[-1].close,
                volume=sum(b.volume for b in bars),
            )
            self.bars.append(bar)
            self.buffer = []
            return bar

        return None

CUSUM + Metodo Triple Barrier: Nel framework di Lopez de Prado, gli eventi CUSUM vengono usati come punti di ingresso per il metodo Triple Barrier — dove ogni evento attiva un trade con stop-loss, take-profit e barriere di scadenza. Per una validazione robusta di tali strategie event-driven, vedi Walk-Forward Optimization e Monte Carlo Bootstrap per il Backtesting.

16. Bar Entropia

L'approccio più elegante dal punto di vista teorico: campiona quando il contenuto informativo (entropia di Shannon) della serie di prezzi intra-bar supera una soglia.

class EntropyBarGenerator:
    """
    Genera bar quando l'entropia dei rendimenti intra-bar supera
    una soglia.

    Basato sulla teoria dell'informazione di Shannon: le bar vengono campionate
    quando arrivano "nuove informazioni", misurate come l'entropia della
    distribuzione dei rendimenti all'interno della bar corrente.

    Questa è la bar information-driven più "pura" dal punto di vista teorico.
    """

    def __init__(
        self,
        entropy_threshold: float = 2.0,
        min_trades: int = 50,
        n_bins: int = 10,
    ):
        self.entropy_threshold = entropy_threshold
        self.min_trades = min_trades
        self.n_bins = n_bins
        self.trades: list[tuple[int, float, float]] = []
        self.bars: list[OHLCV] = []

    def on_trade(self, timestamp: int, price: float, qty: float):
        self.trades.append((timestamp, price, qty))

        if len(self.trades) < self.min_trades:
            return None

        entropy = self._compute_entropy()

        if entropy >= self.entropy_threshold:
            return self._close_bar()

        return None

    def _compute_entropy(self) -> float:
        import math

        prices = [t[1] for t in self.trades]
        if len(prices) < 2:
            return 0.0

        returns = [
            math.log(prices[i] / prices[i-1])
            for i in range(1, len(prices))
            if prices[i-1] > 0
        ]

        if not returns:
            return 0.0

        min_r = min(returns)
        max_r = max(returns)

        if max_r == min_r:
            return 0.0

        bin_width = (max_r - min_r) / self.n_bins
        bins = [0] * self.n_bins

        for r in returns:
            idx = min(int((r - min_r) / bin_width), self.n_bins - 1)
            bins[idx] += 1

        total = sum(bins)
        entropy = 0.0
        for count in bins:
            if count > 0:
                p = count / total
                entropy -= p * math.log2(p)

        return entropy

    def _close_bar(self):
        prices = [t[1] for t in self.trades]
        volumes = [t[2] for t in self.trades]

        bar = OHLCV(
            timestamp=self.trades[-1][0],
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        self.bars.append(bar)
        self.trades = []
        return bar

Nota pratica: Le bar entropia sono computazionalmente costose e sono principalmente di interesse per la ricerca — ma per le strategie basate su ML, producono feature con migliori proprietà statistiche perché ogni bar contiene approssimativamente uguale "informazione."

17. Bar Delta (Order Flow)

Bar delta e order flow Delta cumulativo: misurare la forza netta degli acquirenti aggressivi rispetto ai venditori in tempo reale.

Le bar delta campionano in base al delta cumulativo — la differenza progressiva tra il volume di acquisto e il volume di vendita. A differenza delle bar imbalance (che usano segni di tick ±1), le bar delta usano il flusso di ordini effettivo ponderato per il volume.

class DeltaBarGenerator:
    """
    Genera bar basate sul delta cumulativo dell'order flow.

    Delta = Volume di Acquisto - Volume di Vendita (classificato per lato aggressore).

    Richiede dati a livello di trade con classificazione del lato
    (disponibile da Binance aggTrades, Bybit trades, ecc.)
    """

    def __init__(self, threshold: float = 500.0):
        self.threshold = threshold
        self.cumulative_delta = 0.0
        self.trades: list[tuple[int, float, float, int]] = []
        self.bars: list[OHLCV] = []

    def on_trade(self, timestamp: int, price: float, qty: float, is_buyer_maker: bool):
        side = -1 if is_buyer_maker else 1
        signed_qty = side * qty

        self.cumulative_delta += signed_qty
        self.trades.append((timestamp, price, qty, side))

        if abs(self.cumulative_delta) >= self.threshold:
            return self._close_bar()

        return None

    def _close_bar(self):
        prices = [t[1] for t in self.trades]
        volumes = [t[2] for t in self.trades]

        bar = OHLCV(
            timestamp=self.trades[-1][0],
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        bar.delta = self.cumulative_delta  # type: ignore
        bar.buy_volume = sum(t[2] for t in self.trades if t[3] == 1)  # type: ignore
        bar.sell_volume = sum(t[2] for t in self.trades if t[3] == -1)  # type: ignore

        self.bars.append(bar)
        self.cumulative_delta = 0.0
        self.trades = []
        return bar

Divergenza del delta: Uno dei segnali più potenti — il prezzo sale mentre il delta cumulativo è negativo (i venditori sono aggressivi ma il prezzo sale comunque, indicando assorbimento degli acquisti limit). Direttamente rilevante per l'approccio di fingerprinting comportamentale descritto nell'articolo Impronta Digitale: Identificazione del Trader. Per i market maker che usano il modello Avellaneda-Stoikov, le bar delta forniscono una visione in tempo reale del rischio di inventario e della pressione degli aggressori.


Aggregazione rolling window Un buffer circolare di bar base: i nuovi dati entrano, i vecchi escono, e la candela aggregata è sempre valida.

I metodi di aggregazione determinano come le bar base vengono composte in candele di timeframe superiore (HTF). Sono indipendenti dal tipo di bar — puoi applicare qualsiasi metodo di aggregazione a qualsiasi tipo di bar base.

Metodo A: Aggregazione Allineata al Calendario

Aggrega tutte le bar base che rientrano in un confine di calendario fisso. La candela "di 1 ora" copre tutte le bar dalle 14:00:00 alle 14:59:59.

Proprietà:

  • Tutti i partecipanti al mercato vedono gli stessi confini — essenziale per l'analisi della struttura di mercato, supporto/resistenza, trigger PIQ
  • Problema del cold start: candela parziale dopo il riavvio
  • Naturale per le bar temporali (è quello che gli exchange forniscono nativamente)
  • Funziona anche per le bar non temporali: "tutte le bar volume chiuse tra le 14:00 e le 15:00" = una candela oraria allineata al calendario da bar volume

Metodo B: Aggregazione Rolling Window

Aggrega le ultime N bar base chiuse, ricalcolata ad ogni nuova bar. Una candela rolling "di 1 ora" = le ultime 60 bar temporali di 1 minuto chiuse, aggiornata ogni minuto.

L'unità atomica è la bar base chiusa. Questa scelta di design offre:

  1. Nessun cold start. Dopo N bar, la candela è valida. Nessun rumore da candela parziale.
  2. Parità del backtest. Se il trading live usa la stessa unità atomica del motore di backtesting, i segnali sono identici.
  3. Validazione semplice. Una regola: if buffer not full: skip.
import numpy as np

class RollingCandleAggregator:
    """
    Produce candele rolling di timeframe superiore da bar base chiuse.

    Funziona con QUALSIASI tipo di bar: bar temporali, bar tick, bar volume,
    bar dollaro, bar delta — tutto ciò che produce output OHLCV.

    Esempio: RollingCandleAggregator(window=60) con bar temporali 1m
    produce una candela "1h" aggiornata ogni minuto.

    Esempio: RollingCandleAggregator(window=24) con bar volume
    produce una candela che copre le ultime 24 bar volume.
    """

    def __init__(self, window: int):
        self.window = window
        self.buffer: deque[OHLCV] = deque(maxlen=window)

    def push(self, bar: OHLCV) -> OHLCV | None:
        """
        Aggiunge una bar base chiusa. Restituisce la candela aggregata
        solo quando il buffer è pieno (= la candela è valida).
        """
        self.buffer.append(bar)

        if len(self.buffer) < self.window:
            return None

        return self._aggregate()

    def _aggregate(self) -> OHLCV:
        bars = list(self.buffer)
        return OHLCV(
            timestamp=bars[-1].timestamp,
            open=bars[0].open,
            high=max(b.high for b in bars),
            low=min(b.low for b in bars),
            close=bars[-1].close,
            volume=sum(b.volume for b in bars),
        )

    @property
    def is_valid(self) -> bool:
        return len(self.buffer) == self.window

Trade-off dello sfasamento di fase: Le candele rolling si chiudono alle :37 se hai iniziato alle :37, non alle :00 come tutti gli altri. Questo è importante per le strategie che dipendono dai livelli visibili alla folla. La soluzione: usare entrambi — calendario per la struttura di mercato, rolling per i segnali.

Metodo C: Aggregazione Rolling Adattiva

Come rolling, ma la dimensione della finestra si adatta alla volatilità corrente. Mercati calmi → finestra più ampia (più smussamento). Mercati volatili → finestra più stretta (reazione più rapida).

class AdaptiveRollingAggregator:
    """
    Rolling window dove la dimensione della finestra si adatta alla volatilità.

    Funziona con qualsiasi tipo di bar base. Usa l'ATR delle bar recenti
    come misura della volatilità.

    Bassa volatilità → finestra più ampia (più smussamento, meno segnali)
    Alta volatilità → finestra più stretta (reazione più rapida)
    """

    def __init__(
        self,
        base_window: int = 60,
        min_window: int = 15,
        max_window: int = 240,
        atr_period: int = 14,
        atr_base: float | None = None,
    ):
        self.base_window = base_window
        self.min_window = min_window
        self.max_window = max_window
        self.atr_period = atr_period
        self.atr_base = atr_base

        self.all_candles: deque[OHLCV] = deque(maxlen=max_window)
        self.atr_values: deque[float] = deque(maxlen=atr_period * 2)
        self.current_window = base_window

    def push(self, bar: OHLCV) -> OHLCV | None:
        self.all_candles.append(bar)

        tr = bar.high - bar.low
        self.atr_values.append(tr)

        if len(self.atr_values) < self.atr_period:
            return None

        current_atr = sum(list(self.atr_values)[-self.atr_period:]) / self.atr_period

        if self.atr_base is None and len(self.atr_values) >= self.atr_period * 2:
            self.atr_base = sum(self.atr_values) / len(self.atr_values)

        if self.atr_base is None or self.atr_base == 0:
            return None

        vol_ratio = current_atr / self.atr_base
        self.current_window = int(self.base_window / vol_ratio)
        self.current_window = max(self.min_window, min(self.max_window, self.current_window))

        if len(self.all_candles) < self.current_window:
            return None

        bars = list(self.all_candles)[-self.current_window:]
        return OHLCV(
            timestamp=bars[-1].timestamp,
            open=bars[0].open,
            high=max(b.high for b in bars),
            low=min(b.low for b in bars),
            close=bars[-1].close,
            volume=sum(b.volume for b in bars),
        )

Ogni tipo di bar base può essere combinato con ogni metodo di aggregazione. Alcune combinazioni sono standard (bar temporali calendar = ciò che gli exchange forniscono), altre sono esotiche ma potenti.

Esempi di Combinazione

Tipo di Bar Base Calendario Rolling Adattivo
Tempo Candele standard dell'exchange HTF sempre valido, nessun cold start Timeframe vol-adattivo
Volume "Tutte le bar volume di quest'ora" Ultime 24 bar volume Finestra più ampia nei mercati calmi
Dollaro Aggregato orario di bar dollaro Ultime N bar dollaro Finestre dollaro adattive
Tick Imbalance Aggregato orario di imbalance Ultimi N eventi di imbalance Reazione rapida nei regimi volatili
Delta Order flow netto orario Snapshot delta rolling Finestra di flusso adattiva
Renko "Mattoni di quest'ora" Ultimi N mattoni Conteggio mattoni adattivo

Motore Ibrido: Calendario + Rolling

In pratica, vuoi sia l'aggregazione calendario che rolling simultaneamente. L'overhead di memoria è trascurabile — due buffer deque per timeframe per simbolo.

class HybridCandleEngine:
    """
    Mantiene sia le candele allineate al calendario che le candele rolling
    per qualsiasi tipo di bar base.

    Candele calendario: per la struttura di mercato, supporto/resistenza, PIQ.
    Candele rolling: per gli indicatori, la generazione di segnali, ingressi/uscite.
    """

    def __init__(self):
        self.rolling = {
            '1h': RollingCandleAggregator(60),
            '4h': RollingCandleAggregator(240),
        }
        self.calendar: dict[str, list[OHLCV]] = {
            '1h': [],
            '4h': [],
        }
        self._calendar_buffer: dict[str, list[OHLCV]] = {
            '1h': [],
            '4h': [],
        }

    def on_bar(self, bar: OHLCV):
        """Processa qualsiasi tipo di bar base — tempo, volume, tick, delta, ecc."""
        rolling_results = {}
        for tf, agg in self.rolling.items():
            rolling_results[tf] = agg.push(bar)

        self._update_calendar(bar)

        return rolling_results

    def _update_calendar(self, bar: OHLCV):
        from datetime import datetime
        ts = datetime.utcfromtimestamp(bar.timestamp)

        for tf, minutes in [('1h', 60), ('4h', 240)]:
            self._calendar_buffer[tf].append(bar)

            total_minutes = ts.hour * 60 + ts.minute
            if (total_minutes + 1) % minutes == 0:
                bars = self._calendar_buffer[tf]
                if bars:
                    agg = OHLCV(
                        timestamp=bars[-1].timestamp,
                        open=bars[0].open,
                        high=max(b.high for b in bars),
                        low=min(b.low for b in bars),
                        close=bars[-1].close,
                        volume=sum(b.volume for b in bars),
                    )
                    self.calendar[tf].append(agg)
                    self._calendar_buffer[tf] = []

Ibrido Tempo-Volume: Calendario con Split di Volume

Una variante speciale di aggregazione: candele allineate al calendario che si chiudono anticipatamente quando il volume supera una soglia. Mantiene la sincronizzazione temporale adattandosi ai picchi di attività.

class TimeVolumeHybridGenerator:
    """
    Candele allineate al calendario che si dividono quando il volume aumenta.

    Regola: chiudi la candela al confine del calendario OPPURE quando
    il volume accumulato supera vol_threshold, a seconda di quale avviene prima.

    Funziona con qualsiasi tipo di bar base — il trigger di volume aggiunge una
    dimensione di split extra sopra l'allineamento al calendario.
    """

    def __init__(
        self,
        interval_minutes: int = 60,
        vol_threshold: float = 5000.0,
    ):
        self.interval_minutes = interval_minutes
        self.vol_threshold = vol_threshold

        self.buffer: list[OHLCV] = []
        self.accumulated_volume = 0.0
        self.bars: list[OHLCV] = []

    def on_bar(self, bar: OHLCV) -> OHLCV | None:
        self.buffer.append(bar)
        self.accumulated_volume += bar.volume

        from datetime import datetime
        ts = datetime.utcfromtimestamp(bar.timestamp)
        total_minutes = ts.hour * 60 + ts.minute
        at_boundary = (total_minutes + 1) % self.interval_minutes == 0

        vol_spike = self.accumulated_volume >= self.vol_threshold

        if at_boundary or vol_spike:
            return self._close_bar(split_reason='volume' if vol_spike else 'time')

        return None

    def _close_bar(self, split_reason: str) -> OHLCV:
        bars = self.buffer
        bar = OHLCV(
            timestamp=bars[-1].timestamp,
            open=bars[0].open,
            high=max(b.high for b in bars),
            low=min(b.low for b in bars),
            close=bars[-1].close,
            volume=sum(b.volume for b in bars),
        )
        bar.split_reason = split_reason  # type: ignore
        bar.num_bars = len(bars)  # type: ignore

        self.bars.append(bar)
        self.buffer = []
        self.accumulated_volume = 0.0
        return bar

Aggregazione Pratica: Precaricamento a Cascata

Aggregazione a cascata Precaricamento a cascata: comporre candele giornaliere dalle orarie, e le orarie dai minuti — bypassando i limiti delle API.

Gli exchange limitano la quantità di dati storici che servono. Binance fornisce ~1000 candele per richiesta REST, OKX ha un limite di 300. Se hai bisogno di una candela rolling 1D (1440 minuti), non puoi sempre ottenere abbastanza storia 1m. Per lo streaming in tempo reale di trade e book degli ordini via WebSocket, vedi Metodi WebSocket CCXT Pro.

La soluzione: aggregazione a cascata — costruisci timeframe superiori dalla risoluzione più alta disponibile a ogni profondità, poi uniscili.

Candela rolling 1W:
├── 6 candele 1D completate ← fetch da REST /klines?interval=1d
├── 1 giorno parziale:
│   ├── 23 candele 1h completate ← fetch da REST /klines?interval=1h
│   └── 1 ora parziale:
│       └── N candele 1m completate ← fetch da REST /klines?interval=1m
└── Live: ogni nuova candela 1m chiusa aggiorna l'intera catena

Questo funziona perché l'aggregazione OHLCV è componibile: il massimo di una candela 1D è il massimo di 24 massimi 1h, che è il massimo di 1440 massimi 1m.

Limiti Multi-Exchange

Exchange Max Candele 1m Max Candele 1h Intervalli Notevoli
Binance 1.000 1.000 1m–1M, gamma completa
Bybit 1.000 1.000 1–720, D/W/M
OKX 300 300 1m–1M (più restrittivo)
Gate.io 1.000 1.000 10s–30d

Verifica della Consistenza dell'Aggregazione

La candela 1h da un'API REST potrebbe non corrispondere a ciò che calcoleresti da 60 candele 1m. Valida sempre:

def validate_aggregation(
    candle_htf: OHLCV,
    candles_ltf: list[OHLCV],
    tolerance_pct: float = 0.001,
) -> dict[str, bool]:
    agg = OHLCV(
        timestamp=candles_ltf[-1].timestamp,
        open=candles_ltf[0].open,
        high=max(c.high for c in candles_ltf),
        low=min(c.low for c in candles_ltf),
        close=candles_ltf[-1].close,
        volume=sum(c.volume for c in candles_ltf),
    )

    def close_enough(a: float, b: float) -> bool:
        if a == 0 and b == 0:
            return True
        return abs(a - b) / max(abs(a), abs(b)) < tolerance_pct

    return {
        'open': close_enough(candle_htf.open, agg.open),
        'high': close_enough(candle_htf.high, agg.high),
        'low': close_enough(candle_htf.low, agg.low),
        'close': close_enough(candle_htf.close, agg.close),
        'volume': close_enough(candle_htf.volume, agg.volume),
    }

Se la validazione fallisce sistematicamente, aggrega sempre da 1m tu stesso — non fidarti mai della candela HTF dell'exchange per la parità del backtest.


Matrice di Confronto

Asse 1: Tipi di Bar Base

# Tipo di Bar Trigger Dati Tick Richiesti Migliore Per
1 Tempo Intervallo fisso No Struttura di mercato, comportamento della folla
2 Tick N trade Feature ML, campionamento equal-opinion
3 Volume N unità scambiate Analisi dell'attività normalizzata
4 Dollaro $N nozionale Confronto cross-asset
5 Renko Prezzo ± N unità No Trend following, filtraggio rumore
6 Range High-Low ≥ N Rilevamento breakout
7 Volatilità Range adattivo Analisi regime-adattiva
8 Heikin-Ashi Trasformazione No Conferma trend (prezzi sintetici!)
9 Kagi Inversione di prezzo No Struttura domanda/offerta
10 Line Break Breakout N-linee No Filtro trend macro
11 Point & Figure Box + inversione No Mappatura supporto/resistenza
12 TIB Tick imbalance Rilevamento flusso informato
13 VIB Volume imbalance Rilevamento ordini grandi
14 Run Lunghezza della corsa Rilevamento suddivisione ordini
15 CUSUM Rendimento cumulativo No (chiusure 1m) Eventi di rottura strutturale
16 Entropia Entropia di Shannon Ricerca ML, purezza delle feature
17 Delta Delta order flow Sì (aggTrades) Analisi del flusso aggressore

Asse 2: Metodi di Aggregazione

Metodo Allineamento Cold Start Sfasamento di Fase Migliore Per
Calendario Orologio a muro Rischio bar parziale Nessuno (allineato alla folla) Struttura di mercato, PIQ, S/R
Rolling N bar Nessuno (dopo warmup) Sì (sfasato da :00) Indicatori, segnali
Adattivo N guidato dalla volatilità Dopo calibrazione ATR Strategie vol-adattive

Raccomandazioni Pratiche

Architettura a strati Architettura a quattro strati di candele: segnali rolling, struttura calendario, flusso di microstruttura e filtri di trend.

Se il tuo motore di backtest gira su dati OHLCV 1m:

  1. Bar temporali rolling — aggiornamento più semplice. Zero dati aggiuntivi. Elimina il cold start.
  2. Bar temporali ibride (rolling + calendario) — calendario per la struttura di mercato, rolling per i segnali.
  3. Filtro CUSUM — funziona su chiusure 1m, nessun dato tick. "Qualcosa si è mosso abbastanza da essere interessante."

Se hai dati tick/trade:

  1. Bar dollaro + rolling — default raccomandato dalla letteratura di finanza quantitativa.
  2. Bar volume imbalance + rolling — rileva il flusso informato, campiona di più durante eventi significativi.
  3. Bar delta + calendario — se hai la classificazione del lato aggressore, la visione più diretta di chi sta spingendo il mercato.

Come filtri (applica Heikin-Ashi o Line Break sopra qualsiasi combinazione base+aggregazione):

  1. Heikin-Ashi sulle bar volume rolling — segnali di trend puliti su dati normalizzati per attività.
  2. Line Break / Kagi sulle bar calendario giornaliere — filtro trend macro.

Per Marketmaker.cc specificamente — un approccio a strati:

  • Strato 1 (segnali): Aggregazione rolling di bar temporali per indicatori e segnali di ingresso/uscita. Nessun cold start, perfetta parità backtest.
  • Strato 2 (struttura di mercato): Bar temporali allineate al calendario per supporto/resistenza, analisi della chiusura oraria e trigger PIQ.
  • Strato 3 (microstruttura): Bar volume imbalance + bar delta dal flusso grezzo di trade per rilevare il flusso informato, la suddivisione degli ordini e anticipare i grandi movimenti. Vedi anche Impronta Digitale: Identificazione del Trader per il riconoscimento di pattern comportamentali sui dati di order flow.
  • Strato 4 (filtro trend): Trasformazione Heikin-Ashi sulle bar rolling, o Line Break sulle chiusure calendario 4h, per mantenere i segnali allineati con la direzione macro.

Conclusione

La costruzione delle candele non è una singola scelta — sono due decisioni indipendenti:

  1. Che tipo di bar? Il tempo cattura gli intervalli dell'orologio. L'attività (tick, volume, dollaro) cattura la partecipazione al mercato. Il prezzo (Renko, range, volatilità) cattura i movimenti. L'informazione (imbalance, run, CUSUM, entropia) cattura gli arrivi di nuove informazioni. L'order flow (delta) cattura la pressione aggressiva.

  2. Come aggregare nei timeframe superiori? Il calendario si allinea con la folla. Il rolling elimina il cold start. L'adattivo reagisce alla volatilità.

La "candela di 1 ora da Binance" standard è solo una cella in una matrice 17×3. Le altre 50 combinazioni sono disponibili per chiunque sia disposto a implementarle. Per un sistema di produzione, la risposta è "scegli la combinazione giusta per ogni strato del tuo motore decisionale."

L'unità atomica — la bar base chiusa — rimane la fondazione. Tutto il resto è aggregazione.

Per ulteriori informazioni sulla precisione del backtest con dati a grana fine, vedi Drill-Down Adattivo: Backtest con Granularità Variabile. Per l'impatto della precomputazione degli indicatori sulle strategie multi-timeframe, vedi Cache Parquet Aggregata.


Link Utili

  1. Lopez de Prado — Advances in Financial Machine Learning (2018)
  2. Easley, Lopez de Prado, O'Hara — The Volume Clock: Insights into the High Frequency Paradigm (2012)
  3. mlfinlab — libreria Python che implementa le bar information-driven
  4. Binance — Historical Market Data
  5. Apache Parquet — formato di archiviazione colonnare

Citazione

@article{soloviov2026bartypes,
  author = {Soloviov, Eugen},
  title = {Bar Types and Aggregation Methods for Algorithmic Trading},
  year = {2026},
  url = {https://marketmaker.cc/it/blog/post/beyond-time-bars-candle-construction},
  description = {Classificazione a due assi della costruzione delle candele: 17 tipi base di bar × 3 metodi di aggregazione = 51 combinazioni, con codice di implementazione e raccomandazioni pratiche per l'algotrading crypto.}
}
Disclaimer: le informazioni fornite in questo articolo hanno solo scopo didattico e informativo e non costituiscono consulenza finanziaria, di investimento o di trading. Il trading di criptovalute comporta un rischio significativo di perdita.

Autori

Eugen Soloviov
Eugen Soloviov

Trading-systems engineer

Trading-systems engineer building bots since 2017: cross-exchange arbitrage (connected up to 30 venues), cointegration-based pairs arbitrage across spot and futures, scalping, news and sentiment-driven strategies, trend algorithms, and portfolio management and balancing algorithms. Also builds sub-millisecond order execution, big-data warehouses, backtesting engines, AI agents, and trading interfaces (incl. open-source profitmaker.cc). Stack: JS/TS, Python, Rust/Zig/Go, DevOps, backend, frontend, architecture.

Newsletter

Resta un Passo Avanti al Mercato

Iscriviti alla nostra newsletter per approfondimenti esclusivi sul trading con IA, analisi di mercato e aggiornamenti sulla piattaforma.

Rispettiamo la tua privacy. Annulla l'iscrizione in qualsiasi momento.