from __future__ import annotations import argparse import json import sys from dataclasses import dataclass from pathlib import Path import pandas as pd sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from okx_codex_trader.models import Candle from okx_codex_trader.sampled_report import SegmentResult, mark_to_market, trade_equity from scripts import explore_ultrashort as explore from scripts.search_eth_btc_nextgen_variants import markdown_table, metrics_from_daily_equity, monthly_rows from scripts.search_expansion_mean_reversion import ( ROUNDTRIP_TAKER_COST_ON_MARGIN, daily_equity, horizon_rows, load_base_candles, resample_candles, trade_stats_for_window, true_range, ) OUTPUT_DIR = Path("reports/strategy-expansion") PREFIX = "mean-reversion-regime" YEARS = 10.0 LEVERAGE = 3 @dataclass(frozen=True) class BaseParams: rsi_length: int = 2 rsi_entry: float = 8.0 rsi_exit: float = 55.0 bb_length: int = 20 vol_lookback: int = 24 vol_quantile_lookback: int = 240 vol_quantile: float = 0.75 btc_trend_sma: int = 200 btc_momentum_lookback: int = 12 max_hold_bars: int = 24 stop_loss_pct: float = 0.045 cooldown_bars: int = 6 @dataclass(frozen=True) class Regime: name: str category: str rule: str def close_position( *, trades: list[dict[str, object]], exits: list[dict[str, object]], position: dict[str, object], candle: Candle, exit_price: float, ) -> tuple[float, bool, float]: exit_equity = trade_equity( side=str(position["side"]), margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), exit_price=exit_price, leverage=LEVERAGE, ) pnl = exit_equity - float(position["margin_used"]) return_pct = pnl / float(position["margin_used"]) trades.append( { "side": "Long", "entry_time": explore._format_ts(int(position["entry_time"])), "exit_time": explore._format_ts(candle.ts), "entry_price": round(float(position["entry_price"]), 4), "exit_price": round(exit_price, 4), "pnl": round(pnl, 4), "return_pct": round(return_pct * 100.0, 4), } ) exits.append({"ts": candle.ts, "price": exit_price, "side": "long"}) return exit_equity, pnl > 0.0, return_pct - ROUNDTRIP_TAKER_COST_ON_MARGIN def adx(highs: pd.Series, lows: pd.Series, closes: pd.Series, length: int) -> pd.Series: up_move = highs.diff() down_move = -lows.diff() plus_dm = up_move.where((up_move > down_move) & (up_move > 0.0), 0.0) minus_dm = down_move.where((down_move > up_move) & (down_move > 0.0), 0.0) atr = true_range(highs, lows, closes).rolling(length).mean() plus_di = 100.0 * plus_dm.rolling(length).mean() / atr minus_di = 100.0 * minus_dm.rolling(length).mean() / atr dx = 100.0 * (plus_di - minus_di).abs() / (plus_di + minus_di) return dx.rolling(length).mean() def daily_regime_to_4h(candles: list[Candle], daily_candles: list[Candle], daily_values: pd.Series) -> pd.Series: daily_frame = pd.DataFrame( { "day": [pd.to_datetime(candle.ts, unit="ms", utc=True).normalize() for candle in daily_candles], "value": daily_values.shift(1).to_numpy(), } ).dropna() by_day = daily_frame.set_index("day")["value"].sort_index() days = pd.Series([pd.to_datetime(candle.ts, unit="ms", utc=True).normalize() for candle in candles]) return days.map(by_day).astype("boolean") def static_regime_series(regime: Regime, candles: list[Candle], btc_candles: list[Candle], eth_daily: list[Candle], btc_daily: list[Candle]) -> pd.Series: closes = pd.Series([candle.close for candle in candles], dtype=float) highs = pd.Series([candle.high for candle in candles], dtype=float) lows = pd.Series([candle.low for candle in candles], dtype=float) returns = closes.pct_change() btc_closes = pd.Series([candle.close for candle in btc_candles], dtype=float) if regime.name == "baseline": return pd.Series([True] * len(candles)) if regime.name.startswith("btc_4h_sma"): length = int(regime.name.removeprefix("btc_4h_sma")) return btc_closes > btc_closes.rolling(length).mean() if regime.name.startswith("btc_4h_mom"): bars = int(regime.name.removeprefix("btc_4h_mom")) return btc_closes / btc_closes.shift(bars) - 1.0 > 0.0 if regime.name.startswith("eth_daily_sma"): length = int(regime.name.removeprefix("eth_daily_sma")) daily_close = pd.Series([candle.close for candle in eth_daily], dtype=float) return daily_regime_to_4h(candles, eth_daily, daily_close > daily_close.rolling(length).mean()) if regime.name.startswith("eth_daily_mom"): days = int(regime.name.removeprefix("eth_daily_mom")) daily_close = pd.Series([candle.close for candle in eth_daily], dtype=float) return daily_regime_to_4h(candles, eth_daily, daily_close / daily_close.shift(days) - 1.0 > 0.0) if regime.name.startswith("btc_daily_sma"): length = int(regime.name.removeprefix("btc_daily_sma")) daily_close = pd.Series([candle.close for candle in btc_daily], dtype=float) return daily_regime_to_4h(candles, btc_daily, daily_close > daily_close.rolling(length).mean()) if regime.name.startswith("vol_q"): quantile = float(regime.name.removeprefix("vol_q")) / 100.0 realized_vol = returns.rolling(24).std(ddof=1) return realized_vol <= realized_vol.rolling(240).quantile(quantile) if regime.name.startswith("adx_lt"): limit = float(regime.name.removeprefix("adx_lt")) return adx(highs, lows, closes, 14) < limit if regime.name.startswith("adx_gt"): limit = float(regime.name.removeprefix("adx_gt")) return adx(highs, lows, closes, 14) > limit raise ValueError(f"unknown static regime {regime.name}") def baseline_equity_regime_series(regime: Regime, candles: list[Candle], baseline_result: SegmentResult, baseline_daily: pd.Series) -> pd.Series: if regime.name.startswith("eq_trades"): count = int(regime.name.removeprefix("eq_trades")) closed: list[float] = [] trades = sorted(baseline_result.trades, key=lambda trade: pd.to_datetime(str(trade["exit_time"]), utc=True)) trade_index = 0 allowed: list[bool] = [] for candle in candles: current_time = pd.to_datetime(candle.ts, unit="ms", utc=True) while trade_index < len(trades) and pd.to_datetime(str(trades[trade_index]["exit_time"]), utc=True) < current_time: closed.append(float(trades[trade_index]["return_pct"]) / 100.0 - ROUNDTRIP_TAKER_COST_ON_MARGIN) trade_index += 1 allowed.append(len(closed) >= count and sum(closed[-count:]) > 0.0) return pd.Series(allowed) if regime.name.startswith("eq_days"): days = int(regime.name.removeprefix("eq_days")) allowed = [] for candle in candles: current_day = pd.to_datetime(candle.ts, unit="ms", utc=True).normalize() end_day = current_day - pd.Timedelta(days=1) start_day = end_day - pd.Timedelta(days=days) allowed.append( start_day in baseline_daily.index and end_day in baseline_daily.index and float(baseline_daily.loc[end_day] / baseline_daily.loc[start_day] - 1.0) > 0.0 ) return pd.Series(allowed) raise ValueError(f"unknown equity regime {regime.name}") def run_segment( candles: list[Candle], btc_candles: list[Candle], eth_daily: list[Candle], btc_daily: list[Candle], params: BaseParams, regime: Regime, regime_allowed: pd.Series | None = None, ) -> SegmentResult: closes = pd.Series([candle.close for candle in candles], dtype=float) highs = pd.Series([candle.high for candle in candles], dtype=float) lows = pd.Series([candle.low for candle in candles], dtype=float) returns = closes.pct_change() btc_closes = pd.Series([candle.close for candle in btc_candles], dtype=float) btc_returns = btc_closes.pct_change() rsi = explore._compute_rsi(closes, params.rsi_length) realized_vol = returns.rolling(params.vol_lookback).std(ddof=1) vol_cap = realized_vol.rolling(params.vol_quantile_lookback).quantile(params.vol_quantile) btc_trend = btc_closes.rolling(params.btc_trend_sma).mean() btc_vol = btc_returns.rolling(params.vol_lookback).std(ddof=1) btc_vol_cap = btc_vol.rolling(params.vol_quantile_lookback).quantile(params.vol_quantile) static_allowed = regime_allowed if regime_allowed is not None else static_regime_series(regime, candles, btc_candles, eth_daily, btc_daily) warmup_bars = max( params.vol_lookback + params.vol_quantile_lookback, params.btc_trend_sma, params.btc_momentum_lookback, params.rsi_length + 2, ) equity = explore.INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 last_exit_index = -10**9 pending_side = False pending_exit = False position: dict[str, object] | None = None trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] for index in range(warmup_bars, len(candles)): candle = candles[index] if pending_exit and position is not None: equity, won, net_return = close_position(trades=trades, exits=exits, position=position, candle=candle, exit_price=candle.open) wins += 1 if won else 0 position = None pending_exit = False last_exit_index = index if pending_side and position is None and equity > 0.0: position = { "side": "long", "entry_time": candle.ts, "entry_price": candle.open, "entry_index": index, "margin_used": equity, "stop_price": candle.open * (1.0 - params.stop_loss_pct), } entries.append({"ts": candle.ts, "price": candle.open, "side": "long"}) pending_side = False current_equity = equity if position is not None: if candle.low <= float(position["stop_price"]): equity, won, net_return = close_position( trades=trades, exits=exits, position=position, candle=candle, exit_price=float(position["stop_price"]), ) wins += 1 if won else 0 current_equity = equity position = None last_exit_index = index if position is not None: current_equity = mark_to_market( side="long", margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=LEVERAGE, ) peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(candles) - 1 or equity <= 0.0: continue values = [ rsi[index], realized_vol.iloc[index], vol_cap.iloc[index], btc_trend.iloc[index], btc_vol.iloc[index], btc_vol_cap.iloc[index], ] if any(value != value for value in values): continue current_rsi = float(rsi[index]) if position is not None: held_bars = index - int(position["entry_index"]) if current_rsi >= params.rsi_exit or held_bars >= params.max_hold_bars: pending_exit = True continue if index - last_exit_index < params.cooldown_bars or float(realized_vol.iloc[index]) > float(vol_cap.iloc[index]): continue btc_momentum = btc_candles[index].close / btc_candles[index - params.btc_momentum_lookback].close - 1.0 if btc_candles[index].close <= float(btc_trend.iloc[index]) or btc_momentum < 0.0 or float(btc_vol.iloc[index]) > float(btc_vol_cap.iloc[index]): continue if not bool(static_allowed.iloc[index]): continue if current_rsi <= params.rsi_entry: pending_side = True trade_count = len(trades) return SegmentResult( trade_count=trade_count, total_return=(ending_equity - explore.INITIAL_EQUITY) / explore.INITIAL_EQUITY, win_rate=wins / trade_count if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=candles[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ) def regimes() -> list[Regime]: return [ Regime("baseline", "baseline", "existing ETH 4H RSI2 entry<=8; BTC 4H close>SMA200 and 12-bar momentum>=0"), Regime("btc_4h_sma400", "btc_trend", "baseline plus BTC 4H close>SMA400"), Regime("btc_4h_mom24", "btc_trend", "baseline plus BTC 4H 24-bar return>0"), Regime("btc_daily_sma100", "btc_trend", "baseline plus BTC daily close>SMA100"), Regime("btc_daily_sma200", "btc_trend", "baseline plus BTC daily close>SMA200"), Regime("eth_daily_sma50", "eth_daily_trend", "baseline plus ETH daily close>SMA50"), Regime("eth_daily_sma100", "eth_daily_trend", "baseline plus ETH daily close>SMA100"), Regime("eth_daily_sma200", "eth_daily_trend", "baseline plus ETH daily close>SMA200"), Regime("eth_daily_mom20", "eth_daily_trend", "baseline plus ETH daily 20-day return>0"), Regime("vol_q50", "volatility", "baseline with ETH 4H realized vol <= rolling 50th percentile"), Regime("vol_q60", "volatility", "baseline with ETH 4H realized vol <= rolling 60th percentile"), Regime("adx_lt20", "trend_strength", "baseline plus ETH 4H ADX14<20"), Regime("adx_lt25", "trend_strength", "baseline plus ETH 4H ADX14<25"), Regime("adx_gt25", "trend_strength", "baseline plus ETH 4H ADX14>25"), Regime("eq_trades3", "equity_momentum", "baseline only after last 3 closed net trades have positive summed return"), Regime("eq_trades5", "equity_momentum", "baseline only after last 5 closed net trades have positive summed return"), Regime("eq_days30", "equity_momentum", "baseline only after previous 30 closed-trade net equity days are positive"), Regime("eq_days60", "equity_momentum", "baseline only after previous 60 closed-trade net equity days are positive"), ] def build_row(regime: Regime, result: SegmentResult, daily: pd.Series) -> tuple[dict[str, object], list[dict[str, object]], pd.DataFrame]: monthly = monthly_rows(regime.name, daily) current_horizons = horizon_rows(regime.name, result, ROUNDTRIP_TAKER_COST_ON_MARGIN, daily) horizon_by_label = {str(row["horizon"]): float(row["net_total_return"]) for row in current_horizons} stats = trade_stats_for_window(result, ROUNDTRIP_TAKER_COST_ON_MARGIN, daily.index[0], daily.index[-1]) row = { "name": regime.name, "category": regime.category, "rule": regime.rule, "first_candle": daily.index[0].strftime("%Y-%m-%d"), "last_candle": daily.index[-1].strftime("%Y-%m-%d"), "years": (daily.index[-1] - daily.index[0]).total_seconds() / 86_400 / 365, "gross_total_return": result.total_return, "gross_max_drawdown_mark_to_market": result.max_drawdown, "worst_month_return": float(monthly["return"].min()), "return_3y": horizon_by_label.get("3y", 0.0), "return_1y": horizon_by_label.get("1y", 0.0), "return_6m": horizon_by_label.get("6m", 0.0), "return_3m": horizon_by_label.get("3m", 0.0), **stats, **metrics_from_daily_equity(daily), } return row, current_horizons, monthly def markdown_report(paths: list[Path], totals: pd.DataFrame, horizon: pd.DataFrame, monthly: pd.DataFrame, command: str) -> str: baseline = totals[totals["name"] == "baseline"].iloc[0] improved = totals[ (totals["name"] != "baseline") & (totals["return_6m"] > float(baseline["return_6m"]) + 1e-9) & (totals["return_3m"] > float(baseline["return_3m"]) + 1e-9) ].sort_values(["return_3m", "return_6m", "net_calmar"], ascending=[False, False, False]) top_names = set(totals.head(10)["name"]) | {"baseline"} lines = [ "# Mean reversion regime review", "", f"Run command: `{command}`", "", "Output files:", *[f"- `{path}`" for path in paths], "", "Scope: ETH-USDT-SWAP 4H RSI2 long mean reversion from existing expansion best candidate.", f"Cost: roundtrip taker cost on margin at {LEVERAGE}x = {ROUNDTRIP_TAKER_COST_ON_MARGIN:.6f}.", "", "## Baseline", "", markdown_table(pd.DataFrame([baseline])[["name", "trades", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "win_rate", "payoff_ratio", "return_3y", "return_1y", "return_6m", "return_3m"]]), "", "## Conditions improving both 6m and 3m", "", markdown_table(improved[["name", "category", "trades", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "win_rate", "payoff_ratio", "return_3y", "return_1y", "return_6m", "return_3m", "rule"]]), "", "## All regime tests", "", markdown_table(totals[["name", "category", "trades", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "win_rate", "payoff_ratio", "return_3y", "return_1y", "return_6m", "return_3m", "worst_month_return"]]), "", "## Horizon metrics", "", markdown_table(horizon[horizon["name"].isin(top_names)][["name", "horizon", "horizon_start", "horizon_end", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "trades", "win_rate", "payoff_ratio"]]), "", "## Monthly returns", "", markdown_table(monthly[monthly["name"].isin(top_names)][["name", "month", "return", "start_equity", "end_equity"]].tail(180)), ] return "\n".join(lines) + "\n" def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--years", type=float, default=YEARS) parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR) args = parser.parse_args() eth_15m = load_base_candles("ETH-USDT-SWAP", args.years) btc_15m = load_base_candles("BTC-USDT-SWAP", args.years) eth_4h = resample_candles(eth_15m, "4H") btc_4h = resample_candles(btc_15m, "4H") eth_daily = resample_candles(eth_15m, "1D") btc_daily = resample_candles(btc_15m, "1D") candles, btc_candles = explore.align_pair_candles(eth_4h, btc_4h) params = BaseParams() total_rows: list[dict[str, object]] = [] horizon_output: list[dict[str, object]] = [] monthly_frames: list[pd.DataFrame] = [] tested = regimes() baseline_regime = tested[0] baseline_result = run_segment(candles, btc_candles, eth_daily, btc_daily, params, baseline_regime) baseline_frame = explore.cost_adjusted_trade_equity_frame(baseline_result, ROUNDTRIP_TAKER_COST_ON_MARGIN) baseline_start = pd.to_datetime(baseline_result.equity_curve[0]["ts"], unit="ms", utc=True) baseline_end = pd.to_datetime(baseline_result.equity_curve[-1]["ts"], unit="ms", utc=True) baseline_daily = daily_equity(baseline_frame, baseline_start, baseline_end) for index, regime in enumerate(tested, start=1): allowed = baseline_equity_regime_series(regime, candles, baseline_result, baseline_daily) if regime.category == "equity_momentum" else None result = baseline_result if regime.name == "baseline" else run_segment(candles, btc_candles, eth_daily, btc_daily, params, regime, allowed) frame = explore.cost_adjusted_trade_equity_frame(result, ROUNDTRIP_TAKER_COST_ON_MARGIN) start = pd.to_datetime(result.equity_curve[0]["ts"], unit="ms", utc=True) end = pd.to_datetime(result.equity_curve[-1]["ts"], unit="ms", utc=True) daily = daily_equity(frame, start, end) row, current_horizons, monthly = build_row(regime, result, daily) total_rows.append(row) horizon_output.extend(current_horizons) monthly_frames.append(monthly) print(f"done {index}/{len(tested)} {regime.name}", flush=True) totals = pd.DataFrame(total_rows).sort_values( ["return_3m", "return_6m", "net_calmar", "net_total_return"], ascending=[False, False, False, False], ) horizon = pd.DataFrame(horizon_output) horizon["horizon"] = pd.Categorical(horizon["horizon"], categories=["full", "3y", "1y", "6m", "3m"], ordered=True) horizon = horizon.sort_values(["name", "horizon"]) monthly = pd.concat(monthly_frames, ignore_index=True).sort_values(["name", "month"]) args.output_dir.mkdir(parents=True, exist_ok=True) totals_path = args.output_dir / f"{PREFIX}-totals.csv" horizon_path = args.output_dir / f"{PREFIX}-horizons.csv" monthly_path = args.output_dir / f"{PREFIX}-monthly-returns.csv" best_path = args.output_dir / f"{PREFIX}-best.json" report_path = args.output_dir / f"{PREFIX}-report.md" totals.to_csv(totals_path, index=False) horizon.to_csv(horizon_path, index=False) monthly.to_csv(monthly_path, index=False) best_path.write_text(json.dumps(totals.head(5).to_dict(orient="records"), indent=2), encoding="utf-8") paths = [totals_path, horizon_path, monthly_path, best_path, report_path] command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --years {args.years}" report_path.write_text(markdown_report(paths, totals, horizon, monthly, command), encoding="utf-8") print(totals.head(10).to_string(index=False)) return 0 if __name__ == "__main__": raise SystemExit(main())