| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975 |
- from __future__ import annotations
- import argparse
- import json
- import sys
- from dataclasses import dataclass
- from pathlib import Path
- from typing import Callable
- import pandas as pd
- sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
- from okx_codex_trader.models import Candle
- SYMBOL = "ETH-USDT-SWAP"
- BAR = "15m"
- YEARS = 10.0
- LEVERAGE = 3
- INITIAL_EQUITY = 10_000.0
- DATA_DIR = Path("data/okx-candles")
- OUTPUT_DIR = Path("reports/eth-exploration")
- PREFIX = "eth-microstructure"
- PRIMARY_COST = "maker_taker"
- COSTS = (
- ("maker_maker", 0.0012),
- ("maker_taker", 0.0021),
- ("taker_taker", 0.0030),
- )
- HORIZONS = (
- ("all", None),
- ("3y", pd.DateOffset(years=3)),
- ("1y", pd.DateOffset(years=1)),
- ("6m", pd.DateOffset(months=6)),
- ("3m", pd.DateOffset(months=3)),
- )
- @dataclass(frozen=True)
- class Position:
- side: str
- entry_time: int
- entry_index: int
- entry_price: float
- account_at_entry: float
- margin_used: float
- stop_price: float
- take_price: float | None
- max_hold_bars: int
- @dataclass(frozen=True)
- class SegmentResult:
- trade_count: int
- win_rate: float
- gross_total_return: float
- gross_max_drawdown: float
- trades: list[dict[str, object]]
- equity_curve: list[dict[str, float | int]]
- @dataclass(frozen=True)
- class Variant:
- family: str
- name: str
- params: dict[str, object]
- run: Callable[[list[Candle]], SegmentResult]
- def _format_ts(ts: int) -> str:
- return pd.to_datetime(ts, unit="ms", utc=True).strftime("%Y-%m-%d %H:%M")
- def _load_candles(symbol: str, bar: str) -> list[Candle]:
- frame = pd.read_csv(DATA_DIR / symbol / f"{bar}.csv")
- return [
- Candle(
- symbol=symbol,
- ts=int(row.ts),
- open=float(row.open),
- high=float(row.high),
- low=float(row.low),
- close=float(row.close),
- volume=float(row.volume),
- )
- for row in frame.itertuples(index=False)
- ]
- def _trade_equity(side: str, margin_used: float, entry_price: float, exit_price: float) -> float:
- if side == "long":
- price_return = (exit_price - entry_price) / entry_price
- else:
- price_return = (entry_price - exit_price) / entry_price
- return margin_used + margin_used * LEVERAGE * price_return
- def _mark_account(equity: float, position: Position | None, mark_price: float) -> float:
- if position is None:
- return equity
- marked = _trade_equity(position.side, position.margin_used, position.entry_price, mark_price)
- return equity - position.margin_used + marked
- def _max_drawdown(values: list[float]) -> float:
- peak = values[0]
- drawdown = 0.0
- for value in values:
- peak = max(peak, value)
- drawdown = max(drawdown, (peak - value) / peak if peak else 0.0)
- return drawdown
- def _close_position(
- *,
- trades: list[dict[str, object]],
- position: Position,
- candle: Candle,
- exit_price: float,
- ) -> tuple[float, bool]:
- exit_margin_equity = _trade_equity(position.side, position.margin_used, position.entry_price, exit_price)
- account_equity = position.account_at_entry - position.margin_used + exit_margin_equity
- pnl = exit_margin_equity - position.margin_used
- trades.append(
- {
- "side": "Long" if position.side == "long" else "Short",
- "entry_time": _format_ts(position.entry_time),
- "exit_time": _format_ts(candle.ts),
- "entry_price": round(position.entry_price, 4),
- "exit_price": round(exit_price, 4),
- "pnl": pnl,
- "return_pct": pnl / position.account_at_entry * 100.0,
- "return_on_margin_pct": pnl / position.margin_used * 100.0,
- "cost_weight": position.margin_used / position.account_at_entry,
- "hold_bars": int((candle.ts - position.entry_time) / 900_000),
- }
- )
- return account_equity, pnl > 0.0
- def _open_position(
- *,
- side: str,
- candle: Candle,
- index: int,
- equity: float,
- margin_fraction: float,
- stop_loss_pct: float,
- take_profit_pct: float | None,
- max_hold_bars: int,
- ) -> Position:
- margin_used = equity * margin_fraction
- return Position(
- side=side,
- entry_time=candle.ts,
- entry_index=index,
- entry_price=candle.open,
- account_at_entry=equity,
- margin_used=margin_used,
- stop_price=candle.open * (1.0 - stop_loss_pct if side == "long" else 1.0 + stop_loss_pct),
- take_price=None if take_profit_pct is None else candle.open * (1.0 + take_profit_pct if side == "long" else 1.0 - take_profit_pct),
- max_hold_bars=max_hold_bars,
- )
- def _empty_result(candles: list[Candle]) -> SegmentResult:
- return SegmentResult(0, 0.0, 0.0, 0.0, [], [{"ts": candles[0].ts, "equity": INITIAL_EQUITY, "close": candles[0].close}])
- def _finalize_result(trades: list[dict[str, object]], equity_curve: list[dict[str, float | int]]) -> SegmentResult:
- wins = sum(1 for trade in trades if float(trade["pnl"]) > 0.0)
- values = [float(point["equity"]) for point in equity_curve]
- return SegmentResult(
- trade_count=len(trades),
- win_rate=wins / len(trades) if trades else 0.0,
- gross_total_return=values[-1] / INITIAL_EQUITY - 1.0,
- gross_max_drawdown=_max_drawdown(values),
- trades=trades,
- equity_curve=equity_curve,
- )
- def _session_ok(candle: Candle, session: str) -> bool:
- hour = pd.to_datetime(candle.ts, unit="ms", utc=True).hour
- if session == "all":
- return True
- if session == "us":
- return 13 <= hour < 22
- if session == "asia":
- return 0 <= hour < 8
- return 8 <= hour < 16
- def run_range_retest(
- candles: list[Candle],
- *,
- lookback: int,
- pullback_pct: float,
- stop_loss_pct: float,
- take_profit_pct: float,
- max_hold_bars: int,
- margin_fraction: float,
- session: str,
- ) -> SegmentResult:
- if len(candles) <= lookback + max_hold_bars:
- return _empty_result(candles)
- highs = pd.Series([c.high for c in candles], dtype=float).shift(1).rolling(lookback).max().tolist()
- lows = pd.Series([c.low for c in candles], dtype=float).shift(1).rolling(lookback).min().tolist()
- trades: list[dict[str, object]] = []
- equity_curve: list[dict[str, float | int]] = []
- equity = INITIAL_EQUITY
- position: Position | None = None
- pending_entry: str | None = None
- pending_exit_price: float | None = None
- breakout: dict[str, object] | None = None
- for index in range(lookback, len(candles)):
- candle = candles[index]
- if pending_exit_price is not None and position is not None:
- equity, _ = _close_position(trades=trades, position=position, candle=candle, exit_price=pending_exit_price)
- position = None
- pending_exit_price = None
- if pending_entry is not None and position is None and equity > 0.0:
- position = _open_position(
- side=pending_entry,
- candle=candle,
- index=index,
- equity=equity,
- margin_fraction=margin_fraction,
- stop_loss_pct=stop_loss_pct,
- take_profit_pct=take_profit_pct,
- max_hold_bars=max_hold_bars,
- )
- pending_entry = None
- if position is not None:
- stop_hit = (position.side == "long" and candle.low <= position.stop_price) or (position.side == "short" and candle.high >= position.stop_price)
- take_hit = position.take_price is not None and (
- (position.side == "long" and candle.high >= position.take_price) or (position.side == "short" and candle.low <= position.take_price)
- )
- held = index - position.entry_index
- if stop_hit or take_hit or held >= position.max_hold_bars:
- exit_price = position.stop_price if stop_hit else position.take_price if take_hit else candle.close
- equity, _ = _close_position(trades=trades, position=position, candle=candle, exit_price=float(exit_price))
- position = None
- current_equity = _mark_account(equity, position, candle.close)
- equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close})
- if index == len(candles) - 1 or equity <= 0.0:
- continue
- if position is not None or not _session_ok(candle, session):
- continue
- if breakout is not None:
- side = str(breakout["side"])
- level = float(breakout["level"])
- age = index - int(breakout["index"])
- if age > 4:
- breakout = None
- elif side == "long" and candle.low <= level * (1.0 + pullback_pct) and candle.close > level:
- pending_entry = "long"
- breakout = None
- elif side == "short" and candle.high >= level * (1.0 - pullback_pct) and candle.close < level:
- pending_entry = "short"
- breakout = None
- continue
- if highs[index] == highs[index] and candle.close > float(highs[index]):
- breakout = {"side": "long", "level": float(highs[index]), "index": index}
- elif lows[index] == lows[index] and candle.close < float(lows[index]):
- breakout = {"side": "short", "level": float(lows[index]), "index": index}
- return _finalize_result(trades, equity_curve)
- def run_atr_expansion(
- candles: list[Candle],
- *,
- range_window: int,
- atr_window: int,
- atr_quantile_window: int,
- atr_quantile: float,
- stop_loss_pct: float,
- take_profit_pct: float,
- max_hold_bars: int,
- margin_fraction: float,
- session: str,
- ) -> SegmentResult:
- if len(candles) <= max(range_window, atr_window, atr_quantile_window):
- return _empty_result(candles)
- highs = pd.Series([c.high for c in candles], dtype=float)
- lows = pd.Series([c.low for c in candles], dtype=float)
- closes = pd.Series([c.close for c in candles], dtype=float)
- prev_close = closes.shift(1)
- true_range = pd.concat([(highs - lows), (highs - prev_close).abs(), (lows - prev_close).abs()], axis=1).max(axis=1)
- atr = true_range.rolling(atr_window).mean() / closes
- atr_limit = atr.rolling(atr_quantile_window).quantile(atr_quantile).tolist()
- atr_values = atr.tolist()
- range_high = highs.shift(1).rolling(range_window).max().tolist()
- range_low = lows.shift(1).rolling(range_window).min().tolist()
- warmup = max(range_window, atr_window, atr_quantile_window)
- trades: list[dict[str, object]] = []
- equity_curve: list[dict[str, float | int]] = []
- equity = INITIAL_EQUITY
- position: Position | None = None
- pending_entry: str | None = None
- for index in range(warmup, len(candles)):
- candle = candles[index]
- if pending_entry is not None and position is None and equity > 0.0:
- position = _open_position(
- side=pending_entry,
- candle=candle,
- index=index,
- equity=equity,
- margin_fraction=margin_fraction,
- stop_loss_pct=stop_loss_pct,
- take_profit_pct=take_profit_pct,
- max_hold_bars=max_hold_bars,
- )
- pending_entry = None
- if position is not None:
- stop_hit = (position.side == "long" and candle.low <= position.stop_price) or (position.side == "short" and candle.high >= position.stop_price)
- take_hit = position.take_price is not None and (
- (position.side == "long" and candle.high >= position.take_price) or (position.side == "short" and candle.low <= position.take_price)
- )
- held = index - position.entry_index
- if stop_hit or take_hit or held >= position.max_hold_bars:
- exit_price = position.stop_price if stop_hit else position.take_price if take_hit else candle.close
- equity, _ = _close_position(trades=trades, position=position, candle=candle, exit_price=float(exit_price))
- position = None
- current_equity = _mark_account(equity, position, candle.close)
- equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close})
- if index == len(candles) - 1 or position is not None or not _session_ok(candle, session):
- continue
- values = (atr_values[index - 1], atr_limit[index - 1], range_high[index], range_low[index])
- if any(value != value for value in values):
- continue
- compressed = float(atr_values[index - 1]) <= float(atr_limit[index - 1])
- if compressed and candle.close > float(range_high[index]):
- pending_entry = "long"
- elif compressed and candle.close < float(range_low[index]):
- pending_entry = "short"
- return _finalize_result(trades, equity_curve)
- def run_false_breakout(
- candles: list[Candle],
- *,
- lookback: int,
- reclaim_buffer: float,
- stop_loss_pct: float,
- take_profit_pct: float,
- max_hold_bars: int,
- margin_fraction: float,
- session: str,
- ) -> SegmentResult:
- highs = pd.Series([c.high for c in candles], dtype=float).shift(1).rolling(lookback).max().tolist()
- lows = pd.Series([c.low for c in candles], dtype=float).shift(1).rolling(lookback).min().tolist()
- trades: list[dict[str, object]] = []
- equity_curve: list[dict[str, float | int]] = []
- equity = INITIAL_EQUITY
- position: Position | None = None
- pending_entry: str | None = None
- for index in range(lookback, len(candles)):
- candle = candles[index]
- if pending_entry is not None and position is None and equity > 0.0:
- position = _open_position(
- side=pending_entry,
- candle=candle,
- index=index,
- equity=equity,
- margin_fraction=margin_fraction,
- stop_loss_pct=stop_loss_pct,
- take_profit_pct=take_profit_pct,
- max_hold_bars=max_hold_bars,
- )
- pending_entry = None
- if position is not None:
- stop_hit = (position.side == "long" and candle.low <= position.stop_price) or (position.side == "short" and candle.high >= position.stop_price)
- take_hit = position.take_price is not None and (
- (position.side == "long" and candle.high >= position.take_price) or (position.side == "short" and candle.low <= position.take_price)
- )
- held = index - position.entry_index
- if stop_hit or take_hit or held >= position.max_hold_bars:
- exit_price = position.stop_price if stop_hit else position.take_price if take_hit else candle.close
- equity, _ = _close_position(trades=trades, position=position, candle=candle, exit_price=float(exit_price))
- position = None
- current_equity = _mark_account(equity, position, candle.close)
- equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close})
- if index == len(candles) - 1 or position is not None or not _session_ok(candle, session):
- continue
- if highs[index] == highs[index] and candle.high > float(highs[index]) and candle.close < float(highs[index]) * (1.0 - reclaim_buffer):
- pending_entry = "short"
- elif lows[index] == lows[index] and candle.low < float(lows[index]) and candle.close > float(lows[index]) * (1.0 + reclaim_buffer):
- pending_entry = "long"
- return _finalize_result(trades, equity_curve)
- def run_vwap_midline_deviation(
- candles: list[Candle],
- *,
- window: int,
- entry_z: float,
- exit_z: float,
- stop_loss_pct: float,
- max_hold_bars: int,
- margin_fraction: float,
- session: str,
- ) -> SegmentResult:
- closes = pd.Series([c.close for c in candles], dtype=float)
- volumes = pd.Series([c.volume for c in candles], dtype=float)
- highs = pd.Series([c.high for c in candles], dtype=float)
- lows = pd.Series([c.low for c in candles], dtype=float)
- typical = (highs + lows + closes) / 3.0
- vwap = (typical * volumes).rolling(window).sum() / volumes.rolling(window).sum()
- deviation = (closes - vwap) / vwap
- deviation_std = deviation.rolling(window).std(ddof=0)
- zscore = (deviation / deviation_std).tolist()
- trades: list[dict[str, object]] = []
- equity_curve: list[dict[str, float | int]] = []
- equity = INITIAL_EQUITY
- position: Position | None = None
- pending_entry: str | None = None
- pending_exit = False
- warmup = window * 2
- for index in range(warmup, len(candles)):
- candle = candles[index]
- if pending_exit and position is not None:
- equity, _ = _close_position(trades=trades, position=position, candle=candle, exit_price=candle.open)
- position = None
- pending_exit = False
- if pending_entry is not None and position is None and equity > 0.0:
- position = _open_position(
- side=pending_entry,
- candle=candle,
- index=index,
- equity=equity,
- margin_fraction=margin_fraction,
- stop_loss_pct=stop_loss_pct,
- take_profit_pct=None,
- max_hold_bars=max_hold_bars,
- )
- pending_entry = None
- if position is not None:
- stop_hit = (position.side == "long" and candle.low <= position.stop_price) or (position.side == "short" and candle.high >= position.stop_price)
- held = index - position.entry_index
- if stop_hit or held >= position.max_hold_bars:
- exit_price = position.stop_price if stop_hit else candle.close
- equity, _ = _close_position(trades=trades, position=position, candle=candle, exit_price=float(exit_price))
- position = None
- current_equity = _mark_account(equity, position, candle.close)
- equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close})
- if index == len(candles) - 1 or not _session_ok(candle, session):
- continue
- z = zscore[index]
- if z != z:
- continue
- if position is not None:
- if (position.side == "long" and z >= -exit_z) or (position.side == "short" and z <= exit_z):
- pending_exit = True
- continue
- if z <= -entry_z:
- pending_entry = "long"
- elif z >= entry_z:
- pending_entry = "short"
- return _finalize_result(trades, equity_curve)
- def run_opening_split(
- candles: list[Candle],
- *,
- first_bars: int,
- confirm_bars: int,
- min_range_pct: float,
- stop_loss_pct: float,
- take_profit_pct: float,
- max_hold_bars: int,
- margin_fraction: float,
- direction: str,
- ) -> SegmentResult:
- by_day: dict[str, list[int]] = {}
- for index, candle in enumerate(candles):
- day = pd.to_datetime(candle.ts, unit="ms", utc=True).strftime("%Y-%m-%d")
- by_day.setdefault(day, []).append(index)
- trades: list[dict[str, object]] = []
- equity_curve: list[dict[str, float | int]] = []
- equity = INITIAL_EQUITY
- position: Position | None = None
- pending_by_index: dict[int, str] = {}
- days = set(by_day)
- for index, candle in enumerate(candles):
- day = pd.to_datetime(candle.ts, unit="ms", utc=True).strftime("%Y-%m-%d")
- if day in days and by_day[day][0] == index and len(by_day[day]) > first_bars + confirm_bars:
- first = by_day[day][:first_bars]
- confirm = by_day[day][first_bars : first_bars + confirm_bars]
- first_high = max(candles[i].high for i in first)
- first_low = min(candles[i].low for i in first)
- first_open = candles[first[0]].open
- range_pct = (first_high - first_low) / first_open
- if range_pct >= min_range_pct:
- confirm_close = candles[confirm[-1]].close
- entry_index = confirm[-1] + 1
- if entry_index < len(candles):
- if direction in ("follow", "both") and confirm_close > first_high:
- pending_by_index[entry_index] = "long"
- elif direction in ("fade", "both") and confirm_close < first_low:
- pending_by_index[entry_index] = "short"
- side = pending_by_index.pop(index, None)
- if side is not None and position is None and equity > 0.0:
- position = _open_position(
- side=side,
- candle=candle,
- index=index,
- equity=equity,
- margin_fraction=margin_fraction,
- stop_loss_pct=stop_loss_pct,
- take_profit_pct=take_profit_pct,
- max_hold_bars=max_hold_bars,
- )
- if position is not None:
- stop_hit = (position.side == "long" and candle.low <= position.stop_price) or (position.side == "short" and candle.high >= position.stop_price)
- take_hit = position.take_price is not None and (
- (position.side == "long" and candle.high >= position.take_price) or (position.side == "short" and candle.low <= position.take_price)
- )
- held = index - position.entry_index
- if stop_hit or take_hit or held >= position.max_hold_bars:
- exit_price = position.stop_price if stop_hit else position.take_price if take_hit else candle.close
- equity, _ = _close_position(trades=trades, position=position, candle=candle, exit_price=float(exit_price))
- position = None
- current_equity = _mark_account(equity, position, candle.close)
- equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close})
- return _finalize_result(trades, equity_curve)
- def build_variants() -> list[Variant]:
- variants: list[Variant] = []
- for lookback in (48, 96, 192):
- for pullback_pct in (0.000, 0.0015):
- for margin_fraction in (0.25, 0.4):
- for session in ("all", "us", "eu"):
- params = {
- "lookback": lookback,
- "pullback_pct": pullback_pct,
- "stop_loss_pct": 0.008,
- "take_profit_pct": 0.014,
- "max_hold_bars": 32,
- "margin_fraction": margin_fraction,
- "session": session,
- }
- name = f"range-retest-l{lookback}-pb{pullback_pct:g}-sl0.008-tp0.014-mf{margin_fraction:g}-{session}"
- variants.append(Variant("range_breakout_retest", name, params, lambda candles, params=params: run_range_retest(candles, **params)))
- for range_window in (48, 96):
- for atr_quantile in (0.15, 0.25):
- for margin_fraction in (0.25, 0.4):
- for session in ("all", "us"):
- params = {
- "range_window": range_window,
- "atr_window": 48,
- "atr_quantile_window": 480,
- "atr_quantile": atr_quantile,
- "stop_loss_pct": 0.008,
- "take_profit_pct": 0.016,
- "max_hold_bars": 32,
- "margin_fraction": margin_fraction,
- "session": session,
- }
- name = f"atr-compress-expand-r{range_window}-q{atr_quantile:g}-sl0.008-tp0.016-mf{margin_fraction:g}-{session}"
- variants.append(Variant("atr_compression_expansion", name, params, lambda candles, params=params: run_atr_expansion(candles, **params)))
- for lookback in (48, 96, 192):
- for reclaim_buffer in (0.000, 0.001):
- for margin_fraction in (0.25, 0.4):
- for session in ("all", "us"):
- params = {
- "lookback": lookback,
- "reclaim_buffer": reclaim_buffer,
- "stop_loss_pct": 0.007,
- "take_profit_pct": 0.010,
- "max_hold_bars": 24,
- "margin_fraction": margin_fraction,
- "session": session,
- }
- name = f"false-breakout-l{lookback}-rb{reclaim_buffer:g}-sl0.007-tp0.010-mf{margin_fraction:g}-{session}"
- variants.append(Variant("donchian_false_breakout", name, params, lambda candles, params=params: run_false_breakout(candles, **params)))
- for window in (96, 192):
- for entry_z in (2.0, 2.5):
- for margin_fraction in (0.25, 0.4):
- for session in ("all", "us"):
- params = {
- "window": window,
- "entry_z": entry_z,
- "exit_z": 0.25,
- "stop_loss_pct": 0.008,
- "max_hold_bars": 48,
- "margin_fraction": margin_fraction,
- "session": session,
- }
- name = f"vwap-mid-dev-w{window}-z{entry_z:g}-x0.25-sl0.008-mf{margin_fraction:g}-{session}"
- variants.append(Variant("vwap_midline_deviation", name, params, lambda candles, params=params: run_vwap_midline_deviation(candles, **params)))
- for first_bars in (1, 2):
- for confirm_bars in (1, 2):
- for min_range_pct in (0.003, 0.006):
- for margin_fraction in (0.25, 0.4):
- params = {
- "first_bars": first_bars,
- "confirm_bars": confirm_bars,
- "min_range_pct": min_range_pct,
- "stop_loss_pct": 0.007,
- "take_profit_pct": 0.012,
- "max_hold_bars": 24,
- "margin_fraction": margin_fraction,
- "direction": "follow",
- }
- name = f"opening-split-f{first_bars}-c{confirm_bars}-r{min_range_pct:g}-sl0.007-tp0.012-mf{margin_fraction:g}"
- variants.append(Variant("opening_first_bars_split", name, params, lambda candles, params=params: run_opening_split(candles, **params)))
- return variants
- def cost_equity_frame(result: SegmentResult, cost: float) -> pd.DataFrame:
- rows = [{"ts": pd.to_datetime(result.equity_curve[0]["ts"], unit="ms", utc=True), "equity": INITIAL_EQUITY}]
- equity = INITIAL_EQUITY
- for trade in result.trades:
- equity *= 1.0 + float(trade["return_pct"]) / 100.0 - cost * float(trade["cost_weight"])
- rows.append({"ts": pd.to_datetime(str(trade["exit_time"]), utc=True), "equity": equity})
- last_ts = pd.to_datetime(result.equity_curve[-1]["ts"], unit="ms", utc=True)
- if rows[-1]["ts"] < last_ts:
- rows.append({"ts": last_ts, "equity": equity})
- return pd.DataFrame(rows)
- def equity_metrics(frame: pd.DataFrame, first_ts: int, last_ts: int) -> dict[str, float]:
- years = (last_ts - first_ts) / 86_400_000 / 365
- total_return = float(frame["equity"].iloc[-1] / frame["equity"].iloc[0] - 1.0)
- annualized = (1.0 + total_return) ** (1.0 / years) - 1.0 if total_return > -1.0 and years > 0.0 else 0.0
- drawdown = _max_drawdown([float(value) for value in frame["equity"]])
- return {
- "net_total_return": total_return,
- "net_annualized_return": annualized,
- "net_max_drawdown": drawdown,
- "net_calmar": annualized / drawdown if drawdown else 0.0,
- "risk_reward_ratio": total_return / drawdown if drawdown else 0.0,
- }
- def trade_distribution(result: SegmentResult) -> dict[str, float]:
- wins = [float(trade["return_on_margin_pct"]) for trade in result.trades if float(trade["pnl"]) > 0.0]
- losses = [float(trade["return_on_margin_pct"]) for trade in result.trades if float(trade["pnl"]) < 0.0]
- avg_win = sum(wins) / len(wins) if wins else 0.0
- avg_loss = sum(losses) / len(losses) if losses else 0.0
- gross_profit = sum(float(trade["pnl"]) for trade in result.trades if float(trade["pnl"]) > 0.0)
- gross_loss = -sum(float(trade["pnl"]) for trade in result.trades if float(trade["pnl"]) < 0.0)
- return {
- "avg_win_on_margin_pct": avg_win,
- "avg_loss_on_margin_pct": avg_loss,
- "win_loss_ratio": avg_win / abs(avg_loss) if avg_loss else 0.0,
- "profit_factor": gross_profit / gross_loss if gross_loss else 0.0,
- }
- def horizon_rows(frame: pd.DataFrame, last_ts: int) -> list[dict[str, object]]:
- rows: list[dict[str, object]] = []
- end_time = pd.to_datetime(last_ts, unit="ms", utc=True)
- for label, offset in HORIZONS:
- if offset is None:
- horizon_frame = frame[["ts", "equity"]].copy()
- start_time = pd.Timestamp(horizon_frame["ts"].iloc[0])
- else:
- cutoff = end_time - offset
- before = frame[frame["ts"] <= cutoff]
- if len(before):
- start_equity = float(before["equity"].iloc[-1])
- start_time = cutoff
- after = frame[frame["ts"] > cutoff]
- horizon_frame = pd.concat([pd.DataFrame([{"ts": cutoff, "equity": start_equity}]), after[["ts", "equity"]]], ignore_index=True)
- else:
- horizon_frame = frame[["ts", "equity"]].copy()
- start_time = pd.Timestamp(horizon_frame["ts"].iloc[0])
- rows.append(
- {
- "horizon": label,
- "horizon_start": start_time.strftime("%Y-%m-%d %H:%M"),
- "horizon_end": end_time.strftime("%Y-%m-%d %H:%M"),
- **equity_metrics(horizon_frame, int(start_time.timestamp() * 1000), last_ts),
- }
- )
- return rows
- def monthly_rows(frame: pd.DataFrame) -> list[dict[str, object]]:
- monthly = frame.set_index("ts")["equity"].resample("ME").last().ffill().pct_change().dropna()
- return [{"month": idx.strftime("%Y-%m"), "monthly_return": float(value)} for idx, value in monthly.items()]
- def worst_month(frame: pd.DataFrame) -> tuple[str, float]:
- rows = monthly_rows(frame)
- if not rows:
- return "", 0.0
- worst = min(rows, key=lambda row: float(row["monthly_return"]))
- return str(worst["month"]), float(worst["monthly_return"])
- def robust_ranking(summary: pd.DataFrame, horizons: pd.DataFrame) -> pd.DataFrame:
- primary = summary[summary["cost"] == PRIMARY_COST].copy()
- recent = horizons[(horizons["cost"] == PRIMARY_COST) & (horizons["horizon"] != "all")]
- pivot = recent.pivot_table(
- index="name",
- columns="horizon",
- values=["net_total_return", "net_max_drawdown", "net_calmar"],
- aggfunc="first",
- observed=False,
- )
- pivot.columns = [f"{metric}_{horizon}" for metric, horizon in pivot.columns]
- ranked = primary.merge(pivot.reset_index(), on="name")
- ranked["min_recent_return"] = ranked[
- ["net_total_return_3y", "net_total_return_1y", "net_total_return_6m", "net_total_return_3m"]
- ].min(axis=1)
- ranked["max_recent_drawdown"] = ranked[
- ["net_max_drawdown_3y", "net_max_drawdown_1y", "net_max_drawdown_6m", "net_max_drawdown_3m"]
- ].max(axis=1)
- ranked["all_recent_positive"] = ranked["min_recent_return"] > 0.0
- return ranked.sort_values(
- [
- "all_recent_positive",
- "min_recent_return",
- "max_recent_drawdown",
- "net_calmar",
- "net_total_return",
- ],
- ascending=[False, False, True, False, False],
- )
- def format_cell(value: object) -> str:
- if isinstance(value, float):
- return f"{value:.6g}"
- return str(value).replace("|", "\\|")
- def markdown_table(frame: pd.DataFrame) -> str:
- columns = list(frame.columns)
- rows = [columns, ["---" for _ in columns]]
- for record in frame.to_dict("records"):
- rows.append([record[column] for column in columns])
- return "\n".join("| " + " | ".join(format_cell(value) for value in row) + " |" for row in rows)
- def write_report(
- *,
- summary: pd.DataFrame,
- horizons: pd.DataFrame,
- monthly: pd.DataFrame,
- robust: pd.DataFrame,
- output_files: list[Path],
- command: str,
- first_ts: int,
- last_ts: int,
- requested_years: float,
- ) -> str:
- primary = summary[summary["cost"] == PRIMARY_COST].copy()
- top = primary.head(10)
- low_dd = primary[(primary["net_max_drawdown"] <= 0.35) & (primary["net_total_return"] > 0.0)].head(10)
- horizon_top = (
- horizons[(horizons["cost"] == PRIMARY_COST) & (horizons["horizon"] != "all")]
- .sort_values(["horizon", "net_calmar", "net_annualized_return"], ascending=[True, False, False])
- .groupby("horizon", observed=True)
- .head(3)
- )
- family = (
- primary.groupby("family", as_index=False)
- .agg(
- best_calmar=("net_calmar", "max"),
- best_annualized=("net_annualized_return", "max"),
- best_drawdown=("net_max_drawdown", "min"),
- candidates=("name", "count"),
- )
- .sort_values(["best_calmar", "best_annualized"], ascending=False)
- )
- best_name = str(primary.iloc[0]["name"]) if len(primary) else ""
- best_monthly = monthly[(monthly["cost"] == PRIMARY_COST) & (monthly["name"] == best_name)].sort_values("monthly_return").head(12)
- robust_top = robust.head(10)
- lines = [
- "# ETH microstructure non-RSI exploration",
- "",
- f"Run command: `{command}`",
- f"Requested years: {requested_years:g}",
- f"Actual continuous local history: `{_format_ts(first_ts)}` to `{_format_ts(last_ts)}`.",
- "",
- "No order placement or exchange API path is used; this script only reads local candle CSV files.",
- "",
- "Output files:",
- *[f"- `{path}`" for path in output_files],
- "",
- "Primary ranking: maker_taker by net Calmar, annualized return, total return, then lower drawdown.",
- "",
- "Top 10 maker_taker:",
- markdown_table(
- top[
- [
- "family",
- "name",
- "trades",
- "net_total_return",
- "net_annualized_return",
- "net_max_drawdown",
- "net_calmar",
- "win_rate",
- "win_loss_ratio",
- "profit_factor",
- "risk_reward_ratio",
- "worst_month",
- "worst_month_return",
- ]
- ]
- ),
- "",
- "Family leaders:",
- markdown_table(family),
- "",
- "Recent horizon leaders:",
- markdown_table(
- horizon_top[
- [
- "horizon",
- "family",
- "name",
- "trades",
- "net_total_return",
- "net_annualized_return",
- "net_max_drawdown",
- "net_calmar",
- "risk_reward_ratio",
- ]
- ]
- ),
- "",
- "Robust ranking by recent survival:",
- markdown_table(
- robust_top[
- [
- "all_recent_positive",
- "family",
- "name",
- "trades",
- "net_total_return",
- "net_max_drawdown",
- "min_recent_return",
- "max_recent_drawdown",
- "net_total_return_3y",
- "net_total_return_1y",
- "net_total_return_6m",
- "net_total_return_3m",
- ]
- ]
- ),
- "",
- "Low drawdown positive candidates:",
- markdown_table(
- low_dd[
- [
- "family",
- "name",
- "trades",
- "net_total_return",
- "net_annualized_return",
- "net_max_drawdown",
- "net_calmar",
- "worst_month_return",
- ]
- ]
- )
- if len(low_dd)
- else "No maker_taker candidate met net_max_drawdown <= 0.35 with positive total return.",
- "",
- f"Worst months for best candidate `{best_name}`:",
- markdown_table(best_monthly[["month", "monthly_return"]]) if len(best_monthly) else "No monthly rows.",
- ]
- return "\n".join(lines) + "\n"
- def main() -> int:
- parser = argparse.ArgumentParser()
- parser.add_argument("--bar", default=BAR)
- parser.add_argument("--years", type=float, default=YEARS)
- parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
- args = parser.parse_args()
- candles = _load_candles(SYMBOL, args.bar)
- requested_bars = int(args.years * 365 * 24 * 60 / 15)
- candles = candles[-requested_bars:]
- variants = build_variants()
- summary_rows: list[dict[str, object]] = []
- horizon_output_rows: list[dict[str, object]] = []
- monthly_output_rows: list[dict[str, object]] = []
- for index, variant in enumerate(variants, start=1):
- result = variant.run(candles)
- distribution = trade_distribution(result)
- for cost_name, cost in COSTS:
- frame = cost_equity_frame(result, cost)
- month, month_return = worst_month(frame)
- base = {
- "family": variant.family,
- "cost": cost_name,
- "symbol": SYMBOL,
- "bar": args.bar,
- "name": variant.name,
- "first_candle": _format_ts(candles[0].ts),
- "last_candle": _format_ts(candles[-1].ts),
- "years": (candles[-1].ts - candles[0].ts) / 86_400_000 / 365,
- "trades": result.trade_count,
- "win_rate": result.win_rate,
- "gross_total_return": result.gross_total_return,
- "gross_max_drawdown_mark_to_market": result.gross_max_drawdown,
- "worst_month": month,
- "worst_month_return": month_return,
- **variant.params,
- **distribution,
- }
- summary_rows.append({**base, **equity_metrics(frame, candles[0].ts, candles[-1].ts)})
- for row in horizon_rows(frame, candles[-1].ts):
- horizon_output_rows.append({**base, **row})
- for row in monthly_rows(frame):
- monthly_output_rows.append({**base, **row})
- print(f"done {index}/{len(variants)} {variant.family} {variant.name} trades={result.trade_count}", flush=True)
- summary = pd.DataFrame(summary_rows).sort_values(
- ["cost", "net_calmar", "net_annualized_return", "net_total_return", "net_max_drawdown"],
- ascending=[True, False, False, False, True],
- )
- primary = summary[summary["cost"] == PRIMARY_COST]
- summary = pd.concat([primary, summary[summary["cost"] != PRIMARY_COST]], ignore_index=True)
- horizons = pd.DataFrame(horizon_output_rows)
- horizons["horizon"] = pd.Categorical(horizons["horizon"], categories=[label for label, _ in HORIZONS], ordered=True)
- horizons = horizons.sort_values(["cost", "horizon", "net_calmar", "net_annualized_return"], ascending=[True, True, False, False])
- monthly = pd.DataFrame(monthly_output_rows).sort_values(["cost", "name", "month"])
- robust = robust_ranking(summary, horizons)
- args.output_dir.mkdir(parents=True, exist_ok=True)
- summary_path = args.output_dir / f"{PREFIX}-summary.csv"
- horizons_path = args.output_dir / f"{PREFIX}-horizons.csv"
- top10_path = args.output_dir / f"{PREFIX}-top10.csv"
- monthly_path = args.output_dir / f"{PREFIX}-monthly.csv"
- robust_path = args.output_dir / f"{PREFIX}-robust.csv"
- json_path = args.output_dir / f"{PREFIX}-best.json"
- report_path = args.output_dir / f"{PREFIX}-report.md"
- output_files = [summary_path, horizons_path, top10_path, monthly_path, robust_path, json_path, report_path]
- summary.to_csv(summary_path, index=False)
- horizons.to_csv(horizons_path, index=False)
- primary.head(10).to_csv(top10_path, index=False)
- monthly.to_csv(monthly_path, index=False)
- robust.to_csv(robust_path, index=False)
- json_path.write_text(json.dumps(primary.head(10).to_dict("records"), indent=2), encoding="utf-8")
- command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --bar {args.bar} --years {args.years:g}"
- report_path.write_text(
- write_report(
- summary=summary,
- horizons=horizons,
- monthly=monthly,
- robust=robust,
- output_files=output_files,
- command=command,
- first_ts=candles[0].ts,
- last_ts=candles[-1].ts,
- requested_years=args.years,
- ),
- encoding="utf-8",
- )
- print(primary.head(10).to_string(index=False))
- return 0
- if __name__ == "__main__":
- raise SystemExit(main())
|