| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477 |
- from __future__ import annotations
- import argparse
- import json
- import sys
- from dataclasses import dataclass
- from pathlib import Path
- import pandas as pd
- sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
- from okx_codex_trader.models import Candle
- from okx_codex_trader.sampled_report import SegmentResult, mark_to_market, trade_equity
- from scripts import explore_ultrashort as explore
- from scripts.search_eth_btc_nextgen_variants import markdown_table, metrics_from_daily_equity, monthly_rows
- from scripts.search_expansion_mean_reversion import (
- ROUNDTRIP_TAKER_COST_ON_MARGIN,
- daily_equity,
- horizon_rows,
- load_base_candles,
- resample_candles,
- trade_stats_for_window,
- true_range,
- )
- OUTPUT_DIR = Path("reports/strategy-expansion")
- PREFIX = "mean-reversion-regime"
- YEARS = 10.0
- LEVERAGE = 3
- @dataclass(frozen=True)
- class BaseParams:
- rsi_length: int = 2
- rsi_entry: float = 8.0
- rsi_exit: float = 55.0
- bb_length: int = 20
- vol_lookback: int = 24
- vol_quantile_lookback: int = 240
- vol_quantile: float = 0.75
- btc_trend_sma: int = 200
- btc_momentum_lookback: int = 12
- max_hold_bars: int = 24
- stop_loss_pct: float = 0.045
- cooldown_bars: int = 6
- @dataclass(frozen=True)
- class Regime:
- name: str
- category: str
- rule: str
- def close_position(
- *,
- trades: list[dict[str, object]],
- exits: list[dict[str, object]],
- position: dict[str, object],
- candle: Candle,
- exit_price: float,
- ) -> tuple[float, bool, float]:
- exit_equity = trade_equity(
- side=str(position["side"]),
- margin_used=float(position["margin_used"]),
- entry_price=float(position["entry_price"]),
- exit_price=exit_price,
- leverage=LEVERAGE,
- )
- pnl = exit_equity - float(position["margin_used"])
- return_pct = pnl / float(position["margin_used"])
- trades.append(
- {
- "side": "Long",
- "entry_time": explore._format_ts(int(position["entry_time"])),
- "exit_time": explore._format_ts(candle.ts),
- "entry_price": round(float(position["entry_price"]), 4),
- "exit_price": round(exit_price, 4),
- "pnl": round(pnl, 4),
- "return_pct": round(return_pct * 100.0, 4),
- }
- )
- exits.append({"ts": candle.ts, "price": exit_price, "side": "long"})
- return exit_equity, pnl > 0.0, return_pct - ROUNDTRIP_TAKER_COST_ON_MARGIN
- def adx(highs: pd.Series, lows: pd.Series, closes: pd.Series, length: int) -> pd.Series:
- up_move = highs.diff()
- down_move = -lows.diff()
- plus_dm = up_move.where((up_move > down_move) & (up_move > 0.0), 0.0)
- minus_dm = down_move.where((down_move > up_move) & (down_move > 0.0), 0.0)
- atr = true_range(highs, lows, closes).rolling(length).mean()
- plus_di = 100.0 * plus_dm.rolling(length).mean() / atr
- minus_di = 100.0 * minus_dm.rolling(length).mean() / atr
- dx = 100.0 * (plus_di - minus_di).abs() / (plus_di + minus_di)
- return dx.rolling(length).mean()
- def daily_regime_to_4h(candles: list[Candle], daily_candles: list[Candle], daily_values: pd.Series) -> pd.Series:
- daily_frame = pd.DataFrame(
- {
- "day": [pd.to_datetime(candle.ts, unit="ms", utc=True).normalize() for candle in daily_candles],
- "value": daily_values.shift(1).to_numpy(),
- }
- ).dropna()
- by_day = daily_frame.set_index("day")["value"].sort_index()
- days = pd.Series([pd.to_datetime(candle.ts, unit="ms", utc=True).normalize() for candle in candles])
- return days.map(by_day).astype("boolean")
- def static_regime_series(regime: Regime, candles: list[Candle], btc_candles: list[Candle], eth_daily: list[Candle], btc_daily: list[Candle]) -> pd.Series:
- closes = pd.Series([candle.close for candle in candles], dtype=float)
- highs = pd.Series([candle.high for candle in candles], dtype=float)
- lows = pd.Series([candle.low for candle in candles], dtype=float)
- returns = closes.pct_change()
- btc_closes = pd.Series([candle.close for candle in btc_candles], dtype=float)
- if regime.name == "baseline":
- return pd.Series([True] * len(candles))
- if regime.name.startswith("btc_4h_sma"):
- length = int(regime.name.removeprefix("btc_4h_sma"))
- return btc_closes > btc_closes.rolling(length).mean()
- if regime.name.startswith("btc_4h_mom"):
- bars = int(regime.name.removeprefix("btc_4h_mom"))
- return btc_closes / btc_closes.shift(bars) - 1.0 > 0.0
- if regime.name.startswith("eth_daily_sma"):
- length = int(regime.name.removeprefix("eth_daily_sma"))
- daily_close = pd.Series([candle.close for candle in eth_daily], dtype=float)
- return daily_regime_to_4h(candles, eth_daily, daily_close > daily_close.rolling(length).mean())
- if regime.name.startswith("eth_daily_mom"):
- days = int(regime.name.removeprefix("eth_daily_mom"))
- daily_close = pd.Series([candle.close for candle in eth_daily], dtype=float)
- return daily_regime_to_4h(candles, eth_daily, daily_close / daily_close.shift(days) - 1.0 > 0.0)
- if regime.name.startswith("btc_daily_sma"):
- length = int(regime.name.removeprefix("btc_daily_sma"))
- daily_close = pd.Series([candle.close for candle in btc_daily], dtype=float)
- return daily_regime_to_4h(candles, btc_daily, daily_close > daily_close.rolling(length).mean())
- if regime.name.startswith("vol_q"):
- quantile = float(regime.name.removeprefix("vol_q")) / 100.0
- realized_vol = returns.rolling(24).std(ddof=1)
- return realized_vol <= realized_vol.rolling(240).quantile(quantile)
- if regime.name.startswith("adx_lt"):
- limit = float(regime.name.removeprefix("adx_lt"))
- return adx(highs, lows, closes, 14) < limit
- if regime.name.startswith("adx_gt"):
- limit = float(regime.name.removeprefix("adx_gt"))
- return adx(highs, lows, closes, 14) > limit
- raise ValueError(f"unknown static regime {regime.name}")
- def baseline_equity_regime_series(regime: Regime, candles: list[Candle], baseline_result: SegmentResult, baseline_daily: pd.Series) -> pd.Series:
- if regime.name.startswith("eq_trades"):
- count = int(regime.name.removeprefix("eq_trades"))
- closed: list[float] = []
- trades = sorted(baseline_result.trades, key=lambda trade: pd.to_datetime(str(trade["exit_time"]), utc=True))
- trade_index = 0
- allowed: list[bool] = []
- for candle in candles:
- current_time = pd.to_datetime(candle.ts, unit="ms", utc=True)
- while trade_index < len(trades) and pd.to_datetime(str(trades[trade_index]["exit_time"]), utc=True) < current_time:
- closed.append(float(trades[trade_index]["return_pct"]) / 100.0 - ROUNDTRIP_TAKER_COST_ON_MARGIN)
- trade_index += 1
- allowed.append(len(closed) >= count and sum(closed[-count:]) > 0.0)
- return pd.Series(allowed)
- if regime.name.startswith("eq_days"):
- days = int(regime.name.removeprefix("eq_days"))
- allowed = []
- for candle in candles:
- current_day = pd.to_datetime(candle.ts, unit="ms", utc=True).normalize()
- end_day = current_day - pd.Timedelta(days=1)
- start_day = end_day - pd.Timedelta(days=days)
- allowed.append(
- start_day in baseline_daily.index
- and end_day in baseline_daily.index
- and float(baseline_daily.loc[end_day] / baseline_daily.loc[start_day] - 1.0) > 0.0
- )
- return pd.Series(allowed)
- raise ValueError(f"unknown equity regime {regime.name}")
- def run_segment(
- candles: list[Candle],
- btc_candles: list[Candle],
- eth_daily: list[Candle],
- btc_daily: list[Candle],
- params: BaseParams,
- regime: Regime,
- regime_allowed: pd.Series | None = None,
- ) -> SegmentResult:
- closes = pd.Series([candle.close for candle in candles], dtype=float)
- highs = pd.Series([candle.high for candle in candles], dtype=float)
- lows = pd.Series([candle.low for candle in candles], dtype=float)
- returns = closes.pct_change()
- btc_closes = pd.Series([candle.close for candle in btc_candles], dtype=float)
- btc_returns = btc_closes.pct_change()
- rsi = explore._compute_rsi(closes, params.rsi_length)
- realized_vol = returns.rolling(params.vol_lookback).std(ddof=1)
- vol_cap = realized_vol.rolling(params.vol_quantile_lookback).quantile(params.vol_quantile)
- btc_trend = btc_closes.rolling(params.btc_trend_sma).mean()
- btc_vol = btc_returns.rolling(params.vol_lookback).std(ddof=1)
- btc_vol_cap = btc_vol.rolling(params.vol_quantile_lookback).quantile(params.vol_quantile)
- static_allowed = regime_allowed if regime_allowed is not None else static_regime_series(regime, candles, btc_candles, eth_daily, btc_daily)
- warmup_bars = max(
- params.vol_lookback + params.vol_quantile_lookback,
- params.btc_trend_sma,
- params.btc_momentum_lookback,
- params.rsi_length + 2,
- )
- equity = explore.INITIAL_EQUITY
- ending_equity = equity
- peak_equity = equity
- max_drawdown = 0.0
- wins = 0
- last_exit_index = -10**9
- pending_side = False
- pending_exit = False
- position: dict[str, object] | None = None
- trades: list[dict[str, object]] = []
- entries: list[dict[str, object]] = []
- exits: list[dict[str, object]] = []
- equity_curve: list[dict[str, float | int]] = []
- for index in range(warmup_bars, len(candles)):
- candle = candles[index]
- if pending_exit and position is not None:
- equity, won, net_return = close_position(trades=trades, exits=exits, position=position, candle=candle, exit_price=candle.open)
- wins += 1 if won else 0
- position = None
- pending_exit = False
- last_exit_index = index
- if pending_side and position is None and equity > 0.0:
- position = {
- "side": "long",
- "entry_time": candle.ts,
- "entry_price": candle.open,
- "entry_index": index,
- "margin_used": equity,
- "stop_price": candle.open * (1.0 - params.stop_loss_pct),
- }
- entries.append({"ts": candle.ts, "price": candle.open, "side": "long"})
- pending_side = False
- current_equity = equity
- if position is not None:
- if candle.low <= float(position["stop_price"]):
- equity, won, net_return = close_position(
- trades=trades,
- exits=exits,
- position=position,
- candle=candle,
- exit_price=float(position["stop_price"]),
- )
- wins += 1 if won else 0
- current_equity = equity
- position = None
- last_exit_index = index
- if position is not None:
- current_equity = mark_to_market(
- side="long",
- margin_used=float(position["margin_used"]),
- entry_price=float(position["entry_price"]),
- mark_price=candle.close,
- leverage=LEVERAGE,
- )
- peak_equity = max(peak_equity, current_equity)
- max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity)
- equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close})
- ending_equity = current_equity
- if index == len(candles) - 1 or equity <= 0.0:
- continue
- values = [
- rsi[index],
- realized_vol.iloc[index],
- vol_cap.iloc[index],
- btc_trend.iloc[index],
- btc_vol.iloc[index],
- btc_vol_cap.iloc[index],
- ]
- if any(value != value for value in values):
- continue
- current_rsi = float(rsi[index])
- if position is not None:
- held_bars = index - int(position["entry_index"])
- if current_rsi >= params.rsi_exit or held_bars >= params.max_hold_bars:
- pending_exit = True
- continue
- if index - last_exit_index < params.cooldown_bars or float(realized_vol.iloc[index]) > float(vol_cap.iloc[index]):
- continue
- btc_momentum = btc_candles[index].close / btc_candles[index - params.btc_momentum_lookback].close - 1.0
- if btc_candles[index].close <= float(btc_trend.iloc[index]) or btc_momentum < 0.0 or float(btc_vol.iloc[index]) > float(btc_vol_cap.iloc[index]):
- continue
- if not bool(static_allowed.iloc[index]):
- continue
- if current_rsi <= params.rsi_entry:
- pending_side = True
- trade_count = len(trades)
- return SegmentResult(
- trade_count=trade_count,
- total_return=(ending_equity - explore.INITIAL_EQUITY) / explore.INITIAL_EQUITY,
- win_rate=wins / trade_count if trade_count else 0.0,
- max_drawdown=max_drawdown,
- trades=trades,
- open_position=position,
- candles=candles[warmup_bars:],
- equity_curve=equity_curve,
- entries=entries,
- exits=exits,
- )
- def regimes() -> list[Regime]:
- return [
- Regime("baseline", "baseline", "existing ETH 4H RSI2 entry<=8; BTC 4H close>SMA200 and 12-bar momentum>=0"),
- Regime("btc_4h_sma400", "btc_trend", "baseline plus BTC 4H close>SMA400"),
- Regime("btc_4h_mom24", "btc_trend", "baseline plus BTC 4H 24-bar return>0"),
- Regime("btc_daily_sma100", "btc_trend", "baseline plus BTC daily close>SMA100"),
- Regime("btc_daily_sma200", "btc_trend", "baseline plus BTC daily close>SMA200"),
- Regime("eth_daily_sma50", "eth_daily_trend", "baseline plus ETH daily close>SMA50"),
- Regime("eth_daily_sma100", "eth_daily_trend", "baseline plus ETH daily close>SMA100"),
- Regime("eth_daily_sma200", "eth_daily_trend", "baseline plus ETH daily close>SMA200"),
- Regime("eth_daily_mom20", "eth_daily_trend", "baseline plus ETH daily 20-day return>0"),
- Regime("vol_q50", "volatility", "baseline with ETH 4H realized vol <= rolling 50th percentile"),
- Regime("vol_q60", "volatility", "baseline with ETH 4H realized vol <= rolling 60th percentile"),
- Regime("adx_lt20", "trend_strength", "baseline plus ETH 4H ADX14<20"),
- Regime("adx_lt25", "trend_strength", "baseline plus ETH 4H ADX14<25"),
- Regime("adx_gt25", "trend_strength", "baseline plus ETH 4H ADX14>25"),
- Regime("eq_trades3", "equity_momentum", "baseline only after last 3 closed net trades have positive summed return"),
- Regime("eq_trades5", "equity_momentum", "baseline only after last 5 closed net trades have positive summed return"),
- Regime("eq_days30", "equity_momentum", "baseline only after previous 30 closed-trade net equity days are positive"),
- Regime("eq_days60", "equity_momentum", "baseline only after previous 60 closed-trade net equity days are positive"),
- ]
- def build_row(regime: Regime, result: SegmentResult, daily: pd.Series) -> tuple[dict[str, object], list[dict[str, object]], pd.DataFrame]:
- monthly = monthly_rows(regime.name, daily)
- current_horizons = horizon_rows(regime.name, result, ROUNDTRIP_TAKER_COST_ON_MARGIN, daily)
- horizon_by_label = {str(row["horizon"]): float(row["net_total_return"]) for row in current_horizons}
- stats = trade_stats_for_window(result, ROUNDTRIP_TAKER_COST_ON_MARGIN, daily.index[0], daily.index[-1])
- row = {
- "name": regime.name,
- "category": regime.category,
- "rule": regime.rule,
- "first_candle": daily.index[0].strftime("%Y-%m-%d"),
- "last_candle": daily.index[-1].strftime("%Y-%m-%d"),
- "years": (daily.index[-1] - daily.index[0]).total_seconds() / 86_400 / 365,
- "gross_total_return": result.total_return,
- "gross_max_drawdown_mark_to_market": result.max_drawdown,
- "worst_month_return": float(monthly["return"].min()),
- "return_3y": horizon_by_label.get("3y", 0.0),
- "return_1y": horizon_by_label.get("1y", 0.0),
- "return_6m": horizon_by_label.get("6m", 0.0),
- "return_3m": horizon_by_label.get("3m", 0.0),
- **stats,
- **metrics_from_daily_equity(daily),
- }
- return row, current_horizons, monthly
- def markdown_report(paths: list[Path], totals: pd.DataFrame, horizon: pd.DataFrame, monthly: pd.DataFrame, command: str) -> str:
- baseline = totals[totals["name"] == "baseline"].iloc[0]
- improved = totals[
- (totals["name"] != "baseline")
- & (totals["return_6m"] > float(baseline["return_6m"]) + 1e-9)
- & (totals["return_3m"] > float(baseline["return_3m"]) + 1e-9)
- ].sort_values(["return_3m", "return_6m", "net_calmar"], ascending=[False, False, False])
- top_names = set(totals.head(10)["name"]) | {"baseline"}
- lines = [
- "# Mean reversion regime review",
- "",
- f"Run command: `{command}`",
- "",
- "Output files:",
- *[f"- `{path}`" for path in paths],
- "",
- "Scope: ETH-USDT-SWAP 4H RSI2 long mean reversion from existing expansion best candidate.",
- f"Cost: roundtrip taker cost on margin at {LEVERAGE}x = {ROUNDTRIP_TAKER_COST_ON_MARGIN:.6f}.",
- "",
- "## Baseline",
- "",
- markdown_table(pd.DataFrame([baseline])[["name", "trades", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "win_rate", "payoff_ratio", "return_3y", "return_1y", "return_6m", "return_3m"]]),
- "",
- "## Conditions improving both 6m and 3m",
- "",
- markdown_table(improved[["name", "category", "trades", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "win_rate", "payoff_ratio", "return_3y", "return_1y", "return_6m", "return_3m", "rule"]]),
- "",
- "## All regime tests",
- "",
- markdown_table(totals[["name", "category", "trades", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "win_rate", "payoff_ratio", "return_3y", "return_1y", "return_6m", "return_3m", "worst_month_return"]]),
- "",
- "## Horizon metrics",
- "",
- markdown_table(horizon[horizon["name"].isin(top_names)][["name", "horizon", "horizon_start", "horizon_end", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "trades", "win_rate", "payoff_ratio"]]),
- "",
- "## Monthly returns",
- "",
- markdown_table(monthly[monthly["name"].isin(top_names)][["name", "month", "return", "start_equity", "end_equity"]].tail(180)),
- ]
- return "\n".join(lines) + "\n"
- def main() -> int:
- parser = argparse.ArgumentParser()
- parser.add_argument("--years", type=float, default=YEARS)
- parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
- args = parser.parse_args()
- eth_15m = load_base_candles("ETH-USDT-SWAP", args.years)
- btc_15m = load_base_candles("BTC-USDT-SWAP", args.years)
- eth_4h = resample_candles(eth_15m, "4H")
- btc_4h = resample_candles(btc_15m, "4H")
- eth_daily = resample_candles(eth_15m, "1D")
- btc_daily = resample_candles(btc_15m, "1D")
- candles, btc_candles = explore.align_pair_candles(eth_4h, btc_4h)
- params = BaseParams()
- total_rows: list[dict[str, object]] = []
- horizon_output: list[dict[str, object]] = []
- monthly_frames: list[pd.DataFrame] = []
- tested = regimes()
- baseline_regime = tested[0]
- baseline_result = run_segment(candles, btc_candles, eth_daily, btc_daily, params, baseline_regime)
- baseline_frame = explore.cost_adjusted_trade_equity_frame(baseline_result, ROUNDTRIP_TAKER_COST_ON_MARGIN)
- baseline_start = pd.to_datetime(baseline_result.equity_curve[0]["ts"], unit="ms", utc=True)
- baseline_end = pd.to_datetime(baseline_result.equity_curve[-1]["ts"], unit="ms", utc=True)
- baseline_daily = daily_equity(baseline_frame, baseline_start, baseline_end)
- for index, regime in enumerate(tested, start=1):
- allowed = baseline_equity_regime_series(regime, candles, baseline_result, baseline_daily) if regime.category == "equity_momentum" else None
- result = baseline_result if regime.name == "baseline" else run_segment(candles, btc_candles, eth_daily, btc_daily, params, regime, allowed)
- frame = explore.cost_adjusted_trade_equity_frame(result, ROUNDTRIP_TAKER_COST_ON_MARGIN)
- start = pd.to_datetime(result.equity_curve[0]["ts"], unit="ms", utc=True)
- end = pd.to_datetime(result.equity_curve[-1]["ts"], unit="ms", utc=True)
- daily = daily_equity(frame, start, end)
- row, current_horizons, monthly = build_row(regime, result, daily)
- total_rows.append(row)
- horizon_output.extend(current_horizons)
- monthly_frames.append(monthly)
- print(f"done {index}/{len(tested)} {regime.name}", flush=True)
- totals = pd.DataFrame(total_rows).sort_values(
- ["return_3m", "return_6m", "net_calmar", "net_total_return"],
- ascending=[False, False, False, False],
- )
- horizon = pd.DataFrame(horizon_output)
- horizon["horizon"] = pd.Categorical(horizon["horizon"], categories=["full", "3y", "1y", "6m", "3m"], ordered=True)
- horizon = horizon.sort_values(["name", "horizon"])
- monthly = pd.concat(monthly_frames, ignore_index=True).sort_values(["name", "month"])
- args.output_dir.mkdir(parents=True, exist_ok=True)
- totals_path = args.output_dir / f"{PREFIX}-totals.csv"
- horizon_path = args.output_dir / f"{PREFIX}-horizons.csv"
- monthly_path = args.output_dir / f"{PREFIX}-monthly-returns.csv"
- best_path = args.output_dir / f"{PREFIX}-best.json"
- report_path = args.output_dir / f"{PREFIX}-report.md"
- totals.to_csv(totals_path, index=False)
- horizon.to_csv(horizon_path, index=False)
- monthly.to_csv(monthly_path, index=False)
- best_path.write_text(json.dumps(totals.head(5).to_dict(orient="records"), indent=2), encoding="utf-8")
- paths = [totals_path, horizon_path, monthly_path, best_path, report_path]
- command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --years {args.years}"
- report_path.write_text(markdown_report(paths, totals, horizon, monthly, command), encoding="utf-8")
- print(totals.head(10).to_string(index=False))
- return 0
- if __name__ == "__main__":
- raise SystemExit(main())
|