← กลับไปยังบทความ
March 22, 2026
อ่าน 5 นาที

ประเภทแท่งเทียนและวิธีการรวมข้อมูลสำหรับการซื้อขายแบบ Algorithmic

ประเภทแท่งเทียนและวิธีการรวมข้อมูลสำหรับการซื้อขายแบบ Algorithmic
#algotrading
#แท่งเทียน
#microstructure ของตลาด
#Lopez de Prado
#order flow
#backtesting
#วิจัย

ทุกกราฟแท่งเทียนที่คุณเคยเห็นบน Binance, TradingView หรือ UI ของกระดานซื้อขายใดก็ตาม ถูกสร้างขึ้นในแบบเดียวกัน: รวบรวมการซื้อขายภายในช่วงเวลาที่กำหนด — 1 นาที, 5 นาที, 1 ชั่วโมง — แล้วสร้างแท่ง OHLCV วิธีนี้แพร่หลายมากจนนักเทรดส่วนใหญ่ไม่เคยตั้งคำถาม แต่สำหรับการซื้อขายแบบ algorithmic การเลือกประเภทแท่งและวิธีการรวมข้อมูลเป็นสองการตัดสินใจที่เป็นอิสระจากกัน — และระบบส่วนใหญ่มักนำสองสิ่งนี้มารวมกันโดยไม่ตั้งใจ

บทความนี้แยกสองแกนของการสร้างแท่งเทียน: ประเภทของแท่ง ที่คุณสร้าง (17 ประเภท) และ วิธีการรวมข้อมูล เป็นกรอบเวลาที่สูงขึ้น (3 วิธี) การผสมผสานนี้ให้ 51 การกำหนดค่าที่เป็นไปได้ แต่ละแบบมีคุณสมบัติแตกต่างกันสำหรับการ backtesting, การซื้อขายจริง และการสร้างสัญญาณ

สำหรับบทแนะนำเกี่ยวกับวิธีที่การซื้อขายดิบกลายเป็นแท่งเทียนมาตรฐาน ดูได้ที่ Trading Candles Demystified


TL;DR

  • การสร้างแท่งเทียนมี สองแกนที่เป็นอิสระ: ประเภทแท่งและวิธีการรวมข้อมูล
  • 17 ประเภทแท่งพื้นฐาน: เวลา, tick, ปริมาณ, ดอลลาร์, Renko, range, ความผันผวน, Heikin-Ashi, Kagi, Line Break, P&F, tick imbalance (TIB), volume imbalance (VIB), run, CUSUM, entropy, delta
  • 3 วิธีการรวมข้อมูล: จัดตามปฏิทิน, rolling window, adaptive rolling
  • 17 × 3 = 51 การผสมผสานที่เป็นไปได้ แต่ละแบบมีคุณสมบัติแตกต่างกัน
  • ระบบส่วนใหญ่ใช้เพียงการผสมผสานเดียว: แท่งเวลาจัดตามปฏิทิน อีก 50 แบบที่เหลือยังไม่ถูกนำมาใช้
  • คำแนะนำเชิงปฏิบัติ: ใช้หลายการผสมผสานเป็นชั้นๆ — rolling time bars สำหรับสัญญาณ, calendar time bars สำหรับโครงสร้างตลาด, information-driven bars สำหรับ microstructure

สองแกนของการสร้างแท่งเทียน

มุมมองดั้งเดิมจัดประเภทแท่งทั้งหมดไว้ในรายการเดียว: time bars, tick bars, volume bars, Renko ฯลฯ ซึ่งทำให้เกิดความเข้าใจผิด ในความเป็นจริงมีการเลือกสองแบบที่ตั้งฉากกัน:

แกนที่ 1 — ประเภทแท่งพื้นฐาน (17 ประเภท): คุณตัดสินใจอย่างไรว่าแท่งใหม่จะปิดเมื่อใด? หลังจากช่วงเวลาที่กำหนด? หลังจาก N การซื้อขาย? หลังจากการเคลื่อนไหวของราคา? เมื่อเนื้อหาของข้อมูลเปลี่ยนแปลง? สิ่งนี้กำหนดว่า "หนึ่งแท่ง" หมายความว่าอะไร

แกนที่ 2 — วิธีการรวมข้อมูล (3 วิธี): คุณรวบรวมแท่งพื้นฐานเป็นแท่งเทียนกรอบเวลาที่สูงขึ้นอย่างไร? จัดตามขอบเขตปฏิทิน (00:00, 01:00, ...)? ใช้ rolling window ของ N แท่งล่าสุด? ปรับขนาด window ตามความผันผวน?

สองแกนนี้เป็นอิสระจากกัน คุณสามารถมี:

  • Calendar-aligned tick bars — รวม tick bars ที่ปิดระหว่าง 14:00 ถึง 14:59 เป็นแท่งรายชั่วโมงเดียว
  • Rolling volume bars — นำ 24 volume bars ล่าสุดโดยไม่คำนึงว่าปิดเมื่อใด
  • Adaptive delta bars — ใช้ window ที่ขับเคลื่อนด้วยความผันผวนเหนือ delta bars

"แท่ง 1 ชั่วโมง" มาตรฐานเป็นเพียงจุดเดียวในเมทริกซ์ 17×3 นี้: time bars + calendar alignment ทุกการผสมผสานอื่นเป็นทางเลือกที่ควรพิจารณา


1. Time Bars (มาตรฐาน)

Calendar time bars problem ความหนาแน่นของข้อมูลที่ไม่สม่ำเสมอ: ขอบเขตเวลาที่ตายตัวปฏิบัติต่อช่วงเวลาเงียบที่มี 200 การซื้อขายเหมือนกับช่วงเวลาประกาศที่มี 50,000 การซื้อขาย

ค่าเริ่มต้น แท่งใหม่เกิดขึ้นหลังจากช่วงเวลาที่กำหนด: 1 นาที, 5 นาที, 1 ชั่วโมง ทุกกระดานซื้อขายให้สิ่งเหล่านี้โดยกำเนิด

คุณสมบัติ:

  • ในช่วง Asian session (00:00–08:00 UTC) แท่ง 1 ชั่วโมงอาจมี 200 การซื้อขาย ในระหว่างการประกาศ listing ของ Binance ช่วงเวลาเดียวกันอาจมี 50,000 การซื้อขาย Time bars ปฏิบัติต่อทั้งสองอย่างเหมือนกัน การตรวจจับการพุ่งของกิจกรรมดังกล่าวมีความสำคัญอย่างยิ่งสำหรับการป้องกันบอต — ดู Anomaly Detection for Trading Bots
  • ผู้เข้าร่วมตลาดทั้งหมดเห็นขอบเขตแท่งเดียวกัน — Schelling point ทำให้ time bars จำเป็นสำหรับการวิเคราะห์พฤติกรรมของกลุ่ม
  • ตัวชี้วัดที่คำนวณบนแท่งที่ยังไม่สมบูรณ์ (หลังรีสตาร์ท) ให้ค่าที่ผิดพลาด
from datetime import datetime

def time_until_valid_hourly_candle():
    """How long until the first complete hourly candle after restart."""
    now = datetime.utcnow()
    minutes_into_hour = now.minute
    seconds_into_minute = now.second

    wait_seconds = (60 - minutes_into_hour) * 60 - seconds_into_minute
    wait_seconds += 3600

    return wait_seconds

2–4. แท่งตามกิจกรรม

Activity-based bars Tick, volume และ dollar bars: สามวิธีให้การมีส่วนร่วมของตลาด — ไม่ใช่นาฬิกา — กำหนดขอบเขตแท่ง

แทนที่จะสุ่มตัวอย่างที่ช่วงเวลาที่กำหนด ให้สุ่มตัวอย่างหลังจากปริมาณกิจกรรมของตลาดที่กำหนด วิธีนี้สร้างแท่งที่มี "เนื้อหาข้อมูล" ใกล้เคียงกันโดยไม่คำนึงถึงเวลาในวัน

2. Tick Bars

แท่งใหม่เกิดขึ้นหลังจากทุก N การซื้อขาย (tick) ในช่วงกิจกรรมสูง แท่งจะก่อตัวอย่างรวดเร็ว ในช่วงเงียบ แท่งเดียวอาจใช้เวลาหลายชั่วโมง

from collections import deque
from dataclasses import dataclass

@dataclass
class OHLCV:
    timestamp: int
    open: float
    high: float
    low: float
    close: float
    volume: float

class TickBarGenerator:
    """
    Generates a new bar every `threshold` trades.
    Each bar contains equal number of market "opinions".
    """

    def __init__(self, threshold: int = 1000):
        self.threshold = threshold
        self.trades: list[tuple[float, float]] = []  # (price, qty)
        self.bars: list[OHLCV] = []

    def on_trade(self, timestamp: int, price: float, qty: float):
        self.trades.append((price, qty))

        if len(self.trades) >= self.threshold:
            self._close_bar(timestamp)

    def _close_bar(self, timestamp: int):
        prices = [t[0] for t in self.trades]
        volumes = [t[1] for t in self.trades]

        bar = OHLCV(
            timestamp=timestamp,
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        self.bars.append(bar)
        self.trades = []
        return bar

ข้อดี: ปรับตัวตามกิจกรรมของตลาดได้ตามธรรมชาติ ผลตอบแทนจาก tick bars มักมีการกระจายตัวใกล้เคียง normal มากกว่าผลตอบแทนจาก time bars — คุณสมบัตินี้ปรับปรุงประสิทธิภาพของโมเดลทางสถิติหลายรูปแบบ

ข้อเสีย: ต้องการ trade stream ดิบ (ไม่มีจากผู้ให้บริการข้อมูลทุกรายสำหรับข้อมูลย้อนหลัง) เวลาของแท่งไม่สามารถคาดเดาได้ — คุณไม่สามารถบอกได้ว่า "แท่งถัดไปจะปิดที่ X"

3. Volume Bars

แท่งใหม่เกิดขึ้นหลังจาก N สัญญา (หรือเหรียญในกรณีของ crypto) ถูกซื้อขาย คล้ายกับ tick bars แต่ถ่วงน้ำหนักตามขนาดการซื้อขาย — การซื้อขาย 100 BTC ครั้งเดียวมีส่วนร่วม 100 เท่าของการซื้อขาย 1 BTC

class VolumeBarGenerator:
    """
    Generates a new bar every `threshold` units of volume.
    Normalizes for trade size: one large order ≠ one small order.
    """

    def __init__(self, threshold: float = 100.0):
        self.threshold = threshold
        self.accumulated_volume = 0.0
        self.trades: list[tuple[int, float, float]] = []  # (ts, price, qty)
        self.bars: list[OHLCV] = []

    def on_trade(self, timestamp: int, price: float, qty: float):
        self.trades.append((timestamp, price, qty))
        self.accumulated_volume += qty

        if self.accumulated_volume >= self.threshold:
            self._close_bar()

    def _close_bar(self):
        prices = [t[1] for t in self.trades]
        volumes = [t[2] for t in self.trades]

        bar = OHLCV(
            timestamp=self.trades[-1][0],
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        self.bars.append(bar)
        self.accumulated_volume = 0.0
        self.trades = []
        return bar

4. Dollar Bars

แท่งใหม่เกิดขึ้นหลังจากมูลค่าตามสัญญาที่กำหนด (เป็น USD/USDT) ถูกแลกเปลี่ยน เป็นแท่งตามกิจกรรมที่แข็งแกร่งที่สุดเพราะปรับให้เป็นมาตรฐานทั้งจำนวนการซื้อขายและระดับราคา

พิจารณา: ถ้า ETH ไปจาก 1,000ถึง1,000 ถึง 4,000 การขาย ETH มูลค่า 10,000ต้องการ2.5ETHที่10,000 ต้องการ 2.5 ETH ที่ 4,000 แต่ต้องการ 10 ETH ที่ $1,000 Volume bars จะปฏิบัติต่อสิ่งเหล่านี้แตกต่างกัน; dollar bars ปฏิบัติต่อเหมือนกัน

class DollarBarGenerator:
    """
    Generates a new bar every `threshold` dollars (USDT) of notional volume.
    Most robust normalization: independent of price level.

    Lopez de Prado (2018) recommends dollar bars as the default
    for most quantitative applications.
    """

    def __init__(self, threshold: float = 1_000_000.0):
        self.threshold = threshold
        self.accumulated_dollars = 0.0
        self.trades: list[tuple[int, float, float]] = []
        self.bars: list[OHLCV] = []

    def on_trade(self, timestamp: int, price: float, qty: float):
        self.trades.append((timestamp, price, qty))
        self.accumulated_dollars += price * qty

        if self.accumulated_dollars >= self.threshold:
            self._close_bar()

    def _close_bar(self):
        prices = [t[1] for t in self.trades]
        volumes = [t[2] for t in self.trades]

        bar = OHLCV(
            timestamp=self.trades[-1][0],
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        self.bars.append(bar)
        self.accumulated_dollars = 0.0
        self.trades = []
        return bar

การเลือก Threshold

Threshold สำหรับแท่งตามกิจกรรมควรสร้างแท่งในจำนวนที่ใกล้เคียงกันต่อวันกับ time bars ที่คุณกำลังแทนที่ สำหรับ BTCUSDT บน Binance:

ประเภทแท่ง Threshold ทั่วไป แท่ง/วัน (ประมาณ) TF เทียบเท่า
Tick 1,000 การซื้อขาย ~1,400 ~1m
Tick 50,000 การซื้อขาย ~28 ~1h
Volume 100 BTC ~600 ~2-3m
Volume 2,400 BTC ~25 ~1h
Dollar $1M ~1,400 ~1m
Dollar $50M ~28 ~1h

ตัวเลขเหล่านี้เป็นค่าประมาณและเปลี่ยนแปลงอย่างมากตามสภาวะตลาด ในระหว่างการพุ่งขึ้นหรือตก แท่งตามกิจกรรมจะสร้างแท่งมากกว่าปกติ 5-10 เท่า — ซึ่งนั่นคือจุดประสงค์ที่แท้จริง

5–7. แท่งตามราคา

Price-based bars Renko bricks, range bars และ volatility bars: สุ่มตัวอย่างเฉพาะเมื่อราคาเคลื่อนไหวมากพอ

แท่งตามราคาไม่สนใจทั้งเวลาและกิจกรรม แท่งใหม่เกิดขึ้นเฉพาะเมื่อราคาเคลื่อนไหวในปริมาณที่กำหนด สิ่งนี้กรองสัญญาณรบกวนในช่วง sideways และเน้น trend โดยธรรมชาติ

5. Renko Bars

"อิฐ" Renko ใหม่เกิดขึ้นเมื่อราคาปิดเคลื่อนไหวอย่างน้อย N หน่วยจากราคาปิดของอิฐก่อนหน้า อิฐมีขนาดเท่ากันเสมอ สร้างการแสดงผลที่สะอาดตาของทิศทาง trend

class RenkoBarGenerator:
    """
    Generates Renko bricks based on price movement.

    Key property: during sideways movement, no new bricks form.
    During strong trends, bricks form rapidly.
    """

    def __init__(self, brick_size: float = 10.0):
        self.brick_size = brick_size
        self.bricks: list[dict] = []
        self.last_close: float | None = None

    def on_price(self, timestamp: int, price: float, volume: float = 0.0):
        if self.last_close is None:
            self.last_close = price
            return []

        new_bricks = []
        diff = price - self.last_close
        num_bricks = int(abs(diff) / self.brick_size)

        if num_bricks == 0:
            return []

        direction = 1 if diff > 0 else -1

        for i in range(num_bricks):
            brick_open = self.last_close
            brick_close = self.last_close + direction * self.brick_size

            brick = {
                'timestamp': timestamp,
                'open': brick_open,
                'high': max(brick_open, brick_close),
                'low': min(brick_open, brick_close),
                'close': brick_close,
                'volume': volume / num_bricks if num_bricks > 0 else 0,
                'direction': direction,
            }
            new_bricks.append(brick)
            self.last_close = brick_close

        self.bricks.extend(new_bricks)
        return new_bricks

Dynamic Renko ใช้ ATR (Average True Range) แทนขนาดอิฐที่กำหนด ปรับตัวตามความผันผวนโดยอัตโนมัติ

6. Range Bars

แต่ละแท่งมี range สูง-ต่ำที่กำหนด เมื่อ range ถูกเกินไป แท่งจะปิดและเริ่มอันใหม่ ต่างจาก Renko, range bars มี wicks และสามารถแสดงความผันผวนภายในแท่งได้

class RangeBarGenerator:
    """
    Generates bars with a fixed high-low range.

    Difference from Renko: range bars show the full OHLC within
    the range, not just brick direction. More information-rich.
    """

    def __init__(self, range_size: float = 20.0):
        self.range_size = range_size
        self.current_high: float | None = None
        self.current_low: float | None = None
        self.current_open: float | None = None
        self.current_volume: float = 0.0
        self.current_start_ts: int = 0
        self.bars: list[OHLCV] = []

    def on_trade(self, timestamp: int, price: float, qty: float):
        if self.current_open is None:
            self.current_open = price
            self.current_high = price
            self.current_low = price
            self.current_start_ts = timestamp

        self.current_high = max(self.current_high, price)
        self.current_low = min(self.current_low, price)
        self.current_volume += qty

        if self.current_high - self.current_low >= self.range_size:
            bar = OHLCV(
                timestamp=timestamp,
                open=self.current_open,
                high=self.current_high,
                low=self.current_low,
                close=price,
                volume=self.current_volume,
            )
            self.bars.append(bar)

            self.current_open = price
            self.current_high = price
            self.current_low = price
            self.current_volume = 0.0
            self.current_start_ts = timestamp

            return bar

        return None

ความแตกต่างหลักระหว่าง Renko และ Range bars: Renko ติดตามเฉพาะราคาปิดและแสดงทิศทาง; range bars ติดตาม range ราคาเต็มและแสดงโครงสร้างภายในแท่ง Range bars มักมีประโยชน์มากกว่าสำหรับการซื้อขายแบบ algorithmic เพราะรักษาข้อมูล high-low ที่จำเป็นสำหรับการจำลอง stop-loss และ take-profit

7. Volatility Bars

แท่งใหม่เกิดขึ้นเมื่อความผันผวนภายในแท่งถึง threshold แบบ dynamic — ตัวอย่างเช่น เป็นทวีคูณของ ATR ล่าสุด ต่างจาก range bars (threshold ที่กำหนด), volatility bars ปรับตัวตามสภาวะตลาด

class VolatilityBarGenerator:
    """
    Generates bars when intra-bar volatility reaches a threshold.

    Similar to range bars, but the threshold adapts to market conditions
    using a rolling ATR measure. In calm markets, bars need less
    absolute movement to close; in volatile markets, more.
    """

    def __init__(
        self,
        atr_period: int = 14,
        atr_multiplier: float = 1.0,
        initial_threshold: float = 20.0,
    ):
        self.atr_period = atr_period
        self.atr_multiplier = atr_multiplier
        self.threshold = initial_threshold

        self.recent_ranges: list[float] = []
        self.current_open: float | None = None
        self.current_high: float | None = None
        self.current_low: float | None = None
        self.current_volume: float = 0.0
        self.bars: list[OHLCV] = []

    def on_trade(self, timestamp: int, price: float, qty: float):
        if self.current_open is None:
            self.current_open = price
            self.current_high = price
            self.current_low = price

        self.current_high = max(self.current_high, price)
        self.current_low = min(self.current_low, price)
        self.current_volume += qty

        intra_bar_range = self.current_high - self.current_low

        if intra_bar_range >= self.threshold:
            bar = OHLCV(
                timestamp=timestamp,
                open=self.current_open,
                high=self.current_high,
                low=self.current_low,
                close=price,
                volume=self.current_volume,
            )
            self.bars.append(bar)

            self.recent_ranges.append(intra_bar_range)
            if len(self.recent_ranges) > self.atr_period:
                self.recent_ranges = self.recent_ranges[-self.atr_period:]
            if len(self.recent_ranges) >= self.atr_period:
                avg_range = sum(self.recent_ranges) / len(self.recent_ranges)
                self.threshold = avg_range * self.atr_multiplier

            self.current_open = price
            self.current_high = price
            self.current_low = price
            self.current_volume = 0.0
            return bar

        return None

8. Heikin-Ashi (การแปลงแบบ Smoothed)

Heikin-Ashi transformation Heikin-Ashi: การเฉลี่ยแปลงแท่งที่มีสัญญาณรบกวนให้เป็นสัญญาณ trend ที่ราบรื่น — แต่แลกมาด้วยการสูญเสียข้อมูลราคาที่แน่นอน

Heikin-Ashi (ภาษาญี่ปุ่นแปลว่า "แท่งเฉลี่ย") ไม่ใช่ประเภทแท่ง — มันเป็น การแปลง ที่สามารถนำไปใช้กับประเภทแท่งพื้นฐานใดก็ได้ มันทำให้แท่งเรียบโดยเฉลี่ยค่าแท่งปัจจุบันและก่อนหน้า:

  • HA Close = (Open + High + Low + Close) / 4
  • HA Open = (Previous HA Open + Previous HA Close) / 2
  • HA High = max(High, HA Open, HA Close)
  • HA Low = min(Low, HA Open, HA Close)

Trend ปรากฏเป็นลำดับแท่งสีเดียวกันโดยไม่มี lower wicks (uptrend) หรือไม่มี upper wicks (downtrend)

class HeikinAshiTransformer:
    """
    Transforms standard OHLCV candles into Heikin-Ashi candles.

    Can be applied on top of ANY bar type: time bars, volume bars,
    rolling bars, etc. It's a transformation, not a sampling method.

    WARNING: HA prices are synthetic — they don't represent real
    traded prices. Never use HA close for order placement or
    PnL calculation. Use HA only for signal generation, then
    execute at real prices.
    """

    def __init__(self):
        self.prev_ha_open: float | None = None
        self.prev_ha_close: float | None = None

    def transform(self, candle: OHLCV) -> OHLCV:
        ha_close = (candle.open + candle.high + candle.low + candle.close) / 4

        if self.prev_ha_open is None:
            ha_open = (candle.open + candle.close) / 2
        else:
            ha_open = (self.prev_ha_open + self.prev_ha_close) / 2

        ha_high = max(candle.high, ha_open, ha_close)
        ha_low = min(candle.low, ha_open, ha_close)

        self.prev_ha_open = ha_open
        self.prev_ha_close = ha_close

        return OHLCV(
            timestamp=candle.timestamp,
            open=ha_open,
            high=ha_high,
            low=ha_low,
            close=ha_close,
            volume=candle.volume,
        )

    def transform_series(self, candles: list[OHLCV]) -> list[OHLCV]:
        """Transform an entire series. Resets state first."""
        self.prev_ha_open = None
        self.prev_ha_close = None
        return [self.transform(c) for c in candles]


def ha_trend_signal(ha_candles: list[OHLCV], lookback: int = 3) -> int:
    """
    Simple HA trend signal.

    Returns:
        +1: bullish (N consecutive green HA candles with no lower wick)
        -1: bearish (N consecutive red HA candles with no upper wick)
         0: no clear trend
    """
    if len(ha_candles) < lookback:
        return 0

    recent = ha_candles[-lookback:]

    all_bullish = all(
        c.close > c.open and abs(c.low - min(c.open, c.close)) < 1e-10
        for c in recent
    )

    all_bearish = all(
        c.close < c.open and abs(c.high - max(c.open, c.close)) < 1e-10
        for c in recent
    )

    if all_bullish:
        return 1
    elif all_bearish:
        return -1
    return 0

ข้อควรระวังสำคัญสำหรับการ backtesting: ราคา Heikin-Ashi เป็นสังเคราะห์ ถ้า backtest ของคุณใช้ HA close เป็นราคาเข้า ผลลัพธ์จะผิดพลาด ใช้ HA สำหรับการสร้างสัญญาณเท่านั้นและดำเนินการที่ราคา OHLC จริงเสมอ

เมื่อไร HA มีประโยชน์: กลยุทธ์ตาม trend ที่ต้องการสัญญาณ "อยู่ใน" ที่สะอาด ใช้ HA กับประเภทแท่งพื้นฐานใดก็ได้ — time bars, volume bars, dollar bars — เพื่อกรอง crossovers ที่เป็นสัญญาณเท็จ

เมื่อไร HA เป็นอันตราย: กลยุทธ์ใดๆ ที่ต้องการระดับราคาที่แม่นยำ — support/resistance, การวิเคราะห์ order book, PIQ (Position In Queue) การเฉลี่ยทำลายข้อมูลราคาที่แน่นอน

9–11. กราฟ Reversal สไตล์ญี่ปุ่น

Japanese charting methods Kagi, Line Break และ Point & Figure: วิธีการเขียนกราฟที่ไม่ใช้เวลาซึ่งมุ่งเน้นที่โครงสร้างราคาล้วนๆ

วิธีการเขียนกราฟดั้งเดิมแบบญี่ปุ่นเหล่านี้ (พร้อมกับ Renko) ทิ้งเวลาโดยสิ้นเชิงและมุ่งเน้นที่โครงสร้างราคา

9. Kagi Charts

กราฟ Kagi ประกอบด้วยเส้นแนวตั้งที่เปลี่ยนทิศทางเมื่อราคากลับตัวในปริมาณที่กำหนด เส้นเปลี่ยนความหนาเมื่อราคาทำลาย high ก่อนหน้า (หนา = "yang" = demand) หรือ low ก่อนหน้า (บาง = "yin" = supply)

class KagiChartGenerator:
    """
    Generates Kagi chart lines based on price reversals.

    Unlike Renko (fixed brick size), Kagi tracks the actual magnitude
    of each move and changes line thickness at breakout points.

    Useful for identifying support/resistance breaks and
    supply/demand shifts without time noise.
    """

    def __init__(self, reversal_amount: float = 10.0):
        self.reversal_amount = reversal_amount
        self.lines: list[dict] = []
        self.current_direction: int = 0  # 1=up, -1=down
        self.current_price: float | None = None
        self.extreme_price: float | None = None
        self.prev_high: float | None = None
        self.prev_low: float | None = None
        self.line_type: str = 'yang'  # 'yang' (thick) or 'yin' (thin)

    def on_price(self, timestamp: int, price: float):
        if self.current_price is None:
            self.current_price = price
            self.extreme_price = price
            return None

        if self.current_direction == 0:
            if price - self.current_price >= self.reversal_amount:
                self.current_direction = 1
                self.extreme_price = price
            elif self.current_price - price >= self.reversal_amount:
                self.current_direction = -1
                self.extreme_price = price
            return None

        if self.current_direction == 1:
            if price > self.extreme_price:
                self.extreme_price = price
                if self.prev_high is not None and price > self.prev_high:
                    self.line_type = 'yang'
            elif self.extreme_price - price >= self.reversal_amount:
                line = {
                    'timestamp': timestamp,
                    'start': self.current_price,
                    'end': self.extreme_price,
                    'direction': 'up',
                    'type': self.line_type,
                }
                self.lines.append(line)
                self.prev_high = self.extreme_price
                self.current_price = self.extreme_price
                self.extreme_price = price
                self.current_direction = -1
                if self.prev_low is not None and price < self.prev_low:
                    self.line_type = 'yin'
                return line
        else:
            if price < self.extreme_price:
                self.extreme_price = price
                if self.prev_low is not None and price < self.prev_low:
                    self.line_type = 'yin'
            elif price - self.extreme_price >= self.reversal_amount:
                line = {
                    'timestamp': timestamp,
                    'start': self.current_price,
                    'end': self.extreme_price,
                    'direction': 'down',
                    'type': self.line_type,
                }
                self.lines.append(line)
                self.prev_low = self.extreme_price
                self.current_price = self.extreme_price
                self.extreme_price = price
                self.current_direction = 1
                if self.prev_high is not None and price > self.prev_high:
                    self.line_type = 'yang'
                return line

        return None

10. Line Break Charts

กราฟ Line Break วาดเส้นใหม่ (กล่อง) เฉพาะเมื่อราคาปิดเกิน high หรือ low ของ N เส้นก่อนหน้า (โดยทั่วไป 3) ไม่มีเส้นใหม่ถูกวาดหากราคาอยู่ในช่วงนั้น

class LineBreakGenerator:
    """
    Generates Line Break bars (Three Line Break by default).

    A new bar is drawn only when the close exceeds the high or low
    of the last N bars. Filters out minor noise by requiring price
    to break through a multi-bar range.

    The 'N' parameter (line_count) controls sensitivity:
    - N=2: more sensitive, more bars, more noise
    - N=3: standard (Three Line Break)
    - N=4+: less sensitive, fewer bars, stronger signals
    """

    def __init__(self, line_count: int = 3):
        self.line_count = line_count
        self.lines: list[dict] = []

    def on_close(self, timestamp: int, close: float) -> dict | None:
        if not self.lines:
            self.lines.append({
                'timestamp': timestamp,
                'open': close,
                'close': close,
                'high': close,
                'low': close,
                'direction': 0,
            })
            return None

        lookback = self.lines[-self.line_count:] if len(self.lines) >= self.line_count else self.lines

        highest = max(l['high'] for l in lookback)
        lowest = min(l['low'] for l in lookback)
        last = self.lines[-1]

        new_line = None

        if close > highest:
            new_line = {
                'timestamp': timestamp,
                'open': last['close'],
                'close': close,
                'high': close,
                'low': last['close'],
                'direction': 1,
            }
        elif close < lowest:
            new_line = {
                'timestamp': timestamp,
                'open': last['close'],
                'close': close,
                'high': last['close'],
                'low': close,
                'direction': -1,
            }

        if new_line:
            self.lines.append(new_line)
            return new_line

        return None

11. Point & Figure Charts

กราฟ Point & Figure (P&F) ใช้คอลัมน์ของ X (ราคาขึ้น) และ O (ราคาลง) การเปลี่ยนคอลัมน์ต้องการการกลับตัวโดยทั่วไป 3 ขนาดกล่อง เป็นหนึ่งในวิธีที่เก่าแก่ที่สุดในการกรองสัญญาณรบกวนและระบุ support/resistance

class PointAndFigureGenerator:
    """
    Generates Point & Figure chart data.

    X column: price rising by box_size increments.
    O column: price falling by box_size increments.
    Column switch: requires reversal_boxes * box_size movement
    in the opposite direction.

    Classic setting: box_size based on ATR, reversal_boxes = 3.
    """

    def __init__(self, box_size: float = 10.0, reversal_boxes: int = 3):
        self.box_size = box_size
        self.reversal_boxes = reversal_boxes
        self.reversal_amount = box_size * reversal_boxes

        self.columns: list[dict] = []
        self.current_direction: int = 0
        self.current_top: float | None = None
        self.current_bottom: float | None = None

    def on_price(self, timestamp: int, price: float):
        if self.current_top is None:
            box_price = self._round_to_box(price)
            self.current_top = box_price
            self.current_bottom = box_price
            self.current_direction = 1
            return None

        events = []

        if self.current_direction == 1:
            while price >= self.current_top + self.box_size:
                self.current_top += self.box_size
                events.append(('X', self.current_top, timestamp))

            if price <= self.current_top - self.reversal_amount:
                col = {
                    'type': 'X',
                    'top': self.current_top,
                    'bottom': self.current_bottom,
                    'boxes': int((self.current_top - self.current_bottom) / self.box_size) + 1,
                    'timestamp': timestamp,
                }
                self.columns.append(col)
                self.current_direction = -1
                self.current_top = self.current_top - self.box_size
                self.current_bottom = self._round_to_box(price)
                events.append(('new_column', 'O', timestamp))

        else:
            while price <= self.current_bottom - self.box_size:
                self.current_bottom -= self.box_size
                events.append(('O', self.current_bottom, timestamp))

            if price >= self.current_bottom + self.reversal_amount:
                col = {
                    'type': 'O',
                    'top': self.current_top,
                    'bottom': self.current_bottom,
                    'boxes': int((self.current_top - self.current_bottom) / self.box_size) + 1,
                    'timestamp': timestamp,
                }
                self.columns.append(col)
                self.current_direction = 1
                self.current_bottom = self.current_bottom + self.box_size
                self.current_top = self._round_to_box(price)
                events.append(('new_column', 'X', timestamp))

        return events if events else None

    def _round_to_box(self, price: float) -> float:
        return round(price / self.box_size) * self.box_size

Kagi, Line Break และ P&F ในการซื้อขายแบบ algorithmic: ใช้เป็นหลักสำหรับการตรวจจับ trend ระยะยาวและการระบุ support/resistance ในฐานะ ชั้นกรอง — "อย่ารับสัญญาณ long เมื่อกราฟ Kagi อยู่ในโหมด yin" — จะเพิ่มมูลค่าโดยการจัดการซื้อขายให้สอดคล้องกับโครงสร้างมหภาค

12–14. แท่งขับเคลื่อนด้วยข้อมูล

Information-driven bars Imbalance bars, run bars, CUSUM filters และ entropy bars: สุ่มตัวอย่างเมื่อตลาดบอกเราว่ามีบางอย่างเปลี่ยนแปลง

วิธีการที่ซับซ้อนที่สุด จาก Advances in Financial Machine Learning (2018) ของ Marcos Lopez de Prado แนวคิดหลัก: สุ่มตัวอย่างเมื่อข้อมูลใหม่มาถึงตลาด ไม่ใช่ที่ช่วงเวลาที่กำหนด

12. Tick Imbalance Bars (TIB)

ถ้าตลาดอยู่ในสมดุล การซื้อขายที่เริ่มต้นโดยผู้ซื้อและผู้ขายควรสมดุลกันโดยประมาณ เมื่อ imbalance เกินความคาดหวังของเรา บางอย่างได้เปลี่ยนแปลง ให้สุ่มตัวอย่างแท่ง ณ จุดนั้น

การซื้อขายแต่ละรายการถูกจัดประเภทว่าเริ่มต้นโดยผู้ซื้อ (+1) หรือผู้ขาย (-1) โดยใช้กฎ tick เราติดตาม cumulative imbalance θ และสุ่มตัวอย่างเมื่อ |θ| เกิน threshold แบบ dynamic

class TickImbalanceBarGenerator:
    """
    Generates bars when the cumulative tick imbalance exceeds
    expected levels — i.e., when "new information" arrives.

    Based on Lopez de Prado (2018), Chapter 2.
    """

    def __init__(
        self,
        expected_ticks_init: int = 1000,
        ewma_window: int = 100,
        min_ticks: int = 100,
        max_ticks: int = 50000,
    ):
        self.expected_ticks_init = expected_ticks_init
        self.ewma_window = ewma_window
        self.min_ticks = min_ticks
        self.max_ticks = max_ticks

        self.theta = 0.0
        self.prev_price: float | None = None
        self.prev_sign = 1
        self.trades: list[tuple[int, float, float]] = []

        self.bar_lengths: list[int] = []
        self.imbalances: list[float] = []
        self.expected_ticks = float(expected_ticks_init)
        self.expected_imbalance = 0.0

        self.bars: list[OHLCV] = []

    def _tick_sign(self, price: float) -> int:
        """Classify trade as buy (+1) or sell (-1) using tick rule."""
        if self.prev_price is None:
            self.prev_price = price
            return 1

        if price > self.prev_price:
            sign = 1
        elif price < self.prev_price:
            sign = -1
        else:
            sign = self.prev_sign

        self.prev_price = price
        self.prev_sign = sign
        return sign

    def on_trade(self, timestamp: int, price: float, qty: float):
        sign = self._tick_sign(price)
        self.theta += sign
        self.trades.append((timestamp, price, qty))

        threshold = self.expected_ticks * abs(self.expected_imbalance)
        if threshold == 0:
            threshold = self.expected_ticks_init * 0.5

        if abs(self.theta) >= threshold and len(self.trades) >= self.min_ticks:
            return self._close_bar()

        if len(self.trades) >= self.max_ticks:
            return self._close_bar()

        return None

    def _close_bar(self):
        prices = [t[1] for t in self.trades]
        volumes = [t[2] for t in self.trades]

        bar = OHLCV(
            timestamp=self.trades[-1][0],
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        self.bars.append(bar)

        self.bar_lengths.append(len(self.trades))
        self.imbalances.append(self.theta / len(self.trades))

        if len(self.bar_lengths) >= 2:
            alpha = 2.0 / (self.ewma_window + 1)
            self.expected_ticks = (
                alpha * self.bar_lengths[-1]
                + (1 - alpha) * self.expected_ticks
            )
            self.expected_ticks = max(
                self.min_ticks,
                min(self.max_ticks, self.expected_ticks)
            )
            self.expected_imbalance = (
                alpha * self.imbalances[-1]
                + (1 - alpha) * self.expected_imbalance
            )

        self.theta = 0.0
        self.trades = []
        return bar

13. Volume Imbalance Bars (VIB)

การขยาย TIBs: แทนที่จะนับแต่ละการซื้อขายเป็น ±1 ให้ถ่วงน้ำหนักตาม signed volume การซื้อ 100 BTC มีส่วนร่วม +100 การขาย 1 BTC มีส่วนร่วม -1 จับ orders ที่มีข้อมูลขนาดใหญ่ที่อาจถูกแบ่งเป็นหลายการซื้อขายเล็กๆ

class VolumeImbalanceBarGenerator:
    """
    Like TIBs, but uses signed volume instead of signed ticks.

    Captures the insight that a 100-BTC buy signal is 100x more
    informative than a 1-BTC buy signal.
    """

    def __init__(
        self,
        expected_ticks_init: int = 1000,
        ewma_window: int = 100,
    ):
        self.expected_ticks_init = expected_ticks_init
        self.ewma_window = ewma_window

        self.theta = 0.0
        self.prev_price: float | None = None
        self.prev_sign = 1
        self.trades: list[tuple[int, float, float]] = []

        self.bar_lengths: list[int] = []
        self.volume_imbalances: list[float] = []
        self.expected_ticks = float(expected_ticks_init)
        self.expected_vol_imbalance = 0.0

        self.bars: list[OHLCV] = []

    def _tick_sign(self, price: float) -> int:
        if self.prev_price is None:
            self.prev_price = price
            return 1
        if price > self.prev_price:
            sign = 1
        elif price < self.prev_price:
            sign = -1
        else:
            sign = self.prev_sign
        self.prev_price = price
        self.prev_sign = sign
        return sign

    def on_trade(self, timestamp: int, price: float, qty: float):
        sign = self._tick_sign(price)
        self.theta += sign * qty
        self.trades.append((timestamp, price, qty))

        threshold = self.expected_ticks * abs(self.expected_vol_imbalance)
        if threshold == 0:
            threshold = self.expected_ticks_init * 0.5

        if abs(self.theta) >= threshold and len(self.trades) >= 10:
            return self._close_bar()
        return None

    def _close_bar(self):
        prices = [t[1] for t in self.trades]
        volumes = [t[2] for t in self.trades]

        bar = OHLCV(
            timestamp=self.trades[-1][0],
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        self.bars.append(bar)

        self.bar_lengths.append(len(self.trades))
        self.volume_imbalances.append(self.theta / len(self.trades))

        alpha = 2.0 / (self.ewma_window + 1)
        if len(self.bar_lengths) >= 2:
            self.expected_ticks = (
                alpha * self.bar_lengths[-1] + (1 - alpha) * self.expected_ticks
            )
            self.expected_vol_imbalance = (
                alpha * self.volume_imbalances[-1]
                + (1 - alpha) * self.expected_vol_imbalance
            )

        self.theta = 0.0
        self.trades = []
        return bar

ปัญหาการระเบิด

ปัญหาที่ทราบกันดีกับ imbalance bars: threshold ที่ใช้ EWMA สามารถเข้าสู่ positive feedback loop ได้ วิธีแก้ไข: จำกัดด้วยขอบเขต min_ticks และ max_ticks


self.expected_ticks = max(
    self.min_ticks,    # Floor: never less than 100 ticks
    min(
        self.max_ticks,  # Ceiling: never more than 50000 ticks
        new_expected_ticks
    )
)

14. Run Bars

Run bars ติดตาม ความยาวของ directional run ปัจจุบัน — ลำดับต่อเนื่องที่ยาวที่สุดของการซื้อหรือขาย เมื่อผู้ซื้อขายที่มีข้อมูลรายใหญ่แบ่ง order เป็นการซื้อขายเล็กๆ หลายรายการ ลำดับจะยาวผิดปกติ Run bars ตรวจจับสิ่งนี้

class TickRunBarGenerator:
    """
    Generates bars when the length of a directional run exceeds expectations.

    Based on Lopez de Prado (2018), Chapter 2.

    Difference from imbalance bars:
    - Imbalance bars track NET imbalance (buys minus sells)
    - Run bars track the MAXIMUM run length (consecutive buys OR sells)
    """

    def __init__(
        self,
        expected_ticks_init: int = 1000,
        ewma_window: int = 100,
        min_ticks: int = 100,
        max_ticks: int = 50000,
    ):
        self.expected_ticks_init = expected_ticks_init
        self.ewma_window = ewma_window
        self.min_ticks = min_ticks
        self.max_ticks = max_ticks

        self.prev_price: float | None = None
        self.prev_sign = 1
        self.trades: list[tuple[int, float, float]] = []

        self.buy_run = 0
        self.sell_run = 0
        self.max_buy_run = 0
        self.max_sell_run = 0

        self.bar_lengths: list[int] = []
        self.max_runs: list[float] = []
        self.expected_ticks = float(expected_ticks_init)
        self.expected_max_run = 0.0

        self.bars: list[OHLCV] = []

    def _tick_sign(self, price: float) -> int:
        if self.prev_price is None:
            self.prev_price = price
            return 1
        if price > self.prev_price:
            sign = 1
        elif price < self.prev_price:
            sign = -1
        else:
            sign = self.prev_sign
        self.prev_price = price
        self.prev_sign = sign
        return sign

    def on_trade(self, timestamp: int, price: float, qty: float):
        sign = self._tick_sign(price)
        self.trades.append((timestamp, price, qty))

        if sign == 1:
            self.buy_run += 1
            self.sell_run = 0
        else:
            self.sell_run += 1
            self.buy_run = 0

        self.max_buy_run = max(self.max_buy_run, self.buy_run)
        self.max_sell_run = max(self.max_sell_run, self.sell_run)

        theta = max(self.max_buy_run, self.max_sell_run)
        threshold = self.expected_ticks * self.expected_max_run if self.expected_max_run > 0 else self.expected_ticks_init * 0.3

        if theta >= threshold and len(self.trades) >= self.min_ticks:
            return self._close_bar()

        if len(self.trades) >= self.max_ticks:
            return self._close_bar()

        return None

    def _close_bar(self):
        prices = [t[1] for t in self.trades]
        volumes = [t[2] for t in self.trades]

        bar = OHLCV(
            timestamp=self.trades[-1][0],
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        self.bars.append(bar)

        max_run = max(self.max_buy_run, self.max_sell_run) / len(self.trades)
        self.bar_lengths.append(len(self.trades))
        self.max_runs.append(max_run)

        alpha = 2.0 / (self.ewma_window + 1)
        if len(self.bar_lengths) >= 2:
            self.expected_ticks = alpha * self.bar_lengths[-1] + (1 - alpha) * self.expected_ticks
            self.expected_ticks = max(self.min_ticks, min(self.max_ticks, self.expected_ticks))
            self.expected_max_run = alpha * self.max_runs[-1] + (1 - alpha) * self.expected_max_run

        self.trades = []
        self.buy_run = 0
        self.sell_run = 0
        self.max_buy_run = 0
        self.max_sell_run = 0

        return bar

Run bars สามารถขยายเป็น volume runs และ dollar runs ได้

15. CUSUM Filter Bars

CUSUM (Cumulative Sum) filter กำหนด เมื่อ จะสุ่มตัวอย่างโดยการติดตาม cumulative returns ต่างจาก imbalance bars (ที่ทำงานกับการซื้อขายดิบ), CUSUM สามารถนำไปใช้กับข้อมูล 1m OHLCV ที่มีอยู่ — ไม่ต้องการข้อมูล tick

class CUSUMFilterBarGenerator:
    """
    Symmetric CUSUM filter for event-based sampling.

    Based on Lopez de Prado (2018), Chapter 2.5.

    Key advantage over Bollinger Bands: CUSUM requires a FULL
    run of threshold magnitude before triggering. Bollinger Bands
    trigger repeatedly when price hovers near the band.

    Can be applied to 1m OHLCV data — no tick data required.
    """

    def __init__(self, threshold: float = 0.01):
        self.threshold = threshold
        self.s_pos = 0.0
        self.s_neg = 0.0
        self.prev_price: float | None = None
        self.buffer: list[OHLCV] = []
        self.bars: list[OHLCV] = []

    def on_candle_1m(self, candle: OHLCV) -> OHLCV | None:
        self.buffer.append(candle)

        if self.prev_price is None:
            self.prev_price = candle.close
            return None

        import math
        log_ret = math.log(candle.close / self.prev_price)
        self.prev_price = candle.close

        self.s_pos = max(0.0, self.s_pos + log_ret)
        self.s_neg = min(0.0, self.s_neg + log_ret)

        triggered = False

        if self.s_pos > self.threshold:
            self.s_pos = 0.0
            triggered = True

        if self.s_neg < -self.threshold:
            self.s_neg = 0.0
            triggered = True

        if triggered and len(self.buffer) >= 2:
            bars = self.buffer
            bar = OHLCV(
                timestamp=bars[-1].timestamp,
                open=bars[0].open,
                high=max(b.high for b in bars),
                low=min(b.low for b in bars),
                close=bars[-1].close,
                volume=sum(b.volume for b in bars),
            )
            self.bars.append(bar)
            self.buffer = []
            return bar

        return None

CUSUM + Triple Barrier Method: ในกรอบงานของ Lopez de Prado, CUSUM events ถูกใช้เป็นจุดเข้าสำหรับ Triple Barrier method — ที่แต่ละ event กระตุ้นการซื้อขายพร้อม stop-loss, take-profit และ expiration barriers สำหรับการตรวจสอบความถูกต้องของกลยุทธ์ที่ขับเคลื่อนด้วย event ดังกล่าว ดู Walk-Forward Optimization และ Monte Carlo Bootstrap for Backtesting

16. Entropy Bars

วิธีการที่สง่างามที่สุดในเชิงทฤษฎี: สุ่มตัวอย่างเมื่อเนื้อหาข้อมูล (Shannon entropy) ของ series ราคาภายในแท่งเกิน threshold

class EntropyBarGenerator:
    """
    Generates bars when the entropy of intra-bar returns exceeds
    a threshold.

    Based on Shannon's information theory: bars are sampled when
    "new information" arrives, measured as the entropy of the
    return distribution within the current bar.

    This is the most theoretically "pure" information-driven bar.
    """

    def __init__(
        self,
        entropy_threshold: float = 2.0,
        min_trades: int = 50,
        n_bins: int = 10,
    ):
        self.entropy_threshold = entropy_threshold
        self.min_trades = min_trades
        self.n_bins = n_bins
        self.trades: list[tuple[int, float, float]] = []
        self.bars: list[OHLCV] = []

    def on_trade(self, timestamp: int, price: float, qty: float):
        self.trades.append((timestamp, price, qty))

        if len(self.trades) < self.min_trades:
            return None

        entropy = self._compute_entropy()

        if entropy >= self.entropy_threshold:
            return self._close_bar()

        return None

    def _compute_entropy(self) -> float:
        import math

        prices = [t[1] for t in self.trades]
        if len(prices) < 2:
            return 0.0

        returns = [
            math.log(prices[i] / prices[i-1])
            for i in range(1, len(prices))
            if prices[i-1] > 0
        ]

        if not returns:
            return 0.0

        min_r = min(returns)
        max_r = max(returns)

        if max_r == min_r:
            return 0.0

        bin_width = (max_r - min_r) / self.n_bins
        bins = [0] * self.n_bins

        for r in returns:
            idx = min(int((r - min_r) / bin_width), self.n_bins - 1)
            bins[idx] += 1

        total = sum(bins)
        entropy = 0.0
        for count in bins:
            if count > 0:
                p = count / total
                entropy -= p * math.log2(p)

        return entropy

    def _close_bar(self):
        prices = [t[1] for t in self.trades]
        volumes = [t[2] for t in self.trades]

        bar = OHLCV(
            timestamp=self.trades[-1][0],
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        self.bars.append(bar)
        self.trades = []
        return bar

หมายเหตุเชิงปฏิบัติ: Entropy bars มีค่าใช้จ่ายในการคำนวณสูงและส่วนใหญ่เป็นที่สนใจในการวิจัย — แต่สำหรับกลยุทธ์ที่ใช้ ML จะสร้าง features ที่มีคุณสมบัติทางสถิติที่ดีกว่าเพราะแต่ละแท่งมี "ข้อมูล" ใกล้เคียงกัน

17. Delta Bars (Order Flow)

Delta bars and order flow Cumulative delta: วัดแรงสุทธิของผู้ซื้อ aggressive เทียบกับผู้ขายแบบ real-time

Delta bars สุ่มตัวอย่างตาม cumulative delta — ผลต่างที่กำลังวิ่งอยู่ระหว่าง buy volume และ sell volume ต่างจาก imbalance bars (ที่ใช้ tick signs ±1), delta bars ใช้ order flow ที่ถ่วงน้ำหนักตามปริมาณจริง

class DeltaBarGenerator:
    """
    Generates bars based on cumulative order flow delta.

    Delta = Buy Volume - Sell Volume (classified by aggressor side).

    Requires trade-level data with side classification
    (available from Binance aggTrades, Bybit trades, etc.)
    """

    def __init__(self, threshold: float = 500.0):
        self.threshold = threshold
        self.cumulative_delta = 0.0
        self.trades: list[tuple[int, float, float, int]] = []
        self.bars: list[OHLCV] = []

    def on_trade(self, timestamp: int, price: float, qty: float, is_buyer_maker: bool):
        side = -1 if is_buyer_maker else 1
        signed_qty = side * qty

        self.cumulative_delta += signed_qty
        self.trades.append((timestamp, price, qty, side))

        if abs(self.cumulative_delta) >= self.threshold:
            return self._close_bar()

        return None

    def _close_bar(self):
        prices = [t[1] for t in self.trades]
        volumes = [t[2] for t in self.trades]

        bar = OHLCV(
            timestamp=self.trades[-1][0],
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(volumes),
        )
        bar.delta = self.cumulative_delta  # type: ignore
        bar.buy_volume = sum(t[2] for t in self.trades if t[3] == 1)  # type: ignore
        bar.sell_volume = sum(t[2] for t in self.trades if t[3] == -1)  # type: ignore

        self.bars.append(bar)
        self.cumulative_delta = 0.0
        self.trades = []
        return bar

Delta divergence: หนึ่งในสัญญาณที่ทรงพลังที่สุด — ราคาขึ้นในขณะที่ cumulative delta เป็นลบ (ผู้ขาย aggressive แต่ราคายังขึ้น ซึ่งบ่งชี้การดูดซับ limit buy) เกี่ยวข้องโดยตรงกับแนวทาง behavioral fingerprinting ที่อธิบายในบทความ Digital Fingerprint: Trader Identification สำหรับ market makers ที่ใช้ Avellaneda-Stoikov model, delta bars ให้มุมมอง real-time ของ inventory risk และแรงกดดันจาก aggressor


Rolling window aggregation circular buffer ของแท่งพื้นฐาน: ข้อมูลใหม่เข้า ข้อมูลเก่าออก และแท่งที่รวบรวมนั้นถูกต้องเสมอ

วิธีการรวมข้อมูลกำหนดวิธีที่แท่งพื้นฐานถูกรวบรวมเป็นแท่งเทียนกรอบเวลาสูง (HTF) พวกมัน เป็นอิสระ จากประเภทแท่ง — คุณสามารถใช้วิธีการรวมข้อมูลใดก็ได้กับประเภทแท่งใดก็ได้

วิธี A: การรวมข้อมูลจัดตามปฏิทิน

รวบรวมแท่งพื้นฐานทั้งหมดที่ตกอยู่ภายในขอบเขตปฏิทินที่กำหนด แท่ง "1 ชั่วโมง" ครอบคลุมแท่งทั้งหมดตั้งแต่ 14:00:00 ถึง 14:59:59

คุณสมบัติ:

  • ผู้เข้าร่วมตลาดทั้งหมดเห็นขอบเขตเดียวกัน — จำเป็นสำหรับการวิเคราะห์โครงสร้างตลาด, support/resistance, PIQ triggers
  • ปัญหา cold start: แท่งที่ไม่สมบูรณ์หลังรีสตาร์ท
  • เป็นธรรมชาติสำหรับ time bars (สิ่งที่กระดานซื้อขายให้โดยกำเนิด)
  • ยังทำงานกับแท่งที่ไม่ใช่เวลา: "volume bars ทั้งหมดที่ปิดระหว่าง 14:00 ถึง 15:00" = แท่งรายชั่วโมงที่จัดตามปฏิทินจาก volume bars

วิธี B: การรวมข้อมูล Rolling Window

รวบรวม N แท่งพื้นฐานที่ปิดล่าสุด คำนวณใหม่ทุกแท่งใหม่ แท่ง "1 ชั่วโมง" rolling = 60 1-minute time bars ที่ปิดล่าสุด อัปเดตทุกนาที

หน่วยพื้นฐานคือแท่งพื้นฐานที่ปิดแล้ว การเลือกออกแบบนี้ให้:

  1. ไม่มี cold start. หลังจาก N แท่ง แท่งก็ถูกต้อง ไม่มีสัญญาณรบกวนจากแท่งที่ไม่สมบูรณ์
  2. ความเท่าเทียมกับ backtest. ถ้าการซื้อขายจริงใช้หน่วยพื้นฐานเดียวกับ backtest engine สัญญาณจะเหมือนกัน
  3. การตรวจสอบที่ง่าย. กฎเดียว: if buffer not full: skip
import numpy as np

class RollingCandleAggregator:
    """
    Produces rolling higher-timeframe candles from closed base bars.

    Works with ANY bar type: time bars, tick bars, volume bars,
    dollar bars, delta bars — anything that produces OHLCV output.

    Example: RollingCandleAggregator(window=60) with 1m time bars
    produces a "1h" candle updated every minute.

    Example: RollingCandleAggregator(window=24) with volume bars
    produces a candle spanning the last 24 volume bars.
    """

    def __init__(self, window: int):
        self.window = window
        self.buffer: deque[OHLCV] = deque(maxlen=window)

    def push(self, bar: OHLCV) -> OHLCV | None:
        """
        Add a closed base bar. Returns aggregated candle
        only when buffer is full (= candle is valid).
        """
        self.buffer.append(bar)

        if len(self.buffer) < self.window:
            return None

        return self._aggregate()

    def _aggregate(self) -> OHLCV:
        bars = list(self.buffer)
        return OHLCV(
            timestamp=bars[-1].timestamp,
            open=bars[0].open,
            high=max(b.high for b in bars),
            low=min(b.low for b in bars),
            close=bars[-1].close,
            volume=sum(b.volume for b in bars),
        )

    @property
    def is_valid(self) -> bool:
        return len(self.buffer) == self.window

การแลกเปลี่ยน phase shift: Rolling candles ปิดที่ :37 ถ้าคุณเริ่มที่ :37 ไม่ใช่ที่ :00 เหมือนของทุกคน สิ่งนี้สำคัญสำหรับกลยุทธ์ที่ขึ้นอยู่กับระดับที่กลุ่มมองเห็น วิธีแก้ไข: ใช้ทั้งคู่ — ปฏิทินสำหรับโครงสร้างตลาด, rolling สำหรับสัญญาณ

วิธี C: Adaptive Rolling Aggregation

เหมือน rolling แต่ขนาด window ปรับตามความผันผวนปัจจุบัน ตลาดเงียบ → window กว้างขึ้น (smooth มากขึ้น) ตลาดผันผวน → window แคบลง (ตอบสนองเร็วขึ้น)

class AdaptiveRollingAggregator:
    """
    Rolling window where the window size adapts to volatility.

    Works with any base bar type. Uses ATR of recent bars
    as the volatility measure.

    Low volatility → wider window (more smoothing, fewer signals)
    High volatility → narrower window (faster reaction)
    """

    def __init__(
        self,
        base_window: int = 60,
        min_window: int = 15,
        max_window: int = 240,
        atr_period: int = 14,
        atr_base: float | None = None,
    ):
        self.base_window = base_window
        self.min_window = min_window
        self.max_window = max_window
        self.atr_period = atr_period
        self.atr_base = atr_base

        self.all_candles: deque[OHLCV] = deque(maxlen=max_window)
        self.atr_values: deque[float] = deque(maxlen=atr_period * 2)
        self.current_window = base_window

    def push(self, bar: OHLCV) -> OHLCV | None:
        self.all_candles.append(bar)

        tr = bar.high - bar.low
        self.atr_values.append(tr)

        if len(self.atr_values) < self.atr_period:
            return None

        current_atr = sum(list(self.atr_values)[-self.atr_period:]) / self.atr_period

        if self.atr_base is None and len(self.atr_values) >= self.atr_period * 2:
            self.atr_base = sum(self.atr_values) / len(self.atr_values)

        if self.atr_base is None or self.atr_base == 0:
            return None

        vol_ratio = current_atr / self.atr_base
        self.current_window = int(self.base_window / vol_ratio)
        self.current_window = max(self.min_window, min(self.max_window, self.current_window))

        if len(self.all_candles) < self.current_window:
            return None

        bars = list(self.all_candles)[-self.current_window:]
        return OHLCV(
            timestamp=bars[-1].timestamp,
            open=bars[0].open,
            high=max(b.high for b in bars),
            low=min(b.low for b in bars),
            close=bars[-1].close,
            volume=sum(b.volume for b in bars),
        )

ประเภทแท่งพื้นฐานทุกประเภทสามารถผสมผสานกับวิธีการรวมข้อมูลทุกวิธี การผสมผสานบางอย่างเป็นมาตรฐาน (calendar time bars = สิ่งที่กระดานซื้อขายให้คุณ) บางอย่างแปลกใหม่แต่ทรงพลัง

ตัวอย่างการผสมผสาน

ประเภทแท่งพื้นฐาน ปฏิทิน Rolling Adaptive
เวลา แท่งมาตรฐานของกระดานซื้อขาย HTF ที่ถูกต้องเสมอ ไม่มี cold start Timeframe ปรับตามความผันผวน
ปริมาณ "volume bars ทั้งหมดในชั่วโมงนี้" 24 volume bars ล่าสุด Window กว้างขึ้นในตลาดเงียบ
ดอลลาร์ Dollar-bar aggregate รายชั่วโมง N dollar bars ล่าสุด Dollar windows แบบ adaptive
Tick Imbalance Imbalance aggregate รายชั่วโมง N imbalance events ล่าสุด ตอบสนองเร็วในสภาวะที่ผันผวน
Delta Net order flow รายชั่วโมง Rolling delta snapshot Adaptive flow window
Renko "Bricks ในชั่วโมงนี้" N bricks ล่าสุด Brick count แบบ adaptive

Hybrid Engine: ปฏิทิน + Rolling

ในทางปฏิบัติ คุณต้องการทั้งการรวมข้อมูลแบบปฏิทินและ rolling พร้อมกัน overhead ของหน่วยความจำน้อยมาก — สอง deque buffers ต่อ timeframe ต่อ symbol

class HybridCandleEngine:
    """
    Maintains both calendar-aligned and rolling candles
    for any base bar type.

    Calendar candles: for market structure, support/resistance, PIQ.
    Rolling candles: for indicators, signal generation, entries/exits.
    """

    def __init__(self):
        self.rolling = {
            '1h': RollingCandleAggregator(60),
            '4h': RollingCandleAggregator(240),
        }
        self.calendar: dict[str, list[OHLCV]] = {
            '1h': [],
            '4h': [],
        }
        self._calendar_buffer: dict[str, list[OHLCV]] = {
            '1h': [],
            '4h': [],
        }

    def on_bar(self, bar: OHLCV):
        """Process any base bar type — time, volume, tick, delta, etc."""
        rolling_results = {}
        for tf, agg in self.rolling.items():
            rolling_results[tf] = agg.push(bar)

        self._update_calendar(bar)

        return rolling_results

    def _update_calendar(self, bar: OHLCV):
        from datetime import datetime
        ts = datetime.utcfromtimestamp(bar.timestamp)

        for tf, minutes in [('1h', 60), ('4h', 240)]:
            self._calendar_buffer[tf].append(bar)

            total_minutes = ts.hour * 60 + ts.minute
            if (total_minutes + 1) % minutes == 0:
                bars = self._calendar_buffer[tf]
                if bars:
                    agg = OHLCV(
                        timestamp=bars[-1].timestamp,
                        open=bars[0].open,
                        high=max(b.high for b in bars),
                        low=min(b.low for b in bars),
                        close=bars[-1].close,
                        volume=sum(b.volume for b in bars),
                    )
                    self.calendar[tf].append(agg)
                    self._calendar_buffer[tf] = []

Time-Volume Hybrid: ปฏิทินพร้อม Volume Splits

รูปแบบการรวมข้อมูลพิเศษ: แท่งที่จัดตามปฏิทินซึ่งบังคับปิดก่อนกำหนดเมื่อปริมาณเกิน threshold รักษาการซิงค์เวลาในขณะที่ปรับตัวต่อการพุ่งของกิจกรรม

class TimeVolumeHybridGenerator:
    """
    Calendar-aligned candles that split when volume spikes.

    Rule: close the candle at the calendar boundary OR when
    accumulated volume exceeds vol_threshold, whichever comes first.

    Works with any base bar type — the volume trigger adds an
    extra split dimension on top of calendar alignment.
    """

    def __init__(
        self,
        interval_minutes: int = 60,
        vol_threshold: float = 5000.0,
    ):
        self.interval_minutes = interval_minutes
        self.vol_threshold = vol_threshold

        self.buffer: list[OHLCV] = []
        self.accumulated_volume = 0.0
        self.bars: list[OHLCV] = []

    def on_bar(self, bar: OHLCV) -> OHLCV | None:
        self.buffer.append(bar)
        self.accumulated_volume += bar.volume

        from datetime import datetime
        ts = datetime.utcfromtimestamp(bar.timestamp)
        total_minutes = ts.hour * 60 + ts.minute
        at_boundary = (total_minutes + 1) % self.interval_minutes == 0

        vol_spike = self.accumulated_volume >= self.vol_threshold

        if at_boundary or vol_spike:
            return self._close_bar(split_reason='volume' if vol_spike else 'time')

        return None

    def _close_bar(self, split_reason: str) -> OHLCV:
        bars = self.buffer
        bar = OHLCV(
            timestamp=bars[-1].timestamp,
            open=bars[0].open,
            high=max(b.high for b in bars),
            low=min(b.low for b in bars),
            close=bars[-1].close,
            volume=sum(b.volume for b in bars),
        )
        bar.split_reason = split_reason  # type: ignore
        bar.num_bars = len(bars)  # type: ignore

        self.bars.append(bar)
        self.buffer = []
        self.accumulated_volume = 0.0
        return bar

การรวมข้อมูลเชิงปฏิบัติ: Cascading Preload

Cascading aggregation Cascading preload: รวบรวมแท่งรายวันจากรายชั่วโมง และรายชั่วโมงจากรายนาที — หลีกเลี่ยงข้อจำกัดของ API

กระดานซื้อขายจำกัดจำนวนข้อมูลประวัติศาสตร์ที่พวกเขาให้บริการ Binance ให้ ~1000 แท่งต่อคำขอ REST, OKX จำกัดที่ 300 ถ้าคุณต้องการแท่ง 1D rolling (1440 นาที) คุณไม่สามารถรับประวัติ 1m เพียงพอได้เสมอ สำหรับการ streaming แบบ real-time ของการซื้อขายและ order books ผ่าน WebSocket ดู CCXT Pro WebSocket Methods

วิธีแก้ไข: cascading aggregation — สร้าง timeframes ที่สูงขึ้นจากความละเอียดสูงสุดที่มีในแต่ละระดับ แล้วเย็บเข้าด้วยกัน

Rolling 1W candle:
├── 6 completed 1D candles ← fetch from REST /klines?interval=1d
├── 1 partial day:
│   ├── 23 completed 1h candles ← fetch from REST /klines?interval=1h
│   └── 1 partial hour:
│       └── N completed 1m candles ← fetch from REST /klines?interval=1m
└── Live: each new closed 1m candle updates the entire chain

สิ่งนี้ทำงานเพราะการรวบรวม OHLCV สามารถ compose ได้: high ของแท่ง 1D คือ max ของ 24 1h highs ซึ่งคือ max ของ 1440 1m highs

ข้อจำกัดหลาย Exchange

Exchange สูงสุด 1m Candles สูงสุด 1h Candles Intervals ที่น่าสังเกต
Binance 1,000 1,000 1m–1M, full range
Bybit 1,000 1,000 1–720, D/W/M
OKX 300 300 1m–1M (เข้มงวดกว่า)
Gate.io 1,000 1,000 10s–30d

การตรวจสอบความสอดคล้องของการรวบรวม

แท่ง 1h จาก REST API อาจไม่ตรงกับที่คุณจะคำนวณจาก 60 1m candles ตรวจสอบเสมอ:

def validate_aggregation(
    candle_htf: OHLCV,
    candles_ltf: list[OHLCV],
    tolerance_pct: float = 0.001,
) -> dict[str, bool]:
    agg = OHLCV(
        timestamp=candles_ltf[-1].timestamp,
        open=candles_ltf[0].open,
        high=max(c.high for c in candles_ltf),
        low=min(c.low for c in candles_ltf),
        close=candles_ltf[-1].close,
        volume=sum(c.volume for c in candles_ltf),
    )

    def close_enough(a: float, b: float) -> bool:
        if a == 0 and b == 0:
            return True
        return abs(a - b) / max(abs(a), abs(b)) < tolerance_pct

    return {
        'open': close_enough(candle_htf.open, agg.open),
        'high': close_enough(candle_htf.high, agg.high),
        'low': close_enough(candle_htf.low, agg.low),
        'close': close_enough(candle_htf.close, agg.close),
        'volume': close_enough(candle_htf.volume, agg.volume),
    }

ถ้าการตรวจสอบล้มเหลวอย่างสม่ำเสมอ รวบรวมจาก 1m ด้วยตัวคุณเองเสมอ — อย่าเชื่อแท่ง HTF ของกระดานซื้อขายสำหรับ backtesting parity


เมทริกซ์การเปรียบเทียบ

แกนที่ 1: ประเภทแท่งพื้นฐาน

# ประเภทแท่ง ตัวกระตุ้น ต้องการข้อมูล Tick เหมาะที่สุดสำหรับ
1 เวลา ช่วงเวลาที่กำหนด ไม่ โครงสร้างตลาด, พฤติกรรมกลุ่ม
2 Tick N การซื้อขาย ใช่ ML features, การสุ่มตัวอย่างความคิดเห็นเท่าเทียม
3 ปริมาณ N หน่วยที่ซื้อขาย ใช่ การวิเคราะห์กิจกรรมที่ปรับให้เป็นมาตรฐาน
4 ดอลลาร์ $N notional ใช่ การเปรียบเทียบข้ามสินทรัพย์
5 Renko ราคา ± N หน่วย ไม่ การตาม trend, การกรองสัญญาณรบกวน
6 Range High-Low ≥ N ใช่ การตรวจจับ breakout
7 ความผันผวน Range แบบ adaptive ใช่ การวิเคราะห์ปรับตาม regime
8 Heikin-Ashi การแปลง ไม่ การยืนยัน trend (ราคาสังเคราะห์!)
9 Kagi การกลับตัวของราคา ไม่ โครงสร้าง supply/demand
10 Line Break N-line breakout ไม่ กรอง trend มหภาค
11 Point & Figure Box + reversal ไม่ การทำแผนที่ support/resistance
12 TIB Tick imbalance ใช่ การตรวจจับ informed flow
13 VIB Volume imbalance ใช่ การตรวจจับ large order
14 Run ความยาว run ใช่ การตรวจจับการแบ่ง order
15 CUSUM Cumulative return ไม่ (1m closes) เหตุการณ์ structural break
16 Entropy Shannon entropy ใช่ วิจัย ML, ความบริสุทธิ์ของ feature
17 Delta Order flow delta ใช่ (aggTrades) การวิเคราะห์ aggressor flow

แกนที่ 2: วิธีการรวมข้อมูล

วิธี การจัดเรียง Cold Start Phase Shift เหมาะที่สุดสำหรับ
ปฏิทิน นาฬิกา ความเสี่ยงแท่งที่ไม่สมบูรณ์ ไม่มี (จัดตามกลุ่ม) โครงสร้างตลาด, PIQ, S/R
Rolling N แท่ง ไม่มี (หลัง warmup) ใช่ (เลื่อนจาก :00) ตัวชี้วัด, สัญญาณ
Adaptive N ที่ขับเคลื่อนด้วยความผันผวน หลัง ATR calibration ใช่ กลยุทธ์ปรับตามความผันผวน

คำแนะนำเชิงปฏิบัติ

Layered architecture สถาปัตยกรรมแท่งเทียนสี่ชั้น: rolling signals, calendar structure, microstructure flow และ trend filters

ถ้า backtest engine ของคุณทำงานบนข้อมูล 1m OHLCV:

  1. Rolling time bars — การอัปเกรดที่ง่ายที่สุด ไม่ต้องการข้อมูลเพิ่มเติม ขจัด cold start
  2. Hybrid (rolling + calendar) time bars — ปฏิทินสำหรับโครงสร้างตลาด rolling สำหรับสัญญาณ
  3. CUSUM filter — ทำงานบน 1m closes ไม่ต้องการข้อมูล tick "มีบางอย่างเคลื่อนไหวพอที่จะน่าสนใจ"

ถ้าคุณมีข้อมูล tick/trade:

  1. Dollar bars + rolling — ค่าเริ่มต้นที่แนะนำจากวรรณกรรม quant finance
  2. Volume imbalance bars + rolling — ตรวจจับ informed flow สุ่มตัวอย่างมากขึ้นในช่วงเหตุการณ์สำคัญ
  3. Delta bars + calendar — ถ้าคุณมีการจำแนก aggressor-side มุมมองที่ตรงที่สุดว่าใครกำลังผลักดันตลาด

เป็น filters (ใช้ Heikin-Ashi หรือ Line Break เหนือการผสมผสาน base+aggregation ใดก็ได้):

  1. Heikin-Ashi เหนือ rolling volume bars — สัญญาณ trend ที่สะอาดบนข้อมูลที่ปรับตามกิจกรรม
  2. Line Break / Kagi เหนือ daily calendar bars — กรอง trend มหภาค

สำหรับ Marketmaker.cc โดยเฉพาะ — แนวทางแบบ layered:

  • Layer 1 (สัญญาณ): Rolling aggregation ของ time bars สำหรับตัวชี้วัดและสัญญาณ entry/exit ไม่มี cold start, backtest parity ที่สมบูรณ์แบบ
  • Layer 2 (โครงสร้างตลาด): Calendar-aligned time bars สำหรับ support/resistance, การวิเคราะห์ hourly close และ PIQ triggers
  • Layer 3 (microstructure): Volume imbalance bars + delta bars จาก raw trade stream สำหรับการตรวจจับ informed flow, การแบ่ง order และการคาดการณ์การเคลื่อนไหวขนาดใหญ่ ดูเพิ่มเติมที่ Digital Fingerprint: Trader Identification สำหรับการรู้จำรูปแบบพฤติกรรมบนข้อมูล order flow
  • Layer 4 (กรอง trend): การแปลง Heikin-Ashi บน rolling bars หรือ Line Break บน 4h calendar closes เพื่อรักษาสัญญาณให้สอดคล้องกับทิศทางมหภาค

บทสรุป

การสร้างแท่งเทียนไม่ใช่การตัดสินใจครั้งเดียว — มันเป็นสองการตัดสินใจที่เป็นอิสระ:

  1. แท่งประเภทไหน? เวลาจับช่วงเวลานาฬิกา กิจกรรม (tick, volume, dollar) จับการมีส่วนร่วมของตลาด ราคา (Renko, range, volatility) จับการเคลื่อนไหว ข้อมูล (imbalance, runs, CUSUM, entropy) จับการมาถึงของข้อมูลใหม่ Order flow (delta) จับแรงกดดัน aggressive

  2. วิธีรวบรวมเป็นกรอบเวลาที่สูงขึ้นอย่างไร? ปฏิทินจัดตามกลุ่ม Rolling ขจัด cold start Adaptive ตอบสนองต่อความผันผวน

"แท่ง 1 ชั่วโมงมาตรฐานจาก Binance" เป็นเพียงเซลล์เดียวในเมทริกซ์ 17×3 อีก 50 การผสมผสานมีให้สำหรับทุกคนที่ยินดีนำไปใช้ สำหรับระบบ production คำตอบคือ "เลือกการผสมผสานที่เหมาะสมสำหรับแต่ละชั้นของ decision engine ของคุณ"

หน่วยพื้นฐาน — แท่งพื้นฐานที่ปิดแล้ว — ยังคงเป็นรากฐาน ทุกสิ่งอื่นคือการรวบรวม

สำหรับข้อมูลเพิ่มเติมเกี่ยวกับความแม่นยำของ backtest ด้วยข้อมูลละเอียด ดู Adaptive Drill-Down: Backtest with Variable Granularity สำหรับผลกระทบของการ precomputation ตัวชี้วัดต่อกลยุทธ์หลาย timeframe ดู Aggregated Parquet Cache


ลิงก์ที่มีประโยชน์

  1. Lopez de Prado — Advances in Financial Machine Learning (2018)
  2. Easley, Lopez de Prado, O'Hara — The Volume Clock: Insights into the High Frequency Paradigm (2012)
  3. mlfinlab — Python library implementing information-driven bars
  4. Binance — Historical Market Data
  5. Apache Parquet — columnar storage format

การอ้างอิง

@article{soloviov2026bartypes,
  author = {Soloviov, Eugen},
  title = {Bar Types and Aggregation Methods for Algorithmic Trading},
  year = {2026},
  url = {https://marketmaker.cc/en/blog/post/beyond-time-bars-candle-construction},
  description = {Two-axis classification of candle construction: 17 base bar types × 3 aggregation methods = 51 combinations, with implementation code and practical recommendations for crypto algotrading.}
}
ข้อจำกัดความรับผิดชอบ: ข้อมูลที่ให้ไว้ในบทความนี้มีไว้เพื่อการศึกษาและให้ข้อมูลเท่านั้น และไม่ถือเป็นคำแนะนำทางการเงิน การลงทุน หรือการเทรด การเทรดสกุลเงินดิจิทัลมีความเสี่ยงสูงที่จะขาดทุน

ผู้เขียน

Eugen Soloviov
Eugen Soloviov

Trading-systems engineer

Trading-systems engineer building bots since 2017: cross-exchange arbitrage (connected up to 30 venues), cointegration-based pairs arbitrage across spot and futures, scalping, news and sentiment-driven strategies, trend algorithms, and portfolio management and balancing algorithms. Also builds sub-millisecond order execution, big-data warehouses, backtesting engines, AI agents, and trading interfaces (incl. open-source profitmaker.cc). Stack: JS/TS, Python, Rust/Zig/Go, DevOps, backend, frontend, architecture.

Newsletter

ก้าวนำหน้าตลาด

สมัครรับจดหมายข่าวของเราเพื่อรับข้อมูลเชิงลึกการเทรดด้วย AI เฉพาะ การวิเคราะห์ตลาด และการอัปเดตแพลตฟอร์ม

เราเคารพความเป็นส่วนตัวของคุณ ยกเลิกการสมัครได้ทุกเมื่อ