"""Grid-search mean-reversion strategy variants on ETH/BTC swaps.

Cached 15m candles are resampled to 1H/4H/1D bars, every parameter set from
``build_params`` is backtested with ``run_segment``, and the cost-adjusted
results are written as CSV, JSON and Markdown reports.
"""

from __future__ import annotations

import argparse
import json
import shlex
import sys
from dataclasses import dataclass
from itertools import product
from pathlib import Path

import pandas as pd

# Make the repository root importable when this file is run as a script.
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))

from okx_codex_trader.models import Candle
from okx_codex_trader.sampled_report import SegmentResult, mark_to_market, trade_equity
from scripts import explore_ultrashort as explore
from scripts.search_eth_btc_nextgen_variants import markdown_table, metrics_from_daily_equity, monthly_rows

OUTPUT_DIR = Path("reports/strategy-expansion")
PREFIX = "mean-reversion"
BASE_BAR = "15m"
YEARS = 10.0
LEVERAGE = 3
# 0.04% taker fee per side, two sides per trade, scaled by leverage because
# returns are expressed relative to margin.
ROUNDTRIP_TAKER_COST_ON_MARGIN = 0.0004 * 2 * LEVERAGE
# (label, look-back offset) pairs; ``None`` means the full history.
HORIZONS = (
    ("full", None),
    ("3y", pd.DateOffset(years=3)),
    ("1y", pd.DateOffset(years=1)),
    ("6m", pd.DateOffset(months=6)),
    ("3m", pd.DateOffset(months=3)),
)


@dataclass(frozen=True)
class Params:
    """Immutable parameter set describing one strategy candidate."""

    family: str
    symbol: str
    bar: str
    side_mode: str
    rsi_length: int
    rsi_entry: float
    rsi_exit: float
    bb_length: int
    bb_mult: float
    kc_mult: float
    squeeze_lookback: int
    vol_lookback: int
    vol_quantile_lookback: int
    vol_quantile: float
    btc_trend_sma: int
    btc_momentum_lookback: int
    btc_min_momentum: float
    exit_midline: bool
    max_hold_bars: int
    stop_loss_pct: float
    cooldown_bars: int

    @property
    def name(self) -> str:
        """Return an identifier string that encodes every field of this parameter set."""
        return (
            f"{self.symbol}-{self.bar}-{self.family}-{self.side_mode}"
            f"-r{self.rsi_length}-{self.rsi_entry:g}-{self.rsi_exit:g}"
            f"-bb{self.bb_length}-{self.bb_mult:g}-kc{self.kc_mult:g}-sq{self.squeeze_lookback}"
            f"-vw{self.vol_lookback}-vq{self.vol_quantile_lookback}-{self.vol_quantile:g}"
            f"-bt{self.btc_trend_sma}-bm{self.btc_momentum_lookback}-{self.btc_min_momentum:g}"
            f"-mid{int(self.exit_midline)}-mh{self.max_hold_bars}-sl{self.stop_loss_pct:g}-cd{self.cooldown_bars}"
        )


def load_base_candles(symbol: str, years: float) -> list[Candle]:
    """Load cached ``BASE_BAR`` candles for *symbol*, keeping at most the last *years* of bars.

    Raises:
        FileNotFoundError: if the cache holds no candles for the symbol.
    """
    candles, _ = explore.load_cached_candles(explore.CANDLE_CACHE_DIR, symbol, BASE_BAR)
    if not candles:
        raise FileNotFoundError(f"missing cached candles for {symbol} {BASE_BAR}")
    requested = explore.history_bars_for_years(BASE_BAR, years)
    return candles[-requested:] if len(candles) > requested else candles


def resample_candles(candles: list[Candle], bar: str) -> list[Candle]:
    """Aggregate base candles into *bar*-sized OHLCV candles.

    *bar* equal to ``BASE_BAR`` returns the input unchanged; otherwise it must
    be one of ``"1H"``, ``"4H"`` or ``"1D"`` (a ``KeyError`` is raised for
    anything else). Buckets are left-labelled and left-closed, and buckets
    without any price data are dropped. An empty input yields an empty list.
    """
    if bar == BASE_BAR:
        return candles
    rule_by_bar = {"1H": "1h", "4H": "4h", "1D": "1D"}
    rule = rule_by_bar[bar]
    if not candles:
        # Nothing to aggregate; avoids indexing candles[0] below.
        return []
    frame = pd.DataFrame(
        {
            "ts": pd.to_datetime([candle.ts for candle in candles], unit="ms", utc=True),
            "open": [candle.open for candle in candles],
            "high": [candle.high for candle in candles],
            "low": [candle.low for candle in candles],
            "close": [candle.close for candle in candles],
            "volume": [candle.volume for candle in candles],
        }
    ).set_index("ts")
    sampled = frame.resample(rule, label="left", closed="left").agg(
        {"open": "first", "high": "max", "low": "min", "close": "last", "volume": "sum"}
    )
    sampled = sampled.dropna(subset=["open", "high", "low", "close"])
    symbol = candles[0].symbol
    return [
        Candle(
            symbol=symbol,
            ts=int(index.timestamp() * 1000),
            open=float(row.open),
            high=float(row.high),
            low=float(row.low),
            close=float(row.close),
            volume=float(row.volume),
        )
        for index, row in sampled.iterrows()
    ]


def true_range(highs: pd.Series, lows: pd.Series, closes: pd.Series) -> pd.Series:
    """Return the per-bar true range: max of high-low, |high-prev close| and |low-prev close|."""
    prev_close = closes.shift(1)
    return pd.concat([(highs - lows), (highs - prev_close).abs(), (lows - prev_close).abs()], axis=1).max(axis=1)


def close_position(
    *,
    trades: list[dict[str, object]],
    exits: list[dict[str, object]],
    position: dict[str, object],
    candle: Candle,
    exit_price: float,
) -> tuple[float, bool]:
    """Close *position* at *exit_price* and record the trade.

    Appends one record to *trades* and one marker to *exits* (both mutated in
    place).

    Returns:
        ``(exit_equity, won)`` where ``won`` is true when the trade's PnL is
        strictly positive.
    """
    margin_used = float(position["margin_used"])
    entry_price = float(position["entry_price"])
    exit_equity = trade_equity(
        side=str(position["side"]),
        margin_used=margin_used,
        entry_price=entry_price,
        exit_price=exit_price,
        leverage=LEVERAGE,
    )
    pnl = exit_equity - margin_used
    trades.append(
        {
            "side": "Long" if position["side"] == "long" else "Short",
            "entry_time": explore._format_ts(int(position["entry_time"])),
            "exit_time": explore._format_ts(candle.ts),
            "entry_price": round(entry_price, 4),
            "exit_price": round(exit_price, 4),
            "pnl": round(pnl, 4),
            "return_pct": round(pnl / margin_used * 100.0, 4),
        }
    )
    exits.append({"ts": candle.ts, "price": exit_price, "side": position["side"]})
    return exit_equity, pnl > 0.0


def risk_allows(
    side: str,
    btc_close: float,
    btc_trend: float,
    btc_momentum: float,
    btc_vol: float,
    vol_cap: float,
    params: Params,
) -> bool:
    """Return whether the BTC regime filter permits a new *side* entry.

    Entries are blocked when BTC volatility exceeds *vol_cap*. Longs require
    BTC above its trend with momentum of at least ``btc_min_momentum``; any
    other side requires the mirrored condition.
    """
    if btc_vol > vol_cap:
        return False
    if side == "long":
        return btc_close > btc_trend and btc_momentum >= params.btc_min_momentum
    return btc_close < btc_trend and btc_momentum <= -params.btc_min_momentum


def run_segment(candles: list[Candle], btc_candles: list[Candle], params: Params) -> SegmentResult:
    """Backtest one parameter set over *candles*, gated by *btc_candles*.

    Signals are evaluated on a bar's close and acted on at the next bar's
    open. Stops are checked intrabar against the bar's low/high. The two
    candle lists are indexed in lockstep, so they are expected to be aligned
    bar for bar.

    Raises:
        ValueError: if ``params.family`` is not a known strategy family
            (raised when an entry would first be evaluated).
    """
    closes = pd.Series([candle.close for candle in candles], dtype=float)
    highs = pd.Series([candle.high for candle in candles], dtype=float)
    lows = pd.Series([candle.low for candle in candles], dtype=float)
    btc_closes = pd.Series([candle.close for candle in btc_candles], dtype=float)
    returns = closes.pct_change()
    btc_returns = btc_closes.pct_change()

    # Indicators for the traded symbol.
    rsi = explore._compute_rsi(closes, params.rsi_length)
    middle = closes.rolling(params.bb_length).mean()
    stdev = closes.rolling(params.bb_length).std(ddof=0)
    upper = middle + params.bb_mult * stdev
    lower = middle - params.bb_mult * stdev
    atr = true_range(highs, lows, closes).rolling(params.bb_length).mean()
    keltner_upper = middle + params.kc_mult * atr
    keltner_lower = middle - params.kc_mult * atr
    # True when the Bollinger bands sat inside the Keltner channel on any of
    # the last ``squeeze_lookback`` bars.
    squeeze = ((upper < keltner_upper) & (lower > keltner_lower)).rolling(params.squeeze_lookback).max()
    realized_vol = returns.rolling(params.vol_lookback).std(ddof=1)
    vol_cap = realized_vol.rolling(params.vol_quantile_lookback).quantile(params.vol_quantile)

    # BTC regime indicators.
    btc_trend = btc_closes.rolling(params.btc_trend_sma).mean()
    btc_vol = btc_returns.rolling(params.vol_lookback).std(ddof=1)
    btc_vol_cap = btc_vol.rolling(params.vol_quantile_lookback).quantile(params.vol_quantile)

    warmup_bars = max(
        params.bb_length + params.squeeze_lookback,
        params.vol_lookback + params.vol_quantile_lookback,
        params.btc_trend_sma,
        params.btc_momentum_lookback,
        params.rsi_length + 2,
    )

    equity = explore.INITIAL_EQUITY
    ending_equity = equity
    peak_equity = equity
    max_drawdown = 0.0
    wins = 0
    last_exit_index = -10**9  # "never exited": keeps the cooldown check inactive
    pending_side: str | None = None
    pending_exit = False
    position: dict[str, object] | None = None
    trades: list[dict[str, object]] = []
    entries: list[dict[str, object]] = []
    exits: list[dict[str, object]] = []
    equity_curve: list[dict[str, float | int]] = []

    for index in range(warmup_bars, len(candles)):
        candle = candles[index]

        # Execute the exit decided on the previous bar at this bar's open.
        if pending_exit and position is not None:
            equity, won = close_position(
                trades=trades, exits=exits, position=position, candle=candle, exit_price=candle.open
            )
            wins += 1 if won else 0
            position = None
            pending_exit = False
            last_exit_index = index

        # Execute the entry decided on the previous bar at this bar's open.
        if pending_side is not None and position is None and equity > 0.0:
            position = {
                "side": pending_side,
                "entry_time": candle.ts,
                "entry_price": candle.open,
                "entry_index": index,
                "margin_used": equity,
                "stop_price": candle.open
                * (1.0 - params.stop_loss_pct if pending_side == "long" else 1.0 + params.stop_loss_pct),
            }
            entries.append({"ts": candle.ts, "price": candle.open, "side": pending_side})
        pending_side = None

        current_equity = equity
        if position is not None:
            side = str(position["side"])
            stop_price = float(position["stop_price"])
            stop_hit = (side == "long" and candle.low <= stop_price) or (
                side == "short" and candle.high >= stop_price
            )
            if stop_hit:
                # NOTE(review): the fill is taken at the stop price even when the
                # bar opens beyond it, which is optimistic on gaps — confirm intended.
                equity, won = close_position(
                    trades=trades,
                    exits=exits,
                    position=position,
                    candle=candle,
                    exit_price=stop_price,
                )
                wins += 1 if won else 0
                current_equity = equity
                position = None
                last_exit_index = index

        if position is not None:
            current_equity = mark_to_market(
                side=str(position["side"]),
                margin_used=float(position["margin_used"]),
                entry_price=float(position["entry_price"]),
                mark_price=candle.close,
                leverage=LEVERAGE,
            )

        peak_equity = max(peak_equity, current_equity)
        max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity)
        equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close})
        ending_equity = current_equity

        # No next bar to act on, or nothing left to trade with.
        if index == len(candles) - 1 or equity <= 0.0:
            continue

        values = [
            rsi[index],
            middle.iloc[index],
            upper.iloc[index],
            lower.iloc[index],
            squeeze.iloc[index],
            realized_vol.iloc[index],
            vol_cap.iloc[index],
            btc_trend.iloc[index],
            btc_vol.iloc[index],
            btc_vol_cap.iloc[index],
        ]
        # NaN is the only value that differs from itself: skip bars whose
        # indicators are not fully warmed up.
        if any(value != value for value in values):
            continue

        current_rsi = float(rsi[index])
        current_middle = float(middle.iloc[index])
        current_vol = float(realized_vol.iloc[index])
        current_vol_cap = float(vol_cap.iloc[index])
        btc_momentum = btc_candles[index].close / btc_candles[index - params.btc_momentum_lookback].close - 1.0

        if position is not None:
            side = str(position["side"])
            held_bars = index - int(position["entry_index"])
            if side == "long":
                exit_signal = current_rsi >= params.rsi_exit or (
                    params.exit_midline and candle.close >= current_middle
                )
            else:
                exit_signal = current_rsi <= 100.0 - params.rsi_exit or (
                    params.exit_midline and candle.close <= current_middle
                )
            if exit_signal or held_bars >= params.max_hold_bars:
                pending_exit = True
            continue

        if index - last_exit_index < params.cooldown_bars or current_vol > current_vol_cap:
            continue

        btc_close = btc_candles[index].close
        btc_trend_now = float(btc_trend.iloc[index])
        btc_vol_now = float(btc_vol.iloc[index])
        btc_vol_limit = float(btc_vol_cap.iloc[index])
        long_allowed = params.side_mode in ("long", "both") and risk_allows(
            "long", btc_close, btc_trend_now, btc_momentum, btc_vol_now, btc_vol_limit, params
        )
        short_allowed = params.side_mode in ("short", "both") and risk_allows(
            "short", btc_close, btc_trend_now, btc_momentum, btc_vol_now, btc_vol_limit, params
        )

        # NOTE(review): the bb_squeeze and range_band entries compare RSI against
        # ``rsi_exit``; ``rsi_entry`` is only used by the rsi family — confirm intended.
        if params.family == "rsi":
            if long_allowed and current_rsi <= params.rsi_entry:
                pending_side = "long"
            elif short_allowed and current_rsi >= 100.0 - params.rsi_entry:
                pending_side = "short"
        elif params.family == "bb_squeeze":
            squeezed = bool(squeeze.iloc[index])
            if (
                long_allowed
                and squeezed
                and candle.close <= float(lower.iloc[index])
                and current_rsi <= params.rsi_exit
            ):
                pending_side = "long"
            elif (
                short_allowed
                and squeezed
                and candle.close >= float(upper.iloc[index])
                and current_rsi >= 100.0 - params.rsi_exit
            ):
                pending_side = "short"
        elif params.family == "range_band":
            if long_allowed and candle.close <= float(lower.iloc[index]) and current_rsi <= params.rsi_exit:
                pending_side = "long"
            elif (
                short_allowed
                and candle.close >= float(upper.iloc[index])
                and current_rsi >= 100.0 - params.rsi_exit
            ):
                pending_side = "short"
        else:
            raise ValueError(f"unknown family {params.family}")

    trade_count = len(trades)
    return SegmentResult(
        trade_count=trade_count,
        total_return=(ending_equity - explore.INITIAL_EQUITY) / explore.INITIAL_EQUITY,
        win_rate=wins / trade_count if trade_count else 0.0,
        max_drawdown=max_drawdown,
        trades=trades,
        open_position=position,
        candles=candles[warmup_bars:],
        equity_curve=equity_curve,
        entries=entries,
        exits=exits,
    )


def daily_equity(frame: pd.DataFrame, start: pd.Timestamp, end: pd.Timestamp) -> pd.Series:
    """Project the equity points in *frame* onto a daily UTC grid from *start* to *end*.

    The series is seeded with ``explore.INITIAL_EQUITY`` at the start day and
    forward-filled, so each day carries the latest equity known at or before
    that day's timestamp.
    """
    series = frame.set_index("ts")["equity"].sort_index()
    start_day = start.normalize()
    end_day = end.normalize()
    series = pd.concat([pd.Series([explore.INITIAL_EQUITY], index=[start_day]), series]).sort_index()
    # Collapse duplicate timestamps to a single value each.
    series = series.groupby(level=0).last()
    index = pd.date_range(start_day, end_day, freq="1D", tz="UTC")
    return series.reindex(index.union(series.index)).sort_index().ffill().reindex(index)


def trade_stats_for_window(
    result: SegmentResult, cost: float, start: pd.Timestamp, end: pd.Timestamp
) -> dict[str, float | int]:
    """Summarise trades whose exit time lies in ``[start, end]``.

    Each trade's return has *cost* subtracted before it is classified as a win
    or a loss.

    Returns:
        A dict with ``trades``, ``win_rate`` and ``payoff_ratio`` (average win
        over average absolute loss; 0.0 when there are no losses).
    """
    returns: list[float] = []
    for trade in result.trades:
        exit_time = pd.to_datetime(str(trade["exit_time"]), utc=True)
        if start <= exit_time <= end:
            returns.append(float(trade["return_pct"]) / 100.0 - cost)
    wins = [value for value in returns if value > 0.0]
    losses = [value for value in returns if value < 0.0]
    avg_win = sum(wins) / len(wins) if wins else 0.0
    avg_loss_abs = abs(sum(losses) / len(losses)) if losses else 0.0
    return {
        "trades": len(returns),
        "win_rate": len(wins) / len(returns) if returns else 0.0,
        "payoff_ratio": avg_win / avg_loss_abs if avg_loss_abs else 0.0,
    }


def horizon_rows(name: str, result: SegmentResult, cost: float, series: pd.Series) -> list[dict[str, object]]:
    """Build one metrics row per entry in ``HORIZONS`` from the daily equity *series*.

    A horizon with fewer than two daily points falls back to the full series.
    """
    rows: list[dict[str, object]] = []
    end_time = series.index[-1]
    for label, offset in HORIZONS:
        horizon = series if offset is None else series[series.index >= end_time - offset]
        if len(horizon) < 2:
            horizon = series
        rows.append(
            {
                "name": name,
                "horizon": label,
                "horizon_start": horizon.index[0].strftime("%Y-%m-%d"),
                "horizon_end": horizon.index[-1].strftime("%Y-%m-%d"),
                **metrics_from_daily_equity(horizon),
                **trade_stats_for_window(result, cost, horizon.index[0], horizon.index[-1]),
            }
        )
    return rows


def build_params() -> list[Params]:
    """Enumerate the long-only parameter grid for every symbol, bar and strategy family."""
    hold_options = {"1H": (48,), "4H": (24,), "1D": (10,)}
    trend_options = {"1H": (200, 400), "4H": (100, 200), "1D": (80, 160)}
    momentum_options = {"1H": (24,), "4H": (12,), "1D": (10,)}

    params: list[Params] = []
    for symbol in ("ETH-USDT-SWAP", "BTC-USDT-SWAP"):
        for bar in ("1H", "4H", "1D"):
            for family in ("rsi", "bb_squeeze", "range_band"):
                is_rsi = family == "rsi"
                # product() varies the rightmost axis fastest, matching the
                # order of equivalent nested loops.
                grid = product(
                    (2, 4) if is_rsi else (4,),  # rsi_length
                    (5.0, 8.0, 12.0) if is_rsi else (20.0,),  # rsi_entry
                    (20,) if is_rsi else (20, 40),  # bb_length
                    (1.8, 2.2) if family == "range_band" else (2.0,),  # bb_mult
                    (1.5,),  # kc_mult
                    trend_options[bar],
                    momentum_options[bar],
                    hold_options[bar],
                )
                for rsi_length, rsi_entry, bb_length, bb_mult, kc_mult, btc_trend, btc_momentum, max_hold in grid:
                    params.append(
                        Params(
                            family=family,
                            symbol=symbol,
                            bar=bar,
                            side_mode="long",
                            rsi_length=rsi_length,
                            rsi_entry=rsi_entry,
                            rsi_exit=55.0,
                            bb_length=bb_length,
                            bb_mult=bb_mult,
                            kc_mult=kc_mult,
                            squeeze_lookback=8,
                            vol_lookback=24,
                            vol_quantile_lookback=240,
                            vol_quantile=0.75,
                            btc_trend_sma=btc_trend,
                            btc_momentum_lookback=btc_momentum,
                            btc_min_momentum=0.0,
                            exit_midline=not is_rsi,
                            max_hold_bars=max_hold,
                            stop_loss_pct=0.08 if bar == "1D" else 0.045,
                            cooldown_bars=max(2, max_hold // 4),
                        )
                    )
    return params


def markdown_report(
    paths: list[Path], totals: pd.DataFrame, horizon: pd.DataFrame, monthly: pd.DataFrame, command: str
) -> str:
    """Render the Markdown report.

    The report lists the first 20 rows of *totals*, plus horizon metrics and
    monthly returns for the first five of them. *totals* is used in the order
    given, so callers are expected to pass it already ranked.
    """
    top = totals.head(20)
    top_names = set(top.head(5)["name"])
    selected_horizons = horizon[horizon["name"].isin(top_names)]
    selected_monthly = monthly[monthly["name"].isin(top_names)]
    lines = [
        "# Mean reversion strategy expansion",
        "",
        f"Run command: `{command}`",
        "",
        "Output files:",
        *[f"- `{path}`" for path in paths],
        "",
        "Scope: ETH/BTC 1H/4H/1D mean-reversion search from 15m OKX cache.",
        "Families: RSI2/RSI4 pullback, Bollinger/Keltner squeeze reversal, and range-band reversal.",
        f"Cost: 0.04% single-side taker, roundtrip cost on margin at {LEVERAGE}x = {ROUNDTRIP_TAKER_COST_ON_MARGIN:.6f}.",
        "",
        "## Top candidates",
        "",
        markdown_table(
            top[
                [
                    "name",
                    "symbol",
                    "bar",
                    "family",
                    "trades",
                    "net_total_return",
                    "net_annualized_return",
                    "net_max_drawdown",
                    "net_calmar",
                    "win_rate",
                    "payoff_ratio",
                    "return_3y",
                    "return_1y",
                    "return_6m",
                    "return_3m",
                    "worst_month_return",
                ]
            ]
        ),
        "",
        "## Horizon metrics for top five",
        "",
        markdown_table(
            selected_horizons[
                [
                    "name",
                    "horizon",
                    "horizon_start",
                    "horizon_end",
                    "net_total_return",
                    "net_annualized_return",
                    "net_max_drawdown",
                    "net_calmar",
                    "trades",
                    "win_rate",
                    "payoff_ratio",
                ]
            ]
        ),
        "",
        "## Monthly returns for top five",
        "",
        markdown_table(selected_monthly[["name", "month", "return", "start_equity", "end_equity"]].tail(120)),
    ]
    return "\n".join(lines) + "\n"


def main() -> int:
    """Run the parameter search and write the CSV, JSON and Markdown outputs.

    Returns:
        0 on success, 1 when no candidate produced an equity curve (in which
        case no files are written).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--years", type=float, default=YEARS)
    parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
    parser.add_argument("--max-candidates", type=int, default=0)
    args = parser.parse_args()

    base_data = {
        symbol: load_base_candles(symbol, args.years)
        for symbol in ("ETH-USDT-SWAP", "BTC-USDT-SWAP")
    }
    data = {
        (symbol, bar): resample_candles(base_data[symbol], bar)
        for symbol in ("ETH-USDT-SWAP", "BTC-USDT-SWAP")
        for bar in ("1H", "4H", "1D")
    }
    params_grid = build_params()
    # Only a positive limit truncates; a negative value would otherwise slice
    # candidates off the end of the grid.
    if args.max_candidates > 0:
        params_grid = params_grid[: args.max_candidates]

    total_rows: list[dict[str, object]] = []
    horizon_output: list[dict[str, object]] = []
    monthly_frames: list[pd.DataFrame] = []
    for index, params in enumerate(params_grid, start=1):
        candles, btc_candles = explore.align_pair_candles(
            data[(params.symbol, params.bar)], data[("BTC-USDT-SWAP", params.bar)]
        )
        result = run_segment(candles, btc_candles, params)
        if not result.equity_curve:
            continue
        frame = explore.cost_adjusted_trade_equity_frame(result, ROUNDTRIP_TAKER_COST_ON_MARGIN)
        start = pd.to_datetime(result.equity_curve[0]["ts"], unit="ms", utc=True)
        end = pd.to_datetime(result.equity_curve[-1]["ts"], unit="ms", utc=True)
        daily = daily_equity(frame, start, end)
        metrics = metrics_from_daily_equity(daily)
        stats = trade_stats_for_window(result, ROUNDTRIP_TAKER_COST_ON_MARGIN, daily.index[0], daily.index[-1])
        monthly = monthly_rows(params.name, daily)
        current_horizons = horizon_rows(params.name, result, ROUNDTRIP_TAKER_COST_ON_MARGIN, daily)
        horizon_by_label = {str(row["horizon"]): float(row["net_total_return"]) for row in current_horizons}
        row = {
            "name": params.name,
            "symbol": params.symbol,
            "bar": params.bar,
            "family": params.family,
            "eligible_sample": stats["trades"] >= 20,
            "roundtrip_cost_on_margin": ROUNDTRIP_TAKER_COST_ON_MARGIN,
            "first_candle": start.strftime("%Y-%m-%d %H:%M"),
            "last_candle": end.strftime("%Y-%m-%d %H:%M"),
            "years": (end - start).total_seconds() / 86_400 / 365,
            "gross_total_return": result.total_return,
            "gross_max_drawdown_mark_to_market": result.max_drawdown,
            "worst_month_return": float(monthly["return"].min()),
            "return_3y": horizon_by_label.get("3y", 0.0),
            "return_1y": horizon_by_label.get("1y", 0.0),
            "return_6m": horizon_by_label.get("6m", 0.0),
            "return_3m": horizon_by_label.get("3m", 0.0),
            **params.__dict__,
            **stats,
            **metrics,
        }
        total_rows.append(row)
        horizon_output.extend(current_horizons)
        monthly_frames.append(monthly)
        print(f"done {index}/{len(params_grid)} {params.name}", flush=True)

    if not total_rows:
        # Building the frames below would fail on missing columns and an
        # empty concat; report the condition instead.
        print("no candidate produced an equity curve; nothing to report", file=sys.stderr)
        return 1

    totals = pd.DataFrame(total_rows)
    totals["eligible_candidate"] = totals["eligible_sample"] & (totals["net_total_return"] > 0.0)
    totals = totals.sort_values(
        ["eligible_candidate", "net_calmar", "net_annualized_return", "net_max_drawdown", "trades"],
        ascending=[False, False, False, True, True],
    )
    horizon = pd.DataFrame(horizon_output)
    horizon["horizon"] = pd.Categorical(
        horizon["horizon"], categories=["full", "3y", "1y", "6m", "3m"], ordered=True
    )
    horizon = horizon.sort_values(["name", "horizon"])
    monthly = pd.concat(monthly_frames, ignore_index=True)
    top_names = set(totals.head(25)["name"])
    monthly_top = monthly[monthly["name"].isin(top_names)].sort_values(["name", "month"])

    args.output_dir.mkdir(parents=True, exist_ok=True)
    totals_path = args.output_dir / f"{PREFIX}-totals.csv"
    horizon_path = args.output_dir / f"{PREFIX}-horizons.csv"
    monthly_path = args.output_dir / f"{PREFIX}-monthly-returns.csv"
    best_path = args.output_dir / f"{PREFIX}-best.json"
    report_path = args.output_dir / f"{PREFIX}-report.md"
    totals.to_csv(totals_path, index=False)
    horizon.to_csv(horizon_path, index=False)
    monthly_top.to_csv(monthly_path, index=False)
    best_path.write_text(json.dumps(totals.head(3).to_dict(orient="records"), indent=2), encoding="utf-8")
    paths = [totals_path, horizon_path, monthly_path, best_path, report_path]

    # Record non-default flags so the command in the report reproduces this run.
    command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --years {args.years}"
    if args.output_dir != OUTPUT_DIR:
        command += f" --output-dir {shlex.quote(args.output_dir.as_posix())}"
    if args.max_candidates > 0:
        command += f" --max-candidates {args.max_candidates}"
    report_path.write_text(markdown_report(paths, totals, horizon, monthly_top, command), encoding="utf-8")
    print(totals.head(10).to_string(index=False))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())