ประเภทแท่งเทียนและวิธีการรวมข้อมูลสำหรับการซื้อขายแบบ Algorithmic
ทุกกราฟแท่งเทียนที่คุณเคยเห็นบน Binance, TradingView หรือ UI ของกระดานซื้อขายใดก็ตาม ถูกสร้างขึ้นในแบบเดียวกัน: รวบรวมการซื้อขายภายในช่วงเวลาที่กำหนด — 1 นาที, 5 นาที, 1 ชั่วโมง — แล้วสร้างแท่ง OHLCV วิธีนี้แพร่หลายมากจนนักเทรดส่วนใหญ่ไม่เคยตั้งคำถาม แต่สำหรับการซื้อขายแบบ algorithmic การเลือกประเภทแท่งและวิธีการรวมข้อมูลเป็นสองการตัดสินใจที่เป็นอิสระจากกัน — และระบบส่วนใหญ่มักนำสองสิ่งนี้มารวมกันโดยไม่ตั้งใจ
บทความนี้แยกสองแกนของการสร้างแท่งเทียน: ประเภทของแท่ง ที่คุณสร้าง (17 ประเภท) และ วิธีการรวมข้อมูล เป็นกรอบเวลาที่สูงขึ้น (3 วิธี) การผสมผสานนี้ให้ 51 การกำหนดค่าที่เป็นไปได้ แต่ละแบบมีคุณสมบัติแตกต่างกันสำหรับการ backtesting, การซื้อขายจริง และการสร้างสัญญาณ
สำหรับบทแนะนำเกี่ยวกับวิธีที่การซื้อขายดิบกลายเป็นแท่งเทียนมาตรฐาน ดูได้ที่ Trading Candles Demystified
TL;DR
- การสร้างแท่งเทียนมี สองแกนที่เป็นอิสระ: ประเภทแท่งและวิธีการรวมข้อมูล
- 17 ประเภทแท่งพื้นฐาน: เวลา, tick, ปริมาณ, ดอลลาร์, Renko, range, ความผันผวน, Heikin-Ashi, Kagi, Line Break, P&F, tick imbalance (TIB), volume imbalance (VIB), run, CUSUM, entropy, delta
- 3 วิธีการรวมข้อมูล: จัดตามปฏิทิน, rolling window, adaptive rolling
- 17 × 3 = 51 การผสมผสานที่เป็นไปได้ แต่ละแบบมีคุณสมบัติแตกต่างกัน
- ระบบส่วนใหญ่ใช้เพียงการผสมผสานเดียว: แท่งเวลาจัดตามปฏิทิน อีก 50 แบบที่เหลือยังไม่ถูกนำมาใช้
- คำแนะนำเชิงปฏิบัติ: ใช้หลายการผสมผสานเป็นชั้นๆ — rolling time bars สำหรับสัญญาณ, calendar time bars สำหรับโครงสร้างตลาด, information-driven bars สำหรับ microstructure
สองแกนของการสร้างแท่งเทียน
มุมมองดั้งเดิมจัดประเภทแท่งทั้งหมดไว้ในรายการเดียว: time bars, tick bars, volume bars, Renko ฯลฯ ซึ่งทำให้เกิดความเข้าใจผิด ในความเป็นจริงมีการเลือกสองแบบที่ตั้งฉากกัน:
แกนที่ 1 — ประเภทแท่งพื้นฐาน (17 ประเภท): คุณตัดสินใจอย่างไรว่าแท่งใหม่จะปิดเมื่อใด? หลังจากช่วงเวลาที่กำหนด? หลังจาก N การซื้อขาย? หลังจากการเคลื่อนไหวของราคา? เมื่อเนื้อหาของข้อมูลเปลี่ยนแปลง? สิ่งนี้กำหนดว่า "หนึ่งแท่ง" หมายความว่าอะไร
แกนที่ 2 — วิธีการรวมข้อมูล (3 วิธี): คุณรวบรวมแท่งพื้นฐานเป็นแท่งเทียนกรอบเวลาที่สูงขึ้นอย่างไร? จัดตามขอบเขตปฏิทิน (00:00, 01:00, ...)? ใช้ rolling window ของ N แท่งล่าสุด? ปรับขนาด window ตามความผันผวน?
สองแกนนี้เป็นอิสระจากกัน คุณสามารถมี:
- Calendar-aligned tick bars — รวม tick bars ที่ปิดระหว่าง 14:00 ถึง 14:59 เป็นแท่งรายชั่วโมงเดียว
- Rolling volume bars — นำ 24 volume bars ล่าสุดโดยไม่คำนึงว่าปิดเมื่อใด
- Adaptive delta bars — ใช้ window ที่ขับเคลื่อนด้วยความผันผวนเหนือ delta bars
"แท่ง 1 ชั่วโมง" มาตรฐานเป็นเพียงจุดเดียวในเมทริกซ์ 17×3 นี้: time bars + calendar alignment ทุกการผสมผสานอื่นเป็นทางเลือกที่ควรพิจารณา
1. Time Bars (มาตรฐาน)
ความหนาแน่นของข้อมูลที่ไม่สม่ำเสมอ: ขอบเขตเวลาที่ตายตัวปฏิบัติต่อช่วงเวลาเงียบที่มี 200 การซื้อขายเหมือนกับช่วงเวลาประกาศที่มี 50,000 การซื้อขาย
ค่าเริ่มต้น แท่งใหม่เกิดขึ้นหลังจากช่วงเวลาที่กำหนด: 1 นาที, 5 นาที, 1 ชั่วโมง ทุกกระดานซื้อขายให้สิ่งเหล่านี้โดยกำเนิด
คุณสมบัติ:
- ในช่วง Asian session (00:00–08:00 UTC) แท่ง 1 ชั่วโมงอาจมี 200 การซื้อขาย ในระหว่างการประกาศ listing ของ Binance ช่วงเวลาเดียวกันอาจมี 50,000 การซื้อขาย Time bars ปฏิบัติต่อทั้งสองอย่างเหมือนกัน การตรวจจับการพุ่งของกิจกรรมดังกล่าวมีความสำคัญอย่างยิ่งสำหรับการป้องกันบอต — ดู Anomaly Detection for Trading Bots
- ผู้เข้าร่วมตลาดทั้งหมดเห็นขอบเขตแท่งเดียวกัน — Schelling point ทำให้ time bars จำเป็นสำหรับการวิเคราะห์พฤติกรรมของกลุ่ม
- ตัวชี้วัดที่คำนวณบนแท่งที่ยังไม่สมบูรณ์ (หลังรีสตาร์ท) ให้ค่าที่ผิดพลาด
from datetime import datetime
def time_until_valid_hourly_candle():
"""How long until the first complete hourly candle after restart."""
now = datetime.utcnow()
minutes_into_hour = now.minute
seconds_into_minute = now.second
wait_seconds = (60 - minutes_into_hour) * 60 - seconds_into_minute
wait_seconds += 3600
return wait_seconds
2–4. แท่งตามกิจกรรม
Tick, volume และ dollar bars: สามวิธีให้การมีส่วนร่วมของตลาด — ไม่ใช่นาฬิกา — กำหนดขอบเขตแท่ง
แทนที่จะสุ่มตัวอย่างที่ช่วงเวลาที่กำหนด ให้สุ่มตัวอย่างหลังจากปริมาณกิจกรรมของตลาดที่กำหนด วิธีนี้สร้างแท่งที่มี "เนื้อหาข้อมูล" ใกล้เคียงกันโดยไม่คำนึงถึงเวลาในวัน
2. Tick Bars
แท่งใหม่เกิดขึ้นหลังจากทุก N การซื้อขาย (tick) ในช่วงกิจกรรมสูง แท่งจะก่อตัวอย่างรวดเร็ว ในช่วงเงียบ แท่งเดียวอาจใช้เวลาหลายชั่วโมง
from collections import deque
from dataclasses import dataclass
@dataclass
class OHLCV:
timestamp: int
open: float
high: float
low: float
close: float
volume: float
class TickBarGenerator:
"""
Generates a new bar every `threshold` trades.
Each bar contains equal number of market "opinions".
"""
def __init__(self, threshold: int = 1000):
self.threshold = threshold
self.trades: list[tuple[float, float]] = [] # (price, qty)
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
self.trades.append((price, qty))
if len(self.trades) >= self.threshold:
self._close_bar(timestamp)
def _close_bar(self, timestamp: int):
prices = [t[0] for t in self.trades]
volumes = [t[1] for t in self.trades]
bar = OHLCV(
timestamp=timestamp,
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.trades = []
return bar
ข้อดี: ปรับตัวตามกิจกรรมของตลาดได้ตามธรรมชาติ ผลตอบแทนจาก tick bars มักมีการกระจายตัวใกล้เคียง normal มากกว่าผลตอบแทนจาก time bars — คุณสมบัตินี้ปรับปรุงประสิทธิภาพของโมเดลทางสถิติหลายรูปแบบ
ข้อเสีย: ต้องการ trade stream ดิบ (ไม่มีจากผู้ให้บริการข้อมูลทุกรายสำหรับข้อมูลย้อนหลัง) เวลาของแท่งไม่สามารถคาดเดาได้ — คุณไม่สามารถบอกได้ว่า "แท่งถัดไปจะปิดที่ X"
3. Volume Bars
แท่งใหม่เกิดขึ้นหลังจาก N สัญญา (หรือเหรียญในกรณีของ crypto) ถูกซื้อขาย คล้ายกับ tick bars แต่ถ่วงน้ำหนักตามขนาดการซื้อขาย — การซื้อขาย 100 BTC ครั้งเดียวมีส่วนร่วม 100 เท่าของการซื้อขาย 1 BTC
class VolumeBarGenerator:
"""
Generates a new bar every `threshold` units of volume.
Normalizes for trade size: one large order ≠ one small order.
"""
def __init__(self, threshold: float = 100.0):
self.threshold = threshold
self.accumulated_volume = 0.0
self.trades: list[tuple[int, float, float]] = [] # (ts, price, qty)
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
self.trades.append((timestamp, price, qty))
self.accumulated_volume += qty
if self.accumulated_volume >= self.threshold:
self._close_bar()
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.accumulated_volume = 0.0
self.trades = []
return bar
4. Dollar Bars
แท่งใหม่เกิดขึ้นหลังจากมูลค่าตามสัญญาที่กำหนด (เป็น USD/USDT) ถูกแลกเปลี่ยน เป็นแท่งตามกิจกรรมที่แข็งแกร่งที่สุดเพราะปรับให้เป็นมาตรฐานทั้งจำนวนการซื้อขายและระดับราคา
พิจารณา: ถ้า ETH ไปจาก 4,000 การขาย ETH มูลค่า 4,000 แต่ต้องการ 10 ETH ที่ $1,000 Volume bars จะปฏิบัติต่อสิ่งเหล่านี้แตกต่างกัน; dollar bars ปฏิบัติต่อเหมือนกัน
class DollarBarGenerator:
"""
Generates a new bar every `threshold` dollars (USDT) of notional volume.
Most robust normalization: independent of price level.
Lopez de Prado (2018) recommends dollar bars as the default
for most quantitative applications.
"""
def __init__(self, threshold: float = 1_000_000.0):
self.threshold = threshold
self.accumulated_dollars = 0.0
self.trades: list[tuple[int, float, float]] = []
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
self.trades.append((timestamp, price, qty))
self.accumulated_dollars += price * qty
if self.accumulated_dollars >= self.threshold:
self._close_bar()
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.accumulated_dollars = 0.0
self.trades = []
return bar
การเลือก Threshold
Threshold สำหรับแท่งตามกิจกรรมควรสร้างแท่งในจำนวนที่ใกล้เคียงกันต่อวันกับ time bars ที่คุณกำลังแทนที่ สำหรับ BTCUSDT บน Binance:
| ประเภทแท่ง | Threshold ทั่วไป | แท่ง/วัน (ประมาณ) | TF เทียบเท่า |
|---|---|---|---|
| Tick | 1,000 การซื้อขาย | ~1,400 | ~1m |
| Tick | 50,000 การซื้อขาย | ~28 | ~1h |
| Volume | 100 BTC | ~600 | ~2-3m |
| Volume | 2,400 BTC | ~25 | ~1h |
| Dollar | $1M | ~1,400 | ~1m |
| Dollar | $50M | ~28 | ~1h |
ตัวเลขเหล่านี้เป็นค่าประมาณและเปลี่ยนแปลงอย่างมากตามสภาวะตลาด ในระหว่างการพุ่งขึ้นหรือตก แท่งตามกิจกรรมจะสร้างแท่งมากกว่าปกติ 5-10 เท่า — ซึ่งนั่นคือจุดประสงค์ที่แท้จริง
5–7. แท่งตามราคา
Renko bricks, range bars และ volatility bars: สุ่มตัวอย่างเฉพาะเมื่อราคาเคลื่อนไหวมากพอ
แท่งตามราคาไม่สนใจทั้งเวลาและกิจกรรม แท่งใหม่เกิดขึ้นเฉพาะเมื่อราคาเคลื่อนไหวในปริมาณที่กำหนด สิ่งนี้กรองสัญญาณรบกวนในช่วง sideways และเน้น trend โดยธรรมชาติ
5. Renko Bars
"อิฐ" Renko ใหม่เกิดขึ้นเมื่อราคาปิดเคลื่อนไหวอย่างน้อย N หน่วยจากราคาปิดของอิฐก่อนหน้า อิฐมีขนาดเท่ากันเสมอ สร้างการแสดงผลที่สะอาดตาของทิศทาง trend
class RenkoBarGenerator:
"""
Generates Renko bricks based on price movement.
Key property: during sideways movement, no new bricks form.
During strong trends, bricks form rapidly.
"""
def __init__(self, brick_size: float = 10.0):
self.brick_size = brick_size
self.bricks: list[dict] = []
self.last_close: float | None = None
def on_price(self, timestamp: int, price: float, volume: float = 0.0):
if self.last_close is None:
self.last_close = price
return []
new_bricks = []
diff = price - self.last_close
num_bricks = int(abs(diff) / self.brick_size)
if num_bricks == 0:
return []
direction = 1 if diff > 0 else -1
for i in range(num_bricks):
brick_open = self.last_close
brick_close = self.last_close + direction * self.brick_size
brick = {
'timestamp': timestamp,
'open': brick_open,
'high': max(brick_open, brick_close),
'low': min(brick_open, brick_close),
'close': brick_close,
'volume': volume / num_bricks if num_bricks > 0 else 0,
'direction': direction,
}
new_bricks.append(brick)
self.last_close = brick_close
self.bricks.extend(new_bricks)
return new_bricks
Dynamic Renko ใช้ ATR (Average True Range) แทนขนาดอิฐที่กำหนด ปรับตัวตามความผันผวนโดยอัตโนมัติ
6. Range Bars
แต่ละแท่งมี range สูง-ต่ำที่กำหนด เมื่อ range ถูกเกินไป แท่งจะปิดและเริ่มอันใหม่ ต่างจาก Renko, range bars มี wicks และสามารถแสดงความผันผวนภายในแท่งได้
class RangeBarGenerator:
"""
Generates bars with a fixed high-low range.
Difference from Renko: range bars show the full OHLC within
the range, not just brick direction. More information-rich.
"""
def __init__(self, range_size: float = 20.0):
self.range_size = range_size
self.current_high: float | None = None
self.current_low: float | None = None
self.current_open: float | None = None
self.current_volume: float = 0.0
self.current_start_ts: int = 0
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
if self.current_open is None:
self.current_open = price
self.current_high = price
self.current_low = price
self.current_start_ts = timestamp
self.current_high = max(self.current_high, price)
self.current_low = min(self.current_low, price)
self.current_volume += qty
if self.current_high - self.current_low >= self.range_size:
bar = OHLCV(
timestamp=timestamp,
open=self.current_open,
high=self.current_high,
low=self.current_low,
close=price,
volume=self.current_volume,
)
self.bars.append(bar)
self.current_open = price
self.current_high = price
self.current_low = price
self.current_volume = 0.0
self.current_start_ts = timestamp
return bar
return None
ความแตกต่างหลักระหว่าง Renko และ Range bars: Renko ติดตามเฉพาะราคาปิดและแสดงทิศทาง; range bars ติดตาม range ราคาเต็มและแสดงโครงสร้างภายในแท่ง Range bars มักมีประโยชน์มากกว่าสำหรับการซื้อขายแบบ algorithmic เพราะรักษาข้อมูล high-low ที่จำเป็นสำหรับการจำลอง stop-loss และ take-profit
7. Volatility Bars
แท่งใหม่เกิดขึ้นเมื่อความผันผวนภายในแท่งถึง threshold แบบ dynamic — ตัวอย่างเช่น เป็นทวีคูณของ ATR ล่าสุด ต่างจาก range bars (threshold ที่กำหนด), volatility bars ปรับตัวตามสภาวะตลาด
class VolatilityBarGenerator:
"""
Generates bars when intra-bar volatility reaches a threshold.
Similar to range bars, but the threshold adapts to market conditions
using a rolling ATR measure. In calm markets, bars need less
absolute movement to close; in volatile markets, more.
"""
def __init__(
self,
atr_period: int = 14,
atr_multiplier: float = 1.0,
initial_threshold: float = 20.0,
):
self.atr_period = atr_period
self.atr_multiplier = atr_multiplier
self.threshold = initial_threshold
self.recent_ranges: list[float] = []
self.current_open: float | None = None
self.current_high: float | None = None
self.current_low: float | None = None
self.current_volume: float = 0.0
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
if self.current_open is None:
self.current_open = price
self.current_high = price
self.current_low = price
self.current_high = max(self.current_high, price)
self.current_low = min(self.current_low, price)
self.current_volume += qty
intra_bar_range = self.current_high - self.current_low
if intra_bar_range >= self.threshold:
bar = OHLCV(
timestamp=timestamp,
open=self.current_open,
high=self.current_high,
low=self.current_low,
close=price,
volume=self.current_volume,
)
self.bars.append(bar)
self.recent_ranges.append(intra_bar_range)
if len(self.recent_ranges) > self.atr_period:
self.recent_ranges = self.recent_ranges[-self.atr_period:]
if len(self.recent_ranges) >= self.atr_period:
avg_range = sum(self.recent_ranges) / len(self.recent_ranges)
self.threshold = avg_range * self.atr_multiplier
self.current_open = price
self.current_high = price
self.current_low = price
self.current_volume = 0.0
return bar
return None
8. Heikin-Ashi (การแปลงแบบ Smoothed)
Heikin-Ashi: การเฉลี่ยแปลงแท่งที่มีสัญญาณรบกวนให้เป็นสัญญาณ trend ที่ราบรื่น — แต่แลกมาด้วยการสูญเสียข้อมูลราคาที่แน่นอน
Heikin-Ashi (ภาษาญี่ปุ่นแปลว่า "แท่งเฉลี่ย") ไม่ใช่ประเภทแท่ง — มันเป็น การแปลง ที่สามารถนำไปใช้กับประเภทแท่งพื้นฐานใดก็ได้ มันทำให้แท่งเรียบโดยเฉลี่ยค่าแท่งปัจจุบันและก่อนหน้า:
- HA Close = (Open + High + Low + Close) / 4
- HA Open = (Previous HA Open + Previous HA Close) / 2
- HA High = max(High, HA Open, HA Close)
- HA Low = min(Low, HA Open, HA Close)
Trend ปรากฏเป็นลำดับแท่งสีเดียวกันโดยไม่มี lower wicks (uptrend) หรือไม่มี upper wicks (downtrend)
class HeikinAshiTransformer:
"""
Transforms standard OHLCV candles into Heikin-Ashi candles.
Can be applied on top of ANY bar type: time bars, volume bars,
rolling bars, etc. It's a transformation, not a sampling method.
WARNING: HA prices are synthetic — they don't represent real
traded prices. Never use HA close for order placement or
PnL calculation. Use HA only for signal generation, then
execute at real prices.
"""
def __init__(self):
self.prev_ha_open: float | None = None
self.prev_ha_close: float | None = None
def transform(self, candle: OHLCV) -> OHLCV:
ha_close = (candle.open + candle.high + candle.low + candle.close) / 4
if self.prev_ha_open is None:
ha_open = (candle.open + candle.close) / 2
else:
ha_open = (self.prev_ha_open + self.prev_ha_close) / 2
ha_high = max(candle.high, ha_open, ha_close)
ha_low = min(candle.low, ha_open, ha_close)
self.prev_ha_open = ha_open
self.prev_ha_close = ha_close
return OHLCV(
timestamp=candle.timestamp,
open=ha_open,
high=ha_high,
low=ha_low,
close=ha_close,
volume=candle.volume,
)
def transform_series(self, candles: list[OHLCV]) -> list[OHLCV]:
"""Transform an entire series. Resets state first."""
self.prev_ha_open = None
self.prev_ha_close = None
return [self.transform(c) for c in candles]
def ha_trend_signal(ha_candles: list[OHLCV], lookback: int = 3) -> int:
"""
Simple HA trend signal.
Returns:
+1: bullish (N consecutive green HA candles with no lower wick)
-1: bearish (N consecutive red HA candles with no upper wick)
0: no clear trend
"""
if len(ha_candles) < lookback:
return 0
recent = ha_candles[-lookback:]
all_bullish = all(
c.close > c.open and abs(c.low - min(c.open, c.close)) < 1e-10
for c in recent
)
all_bearish = all(
c.close < c.open and abs(c.high - max(c.open, c.close)) < 1e-10
for c in recent
)
if all_bullish:
return 1
elif all_bearish:
return -1
return 0
ข้อควรระวังสำคัญสำหรับการ backtesting: ราคา Heikin-Ashi เป็นสังเคราะห์ ถ้า backtest ของคุณใช้ HA close เป็นราคาเข้า ผลลัพธ์จะผิดพลาด ใช้ HA สำหรับการสร้างสัญญาณเท่านั้นและดำเนินการที่ราคา OHLC จริงเสมอ
เมื่อไร HA มีประโยชน์: กลยุทธ์ตาม trend ที่ต้องการสัญญาณ "อยู่ใน" ที่สะอาด ใช้ HA กับประเภทแท่งพื้นฐานใดก็ได้ — time bars, volume bars, dollar bars — เพื่อกรอง crossovers ที่เป็นสัญญาณเท็จ
เมื่อไร HA เป็นอันตราย: กลยุทธ์ใดๆ ที่ต้องการระดับราคาที่แม่นยำ — support/resistance, การวิเคราะห์ order book, PIQ (Position In Queue) การเฉลี่ยทำลายข้อมูลราคาที่แน่นอน
9–11. กราฟ Reversal สไตล์ญี่ปุ่น
Kagi, Line Break และ Point & Figure: วิธีการเขียนกราฟที่ไม่ใช้เวลาซึ่งมุ่งเน้นที่โครงสร้างราคาล้วนๆ
วิธีการเขียนกราฟดั้งเดิมแบบญี่ปุ่นเหล่านี้ (พร้อมกับ Renko) ทิ้งเวลาโดยสิ้นเชิงและมุ่งเน้นที่โครงสร้างราคา
9. Kagi Charts
กราฟ Kagi ประกอบด้วยเส้นแนวตั้งที่เปลี่ยนทิศทางเมื่อราคากลับตัวในปริมาณที่กำหนด เส้นเปลี่ยนความหนาเมื่อราคาทำลาย high ก่อนหน้า (หนา = "yang" = demand) หรือ low ก่อนหน้า (บาง = "yin" = supply)
class KagiChartGenerator:
"""
Generates Kagi chart lines based on price reversals.
Unlike Renko (fixed brick size), Kagi tracks the actual magnitude
of each move and changes line thickness at breakout points.
Useful for identifying support/resistance breaks and
supply/demand shifts without time noise.
"""
def __init__(self, reversal_amount: float = 10.0):
self.reversal_amount = reversal_amount
self.lines: list[dict] = []
self.current_direction: int = 0 # 1=up, -1=down
self.current_price: float | None = None
self.extreme_price: float | None = None
self.prev_high: float | None = None
self.prev_low: float | None = None
self.line_type: str = 'yang' # 'yang' (thick) or 'yin' (thin)
def on_price(self, timestamp: int, price: float):
if self.current_price is None:
self.current_price = price
self.extreme_price = price
return None
if self.current_direction == 0:
if price - self.current_price >= self.reversal_amount:
self.current_direction = 1
self.extreme_price = price
elif self.current_price - price >= self.reversal_amount:
self.current_direction = -1
self.extreme_price = price
return None
if self.current_direction == 1:
if price > self.extreme_price:
self.extreme_price = price
if self.prev_high is not None and price > self.prev_high:
self.line_type = 'yang'
elif self.extreme_price - price >= self.reversal_amount:
line = {
'timestamp': timestamp,
'start': self.current_price,
'end': self.extreme_price,
'direction': 'up',
'type': self.line_type,
}
self.lines.append(line)
self.prev_high = self.extreme_price
self.current_price = self.extreme_price
self.extreme_price = price
self.current_direction = -1
if self.prev_low is not None and price < self.prev_low:
self.line_type = 'yin'
return line
else:
if price < self.extreme_price:
self.extreme_price = price
if self.prev_low is not None and price < self.prev_low:
self.line_type = 'yin'
elif price - self.extreme_price >= self.reversal_amount:
line = {
'timestamp': timestamp,
'start': self.current_price,
'end': self.extreme_price,
'direction': 'down',
'type': self.line_type,
}
self.lines.append(line)
self.prev_low = self.extreme_price
self.current_price = self.extreme_price
self.extreme_price = price
self.current_direction = 1
if self.prev_high is not None and price > self.prev_high:
self.line_type = 'yang'
return line
return None
10. Line Break Charts
กราฟ Line Break วาดเส้นใหม่ (กล่อง) เฉพาะเมื่อราคาปิดเกิน high หรือ low ของ N เส้นก่อนหน้า (โดยทั่วไป 3) ไม่มีเส้นใหม่ถูกวาดหากราคาอยู่ในช่วงนั้น
class LineBreakGenerator:
"""
Generates Line Break bars (Three Line Break by default).
A new bar is drawn only when the close exceeds the high or low
of the last N bars. Filters out minor noise by requiring price
to break through a multi-bar range.
The 'N' parameter (line_count) controls sensitivity:
- N=2: more sensitive, more bars, more noise
- N=3: standard (Three Line Break)
- N=4+: less sensitive, fewer bars, stronger signals
"""
def __init__(self, line_count: int = 3):
self.line_count = line_count
self.lines: list[dict] = []
def on_close(self, timestamp: int, close: float) -> dict | None:
if not self.lines:
self.lines.append({
'timestamp': timestamp,
'open': close,
'close': close,
'high': close,
'low': close,
'direction': 0,
})
return None
lookback = self.lines[-self.line_count:] if len(self.lines) >= self.line_count else self.lines
highest = max(l['high'] for l in lookback)
lowest = min(l['low'] for l in lookback)
last = self.lines[-1]
new_line = None
if close > highest:
new_line = {
'timestamp': timestamp,
'open': last['close'],
'close': close,
'high': close,
'low': last['close'],
'direction': 1,
}
elif close < lowest:
new_line = {
'timestamp': timestamp,
'open': last['close'],
'close': close,
'high': last['close'],
'low': close,
'direction': -1,
}
if new_line:
self.lines.append(new_line)
return new_line
return None
11. Point & Figure Charts
กราฟ Point & Figure (P&F) ใช้คอลัมน์ของ X (ราคาขึ้น) และ O (ราคาลง) การเปลี่ยนคอลัมน์ต้องการการกลับตัวโดยทั่วไป 3 ขนาดกล่อง เป็นหนึ่งในวิธีที่เก่าแก่ที่สุดในการกรองสัญญาณรบกวนและระบุ support/resistance
class PointAndFigureGenerator:
"""
Generates Point & Figure chart data.
X column: price rising by box_size increments.
O column: price falling by box_size increments.
Column switch: requires reversal_boxes * box_size movement
in the opposite direction.
Classic setting: box_size based on ATR, reversal_boxes = 3.
"""
def __init__(self, box_size: float = 10.0, reversal_boxes: int = 3):
self.box_size = box_size
self.reversal_boxes = reversal_boxes
self.reversal_amount = box_size * reversal_boxes
self.columns: list[dict] = []
self.current_direction: int = 0
self.current_top: float | None = None
self.current_bottom: float | None = None
def on_price(self, timestamp: int, price: float):
if self.current_top is None:
box_price = self._round_to_box(price)
self.current_top = box_price
self.current_bottom = box_price
self.current_direction = 1
return None
events = []
if self.current_direction == 1:
while price >= self.current_top + self.box_size:
self.current_top += self.box_size
events.append(('X', self.current_top, timestamp))
if price <= self.current_top - self.reversal_amount:
col = {
'type': 'X',
'top': self.current_top,
'bottom': self.current_bottom,
'boxes': int((self.current_top - self.current_bottom) / self.box_size) + 1,
'timestamp': timestamp,
}
self.columns.append(col)
self.current_direction = -1
self.current_top = self.current_top - self.box_size
self.current_bottom = self._round_to_box(price)
events.append(('new_column', 'O', timestamp))
else:
while price <= self.current_bottom - self.box_size:
self.current_bottom -= self.box_size
events.append(('O', self.current_bottom, timestamp))
if price >= self.current_bottom + self.reversal_amount:
col = {
'type': 'O',
'top': self.current_top,
'bottom': self.current_bottom,
'boxes': int((self.current_top - self.current_bottom) / self.box_size) + 1,
'timestamp': timestamp,
}
self.columns.append(col)
self.current_direction = 1
self.current_bottom = self.current_bottom + self.box_size
self.current_top = self._round_to_box(price)
events.append(('new_column', 'X', timestamp))
return events if events else None
def _round_to_box(self, price: float) -> float:
return round(price / self.box_size) * self.box_size
Kagi, Line Break และ P&F ในการซื้อขายแบบ algorithmic: ใช้เป็นหลักสำหรับการตรวจจับ trend ระยะยาวและการระบุ support/resistance ในฐานะ ชั้นกรอง — "อย่ารับสัญญาณ long เมื่อกราฟ Kagi อยู่ในโหมด yin" — จะเพิ่มมูลค่าโดยการจัดการซื้อขายให้สอดคล้องกับโครงสร้างมหภาค
12–14. แท่งขับเคลื่อนด้วยข้อมูล
Imbalance bars, run bars, CUSUM filters และ entropy bars: สุ่มตัวอย่างเมื่อตลาดบอกเราว่ามีบางอย่างเปลี่ยนแปลง
วิธีการที่ซับซ้อนที่สุด จาก Advances in Financial Machine Learning (2018) ของ Marcos Lopez de Prado แนวคิดหลัก: สุ่มตัวอย่างเมื่อข้อมูลใหม่มาถึงตลาด ไม่ใช่ที่ช่วงเวลาที่กำหนด
12. Tick Imbalance Bars (TIB)
ถ้าตลาดอยู่ในสมดุล การซื้อขายที่เริ่มต้นโดยผู้ซื้อและผู้ขายควรสมดุลกันโดยประมาณ เมื่อ imbalance เกินความคาดหวังของเรา บางอย่างได้เปลี่ยนแปลง ให้สุ่มตัวอย่างแท่ง ณ จุดนั้น
การซื้อขายแต่ละรายการถูกจัดประเภทว่าเริ่มต้นโดยผู้ซื้อ (+1) หรือผู้ขาย (-1) โดยใช้กฎ tick เราติดตาม cumulative imbalance θ และสุ่มตัวอย่างเมื่อ |θ| เกิน threshold แบบ dynamic
class TickImbalanceBarGenerator:
"""
Generates bars when the cumulative tick imbalance exceeds
expected levels — i.e., when "new information" arrives.
Based on Lopez de Prado (2018), Chapter 2.
"""
def __init__(
self,
expected_ticks_init: int = 1000,
ewma_window: int = 100,
min_ticks: int = 100,
max_ticks: int = 50000,
):
self.expected_ticks_init = expected_ticks_init
self.ewma_window = ewma_window
self.min_ticks = min_ticks
self.max_ticks = max_ticks
self.theta = 0.0
self.prev_price: float | None = None
self.prev_sign = 1
self.trades: list[tuple[int, float, float]] = []
self.bar_lengths: list[int] = []
self.imbalances: list[float] = []
self.expected_ticks = float(expected_ticks_init)
self.expected_imbalance = 0.0
self.bars: list[OHLCV] = []
def _tick_sign(self, price: float) -> int:
"""Classify trade as buy (+1) or sell (-1) using tick rule."""
if self.prev_price is None:
self.prev_price = price
return 1
if price > self.prev_price:
sign = 1
elif price < self.prev_price:
sign = -1
else:
sign = self.prev_sign
self.prev_price = price
self.prev_sign = sign
return sign
def on_trade(self, timestamp: int, price: float, qty: float):
sign = self._tick_sign(price)
self.theta += sign
self.trades.append((timestamp, price, qty))
threshold = self.expected_ticks * abs(self.expected_imbalance)
if threshold == 0:
threshold = self.expected_ticks_init * 0.5
if abs(self.theta) >= threshold and len(self.trades) >= self.min_ticks:
return self._close_bar()
if len(self.trades) >= self.max_ticks:
return self._close_bar()
return None
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.bar_lengths.append(len(self.trades))
self.imbalances.append(self.theta / len(self.trades))
if len(self.bar_lengths) >= 2:
alpha = 2.0 / (self.ewma_window + 1)
self.expected_ticks = (
alpha * self.bar_lengths[-1]
+ (1 - alpha) * self.expected_ticks
)
self.expected_ticks = max(
self.min_ticks,
min(self.max_ticks, self.expected_ticks)
)
self.expected_imbalance = (
alpha * self.imbalances[-1]
+ (1 - alpha) * self.expected_imbalance
)
self.theta = 0.0
self.trades = []
return bar
13. Volume Imbalance Bars (VIB)
การขยาย TIBs: แทนที่จะนับแต่ละการซื้อขายเป็น ±1 ให้ถ่วงน้ำหนักตาม signed volume การซื้อ 100 BTC มีส่วนร่วม +100 การขาย 1 BTC มีส่วนร่วม -1 จับ orders ที่มีข้อมูลขนาดใหญ่ที่อาจถูกแบ่งเป็นหลายการซื้อขายเล็กๆ
class VolumeImbalanceBarGenerator:
"""
Like TIBs, but uses signed volume instead of signed ticks.
Captures the insight that a 100-BTC buy signal is 100x more
informative than a 1-BTC buy signal.
"""
def __init__(
self,
expected_ticks_init: int = 1000,
ewma_window: int = 100,
):
self.expected_ticks_init = expected_ticks_init
self.ewma_window = ewma_window
self.theta = 0.0
self.prev_price: float | None = None
self.prev_sign = 1
self.trades: list[tuple[int, float, float]] = []
self.bar_lengths: list[int] = []
self.volume_imbalances: list[float] = []
self.expected_ticks = float(expected_ticks_init)
self.expected_vol_imbalance = 0.0
self.bars: list[OHLCV] = []
def _tick_sign(self, price: float) -> int:
if self.prev_price is None:
self.prev_price = price
return 1
if price > self.prev_price:
sign = 1
elif price < self.prev_price:
sign = -1
else:
sign = self.prev_sign
self.prev_price = price
self.prev_sign = sign
return sign
def on_trade(self, timestamp: int, price: float, qty: float):
sign = self._tick_sign(price)
self.theta += sign * qty
self.trades.append((timestamp, price, qty))
threshold = self.expected_ticks * abs(self.expected_vol_imbalance)
if threshold == 0:
threshold = self.expected_ticks_init * 0.5
if abs(self.theta) >= threshold and len(self.trades) >= 10:
return self._close_bar()
return None
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.bar_lengths.append(len(self.trades))
self.volume_imbalances.append(self.theta / len(self.trades))
alpha = 2.0 / (self.ewma_window + 1)
if len(self.bar_lengths) >= 2:
self.expected_ticks = (
alpha * self.bar_lengths[-1] + (1 - alpha) * self.expected_ticks
)
self.expected_vol_imbalance = (
alpha * self.volume_imbalances[-1]
+ (1 - alpha) * self.expected_vol_imbalance
)
self.theta = 0.0
self.trades = []
return bar
ปัญหาการระเบิด
ปัญหาที่ทราบกันดีกับ imbalance bars: threshold ที่ใช้ EWMA สามารถเข้าสู่ positive feedback loop ได้ วิธีแก้ไข: จำกัดด้วยขอบเขต min_ticks และ max_ticks
self.expected_ticks = max(
self.min_ticks, # Floor: never less than 100 ticks
min(
self.max_ticks, # Ceiling: never more than 50000 ticks
new_expected_ticks
)
)
14. Run Bars
Run bars ติดตาม ความยาวของ directional run ปัจจุบัน — ลำดับต่อเนื่องที่ยาวที่สุดของการซื้อหรือขาย เมื่อผู้ซื้อขายที่มีข้อมูลรายใหญ่แบ่ง order เป็นการซื้อขายเล็กๆ หลายรายการ ลำดับจะยาวผิดปกติ Run bars ตรวจจับสิ่งนี้
class TickRunBarGenerator:
"""
Generates bars when the length of a directional run exceeds expectations.
Based on Lopez de Prado (2018), Chapter 2.
Difference from imbalance bars:
- Imbalance bars track NET imbalance (buys minus sells)
- Run bars track the MAXIMUM run length (consecutive buys OR sells)
"""
def __init__(
self,
expected_ticks_init: int = 1000,
ewma_window: int = 100,
min_ticks: int = 100,
max_ticks: int = 50000,
):
self.expected_ticks_init = expected_ticks_init
self.ewma_window = ewma_window
self.min_ticks = min_ticks
self.max_ticks = max_ticks
self.prev_price: float | None = None
self.prev_sign = 1
self.trades: list[tuple[int, float, float]] = []
self.buy_run = 0
self.sell_run = 0
self.max_buy_run = 0
self.max_sell_run = 0
self.bar_lengths: list[int] = []
self.max_runs: list[float] = []
self.expected_ticks = float(expected_ticks_init)
self.expected_max_run = 0.0
self.bars: list[OHLCV] = []
def _tick_sign(self, price: float) -> int:
if self.prev_price is None:
self.prev_price = price
return 1
if price > self.prev_price:
sign = 1
elif price < self.prev_price:
sign = -1
else:
sign = self.prev_sign
self.prev_price = price
self.prev_sign = sign
return sign
def on_trade(self, timestamp: int, price: float, qty: float):
sign = self._tick_sign(price)
self.trades.append((timestamp, price, qty))
if sign == 1:
self.buy_run += 1
self.sell_run = 0
else:
self.sell_run += 1
self.buy_run = 0
self.max_buy_run = max(self.max_buy_run, self.buy_run)
self.max_sell_run = max(self.max_sell_run, self.sell_run)
theta = max(self.max_buy_run, self.max_sell_run)
threshold = self.expected_ticks * self.expected_max_run if self.expected_max_run > 0 else self.expected_ticks_init * 0.3
if theta >= threshold and len(self.trades) >= self.min_ticks:
return self._close_bar()
if len(self.trades) >= self.max_ticks:
return self._close_bar()
return None
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
max_run = max(self.max_buy_run, self.max_sell_run) / len(self.trades)
self.bar_lengths.append(len(self.trades))
self.max_runs.append(max_run)
alpha = 2.0 / (self.ewma_window + 1)
if len(self.bar_lengths) >= 2:
self.expected_ticks = alpha * self.bar_lengths[-1] + (1 - alpha) * self.expected_ticks
self.expected_ticks = max(self.min_ticks, min(self.max_ticks, self.expected_ticks))
self.expected_max_run = alpha * self.max_runs[-1] + (1 - alpha) * self.expected_max_run
self.trades = []
self.buy_run = 0
self.sell_run = 0
self.max_buy_run = 0
self.max_sell_run = 0
return bar
Run bars สามารถขยายเป็น volume runs และ dollar runs ได้
15. CUSUM Filter Bars
CUSUM (Cumulative Sum) filter กำหนด เมื่อ จะสุ่มตัวอย่างโดยการติดตาม cumulative returns ต่างจาก imbalance bars (ที่ทำงานกับการซื้อขายดิบ), CUSUM สามารถนำไปใช้กับข้อมูล 1m OHLCV ที่มีอยู่ — ไม่ต้องการข้อมูล tick
class CUSUMFilterBarGenerator:
"""
Symmetric CUSUM filter for event-based sampling.
Based on Lopez de Prado (2018), Chapter 2.5.
Key advantage over Bollinger Bands: CUSUM requires a FULL
run of threshold magnitude before triggering. Bollinger Bands
trigger repeatedly when price hovers near the band.
Can be applied to 1m OHLCV data — no tick data required.
"""
def __init__(self, threshold: float = 0.01):
self.threshold = threshold
self.s_pos = 0.0
self.s_neg = 0.0
self.prev_price: float | None = None
self.buffer: list[OHLCV] = []
self.bars: list[OHLCV] = []
def on_candle_1m(self, candle: OHLCV) -> OHLCV | None:
self.buffer.append(candle)
if self.prev_price is None:
self.prev_price = candle.close
return None
import math
log_ret = math.log(candle.close / self.prev_price)
self.prev_price = candle.close
self.s_pos = max(0.0, self.s_pos + log_ret)
self.s_neg = min(0.0, self.s_neg + log_ret)
triggered = False
if self.s_pos > self.threshold:
self.s_pos = 0.0
triggered = True
if self.s_neg < -self.threshold:
self.s_neg = 0.0
triggered = True
if triggered and len(self.buffer) >= 2:
bars = self.buffer
bar = OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
self.bars.append(bar)
self.buffer = []
return bar
return None
CUSUM + Triple Barrier Method: ในกรอบงานของ Lopez de Prado, CUSUM events ถูกใช้เป็นจุดเข้าสำหรับ Triple Barrier method — ที่แต่ละ event กระตุ้นการซื้อขายพร้อม stop-loss, take-profit และ expiration barriers สำหรับการตรวจสอบความถูกต้องของกลยุทธ์ที่ขับเคลื่อนด้วย event ดังกล่าว ดู Walk-Forward Optimization และ Monte Carlo Bootstrap for Backtesting
16. Entropy Bars
วิธีการที่สง่างามที่สุดในเชิงทฤษฎี: สุ่มตัวอย่างเมื่อเนื้อหาข้อมูล (Shannon entropy) ของ series ราคาภายในแท่งเกิน threshold
class EntropyBarGenerator:
"""
Generates bars when the entropy of intra-bar returns exceeds
a threshold.
Based on Shannon's information theory: bars are sampled when
"new information" arrives, measured as the entropy of the
return distribution within the current bar.
This is the most theoretically "pure" information-driven bar.
"""
def __init__(
self,
entropy_threshold: float = 2.0,
min_trades: int = 50,
n_bins: int = 10,
):
self.entropy_threshold = entropy_threshold
self.min_trades = min_trades
self.n_bins = n_bins
self.trades: list[tuple[int, float, float]] = []
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
self.trades.append((timestamp, price, qty))
if len(self.trades) < self.min_trades:
return None
entropy = self._compute_entropy()
if entropy >= self.entropy_threshold:
return self._close_bar()
return None
def _compute_entropy(self) -> float:
import math
prices = [t[1] for t in self.trades]
if len(prices) < 2:
return 0.0
returns = [
math.log(prices[i] / prices[i-1])
for i in range(1, len(prices))
if prices[i-1] > 0
]
if not returns:
return 0.0
min_r = min(returns)
max_r = max(returns)
if max_r == min_r:
return 0.0
bin_width = (max_r - min_r) / self.n_bins
bins = [0] * self.n_bins
for r in returns:
idx = min(int((r - min_r) / bin_width), self.n_bins - 1)
bins[idx] += 1
total = sum(bins)
entropy = 0.0
for count in bins:
if count > 0:
p = count / total
entropy -= p * math.log2(p)
return entropy
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.trades = []
return bar
หมายเหตุเชิงปฏิบัติ: Entropy bars มีค่าใช้จ่ายในการคำนวณสูงและส่วนใหญ่เป็นที่สนใจในการวิจัย — แต่สำหรับกลยุทธ์ที่ใช้ ML จะสร้าง features ที่มีคุณสมบัติทางสถิติที่ดีกว่าเพราะแต่ละแท่งมี "ข้อมูล" ใกล้เคียงกัน
17. Delta Bars (Order Flow)
Cumulative delta: วัดแรงสุทธิของผู้ซื้อ aggressive เทียบกับผู้ขายแบบ real-time
Delta bars สุ่มตัวอย่างตาม cumulative delta — ผลต่างที่กำลังวิ่งอยู่ระหว่าง buy volume และ sell volume ต่างจาก imbalance bars (ที่ใช้ tick signs ±1), delta bars ใช้ order flow ที่ถ่วงน้ำหนักตามปริมาณจริง
class DeltaBarGenerator:
"""
Generates bars based on cumulative order flow delta.
Delta = Buy Volume - Sell Volume (classified by aggressor side).
Requires trade-level data with side classification
(available from Binance aggTrades, Bybit trades, etc.)
"""
def __init__(self, threshold: float = 500.0):
self.threshold = threshold
self.cumulative_delta = 0.0
self.trades: list[tuple[int, float, float, int]] = []
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float, is_buyer_maker: bool):
side = -1 if is_buyer_maker else 1
signed_qty = side * qty
self.cumulative_delta += signed_qty
self.trades.append((timestamp, price, qty, side))
if abs(self.cumulative_delta) >= self.threshold:
return self._close_bar()
return None
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
bar.delta = self.cumulative_delta # type: ignore
bar.buy_volume = sum(t[2] for t in self.trades if t[3] == 1) # type: ignore
bar.sell_volume = sum(t[2] for t in self.trades if t[3] == -1) # type: ignore
self.bars.append(bar)
self.cumulative_delta = 0.0
self.trades = []
return bar
Delta divergence: หนึ่งในสัญญาณที่ทรงพลังที่สุด — ราคาขึ้นในขณะที่ cumulative delta เป็นลบ (ผู้ขาย aggressive แต่ราคายังขึ้น ซึ่งบ่งชี้การดูดซับ limit buy) เกี่ยวข้องโดยตรงกับแนวทาง behavioral fingerprinting ที่อธิบายในบทความ Digital Fingerprint: Trader Identification สำหรับ market makers ที่ใช้ Avellaneda-Stoikov model, delta bars ให้มุมมอง real-time ของ inventory risk และแรงกดดันจาก aggressor
circular buffer ของแท่งพื้นฐาน: ข้อมูลใหม่เข้า ข้อมูลเก่าออก และแท่งที่รวบรวมนั้นถูกต้องเสมอ
วิธีการรวมข้อมูลกำหนดวิธีที่แท่งพื้นฐานถูกรวบรวมเป็นแท่งเทียนกรอบเวลาสูง (HTF) พวกมัน เป็นอิสระ จากประเภทแท่ง — คุณสามารถใช้วิธีการรวมข้อมูลใดก็ได้กับประเภทแท่งใดก็ได้
วิธี A: การรวมข้อมูลจัดตามปฏิทิน
รวบรวมแท่งพื้นฐานทั้งหมดที่ตกอยู่ภายในขอบเขตปฏิทินที่กำหนด แท่ง "1 ชั่วโมง" ครอบคลุมแท่งทั้งหมดตั้งแต่ 14:00:00 ถึง 14:59:59
คุณสมบัติ:
- ผู้เข้าร่วมตลาดทั้งหมดเห็นขอบเขตเดียวกัน — จำเป็นสำหรับการวิเคราะห์โครงสร้างตลาด, support/resistance, PIQ triggers
- ปัญหา cold start: แท่งที่ไม่สมบูรณ์หลังรีสตาร์ท
- เป็นธรรมชาติสำหรับ time bars (สิ่งที่กระดานซื้อขายให้โดยกำเนิด)
- ยังทำงานกับแท่งที่ไม่ใช่เวลา: "volume bars ทั้งหมดที่ปิดระหว่าง 14:00 ถึง 15:00" = แท่งรายชั่วโมงที่จัดตามปฏิทินจาก volume bars
วิธี B: การรวมข้อมูล Rolling Window
รวบรวม N แท่งพื้นฐานที่ปิดล่าสุด คำนวณใหม่ทุกแท่งใหม่ แท่ง "1 ชั่วโมง" rolling = 60 1-minute time bars ที่ปิดล่าสุด อัปเดตทุกนาที
หน่วยพื้นฐานคือแท่งพื้นฐานที่ปิดแล้ว การเลือกออกแบบนี้ให้:
- ไม่มี cold start. หลังจาก N แท่ง แท่งก็ถูกต้อง ไม่มีสัญญาณรบกวนจากแท่งที่ไม่สมบูรณ์
- ความเท่าเทียมกับ backtest. ถ้าการซื้อขายจริงใช้หน่วยพื้นฐานเดียวกับ backtest engine สัญญาณจะเหมือนกัน
- การตรวจสอบที่ง่าย. กฎเดียว:
if buffer not full: skip
import numpy as np
class RollingCandleAggregator:
"""
Produces rolling higher-timeframe candles from closed base bars.
Works with ANY bar type: time bars, tick bars, volume bars,
dollar bars, delta bars — anything that produces OHLCV output.
Example: RollingCandleAggregator(window=60) with 1m time bars
produces a "1h" candle updated every minute.
Example: RollingCandleAggregator(window=24) with volume bars
produces a candle spanning the last 24 volume bars.
"""
def __init__(self, window: int):
self.window = window
self.buffer: deque[OHLCV] = deque(maxlen=window)
def push(self, bar: OHLCV) -> OHLCV | None:
"""
Add a closed base bar. Returns aggregated candle
only when buffer is full (= candle is valid).
"""
self.buffer.append(bar)
if len(self.buffer) < self.window:
return None
return self._aggregate()
def _aggregate(self) -> OHLCV:
bars = list(self.buffer)
return OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
@property
def is_valid(self) -> bool:
return len(self.buffer) == self.window
การแลกเปลี่ยน phase shift: Rolling candles ปิดที่ :37 ถ้าคุณเริ่มที่ :37 ไม่ใช่ที่ :00 เหมือนของทุกคน สิ่งนี้สำคัญสำหรับกลยุทธ์ที่ขึ้นอยู่กับระดับที่กลุ่มมองเห็น วิธีแก้ไข: ใช้ทั้งคู่ — ปฏิทินสำหรับโครงสร้างตลาด, rolling สำหรับสัญญาณ
วิธี C: Adaptive Rolling Aggregation
เหมือน rolling แต่ขนาด window ปรับตามความผันผวนปัจจุบัน ตลาดเงียบ → window กว้างขึ้น (smooth มากขึ้น) ตลาดผันผวน → window แคบลง (ตอบสนองเร็วขึ้น)
class AdaptiveRollingAggregator:
"""
Rolling window where the window size adapts to volatility.
Works with any base bar type. Uses ATR of recent bars
as the volatility measure.
Low volatility → wider window (more smoothing, fewer signals)
High volatility → narrower window (faster reaction)
"""
def __init__(
self,
base_window: int = 60,
min_window: int = 15,
max_window: int = 240,
atr_period: int = 14,
atr_base: float | None = None,
):
self.base_window = base_window
self.min_window = min_window
self.max_window = max_window
self.atr_period = atr_period
self.atr_base = atr_base
self.all_candles: deque[OHLCV] = deque(maxlen=max_window)
self.atr_values: deque[float] = deque(maxlen=atr_period * 2)
self.current_window = base_window
def push(self, bar: OHLCV) -> OHLCV | None:
self.all_candles.append(bar)
tr = bar.high - bar.low
self.atr_values.append(tr)
if len(self.atr_values) < self.atr_period:
return None
current_atr = sum(list(self.atr_values)[-self.atr_period:]) / self.atr_period
if self.atr_base is None and len(self.atr_values) >= self.atr_period * 2:
self.atr_base = sum(self.atr_values) / len(self.atr_values)
if self.atr_base is None or self.atr_base == 0:
return None
vol_ratio = current_atr / self.atr_base
self.current_window = int(self.base_window / vol_ratio)
self.current_window = max(self.min_window, min(self.max_window, self.current_window))
if len(self.all_candles) < self.current_window:
return None
bars = list(self.all_candles)[-self.current_window:]
return OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
ประเภทแท่งพื้นฐานทุกประเภทสามารถผสมผสานกับวิธีการรวมข้อมูลทุกวิธี การผสมผสานบางอย่างเป็นมาตรฐาน (calendar time bars = สิ่งที่กระดานซื้อขายให้คุณ) บางอย่างแปลกใหม่แต่ทรงพลัง
ตัวอย่างการผสมผสาน
| ประเภทแท่งพื้นฐาน | ปฏิทิน | Rolling | Adaptive |
|---|---|---|---|
| เวลา | แท่งมาตรฐานของกระดานซื้อขาย | HTF ที่ถูกต้องเสมอ ไม่มี cold start | Timeframe ปรับตามความผันผวน |
| ปริมาณ | "volume bars ทั้งหมดในชั่วโมงนี้" | 24 volume bars ล่าสุด | Window กว้างขึ้นในตลาดเงียบ |
| ดอลลาร์ | Dollar-bar aggregate รายชั่วโมง | N dollar bars ล่าสุด | Dollar windows แบบ adaptive |
| Tick Imbalance | Imbalance aggregate รายชั่วโมง | N imbalance events ล่าสุด | ตอบสนองเร็วในสภาวะที่ผันผวน |
| Delta | Net order flow รายชั่วโมง | Rolling delta snapshot | Adaptive flow window |
| Renko | "Bricks ในชั่วโมงนี้" | N bricks ล่าสุด | Brick count แบบ adaptive |
Hybrid Engine: ปฏิทิน + Rolling
ในทางปฏิบัติ คุณต้องการทั้งการรวมข้อมูลแบบปฏิทินและ rolling พร้อมกัน overhead ของหน่วยความจำน้อยมาก — สอง deque buffers ต่อ timeframe ต่อ symbol
class HybridCandleEngine:
"""
Maintains both calendar-aligned and rolling candles
for any base bar type.
Calendar candles: for market structure, support/resistance, PIQ.
Rolling candles: for indicators, signal generation, entries/exits.
"""
def __init__(self):
self.rolling = {
'1h': RollingCandleAggregator(60),
'4h': RollingCandleAggregator(240),
}
self.calendar: dict[str, list[OHLCV]] = {
'1h': [],
'4h': [],
}
self._calendar_buffer: dict[str, list[OHLCV]] = {
'1h': [],
'4h': [],
}
def on_bar(self, bar: OHLCV):
"""Process any base bar type — time, volume, tick, delta, etc."""
rolling_results = {}
for tf, agg in self.rolling.items():
rolling_results[tf] = agg.push(bar)
self._update_calendar(bar)
return rolling_results
def _update_calendar(self, bar: OHLCV):
from datetime import datetime
ts = datetime.utcfromtimestamp(bar.timestamp)
for tf, minutes in [('1h', 60), ('4h', 240)]:
self._calendar_buffer[tf].append(bar)
total_minutes = ts.hour * 60 + ts.minute
if (total_minutes + 1) % minutes == 0:
bars = self._calendar_buffer[tf]
if bars:
agg = OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
self.calendar[tf].append(agg)
self._calendar_buffer[tf] = []
Time-Volume Hybrid: ปฏิทินพร้อม Volume Splits
รูปแบบการรวมข้อมูลพิเศษ: แท่งที่จัดตามปฏิทินซึ่งบังคับปิดก่อนกำหนดเมื่อปริมาณเกิน threshold รักษาการซิงค์เวลาในขณะที่ปรับตัวต่อการพุ่งของกิจกรรม
class TimeVolumeHybridGenerator:
"""
Calendar-aligned candles that split when volume spikes.
Rule: close the candle at the calendar boundary OR when
accumulated volume exceeds vol_threshold, whichever comes first.
Works with any base bar type — the volume trigger adds an
extra split dimension on top of calendar alignment.
"""
def __init__(
self,
interval_minutes: int = 60,
vol_threshold: float = 5000.0,
):
self.interval_minutes = interval_minutes
self.vol_threshold = vol_threshold
self.buffer: list[OHLCV] = []
self.accumulated_volume = 0.0
self.bars: list[OHLCV] = []
def on_bar(self, bar: OHLCV) -> OHLCV | None:
self.buffer.append(bar)
self.accumulated_volume += bar.volume
from datetime import datetime
ts = datetime.utcfromtimestamp(bar.timestamp)
total_minutes = ts.hour * 60 + ts.minute
at_boundary = (total_minutes + 1) % self.interval_minutes == 0
vol_spike = self.accumulated_volume >= self.vol_threshold
if at_boundary or vol_spike:
return self._close_bar(split_reason='volume' if vol_spike else 'time')
return None
def _close_bar(self, split_reason: str) -> OHLCV:
bars = self.buffer
bar = OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
bar.split_reason = split_reason # type: ignore
bar.num_bars = len(bars) # type: ignore
self.bars.append(bar)
self.buffer = []
self.accumulated_volume = 0.0
return bar
การรวมข้อมูลเชิงปฏิบัติ: Cascading Preload
Cascading preload: รวบรวมแท่งรายวันจากรายชั่วโมง และรายชั่วโมงจากรายนาที — หลีกเลี่ยงข้อจำกัดของ API
กระดานซื้อขายจำกัดจำนวนข้อมูลประวัติศาสตร์ที่พวกเขาให้บริการ Binance ให้ ~1000 แท่งต่อคำขอ REST, OKX จำกัดที่ 300 ถ้าคุณต้องการแท่ง 1D rolling (1440 นาที) คุณไม่สามารถรับประวัติ 1m เพียงพอได้เสมอ สำหรับการ streaming แบบ real-time ของการซื้อขายและ order books ผ่าน WebSocket ดู CCXT Pro WebSocket Methods
วิธีแก้ไข: cascading aggregation — สร้าง timeframes ที่สูงขึ้นจากความละเอียดสูงสุดที่มีในแต่ละระดับ แล้วเย็บเข้าด้วยกัน
Rolling 1W candle:
├── 6 completed 1D candles ← fetch from REST /klines?interval=1d
├── 1 partial day:
│ ├── 23 completed 1h candles ← fetch from REST /klines?interval=1h
│ └── 1 partial hour:
│ └── N completed 1m candles ← fetch from REST /klines?interval=1m
└── Live: each new closed 1m candle updates the entire chain
สิ่งนี้ทำงานเพราะการรวบรวม OHLCV สามารถ compose ได้: high ของแท่ง 1D คือ max ของ 24 1h highs ซึ่งคือ max ของ 1440 1m highs
ข้อจำกัดหลาย Exchange
| Exchange | สูงสุด 1m Candles | สูงสุด 1h Candles | Intervals ที่น่าสังเกต |
|---|---|---|---|
| Binance | 1,000 | 1,000 | 1m–1M, full range |
| Bybit | 1,000 | 1,000 | 1–720, D/W/M |
| OKX | 300 | 300 | 1m–1M (เข้มงวดกว่า) |
| Gate.io | 1,000 | 1,000 | 10s–30d |
การตรวจสอบความสอดคล้องของการรวบรวม
แท่ง 1h จาก REST API อาจไม่ตรงกับที่คุณจะคำนวณจาก 60 1m candles ตรวจสอบเสมอ:
def validate_aggregation(
candle_htf: OHLCV,
candles_ltf: list[OHLCV],
tolerance_pct: float = 0.001,
) -> dict[str, bool]:
agg = OHLCV(
timestamp=candles_ltf[-1].timestamp,
open=candles_ltf[0].open,
high=max(c.high for c in candles_ltf),
low=min(c.low for c in candles_ltf),
close=candles_ltf[-1].close,
volume=sum(c.volume for c in candles_ltf),
)
def close_enough(a: float, b: float) -> bool:
if a == 0 and b == 0:
return True
return abs(a - b) / max(abs(a), abs(b)) < tolerance_pct
return {
'open': close_enough(candle_htf.open, agg.open),
'high': close_enough(candle_htf.high, agg.high),
'low': close_enough(candle_htf.low, agg.low),
'close': close_enough(candle_htf.close, agg.close),
'volume': close_enough(candle_htf.volume, agg.volume),
}
ถ้าการตรวจสอบล้มเหลวอย่างสม่ำเสมอ รวบรวมจาก 1m ด้วยตัวคุณเองเสมอ — อย่าเชื่อแท่ง HTF ของกระดานซื้อขายสำหรับ backtesting parity
เมทริกซ์การเปรียบเทียบ
แกนที่ 1: ประเภทแท่งพื้นฐาน
| # | ประเภทแท่ง | ตัวกระตุ้น | ต้องการข้อมูล Tick | เหมาะที่สุดสำหรับ |
|---|---|---|---|---|
| 1 | เวลา | ช่วงเวลาที่กำหนด | ไม่ | โครงสร้างตลาด, พฤติกรรมกลุ่ม |
| 2 | Tick | N การซื้อขาย | ใช่ | ML features, การสุ่มตัวอย่างความคิดเห็นเท่าเทียม |
| 3 | ปริมาณ | N หน่วยที่ซื้อขาย | ใช่ | การวิเคราะห์กิจกรรมที่ปรับให้เป็นมาตรฐาน |
| 4 | ดอลลาร์ | $N notional | ใช่ | การเปรียบเทียบข้ามสินทรัพย์ |
| 5 | Renko | ราคา ± N หน่วย | ไม่ | การตาม trend, การกรองสัญญาณรบกวน |
| 6 | Range | High-Low ≥ N | ใช่ | การตรวจจับ breakout |
| 7 | ความผันผวน | Range แบบ adaptive | ใช่ | การวิเคราะห์ปรับตาม regime |
| 8 | Heikin-Ashi | การแปลง | ไม่ | การยืนยัน trend (ราคาสังเคราะห์!) |
| 9 | Kagi | การกลับตัวของราคา | ไม่ | โครงสร้าง supply/demand |
| 10 | Line Break | N-line breakout | ไม่ | กรอง trend มหภาค |
| 11 | Point & Figure | Box + reversal | ไม่ | การทำแผนที่ support/resistance |
| 12 | TIB | Tick imbalance | ใช่ | การตรวจจับ informed flow |
| 13 | VIB | Volume imbalance | ใช่ | การตรวจจับ large order |
| 14 | Run | ความยาว run | ใช่ | การตรวจจับการแบ่ง order |
| 15 | CUSUM | Cumulative return | ไม่ (1m closes) | เหตุการณ์ structural break |
| 16 | Entropy | Shannon entropy | ใช่ | วิจัย ML, ความบริสุทธิ์ของ feature |
| 17 | Delta | Order flow delta | ใช่ (aggTrades) | การวิเคราะห์ aggressor flow |
แกนที่ 2: วิธีการรวมข้อมูล
| วิธี | การจัดเรียง | Cold Start | Phase Shift | เหมาะที่สุดสำหรับ |
|---|---|---|---|---|
| ปฏิทิน | นาฬิกา | ความเสี่ยงแท่งที่ไม่สมบูรณ์ | ไม่มี (จัดตามกลุ่ม) | โครงสร้างตลาด, PIQ, S/R |
| Rolling | N แท่ง | ไม่มี (หลัง warmup) | ใช่ (เลื่อนจาก :00) | ตัวชี้วัด, สัญญาณ |
| Adaptive | N ที่ขับเคลื่อนด้วยความผันผวน | หลัง ATR calibration | ใช่ | กลยุทธ์ปรับตามความผันผวน |
คำแนะนำเชิงปฏิบัติ
สถาปัตยกรรมแท่งเทียนสี่ชั้น: rolling signals, calendar structure, microstructure flow และ trend filters
ถ้า backtest engine ของคุณทำงานบนข้อมูล 1m OHLCV:
- Rolling time bars — การอัปเกรดที่ง่ายที่สุด ไม่ต้องการข้อมูลเพิ่มเติม ขจัด cold start
- Hybrid (rolling + calendar) time bars — ปฏิทินสำหรับโครงสร้างตลาด rolling สำหรับสัญญาณ
- CUSUM filter — ทำงานบน 1m closes ไม่ต้องการข้อมูล tick "มีบางอย่างเคลื่อนไหวพอที่จะน่าสนใจ"
ถ้าคุณมีข้อมูล tick/trade:
- Dollar bars + rolling — ค่าเริ่มต้นที่แนะนำจากวรรณกรรม quant finance
- Volume imbalance bars + rolling — ตรวจจับ informed flow สุ่มตัวอย่างมากขึ้นในช่วงเหตุการณ์สำคัญ
- Delta bars + calendar — ถ้าคุณมีการจำแนก aggressor-side มุมมองที่ตรงที่สุดว่าใครกำลังผลักดันตลาด
เป็น filters (ใช้ Heikin-Ashi หรือ Line Break เหนือการผสมผสาน base+aggregation ใดก็ได้):
- Heikin-Ashi เหนือ rolling volume bars — สัญญาณ trend ที่สะอาดบนข้อมูลที่ปรับตามกิจกรรม
- Line Break / Kagi เหนือ daily calendar bars — กรอง trend มหภาค
สำหรับ Marketmaker.cc โดยเฉพาะ — แนวทางแบบ layered:
- Layer 1 (สัญญาณ): Rolling aggregation ของ time bars สำหรับตัวชี้วัดและสัญญาณ entry/exit ไม่มี cold start, backtest parity ที่สมบูรณ์แบบ
- Layer 2 (โครงสร้างตลาด): Calendar-aligned time bars สำหรับ support/resistance, การวิเคราะห์ hourly close และ PIQ triggers
- Layer 3 (microstructure): Volume imbalance bars + delta bars จาก raw trade stream สำหรับการตรวจจับ informed flow, การแบ่ง order และการคาดการณ์การเคลื่อนไหวขนาดใหญ่ ดูเพิ่มเติมที่ Digital Fingerprint: Trader Identification สำหรับการรู้จำรูปแบบพฤติกรรมบนข้อมูล order flow
- Layer 4 (กรอง trend): การแปลง Heikin-Ashi บน rolling bars หรือ Line Break บน 4h calendar closes เพื่อรักษาสัญญาณให้สอดคล้องกับทิศทางมหภาค
บทสรุป
การสร้างแท่งเทียนไม่ใช่การตัดสินใจครั้งเดียว — มันเป็นสองการตัดสินใจที่เป็นอิสระ:
-
แท่งประเภทไหน? เวลาจับช่วงเวลานาฬิกา กิจกรรม (tick, volume, dollar) จับการมีส่วนร่วมของตลาด ราคา (Renko, range, volatility) จับการเคลื่อนไหว ข้อมูล (imbalance, runs, CUSUM, entropy) จับการมาถึงของข้อมูลใหม่ Order flow (delta) จับแรงกดดัน aggressive
-
วิธีรวบรวมเป็นกรอบเวลาที่สูงขึ้นอย่างไร? ปฏิทินจัดตามกลุ่ม Rolling ขจัด cold start Adaptive ตอบสนองต่อความผันผวน
"แท่ง 1 ชั่วโมงมาตรฐานจาก Binance" เป็นเพียงเซลล์เดียวในเมทริกซ์ 17×3 อีก 50 การผสมผสานมีให้สำหรับทุกคนที่ยินดีนำไปใช้ สำหรับระบบ production คำตอบคือ "เลือกการผสมผสานที่เหมาะสมสำหรับแต่ละชั้นของ decision engine ของคุณ"
หน่วยพื้นฐาน — แท่งพื้นฐานที่ปิดแล้ว — ยังคงเป็นรากฐาน ทุกสิ่งอื่นคือการรวบรวม
สำหรับข้อมูลเพิ่มเติมเกี่ยวกับความแม่นยำของ backtest ด้วยข้อมูลละเอียด ดู Adaptive Drill-Down: Backtest with Variable Granularity สำหรับผลกระทบของการ precomputation ตัวชี้วัดต่อกลยุทธ์หลาย timeframe ดู Aggregated Parquet Cache
ลิงก์ที่มีประโยชน์
- Lopez de Prado — Advances in Financial Machine Learning (2018)
- Easley, Lopez de Prado, O'Hara — The Volume Clock: Insights into the High Frequency Paradigm (2012)
- mlfinlab — Python library implementing information-driven bars
- Binance — Historical Market Data
- Apache Parquet — columnar storage format
การอ้างอิง
@article{soloviov2026bartypes,
author = {Soloviov, Eugen},
title = {Bar Types and Aggregation Methods for Algorithmic Trading},
year = {2026},
url = {https://marketmaker.cc/en/blog/post/beyond-time-bars-candle-construction},
description = {Two-axis classification of candle construction: 17 base bar types × 3 aggregation methods = 51 combinations, with implementation code and practical recommendations for crypto algotrading.}
}
ผู้เขียน
Trading-systems engineer
Trading-systems engineer building bots since 2017: cross-exchange arbitrage (connected up to 30 venues), cointegration-based pairs arbitrage across spot and futures, scalping, news and sentiment-driven strategies, trend algorithms, and portfolio management and balancing algorithms. Also builds sub-millisecond order execution, big-data warehouses, backtesting engines, AI agents, and trading interfaces (incl. open-source profitmaker.cc). Stack: JS/TS, Python, Rust/Zig/Go, DevOps, backend, frontend, architecture.