from __future__ import annotations import argparse import json import sys from dataclasses import dataclass from pathlib import Path from typing import Callable import pandas as pd sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from okx_codex_trader.models import Candle SYMBOL = "ETH-USDT-SWAP" BAR = "15m" YEARS = 10.0 LEVERAGE = 3 INITIAL_EQUITY = 10_000.0 DATA_DIR = Path("data/okx-candles") OUTPUT_DIR = Path("reports/eth-exploration") PREFIX = "eth-microstructure" PRIMARY_COST = "maker_taker" COSTS = ( ("maker_maker", 0.0012), ("maker_taker", 0.0021), ("taker_taker", 0.0030), ) HORIZONS = ( ("all", None), ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ) @dataclass(frozen=True) class Position: side: str entry_time: int entry_index: int entry_price: float account_at_entry: float margin_used: float stop_price: float take_price: float | None max_hold_bars: int @dataclass(frozen=True) class SegmentResult: trade_count: int win_rate: float gross_total_return: float gross_max_drawdown: float trades: list[dict[str, object]] equity_curve: list[dict[str, float | int]] @dataclass(frozen=True) class Variant: family: str name: str params: dict[str, object] run: Callable[[list[Candle]], SegmentResult] def _format_ts(ts: int) -> str: return pd.to_datetime(ts, unit="ms", utc=True).strftime("%Y-%m-%d %H:%M") def _load_candles(symbol: str, bar: str) -> list[Candle]: frame = pd.read_csv(DATA_DIR / symbol / f"{bar}.csv") return [ Candle( symbol=symbol, ts=int(row.ts), open=float(row.open), high=float(row.high), low=float(row.low), close=float(row.close), volume=float(row.volume), ) for row in frame.itertuples(index=False) ] def _trade_equity(side: str, margin_used: float, entry_price: float, exit_price: float) -> float: if side == "long": price_return = (exit_price - entry_price) / entry_price else: price_return = (entry_price - exit_price) / entry_price return margin_used + margin_used * LEVERAGE * price_return def _mark_account(equity: float, position: Position | None, mark_price: float) -> float: if position is None: return equity marked = _trade_equity(position.side, position.margin_used, position.entry_price, mark_price) return equity - position.margin_used + marked def _max_drawdown(values: list[float]) -> float: peak = values[0] drawdown = 0.0 for value in values: peak = max(peak, value) drawdown = max(drawdown, (peak - value) / peak if peak else 0.0) return drawdown def _close_position( *, trades: list[dict[str, object]], position: Position, candle: Candle, exit_price: float, ) -> tuple[float, bool]: exit_margin_equity = _trade_equity(position.side, position.margin_used, position.entry_price, exit_price) account_equity = position.account_at_entry - position.margin_used + exit_margin_equity pnl = exit_margin_equity - position.margin_used trades.append( { "side": "Long" if position.side == "long" else "Short", "entry_time": _format_ts(position.entry_time), "exit_time": _format_ts(candle.ts), "entry_price": round(position.entry_price, 4), "exit_price": round(exit_price, 4), "pnl": pnl, "return_pct": pnl / position.account_at_entry * 100.0, "return_on_margin_pct": pnl / position.margin_used * 100.0, "cost_weight": position.margin_used / position.account_at_entry, "hold_bars": int((candle.ts - position.entry_time) / 900_000), } ) return account_equity, pnl > 0.0 def _open_position( *, side: str, candle: Candle, index: int, equity: float, margin_fraction: float, stop_loss_pct: float, take_profit_pct: float | None, max_hold_bars: int, ) -> Position: margin_used = equity * margin_fraction return Position( side=side, entry_time=candle.ts, entry_index=index, entry_price=candle.open, account_at_entry=equity, margin_used=margin_used, stop_price=candle.open * (1.0 - stop_loss_pct if side == "long" else 1.0 + stop_loss_pct), take_price=None if take_profit_pct is None else candle.open * (1.0 + take_profit_pct if side == "long" else 1.0 - take_profit_pct), max_hold_bars=max_hold_bars, ) def _empty_result(candles: list[Candle]) -> SegmentResult: return SegmentResult(0, 0.0, 0.0, 0.0, [], [{"ts": candles[0].ts, "equity": INITIAL_EQUITY, "close": candles[0].close}]) def _finalize_result(trades: list[dict[str, object]], equity_curve: list[dict[str, float | int]]) -> SegmentResult: wins = sum(1 for trade in trades if float(trade["pnl"]) > 0.0) values = [float(point["equity"]) for point in equity_curve] return SegmentResult( trade_count=len(trades), win_rate=wins / len(trades) if trades else 0.0, gross_total_return=values[-1] / INITIAL_EQUITY - 1.0, gross_max_drawdown=_max_drawdown(values), trades=trades, equity_curve=equity_curve, ) def _session_ok(candle: Candle, session: str) -> bool: hour = pd.to_datetime(candle.ts, unit="ms", utc=True).hour if session == "all": return True if session == "us": return 13 <= hour < 22 if session == "asia": return 0 <= hour < 8 return 8 <= hour < 16 def run_range_retest( candles: list[Candle], *, lookback: int, pullback_pct: float, stop_loss_pct: float, take_profit_pct: float, max_hold_bars: int, margin_fraction: float, session: str, ) -> SegmentResult: if len(candles) <= lookback + max_hold_bars: return _empty_result(candles) highs = pd.Series([c.high for c in candles], dtype=float).shift(1).rolling(lookback).max().tolist() lows = pd.Series([c.low for c in candles], dtype=float).shift(1).rolling(lookback).min().tolist() trades: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] equity = INITIAL_EQUITY position: Position | None = None pending_entry: str | None = None pending_exit_price: float | None = None breakout: dict[str, object] | None = None for index in range(lookback, len(candles)): candle = candles[index] if pending_exit_price is not None and position is not None: equity, _ = _close_position(trades=trades, position=position, candle=candle, exit_price=pending_exit_price) position = None pending_exit_price = None if pending_entry is not None and position is None and equity > 0.0: position = _open_position( side=pending_entry, candle=candle, index=index, equity=equity, margin_fraction=margin_fraction, stop_loss_pct=stop_loss_pct, take_profit_pct=take_profit_pct, max_hold_bars=max_hold_bars, ) pending_entry = None if position is not None: stop_hit = (position.side == "long" and candle.low <= position.stop_price) or (position.side == "short" and candle.high >= position.stop_price) take_hit = position.take_price is not None and ( (position.side == "long" and candle.high >= position.take_price) or (position.side == "short" and candle.low <= position.take_price) ) held = index - position.entry_index if stop_hit or take_hit or held >= position.max_hold_bars: exit_price = position.stop_price if stop_hit else position.take_price if take_hit else candle.close equity, _ = _close_position(trades=trades, position=position, candle=candle, exit_price=float(exit_price)) position = None current_equity = _mark_account(equity, position, candle.close) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) if index == len(candles) - 1 or equity <= 0.0: continue if position is not None or not _session_ok(candle, session): continue if breakout is not None: side = str(breakout["side"]) level = float(breakout["level"]) age = index - int(breakout["index"]) if age > 4: breakout = None elif side == "long" and candle.low <= level * (1.0 + pullback_pct) and candle.close > level: pending_entry = "long" breakout = None elif side == "short" and candle.high >= level * (1.0 - pullback_pct) and candle.close < level: pending_entry = "short" breakout = None continue if highs[index] == highs[index] and candle.close > float(highs[index]): breakout = {"side": "long", "level": float(highs[index]), "index": index} elif lows[index] == lows[index] and candle.close < float(lows[index]): breakout = {"side": "short", "level": float(lows[index]), "index": index} return _finalize_result(trades, equity_curve) def run_atr_expansion( candles: list[Candle], *, range_window: int, atr_window: int, atr_quantile_window: int, atr_quantile: float, stop_loss_pct: float, take_profit_pct: float, max_hold_bars: int, margin_fraction: float, session: str, ) -> SegmentResult: if len(candles) <= max(range_window, atr_window, atr_quantile_window): return _empty_result(candles) highs = pd.Series([c.high for c in candles], dtype=float) lows = pd.Series([c.low for c in candles], dtype=float) closes = pd.Series([c.close for c in candles], dtype=float) prev_close = closes.shift(1) true_range = pd.concat([(highs - lows), (highs - prev_close).abs(), (lows - prev_close).abs()], axis=1).max(axis=1) atr = true_range.rolling(atr_window).mean() / closes atr_limit = atr.rolling(atr_quantile_window).quantile(atr_quantile).tolist() atr_values = atr.tolist() range_high = highs.shift(1).rolling(range_window).max().tolist() range_low = lows.shift(1).rolling(range_window).min().tolist() warmup = max(range_window, atr_window, atr_quantile_window) trades: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] equity = INITIAL_EQUITY position: Position | None = None pending_entry: str | None = None for index in range(warmup, len(candles)): candle = candles[index] if pending_entry is not None and position is None and equity > 0.0: position = _open_position( side=pending_entry, candle=candle, index=index, equity=equity, margin_fraction=margin_fraction, stop_loss_pct=stop_loss_pct, take_profit_pct=take_profit_pct, max_hold_bars=max_hold_bars, ) pending_entry = None if position is not None: stop_hit = (position.side == "long" and candle.low <= position.stop_price) or (position.side == "short" and candle.high >= position.stop_price) take_hit = position.take_price is not None and ( (position.side == "long" and candle.high >= position.take_price) or (position.side == "short" and candle.low <= position.take_price) ) held = index - position.entry_index if stop_hit or take_hit or held >= position.max_hold_bars: exit_price = position.stop_price if stop_hit else position.take_price if take_hit else candle.close equity, _ = _close_position(trades=trades, position=position, candle=candle, exit_price=float(exit_price)) position = None current_equity = _mark_account(equity, position, candle.close) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) if index == len(candles) - 1 or position is not None or not _session_ok(candle, session): continue values = (atr_values[index - 1], atr_limit[index - 1], range_high[index], range_low[index]) if any(value != value for value in values): continue compressed = float(atr_values[index - 1]) <= float(atr_limit[index - 1]) if compressed and candle.close > float(range_high[index]): pending_entry = "long" elif compressed and candle.close < float(range_low[index]): pending_entry = "short" return _finalize_result(trades, equity_curve) def run_false_breakout( candles: list[Candle], *, lookback: int, reclaim_buffer: float, stop_loss_pct: float, take_profit_pct: float, max_hold_bars: int, margin_fraction: float, session: str, ) -> SegmentResult: highs = pd.Series([c.high for c in candles], dtype=float).shift(1).rolling(lookback).max().tolist() lows = pd.Series([c.low for c in candles], dtype=float).shift(1).rolling(lookback).min().tolist() trades: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] equity = INITIAL_EQUITY position: Position | None = None pending_entry: str | None = None for index in range(lookback, len(candles)): candle = candles[index] if pending_entry is not None and position is None and equity > 0.0: position = _open_position( side=pending_entry, candle=candle, index=index, equity=equity, margin_fraction=margin_fraction, stop_loss_pct=stop_loss_pct, take_profit_pct=take_profit_pct, max_hold_bars=max_hold_bars, ) pending_entry = None if position is not None: stop_hit = (position.side == "long" and candle.low <= position.stop_price) or (position.side == "short" and candle.high >= position.stop_price) take_hit = position.take_price is not None and ( (position.side == "long" and candle.high >= position.take_price) or (position.side == "short" and candle.low <= position.take_price) ) held = index - position.entry_index if stop_hit or take_hit or held >= position.max_hold_bars: exit_price = position.stop_price if stop_hit else position.take_price if take_hit else candle.close equity, _ = _close_position(trades=trades, position=position, candle=candle, exit_price=float(exit_price)) position = None current_equity = _mark_account(equity, position, candle.close) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) if index == len(candles) - 1 or position is not None or not _session_ok(candle, session): continue if highs[index] == highs[index] and candle.high > float(highs[index]) and candle.close < float(highs[index]) * (1.0 - reclaim_buffer): pending_entry = "short" elif lows[index] == lows[index] and candle.low < float(lows[index]) and candle.close > float(lows[index]) * (1.0 + reclaim_buffer): pending_entry = "long" return _finalize_result(trades, equity_curve) def run_vwap_midline_deviation( candles: list[Candle], *, window: int, entry_z: float, exit_z: float, stop_loss_pct: float, max_hold_bars: int, margin_fraction: float, session: str, ) -> SegmentResult: closes = pd.Series([c.close for c in candles], dtype=float) volumes = pd.Series([c.volume for c in candles], dtype=float) highs = pd.Series([c.high for c in candles], dtype=float) lows = pd.Series([c.low for c in candles], dtype=float) typical = (highs + lows + closes) / 3.0 vwap = (typical * volumes).rolling(window).sum() / volumes.rolling(window).sum() deviation = (closes - vwap) / vwap deviation_std = deviation.rolling(window).std(ddof=0) zscore = (deviation / deviation_std).tolist() trades: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] equity = INITIAL_EQUITY position: Position | None = None pending_entry: str | None = None pending_exit = False warmup = window * 2 for index in range(warmup, len(candles)): candle = candles[index] if pending_exit and position is not None: equity, _ = _close_position(trades=trades, position=position, candle=candle, exit_price=candle.open) position = None pending_exit = False if pending_entry is not None and position is None and equity > 0.0: position = _open_position( side=pending_entry, candle=candle, index=index, equity=equity, margin_fraction=margin_fraction, stop_loss_pct=stop_loss_pct, take_profit_pct=None, max_hold_bars=max_hold_bars, ) pending_entry = None if position is not None: stop_hit = (position.side == "long" and candle.low <= position.stop_price) or (position.side == "short" and candle.high >= position.stop_price) held = index - position.entry_index if stop_hit or held >= position.max_hold_bars: exit_price = position.stop_price if stop_hit else candle.close equity, _ = _close_position(trades=trades, position=position, candle=candle, exit_price=float(exit_price)) position = None current_equity = _mark_account(equity, position, candle.close) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) if index == len(candles) - 1 or not _session_ok(candle, session): continue z = zscore[index] if z != z: continue if position is not None: if (position.side == "long" and z >= -exit_z) or (position.side == "short" and z <= exit_z): pending_exit = True continue if z <= -entry_z: pending_entry = "long" elif z >= entry_z: pending_entry = "short" return _finalize_result(trades, equity_curve) def run_opening_split( candles: list[Candle], *, first_bars: int, confirm_bars: int, min_range_pct: float, stop_loss_pct: float, take_profit_pct: float, max_hold_bars: int, margin_fraction: float, direction: str, ) -> SegmentResult: by_day: dict[str, list[int]] = {} for index, candle in enumerate(candles): day = pd.to_datetime(candle.ts, unit="ms", utc=True).strftime("%Y-%m-%d") by_day.setdefault(day, []).append(index) trades: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] equity = INITIAL_EQUITY position: Position | None = None pending_by_index: dict[int, str] = {} days = set(by_day) for index, candle in enumerate(candles): day = pd.to_datetime(candle.ts, unit="ms", utc=True).strftime("%Y-%m-%d") if day in days and by_day[day][0] == index and len(by_day[day]) > first_bars + confirm_bars: first = by_day[day][:first_bars] confirm = by_day[day][first_bars : first_bars + confirm_bars] first_high = max(candles[i].high for i in first) first_low = min(candles[i].low for i in first) first_open = candles[first[0]].open range_pct = (first_high - first_low) / first_open if range_pct >= min_range_pct: confirm_close = candles[confirm[-1]].close entry_index = confirm[-1] + 1 if entry_index < len(candles): if direction in ("follow", "both") and confirm_close > first_high: pending_by_index[entry_index] = "long" elif direction in ("fade", "both") and confirm_close < first_low: pending_by_index[entry_index] = "short" side = pending_by_index.pop(index, None) if side is not None and position is None and equity > 0.0: position = _open_position( side=side, candle=candle, index=index, equity=equity, margin_fraction=margin_fraction, stop_loss_pct=stop_loss_pct, take_profit_pct=take_profit_pct, max_hold_bars=max_hold_bars, ) if position is not None: stop_hit = (position.side == "long" and candle.low <= position.stop_price) or (position.side == "short" and candle.high >= position.stop_price) take_hit = position.take_price is not None and ( (position.side == "long" and candle.high >= position.take_price) or (position.side == "short" and candle.low <= position.take_price) ) held = index - position.entry_index if stop_hit or take_hit or held >= position.max_hold_bars: exit_price = position.stop_price if stop_hit else position.take_price if take_hit else candle.close equity, _ = _close_position(trades=trades, position=position, candle=candle, exit_price=float(exit_price)) position = None current_equity = _mark_account(equity, position, candle.close) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) return _finalize_result(trades, equity_curve) def build_variants() -> list[Variant]: variants: list[Variant] = [] for lookback in (48, 96, 192): for pullback_pct in (0.000, 0.0015): for margin_fraction in (0.25, 0.4): for session in ("all", "us", "eu"): params = { "lookback": lookback, "pullback_pct": pullback_pct, "stop_loss_pct": 0.008, "take_profit_pct": 0.014, "max_hold_bars": 32, "margin_fraction": margin_fraction, "session": session, } name = f"range-retest-l{lookback}-pb{pullback_pct:g}-sl0.008-tp0.014-mf{margin_fraction:g}-{session}" variants.append(Variant("range_breakout_retest", name, params, lambda candles, params=params: run_range_retest(candles, **params))) for range_window in (48, 96): for atr_quantile in (0.15, 0.25): for margin_fraction in (0.25, 0.4): for session in ("all", "us"): params = { "range_window": range_window, "atr_window": 48, "atr_quantile_window": 480, "atr_quantile": atr_quantile, "stop_loss_pct": 0.008, "take_profit_pct": 0.016, "max_hold_bars": 32, "margin_fraction": margin_fraction, "session": session, } name = f"atr-compress-expand-r{range_window}-q{atr_quantile:g}-sl0.008-tp0.016-mf{margin_fraction:g}-{session}" variants.append(Variant("atr_compression_expansion", name, params, lambda candles, params=params: run_atr_expansion(candles, **params))) for lookback in (48, 96, 192): for reclaim_buffer in (0.000, 0.001): for margin_fraction in (0.25, 0.4): for session in ("all", "us"): params = { "lookback": lookback, "reclaim_buffer": reclaim_buffer, "stop_loss_pct": 0.007, "take_profit_pct": 0.010, "max_hold_bars": 24, "margin_fraction": margin_fraction, "session": session, } name = f"false-breakout-l{lookback}-rb{reclaim_buffer:g}-sl0.007-tp0.010-mf{margin_fraction:g}-{session}" variants.append(Variant("donchian_false_breakout", name, params, lambda candles, params=params: run_false_breakout(candles, **params))) for window in (96, 192): for entry_z in (2.0, 2.5): for margin_fraction in (0.25, 0.4): for session in ("all", "us"): params = { "window": window, "entry_z": entry_z, "exit_z": 0.25, "stop_loss_pct": 0.008, "max_hold_bars": 48, "margin_fraction": margin_fraction, "session": session, } name = f"vwap-mid-dev-w{window}-z{entry_z:g}-x0.25-sl0.008-mf{margin_fraction:g}-{session}" variants.append(Variant("vwap_midline_deviation", name, params, lambda candles, params=params: run_vwap_midline_deviation(candles, **params))) for first_bars in (1, 2): for confirm_bars in (1, 2): for min_range_pct in (0.003, 0.006): for margin_fraction in (0.25, 0.4): params = { "first_bars": first_bars, "confirm_bars": confirm_bars, "min_range_pct": min_range_pct, "stop_loss_pct": 0.007, "take_profit_pct": 0.012, "max_hold_bars": 24, "margin_fraction": margin_fraction, "direction": "follow", } name = f"opening-split-f{first_bars}-c{confirm_bars}-r{min_range_pct:g}-sl0.007-tp0.012-mf{margin_fraction:g}" variants.append(Variant("opening_first_bars_split", name, params, lambda candles, params=params: run_opening_split(candles, **params))) return variants def cost_equity_frame(result: SegmentResult, cost: float) -> pd.DataFrame: rows = [{"ts": pd.to_datetime(result.equity_curve[0]["ts"], unit="ms", utc=True), "equity": INITIAL_EQUITY}] equity = INITIAL_EQUITY for trade in result.trades: equity *= 1.0 + float(trade["return_pct"]) / 100.0 - cost * float(trade["cost_weight"]) rows.append({"ts": pd.to_datetime(str(trade["exit_time"]), utc=True), "equity": equity}) last_ts = pd.to_datetime(result.equity_curve[-1]["ts"], unit="ms", utc=True) if rows[-1]["ts"] < last_ts: rows.append({"ts": last_ts, "equity": equity}) return pd.DataFrame(rows) def equity_metrics(frame: pd.DataFrame, first_ts: int, last_ts: int) -> dict[str, float]: years = (last_ts - first_ts) / 86_400_000 / 365 total_return = float(frame["equity"].iloc[-1] / frame["equity"].iloc[0] - 1.0) annualized = (1.0 + total_return) ** (1.0 / years) - 1.0 if total_return > -1.0 and years > 0.0 else 0.0 drawdown = _max_drawdown([float(value) for value in frame["equity"]]) return { "net_total_return": total_return, "net_annualized_return": annualized, "net_max_drawdown": drawdown, "net_calmar": annualized / drawdown if drawdown else 0.0, "risk_reward_ratio": total_return / drawdown if drawdown else 0.0, } def trade_distribution(result: SegmentResult) -> dict[str, float]: wins = [float(trade["return_on_margin_pct"]) for trade in result.trades if float(trade["pnl"]) > 0.0] losses = [float(trade["return_on_margin_pct"]) for trade in result.trades if float(trade["pnl"]) < 0.0] avg_win = sum(wins) / len(wins) if wins else 0.0 avg_loss = sum(losses) / len(losses) if losses else 0.0 gross_profit = sum(float(trade["pnl"]) for trade in result.trades if float(trade["pnl"]) > 0.0) gross_loss = -sum(float(trade["pnl"]) for trade in result.trades if float(trade["pnl"]) < 0.0) return { "avg_win_on_margin_pct": avg_win, "avg_loss_on_margin_pct": avg_loss, "win_loss_ratio": avg_win / abs(avg_loss) if avg_loss else 0.0, "profit_factor": gross_profit / gross_loss if gross_loss else 0.0, } def horizon_rows(frame: pd.DataFrame, last_ts: int) -> list[dict[str, object]]: rows: list[dict[str, object]] = [] end_time = pd.to_datetime(last_ts, unit="ms", utc=True) for label, offset in HORIZONS: if offset is None: horizon_frame = frame[["ts", "equity"]].copy() start_time = pd.Timestamp(horizon_frame["ts"].iloc[0]) else: cutoff = end_time - offset before = frame[frame["ts"] <= cutoff] if len(before): start_equity = float(before["equity"].iloc[-1]) start_time = cutoff after = frame[frame["ts"] > cutoff] horizon_frame = pd.concat([pd.DataFrame([{"ts": cutoff, "equity": start_equity}]), after[["ts", "equity"]]], ignore_index=True) else: horizon_frame = frame[["ts", "equity"]].copy() start_time = pd.Timestamp(horizon_frame["ts"].iloc[0]) rows.append( { "horizon": label, "horizon_start": start_time.strftime("%Y-%m-%d %H:%M"), "horizon_end": end_time.strftime("%Y-%m-%d %H:%M"), **equity_metrics(horizon_frame, int(start_time.timestamp() * 1000), last_ts), } ) return rows def monthly_rows(frame: pd.DataFrame) -> list[dict[str, object]]: monthly = frame.set_index("ts")["equity"].resample("ME").last().ffill().pct_change().dropna() return [{"month": idx.strftime("%Y-%m"), "monthly_return": float(value)} for idx, value in monthly.items()] def worst_month(frame: pd.DataFrame) -> tuple[str, float]: rows = monthly_rows(frame) if not rows: return "", 0.0 worst = min(rows, key=lambda row: float(row["monthly_return"])) return str(worst["month"]), float(worst["monthly_return"]) def robust_ranking(summary: pd.DataFrame, horizons: pd.DataFrame) -> pd.DataFrame: primary = summary[summary["cost"] == PRIMARY_COST].copy() recent = horizons[(horizons["cost"] == PRIMARY_COST) & (horizons["horizon"] != "all")] pivot = recent.pivot_table( index="name", columns="horizon", values=["net_total_return", "net_max_drawdown", "net_calmar"], aggfunc="first", observed=False, ) pivot.columns = [f"{metric}_{horizon}" for metric, horizon in pivot.columns] ranked = primary.merge(pivot.reset_index(), on="name") ranked["min_recent_return"] = ranked[ ["net_total_return_3y", "net_total_return_1y", "net_total_return_6m", "net_total_return_3m"] ].min(axis=1) ranked["max_recent_drawdown"] = ranked[ ["net_max_drawdown_3y", "net_max_drawdown_1y", "net_max_drawdown_6m", "net_max_drawdown_3m"] ].max(axis=1) ranked["all_recent_positive"] = ranked["min_recent_return"] > 0.0 return ranked.sort_values( [ "all_recent_positive", "min_recent_return", "max_recent_drawdown", "net_calmar", "net_total_return", ], ascending=[False, False, True, False, False], ) def format_cell(value: object) -> str: if isinstance(value, float): return f"{value:.6g}" return str(value).replace("|", "\\|") def markdown_table(frame: pd.DataFrame) -> str: columns = list(frame.columns) rows = [columns, ["---" for _ in columns]] for record in frame.to_dict("records"): rows.append([record[column] for column in columns]) return "\n".join("| " + " | ".join(format_cell(value) for value in row) + " |" for row in rows) def write_report( *, summary: pd.DataFrame, horizons: pd.DataFrame, monthly: pd.DataFrame, robust: pd.DataFrame, output_files: list[Path], command: str, first_ts: int, last_ts: int, requested_years: float, ) -> str: primary = summary[summary["cost"] == PRIMARY_COST].copy() top = primary.head(10) low_dd = primary[(primary["net_max_drawdown"] <= 0.35) & (primary["net_total_return"] > 0.0)].head(10) horizon_top = ( horizons[(horizons["cost"] == PRIMARY_COST) & (horizons["horizon"] != "all")] .sort_values(["horizon", "net_calmar", "net_annualized_return"], ascending=[True, False, False]) .groupby("horizon", observed=True) .head(3) ) family = ( primary.groupby("family", as_index=False) .agg( best_calmar=("net_calmar", "max"), best_annualized=("net_annualized_return", "max"), best_drawdown=("net_max_drawdown", "min"), candidates=("name", "count"), ) .sort_values(["best_calmar", "best_annualized"], ascending=False) ) best_name = str(primary.iloc[0]["name"]) if len(primary) else "" best_monthly = monthly[(monthly["cost"] == PRIMARY_COST) & (monthly["name"] == best_name)].sort_values("monthly_return").head(12) robust_top = robust.head(10) lines = [ "# ETH microstructure non-RSI exploration", "", f"Run command: `{command}`", f"Requested years: {requested_years:g}", f"Actual continuous local history: `{_format_ts(first_ts)}` to `{_format_ts(last_ts)}`.", "", "No order placement or exchange API path is used; this script only reads local candle CSV files.", "", "Output files:", *[f"- `{path}`" for path in output_files], "", "Primary ranking: maker_taker by net Calmar, annualized return, total return, then lower drawdown.", "", "Top 10 maker_taker:", markdown_table( top[ [ "family", "name", "trades", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "win_rate", "win_loss_ratio", "profit_factor", "risk_reward_ratio", "worst_month", "worst_month_return", ] ] ), "", "Family leaders:", markdown_table(family), "", "Recent horizon leaders:", markdown_table( horizon_top[ [ "horizon", "family", "name", "trades", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "risk_reward_ratio", ] ] ), "", "Robust ranking by recent survival:", markdown_table( robust_top[ [ "all_recent_positive", "family", "name", "trades", "net_total_return", "net_max_drawdown", "min_recent_return", "max_recent_drawdown", "net_total_return_3y", "net_total_return_1y", "net_total_return_6m", "net_total_return_3m", ] ] ), "", "Low drawdown positive candidates:", markdown_table( low_dd[ [ "family", "name", "trades", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "worst_month_return", ] ] ) if len(low_dd) else "No maker_taker candidate met net_max_drawdown <= 0.35 with positive total return.", "", f"Worst months for best candidate `{best_name}`:", markdown_table(best_monthly[["month", "monthly_return"]]) if len(best_monthly) else "No monthly rows.", ] return "\n".join(lines) + "\n" def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--bar", default=BAR) parser.add_argument("--years", type=float, default=YEARS) parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR) args = parser.parse_args() candles = _load_candles(SYMBOL, args.bar) requested_bars = int(args.years * 365 * 24 * 60 / 15) candles = candles[-requested_bars:] variants = build_variants() summary_rows: list[dict[str, object]] = [] horizon_output_rows: list[dict[str, object]] = [] monthly_output_rows: list[dict[str, object]] = [] for index, variant in enumerate(variants, start=1): result = variant.run(candles) distribution = trade_distribution(result) for cost_name, cost in COSTS: frame = cost_equity_frame(result, cost) month, month_return = worst_month(frame) base = { "family": variant.family, "cost": cost_name, "symbol": SYMBOL, "bar": args.bar, "name": variant.name, "first_candle": _format_ts(candles[0].ts), "last_candle": _format_ts(candles[-1].ts), "years": (candles[-1].ts - candles[0].ts) / 86_400_000 / 365, "trades": result.trade_count, "win_rate": result.win_rate, "gross_total_return": result.gross_total_return, "gross_max_drawdown_mark_to_market": result.gross_max_drawdown, "worst_month": month, "worst_month_return": month_return, **variant.params, **distribution, } summary_rows.append({**base, **equity_metrics(frame, candles[0].ts, candles[-1].ts)}) for row in horizon_rows(frame, candles[-1].ts): horizon_output_rows.append({**base, **row}) for row in monthly_rows(frame): monthly_output_rows.append({**base, **row}) print(f"done {index}/{len(variants)} {variant.family} {variant.name} trades={result.trade_count}", flush=True) summary = pd.DataFrame(summary_rows).sort_values( ["cost", "net_calmar", "net_annualized_return", "net_total_return", "net_max_drawdown"], ascending=[True, False, False, False, True], ) primary = summary[summary["cost"] == PRIMARY_COST] summary = pd.concat([primary, summary[summary["cost"] != PRIMARY_COST]], ignore_index=True) horizons = pd.DataFrame(horizon_output_rows) horizons["horizon"] = pd.Categorical(horizons["horizon"], categories=[label for label, _ in HORIZONS], ordered=True) horizons = horizons.sort_values(["cost", "horizon", "net_calmar", "net_annualized_return"], ascending=[True, True, False, False]) monthly = pd.DataFrame(monthly_output_rows).sort_values(["cost", "name", "month"]) robust = robust_ranking(summary, horizons) args.output_dir.mkdir(parents=True, exist_ok=True) summary_path = args.output_dir / f"{PREFIX}-summary.csv" horizons_path = args.output_dir / f"{PREFIX}-horizons.csv" top10_path = args.output_dir / f"{PREFIX}-top10.csv" monthly_path = args.output_dir / f"{PREFIX}-monthly.csv" robust_path = args.output_dir / f"{PREFIX}-robust.csv" json_path = args.output_dir / f"{PREFIX}-best.json" report_path = args.output_dir / f"{PREFIX}-report.md" output_files = [summary_path, horizons_path, top10_path, monthly_path, robust_path, json_path, report_path] summary.to_csv(summary_path, index=False) horizons.to_csv(horizons_path, index=False) primary.head(10).to_csv(top10_path, index=False) monthly.to_csv(monthly_path, index=False) robust.to_csv(robust_path, index=False) json_path.write_text(json.dumps(primary.head(10).to_dict("records"), indent=2), encoding="utf-8") command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --bar {args.bar} --years {args.years:g}" report_path.write_text( write_report( summary=summary, horizons=horizons, monthly=monthly, robust=robust, output_files=output_files, command=command, first_ts=candles[0].ts, last_ts=candles[-1].ts, requested_years=args.years, ), encoding="utf-8", ) print(primary.head(10).to_string(index=False)) return 0 if __name__ == "__main__": raise SystemExit(main())