#!/usr/bin/env python3 from __future__ import annotations import argparse import json import sys from dataclasses import dataclass from itertools import product from pathlib import Path import pandas as pd ROOT = Path(__file__).resolve().parents[1] sys.path.insert(0, str(ROOT)) from scripts import explore_ultrashort as explore ETH_SYMBOL = "ETH-USDT-SWAP" BTC_SYMBOL = "BTC-USDT-SWAP" PREFIX = "eth-regime-router" OUTPUT_DIR = Path("reports/eth-exploration") YEARS = 10.0 BAR = "15m" PRIMARY_COST = "maker_taker" COSTS = { "maker_taker": 0.0021, "taker_taker": 0.0030, } HORIZONS = ( ("all", None), ("10y", pd.DateOffset(years=10)), ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ) @dataclass(frozen=True) class RouterSpec: name: str trend_sma: int vol_lookback: int corr_lookback: int ratio_lookback: int lead_lookback: int eth_trend_min: float btc_trend_min: float max_eth_vol: float min_corr: float ratio_z_entry: float lead_return_min: float lag_gap_min: float min_volume_ratio: float stop_loss_pct: float take_profit_pct: float max_hold_bars: int def load_candles(symbol: str, bar: str, years: float) -> list[explore.Candle]: candles, _ = explore.load_cached_candles(explore.CANDLE_CACHE_DIR, symbol, bar) if not candles: raise FileNotFoundError(f"missing cached candles for {symbol} {bar}") requested = explore.history_bars_for_years(bar, years) return candles[-requested:] if len(candles) > requested else candles def close_position( *, trades: list[dict[str, object]], exits: list[dict[str, object]], position: dict[str, object], account_equity: float, candle: explore.Candle, exit_price: float, leverage: int, ) -> tuple[float, bool]: margin_used = float(position["margin_used"]) exit_equity = explore.trade_equity( side="long", margin_used=margin_used, entry_price=float(position["entry_price"]), exit_price=exit_price, leverage=leverage, ) pnl = exit_equity - margin_used trades.append( { "side": "Long", "entry_time": explore._format_ts(int(position["entry_time"])), "exit_time": explore._format_ts(candle.ts), "entry_price": round(float(position["entry_price"]), 4), "exit_price": round(exit_price, 4), "pnl": round(pnl, 4), "return_pct": round(pnl / account_equity * 100, 6), "cost_weight": 1.0, "regime": str(position["regime"]), } ) exits.append({"ts": candle.ts, "price": exit_price, "side": "long", "regime": str(position["regime"])}) return account_equity + pnl, pnl > 0.0 def regime_action( *, index: int, eth: list[explore.Candle], btc: list[explore.Candle], eth_close: pd.Series, btc_close: pd.Series, eth_volume: pd.Series, eth_sma: pd.Series, btc_sma: pd.Series, eth_vol: pd.Series, btc_vol: pd.Series, corr: pd.Series, ratio_z: pd.Series, volume_ratio: pd.Series, eth_rsi: list[float], spec: RouterSpec, ) -> str: values = ( eth_sma.iloc[index], btc_sma.iloc[index], eth_vol.iloc[index], btc_vol.iloc[index], corr.iloc[index], ratio_z.iloc[index], volume_ratio.iloc[index], ) if any(value != value for value in values): return "cash" eth_trend = eth_close.iloc[index] / eth_sma.iloc[index] - 1.0 btc_trend = btc_close.iloc[index] / btc_sma.iloc[index] - 1.0 eth_ret = eth_close.iloc[index] / eth_close.iloc[index - spec.lead_lookback] - 1.0 btc_ret = btc_close.iloc[index] / btc_close.iloc[index - spec.lead_lookback] - 1.0 current_rsi = eth_rsi[index] if current_rsi != current_rsi: return "cash" hour = pd.to_datetime(eth[index].ts, unit="ms", utc=True).hour liquid = volume_ratio.iloc[index] >= spec.min_volume_ratio and hour not in (0, 1, 2, 3) calm = eth_vol.iloc[index] <= spec.max_eth_vol and btc_vol.iloc[index] <= spec.max_eth_vol * 0.85 coupled = corr.iloc[index] >= spec.min_corr if not liquid: return "cash" if coupled and calm and eth_trend >= spec.eth_trend_min and btc_trend >= spec.btc_trend_min and current_rsi <= 8.0: return "trend_follow" if coupled and btc_ret >= spec.lead_return_min and btc_ret - eth_ret >= spec.lag_gap_min and calm and current_rsi <= 35.0: return "btc_lead" if abs(eth_trend) < spec.eth_trend_min and ratio_z.iloc[index] <= -spec.ratio_z_entry and calm and current_rsi <= 15.0: return "mean_reversion" return "cash" def run_router_segment( *, eth: list[explore.Candle], btc: list[explore.Candle], spec: RouterSpec, leverage: int, ) -> explore.SegmentResult: eth_close = pd.Series([candle.close for candle in eth], dtype=float) btc_close = pd.Series([candle.close for candle in btc], dtype=float) eth_volume = pd.Series([candle.volume for candle in eth], dtype=float) eth_rsi = explore._compute_rsi(eth_close, 2) eth_ret = eth_close.pct_change() btc_ret = btc_close.pct_change() ratio = eth_close / btc_close ratio_mean = ratio.rolling(spec.ratio_lookback).mean() ratio_std = ratio.rolling(spec.ratio_lookback).std(ddof=0) ratio_z = (ratio - ratio_mean) / ratio_std eth_sma = eth_close.rolling(spec.trend_sma).mean() btc_sma = btc_close.rolling(spec.trend_sma).mean() eth_vol = eth_ret.rolling(spec.vol_lookback).std(ddof=0) btc_vol = btc_ret.rolling(spec.vol_lookback).std(ddof=0) corr = eth_ret.rolling(spec.corr_lookback).corr(btc_ret) volume_ratio = eth_volume / eth_volume.rolling(spec.vol_lookback).median() warmup = max(spec.trend_sma, spec.vol_lookback, spec.corr_lookback, spec.ratio_lookback, spec.lead_lookback + 1) equity = explore.INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_entry: str | None = None pending_exit = False for index in range(warmup, len(eth)): candle = eth[index] if pending_exit and position is not None: equity, won = close_position( trades=trades, exits=exits, position=position, account_equity=equity, candle=candle, exit_price=candle.open, leverage=leverage, ) wins += 1 if won else 0 position = None pending_exit = False if pending_entry is not None and position is None and equity > 0.0: position = { "side": "long", "regime": pending_entry, "entry_time": candle.ts, "entry_price": candle.open, "entry_index": index, "margin_used": equity, "stop_price": candle.open * (1.0 - spec.stop_loss_pct), "take_profit_price": candle.open * (1.0 + spec.take_profit_pct), } entries.append({"ts": candle.ts, "price": candle.open, "side": "long", "regime": pending_entry}) pending_entry = None current_equity = equity if position is not None and candle.low <= float(position["stop_price"]): equity, won = close_position( trades=trades, exits=exits, position=position, account_equity=equity, candle=candle, exit_price=float(position["stop_price"]), leverage=leverage, ) wins += 1 if won else 0 current_equity = equity position = None if position is not None and candle.high >= float(position["take_profit_price"]): equity, won = close_position( trades=trades, exits=exits, position=position, account_equity=equity, candle=candle, exit_price=float(position["take_profit_price"]), leverage=leverage, ) wins += 1 if won else 0 current_equity = equity position = None if position is not None: current_equity = explore.mark_to_market( side="long", margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=leverage, ) peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(eth) - 1 or equity <= 0.0: continue if position is not None: held_bars = index - int(position["entry_index"]) current_rsi = eth_rsi[index] current_ratio_z = ratio_z.iloc[index] current_sma = eth_sma.iloc[index] regime = str(position["regime"]) if held_bars >= spec.max_hold_bars: pending_exit = True elif regime == "trend_follow" and current_rsi == current_rsi and current_rsi >= 55.0: pending_exit = True elif regime == "trend_follow" and current_sma == current_sma and candle.close < float(current_sma): pending_exit = True elif regime == "mean_reversion" and current_rsi == current_rsi and current_rsi >= 50.0: pending_exit = True elif regime == "mean_reversion" and current_ratio_z == current_ratio_z and current_ratio_z >= 0.0: pending_exit = True elif regime == "btc_lead" and current_rsi == current_rsi and current_rsi >= 60.0: pending_exit = True continue action = regime_action( index=index, eth=eth, btc=btc, eth_close=eth_close, btc_close=btc_close, eth_volume=eth_volume, eth_sma=eth_sma, btc_sma=btc_sma, eth_vol=eth_vol, btc_vol=btc_vol, corr=corr, ratio_z=ratio_z, volume_ratio=volume_ratio, eth_rsi=eth_rsi, spec=spec, ) if action != "cash": pending_entry = action trade_count = len(trades) return explore.SegmentResult( trade_count=trade_count, total_return=(ending_equity - explore.INITIAL_EQUITY) / explore.INITIAL_EQUITY, win_rate=wins / trade_count if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=eth[warmup:], equity_curve=equity_curve, entries=entries, exits=exits, ) def specs() -> list[RouterSpec]: out: list[RouterSpec] = [] for trend_sma, max_eth_vol, min_corr, ratio_z_entry in product( (96, 192), (0.018, 0.024), (0.45, 0.60), (1.0, 1.5), ): eth_trend_min = 0.006 lead_return_min = 0.016 min_volume_ratio = 0.90 out.append( RouterSpec( name=( f"router-t{trend_sma}-et{eth_trend_min}-v{max_eth_vol}-c{min_corr}" f"-rz{ratio_z_entry}-br{lead_return_min}-liq{min_volume_ratio}" ), trend_sma=trend_sma, vol_lookback=96, corr_lookback=192, ratio_lookback=192, lead_lookback=16, eth_trend_min=eth_trend_min, btc_trend_min=eth_trend_min * 0.6, max_eth_vol=max_eth_vol, min_corr=min_corr, ratio_z_entry=ratio_z_entry, lead_return_min=lead_return_min, lag_gap_min=0.006, min_volume_ratio=min_volume_ratio, stop_loss_pct=0.010, take_profit_pct=0.022, max_hold_bars=48, ) ) return out def cost_frame(result: explore.SegmentResult, cost: float) -> pd.DataFrame: rows = [{"ts": pd.to_datetime(result.equity_curve[0]["ts"], unit="ms", utc=True), "equity": explore.INITIAL_EQUITY}] equity = explore.INITIAL_EQUITY for trade in result.trades: equity *= 1.0 + float(trade["return_pct"]) / 100.0 - cost * float(trade.get("cost_weight", 1.0)) rows.append({"ts": pd.to_datetime(str(trade["exit_time"]), utc=True), "equity": equity}) return pd.DataFrame(rows) def trade_stats(result: explore.SegmentResult, cost: float) -> dict[str, float]: returns = [float(trade["return_pct"]) / 100.0 - cost * float(trade.get("cost_weight", 1.0)) for trade in result.trades] wins = [value for value in returns if value > 0.0] losses = [value for value in returns if value < 0.0] avg_win = sum(wins) / len(wins) if wins else 0.0 avg_loss = abs(sum(losses) / len(losses)) if losses else 0.0 gross_profit = sum(wins) gross_loss = abs(sum(losses)) return { "win_rate": len(wins) / len(returns) if returns else 0.0, "profit_loss_ratio": avg_win / avg_loss if avg_loss else 0.0, "profit_factor": gross_profit / gross_loss if gross_loss else 0.0, } def horizon_rows(name: str, frame: pd.DataFrame, last_ts: int) -> list[dict[str, object]]: rows: list[dict[str, object]] = [] end_time = pd.to_datetime(last_ts, unit="ms", utc=True) for label, offset in HORIZONS: if offset is None: horizon = frame[["ts", "equity"]].copy() start_time = pd.Timestamp(horizon["ts"].iloc[0]) else: cutoff = end_time - offset before_cutoff = frame[frame["ts"] <= cutoff] if len(before_cutoff): start_equity = float(before_cutoff["equity"].iloc[-1]) after_cutoff = frame[frame["ts"] > cutoff] horizon = pd.concat( [pd.DataFrame([{"ts": cutoff, "equity": start_equity}]), after_cutoff[["ts", "equity"]]], ignore_index=True, ) start_time = cutoff else: horizon = frame[["ts", "equity"]].copy() start_time = pd.Timestamp(horizon["ts"].iloc[0]) metrics = explore.annualized_metrics_from_equity(horizon, int(start_time.timestamp() * 1000), last_ts) rows.append( { "name": name, "horizon": label, "horizon_start": start_time.strftime("%Y-%m-%d %H:%M"), "horizon_end": end_time.strftime("%Y-%m-%d %H:%M"), **metrics, "risk_return_ratio": metrics["net_total_return"] / metrics["net_max_drawdown"] if metrics["net_max_drawdown"] else 0.0, } ) return rows def monthly_rows(name: str, frame: pd.DataFrame) -> pd.DataFrame: series = frame.set_index("ts")["equity"].resample("ME").last().ffill() out = pd.DataFrame( { "name": name, "month": series.index.strftime("%Y-%m"), "start_equity": series.shift(1).fillna(frame["equity"].iloc[0]).to_numpy(), "end_equity": series.to_numpy(), } ) out["return"] = out["end_equity"] / out["start_equity"] - 1.0 return out def regime_rows(name: str, result: explore.SegmentResult, cost: float) -> list[dict[str, object]]: rows: list[dict[str, object]] = [] for regime, group in pd.DataFrame(result.trades).groupby("regime") if result.trades else []: net_returns = group["return_pct"].astype(float) / 100.0 - cost * group.get("cost_weight", 1.0).astype(float) wins = net_returns[net_returns > 0.0] losses = net_returns[net_returns < 0.0] rows.append( { "name": name, "regime": regime, "trades": len(group), "net_return_contribution": float(net_returns.sum()), "win_rate": float(len(wins) / len(group)) if len(group) else 0.0, "profit_factor": float(wins.sum() / abs(losses.sum())) if len(losses) and abs(losses.sum()) > 0.0 else 0.0, } ) if not rows: rows.append({"name": name, "regime": "none", "trades": 0, "net_return_contribution": 0.0, "win_rate": 0.0, "profit_factor": 0.0}) return rows def markdown_table(frame: pd.DataFrame) -> str: columns = list(frame.columns) rows = [columns, ["---" for _ in columns]] rows.extend(frame.astype(object).where(pd.notna(frame), "").values.tolist()) return "\n".join("| " + " | ".join(format_cell(value) for value in row) + " |" for row in rows) def format_cell(value: object) -> str: if isinstance(value, float): return f"{value:.6g}" return str(value).replace("|", "\\|") def markdown_report(command: str, output_files: list[Path], total: pd.DataFrame, horizon: pd.DataFrame, regime: pd.DataFrame, monthly: pd.DataFrame) -> str: primary = total[total["cost_model"] == PRIMARY_COST].head(10) names = set(primary["name"]) horizon_top = horizon[(horizon["cost_model"] == PRIMARY_COST) & horizon["name"].isin(names)].copy() regime_top = regime[(regime["cost_model"] == PRIMARY_COST) & regime["name"].isin(names)].copy() worst_months = monthly[(monthly["cost_model"] == PRIMARY_COST) & monthly["name"].isin(names)].sort_values("return").head(20) best = primary.iloc[0] if len(primary) else pd.Series(dtype=object) decision = "No router variants produced trades." if len(best): decision = ( f"Best maker_taker router `{best['name']}`: annualized={best['net_annualized_return']:.4f}, " f"DD={best['net_max_drawdown']:.4f}, Calmar={best['net_calmar']:.4f}, " f"win_rate={best['win_rate']:.4f}, profit_factor={best['profit_factor']:.4f}." ) lines = [ "# ETH regime router variants", "", f"Run command: `{command}`", "", "Output files:", *[f"- `{path}`" for path in output_files], "", "Router actions: trend_follow, mean_reversion, btc_lead, or cash. Regime inputs: BTC/ETH volatility, trend distance from SMA, rolling correlation, ETH/BTC ratio z-score, UTC hour, and ETH volume ratio.", f"Costs: {', '.join(COSTS)}. Primary sort uses {PRIMARY_COST}, qualified horizons, Calmar, annualized return, and drawdown.", "", f"Decision: {decision}", "", "## Top maker_taker routers", "", markdown_table( primary[ [ "name", "trades", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "win_rate", "profit_loss_ratio", "profit_factor", "risk_return_ratio", "min_horizon_total_return", ] ] ), "", "## Horizon checks", "", markdown_table( horizon_top[ [ "name", "horizon", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "risk_return_ratio", ] ] ), "", "## Regime contribution", "", markdown_table(regime_top[["name", "regime", "trades", "net_return_contribution", "win_rate", "profit_factor"]]), "", "## Worst months among top routers", "", markdown_table(worst_months[["name", "month", "return"]]), ] return "\n".join(lines) + "\n" def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--bar", default=BAR) parser.add_argument("--years", type=float, default=YEARS) parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR) parser.add_argument("--max-candidates", type=int) args = parser.parse_args() eth_raw = load_candles(ETH_SYMBOL, args.bar, args.years) btc_raw = load_candles(BTC_SYMBOL, args.bar, args.years) eth, btc = explore.align_pair_candles(eth_raw, btc_raw) if not eth or not btc: raise RuntimeError("no aligned ETH/BTC candles") candidates = specs() if args.max_candidates is not None: candidates = candidates[: args.max_candidates] total_rows: list[dict[str, object]] = [] horizon_output: list[dict[str, object]] = [] monthly_frames: list[pd.DataFrame] = [] regime_output: list[dict[str, object]] = [] for index, spec in enumerate(candidates, start=1): result = run_router_segment(eth=eth, btc=btc, spec=spec, leverage=explore.LEVERAGE) print(f"done {index}/{len(candidates)} {spec.name} trades={result.trade_count}", flush=True) for cost_model, cost in COSTS.items(): frame = cost_frame(result, cost) metric_start_ts = int(pd.Timestamp(frame["ts"].iloc[0]).timestamp() * 1000) metric_end_ts = int(pd.Timestamp(frame["ts"].iloc[-1]).timestamp() * 1000) metrics = explore.annualized_metrics_from_equity(frame, metric_start_ts, metric_end_ts) stats = trade_stats(result, cost) current_horizons = horizon_rows(spec.name, frame, eth[-1].ts) min_horizon_return = min(float(row["net_total_return"]) for row in current_horizons if row["horizon"] != "all") total_rows.append( { "name": spec.name, "cost_model": cost_model, "symbol": ETH_SYMBOL, "signal_symbol": BTC_SYMBOL, "bar": args.bar, "first_candle": pd.Timestamp(frame["ts"].iloc[0]).strftime("%Y-%m-%d %H:%M"), "last_candle": pd.Timestamp(frame["ts"].iloc[-1]).strftime("%Y-%m-%d %H:%M"), "years": (metric_end_ts - metric_start_ts) / 86_400_000 / 365, "trades": result.trade_count, "gross_total_return": result.total_return, "gross_max_drawdown_mark_to_market": result.max_drawdown, "risk_return_ratio": metrics["net_total_return"] / metrics["net_max_drawdown"] if metrics["net_max_drawdown"] else 0.0, "min_horizon_total_return": min_horizon_return, **spec.__dict__, **metrics, **stats, } ) for row in current_horizons: horizon_output.append({"cost_model": cost_model, **row}) monthly_frames.append(monthly_rows(spec.name, frame).assign(cost_model=cost_model)) for row in regime_rows(spec.name, result, cost): regime_output.append({"cost_model": cost_model, **row}) total = pd.DataFrame(total_rows).sort_values( ["cost_model", "min_horizon_total_return", "net_calmar", "net_annualized_return", "net_max_drawdown"], ascending=[True, False, False, False, True], ) horizon = pd.DataFrame(horizon_output) horizon["horizon"] = pd.Categorical(horizon["horizon"], categories=[label for label, _ in HORIZONS], ordered=True) horizon = horizon.sort_values(["cost_model", "name", "horizon"]) monthly = pd.concat(monthly_frames, ignore_index=True) regime = pd.DataFrame(regime_output).sort_values(["cost_model", "name", "net_return_contribution"], ascending=[True, True, False]) args.output_dir.mkdir(parents=True, exist_ok=True) total_path = args.output_dir / f"{PREFIX}-total.csv" horizon_path = args.output_dir / f"{PREFIX}-horizon.csv" monthly_path = args.output_dir / f"{PREFIX}-monthly.csv" regime_path = args.output_dir / f"{PREFIX}-regime-contribution.csv" top_path = args.output_dir / f"{PREFIX}-top10.csv" json_path = args.output_dir / f"{PREFIX}-summary.json" report_path = args.output_dir / f"{PREFIX}-report.md" total.to_csv(total_path, index=False) horizon.to_csv(horizon_path, index=False) monthly.to_csv(monthly_path, index=False) regime.to_csv(regime_path, index=False) total[total["cost_model"] == PRIMARY_COST].head(10).to_csv(top_path, index=False) summary = { "report": PREFIX, "command": f"rtk .venv/bin/python {Path(__file__).as_posix()} --bar {args.bar} --years {args.years}", "primary_cost": PRIMARY_COST, "candidate_count": len(candidates), "top_maker_taker": total[total["cost_model"] == PRIMARY_COST].head(10).to_dict("records"), "output_files": [str(path) for path in [total_path, horizon_path, monthly_path, regime_path, top_path, json_path, report_path]], } json_path.write_text(json.dumps(summary, indent=2), encoding="utf-8") report_path.write_text( markdown_report( command=summary["command"], output_files=[total_path, horizon_path, monthly_path, regime_path, top_path, json_path, report_path], total=total, horizon=horizon, regime=regime, monthly=monthly, ), encoding="utf-8", ) print(total[total["cost_model"] == PRIMARY_COST].head(20).to_string(index=False)) return 0 if __name__ == "__main__": raise SystemExit(main())