from __future__ import annotations import argparse import json from dataclasses import dataclass from math import sqrt from pathlib import Path import pandas as pd from okx_codex_trader.bbmr_report import BBMRConfig, run_bbmr_segment from okx_codex_trader.bbsb_report import BBSBConfig, run_bbsb_segment from okx_codex_trader.donchian_report import DonchianConfig, run_donchian_segment from okx_codex_trader.ema_pullback_report import EMAPullbackConfig, run_ema_pullback_segment from okx_codex_trader.models import Candle from okx_codex_trader.okx_client import OkxClient from okx_codex_trader.rsi2_report import RSI2Config, _compute_rsi, run_rsi2_segment from okx_codex_trader.sampled_report import SegmentResult, mark_to_market, sample_segments, trade_equity SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP") BARS = ("1m", "3m", "5m") ANALYSIS_BARS = ("3m", "5m", "15m") HISTORY_LIMIT = 4000 SEGMENTS = 8 WINDOW_SIZE = 240 LEVERAGE = 3 INITIAL_EQUITY = 10_000.0 ROBUST_HISTORY_LIMIT = 50_000 GROSS_RETURN_NOTE = "Gross-return backtest only: fees, slippage, and funding rates are excluded." MINUTES_PER_YEAR = 365 * 24 * 60 CANDLE_CACHE_DIR = Path("data/okx-candles") CANDLE_BAR_MS = { "1m": 60_000, "3m": 180_000, "5m": 300_000, "15m": 900_000, "30m": 1_800_000, "1H": 3_600_000, "4H": 14_400_000, "1D": 86_400_000, } @dataclass(frozen=True) class Candidate: name: str warmup_bars: int run: object @dataclass(frozen=True) class PairCandidate: name: str warmup_bars: int run: object def _format_ts(ts: int) -> str: return pd.to_datetime(ts, unit="ms", utc=True).strftime("%Y-%m-%d %H:%M") def candle_cache_file(cache_dir: Path, symbol: str, bar: str) -> Path: return cache_dir / symbol / f"{bar}.csv" def candle_cache_meta_file(cache_dir: Path, symbol: str, bar: str) -> Path: return cache_dir / symbol / f"{bar}.meta.json" def load_cached_candles(cache_dir: Path, symbol: str, bar: str) -> tuple[list[Candle], bool]: cache_file = candle_cache_file(cache_dir, symbol, bar) if not cache_file.exists(): return [], False frame = pd.read_csv(cache_file) candles = [ Candle( symbol=symbol, ts=int(row.ts), open=float(row.open), high=float(row.high), low=float(row.low), close=float(row.close), volume=float(row.volume), ) for row in frame.itertuples(index=False) ] meta_file = candle_cache_meta_file(cache_dir, symbol, bar) history_exhausted = False if meta_file.exists(): with meta_file.open("r", encoding="utf-8") as handle: history_exhausted = bool(json.load(handle).get("history_exhausted")) return candles, history_exhausted def save_cached_candles(cache_dir: Path, symbol: str, bar: str, candles: list[Candle], history_exhausted: bool) -> None: cache_file = candle_cache_file(cache_dir, symbol, bar) cache_file.parent.mkdir(parents=True, exist_ok=True) frame = pd.DataFrame( [ { "ts": candle.ts, "open": candle.open, "high": candle.high, "low": candle.low, "close": candle.close, "volume": candle.volume, } for candle in sorted(candles, key=lambda candle: candle.ts) ] ).drop_duplicates("ts", keep="last") frame.to_csv(cache_file, index=False) meta = { "symbol": symbol, "bar": bar, "history_exhausted": history_exhausted, "rows": len(frame), "first_ts": int(frame["ts"].iloc[0]) if len(frame) else None, "last_ts": int(frame["ts"].iloc[-1]) if len(frame) else None, } with candle_cache_meta_file(cache_dir, symbol, bar).open("w", encoding="utf-8") as handle: json.dump(meta, handle, separators=(",", ":")) def latest_bridge_count(cached: list[Candle], latest_last_ts: int, interval: int) -> int: bridge_from_ts = max(candle.ts for candle in cached) for left, right in zip(cached, cached[1:]): if right.ts - left.ts != interval: bridge_from_ts = left.ts return ((latest_last_ts - bridge_from_ts) // interval) + 1 def get_candles_cached( client: OkxClient, symbol: str, bar: str, limit: int, cache_dir: Path = CANDLE_CACHE_DIR, ) -> list[Candle]: cached, history_exhausted = load_cached_candles(cache_dir, symbol, bar) if cached and (len(cached) >= limit or history_exhausted): latest = client.get_candles(symbol, bar, min(300, limit)) interval = CANDLE_BAR_MS[bar] if latest: latest_last_ts = max(candle.ts for candle in latest) needed_latest_count = latest_bridge_count(cached, latest_last_ts, interval) if needed_latest_count > len(latest): latest = client.get_candles(symbol, bar, needed_latest_count) merged = {candle.ts: candle for candle in cached} for candle in latest: merged[candle.ts] = candle candles = sorted(merged.values(), key=lambda candle: candle.ts) save_cached_candles(cache_dir, symbol, bar, candles, history_exhausted) return candles[-limit:] if len(candles) >= limit else candles fetched = client.get_candles(symbol, bar, limit) history_exhausted = len(fetched) < limit save_cached_candles(cache_dir, symbol, bar, fetched, history_exhausted) return fetched def align_pair_candles(left: list[Candle], right: list[Candle]) -> tuple[list[Candle], list[Candle]]: right_by_ts = {candle.ts: candle for candle in right} left_aligned: list[Candle] = [] right_aligned: list[Candle] = [] for candle in left: other = right_by_ts.get(candle.ts) if other is None: continue left_aligned.append(candle) right_aligned.append(other) return left_aligned, right_aligned def _trade( *, trades: list[dict[str, object]], exits: list[dict[str, object]], position: dict[str, object], candle: Candle, exit_price: float, leverage: int, ) -> tuple[float, bool]: exit_equity = trade_equity( side=str(position["side"]), margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), exit_price=exit_price, leverage=leverage, ) trades.append( { "side": "Long" if position["side"] == "long" else "Short", "entry_time": _format_ts(int(position["entry_time"])), "exit_time": _format_ts(candle.ts), "entry_price": round(float(position["entry_price"]), 4), "exit_price": round(exit_price, 4), "pnl": round(exit_equity - float(position["margin_used"]), 4), "return_pct": round((exit_equity - float(position["margin_used"])) / float(position["margin_used"]) * 100, 4), } ) exits.append({"ts": candle.ts, "price": exit_price, "side": position["side"]}) return exit_equity, exit_equity > float(position["margin_used"]) def _close_partial_trade( *, trades: list[dict[str, object]], exits: list[dict[str, object]], position: dict[str, object], account_equity: float, candle: Candle, exit_price: float, leverage: int, ) -> tuple[float, bool]: margin_used = float(position["margin_used"]) exit_equity = trade_equity( side=str(position["side"]), margin_used=margin_used, entry_price=float(position["entry_price"]), exit_price=exit_price, leverage=leverage, ) pnl = exit_equity - margin_used trades.append( { "side": "Long" if position["side"] == "long" else "Short", "entry_time": _format_ts(int(position["entry_time"])), "exit_time": _format_ts(candle.ts), "entry_price": round(float(position["entry_price"]), 4), "exit_price": round(exit_price, 4), "pnl": round(pnl, 4), "return_pct": round(pnl / account_equity * 100, 4), "cost_weight": round(margin_used / account_equity, 8), } ) exits.append({"ts": candle.ts, "price": exit_price, "side": position["side"]}) return account_equity + pnl, pnl > 0.0 def run_range_momentum_segment( *, candles: list[Candle], leverage: int, warmup_bars: int, lookback: int, take_profit_pct: float, stop_loss_pct: float, ) -> SegmentResult: highs = pd.Series([candle.high for candle in candles], dtype=float) lows = pd.Series([candle.low for candle in candles], dtype=float) entry_high = highs.shift(1).rolling(lookback).max().tolist() entry_low = lows.shift(1).rolling(lookback).min().tolist() equity = INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_entry_side: str | None = None for index in range(warmup_bars, len(candles)): candle = candles[index] if pending_entry_side is not None and position is None and equity > 0.0: position = { "side": pending_entry_side, "entry_time": candle.ts, "entry_price": candle.open, "entry_index": index, "margin_used": equity, "stop_price": candle.open * (1 - stop_loss_pct if pending_entry_side == "long" else 1 + stop_loss_pct), "take_profit_price": candle.open * (1 + take_profit_pct if pending_entry_side == "long" else 1 - take_profit_pct), } entries.append({"ts": candle.ts, "price": candle.open, "side": pending_entry_side}) pending_entry_side = None current_equity = equity if position is not None: stop_hit = ( position["side"] == "long" and candle.low <= float(position["stop_price"]) ) or ( position["side"] == "short" and candle.high >= float(position["stop_price"]) ) take_hit = ( position["side"] == "long" and candle.high >= float(position["take_profit_price"]) ) or ( position["side"] == "short" and candle.low <= float(position["take_profit_price"]) ) if stop_hit or take_hit: exit_price = float(position["stop_price"] if stop_hit else position["take_profit_price"]) equity, won = _trade( trades=trades, exits=exits, position=position, candle=candle, exit_price=exit_price, leverage=leverage, ) wins += 1 if won else 0 current_equity = equity position = None if position is not None: current_equity = mark_to_market( side=str(position["side"]), margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=leverage, ) peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(candles) - 1 or position is not None or equity <= 0.0: continue if entry_high[index] == entry_high[index] and candle.close > float(entry_high[index]): pending_entry_side = "long" elif entry_low[index] == entry_low[index] and candle.close < float(entry_low[index]): pending_entry_side = "short" trade_count = len(trades) return SegmentResult( trade_count=trade_count, total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY, win_rate=wins / trade_count if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=candles[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ) def run_vwap_reversion_segment( *, candles: list[Candle], leverage: int, warmup_bars: int, window: int, entry_z: float, exit_z: float, stop_loss_pct: float, ) -> SegmentResult: closes = pd.Series([candle.close for candle in candles], dtype=float) volumes = pd.Series([candle.volume for candle in candles], dtype=float) vwap = (closes * volumes).rolling(window).sum() / volumes.rolling(window).sum() deviation = ((closes - vwap) / vwap).tolist() stdev = pd.Series(deviation, dtype=float).rolling(window).std(ddof=0).tolist() zscore = [ float("nan") if dev != dev or std != std or std == 0.0 else dev / std for dev, std in zip(deviation, stdev) ] equity = INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_entry_side: str | None = None pending_exit = False for index in range(warmup_bars, len(candles)): candle = candles[index] if pending_exit and position is not None: equity, won = _trade( trades=trades, exits=exits, position=position, candle=candle, exit_price=candle.open, leverage=leverage, ) wins += 1 if won else 0 position = None pending_exit = False if pending_entry_side is not None and position is None and equity > 0.0: position = { "side": pending_entry_side, "entry_time": candle.ts, "entry_price": candle.open, "margin_used": equity, "stop_price": candle.open * (1 - stop_loss_pct if pending_entry_side == "long" else 1 + stop_loss_pct), } entries.append({"ts": candle.ts, "price": candle.open, "side": pending_entry_side}) pending_entry_side = None current_equity = equity if position is not None: stop_hit = ( position["side"] == "long" and candle.low <= float(position["stop_price"]) ) or ( position["side"] == "short" and candle.high >= float(position["stop_price"]) ) if stop_hit: equity, won = _trade( trades=trades, exits=exits, position=position, candle=candle, exit_price=float(position["stop_price"]), leverage=leverage, ) wins += 1 if won else 0 current_equity = equity position = None if position is not None: current_equity = mark_to_market( side=str(position["side"]), margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=leverage, ) peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(candles) - 1 or equity <= 0.0: continue current_z = zscore[index] if current_z != current_z: continue if position is not None: if (position["side"] == "long" and current_z >= -exit_z) or ( position["side"] == "short" and current_z <= exit_z ): pending_exit = True continue if current_z <= -entry_z: pending_entry_side = "long" elif current_z >= entry_z: pending_entry_side = "short" trade_count = len(trades) return SegmentResult( trade_count=trade_count, total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY, win_rate=wins / trade_count if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=candles[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ) def run_rsi2_side_segment( *, candles: list[Candle], leverage: int, warmup_bars: int, config: RSI2Config, side_mode: str, ) -> SegmentResult: result = run_rsi2_segment( candles=candles, leverage=leverage, warmup_bars=warmup_bars, config=config, ) if side_mode == "both": return result closes = pd.Series([candle.close for candle in candles], dtype=float) trend = closes.rolling(config.trend_sma).mean().tolist() rsi_values = _compute_rsi(closes, config.rsi_length) equity = config.initial_equity ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_entry_side: str | None = None pending_exit = False allowed_side = "long" if side_mode == "long" else "short" for index in range(warmup_bars, len(candles)): candle = candles[index] if pending_exit and position is not None: equity, won = _trade( trades=trades, exits=exits, position=position, candle=candle, exit_price=candle.open, leverage=leverage, ) wins += 1 if won else 0 position = None pending_exit = False if pending_entry_side is not None and position is None and equity > 0.0: position = { "side": pending_entry_side, "entry_time": candle.ts, "entry_price": candle.open, "margin_used": equity, } entries.append({"ts": candle.ts, "price": candle.open, "side": pending_entry_side}) pending_entry_side = None current_equity = equity if position is not None: current_equity = mark_to_market( side=str(position["side"]), margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=leverage, ) peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(candles) - 1 or equity <= 0.0: continue current_rsi = rsi_values[index] current_trend = trend[index] if current_rsi != current_rsi or current_trend != current_trend: continue if position is not None: if (position["side"] == "long" and current_rsi >= config.exit_rsi) or ( position["side"] == "short" and current_rsi <= config.exit_rsi ): pending_exit = True continue if allowed_side == "long" and candle.close > float(current_trend) and current_rsi <= config.rsi_long_threshold: pending_entry_side = "long" elif allowed_side == "short" and candle.close < float(current_trend) and current_rsi >= config.rsi_short_threshold: pending_entry_side = "short" trade_count = len(trades) return SegmentResult( trade_count=trade_count, total_return=(ending_equity - config.initial_equity) / config.initial_equity, win_rate=(wins / trade_count) if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=candles[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ) def run_rsi2_long_guarded_segment( *, candles: list[Candle], leverage: int, warmup_bars: int, trend_sma: int, rsi_threshold: float, exit_rsi: float, stop_loss_pct: float, max_hold_bars: int, ) -> SegmentResult: closes = pd.Series([candle.close for candle in candles], dtype=float) trend = closes.rolling(trend_sma).mean().tolist() rsi_values = _compute_rsi(closes, 2) equity = INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_entry = False pending_exit = False for index in range(warmup_bars, len(candles)): candle = candles[index] if pending_exit and position is not None: equity, won = _trade( trades=trades, exits=exits, position=position, candle=candle, exit_price=candle.open, leverage=leverage, ) wins += 1 if won else 0 position = None pending_exit = False if pending_entry and position is None and equity > 0.0: position = { "side": "long", "entry_time": candle.ts, "entry_price": candle.open, "entry_index": index, "margin_used": equity, "stop_price": candle.open * (1 - stop_loss_pct), } entries.append({"ts": candle.ts, "price": candle.open, "side": "long"}) pending_entry = False current_equity = equity if position is not None and candle.low <= float(position["stop_price"]): equity, won = _trade( trades=trades, exits=exits, position=position, candle=candle, exit_price=float(position["stop_price"]), leverage=leverage, ) wins += 1 if won else 0 current_equity = equity position = None if position is not None: current_equity = mark_to_market( side="long", margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=leverage, ) peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(candles) - 1 or equity <= 0.0: continue current_rsi = rsi_values[index] current_trend = trend[index] if current_rsi != current_rsi or current_trend != current_trend: continue if position is not None: held_bars = index - int(position["entry_index"]) if current_rsi >= exit_rsi or held_bars >= max_hold_bars: pending_exit = True continue if candle.close > float(current_trend) and current_rsi <= rsi_threshold: pending_entry = True trade_count = len(trades) return SegmentResult( trade_count=trade_count, total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY, win_rate=(wins / trade_count) if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=candles[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ) def run_rsi2_long_guarded_twap_segment( *, candles: list[Candle], leverage: int, warmup_bars: int, trend_sma: int, rsi_threshold: float, exit_rsi: float, stop_loss_pct: float, max_hold_bars: int, entry_slices: int, ) -> SegmentResult: closes = pd.Series([candle.close for candle in candles], dtype=float) trend = closes.rolling(trend_sma).mean().tolist() rsi_values = _compute_rsi(closes, 2) equity = INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_entry_slices = 0 pending_exit = False for index in range(warmup_bars, len(candles)): candle = candles[index] if pending_exit and position is not None: equity, won = _close_partial_trade( trades=trades, exits=exits, position=position, account_equity=equity, candle=candle, exit_price=candle.open, leverage=leverage, ) wins += 1 if won else 0 position = None pending_exit = False pending_entry_slices = 0 if pending_entry_slices and equity > 0.0: slice_margin = equity / entry_slices if position is None: position = { "side": "long", "entry_time": candle.ts, "entry_price": candle.open, "entry_index": index, "margin_used": slice_margin, "stop_price": candle.open * (1 - stop_loss_pct), } else: old_margin = float(position["margin_used"]) new_margin = old_margin + slice_margin entry_price = (float(position["entry_price"]) * old_margin + candle.open * slice_margin) / new_margin position["entry_price"] = entry_price position["margin_used"] = new_margin position["stop_price"] = entry_price * (1 - stop_loss_pct) entries.append({"ts": candle.ts, "price": candle.open, "side": "long"}) pending_entry_slices -= 1 current_equity = equity if position is not None and candle.low <= float(position["stop_price"]): equity, won = _close_partial_trade( trades=trades, exits=exits, position=position, account_equity=equity, candle=candle, exit_price=float(position["stop_price"]), leverage=leverage, ) wins += 1 if won else 0 current_equity = equity position = None pending_entry_slices = 0 if position is not None: position_equity = mark_to_market( side="long", margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=leverage, ) current_equity = equity - float(position["margin_used"]) + position_equity peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(candles) - 1 or equity <= 0.0: continue current_rsi = rsi_values[index] current_trend = trend[index] if current_rsi != current_rsi or current_trend != current_trend: continue if position is not None: held_bars = index - int(position["entry_index"]) if current_rsi >= exit_rsi or held_bars >= max_hold_bars: pending_exit = True pending_entry_slices = 0 continue if pending_entry_slices == 0 and candle.close > float(current_trend) and current_rsi <= rsi_threshold: pending_entry_slices = entry_slices trade_count = len(trades) return SegmentResult( trade_count=trade_count, total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY, win_rate=(wins / trade_count) if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=candles[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ) def run_rsi2_long_guarded_price_twap_segment( *, candles: list[Candle], leverage: int, warmup_bars: int, trend_sma: int, rsi_threshold: float, exit_rsi: float, stop_loss_pct: float, max_hold_bars: int, entry_offsets: tuple[float, ...], entry_valid_bars: int, fill_buffer: float, ) -> SegmentResult: closes = pd.Series([candle.close for candle in candles], dtype=float) trend = closes.rolling(trend_sma).mean().tolist() rsi_values = _compute_rsi(closes, 2) equity = INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_limits: list[dict[str, float | int]] = [] pending_exit = False for index in range(warmup_bars, len(candles)): candle = candles[index] if pending_exit and position is not None: equity, won = _trade( trades=trades, exits=exits, position=position, candle=candle, exit_price=candle.open, leverage=leverage, ) wins += 1 if won else 0 position = None pending_exit = False pending_limits = [] active_limits: list[dict[str, float | int]] = [] for limit in pending_limits: if index > int(limit["expires_index"]): continue limit_price = float(limit["price"]) if candle.low <= limit_price * (1.0 - fill_buffer) and equity > 0.0: slice_margin = equity / len(entry_offsets) if position is None: position = { "side": "long", "entry_time": candle.ts, "entry_price": limit_price, "entry_index": index, "margin_used": slice_margin, "stop_price": limit_price * (1 - stop_loss_pct), } else: old_margin = float(position["margin_used"]) new_margin = old_margin + slice_margin entry_price = (float(position["entry_price"]) * old_margin + limit_price * slice_margin) / new_margin position["entry_price"] = entry_price position["margin_used"] = new_margin position["stop_price"] = entry_price * (1 - stop_loss_pct) entries.append({"ts": candle.ts, "price": limit_price, "side": "long"}) else: active_limits.append(limit) pending_limits = active_limits current_equity = equity if position is not None and candle.low <= float(position["stop_price"]): equity, won = _close_partial_trade( trades=trades, exits=exits, position=position, account_equity=equity, candle=candle, exit_price=float(position["stop_price"]), leverage=leverage, ) wins += 1 if won else 0 current_equity = equity position = None pending_limits = [] if position is not None: position_equity = mark_to_market( side="long", margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=leverage, ) current_equity = equity - float(position["margin_used"]) + position_equity peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(candles) - 1 or equity <= 0.0: continue current_rsi = rsi_values[index] current_trend = trend[index] if current_rsi != current_rsi or current_trend != current_trend: continue if position is not None: held_bars = index - int(position["entry_index"]) if current_rsi >= exit_rsi or held_bars >= max_hold_bars: pending_exit = True pending_limits = [] continue if not pending_limits and candle.close > float(current_trend) and current_rsi <= rsi_threshold: pending_limits = [ { "price": candle.close * (1.0 - offset), "expires_index": index + entry_valid_bars, } for offset in entry_offsets ] trade_count = len(trades) return SegmentResult( trade_count=trade_count, total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY, win_rate=(wins / trade_count) if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=candles[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ) def run_ma_cross_segment( *, candles: list[Candle], leverage: int, warmup_bars: int, fast: int, slow: int, side_mode: str, ) -> SegmentResult: closes = pd.Series([candle.close for candle in candles], dtype=float) fast_ma = closes.rolling(fast).mean().tolist() slow_ma = closes.rolling(slow).mean().tolist() equity = INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_entry_side: str | None = None pending_exit = False for index in range(warmup_bars, len(candles)): candle = candles[index] if pending_exit and position is not None: equity, won = _trade( trades=trades, exits=exits, position=position, candle=candle, exit_price=candle.open, leverage=leverage, ) wins += 1 if won else 0 position = None pending_exit = False if pending_entry_side is not None and position is None and equity > 0.0: position = { "side": pending_entry_side, "entry_time": candle.ts, "entry_price": candle.open, "margin_used": equity, } entries.append({"ts": candle.ts, "price": candle.open, "side": pending_entry_side}) pending_entry_side = None current_equity = equity if position is not None: current_equity = mark_to_market( side=str(position["side"]), margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=leverage, ) peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(candles) - 1 or equity <= 0.0: continue current_fast = fast_ma[index] current_slow = slow_ma[index] previous_fast = fast_ma[index - 1] previous_slow = slow_ma[index - 1] if current_fast != current_fast or current_slow != current_slow or previous_fast != previous_fast or previous_slow != previous_slow: continue crossed_up = previous_fast <= previous_slow and current_fast > current_slow crossed_down = previous_fast >= previous_slow and current_fast < current_slow if position is not None: if (position["side"] == "long" and crossed_down) or (position["side"] == "short" and crossed_up): pending_exit = True continue if crossed_up and side_mode in {"both", "long"}: pending_entry_side = "long" elif crossed_down and side_mode in {"both", "short"}: pending_entry_side = "short" trade_count = len(trades) return SegmentResult( trade_count=trade_count, total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY, win_rate=wins / trade_count if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=candles[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ) def run_trend_rsi_bb_long_segment( *, candles: list[Candle], leverage: int, warmup_bars: int, trend_sma: int, band_length: int, std_multiplier: float, rsi_threshold: float, exit_rsi: float, stop_loss_pct: float, ) -> SegmentResult: closes = pd.Series([candle.close for candle in candles], dtype=float) trend = closes.rolling(trend_sma).mean().tolist() middle = closes.rolling(band_length).mean().tolist() stdev = closes.rolling(band_length).std(ddof=0).tolist() lower = [ float("nan") if middle_value != middle_value or std_value != std_value else middle_value - std_multiplier * std_value for middle_value, std_value in zip(middle, stdev) ] rsi_values = _compute_rsi(closes, 2) equity = INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_entry = False pending_exit = False for index in range(warmup_bars, len(candles)): candle = candles[index] if pending_exit and position is not None: equity, won = _trade( trades=trades, exits=exits, position=position, candle=candle, exit_price=candle.open, leverage=leverage, ) wins += 1 if won else 0 position = None pending_exit = False if pending_entry and position is None and equity > 0.0: position = { "side": "long", "entry_time": candle.ts, "entry_price": candle.open, "entry_index": index, "margin_used": equity, "stop_price": candle.open * (1 - stop_loss_pct), } entries.append({"ts": candle.ts, "price": candle.open, "side": "long"}) pending_entry = False current_equity = equity if position is not None and candle.low <= float(position["stop_price"]): equity, won = _trade( trades=trades, exits=exits, position=position, candle=candle, exit_price=float(position["stop_price"]), leverage=leverage, ) wins += 1 if won else 0 current_equity = equity position = None if position is not None: current_equity = mark_to_market( side="long", margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=leverage, ) peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(candles) - 1 or equity <= 0.0: continue current_rsi = rsi_values[index] current_trend = trend[index] current_middle = middle[index] current_lower = lower[index] if current_rsi != current_rsi or current_trend != current_trend or current_middle != current_middle or current_lower != current_lower: continue if position is not None: if current_rsi >= exit_rsi or candle.close >= float(current_middle): pending_exit = True continue if candle.close > float(current_trend) and candle.close <= float(current_lower) and current_rsi <= rsi_threshold: pending_entry = True trade_count = len(trades) return SegmentResult( trade_count=trade_count, total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY, win_rate=wins / trade_count if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=candles[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ) def run_regime_hybrid_segment( *, candles: list[Candle], leverage: int, warmup_bars: int, trend_sma: int, regime_lookback: int, neutral_ma_distance: float, rsi_long_threshold: float, rsi_exit: float, bb_length: int, bb_std: float, bb_bandwidth_lookback: int, stop_loss_pct: float, ) -> SegmentResult: closes = pd.Series([candle.close for candle in candles], dtype=float) trend = closes.rolling(trend_sma).mean().tolist() rsi_values = _compute_rsi(closes, 2) middle = closes.rolling(bb_length).mean().tolist() stdev = closes.rolling(bb_length).std(ddof=0).tolist() upper = [ float("nan") if middle_value != middle_value or std_value != std_value else middle_value + bb_std * std_value for middle_value, std_value in zip(middle, stdev) ] lower = [ float("nan") if middle_value != middle_value or std_value != std_value else middle_value - bb_std * std_value for middle_value, std_value in zip(middle, stdev) ] bandwidth = [ float("nan") if upper_value != upper_value or lower_value != lower_value or middle_value != middle_value or middle_value == 0.0 else (upper_value - lower_value) / middle_value for upper_value, lower_value, middle_value in zip(upper, lower, middle) ] equity = INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_entry: dict[str, object] | None = None pending_exit = False for index in range(warmup_bars, len(candles)): candle = candles[index] if pending_exit and position is not None: equity, won = _trade( trades=trades, exits=exits, position=position, candle=candle, exit_price=candle.open, leverage=leverage, ) wins += 1 if won else 0 position = None pending_exit = False if pending_entry is not None and position is None and equity > 0.0: side = str(pending_entry["side"]) position = { "side": side, "entry_time": candle.ts, "entry_price": candle.open, "entry_index": index, "margin_used": equity, "stop_price": candle.open * (1 - stop_loss_pct if side == "long" else 1 + stop_loss_pct), "mode": str(pending_entry["mode"]), } entries.append({"ts": candle.ts, "price": candle.open, "side": side}) pending_entry = None current_equity = equity if position is not None: stop_hit = ( position["side"] == "long" and candle.low <= float(position["stop_price"]) ) or ( position["side"] == "short" and candle.high >= float(position["stop_price"]) ) if stop_hit: equity, won = _trade( trades=trades, exits=exits, position=position, candle=candle, exit_price=float(position["stop_price"]), leverage=leverage, ) wins += 1 if won else 0 current_equity = equity position = None if position is not None: current_equity = mark_to_market( side=str(position["side"]), margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=leverage, ) peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(candles) - 1 or equity <= 0.0: continue current_trend = trend[index] current_rsi = rsi_values[index] current_middle = middle[index] current_upper = upper[index] current_lower = lower[index] if ( current_trend != current_trend or current_rsi != current_rsi or current_middle != current_middle or current_upper != current_upper or current_lower != current_lower ): continue if position is not None: if position["mode"] == "rsi" and (current_rsi >= rsi_exit or candle.close < float(current_trend)): pending_exit = True elif position["mode"] == "bbmr" and ( (position["side"] == "long" and candle.close >= float(current_middle)) or (position["side"] == "short" and candle.close <= float(current_middle)) ): pending_exit = True continue regime_return = candle.close / candles[index - regime_lookback].close - 1.0 ma_distance = candle.close / float(current_trend) - 1.0 if candle.close > float(current_trend) and regime_return > 0.0 and current_rsi <= rsi_long_threshold: pending_entry = {"side": "long", "mode": "rsi"} continue previous_bandwidths = [value for value in bandwidth[max(0, index - bb_bandwidth_lookback) : index] if value == value] if abs(ma_distance) > neutral_ma_distance or len(previous_bandwidths) < bb_bandwidth_lookback: continue if bandwidth[index] == bandwidth[index] and bandwidth[index] <= pd.Series(previous_bandwidths, dtype=float).median(): if candle.close < float(current_lower): pending_entry = {"side": "long", "mode": "bbmr"} elif candle.close > float(current_upper): pending_entry = {"side": "short", "mode": "bbmr"} trade_count = len(trades) return SegmentResult( trade_count=trade_count, total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY, win_rate=wins / trade_count if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=candles[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ) def run_eth_btc_rsi_filter_segment( *, eth_candles: list[Candle], btc_candles: list[Candle], leverage: int, warmup_bars: int, eth_trend_sma: int, eth_rsi_threshold: float, eth_exit_rsi: float, btc_trend_sma: int, btc_momentum_lookback: int, btc_min_momentum: float, ) -> SegmentResult: eth_closes = pd.Series([candle.close for candle in eth_candles], dtype=float) btc_closes = pd.Series([candle.close for candle in btc_candles], dtype=float) eth_trend = eth_closes.rolling(eth_trend_sma).mean().tolist() eth_rsi = _compute_rsi(eth_closes, 2) btc_trend = btc_closes.rolling(btc_trend_sma).mean().tolist() equity = INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_entry = False pending_exit = False for index in range(warmup_bars, len(eth_candles)): candle = eth_candles[index] if pending_exit and position is not None: equity, won = _trade( trades=trades, exits=exits, position=position, candle=candle, exit_price=candle.open, leverage=leverage, ) wins += 1 if won else 0 position = None pending_exit = False if pending_entry and position is None and equity > 0.0: position = { "side": "long", "entry_time": candle.ts, "entry_price": candle.open, "margin_used": equity, } entries.append({"ts": candle.ts, "price": candle.open, "side": "long"}) pending_entry = False current_equity = equity if position is not None: current_equity = mark_to_market( side="long", margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=leverage, ) peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(eth_candles) - 1 or equity <= 0.0: continue current_eth_trend = eth_trend[index] current_eth_rsi = eth_rsi[index] current_btc_trend = btc_trend[index] if current_eth_trend != current_eth_trend or current_eth_rsi != current_eth_rsi or current_btc_trend != current_btc_trend: continue if position is not None: if current_eth_rsi >= eth_exit_rsi or btc_candles[index].close < float(current_btc_trend): pending_exit = True continue btc_momentum = btc_candles[index].close / btc_candles[index - btc_momentum_lookback].close - 1.0 btc_risk_on = btc_candles[index].close > float(current_btc_trend) and btc_momentum >= btc_min_momentum eth_pullback = candle.close > float(current_eth_trend) and current_eth_rsi <= eth_rsi_threshold if btc_risk_on and eth_pullback: pending_entry = True trade_count = len(trades) return SegmentResult( trade_count=trade_count, total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY, win_rate=wins / trade_count if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=eth_candles[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ) def run_eth_btc_shock_filter_segment( *, eth_candles: list[Candle], btc_candles: list[Candle], leverage: int, warmup_bars: int, eth_trend_sma: int, eth_rsi_threshold: float, eth_exit_rsi: float, btc_trend_sma: int, btc_momentum_lookback: int, btc_min_momentum: float, btc_shock_lookback: int, btc_max_realized_vol: float, btc_max_drawdown: float, ) -> SegmentResult: eth_closes = pd.Series([candle.close for candle in eth_candles], dtype=float) btc_closes = pd.Series([candle.close for candle in btc_candles], dtype=float) eth_trend = eth_closes.rolling(eth_trend_sma).mean().tolist() eth_rsi = _compute_rsi(eth_closes, 2) btc_trend = btc_closes.rolling(btc_trend_sma).mean().tolist() btc_realized_vol = btc_closes.pct_change().rolling(btc_shock_lookback).std(ddof=1).tolist() btc_recent_high = btc_closes.rolling(btc_shock_lookback).max().tolist() equity = INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_entry = False pending_exit = False for index in range(warmup_bars, len(eth_candles)): candle = eth_candles[index] if pending_exit and position is not None: equity, won = _trade( trades=trades, exits=exits, position=position, candle=candle, exit_price=candle.open, leverage=leverage, ) wins += 1 if won else 0 position = None pending_exit = False if pending_entry and position is None and equity > 0.0: position = { "side": "long", "entry_time": candle.ts, "entry_price": candle.open, "margin_used": equity, } entries.append({"ts": candle.ts, "price": candle.open, "side": "long"}) pending_entry = False current_equity = equity if position is not None: current_equity = mark_to_market( side="long", margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=leverage, ) peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(eth_candles) - 1 or equity <= 0.0: continue current_eth_trend = eth_trend[index] current_eth_rsi = eth_rsi[index] current_btc_trend = btc_trend[index] current_btc_vol = btc_realized_vol[index] current_btc_high = btc_recent_high[index] if ( current_eth_trend != current_eth_trend or current_eth_rsi != current_eth_rsi or current_btc_trend != current_btc_trend or current_btc_vol != current_btc_vol or current_btc_high != current_btc_high ): continue btc_drawdown = btc_candles[index].close / float(current_btc_high) - 1.0 btc_shock_ok = current_btc_vol <= btc_max_realized_vol and btc_drawdown >= -btc_max_drawdown if position is not None: if current_eth_rsi >= eth_exit_rsi or btc_candles[index].close < float(current_btc_trend) or not btc_shock_ok: pending_exit = True continue btc_momentum = btc_candles[index].close / btc_candles[index - btc_momentum_lookback].close - 1.0 btc_risk_on = ( btc_candles[index].close > float(current_btc_trend) and btc_momentum >= btc_min_momentum and btc_shock_ok ) eth_pullback = candle.close > float(current_eth_trend) and current_eth_rsi <= eth_rsi_threshold if btc_risk_on and eth_pullback: pending_entry = True trade_count = len(trades) return SegmentResult( trade_count=trade_count, total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY, win_rate=wins / trade_count if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=eth_candles[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ) def run_eth_btc_ratio_pullback_segment( *, eth_candles: list[Candle], btc_candles: list[Candle], leverage: int, warmup_bars: int, btc_trend_sma: int, btc_momentum_lookback: int, btc_min_momentum: float, ratio_length: int, ratio_std: float, ratio_rsi_threshold: float, stop_loss_pct: float, ) -> SegmentResult: eth_closes = pd.Series([candle.close for candle in eth_candles], dtype=float) btc_closes = pd.Series([candle.close for candle in btc_candles], dtype=float) ratio = eth_closes / btc_closes btc_trend = btc_closes.rolling(btc_trend_sma).mean().tolist() ratio_middle = ratio.rolling(ratio_length).mean().tolist() ratio_stdev = ratio.rolling(ratio_length).std(ddof=0).tolist() ratio_lower = [ float("nan") if middle != middle or stdev != stdev else middle - ratio_std * stdev for middle, stdev in zip(ratio_middle, ratio_stdev) ] ratio_rsi = _compute_rsi(ratio, 2) equity = INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_entry = False pending_exit = False for index in range(warmup_bars, len(eth_candles)): candle = eth_candles[index] if pending_exit and position is not None: equity, won = _trade( trades=trades, exits=exits, position=position, candle=candle, exit_price=candle.open, leverage=leverage, ) wins += 1 if won else 0 position = None pending_exit = False if pending_entry and position is None and equity > 0.0: position = { "side": "long", "entry_time": candle.ts, "entry_price": candle.open, "entry_index": index, "margin_used": equity, "stop_price": candle.open * (1 - stop_loss_pct), } entries.append({"ts": candle.ts, "price": candle.open, "side": "long"}) pending_entry = False current_equity = equity if position is not None and candle.low <= float(position["stop_price"]): equity, won = _trade( trades=trades, exits=exits, position=position, candle=candle, exit_price=float(position["stop_price"]), leverage=leverage, ) wins += 1 if won else 0 current_equity = equity position = None if position is not None: current_equity = mark_to_market( side="long", margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=leverage, ) peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(eth_candles) - 1 or equity <= 0.0: continue current_btc_trend = btc_trend[index] current_ratio_middle = ratio_middle[index] current_ratio_lower = ratio_lower[index] current_ratio_rsi = ratio_rsi[index] if ( current_btc_trend != current_btc_trend or current_ratio_middle != current_ratio_middle or current_ratio_lower != current_ratio_lower or current_ratio_rsi != current_ratio_rsi ): continue btc_momentum = btc_candles[index].close / btc_candles[index - btc_momentum_lookback].close - 1.0 btc_risk_on = btc_candles[index].close > float(current_btc_trend) and btc_momentum >= btc_min_momentum if position is not None: if not btc_risk_on or ratio.iloc[index] >= float(current_ratio_middle): pending_exit = True continue ratio_pullback = ratio.iloc[index] <= float(current_ratio_lower) or current_ratio_rsi <= ratio_rsi_threshold if btc_risk_on and ratio_pullback: pending_entry = True trade_count = len(trades) return SegmentResult( trade_count=trade_count, total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY, win_rate=wins / trade_count if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=eth_candles[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ) def run_btc_lead_eth_lag_segment( *, eth_candles: list[Candle], btc_candles: list[Candle], leverage: int, warmup_bars: int, lead_lookback: int, btc_return_threshold: float, lag_gap: float, max_hold_bars: int, stop_loss_pct: float, take_profit_pct: float, ) -> SegmentResult: equity = INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_entry = False pending_exit = False for index in range(warmup_bars, len(eth_candles)): candle = eth_candles[index] if pending_exit and position is not None: equity, won = _trade( trades=trades, exits=exits, position=position, candle=candle, exit_price=candle.open, leverage=leverage, ) wins += 1 if won else 0 position = None pending_exit = False if pending_entry and position is None and equity > 0.0: position = { "side": "long", "entry_time": candle.ts, "entry_price": candle.open, "entry_index": index, "margin_used": equity, "stop_price": candle.open * (1.0 - stop_loss_pct), "take_profit_price": candle.open * (1.0 + take_profit_pct), } entries.append({"ts": candle.ts, "price": candle.open, "side": "long"}) pending_entry = False current_equity = equity if position is not None: if candle.low <= float(position["stop_price"]): equity, won = _trade( trades=trades, exits=exits, position=position, candle=candle, exit_price=float(position["stop_price"]), leverage=leverage, ) wins += 1 if won else 0 current_equity = equity position = None elif candle.high >= float(position["take_profit_price"]): equity, won = _trade( trades=trades, exits=exits, position=position, candle=candle, exit_price=float(position["take_profit_price"]), leverage=leverage, ) wins += 1 if won else 0 current_equity = equity position = None if position is not None: current_equity = mark_to_market( side="long", margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=leverage, ) peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(eth_candles) - 1 or equity <= 0.0: continue if position is not None: if index - int(position["entry_index"]) >= max_hold_bars: pending_exit = True continue btc_return = btc_candles[index].close / btc_candles[index - lead_lookback].close - 1.0 eth_return = candle.close / eth_candles[index - lead_lookback].close - 1.0 if btc_return >= btc_return_threshold and btc_return - eth_return >= lag_gap: pending_entry = True trade_count = len(trades) return SegmentResult( trade_count=trade_count, total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY, win_rate=wins / trade_count if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=eth_candles[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ) def build_candidates() -> list[Candidate]: candidates: list[Candidate] = [] candidates.append(Candidate("bbmr-default", 69, lambda candles, leverage, warmup_bars: run_bbmr_segment(candles=candles, leverage=leverage, warmup_bars=warmup_bars, config=BBMRConfig()))) candidates.append(Candidate("bbsb-default", 69, lambda candles, leverage, warmup_bars: run_bbsb_segment(candles=candles, leverage=leverage, warmup_bars=warmup_bars, config=BBSBConfig()))) for entry_window in (8, 12, 20): for exit_window in (4, 6, 10): for stop in (0.004, 0.008, 0.012): config = DonchianConfig(entry_window=entry_window, exit_window=exit_window, stop_loss_pct=stop) candidates.append(Candidate(f"donchian-e{entry_window}-x{exit_window}-s{stop}", max(entry_window, exit_window), lambda candles, leverage, warmup_bars, config=config: run_donchian_segment(candles=candles, leverage=leverage, warmup_bars=warmup_bars, config=config))) for trend in (30, 50, 80): for long_threshold, short_threshold in ((8.0, 92.0), (12.0, 88.0), (18.0, 82.0)): config = RSI2Config(trend_sma=trend, rsi_length=2, rsi_long_threshold=long_threshold, rsi_short_threshold=short_threshold, exit_rsi=50.0) candidates.append(Candidate(f"rsi2-t{trend}-l{long_threshold}-s{short_threshold}", max(trend, 3), lambda candles, leverage, warmup_bars, config=config: run_rsi2_segment(candles=candles, leverage=leverage, warmup_bars=warmup_bars, config=config))) for fast, slow in ((8, 21), (13, 34), (20, 50)): for stop in (0.003, 0.006, 0.01): config = EMAPullbackConfig(fast_ema=fast, slow_ema=slow, stop_buffer_pct=stop) candidates.append(Candidate(f"ema-pullback-f{fast}-s{slow}-b{stop}", max(fast, slow), lambda candles, leverage, warmup_bars, config=config: run_ema_pullback_segment(candles=candles, leverage=leverage, warmup_bars=warmup_bars, config=config))) for lookback in (6, 10, 16): for take, stop in ((0.004, 0.003), (0.006, 0.004), (0.01, 0.005)): candidates.append(Candidate(f"range-momo-l{lookback}-tp{take}-sl{stop}", lookback, lambda candles, leverage, warmup_bars, lookback=lookback, take=take, stop=stop: run_range_momentum_segment(candles=candles, leverage=leverage, warmup_bars=warmup_bars, lookback=lookback, take_profit_pct=take, stop_loss_pct=stop))) for window in (24, 48, 72): for entry_z in (1.5, 2.0, 2.5): candidates.append(Candidate(f"vwap-revert-w{window}-z{entry_z}", window * 2, lambda candles, leverage, warmup_bars, window=window, entry_z=entry_z: run_vwap_reversion_segment(candles=candles, leverage=leverage, warmup_bars=warmup_bars, window=window, entry_z=entry_z, exit_z=0.2, stop_loss_pct=0.006))) return candidates def evaluate_candidate(candidate: Candidate, candles: list[Candle]) -> dict[str, object]: sampled = sample_segments( candles=candles, segments=SEGMENTS, window_size=WINDOW_SIZE, warmup_bars=candidate.warmup_bars, ) results = [ candidate.run( candles=candles[segment.context_start : segment.report_end], leverage=LEVERAGE, warmup_bars=candidate.warmup_bars, ) for segment in sampled ] returns = [result.total_return for result in results] return { "name": candidate.name, "avg_return": sum(returns) / len(returns), "median_return": float(pd.Series(returns).median()), "worst_return": min(returns), "best_return": max(returns), "trades": sum(result.trade_count for result in results), "win_rate": sum(result.win_rate for result in results) / len(results), "max_drawdown": max(result.max_drawdown for result in results), } def evaluate_candidate_all_windows( *, candidate: Candidate, candles: list[Candle], window_size: int, leverage: int, ) -> dict[str, object]: rows = evaluate_candidate_window_rows( candidate=candidate, candles=candles, window_size=window_size, leverage=leverage, ) return summarize_window_rows(rows, candidate.name) def evaluate_candidate_window_rows( *, candidate: Candidate, candles: list[Candle], window_size: int, leverage: int, ) -> list[dict[str, object]]: block_size = candidate.warmup_bars + window_size context_starts = list(range(0, len(candles) - block_size + 1, window_size)) rows: list[dict[str, object]] = [] for start in context_starts: result = candidate.run( candles=candles[start : start + block_size], leverage=leverage, warmup_bars=candidate.warmup_bars, ) report_start = start + candidate.warmup_bars report_end = start + block_size - 1 rows.append( { "window_start_ts": candles[report_start].ts, "window_end_ts": candles[report_end].ts, "total_return": result.total_return, "trade_count": result.trade_count, "win_rate": result.win_rate, "max_drawdown": result.max_drawdown, "trades": result.trades, } ) return rows def evaluate_pair_candidate_window_rows( *, candidate: PairCandidate, eth_candles: list[Candle], btc_candles: list[Candle], window_size: int, leverage: int, ) -> list[dict[str, object]]: block_size = candidate.warmup_bars + window_size context_starts = list(range(0, len(eth_candles) - block_size + 1, window_size)) rows: list[dict[str, object]] = [] for start in context_starts: result = candidate.run( eth_candles=eth_candles[start : start + block_size], btc_candles=btc_candles[start : start + block_size], leverage=leverage, warmup_bars=candidate.warmup_bars, ) report_start = start + candidate.warmup_bars report_end = start + block_size - 1 rows.append( { "window_start_ts": eth_candles[report_start].ts, "window_end_ts": eth_candles[report_end].ts, "total_return": result.total_return, "trade_count": result.trade_count, "win_rate": result.win_rate, "max_drawdown": result.max_drawdown, "trades": result.trades, } ) return rows def summarize_window_rows(rows: list[dict[str, object]], name: str = "") -> dict[str, object]: returns = [float(row["total_return"]) for row in rows] trade_returns = [ float(trade["return_pct"]) / 100.0 for row in rows for trade in row["trades"] ] winning_trade_returns = [value for value in trade_returns if value > 0.0] losing_trade_returns = [value for value in trade_returns if value < 0.0] avg_win_return = sum(winning_trade_returns) / len(winning_trade_returns) if winning_trade_returns else 0.0 avg_loss_return_abs = abs(sum(losing_trade_returns) / len(losing_trade_returns)) if losing_trade_returns else 0.0 gross_profit = sum(winning_trade_returns) gross_loss_abs = abs(sum(losing_trade_returns)) series = pd.Series(returns, dtype=float) sample_count = len(returns) std = float(series.std(ddof=1)) if sample_count > 1 else 0.0 ci_half_width = 1.96 * std / sqrt(sample_count) if sample_count > 1 else 0.0 return { "name": name, "sample_count": sample_count, "avg_return": float(series.mean()), "ci95_low": float(series.mean() - ci_half_width), "ci95_high": float(series.mean() + ci_half_width), "median_return": float(series.median()), "positive_window_rate": float((series > 0).mean()), "worst_return": float(series.min()), "p10_return": float(series.quantile(0.10)), "p90_return": float(series.quantile(0.90)), "best_return": float(series.max()), "trades": sum(int(row["trade_count"]) for row in rows), "avg_trades_per_window": sum(int(row["trade_count"]) for row in rows) / sample_count, "win_rate": sum(float(row["win_rate"]) for row in rows) / sample_count, "trade_win_rate": len(winning_trade_returns) / len(trade_returns) if trade_returns else 0.0, "avg_trade_return": sum(trade_returns) / len(trade_returns) if trade_returns else 0.0, "avg_win_return": avg_win_return, "avg_loss_return_abs": avg_loss_return_abs, "payoff_ratio": avg_win_return / avg_loss_return_abs if avg_loss_return_abs else 0.0, "profit_factor": gross_profit / gross_loss_abs if gross_loss_abs else 0.0, "expectancy_per_trade": sum(trade_returns) / len(trade_returns) if trade_returns else 0.0, "max_drawdown": max(float(row["max_drawdown"]) for row in rows), "return_drawdown_ratio": float(series.mean()) / max(float(row["max_drawdown"]) for row in rows) if max(float(row["max_drawdown"]) for row in rows) else 0.0, } def sort_robust_results(frame: pd.DataFrame) -> pd.DataFrame: return frame.sort_values(["ci95_low", "avg_return"], ascending=False) def add_cost_metrics(frame: pd.DataFrame, roundtrip_cost_on_margin: float) -> pd.DataFrame: frame = frame.copy() cost = frame["avg_trades_per_window"] * roundtrip_cost_on_margin frame["roundtrip_cost_on_margin"] = roundtrip_cost_on_margin frame["net_avg_return"] = frame["avg_return"] - cost frame["net_ci95_low"] = frame["ci95_low"] - cost frame["net_ci95_high"] = frame["ci95_high"] - cost frame["breakeven_roundtrip_cost_on_margin"] = frame["avg_return"] / frame["avg_trades_per_window"] return frame def sort_cost_results(frame: pd.DataFrame) -> pd.DataFrame: return frame.sort_values(["net_ci95_low", "net_avg_return"], ascending=False) def max_drawdown_from_equity(equity_values: list[float]) -> float: peak = equity_values[0] max_drawdown = 0.0 for equity in equity_values: peak = max(peak, equity) if peak > 0.0: max_drawdown = max(max_drawdown, (peak - equity) / peak) return max_drawdown def cost_adjusted_trade_equity_frame(result: SegmentResult, roundtrip_cost_on_margin: float) -> pd.DataFrame: rows = [{"ts": pd.to_datetime(result.equity_curve[0]["ts"], unit="ms", utc=True), "equity": INITIAL_EQUITY}] equity = INITIAL_EQUITY for trade in result.trades: equity *= 1.0 + float(trade["return_pct"]) / 100.0 - roundtrip_cost_on_margin * float(trade.get("cost_weight", 1.0)) rows.append({"ts": pd.to_datetime(str(trade["exit_time"]), utc=True), "equity": equity}) return pd.DataFrame(rows) def annualized_metrics_from_equity(frame: pd.DataFrame, first_ts: int, last_ts: int) -> dict[str, float]: years = (last_ts - first_ts) / 86_400_000 / 365 total_return = float(frame["equity"].iloc[-1] / frame["equity"].iloc[0] - 1.0) annualized_return = (1.0 + total_return) ** (1.0 / years) - 1.0 if total_return > -1.0 and years > 0.0 else 0.0 max_drawdown = max_drawdown_from_equity([float(value) for value in frame["equity"]]) daily = frame.set_index("ts")["equity"].resample("1D").last().ffill() daily_returns = daily.pct_change().dropna() daily_std = float(daily_returns.std(ddof=1)) if len(daily_returns) > 1 else 0.0 sharpe = float(daily_returns.mean()) / daily_std * sqrt(365) if daily_std else 0.0 return { "net_total_return": total_return, "net_annualized_return": annualized_return, "net_max_drawdown": max_drawdown, "net_calmar": annualized_return / max_drawdown if max_drawdown else 0.0, "net_sharpe_daily": sharpe, } def recent_horizon_metrics_from_equity( frame: pd.DataFrame, last_ts: int, horizons: tuple[tuple[str, pd.DateOffset], ...], ) -> pd.DataFrame: rows: list[dict[str, object]] = [] end_time = pd.to_datetime(last_ts, unit="ms", utc=True) for label, offset in horizons: cutoff = end_time - offset before_cutoff = frame[frame["ts"] <= cutoff] if len(before_cutoff): start_equity = float(before_cutoff["equity"].iloc[-1]) start_time = cutoff after_cutoff = frame[frame["ts"] > cutoff] horizon_frame = pd.concat( [ pd.DataFrame([{"ts": start_time, "equity": start_equity}]), after_cutoff[["ts", "equity"]], ], ignore_index=True, ) else: horizon_frame = frame[["ts", "equity"]].copy() start_time = pd.Timestamp(horizon_frame["ts"].iloc[0]) metrics = annualized_metrics_from_equity( horizon_frame, int(start_time.timestamp() * 1000), last_ts, ) rows.append( { "horizon": label, "horizon_start": start_time.strftime("%Y-%m-%d %H:%M"), "horizon_end": end_time.strftime("%Y-%m-%d %H:%M"), "horizon_days": (end_time - start_time).total_seconds() / 86_400, **metrics, } ) return pd.DataFrame(rows) def build_rsi2_candidate(trend: int, long_threshold: float, short_threshold: float) -> Candidate: config = RSI2Config( trend_sma=trend, rsi_length=2, rsi_long_threshold=long_threshold, rsi_short_threshold=short_threshold, exit_rsi=50.0, ) return Candidate( f"rsi2-t{trend}-l{long_threshold}-s{short_threshold}", max(trend, 3), lambda candles, leverage, warmup_bars, config=config: run_rsi2_segment( candles=candles, leverage=leverage, warmup_bars=warmup_bars, config=config, ), ) def build_rsi2_side_candidate( trend: int, long_threshold: float, short_threshold: float, exit_rsi: float, side_mode: str, ) -> Candidate: config = RSI2Config( trend_sma=trend, rsi_length=2, rsi_long_threshold=long_threshold, rsi_short_threshold=short_threshold, exit_rsi=exit_rsi, ) return Candidate( f"rsi2-{side_mode}-t{trend}-l{long_threshold}-s{short_threshold}-x{exit_rsi}", max(trend, 3), lambda candles, leverage, warmup_bars, config=config, side_mode=side_mode: run_rsi2_side_segment( candles=candles, leverage=leverage, warmup_bars=warmup_bars, config=config, side_mode=side_mode, ), ) def build_rsi2_long_guarded_candidate( trend: int, rsi_threshold: float, exit_rsi: float, stop_loss_pct: float, max_hold_bars: int, ) -> Candidate: return Candidate( f"rsi2-long-guarded-t{trend}-l{rsi_threshold}-x{exit_rsi}-sl{stop_loss_pct}-mh{max_hold_bars}", max(trend, 3), lambda candles, leverage, warmup_bars, trend=trend, rsi_threshold=rsi_threshold, exit_rsi=exit_rsi, stop_loss_pct=stop_loss_pct, max_hold_bars=max_hold_bars: run_rsi2_long_guarded_segment( candles=candles, leverage=leverage, warmup_bars=warmup_bars, trend_sma=trend, rsi_threshold=rsi_threshold, exit_rsi=exit_rsi, stop_loss_pct=stop_loss_pct, max_hold_bars=max_hold_bars, ), ) def build_rsi2_long_guarded_twap_candidate( trend: int, rsi_threshold: float, exit_rsi: float, stop_loss_pct: float, max_hold_bars: int, entry_slices: int, ) -> Candidate: return Candidate( f"rsi2-long-guarded-twap{entry_slices}-t{trend}-l{rsi_threshold}-x{exit_rsi}-sl{stop_loss_pct}-mh{max_hold_bars}", max(trend, 3), lambda candles, leverage, warmup_bars, trend=trend, rsi_threshold=rsi_threshold, exit_rsi=exit_rsi, stop_loss_pct=stop_loss_pct, max_hold_bars=max_hold_bars, entry_slices=entry_slices: run_rsi2_long_guarded_twap_segment( candles=candles, leverage=leverage, warmup_bars=warmup_bars, trend_sma=trend, rsi_threshold=rsi_threshold, exit_rsi=exit_rsi, stop_loss_pct=stop_loss_pct, max_hold_bars=max_hold_bars, entry_slices=entry_slices, ), ) def build_rsi2_long_guarded_price_twap_candidate( trend: int, rsi_threshold: float, exit_rsi: float, stop_loss_pct: float, max_hold_bars: int, entry_offsets: tuple[float, ...], entry_valid_bars: int, fill_buffer: float = 0.0, ) -> Candidate: offset_label = "-".join(f"{offset:.4f}" for offset in entry_offsets) buffer_label = f"-fb{fill_buffer:.4f}" if fill_buffer else "" return Candidate( f"rsi2-long-guarded-price-twap-o{offset_label}-v{entry_valid_bars}{buffer_label}-t{trend}-l{rsi_threshold}-x{exit_rsi}-sl{stop_loss_pct}-mh{max_hold_bars}", max(trend, 3), lambda candles, leverage, warmup_bars, trend=trend, rsi_threshold=rsi_threshold, exit_rsi=exit_rsi, stop_loss_pct=stop_loss_pct, max_hold_bars=max_hold_bars, entry_offsets=entry_offsets, entry_valid_bars=entry_valid_bars, fill_buffer=fill_buffer: run_rsi2_long_guarded_price_twap_segment( candles=candles, leverage=leverage, warmup_bars=warmup_bars, trend_sma=trend, rsi_threshold=rsi_threshold, exit_rsi=exit_rsi, stop_loss_pct=stop_loss_pct, max_hold_bars=max_hold_bars, entry_offsets=entry_offsets, entry_valid_bars=entry_valid_bars, fill_buffer=fill_buffer, ), ) def build_ma_cross_candidate(fast: int, slow: int, side_mode: str) -> Candidate: return Candidate( f"ma-cross-{side_mode}-f{fast}-s{slow}", slow, lambda candles, leverage, warmup_bars, fast=fast, slow=slow, side_mode=side_mode: run_ma_cross_segment( candles=candles, leverage=leverage, warmup_bars=warmup_bars, fast=fast, slow=slow, side_mode=side_mode, ), ) def build_trend_rsi_bb_long_candidate( trend_sma: int, band_length: int, std_multiplier: float, rsi_threshold: float, exit_rsi: float, stop_loss_pct: float, ) -> Candidate: return Candidate( f"trend-rsi-bb-long-t{trend_sma}-b{band_length}-m{std_multiplier}-r{rsi_threshold}-x{exit_rsi}-sl{stop_loss_pct}", max(trend_sma, band_length, 3), lambda candles, leverage, warmup_bars, trend_sma=trend_sma, band_length=band_length, std_multiplier=std_multiplier, rsi_threshold=rsi_threshold, exit_rsi=exit_rsi, stop_loss_pct=stop_loss_pct: run_trend_rsi_bb_long_segment( candles=candles, leverage=leverage, warmup_bars=warmup_bars, trend_sma=trend_sma, band_length=band_length, std_multiplier=std_multiplier, rsi_threshold=rsi_threshold, exit_rsi=exit_rsi, stop_loss_pct=stop_loss_pct, ), ) def build_regime_hybrid_candidate( trend_sma: int, regime_lookback: int, neutral_ma_distance: float, rsi_long_threshold: float, rsi_exit: float, bb_std: float, stop_loss_pct: float, ) -> Candidate: return Candidate( f"regime-hybrid-t{trend_sma}-r{regime_lookback}-n{neutral_ma_distance}-l{rsi_long_threshold}-x{rsi_exit}-m{bb_std}-sl{stop_loss_pct}", max(trend_sma, regime_lookback, 20, 50, 3), lambda candles, leverage, warmup_bars, trend_sma=trend_sma, regime_lookback=regime_lookback, neutral_ma_distance=neutral_ma_distance, rsi_long_threshold=rsi_long_threshold, rsi_exit=rsi_exit, bb_std=bb_std, stop_loss_pct=stop_loss_pct: run_regime_hybrid_segment( candles=candles, leverage=leverage, warmup_bars=warmup_bars, trend_sma=trend_sma, regime_lookback=regime_lookback, neutral_ma_distance=neutral_ma_distance, rsi_long_threshold=rsi_long_threshold, rsi_exit=rsi_exit, bb_length=20, bb_std=bb_std, bb_bandwidth_lookback=50, stop_loss_pct=stop_loss_pct, ), ) def build_eth_btc_rsi_filter_candidate( eth_trend_sma: int, eth_rsi_threshold: float, eth_exit_rsi: float, btc_trend_sma: int, btc_momentum_lookback: int, btc_min_momentum: float, ) -> PairCandidate: return PairCandidate( f"eth-btc-rsi-filter-et{eth_trend_sma}-l{eth_rsi_threshold}-x{eth_exit_rsi}-bt{btc_trend_sma}-bm{btc_momentum_lookback}-br{btc_min_momentum}", max(eth_trend_sma, btc_trend_sma, btc_momentum_lookback, 3), lambda eth_candles, btc_candles, leverage, warmup_bars, eth_trend_sma=eth_trend_sma, eth_rsi_threshold=eth_rsi_threshold, eth_exit_rsi=eth_exit_rsi, btc_trend_sma=btc_trend_sma, btc_momentum_lookback=btc_momentum_lookback, btc_min_momentum=btc_min_momentum: run_eth_btc_rsi_filter_segment( eth_candles=eth_candles, btc_candles=btc_candles, leverage=leverage, warmup_bars=warmup_bars, eth_trend_sma=eth_trend_sma, eth_rsi_threshold=eth_rsi_threshold, eth_exit_rsi=eth_exit_rsi, btc_trend_sma=btc_trend_sma, btc_momentum_lookback=btc_momentum_lookback, btc_min_momentum=btc_min_momentum, ), ) def build_eth_btc_shock_filter_candidate( eth_trend_sma: int, eth_rsi_threshold: float, eth_exit_rsi: float, btc_trend_sma: int, btc_momentum_lookback: int, btc_min_momentum: float, btc_shock_lookback: int, btc_max_realized_vol: float, btc_max_drawdown: float, ) -> PairCandidate: return PairCandidate( f"eth-btc-shock-filter-et{eth_trend_sma}-l{eth_rsi_threshold}-x{eth_exit_rsi}-bt{btc_trend_sma}-bm{btc_momentum_lookback}-br{btc_min_momentum}-sw{btc_shock_lookback}-sv{btc_max_realized_vol}-sd{btc_max_drawdown}", max(eth_trend_sma, btc_trend_sma, btc_momentum_lookback, btc_shock_lookback + 1, 3), lambda eth_candles, btc_candles, leverage, warmup_bars, eth_trend_sma=eth_trend_sma, eth_rsi_threshold=eth_rsi_threshold, eth_exit_rsi=eth_exit_rsi, btc_trend_sma=btc_trend_sma, btc_momentum_lookback=btc_momentum_lookback, btc_min_momentum=btc_min_momentum, btc_shock_lookback=btc_shock_lookback, btc_max_realized_vol=btc_max_realized_vol, btc_max_drawdown=btc_max_drawdown: run_eth_btc_shock_filter_segment( eth_candles=eth_candles, btc_candles=btc_candles, leverage=leverage, warmup_bars=warmup_bars, eth_trend_sma=eth_trend_sma, eth_rsi_threshold=eth_rsi_threshold, eth_exit_rsi=eth_exit_rsi, btc_trend_sma=btc_trend_sma, btc_momentum_lookback=btc_momentum_lookback, btc_min_momentum=btc_min_momentum, btc_shock_lookback=btc_shock_lookback, btc_max_realized_vol=btc_max_realized_vol, btc_max_drawdown=btc_max_drawdown, ), ) def build_eth_btc_ratio_pullback_candidate( btc_trend_sma: int, btc_momentum_lookback: int, btc_min_momentum: float, ratio_length: int, ratio_std: float, ratio_rsi_threshold: float, stop_loss_pct: float, ) -> PairCandidate: return PairCandidate( f"eth-btc-ratio-pullback-bt{btc_trend_sma}-bm{btc_momentum_lookback}-br{btc_min_momentum}-rl{ratio_length}-rs{ratio_std}-rr{ratio_rsi_threshold}-sl{stop_loss_pct}", max(btc_trend_sma, btc_momentum_lookback, ratio_length, 3), lambda eth_candles, btc_candles, leverage, warmup_bars, btc_trend_sma=btc_trend_sma, btc_momentum_lookback=btc_momentum_lookback, btc_min_momentum=btc_min_momentum, ratio_length=ratio_length, ratio_std=ratio_std, ratio_rsi_threshold=ratio_rsi_threshold, stop_loss_pct=stop_loss_pct: run_eth_btc_ratio_pullback_segment( eth_candles=eth_candles, btc_candles=btc_candles, leverage=leverage, warmup_bars=warmup_bars, btc_trend_sma=btc_trend_sma, btc_momentum_lookback=btc_momentum_lookback, btc_min_momentum=btc_min_momentum, ratio_length=ratio_length, ratio_std=ratio_std, ratio_rsi_threshold=ratio_rsi_threshold, stop_loss_pct=stop_loss_pct, ), ) def build_btc_lead_eth_lag_candidate( lead_lookback: int, btc_return_threshold: float, lag_gap: float, max_hold_bars: int, stop_loss_pct: float, take_profit_pct: float, ) -> PairCandidate: return PairCandidate( f"btc-lead-eth-lag-lb{lead_lookback}-br{btc_return_threshold}-gap{lag_gap}-mh{max_hold_bars}-sl{stop_loss_pct}-tp{take_profit_pct}", lead_lookback, lambda eth_candles, btc_candles, leverage, warmup_bars, lead_lookback=lead_lookback, btc_return_threshold=btc_return_threshold, lag_gap=lag_gap, max_hold_bars=max_hold_bars, stop_loss_pct=stop_loss_pct, take_profit_pct=take_profit_pct: run_btc_lead_eth_lag_segment( eth_candles=eth_candles, btc_candles=btc_candles, leverage=leverage, warmup_bars=warmup_bars, lead_lookback=lead_lookback, btc_return_threshold=btc_return_threshold, lag_gap=lag_gap, max_hold_bars=max_hold_bars, stop_loss_pct=stop_loss_pct, take_profit_pct=take_profit_pct, ), ) def history_bars_for_years(bar: str, years: float) -> int: if not bar.endswith("m"): raise ValueError("minute bar is required") minutes = int(bar[:-1]) if minutes <= 0: raise ValueError("minute bar is required") return int(MINUTES_PER_YEAR * years / minutes) def build_strategy_timeframe_candidates() -> list[Candidate]: return [ Candidate("bbmr-default", 69, lambda candles, leverage, warmup_bars: run_bbmr_segment(candles=candles, leverage=leverage, warmup_bars=warmup_bars, config=BBMRConfig())), Candidate("bbsb-default", 69, lambda candles, leverage, warmup_bars: run_bbsb_segment(candles=candles, leverage=leverage, warmup_bars=warmup_bars, config=BBSBConfig())), Candidate("donchian-e12-x6-s0.008", 12, lambda candles, leverage, warmup_bars: run_donchian_segment(candles=candles, leverage=leverage, warmup_bars=warmup_bars, config=DonchianConfig(entry_window=12, exit_window=6, stop_loss_pct=0.008))), build_rsi2_candidate(50, 3.0, 97.0), Candidate("ema-pullback-f13-s34-b0.006", 34, lambda candles, leverage, warmup_bars: run_ema_pullback_segment(candles=candles, leverage=leverage, warmup_bars=warmup_bars, config=EMAPullbackConfig(fast_ema=13, slow_ema=34, stop_buffer_pct=0.006))), Candidate("range-momo-l10-tp0.006-sl0.004", 10, lambda candles, leverage, warmup_bars: run_range_momentum_segment(candles=candles, leverage=leverage, warmup_bars=warmup_bars, lookback=10, take_profit_pct=0.006, stop_loss_pct=0.004)), Candidate("vwap-revert-w72-z2.0-sl0.006", 144, lambda candles, leverage, warmup_bars: run_vwap_reversion_segment(candles=candles, leverage=leverage, warmup_bars=warmup_bars, window=72, entry_z=2.0, exit_z=0.2, stop_loss_pct=0.006)), ] def summarize_periods(frame: pd.DataFrame, period: str, roundtrip_cost_on_margin: float) -> pd.DataFrame: period_frame = frame.copy() period_frame["period"] = pd.to_datetime(period_frame["window_end_ts"], unit="ms", utc=True).dt.tz_localize(None).dt.to_period(period).astype(str) grouped = ( period_frame.groupby("period", as_index=False) .agg( window_count=("total_return", "size"), avg_return=("total_return", "mean"), median_return=("total_return", "median"), positive_window_rate=("total_return", lambda values: float((values > 0.0).mean())), worst_return=("total_return", "min"), best_return=("total_return", "max"), trades=("trade_count", "sum"), avg_trades_per_window=("trade_count", "mean"), avg_window_win_rate=("win_rate", "mean"), max_drawdown=("max_drawdown", "max"), ) .sort_values("period") ) grouped["net_avg_return"] = grouped["avg_return"] - grouped["avg_trades_per_window"] * roundtrip_cost_on_margin return grouped def summarize_equity_periods(result: SegmentResult, period: str) -> pd.DataFrame: frame = pd.DataFrame(result.equity_curve) frame["period"] = pd.to_datetime(frame["ts"], unit="ms", utc=True).dt.tz_localize(None).dt.to_period(period).astype(str) grouped = ( frame.groupby("period", as_index=False) .agg( start_equity=("equity", "first"), end_equity=("equity", "last"), min_equity=("equity", "min"), max_equity=("equity", "max"), bars=("equity", "size"), ) .sort_values("period") ) grouped["return"] = grouped["end_equity"] / grouped["start_equity"] - 1.0 grouped["drawdown_from_period_high"] = (grouped["max_equity"] - grouped["min_equity"]) / grouped["max_equity"] return grouped def summarize_cost_adjusted_trade_equity_periods( result: SegmentResult, period: str, roundtrip_cost_on_margin: float, ) -> pd.DataFrame: frame = cost_adjusted_trade_equity_frame(result, roundtrip_cost_on_margin) frame["period"] = frame["ts"].dt.tz_localize(None).dt.to_period(period).astype(str) grouped = ( frame.groupby("period", as_index=False) .agg( start_equity=("equity", "first"), end_equity=("equity", "last"), min_equity=("equity", "min"), max_equity=("equity", "max"), trades=("equity", lambda values: max(len(values) - 1, 0)), ) .sort_values("period") ) grouped["return"] = grouped["end_equity"] / grouped["start_equity"] - 1.0 grouped["drawdown_from_period_high"] = (grouped["max_equity"] - grouped["min_equity"]) / grouped["max_equity"] return grouped def add_market_regime_columns(candles: list[Candle], rows: list[dict[str, object]], roundtrip_cost_on_margin: float) -> pd.DataFrame: index_by_ts = {candle.ts: index for index, candle in enumerate(candles)} output_rows: list[dict[str, object]] = [] closes = pd.Series([candle.close for candle in candles], dtype=float) long_ma = closes.rolling(240).mean().tolist() for row in rows: start = index_by_ts[int(row["window_start_ts"])] end = index_by_ts[int(row["window_end_ts"])] window_closes = closes.iloc[start : end + 1] returns = window_closes.pct_change().dropna() ma_value = long_ma[end] close_value = float(window_closes.iloc[-1]) output_rows.append( { **{key: value for key, value in row.items() if key != "trades"}, "net_return": float(row["total_return"]) - int(row["trade_count"]) * roundtrip_cost_on_margin, "market_return": close_value / float(window_closes.iloc[0]) - 1.0, "realized_vol": float(returns.std(ddof=1)) if len(returns) > 1 else 0.0, "ma240_distance": close_value / float(ma_value) - 1.0 if ma_value == ma_value and ma_value else 0.0, } ) frame = pd.DataFrame(output_rows) frame["market_return_bucket"] = pd.qcut(frame["market_return"], 3, labels=["down", "flat", "up"], duplicates="drop") frame["realized_vol_bucket"] = pd.qcut(frame["realized_vol"], 3, labels=["low", "mid", "high"], duplicates="drop") frame["ma240_distance_bucket"] = pd.qcut(frame["ma240_distance"], 3, labels=["below", "near", "above"], duplicates="drop") return frame def summarize_regime_columns(frame: pd.DataFrame) -> pd.DataFrame: summaries: list[pd.DataFrame] = [] for column in ("market_return_bucket", "realized_vol_bucket", "ma240_distance_bucket"): grouped = ( frame.groupby(["symbol", "bar", "name", column], observed=True, as_index=False) .agg( sample_count=("net_return", "size"), avg_net_return=("net_return", "mean"), median_net_return=("net_return", "median"), positive_window_rate=("net_return", lambda values: float((values > 0.0).mean())), worst_net_return=("net_return", "min"), best_net_return=("net_return", "max"), avg_trades=("trade_count", "mean"), avg_market_return=("market_return", "mean"), avg_realized_vol=("realized_vol", "mean"), avg_ma240_distance=("ma240_distance", "mean"), ) .rename(columns={column: "regime"}) ) grouped.insert(3, "regime_type", column.removesuffix("_bucket")) summaries.append(grouped) return pd.concat(summaries, ignore_index=True).sort_values( ["name", "regime_type", "avg_net_return"], ascending=[True, True, False], ) def main() -> int: client = OkxClient() candidates = build_candidates() rows: list[dict[str, object]] = [] for symbol in SYMBOLS: for bar in BARS: candles = get_candles_cached(client, symbol, bar, HISTORY_LIMIT) for candidate in candidates: metrics = evaluate_candidate(candidate, candles) rows.append({"symbol": symbol, "bar": bar, **metrics}) print(f"done {symbol} {bar}") frame = pd.DataFrame(rows) frame["score"] = frame["avg_return"] - frame["max_drawdown"] * 0.25 columns = ["symbol", "bar", "name", "avg_return", "median_return", "worst_return", "best_return", "trades", "win_rate", "max_drawdown", "score"] print(frame.sort_values(["avg_return", "median_return"], ascending=False)[columns].head(30).to_string(index=False)) frame.to_csv("ultrashort-exploration.csv", index=False) return 0 def focus_vwap() -> int: client = OkxClient() candles = get_candles_cached(client, "ETH-USDT-SWAP", "3m", HISTORY_LIMIT) rows: list[dict[str, object]] = [] for seed in (3, 7, 11, 17, 23): for window in (56, 64, 72, 80, 96): for entry_z in (1.6, 1.8, 2.0, 2.2, 2.4): for stop in (0.004, 0.006, 0.008): warmup_bars = window * 2 sampled = sample_segments( candles=candles, segments=SEGMENTS, window_size=WINDOW_SIZE, warmup_bars=warmup_bars, seed=seed, ) results = [ run_vwap_reversion_segment( candles=candles[segment.context_start : segment.report_end], leverage=LEVERAGE, warmup_bars=warmup_bars, window=window, entry_z=entry_z, exit_z=0.2, stop_loss_pct=stop, ) for segment in sampled ] returns = [result.total_return for result in results] rows.append( { "seed": seed, "window": window, "entry_z": entry_z, "stop": stop, "avg_return": sum(returns) / len(returns), "median_return": float(pd.Series(returns).median()), "worst_return": min(returns), "best_return": max(returns), "trades": sum(result.trade_count for result in results), "win_rate": sum(result.win_rate for result in results) / len(results), "max_drawdown": max(result.max_drawdown for result in results), } ) frame = pd.DataFrame(rows) grouped = ( frame.groupby(["window", "entry_z", "stop"], as_index=False) .agg( avg_return=("avg_return", "mean"), median_return=("median_return", "mean"), worst_seed_avg=("avg_return", "min"), worst_window_return=("worst_return", "min"), trades=("trades", "mean"), win_rate=("win_rate", "mean"), max_drawdown=("max_drawdown", "max"), ) .sort_values(["avg_return", "worst_seed_avg"], ascending=False) ) print(grouped.head(30).to_string(index=False)) grouped.to_csv("ultrashort-vwap-focus.csv", index=False) return 0 def robust_vwap(history_limit: int, window_size: int) -> int: client = OkxClient() rows: list[dict[str, object]] = [] candidates = [ Candidate( f"vwap-revert-w{window}-z{entry_z}-sl{stop}", window * 2, lambda candles, leverage, warmup_bars, window=window, entry_z=entry_z, stop=stop: run_vwap_reversion_segment( candles=candles, leverage=leverage, warmup_bars=warmup_bars, window=window, entry_z=entry_z, exit_z=0.2, stop_loss_pct=stop, ), ) for window in (56, 64, 72, 80, 96) for entry_z in (1.6, 1.8, 2.0, 2.2, 2.4) for stop in (0.004, 0.006, 0.008) ] for symbol in SYMBOLS: candles = get_candles_cached(client, symbol, "3m", history_limit) for candidate in candidates: metrics = evaluate_candidate_all_windows( candidate=candidate, candles=candles, window_size=window_size, leverage=LEVERAGE, ) rows.append({"symbol": symbol, "bar": "3m", "history_bars": len(candles), **metrics}) print(f"done robust {symbol} 3m {len(candles)} bars") frame = pd.DataFrame(rows) columns = [ "symbol", "bar", "history_bars", "name", "sample_count", "avg_return", "ci95_low", "ci95_high", "median_return", "positive_window_rate", "worst_return", "p10_return", "p90_return", "best_return", "trades", "avg_trades_per_window", "win_rate", "trade_win_rate", "avg_trade_return", "avg_win_return", "avg_loss_return_abs", "payoff_ratio", "profit_factor", "expectancy_per_trade", "max_drawdown", "return_drawdown_ratio", ] frame = sort_robust_results(frame) print(GROSS_RETURN_NOTE) print(frame[columns].head(30).to_string(index=False)) frame.to_csv("ultrashort-vwap-robust.csv", index=False) return 0 def robust_all(history_limit: int, window_size: int) -> int: client = OkxClient() candidates = build_candidates() rows: list[dict[str, object]] = [] for symbol in SYMBOLS: for bar in BARS: candles = get_candles_cached(client, symbol, bar, history_limit) for candidate in candidates: metrics = evaluate_candidate_all_windows( candidate=candidate, candles=candles, window_size=window_size, leverage=LEVERAGE, ) rows.append({"symbol": symbol, "bar": bar, "history_bars": len(candles), **metrics}) print(f"done robust all {symbol} {bar} {len(candles)} bars") frame = pd.DataFrame(rows) columns = [ "symbol", "bar", "history_bars", "name", "sample_count", "avg_return", "ci95_low", "ci95_high", "median_return", "positive_window_rate", "worst_return", "p10_return", "p90_return", "best_return", "trades", "avg_trades_per_window", "win_rate", "trade_win_rate", "avg_trade_return", "avg_win_return", "avg_loss_return_abs", "payoff_ratio", "profit_factor", "expectancy_per_trade", "max_drawdown", "return_drawdown_ratio", ] frame = sort_robust_results(frame) print(GROSS_RETURN_NOTE) print(frame[columns].head(40).to_string(index=False)) frame.to_csv("ultrashort-robust-all.csv", index=False) return 0 def robust_rsi2_cost_search(history_limit: int, window_size: int, roundtrip_cost_on_margin: float) -> int: client = OkxClient() candidates = [ build_rsi2_candidate(trend, long_threshold, short_threshold) for trend in (30, 50, 80, 120, 160, 240) for long_threshold, short_threshold in ( (3.0, 97.0), (5.0, 95.0), (8.0, 92.0), (10.0, 90.0), (12.0, 88.0), (15.0, 85.0), (18.0, 82.0), ) ] rows: list[dict[str, object]] = [] for symbol in SYMBOLS: for bar in ("3m", "5m", "15m"): candles = get_candles_cached(client, symbol, bar, history_limit) for candidate in candidates: metrics = evaluate_candidate_all_windows( candidate=candidate, candles=candles, window_size=window_size, leverage=LEVERAGE, ) rows.append({"symbol": symbol, "bar": bar, "history_bars": len(candles), **metrics}) print(f"done robust rsi2 cost {symbol} {bar} {len(candles)} bars") frame = sort_cost_results(add_cost_metrics(pd.DataFrame(rows), roundtrip_cost_on_margin)) columns = [ "symbol", "bar", "history_bars", "name", "sample_count", "avg_return", "ci95_low", "avg_trades_per_window", "breakeven_roundtrip_cost_on_margin", "roundtrip_cost_on_margin", "net_avg_return", "net_ci95_low", "net_ci95_high", "median_return", "positive_window_rate", "worst_return", "trade_win_rate", "avg_trade_return", "avg_win_return", "avg_loss_return_abs", "payoff_ratio", "profit_factor", "expectancy_per_trade", "max_drawdown", "return_drawdown_ratio", ] print(GROSS_RETURN_NOTE) print(frame[columns].head(40).to_string(index=False)) frame.to_csv("ultrashort-rsi2-cost-search.csv", index=False) return 0 def rsi2_period_analysis( *, symbol: str, bar: str, history_limit: int, window_size: int, roundtrip_cost_on_margin: float, trend: int, long_threshold: float, short_threshold: float, ) -> int: client = OkxClient() candles = get_candles_cached(client, symbol, bar, history_limit) candidate = build_rsi2_candidate(trend, long_threshold, short_threshold) rows = evaluate_candidate_window_rows( candidate=candidate, candles=candles, window_size=window_size, leverage=LEVERAGE, ) window_frame = pd.DataFrame(rows).drop(columns=["trades"]) window_frame["window_start"] = pd.to_datetime(window_frame["window_start_ts"], unit="ms", utc=True).dt.strftime("%Y-%m-%d %H:%M") window_frame["window_end"] = pd.to_datetime(window_frame["window_end_ts"], unit="ms", utc=True).dt.strftime("%Y-%m-%d %H:%M") window_frame["net_return"] = window_frame["total_return"] - window_frame["trade_count"] * roundtrip_cost_on_margin window_frame.insert(0, "name", candidate.name) window_frame.insert(0, "history_bars", len(candles)) window_frame.insert(0, "bar", bar) window_frame.insert(0, "symbol", symbol) monthly = summarize_periods(window_frame, "M", roundtrip_cost_on_margin) quarterly = summarize_periods(window_frame, "Q", roundtrip_cost_on_margin) full_result = candidate.run(candles=candles, leverage=LEVERAGE, warmup_bars=candidate.warmup_bars) equity_monthly = summarize_equity_periods(full_result, "M") equity_quarterly = summarize_equity_periods(full_result, "Q") net_trade_equity_monthly = summarize_cost_adjusted_trade_equity_periods(full_result, "M", roundtrip_cost_on_margin) net_trade_equity_quarterly = summarize_cost_adjusted_trade_equity_periods(full_result, "Q", roundtrip_cost_on_margin) window_frame.to_csv("ultrashort-rsi2-window-distribution.csv", index=False) monthly.to_csv("ultrashort-rsi2-monthly.csv", index=False) quarterly.to_csv("ultrashort-rsi2-quarterly.csv", index=False) equity_monthly.to_csv("ultrashort-rsi2-equity-monthly.csv", index=False) equity_quarterly.to_csv("ultrashort-rsi2-equity-quarterly.csv", index=False) net_trade_equity_monthly.to_csv("ultrashort-rsi2-net-trade-equity-monthly.csv", index=False) net_trade_equity_quarterly.to_csv("ultrashort-rsi2-net-trade-equity-quarterly.csv", index=False) summary = add_cost_metrics(pd.DataFrame([summarize_window_rows(rows, candidate.name)]), roundtrip_cost_on_margin) first_candle = _format_ts(candles[0].ts) last_candle = _format_ts(candles[-1].ts) print(f"actual data range UTC: {first_candle} -> {last_candle}; bars={len(candles)}") print( summary[ [ "sample_count", "avg_return", "ci95_low", "avg_trades_per_window", "roundtrip_cost_on_margin", "net_avg_return", "net_ci95_low", "positive_window_rate", "trades", "trade_win_rate", "avg_win_return", "avg_loss_return_abs", "payoff_ratio", "profit_factor", "max_drawdown", ] ].to_string(index=False) ) print("monthly") print(monthly.to_string(index=False)) print("quarterly") print(quarterly.to_string(index=False)) print("equity monthly") print(equity_monthly.to_string(index=False)) print("equity quarterly") print(equity_quarterly.to_string(index=False)) print("cost-adjusted closed-trade equity monthly") print(net_trade_equity_monthly.to_string(index=False)) print("cost-adjusted closed-trade equity quarterly") print(net_trade_equity_quarterly.to_string(index=False)) return 0 def strategy_timeframe_analysis( *, symbols: tuple[str, ...], bars: tuple[str, ...], years: float, window_size: int, roundtrip_cost_on_margin: float, period: str, ) -> int: client = OkxClient() candidates = build_strategy_timeframe_candidates() summary_rows: list[dict[str, object]] = [] period_frames: list[pd.DataFrame] = [] availability_rows: list[dict[str, object]] = [] for symbol in symbols: for bar in bars: requested_bars = history_bars_for_years(bar, years) candles = get_candles_cached(client, symbol, bar, requested_bars) first_ts = candles[0].ts last_ts = candles[-1].ts availability_rows.append( { "symbol": symbol, "bar": bar, "requested_years": years, "requested_bars": requested_bars, "actual_bars": len(candles), "first_candle": _format_ts(first_ts), "last_candle": _format_ts(last_ts), "actual_days": (last_ts - first_ts) / 86_400_000, "complete_requested_range": len(candles) >= requested_bars, } ) for candidate in candidates: rows = evaluate_candidate_window_rows( candidate=candidate, candles=candles, window_size=window_size, leverage=LEVERAGE, ) summary = add_cost_metrics(pd.DataFrame([summarize_window_rows(rows, candidate.name)]), roundtrip_cost_on_margin).iloc[0].to_dict() summary_rows.append( { "symbol": symbol, "bar": bar, "requested_bars": requested_bars, "actual_bars": len(candles), "first_candle": _format_ts(first_ts), "last_candle": _format_ts(last_ts), **summary, } ) window_frame = pd.DataFrame(rows).drop(columns=["trades"]) period_frame = summarize_periods(window_frame, period, roundtrip_cost_on_margin) period_frame.insert(0, "name", candidate.name) period_frame.insert(0, "bar", bar) period_frame.insert(0, "symbol", symbol) period_frames.append(period_frame) print(f"done strategy timeframe {symbol} {bar} {candidate.name} windows={len(rows)}") availability = pd.DataFrame(availability_rows) summary_frame = sort_cost_results(pd.DataFrame(summary_rows)) period_summary = pd.concat(period_frames, ignore_index=True) availability.to_csv("ultrashort-strategy-timeframe-availability.csv", index=False) summary_frame.to_csv("ultrashort-strategy-timeframe-summary.csv", index=False) period_summary.to_csv("ultrashort-strategy-timeframe-periods.csv", index=False) print("availability") print(availability.to_string(index=False)) print("summary") print( summary_frame[ [ "symbol", "bar", "name", "actual_bars", "sample_count", "trades", "net_avg_return", "net_ci95_low", "positive_window_rate", "trade_win_rate", "payoff_ratio", "profit_factor", "max_drawdown", ] ].head(50).to_string(index=False) ) return 0 def rsi2_variant_search( *, symbol: str, bar: str, years: float, window_size: int, roundtrip_cost_on_margin: float, ) -> int: client = OkxClient() requested_bars = history_bars_for_years(bar, years) candles = get_candles_cached(client, symbol, bar, requested_bars) candidates = [ build_rsi2_side_candidate(trend, long_threshold, short_threshold, exit_rsi, side_mode) for trend in (50, 120, 240, 480) for long_threshold, short_threshold in ( (2.0, 98.0), (3.0, 97.0), (5.0, 95.0), ) for exit_rsi in (45.0, 55.0) for side_mode in ("both", "long", "short") ] rows: list[dict[str, object]] = [] for candidate in candidates: metrics = evaluate_candidate_all_windows( candidate=candidate, candles=candles, window_size=window_size, leverage=LEVERAGE, ) rows.append( { "symbol": symbol, "bar": bar, "requested_bars": requested_bars, "actual_bars": len(candles), "first_candle": _format_ts(candles[0].ts), "last_candle": _format_ts(candles[-1].ts), **metrics, } ) print(f"done rsi2 variant {candidate.name}") frame = sort_cost_results(add_cost_metrics(pd.DataFrame(rows), roundtrip_cost_on_margin)) frame.to_csv("ultrashort-rsi2-variant-search.csv", index=False) print( frame[ [ "symbol", "bar", "name", "sample_count", "trades", "net_avg_return", "net_ci95_low", "positive_window_rate", "trade_win_rate", "avg_win_return", "avg_loss_return_abs", "payoff_ratio", "profit_factor", "max_drawdown", ] ].head(50).to_string(index=False) ) return 0 def build_best_known_candidates() -> list[Candidate]: bbmr_config = BBMRConfig(band_length=20, std_multiplier=2.5, bandwidth_lookback=50, stop_loss_pct=0.005) return [ Candidate( "bbmr-l20-m2.5-sl0.005", 50, lambda candles, leverage, warmup_bars, config=bbmr_config: run_bbmr_segment( candles=candles, leverage=leverage, warmup_bars=warmup_bars, config=config, ), ), build_rsi2_side_candidate(50, 3.0, 97.0, 45.0, "long"), build_rsi2_side_candidate(50, 3.0, 97.0, 55.0, "long"), build_rsi2_side_candidate(240, 2.0, 98.0, 55.0, "long"), build_trend_rsi_bb_long_candidate(480, 20, 2.0, 5.0, 45.0, 0.005), build_trend_rsi_bb_long_candidate(240, 20, 2.5, 5.0, 55.0, 0.008), build_regime_hybrid_candidate(240, 240, 0.015, 2.0, 55.0, 2.5, 0.008), build_regime_hybrid_candidate(50, 240, 0.02, 3.0, 55.0, 2.5, 0.005), ] def best_total_annualized( *, symbols: tuple[str, ...], bar: str, years: float, roundtrip_cost_on_margin: float, ) -> int: client = OkxClient() rows: list[dict[str, object]] = [] horizon_rows: list[dict[str, object]] = [] horizons = ( ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ) for symbol in symbols: candles = get_candles_cached(client, symbol, bar, history_bars_for_years(bar, years)) for candidate in build_best_known_candidates(): result = candidate.run(candles=candles, leverage=LEVERAGE, warmup_bars=candidate.warmup_bars) net_equity = cost_adjusted_trade_equity_frame(result, roundtrip_cost_on_margin) metrics = annualized_metrics_from_equity(net_equity, candles[0].ts, candles[-1].ts) gross_years = (candles[-1].ts - candles[0].ts) / 86_400_000 / 365 gross_annualized = (1.0 + result.total_return) ** (1.0 / gross_years) - 1.0 if result.total_return > -1.0 else 0.0 rows.append( { "symbol": symbol, "bar": bar, "name": candidate.name, "first_candle": _format_ts(candles[0].ts), "last_candle": _format_ts(candles[-1].ts), "years": gross_years, "trades": result.trade_count, "gross_total_return": result.total_return, "gross_annualized_return": gross_annualized, "gross_max_drawdown_mark_to_market": result.max_drawdown, **metrics, } ) horizon_frame = recent_horizon_metrics_from_equity(net_equity, candles[-1].ts, horizons) for horizon_row in horizon_frame.to_dict("records"): horizon_rows.append( { "symbol": symbol, "bar": bar, "name": candidate.name, "first_candle": _format_ts(candles[0].ts), "last_candle": _format_ts(candles[-1].ts), "trades": result.trade_count, **horizon_row, } ) frame = pd.DataFrame(rows).sort_values(["net_calmar", "net_annualized_return"], ascending=False) horizon_output = pd.DataFrame(horizon_rows) horizon_output["horizon"] = pd.Categorical(horizon_output["horizon"], categories=["3y", "1y", "6m", "3m"], ordered=True) horizon_output = horizon_output.sort_values(["horizon", "net_annualized_return"], ascending=[True, False]) frame.to_csv("ultrashort-best-total-annualized.csv", index=False) horizon_output.to_csv("ultrashort-best-horizon-returns.csv", index=False) print(frame.to_string(index=False)) print("recent horizon returns") print(horizon_output.to_string(index=False)) return 0 def ma_cross_search( *, symbols: tuple[str, ...], bar: str, years: float, window_size: int, roundtrip_cost_on_margin: float, ) -> int: client = OkxClient() rows: list[dict[str, object]] = [] for symbol in symbols: candles = get_candles_cached(client, symbol, bar, history_bars_for_years(bar, years)) candidates = [ build_ma_cross_candidate(fast, slow, side_mode) for fast, slow in ((12, 48), (20, 80), (30, 120), (50, 200), (80, 320)) for side_mode in ("both", "long", "short") ] for candidate in candidates: metrics = evaluate_candidate_all_windows( candidate=candidate, candles=candles, window_size=window_size, leverage=LEVERAGE, ) rows.append( { "symbol": symbol, "bar": bar, "actual_bars": len(candles), "first_candle": _format_ts(candles[0].ts), "last_candle": _format_ts(candles[-1].ts), **metrics, } ) print(f"done ma cross {symbol} {candidate.name}") frame = sort_cost_results(add_cost_metrics(pd.DataFrame(rows), roundtrip_cost_on_margin)) frame.to_csv("ultrashort-ma-cross-search.csv", index=False) print( frame[ [ "symbol", "bar", "name", "sample_count", "trades", "net_avg_return", "net_ci95_low", "positive_window_rate", "trade_win_rate", "payoff_ratio", "profit_factor", "max_drawdown", ] ].head(40).to_string(index=False) ) return 0 def trend_rsi_bb_search( *, symbols: tuple[str, ...], bar: str, years: float, window_size: int, roundtrip_cost_on_margin: float, ) -> int: client = OkxClient() rows: list[dict[str, object]] = [] candidates = [ build_trend_rsi_bb_long_candidate(trend_sma, band_length, std_multiplier, rsi_threshold, exit_rsi, stop_loss_pct) for trend_sma in (120, 240, 480) for band_length in (20, 30) for std_multiplier in (2.0, 2.5) for rsi_threshold in (2.0, 3.0, 5.0) for exit_rsi in (45.0, 55.0) for stop_loss_pct in (0.005, 0.008) ] for symbol in symbols: candles = get_candles_cached(client, symbol, bar, history_bars_for_years(bar, years)) for candidate in candidates: metrics = evaluate_candidate_all_windows( candidate=candidate, candles=candles, window_size=window_size, leverage=LEVERAGE, ) rows.append( { "symbol": symbol, "bar": bar, "actual_bars": len(candles), "first_candle": _format_ts(candles[0].ts), "last_candle": _format_ts(candles[-1].ts), **metrics, } ) print(f"done trend rsi bb {symbol} {candidate.name}") frame = sort_cost_results(add_cost_metrics(pd.DataFrame(rows), roundtrip_cost_on_margin)) frame.to_csv("ultrashort-trend-rsi-bb-search.csv", index=False) print( frame[ [ "symbol", "bar", "name", "sample_count", "trades", "net_avg_return", "net_ci95_low", "positive_window_rate", "trade_win_rate", "avg_win_return", "avg_loss_return_abs", "payoff_ratio", "profit_factor", "max_drawdown", ] ].head(50).to_string(index=False) ) return 0 def regime_analysis( *, symbols: tuple[str, ...], bar: str, years: float, window_size: int, roundtrip_cost_on_margin: float, ) -> int: client = OkxClient() window_frames: list[pd.DataFrame] = [] for symbol in symbols: candles = get_candles_cached(client, symbol, bar, history_bars_for_years(bar, years)) for candidate in build_best_known_candidates(): rows = evaluate_candidate_window_rows( candidate=candidate, candles=candles, window_size=window_size, leverage=LEVERAGE, ) frame = add_market_regime_columns(candles, rows, roundtrip_cost_on_margin) frame.insert(0, "name", candidate.name) frame.insert(0, "last_candle", _format_ts(candles[-1].ts)) frame.insert(0, "first_candle", _format_ts(candles[0].ts)) frame.insert(0, "bar", bar) frame.insert(0, "symbol", symbol) window_frames.append(frame) print(f"done regime {symbol} {candidate.name} windows={len(frame)}") windows = pd.concat(window_frames, ignore_index=True) summary = summarize_regime_columns(windows) windows.to_csv("ultrashort-regime-windows.csv", index=False) summary.to_csv("ultrashort-regime-summary.csv", index=False) print( summary[ [ "symbol", "bar", "name", "regime_type", "regime", "sample_count", "avg_net_return", "median_net_return", "positive_window_rate", "avg_trades", "avg_market_return", "avg_realized_vol", "avg_ma240_distance", ] ].to_string(index=False) ) return 0 def eth_btc_signal_search( *, bar: str, years: float, window_size: int, roundtrip_cost_on_margin: float, ) -> int: client = OkxClient() requested_bars = history_bars_for_years(bar, years) eth = get_candles_cached(client, "ETH-USDT-SWAP", bar, requested_bars) btc = get_candles_cached(client, "BTC-USDT-SWAP", bar, requested_bars) eth, btc = align_pair_candles(eth, btc) candidates = [ build_eth_btc_rsi_filter_candidate(eth_trend, eth_rsi, eth_exit, btc_trend, btc_momentum, btc_min_momentum) for eth_trend in (50, 120) for eth_rsi in (2.0, 3.0, 5.0) for eth_exit in (45.0, 55.0) for btc_trend in (120, 240, 480) for btc_momentum in (96, 240) for btc_min_momentum in (0.0, 0.01) ] summary_rows: list[dict[str, object]] = [] total_rows: list[dict[str, object]] = [] horizon_rows: list[dict[str, object]] = [] horizons = ( ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ) for candidate in candidates: rows = evaluate_pair_candidate_window_rows( candidate=candidate, eth_candles=eth, btc_candles=btc, window_size=window_size, leverage=LEVERAGE, ) summary = add_cost_metrics(pd.DataFrame([summarize_window_rows(rows, candidate.name)]), roundtrip_cost_on_margin).iloc[0].to_dict() summary_rows.append( { "symbol": "ETH-USDT-SWAP", "signal_symbol": "BTC-USDT-SWAP", "bar": bar, "actual_bars": len(eth), "first_candle": _format_ts(eth[0].ts), "last_candle": _format_ts(eth[-1].ts), **summary, } ) result = candidate.run(eth_candles=eth, btc_candles=btc, leverage=LEVERAGE, warmup_bars=candidate.warmup_bars) net_equity = cost_adjusted_trade_equity_frame(result, roundtrip_cost_on_margin) metrics = annualized_metrics_from_equity(net_equity, eth[0].ts, eth[-1].ts) years_actual = (eth[-1].ts - eth[0].ts) / 86_400_000 / 365 gross_annualized = (1.0 + result.total_return) ** (1.0 / years_actual) - 1.0 if result.total_return > -1.0 else 0.0 total_rows.append( { "symbol": "ETH-USDT-SWAP", "signal_symbol": "BTC-USDT-SWAP", "bar": bar, "name": candidate.name, "first_candle": _format_ts(eth[0].ts), "last_candle": _format_ts(eth[-1].ts), "years": years_actual, "trades": result.trade_count, "gross_total_return": result.total_return, "gross_annualized_return": gross_annualized, "gross_max_drawdown_mark_to_market": result.max_drawdown, **metrics, } ) horizon_frame = recent_horizon_metrics_from_equity(net_equity, eth[-1].ts, horizons) for horizon_row in horizon_frame.to_dict("records"): horizon_rows.append( { "symbol": "ETH-USDT-SWAP", "signal_symbol": "BTC-USDT-SWAP", "bar": bar, "name": candidate.name, "trades": result.trade_count, **horizon_row, } ) print(f"done eth btc signal {candidate.name}") summary = sort_cost_results(pd.DataFrame(summary_rows)) totals = pd.DataFrame(total_rows).sort_values(["net_calmar", "net_annualized_return"], ascending=False) horizon = pd.DataFrame(horizon_rows) horizon["horizon"] = pd.Categorical(horizon["horizon"], categories=["3y", "1y", "6m", "3m"], ordered=True) horizon = horizon.sort_values(["horizon", "net_annualized_return"], ascending=[True, False]) summary.to_csv("ultrashort-eth-btc-signal-summary.csv", index=False) totals.to_csv("ultrashort-eth-btc-signal-total.csv", index=False) horizon.to_csv("ultrashort-eth-btc-signal-horizon.csv", index=False) print("window summary") print( summary[ [ "name", "sample_count", "trades", "net_avg_return", "net_ci95_low", "positive_window_rate", "trade_win_rate", "payoff_ratio", "profit_factor", "max_drawdown", ] ].head(30).to_string(index=False) ) print("total") print(totals.head(30).to_string(index=False)) print("horizon") print(horizon.head(60).to_string(index=False)) return 0 def eth_btc_shock_filter_search( *, bar: str, years: float, window_size: int, roundtrip_cost_on_margin: float, ) -> int: client = OkxClient() requested_bars = history_bars_for_years(bar, years) eth = get_candles_cached(client, "ETH-USDT-SWAP", bar, requested_bars) btc = get_candles_cached(client, "BTC-USDT-SWAP", bar, requested_bars) eth, btc = align_pair_candles(eth, btc) candidates = [ build_eth_btc_shock_filter_candidate( eth_trend, eth_rsi, eth_exit, btc_trend, btc_momentum, btc_min_momentum, btc_shock_lookback, btc_max_realized_vol, btc_max_drawdown, ) for eth_trend in (50,) for eth_rsi in (3.0,) for eth_exit in (45.0, 55.0) for btc_trend in (480,) for btc_momentum in (240,) for btc_min_momentum in (0.0, 0.01) for btc_shock_lookback in (96, 240) for btc_max_realized_vol in (0.006, 0.01) for btc_max_drawdown in (0.03, 0.05, 0.08) ] summary_rows: list[dict[str, object]] = [] total_rows: list[dict[str, object]] = [] horizon_rows: list[dict[str, object]] = [] horizons = ( ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ) for candidate in candidates: rows = evaluate_pair_candidate_window_rows( candidate=candidate, eth_candles=eth, btc_candles=btc, window_size=window_size, leverage=LEVERAGE, ) summary = add_cost_metrics(pd.DataFrame([summarize_window_rows(rows, candidate.name)]), roundtrip_cost_on_margin).iloc[0].to_dict() summary_rows.append( { "symbol": "ETH-USDT-SWAP", "signal_symbol": "BTC-USDT-SWAP", "bar": bar, "actual_bars": len(eth), "first_candle": _format_ts(eth[0].ts), "last_candle": _format_ts(eth[-1].ts), **summary, } ) result = candidate.run(eth_candles=eth, btc_candles=btc, leverage=LEVERAGE, warmup_bars=candidate.warmup_bars) net_equity = cost_adjusted_trade_equity_frame(result, roundtrip_cost_on_margin) metrics = annualized_metrics_from_equity(net_equity, eth[0].ts, eth[-1].ts) years_actual = (eth[-1].ts - eth[0].ts) / 86_400_000 / 365 gross_annualized = (1.0 + result.total_return) ** (1.0 / years_actual) - 1.0 if result.total_return > -1.0 else 0.0 total_rows.append( { "symbol": "ETH-USDT-SWAP", "signal_symbol": "BTC-USDT-SWAP", "bar": bar, "name": candidate.name, "first_candle": _format_ts(eth[0].ts), "last_candle": _format_ts(eth[-1].ts), "years": years_actual, "trades": result.trade_count, "gross_total_return": result.total_return, "gross_annualized_return": gross_annualized, "gross_max_drawdown_mark_to_market": result.max_drawdown, **metrics, } ) horizon_frame = recent_horizon_metrics_from_equity(net_equity, eth[-1].ts, horizons) for horizon_row in horizon_frame.to_dict("records"): horizon_rows.append( { "symbol": "ETH-USDT-SWAP", "signal_symbol": "BTC-USDT-SWAP", "bar": bar, "name": candidate.name, "trades": result.trade_count, **horizon_row, } ) print(f"done eth btc shock filter {candidate.name}") summary = sort_cost_results(pd.DataFrame(summary_rows)) totals = pd.DataFrame(total_rows).sort_values(["net_calmar", "net_annualized_return"], ascending=False) horizon = pd.DataFrame(horizon_rows) horizon["horizon"] = pd.Categorical(horizon["horizon"], categories=["3y", "1y", "6m", "3m"], ordered=True) horizon = horizon.sort_values(["horizon", "net_annualized_return"], ascending=[True, False]) summary.to_csv("ultrashort-eth-btc-shock-filter-summary.csv", index=False) totals.to_csv("ultrashort-eth-btc-shock-filter-total.csv", index=False) horizon.to_csv("ultrashort-eth-btc-shock-filter-horizon.csv", index=False) print("window summary") print( summary[ [ "name", "sample_count", "trades", "net_avg_return", "net_ci95_low", "positive_window_rate", "trade_win_rate", "payoff_ratio", "profit_factor", "max_drawdown", ] ].head(30).to_string(index=False) ) print("total") print(totals.head(30).to_string(index=False)) print("horizon") print(horizon.head(60).to_string(index=False)) return 0 def eth_btc_ratio_search( *, bar: str, years: float, window_size: int, roundtrip_cost_on_margin: float, ) -> int: client = OkxClient() requested_bars = history_bars_for_years(bar, years) eth = get_candles_cached(client, "ETH-USDT-SWAP", bar, requested_bars) btc = get_candles_cached(client, "BTC-USDT-SWAP", bar, requested_bars) eth, btc = align_pair_candles(eth, btc) candidates = [ build_eth_btc_ratio_pullback_candidate(btc_trend, btc_momentum, btc_min_momentum, ratio_length, ratio_std, ratio_rsi, stop) for btc_trend in (480,) for btc_momentum in (96, 240) for btc_min_momentum in (0.0, 0.01) for ratio_length in (48, 96) for ratio_std in (1.5, 2.0) for ratio_rsi in (5.0,) for stop in (0.005, 0.008) ] summary_rows: list[dict[str, object]] = [] total_rows: list[dict[str, object]] = [] horizon_rows: list[dict[str, object]] = [] horizons = ( ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ) for candidate in candidates: rows = evaluate_pair_candidate_window_rows( candidate=candidate, eth_candles=eth, btc_candles=btc, window_size=window_size, leverage=LEVERAGE, ) summary = add_cost_metrics(pd.DataFrame([summarize_window_rows(rows, candidate.name)]), roundtrip_cost_on_margin).iloc[0].to_dict() summary_rows.append( { "symbol": "ETH-USDT-SWAP", "signal_symbol": "BTC-USDT-SWAP", "bar": bar, "actual_bars": len(eth), "first_candle": _format_ts(eth[0].ts), "last_candle": _format_ts(eth[-1].ts), **summary, } ) result = candidate.run(eth_candles=eth, btc_candles=btc, leverage=LEVERAGE, warmup_bars=candidate.warmup_bars) net_equity = cost_adjusted_trade_equity_frame(result, roundtrip_cost_on_margin) metrics = annualized_metrics_from_equity(net_equity, eth[0].ts, eth[-1].ts) years_actual = (eth[-1].ts - eth[0].ts) / 86_400_000 / 365 gross_annualized = (1.0 + result.total_return) ** (1.0 / years_actual) - 1.0 if result.total_return > -1.0 else 0.0 total_rows.append( { "symbol": "ETH-USDT-SWAP", "signal_symbol": "BTC-USDT-SWAP", "bar": bar, "name": candidate.name, "first_candle": _format_ts(eth[0].ts), "last_candle": _format_ts(eth[-1].ts), "years": years_actual, "trades": result.trade_count, "gross_total_return": result.total_return, "gross_annualized_return": gross_annualized, "gross_max_drawdown_mark_to_market": result.max_drawdown, **metrics, } ) horizon_frame = recent_horizon_metrics_from_equity(net_equity, eth[-1].ts, horizons) for horizon_row in horizon_frame.to_dict("records"): horizon_rows.append( { "symbol": "ETH-USDT-SWAP", "signal_symbol": "BTC-USDT-SWAP", "bar": bar, "name": candidate.name, "trades": result.trade_count, **horizon_row, } ) print(f"done eth btc ratio {candidate.name}") summary = sort_cost_results(pd.DataFrame(summary_rows)) totals = pd.DataFrame(total_rows).sort_values(["net_calmar", "net_annualized_return"], ascending=False) horizon = pd.DataFrame(horizon_rows) horizon["horizon"] = pd.Categorical(horizon["horizon"], categories=["3y", "1y", "6m", "3m"], ordered=True) horizon = horizon.sort_values(["horizon", "net_annualized_return"], ascending=[True, False]) summary.to_csv("ultrashort-eth-btc-ratio-summary.csv", index=False) totals.to_csv("ultrashort-eth-btc-ratio-total.csv", index=False) horizon.to_csv("ultrashort-eth-btc-ratio-horizon.csv", index=False) print("window summary") print( summary[ [ "name", "sample_count", "trades", "net_avg_return", "net_ci95_low", "positive_window_rate", "trade_win_rate", "payoff_ratio", "profit_factor", "max_drawdown", ] ].head(30).to_string(index=False) ) print("total") print(totals.head(30).to_string(index=False)) print("horizon") print(horizon.head(60).to_string(index=False)) return 0 def btc_lead_eth_lag_search( *, bar: str, years: float, window_size: int, roundtrip_cost_on_margin: float, ) -> int: client = OkxClient() requested_bars = history_bars_for_years(bar, years) eth = get_candles_cached(client, "ETH-USDT-SWAP", bar, requested_bars) btc = get_candles_cached(client, "BTC-USDT-SWAP", bar, requested_bars) eth, btc = align_pair_candles(eth, btc) candidates = [ build_btc_lead_eth_lag_candidate(lead_lookback, btc_return_threshold, lag_gap, max_hold_bars, stop_loss_pct, take_profit_pct) for lead_lookback in (8, 16) for btc_return_threshold in (0.012, 0.018, 0.024) for lag_gap in (0.006, 0.012) for max_hold_bars in (8, 32) for stop_loss_pct in (0.006, 0.008) for take_profit_pct in (0.012, 0.018) ] summary_rows: list[dict[str, object]] = [] total_rows: list[dict[str, object]] = [] horizon_rows: list[dict[str, object]] = [] horizons = ( ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ) for candidate in candidates: rows = evaluate_pair_candidate_window_rows( candidate=candidate, eth_candles=eth, btc_candles=btc, window_size=window_size, leverage=LEVERAGE, ) summary = add_cost_metrics(pd.DataFrame([summarize_window_rows(rows, candidate.name)]), roundtrip_cost_on_margin).iloc[0].to_dict() summary_rows.append( { "symbol": "ETH-USDT-SWAP", "signal_symbol": "BTC-USDT-SWAP", "bar": bar, "actual_bars": len(eth), "first_candle": _format_ts(eth[0].ts), "last_candle": _format_ts(eth[-1].ts), **summary, } ) result = candidate.run(eth_candles=eth, btc_candles=btc, leverage=LEVERAGE, warmup_bars=candidate.warmup_bars) net_equity = cost_adjusted_trade_equity_frame(result, roundtrip_cost_on_margin) metrics = annualized_metrics_from_equity(net_equity, eth[0].ts, eth[-1].ts) years_actual = (eth[-1].ts - eth[0].ts) / 86_400_000 / 365 gross_annualized = (1.0 + result.total_return) ** (1.0 / years_actual) - 1.0 if result.total_return > -1.0 else 0.0 total_rows.append( { "symbol": "ETH-USDT-SWAP", "signal_symbol": "BTC-USDT-SWAP", "bar": bar, "name": candidate.name, "first_candle": _format_ts(eth[0].ts), "last_candle": _format_ts(eth[-1].ts), "years": years_actual, "trades": result.trade_count, "gross_total_return": result.total_return, "gross_annualized_return": gross_annualized, "gross_max_drawdown_mark_to_market": result.max_drawdown, **metrics, } ) horizon_frame = recent_horizon_metrics_from_equity(net_equity, eth[-1].ts, horizons) for horizon_row in horizon_frame.to_dict("records"): horizon_rows.append( { "symbol": "ETH-USDT-SWAP", "signal_symbol": "BTC-USDT-SWAP", "bar": bar, "name": candidate.name, "trades": result.trade_count, **horizon_row, } ) print(f"done btc lead eth lag {candidate.name}") summary = sort_cost_results(pd.DataFrame(summary_rows)) totals = pd.DataFrame(total_rows).sort_values(["net_calmar", "net_annualized_return"], ascending=False) horizon = pd.DataFrame(horizon_rows) horizon["horizon"] = pd.Categorical(horizon["horizon"], categories=["3y", "1y", "6m", "3m"], ordered=True) horizon = horizon.sort_values(["horizon", "net_annualized_return"], ascending=[True, False]) output_prefix = f"ultrashort-btc-lead-eth-lag-{bar}" summary.to_csv(f"{output_prefix}-summary.csv", index=False) totals.to_csv(f"{output_prefix}-total.csv", index=False) horizon.to_csv(f"{output_prefix}-horizon.csv", index=False) print("window summary") print( summary[ [ "name", "sample_count", "trades", "net_avg_return", "net_ci95_low", "positive_window_rate", "trade_win_rate", "payoff_ratio", "profit_factor", "max_drawdown", ] ].head(30).to_string(index=False) ) print("total") print(totals.head(30).to_string(index=False)) print("horizon") print(horizon.head(60).to_string(index=False)) return 0 if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--focus-vwap", action="store_true") parser.add_argument("--robust-vwap", action="store_true") parser.add_argument("--robust-all", action="store_true") parser.add_argument("--rsi2-cost-search", action="store_true") parser.add_argument("--rsi2-period-analysis", action="store_true") parser.add_argument("--strategy-timeframe-analysis", action="store_true") parser.add_argument("--rsi2-variant-search", action="store_true") parser.add_argument("--best-total-annualized", action="store_true") parser.add_argument("--ma-cross-search", action="store_true") parser.add_argument("--trend-rsi-bb-search", action="store_true") parser.add_argument("--regime-analysis", action="store_true") parser.add_argument("--eth-btc-signal-search", action="store_true") parser.add_argument("--eth-btc-shock-filter-search", action="store_true") parser.add_argument("--eth-btc-ratio-search", action="store_true") parser.add_argument("--btc-lead-eth-lag-search", action="store_true") parser.add_argument("--history-limit", type=int, default=ROBUST_HISTORY_LIMIT) parser.add_argument("--window-size", type=int, default=WINDOW_SIZE) parser.add_argument("--roundtrip-cost-on-margin", type=float, default=0.0012) parser.add_argument("--symbol", default="BTC-USDT-SWAP") parser.add_argument("--symbols", default="BTC-USDT-SWAP") parser.add_argument("--bar", default="15m") parser.add_argument("--bars", default=",".join(ANALYSIS_BARS)) parser.add_argument("--years", type=float, default=10.0) parser.add_argument("--period", default="Q") parser.add_argument("--rsi2-trend", type=int, default=50) parser.add_argument("--rsi2-long-threshold", type=float, default=3.0) parser.add_argument("--rsi2-short-threshold", type=float, default=97.0) args = parser.parse_args() if args.rsi2_period_analysis: raise SystemExit( rsi2_period_analysis( symbol=args.symbol, bar=args.bar, history_limit=args.history_limit, window_size=args.window_size, roundtrip_cost_on_margin=args.roundtrip_cost_on_margin, trend=args.rsi2_trend, long_threshold=args.rsi2_long_threshold, short_threshold=args.rsi2_short_threshold, ) ) if args.rsi2_cost_search: raise SystemExit(robust_rsi2_cost_search(args.history_limit, args.window_size, args.roundtrip_cost_on_margin)) if args.best_total_annualized: raise SystemExit( best_total_annualized( symbols=tuple(value.strip() for value in args.symbols.split(",") if value.strip()), bar=args.bar, years=args.years, roundtrip_cost_on_margin=args.roundtrip_cost_on_margin, ) ) if args.ma_cross_search: raise SystemExit( ma_cross_search( symbols=tuple(value.strip() for value in args.symbols.split(",") if value.strip()), bar=args.bar, years=args.years, window_size=args.window_size, roundtrip_cost_on_margin=args.roundtrip_cost_on_margin, ) ) if args.trend_rsi_bb_search: raise SystemExit( trend_rsi_bb_search( symbols=tuple(value.strip() for value in args.symbols.split(",") if value.strip()), bar=args.bar, years=args.years, window_size=args.window_size, roundtrip_cost_on_margin=args.roundtrip_cost_on_margin, ) ) if args.regime_analysis: raise SystemExit( regime_analysis( symbols=tuple(value.strip() for value in args.symbols.split(",") if value.strip()), bar=args.bar, years=args.years, window_size=args.window_size, roundtrip_cost_on_margin=args.roundtrip_cost_on_margin, ) ) if args.eth_btc_signal_search: raise SystemExit( eth_btc_signal_search( bar=args.bar, years=args.years, window_size=args.window_size, roundtrip_cost_on_margin=args.roundtrip_cost_on_margin, ) ) if args.eth_btc_shock_filter_search: raise SystemExit( eth_btc_shock_filter_search( bar=args.bar, years=args.years, window_size=args.window_size, roundtrip_cost_on_margin=args.roundtrip_cost_on_margin, ) ) if args.eth_btc_ratio_search: raise SystemExit( eth_btc_ratio_search( bar=args.bar, years=args.years, window_size=args.window_size, roundtrip_cost_on_margin=args.roundtrip_cost_on_margin, ) ) if args.btc_lead_eth_lag_search: raise SystemExit( btc_lead_eth_lag_search( bar=args.bar, years=args.years, window_size=args.window_size, roundtrip_cost_on_margin=args.roundtrip_cost_on_margin, ) ) if args.rsi2_variant_search: raise SystemExit( rsi2_variant_search( symbol=args.symbol, bar=args.bar, years=args.years, window_size=args.window_size, roundtrip_cost_on_margin=args.roundtrip_cost_on_margin, ) ) if args.strategy_timeframe_analysis: raise SystemExit( strategy_timeframe_analysis( symbols=tuple(value.strip() for value in args.symbols.split(",") if value.strip()), bars=tuple(value.strip() for value in args.bars.split(",") if value.strip()), years=args.years, window_size=args.window_size, roundtrip_cost_on_margin=args.roundtrip_cost_on_margin, period=args.period, ) ) if args.robust_all: raise SystemExit(robust_all(args.history_limit, args.window_size)) if args.robust_vwap: raise SystemExit(robust_vwap(args.history_limit, args.window_size)) raise SystemExit(focus_vwap() if args.focus_vwap else main())