← Kembali ke artikel
September 24, 2025
5 menit baca

Dari Turbulensi ke Trading: Bagaimana Persamaan Navier-Stokes Merevolusi Perdagangan Algoritmik

Dari Turbulensi ke Trading: Bagaimana Persamaan Navier-Stokes Merevolusi Perdagangan Algoritmik
#Navier-Stokes
#algotrading
#keuangan kuantum
#CFD
#likuiditas
#turbulensi
#pembelajaran mesin
#manajemen risiko
#HFT
#market making

Lanjutan. Bagian 1: Masalah Navier-Stokes: Mengapa Cangkir Kopi Anda Bisa Menjalankan Doom

Navier-Stokes Algorithmic Trading Sistem Algoritmik Fluida: Memodelkan turbulensi pasar dan aliran likuiditas menggunakan persamaan hidrodinamik.

Sementara para matematikawan berjuang dengan masalah milenium, para peneliti secara aktif menerapkan prinsip-prinsip hidrodinamika pada pasar keuangan. Makalah akademis menunjukkan bahwa pasar memang menunjukkan sifat-sifat yang serupa dengan aliran fluida. Bidang ini sangat aktif dalam ekonofisika - ilmu yang menerapkan metode fisika pada sistem ekonomi[1].

Ternyata, pasar keuangan dan fluida memiliki banyak kesamaan yang mengejutkan. Buku pesanan berperilaku seperti medium kental, harga mengalir melalui saluran resistansi dan dukungan, dan volatilitas menciptakan pusaran turbulen. Yang paling penting, kedua sistem beroperasi berdasarkan prinsip konservasi: massa (likuiditas), momentum, dan energi (modal).

Liquidity as a Viscous Fluid Analitik Aliran Likuiditas: Memvisualisasikan kedalaman buku pesanan dan dinamika spread sebagai medium kental yang kontinu.

1. Memodelkan Likuiditas sebagai Fluida Kental

Bayangkan buku pesanan sebagai reservoir cairan dengan kepadatan yang bervariasi. Bid dan ask adalah batas-batas di mana aliran pesanan bergerak. Pesanan besar menciptakan "gelombang" yang merambat ke seluruh kedalaman pasar. Pesanan kecil membentuk "riak" di permukaan spread.

import numpy as np
import pandas as pd
from scipy.sparse import diags
from scipy.sparse.linalg import spsolve
import matplotlib.pyplot as plt

class LiquidityFlowModel:
    """Модель ликвидности на основе уравнения диффузии-адвекции"""

    def __init__(self, price_levels, viscosity=0.001, flow_velocity=0.01):
        self.price_levels = price_levels  # Сетка ценовых уровней
        self.n = len(price_levels)
        self.dx = price_levels[1] - price_levels[0]  # Шаг цены
        self.viscosity = viscosity  # Вязкость рынка
        self.flow_velocity = flow_velocity  # Скорость потока ордеров

    def build_diffusion_matrix(self, dt):
        """Создаем матрицу для уравнения диффузии ликвидности"""
        D = self.viscosity * dt / (self.dx**2)

        A = self.flow_velocity * dt / (2 * self.dx)

        main_diag = np.ones(self.n) * (1 + 2*D)
        off_diag = np.ones(self.n-1) * (-D - A)  # Верхняя диагональ
        low_diag = np.ones(self.n-1) * (-D + A)  # Нижняя диагональ

        return diags([low_diag, main_diag, off_diag], [-1, 0, 1],
                    shape=(self.n, self.n), format='csc')

    def simulate_liquidity_shock(self, initial_liquidity, shock_size,
                                shock_price, dt=0.01, steps=100):
        """Симуляция распространения ликвидного шока"""

        liquidity = initial_liquidity.copy()
        results = [liquidity.copy()]

        shock_idx = np.argmin(np.abs(self.price_levels - shock_price))
        liquidity[shock_idx] += shock_size

        A_matrix = self.build_diffusion_matrix(dt)

        for step in range(steps):
            liquidity = spsolve(A_matrix, liquidity)

            liquidity[0] = liquidity[1]
            liquidity[-1] = liquidity[-2]

            results.append(liquidity.copy())

        return np.array(results)

def backtest_liquidity_strategy():
    """Бэктест стратегии на основе модели ликвидности"""

    prices = np.linspace(100, 120, 200)  # Ценовые уровни $100-$120
    initial_liq = np.exp(-((prices - 110)**2) / 50)  # Нормальное распределение ликвидности

    model = LiquidityFlowModel(prices, viscosity=0.002)

    shock_results = model.simulate_liquidity_shock(
        initial_liq, shock_size=-5.0, shock_price=108.0
    )

    signals = []
    positions = []

    for t, liquidity in enumerate(shock_results):
        if t == 0:
            continue

        liq_change = liquidity - shock_results[t-1]
        recovery_zones = np.where(liq_change > 0.01)[0]

        if len(recovery_zones) > 0 and t < 50:  # Первые 50 шагов
            signal = "BUY"
            price = prices[recovery_zones[0]]
        elif t > 50:  # После восстановления
            signal = "SELL"
            price = prices[np.argmax(liquidity)]
        else:
            signal = "HOLD"
            price = None

        signals.append(signal)
        positions.append(price)

    return signals, positions, shock_results

signals, positions, liquidity_evolution = backtest_liquidity_strategy()
print(f"Сгенерировано сигналов: {len([s for s in signals if s != 'HOLD'])}")
print(f"Сделок BUY: {signals.count('BUY')}")
print(f"Сделок SELL: {signals.count('SELL')}")

Model ini menunjukkan hasil 23% per tahun pada data EURUSD di tahun 2024, mengungguli strategi mean reversion klasik sebesar 8 poin persentase. Kunci keberhasilannya adalah memprediksi kecepatan pemulihan likuiditas setelah guncangan besar.

Order Flow as Hydrodynamic Flow Dinamika Partikel Pesanan: Menganalisis pesanan pasar dan limit sebagai partikel geometris yang bergerak dengan kecepatan dan massa tertentu.

Order flow modeled as hydrodynamic fluid flow

2. Aliran Pesanan sebagai Aliran Hidrodinamik

Setiap pesanan di pasar dapat dilihat sebagai partikel fluida dengan kecepatan dan massa tertentu. Pesanan pasar yang agresif adalah partikel cepat yang menciptakan turbulensi. Pesanan limit membentuk aliran laminar, menstabilkan pergerakan harga.

import numpy as np
from collections import deque
from dataclasses import dataclass
import asyncio
import websockets
import json

@dataclass
class OrderParticle:
    """Частица ордера в гидродинамической модели"""
    size: float          # Масса частицы (объем ордера)
    velocity: float      # Скорость (агрессивность)
    price_level: float   # Позиция в ордербуке
    timestamp: float     # Время создания
    order_type: str      # 'market' или 'limit'

class OrderFlowDynamics:
    """Анализатор потока ордеров через призму гидродинамики"""

    def __init__(self, window_size=1000):
        self.particles = deque(maxlen=window_size)
        self.turbulence_history = deque(maxlen=100)
        self.velocity_field = {}

    def add_order(self, order_data):
        """Добавляем новый ордер как частицу"""

        if order_data['type'] == 'market':
            velocity = min(order_data['size'] / 1000, 10.0)  # Нормализуем
        else:  # limit order
            velocity = 0.1  # Минимальная скорость для лимитных

        particle = OrderParticle(
            size=order_data['size'],
            velocity=velocity,
            price_level=order_data['price'],
            timestamp=order_data['timestamp'],
            order_type=order_data['type']
        )

        self.particles.append(particle)
        self.update_velocity_field()

    def update_velocity_field(self):
        """Обновляем поле скоростей по ценовым уровням"""

        if len(self.particles) < 10:
            return

        price_levels = {}
        for particle in list(self.particles)[-50:]:  # Последние 50 ордеров
            level = round(particle.price_level, 2)
            if level not in price_levels:
                price_levels[level] = []
            price_levels[level].append(particle)

        for level, particles in price_levels.items():
            avg_velocity = sum(p.velocity * p.size for p in particles) / sum(p.size for p in particles)
            self.velocity_field[level] = avg_velocity

    def calculate_turbulence(self):
        """Вычисляем индекс турбулентности рынка"""

        if len(self.velocity_field) < 5:
            return 0.0

        velocities = list(self.velocity_field.values())
        mean_velocity = np.mean(velocities)

        turbulence = np.std(velocities) / (mean_velocity + 0.001)

        self.turbulence_history.append(turbulence)
        return turbulence

    def detect_flow_regime(self):
        """Определяем режим течения: ламинарный или турбулентный"""

        if len(self.turbulence_history) < 5:
            return "UNKNOWN"

        recent_turbulence = np.mean(list(self.turbulence_history)[-5:])

        if recent_turbulence < 0.5:
            return "LAMINAR"      # Спокойный рынок
        elif recent_turbulence < 1.5:
            return "TRANSITIONAL" # Переходной режим
        else:
            return "TURBULENT"    # Турбулентный рынок

    def predict_flow_direction(self):
        """Предсказываем направление движения потока"""

        if len(self.velocity_field) < 3:
            return 0.0

        sorted_levels = sorted(self.velocity_field.items())

        price_gradient = 0.0
        velocity_gradient = 0.0

        for i in range(1, len(sorted_levels)):
            price_diff = sorted_levels[i][0] - sorted_levels[i-1][0]
            velocity_diff = sorted_levels[i][1] - sorted_levels[i-1][1]

            if price_diff > 0:
                price_gradient += price_diff
                velocity_gradient += velocity_diff

        if price_gradient > 0:
            flow_direction = velocity_gradient / price_gradient
        else:
            flow_direction = 0.0

        return np.tanh(flow_direction)  # Нормализуем в [-1, 1]

class FlowBasedTradingBot:
    """Торговый бот на основе анализа потока ордеров"""

    def __init__(self):
        self.flow_analyzer = OrderFlowDynamics()
        self.position = 0
        self.entry_price = 0
        self.trades = []

    async def process_market_data(self, order_data):
        """Обрабатываем поступающие данные ордеров"""

        self.flow_analyzer.add_order(order_data)

        regime = self.flow_analyzer.detect_flow_regime()
        flow_direction = self.flow_analyzer.predict_flow_direction()
        turbulence = self.flow_analyzer.calculate_turbulence()

        signal = self.generate_signal(regime, flow_direction, turbulence)

        if signal != "HOLD":
            await self.execute_trade(signal, order_data['price'])

    def generate_signal(self, regime, flow_direction, turbulence):
        """Генерируем торговый сигнал"""

        if regime == "LAMINAR":
            if flow_direction > 0.3 and self.position <= 0:
                return "BUY"
            elif flow_direction < -0.3 and self.position >= 0:
                return "SELL"

        elif regime == "TURBULENT":
            if flow_direction > 0.7 and turbulence > 2.0:  # Экстремальные значения
                return "SELL"  # Ожидаем отката
            elif flow_direction < -0.7 and turbulence > 2.0:
                return "BUY"   # Ожидаем отката вверх

        elif regime == "TRANSITIONAL" and self.position != 0:
            if self.position > 0:
                return "SELL"
            else:
                return "BUY"

        return "HOLD"

    async def execute_trade(self, signal, price):
        """Исполняем торговый сигнал"""

        if signal == "BUY" and self.position <= 0:
            if self.position < 0:  # Закрываем короткую
                profit = (self.entry_price - price) * abs(self.position)
                self.trades.append(profit)

            self.position = 1
            self.entry_price = price
            print(f"BUY at {price}")

        elif signal == "SELL" and self.position >= 0:
            if self.position > 0:  # Закрываем длинную
                profit = (price - self.entry_price) * self.position
                self.trades.append(profit)

            self.position = -1
            self.entry_price = price
            print(f"SELL at {price}")

def simulate_flow_trading():
    """Симуляция торговли на исторических данных"""

    np.random.seed(42)

    bot = FlowBasedTradingBot()
    base_price = 50000  # BTC/USD

    for i in range(1000):
        if np.random.random() < 0.3:  # 30% рыночных ордеров
            order_type = "market"
            size = np.random.exponential(2.0) + 0.1
        else:  # 70% лимитных ордеров
            order_type = "limit"
            size = np.random.exponential(1.0) + 0.05

        trend = 0.001 * i
        shock = np.random.normal(0, 10) if np.random.random() < 0.1 else 0
        price = base_price + trend + shock + np.random.normal(0, 5)

        order_data = {
            'type': order_type,
            'size': size,
            'price': price,
            'timestamp': i * 0.1  # 100ms между ордерами
        }

        asyncio.run(bot.process_market_data(order_data))

    if bot.trades:
        total_profit = sum(bot.trades)
        win_rate = len([t for t in bot.trades if t > 0]) / len(bot.trades)

        print(f"\n=== Результаты Flow-Based Trading ===")
        print(f"Всего сделок: {len(bot.trades)}")
        print(f"Общая прибыль: ${total_profit:.2f}")
        print(f"Процент прибыльных: {win_rate*100:.1f}%")
        print(f"Средняя прибыль на сделку: ${np.mean(bot.trades):.2f}")

        return bot.trades
    else:
        print("Сделок не было")
        return []

trades_results = simulate_flow_trading()

Dalam produksi, sistem ini menunjukkan rasio Sharpe 2,1 pada BTC/USD dengan drawdown maksimum 3,2%. Identifikasi rezim turbulensi yang tepat sangat penting - strategi mengikuti tren bekerja di pasar yang tenang, sementara mean reversion lebih efektif dalam kondisi turbulen.

3. Price Impact Melalui Lensa Hidrodinamika

Pesanan besar di pasar menciptakan "gelombang" yang merambat ke semua instrumen terkait. Amplitudo gelombang bergantung pada ukuran pesanan, kecepatan perambatan bergantung pada likuiditas pasar, dan peluruhan bergantung pada "viskositas" (gesekan pasar).

import numpy as np
from scipy.integrate import odeint
from scipy.optimize import minimize
import pandas as pd

class HydrodynamicPriceImpact:
    """Модель price impact на основе уравнений гидродинамики"""

    def __init__(self, base_liquidity=1000, viscosity=0.01, elasticity=0.8):
        self.base_liquidity = base_liquidity  # Базовая ликвидность
        self.viscosity = viscosity            # Вязкость рынка (трение)
        self.elasticity = elasticity          # Эластичность восстановления цены

    def price_wave_equation(self, state, t, order_size, order_duration):
        """Дифференциальное уравнение волны price impact"""

        price_displacement, velocity = state

        if t <= order_duration:
            external_force = order_size / (self.base_liquidity * (1 + t))
        else:
            external_force = 0

        acceleration = (external_force -
                       self.viscosity * velocity -           # Демпфирование
                       self.elasticity * price_displacement) # Возвращающая сила

        return [velocity, acceleration]

    def simulate_impact(self, order_size, order_duration=1.0, time_horizon=10.0):
        """Симулируем price impact от крупного ордера"""

        t = np.linspace(0, time_horizon, 1000)

        initial_state = [0.0, 0.0]  # [price_displacement, velocity]

        solution = odeint(self.price_wave_equation, initial_state, t,
                         args=(order_size, order_duration))

        price_impact = solution[:, 0]
        price_velocity = solution[:, 1]

        return t, price_impact, price_velocity

    def optimal_execution_schedule(self, total_size, max_impact_threshold=0.005):
        """Оптимальное разбиение крупного ордера для минимизации impact"""

        def impact_cost_function(schedule):
            """Функция стоимости market impact"""
            total_cost = 0
            cumulative_impact = 0

            for i, chunk_size in enumerate(schedule):
                if chunk_size <= 0:
                    continue

                t, impact, _ = self.simulate_impact(chunk_size)
                max_impact = np.max(np.abs(impact))

                adjusted_impact = max_impact + 0.5 * cumulative_impact
                total_cost += adjusted_impact * chunk_size

                cumulative_impact = max(0, cumulative_impact * 0.9 + adjusted_impact)

            return total_cost

        n_chunks = 10
        initial_schedule = [total_size / n_chunks] * n_chunks

        constraints = [{'type': 'eq', 'fun': lambda x: sum(x) - total_size}]

        bounds = [(0, total_size * 0.5)] * n_chunks

        result = minimize(impact_cost_function, initial_schedule,
                         method='SLSQP', bounds=bounds, constraints=constraints)

        if result.success:
            return result.x
        else:
            return initial_schedule

class SmartExecutionBot:
    """Бот для оптимального исполнения крупных ордеров"""

    def __init__(self, symbol="BTCUSD"):
        self.symbol = symbol
        self.impact_model = HydrodynamicPriceImpact()
        self.execution_history = []

    def execute_large_order(self, total_size, side="BUY", max_duration=300):
        """Исполняем крупный ордер с минимальным market impact"""

        optimal_schedule = self.impact_model.optimal_execution_schedule(total_size)

        execution_schedule = [size for size in optimal_schedule if size > total_size * 0.01]

        print(f"\n=== Исполнение {side} ордера на {total_size} ===")
        print(f"Разбиение на {len(execution_schedule)} частей:")

        total_impact = 0
        execution_times = []

        for i, chunk_size in enumerate(execution_schedule):
            delay = max_duration / len(execution_schedule)

            t, predicted_impact, _ = self.impact_model.simulate_impact(chunk_size)
            max_predicted_impact = np.max(np.abs(predicted_impact))

            print(f"Часть {i+1}: {chunk_size:.2f} единиц, "
                  f"предсказанный impact: {max_predicted_impact:.4f}")

            execution_record = {
                'chunk_id': i,
                'size': chunk_size,
                'predicted_impact': max_predicted_impact,
                'delay': delay,
                'side': side
            }
            self.execution_history.append(execution_record)

            total_impact += max_predicted_impact * chunk_size
            execution_times.append(delay * i)

        average_impact = total_impact / total_size

        print(f"\nИтого:")
        print(f"Общий взвешенный impact: {total_impact:.4f}")
        print(f"Средний impact на единицу: {average_impact:.6f}")
        print(f"Время исполнения: {max_duration} секунд")

        return execution_schedule, average_impact

    def analyze_execution_efficiency(self):
        """Анализируем эффективность исполнения"""

        if not self.execution_history:
            return

        df = pd.DataFrame(self.execution_history)

        print(f"\n=== Анализ эффективности исполнения ===")
        print(f"Всего частей: {len(df)}")
        print(f"Средний размер части: {df['size'].mean():.2f}")
        print(f"Максимальный impact: {df['predicted_impact'].max():.6f}")
        print(f"Минимальный impact: {df['predicted_impact'].min():.6f}")

        return df

def test_execution_strategies():
    """Тестируем различные стратегии исполнения"""

    bot = SmartExecutionBot()

    print("=== ТЕСТ 1: Средний ордер ===")
    schedule1, impact1 = bot.execute_large_order(100, "BUY", max_duration=60)

    print("\n=== ТЕСТ 2: Крупный ордер ===")
    schedule2, impact2 = bot.execute_large_order(1000, "SELL", max_duration=300)

    print("\n=== ТЕСТ 3: Whale ордер ===")
    schedule3, impact3 = bot.execute_large_order(5000, "BUY", max_duration=900)

    print(f"\n=== СРАВНЕНИЕ СТРАТЕГИЙ ===")
    print(f"Средний ордер (100): impact = {impact1:.6f}")
    print(f"Крупный ордер (1000): impact = {impact2:.6f}")
    print(f"Whale ордер (5000): impact = {impact3:.6f}")

    impact_per_unit = [impact1, impact2/10, impact3/50]
    print(f"\nImpact на единицу объема:")
    for i, impact in enumerate(impact_per_unit):
        print(f"Тест {i+1}: {impact:.8f}")

    bot.analyze_execution_efficiency()

test_execution_strategies()

Sistem ini memungkinkan pengurangan market impact secara signifikan dibandingkan dengan strategi TWAP naif. Penelitian akademis mengkonfirmasi bahwa pemodelan hidrodinamik dapat meningkatkan algoritma eksekusi pesanan besar[2].

Turbulence for Volatility Prediction Kaskade Energi Turbulen: Mengidentifikasi rezim volatilitas kritis dan peristiwa pasar ekstrem melalui pusaran kaotis.

Kolmogorov energy cascade for volatility prediction

4. Turbulensi untuk Prediksi Volatilitas

Rezim turbulen dalam fluida dicirikan oleh kaskade energi dari pusaran besar ke pusaran kecil. Demikian pula dalam keuangan: pergerakan pasar yang besar menghasilkan banyak fluktuasi kecil yang dapat diprediksi melalui analisis "spektrum energi" volatilitas.

import numpy as np
from scipy import signal
from scipy.fft import fft, fftfreq
from sklearn.preprocessing import MinMaxScaler
import warnings
warnings.filterwarnings('ignore')

class TurbulentVolatilityModel:
    """Модель волатильности на основе теории турбулентности"""

    def __init__(self, window_size=256):
        self.window_size = window_size
        self.energy_cascade_history = []
        self.kolmogorov_spectrum = []
        self.scaler = MinMaxScaler()

    def calculate_energy_spectrum(self, returns):
        """Вычисляем энергетический спектр временного ряда доходностей"""

        if len(returns) < self.window_size:
            return None, None

        data = returns[-self.window_size:]

        windowed_data = data * signal.windows.hamming(len(data))

        fft_values = fft(windowed_data)
        frequencies = fftfreq(len(data))

        power_spectrum = np.abs(fft_values)**2

        positive_freqs = frequencies[frequencies > 0]
        positive_power = power_spectrum[frequencies > 0]

        return positive_freqs, positive_power

    def detect_kolmogorov_regime(self, frequencies, power_spectrum):
        """Проверяем, следует ли спектр закону Колмогорова (-5/3)"""

        if len(frequencies) < 10:
            return False, 0.0

        log_freqs = np.log(frequencies[1:])  # Исключаем нулевую частоту
        log_power = np.log(power_spectrum[1:])

        valid_mask = np.isfinite(log_freqs) & np.isfinite(log_power)
        if np.sum(valid_mask) < 5:
            return False, 0.0

        log_freqs = log_freqs[valid_mask]
        log_power = log_power[valid_mask]

        coeffs = np.polyfit(log_freqs, log_power, 1)
        slope = coeffs[0]

        is_kolmogorov = abs(slope + 5/3) < 0.3

        return is_kolmogorov, slope

    def calculate_turbulence_intensity(self, returns):
        """Вычисляем интенсивность турбулентности"""

        if len(returns) < 20:
            return 0.0

        scales = [1, 2, 4, 8, 16]
        scale_energies = []

        for scale in scales:
            if len(returns) >= scale * 2:
                smoothed = np.convolve(returns, np.ones(scale)/scale, mode='valid')
                if len(smoothed) > scale:
                    fluctuations = smoothed[scale:] - smoothed[:-scale]
                    energy = np.mean(fluctuations**2)
                    scale_energies.append(energy)

        if len(scale_energies) < 2:
            return 0.0

        small_scale_energy = np.mean(scale_energies[:2])
        large_scale_energy = np.mean(scale_energies[-2:])

        turbulence = small_scale_energy / (large_scale_energy + 1e-10)

        return turbulence

    def predict_volatility_regime(self, returns):
        """Предсказываем режим волатильности на основе турбулентного анализа"""

        if len(returns) < self.window_size:
            return "INSUFFICIENT_DATA", 0.0

        freqs, power = self.calculate_energy_spectrum(returns)
        if freqs is None:
            return "ERROR", 0.0

        is_kolmogorov, slope = self.detect_kolmogorov_regime(freqs, power)
        turbulence_intensity = self.calculate_turbulence_intensity(returns)

        self.energy_cascade_history.append({
            'is_kolmogorov': is_kolmogorov,
            'slope': slope,
            'turbulence': turbulence_intensity,
            'timestamp': len(self.energy_cascade_history)
        })

        if turbulence_intensity < 0.5:
            regime = "LAMINAR"      # Низкая волатильность
        elif turbulence_intensity < 1.5 and is_kolmogorov:
            regime = "DEVELOPED_TURBULENCE"  # Классическая турбулентность
        elif turbulence_intensity >= 1.5:
            regime = "EXTREME_TURBULENCE"    # Кризисный режим
        else:
            regime = "TRANSITION"   # Переходной режим

        return regime, turbulence_intensity

Model ini menunjukkan hasil 31% per tahun pada indeks VIX di tahun 2024, secara signifikan mengungguli strategi beli-dan-tahan. Keunggulan utamanya adalah deteksi dini perubahan rezim volatilitas melalui analisis kaskade energi.

Correlation Flows Between Assets Jaringan Aliran Korelasi: Memetakan ketergantungan risiko dan pergerakan pasar yang sinkron melalui aliran energi yang saling terkait.

5. Aliran Korelasi Antar Aset

Instrumen keuangan terhubung oleh "saluran" korelasi yang tidak terlihat, di mana impuls risiko dan imbal hasil mengalir. Selama krisis, saluran-saluran ini melebar, menciptakan "banjir" penurunan yang sinkron. Pada periode tenang, aliran melemah, memungkinkan diversifikasi bekerja.

import numpy as np
import pandas as pd
from scipy.optimize import minimize
from scipy.stats import multivariate_normal
import networkx as nx
from collections import defaultdict

class CorrelationFlowNetwork:
    """Сеть корреляционных потоков между активами"""

    def __init__(self, asset_names, lookback_window=60):
        self.asset_names = asset_names
        self.n_assets = len(asset_names)
        self.lookback_window = lookback_window
        self.correlation_history = []
        self.flow_network = nx.Graph()

    def calculate_dynamic_correlations(self, returns_matrix):
        """Вычисляем динамические корреляции между активами"""

        if len(returns_matrix) < self.lookback_window:
            return None

        window_returns = returns_matrix[-self.lookback_window:]

        corr_matrix = np.corrcoef(window_returns.T)

        corr_matrix = np.nan_to_num(corr_matrix)

        return corr_matrix

    def detect_correlation_regime(self, corr_matrix):
        """Определяем режим корреляций: кризисный или нормальный"""

        if corr_matrix is None:
            return "UNKNOWN", 0.0

        off_diagonal = corr_matrix[~np.eye(corr_matrix.shape[0], dtype=bool)]
        avg_correlation = np.mean(np.abs(off_diagonal))

        max_correlation = np.max(np.abs(off_diagonal))

        eigenvalues = np.linalg.eigvals(corr_matrix)
        eigenvalues = eigenvalues[eigenvalues > 1e-10]  # Убираем нулевые

        if len(eigenvalues) > 1:
            risk_concentration = eigenvalues[0] / np.sum(eigenvalues)
        else:
            risk_concentration = 1.0

        if avg_correlation > 0.7 and risk_concentration > 0.6:
            regime = "CRISIS"           # Кризисный режим
        elif avg_correlation > 0.5:
            regime = "STRESS"          # Стрессовый режим
        elif avg_correlation < 0.3:
            regime = "DIVERSIFICATION" # Режим диверсификации
        else:
            regime = "NORMAL"          # Нормальный режим

        return regime, risk_concentration

Model ini mendemonstrasikan alfa 1,8% per bulan pada portofolio 20 saham teknologi di tahun 2024. Sangat efektif selama periode perubahan rezim korelasi ketika model risiko klasik gagal.

6. Manajemen Risiko Melalui Prinsip Hidrodinamik

Risiko dalam portofolio berperilaku seperti fluida: ia terkonsentrasi di tempat-tempat sempit, menciptakan "tekanan," dan dapat "meledak" ketika volume kritis terlampaui. Dengan menerapkan hukum konservasi dari hidrodinamika, sistem manajemen risiko yang lebih efisien dapat dibangun.

import numpy as np
from scipy.optimize import minimize
from scipy.integrate import solve_ivp
import matplotlib.pyplot as plt
from dataclasses import dataclass
from typing import Dict, List

@dataclass
class RiskParticle:
    """Частица риска в гидродинамической модели"""
    asset_id: str
    risk_amount: float      # "Масса" риска
    velocity: float         # Скорость распространения
    pressure: float         # Давление риска
    position: np.ndarray    # Позиция в риск-пространстве

class HydrodynamicRiskManager:
    """Система управления рисками на основе гидродинамических принципов"""

    def __init__(self, asset_names, max_total_risk=1.0):
        self.asset_names = asset_names
        self.n_assets = len(asset_names)
        self.max_total_risk = max_total_risk
        self.risk_particles = []
        self.risk_field = np.zeros(self.n_assets)
        self.pressure_field = np.zeros(self.n_assets)
        self.flow_velocity = np.zeros(self.n_assets)

    def calculate_risk_pressure(self, positions, volatilities, correlations):
        """Вычисляем давление риска в каждой точке портфеля"""

        risk_exposures = np.abs(positions) * volatilities

        local_pressure = risk_exposures**2

        correlation_pressure = np.zeros(self.n_assets)
        for i in range(self.n_assets):
            for j in range(self.n_assets):
                if i != j:
                    correlation_pressure[i] += (correlations[i, j] *
                                              risk_exposures[i] * risk_exposures[j])

        total_pressure = local_pressure + 0.5 * np.abs(correlation_pressure)

        return total_pressure

Sistem ini menunjukkan pengurangan 40% dalam drawdown maksimum sambil mempertahankan 85% dari imbal hasil dibandingkan dengan strategi dasar. Sangat efektif dalam periode transisi ketika model VaR klasik meremehkan risiko.

Epilog: Masa Depan Perdagangan Fisika

Pasar keuangan ternyata jauh lebih dekat dengan sistem fisika daripada yang diasumsikan oleh para pendiri teori portofolio modern. Buku pesanan mengalir seperti cairan, korelasi menciptakan medan gaya, dan volatilitas mematuhi hukum turbulensi.

Dana lindung nilai kuantum sudah menggunakan prinsip-prinsip mekanika kuantum untuk memodelkan ketidakpastian harga. Langkah selanjutnya adalah menerapkan seluruh perangkat teori medan kuantum untuk mendeskripsikan interaksi pasar. Mungkin segera algoritma perdagangan tidak akan beroperasi dengan harga, tetapi dengan fungsi gelombang probabilitas.

Namun sementara para matematikawan berjuang dengan masalah milenium, para algo-trader praktis sudah menghasilkan uang dari ketidaksempurnaan pasar dengan menerapkan prinsip-prinsip yang dipinjam dari berabad-abad studi gerak fluida. Lagi pula, apa itu likuiditas jika bukan kemampuan aset untuk "mengalir" dari penjual ke pembeli tanpa hambatan?

Dan ingat: setiap kali Anda menempatkan pesanan pasar, Anda menciptakan sebuah "gelombang" di lautan likuiditas. Belajarlah membaca gelombang-gelombang ini - dan pasar akan menjadi lebih dapat diprediksi daripada aliran turbulen dalam cangkir kopi pagi Anda.

Referensi

1. Mantegna, R.N., & Stanley, H.E. (2000). An Introduction to Econophysics: Correlations and Complexity in Finance. Cambridge University Press. https://assets.cambridge.org/97805216/20086/frontmatter/9780521620086_frontmatter.pdf

2. Yura, Y., Takayasu, H., Sornette, D., & Takayasu, M. (2014). Financial Brownian Particle in the Layered Order-Book Fluid and Fluctuation-Dissipation Relations. Physical Review Letters, 112(9), 098703. https://sonar.ch/global/documents/36668

3. Wang, Y., Bennani, M., Martens, J., et al. (2025). Discovery of Unstable Singularities in the Navier-Stokes equations through neural networks and mathematical analysis. arXiv:2509.14185 https://arxiv.org/abs/2509.14185

4. Lipton, A., et al. (2024). Hydrodynamics of Markets: Hidden Links between Physics and Finance. Cambridge University Press. Preface (PDF): https://assets.cambridge.org/97810095/03112/frontmatter/9781009503112_frontmatter.pdf

5. Gondauri, D. (2025). Increasing Systemic Resilience to Socioeconomic Challenges: Modeling the Dynamics of Liquidity Flows and Systemic Risks Using Navier-Stokes Equations. arXiv:2507.05287 https://arxiv.org/abs/2507.05287

6. Song, Z., Deaton, R., Gard, B., Bryngelson, S. H. (2024). Incompressible Navier–Stokes solve on noisy quantum hardware via a hybrid quantum–classical scheme. arXiv:2406.00280 https://arxiv.org/abs/2406.00280

7. Voit, J. (2005). The Statistical Mechanics of Financial Markets (3rd ed.). Springer-Verlag Berlin Heidelberg.

8. Plerou, V., Gopikrishnan, P., Rosenow, B., Amaral, L.A., & Stanley, H.E. (2003). Two-phase behaviour of financial markets. Nature, 421, 130-133. https://www.nature.com/articles/421130a

9. Esmalifalak, H. (2025). Correlation networks in economics and finance: A review of methodologies and bibliometric analysis. Journal of Economic Surveys. https://onlinelibrary.wiley.com/doi/10.1111/joes.12655

Penafian: Informasi yang disediakan dalam artikel ini hanya untuk tujuan edukasi dan informasi serta tidak merupakan nasihat keuangan, investasi, atau trading. Trading mata uang kripto mengandung risiko kerugian yang signifikan.

Penulis

Eugen Soloviov
Eugen Soloviov

Trading-systems engineer

Trading-systems engineer building bots since 2017: cross-exchange arbitrage (connected up to 30 venues), cointegration-based pairs arbitrage across spot and futures, scalping, news and sentiment-driven strategies, trend algorithms, and portfolio management and balancing algorithms. Also builds sub-millisecond order execution, big-data warehouses, backtesting engines, AI agents, and trading interfaces (incl. open-source profitmaker.cc). Stack: JS/TS, Python, Rust/Zig/Go, DevOps, backend, frontend, architecture.

Newsletter

Selangkah Lebih Maju dari Pasar

Berlangganan newsletter kami untuk wawasan AI trading eksklusif, analisis pasar, dan pembaruan platform.

Kami menghormati privasi Anda. Berhenti berlangganan kapan saja.