#!/usr/bin/env python3
"""Backtest a fixed set of ETH-USDT-SWAP strategies on locally cached 15m candles.

The script loads ETH and BTC candle CSVs from ``DATA_DIR``, derives
edge-triggered entry signals, simulates each strategy in ``STRATEGIES`` with a
fixed stop / take-profit / max-hold exit model, and writes CSV, JSON and
Markdown summaries to the chosen output directory.  It only reads local files
and writes reports.
"""
from __future__ import annotations

import argparse
import json
from dataclasses import asdict, dataclass
from pathlib import Path

import pandas as pd

DATA_DIR = Path("data/okx-candles")
OUTPUT_DIR = Path("reports/recent-market-adaptation")
ETH = "ETH-USDT-SWAP"
BTC = "BTC-USDT-SWAP"
BAR = "15m"
INITIAL_EQUITY = 10_000.0
LEVERAGE = 3.0
ROUNDTRIP_COST_ON_MARGIN = 0.0021

# Number of 15m bars in one day.  All bar-count windows below are tied to BAR.
BARS_PER_DAY = 96
# Bars consumed by the breakout signal: 48-bar range plus 384-bar quantile window.
BREAKOUT_WARMUP_BARS = 48 + 384
# Bars consumed by the router's 90-day volatility window.
ROUTER_WARMUP_BARS = BARS_PER_DAY * 90
# Annualisation factor for per-bar return volatility.
ANNUALIZATION = (BARS_PER_DAY * 365) ** 0.5

# (label, look-back offset) pairs; ``None`` means the whole data set.
# main() relies on the first entry being the full-history horizon.
HORIZONS = (
    ("full", None),
    ("3y", pd.DateOffset(years=3)),
    ("1y", pd.DateOffset(years=1)),
    ("6m", pd.DateOffset(months=6)),
    ("3m", pd.DateOffset(months=3)),
    ("30d", pd.DateOffset(days=30)),
    ("14d", pd.DateOffset(days=14)),
)


@dataclass(frozen=True)
class Strategy:
    """Static description of one backtested strategy."""

    name: str  # identifier used in every output table
    description: str  # human-readable rule summary shown in the report
    kind: str  # dispatch key understood by params_for()


STRATEGIES = (
    Strategy(
        name="btc-lead-momentum",
        kind="btc_lead",
        description="Trade ETH in the direction of BTC 2h momentum when ETH confirms over 1h; fixed stop, take-profit, or 4h max hold.",
    ),
    Strategy(
        name="eth-compression-breakout",
        kind="breakout",
        description="Trade ETH 15m close breakouts from a compressed 12h range; fixed stop, take-profit, or 6h max hold.",
    ),
    Strategy(
        name="eth-btc-relative-weakness-short",
        kind="relative_weak",
        description="Short ETH when BTC is rising over 6h while ETH/BTC keeps falling; fixed stop, take-profit, or 6h max hold.",
    ),
    Strategy(
        name="recent-style-router",
        kind="router",
        description="Use prior 30d ETH-vs-BTC relative return and prior 90d volatility: relative weakness routes short, trend/high-vol routes breakout, otherwise BTC lead momentum.",
    ),
)

# Maps the signal column chosen by params_for() to the kind recorded on a trade.
# Any column not listed here is recorded as "btc_lead".
_ROUTED_KIND_BY_SIGNAL = {
    "breakout_signal": "breakout",
    "relative_weak_signal": "relative_weak",
    "reversion_signal": "reversion",
}


def load_frame(symbol: str, bar: str) -> pd.DataFrame:
    """Load one candle CSV, indexed by UTC timestamp.

    The file is read from ``DATA_DIR / symbol / f"{bar}.csv"`` and must contain
    a ``ts`` column holding epoch milliseconds.  Rows are sorted by time and,
    for duplicated timestamps, only the last row is kept.
    """
    path = DATA_DIR / symbol / f"{bar}.csv"
    frame = pd.read_csv(path)
    frame["ts"] = pd.to_datetime(frame["ts"], unit="ms", utc=True)
    ordered = frame.sort_values("ts")
    return ordered.drop_duplicates("ts", keep="last").set_index("ts")


def load_market() -> pd.DataFrame:
    """Return ETH and BTC candles joined on their common timestamps.

    Columns are prefixed ``eth_`` / ``btc_`` and per-bar close-to-close returns
    are added as ``eth_ret`` / ``btc_ret``.  Rows containing any missing value
    are dropped.

    Raises:
        ValueError: if no usable rows remain, e.g. the two caches do not
            overlap in time.
    """
    eth = load_frame(ETH, BAR).add_prefix("eth_")
    btc = load_frame(BTC, BAR).add_prefix("btc_")
    frame = eth.join(btc, how="inner")
    frame["eth_ret"] = frame["eth_close"].pct_change()
    frame["btc_ret"] = frame["btc_close"].pct_change()
    frame = frame.dropna().copy()
    if frame.empty:
        # Fail here with a clear message instead of an IndexError further down.
        raise ValueError(
            f"no overlapping {BAR} candles for {ETH} and {BTC} under {DATA_DIR}"
        )
    return frame


def rsi(series: pd.Series, length: int) -> pd.Series:
    """Return an RSI built from simple rolling means of gains and losses.

    The result is NaN until ``length`` differences are available and wherever
    both the average gain and the average loss are zero.
    """
    delta = series.diff()
    gain = delta.clip(lower=0.0).rolling(length).mean()
    loss = (-delta.clip(upper=0.0)).rolling(length).mean()
    rs = gain / loss
    return 100.0 - 100.0 / (1.0 + rs)


def _edge_trigger(raw: pd.Series) -> pd.Series:
    """Keep a raw state only on the bar where it changes; emit 0 elsewhere."""
    return raw.where(raw != raw.shift(1), 0)


def add_signals(frame: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``frame`` with every signal and regime column added.

    Each ``*_raw`` column holds the desired side (1 long, -1 short, 0 flat) and
    the matching ``*_signal`` column fires only on the bar where that raw state
    changes.  ``router_regime`` names the regime per bar and ``router_signal``
    carries the signal of the strategy that regime selects.
    """
    out = frame.copy()

    # BTC-lead momentum: BTC 8-bar move confirmed by an ETH 4-bar move.
    out["btc_2h_return"] = out["btc_close"] / out["btc_close"].shift(8) - 1.0
    out["eth_1h_return"] = out["eth_close"] / out["eth_close"].shift(4) - 1.0
    out["btc_lead_raw"] = 0
    lead_up = (out["btc_2h_return"] >= 0.008) & (out["eth_1h_return"] >= 0.002)
    lead_down = (out["btc_2h_return"] <= -0.008) & (out["eth_1h_return"] <= -0.002)
    out.loc[lead_up, "btc_lead_raw"] = 1
    out.loc[lead_down, "btc_lead_raw"] = -1
    out["btc_lead_signal"] = _edge_trigger(out["btc_lead_raw"])

    # Compression breakout: close outside the prior 48-bar range while that
    # range's relative width sits in the lowest 30% of its last 384 values.
    # shift(1) keeps the current bar out of its own reference range.
    prior_high = out["eth_high"].shift(1).rolling(48).max()
    prior_low = out["eth_low"].shift(1).rolling(48).min()
    width = (prior_high - prior_low) / out["eth_close"]
    compressed = width <= width.rolling(384).quantile(0.30)
    out["breakout_raw"] = 0
    out.loc[compressed & (out["eth_close"] > prior_high), "breakout_raw"] = 1
    out.loc[compressed & (out["eth_close"] < prior_low), "breakout_raw"] = -1
    out["breakout_signal"] = _edge_trigger(out["breakout_raw"])

    # Mean reversion: extreme 2-bar RSI while the 96-bar move is small.
    out["eth_rsi2"] = rsi(out["eth_close"], 2)
    day_ago = out["eth_close"].shift(BARS_PER_DAY)
    out["eth_24h_return_abs"] = (out["eth_close"] / day_ago - 1.0).abs()
    weak_trend = out["eth_24h_return_abs"] <= 0.035
    out["reversion_raw"] = 0
    out.loc[weak_trend & (out["eth_rsi2"] <= 8.0), "reversion_raw"] = 1
    out.loc[weak_trend & (out["eth_rsi2"] >= 92.0), "reversion_raw"] = -1
    out["reversion_signal"] = _edge_trigger(out["reversion_raw"])

    # Relative weakness: BTC up over 24 bars while the ETH/BTC ratio falls.
    out["eth_btc_ratio"] = out["eth_close"] / out["btc_close"]
    out["btc_6h_return"] = out["btc_close"] / out["btc_close"].shift(24) - 1.0
    out["ratio_6h_return"] = out["eth_btc_ratio"] / out["eth_btc_ratio"].shift(24) - 1.0
    out["relative_weak_raw"] = 0
    weak = (out["btc_6h_return"] >= 0.006) & (out["ratio_6h_return"] <= -0.010)
    out.loc[weak, "relative_weak_raw"] = -1
    out["relative_weak_signal"] = _edge_trigger(out["relative_weak_raw"])

    # Router inputs: 30-day returns and annualised 90-day ETH volatility.
    month = BARS_PER_DAY * 30
    out["eth_30d_return"] = out["eth_close"] / out["eth_close"].shift(month) - 1.0
    out["btc_30d_return"] = out["btc_close"] / out["btc_close"].shift(month) - 1.0
    out["eth_90d_vol"] = out["eth_ret"].rolling(ROUTER_WARMUP_BARS).std(ddof=0) * ANNUALIZATION
    relative_weak_style = (
        (out["eth_30d_return"] <= out["btc_30d_return"] - 0.06)
        & (out["eth_90d_vol"] <= 0.75)
    )
    trend_style = (
        (out["eth_30d_return"].abs() >= 0.12)
        | (out["btc_30d_return"].abs() >= 0.10)
        | (out["eth_90d_vol"] >= 0.88)
    )

    # Assignment order sets precedence: relative weakness overrides trend,
    # which overrides the BTC-lead default.
    out["router_signal"] = out["btc_lead_signal"]
    out.loc[trend_style, "router_signal"] = out.loc[trend_style, "breakout_signal"]
    out.loc[relative_weak_style, "router_signal"] = out.loc[relative_weak_style, "relative_weak_signal"]
    out["router_regime"] = "btc_lead_momentum"
    out.loc[trend_style, "router_regime"] = "trend_or_high_vol_breakout"
    out.loc[relative_weak_style, "router_regime"] = "relative_weakness_short"
    return out


def exit_hit(side: int, entry: float, row: object, stop_pct: float, take_pct: float) -> float | None:
    """Return the stop / take-profit fill price for this bar, or ``None``.

    Args:
        side: 1 for a long position; any other value is treated as short.
        entry: entry price of the open position.
        row: bar exposing ``eth_open``, ``eth_high`` and ``eth_low``.
        stop_pct: stop distance as a fraction of ``entry``.
        take_pct: take-profit distance as a fraction of ``entry``.

    If the bar opens at or beyond either level the fill is the open price.
    Otherwise the stop is checked before the take-profit, so a bar touching
    both levels is booked as a stop-out.
    """
    open_price = float(row.eth_open)
    high = float(row.eth_high)
    low = float(row.eth_low)
    if side == 1:
        stop = entry * (1.0 - stop_pct)
        take = entry * (1.0 + take_pct)
        if open_price <= stop or open_price >= take:
            return open_price
        if low <= stop:
            return stop
        if high >= take:
            return take
        return None
    stop = entry * (1.0 + stop_pct)
    take = entry * (1.0 - take_pct)
    if open_price >= stop or open_price <= take:
        return open_price
    if high >= stop:
        return stop
    if low <= take:
        return take
    return None


def exit_by_rule(kind: str, side: int, row: object) -> bool:
    """Return True when a strategy-specific rule closes the position.

    Only the ``"reversion"`` kind has such a rule: longs close once
    ``eth_rsi2`` reaches 52 and shorts once it falls to 48.
    """
    if kind != "reversion":
        return False
    rsi2 = float(row.eth_rsi2)
    return (side == 1 and rsi2 >= 52.0) or (side == -1 and rsi2 <= 48.0)


def params_for(kind: str, row: object) -> tuple[str, int, float, float, int]:
    """Return ``(signal_col, max_hold_bars, stop_pct, take_pct, warmup_bars)``.

    ``row`` is only read for the ``"router"`` kind, where ``row.router_regime``
    selects the parameters of the routed strategy.

    Raises:
        ValueError: if ``kind`` is not a known strategy kind.
    """
    if kind == "router":
        regime = str(row.router_regime)
        if regime == "relative_weakness_short":
            return "relative_weak_signal", 24, 0.007, 0.012, ROUTER_WARMUP_BARS
        if regime == "trend_or_high_vol_breakout":
            return "breakout_signal", 24, 0.008, 0.014, BREAKOUT_WARMUP_BARS
        return "btc_lead_signal", 16, 0.007, 0.011, BREAKOUT_WARMUP_BARS
    if kind == "btc_lead":
        return "btc_lead_signal", 16, 0.007, 0.011, 8
    if kind == "breakout":
        return "breakout_signal", 24, 0.008, 0.014, BREAKOUT_WARMUP_BARS
    if kind == "reversion":
        return "reversion_signal", 16, 0.006, 0.007, BARS_PER_DAY
    if kind == "relative_weak":
        return "relative_weak_signal", 24, 0.007, 0.012, BARS_PER_DAY * 30
    raise ValueError(f"unknown strategy kind: {kind}")


def run_strategy(frame: pd.DataFrame, strategy: Strategy) -> tuple[pd.DataFrame, list[dict[str, object]]]:
    """Simulate ``strategy`` bar by bar and return ``(equity_curve, trades)``.

    A signal on bar ``i`` opens a position at the open of bar ``i + 1``.  The
    position closes on the first bar where ``exit_hit`` fires, else on a
    strategy rule exit, else at the close once the max hold is reached.  At
    most one position is open at a time, and the exit bar's own signal is not
    evaluated.  A position still open on the final bar is never booked.

    The curve has one row at the first candle, one per closed trade, and a
    final row at the last candle.  Equity compounds by
    ``gross * LEVERAGE - ROUNDTRIP_COST_ON_MARGIN`` per trade.
    """
    rows = list(frame.itertuples())
    equity = INITIAL_EQUITY
    curve = [{"ts": frame.index[0], "equity": equity}]
    trades: list[dict[str, object]] = []
    position: dict[str, object] | None = None
    # Fixed start bar; the warm-up value returned by params_for() is not used.
    start_index = ROUTER_WARMUP_BARS if strategy.kind == "router" else BREAKOUT_WARMUP_BARS
    last_signal_index = len(rows) - 1  # the final bar can never trigger an entry

    index = start_index
    while index < last_signal_index:
        row = rows[index]

        if position is None:
            signal_col, hold, stop_pct, take_pct, _ = params_for(strategy.kind, row)
            side = int(getattr(row, signal_col))
            if side == 0:
                index += 1
                continue
            entry_index = index + 1
            position = {
                "side": side,
                "entry_index": entry_index,
                "entry_time": frame.index[entry_index],
                "entry": float(rows[entry_index].eth_open),
                "hold": hold,
                "stop_pct": stop_pct,
                "take_pct": take_pct,
                "routed_kind": _ROUTED_KIND_BY_SIGNAL.get(signal_col, "btc_lead"),
                "regime": getattr(row, "router_regime", "") if strategy.kind == "router" else "",
            }
            # Re-enter the loop on the entry bar so it is checked for exits too.
            index = entry_index
            continue

        side = int(position["side"])
        entry = float(position["entry"])
        exit_price = exit_hit(side, entry, row, float(position["stop_pct"]), float(position["take_pct"]))
        held = index - int(position["entry_index"])
        if exit_price is None and exit_by_rule(str(position["routed_kind"]), side, row):
            exit_price = float(row.eth_close)
        if exit_price is None and held >= int(position["hold"]):
            exit_price = float(row.eth_close)
        if exit_price is None:
            index += 1
            continue

        gross = exit_price / entry - 1.0 if side == 1 else entry / exit_price - 1.0
        net_return = gross * LEVERAGE - ROUNDTRIP_COST_ON_MARGIN
        equity *= 1.0 + net_return
        trades.append(
            {
                "strategy": strategy.name,
                "side": "long" if side == 1 else "short",
                "entry_time": pd.Timestamp(position["entry_time"]).strftime("%Y-%m-%d %H:%M"),
                "exit_time": frame.index[index].strftime("%Y-%m-%d %H:%M"),
                "entry_price": entry,
                "exit_price": exit_price,
                "net_return": net_return,
                "routed_kind": position["routed_kind"],
                "regime": position["regime"],
            }
        )
        curve.append({"ts": frame.index[index], "equity": equity})
        position = None
        index += 1

    # Extend the curve to the last candle so every horizon ends on the same bar.
    if pd.Timestamp(curve[-1]["ts"]) < frame.index[-1]:
        curve.append({"ts": frame.index[-1], "equity": equity})
    return pd.DataFrame(curve), trades


def max_drawdown(equity: pd.Series) -> float:
    """Return the largest peak-to-trough decline as a fraction of the peak."""
    peak = equity.cummax()
    return float(((peak - equity) / peak).max())


def horizon_curve(curve: pd.DataFrame, start: pd.Timestamp) -> pd.DataFrame:
    """Return the equity curve restricted to the window beginning at ``start``.

    The first row is synthesised at ``start`` with the last equity known at or
    before that time.  If the curve has no point at or before ``start``, a copy
    of the whole curve is returned.
    """
    before = curve[curve["ts"] <= start]
    if not len(before):
        return curve.copy()
    start_equity = float(before["equity"].iloc[-1])
    opening = pd.DataFrame([{"ts": start, "equity": start_equity}])
    after = curve[curve["ts"] > start]
    return pd.concat([opening, after], ignore_index=True)


def trade_stats(trades: list[dict[str, object]], start: pd.Timestamp, end: pd.Timestamp) -> dict[str, object]:
    """Summarise the trades whose entry time is at or after ``start``.

    ``profit_factor`` is reported as 0.0 when there are no losing trades, and
    ``win_rate`` / ``avg_trade_return`` are 0.0 when there are no trades.
    """
    returns = [
        float(trade["net_return"])
        for trade in trades
        if pd.to_datetime(str(trade["entry_time"]), utc=True) >= start
    ]
    wins = [value for value in returns if value > 0.0]
    losses = [value for value in returns if value < 0.0]
    gross_profit = sum(wins)
    gross_loss = abs(sum(losses))
    # Window length in 30-day units, floored to avoid division by zero.
    periods = max((end - start).total_seconds() / 86_400.0 / 30.0, 1e-9)
    count = len(returns)
    return {
        "trades": count,
        "trades_per_30d": count / periods,
        "win_rate": len(wins) / count if returns else 0.0,
        "profit_factor": gross_profit / gross_loss if gross_loss else 0.0,
        "avg_trade_return": sum(returns) / count if returns else 0.0,
    }


def metrics(curve: pd.DataFrame, trades: list[dict[str, object]], label: str, start: pd.Timestamp, end: pd.Timestamp) -> dict[str, object]:
    """Return return, drawdown and trade statistics for one horizon.

    ``calmar`` is annualised return over max drawdown, or 0.0 when the drawdown
    is zero.  ``annualized_return`` is -1.0 if the total return is -100% or
    worse.
    """
    current = horizon_curve(curve, start)
    years = max((end - start).total_seconds() / 86_400.0 / 365.0, 1e-9)
    total_return = float(current["equity"].iloc[-1] / current["equity"].iloc[0] - 1.0)
    if total_return > -1.0:
        annualized = (1.0 + total_return) ** (1.0 / years) - 1.0
    else:
        annualized = -1.0
    drawdown = max_drawdown(current.set_index("ts")["equity"])
    return {
        "horizon": label,
        "start": start.strftime("%Y-%m-%d %H:%M"),
        "end": end.strftime("%Y-%m-%d %H:%M"),
        "total_return": total_return,
        "annualized_return": annualized,
        "max_drawdown": drawdown,
        "calmar": annualized / drawdown if drawdown else 0.0,
        **trade_stats(trades, start, end),
    }


def recent_style_rows(frame: pd.DataFrame) -> list[dict[str, object]]:
    """Describe the trailing 90-day and 30-day market windows.

    Each row holds the ETH and BTC returns over the window, the annualised ETH
    volatility and the ETH/BTC correlation of per-bar returns.
    """
    end = frame.index[-1]
    windows = (("90d", pd.DateOffset(days=90)), ("30d", pd.DateOffset(days=30)))
    rows = []
    for label, offset in windows:
        start = end - offset
        current = frame[frame.index >= start]
        eth_close = current["eth_close"]
        btc_close = current["btc_close"]
        rows.append(
            {
                "window": label,
                "start": start.strftime("%Y-%m-%d %H:%M"),
                "end": end.strftime("%Y-%m-%d %H:%M"),
                "eth_return": float(eth_close.iloc[-1] / eth_close.iloc[0] - 1.0),
                "btc_return": float(btc_close.iloc[-1] / btc_close.iloc[0] - 1.0),
                "eth_annualized_vol": float(current["eth_ret"].std(ddof=0) * ANNUALIZATION),
                "eth_btc_corr_15m": float(current["eth_ret"].corr(current["btc_ret"])),
            }
        )
    return rows


def markdown_table(frame: pd.DataFrame) -> str:
    """Render ``frame`` as a pipe-delimited Markdown table.

    Floats are formatted with six significant digits, missing values become
    empty cells and ``|`` inside a cell is escaped.
    """

    def cell(value: object) -> str:
        if isinstance(value, float):
            return f"{value:.6g}"
        return str(value).replace("|", "\\|")

    rows = [list(frame.columns), ["---" for _ in frame.columns]]
    rows.extend(frame.astype(object).where(pd.notna(frame), "").values.tolist())
    return "\n".join("| " + " | ".join(cell(value) for value in row) + " |" for row in rows)


def write_report(
    path: Path,
    command: str,
    style: pd.DataFrame,
    summary: pd.DataFrame,
    horizons: pd.DataFrame,
    regime: pd.DataFrame,
    outputs: list[Path],
) -> None:
    """Write the Markdown report to ``path`` as UTF-8.

    The summary table is ordered by ``min_recent_return`` and then ``calmar``,
    both descending.
    """
    summary_columns = [
        "strategy",
        "total_return",
        "annualized_return",
        "max_drawdown",
        "calmar",
        "trades",
        "trades_per_30d",
        "win_rate",
        "profit_factor",
        "min_recent_return",
    ]
    horizon_columns = [
        "strategy",
        "horizon",
        "total_return",
        "annualized_return",
        "max_drawdown",
        "calmar",
        "trades",
        "trades_per_30d",
        "win_rate",
        "profit_factor",
    ]
    primary = summary.sort_values(["min_recent_return", "calmar"], ascending=[False, False])
    lines = [
        "# Recent Market Adaptation Exploration",
        "",
        f"Command: `{command}`",
        "",
        "Scope: local OKX ETH/BTC 15m candle cache only. No live executor changes, no deployment, no orders.",
        "",
        f"Cost model: {ROUNDTRIP_COST_ON_MARGIN:.4f} roundtrip cost on margin, leverage {LEVERAGE:g}x.",
        "",
        "## Recent 90d/30d Style",
        "",
        markdown_table(style),
        "",
        "## Fixed Strategy Set",
        "",
        markdown_table(pd.DataFrame([asdict(strategy) for strategy in STRATEGIES])),
        "",
        "## Summary",
        "",
        markdown_table(primary[summary_columns]),
        "",
        "## Required Horizons",
        "",
        markdown_table(horizons[horizon_columns]),
        "",
        "## Router Regime Split",
        "",
        markdown_table(regime),
        "",
        "## Output Files",
        "",
        *[f"- `{output}`" for output in outputs],
        "",
    ]
    path.write_text("\n".join(lines), encoding="utf-8")


def main() -> int:
    """Run every strategy, write all outputs and return the exit code 0."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
    args = parser.parse_args()

    frame = add_signals(load_market())
    first = frame.index[0]
    end = frame.index[-1]
    style = pd.DataFrame(recent_style_rows(frame))

    horizon_rows: list[dict[str, object]] = []
    summary_rows: list[dict[str, object]] = []
    trade_rows: list[dict[str, object]] = []
    regime_rows: list[dict[str, object]] = []

    for strategy in STRATEGIES:
        curve, trades = run_strategy(frame, strategy)
        trade_rows.extend(trades)

        rows = []
        for label, offset in HORIZONS:
            # Clamp look-backs that reach past the start of the data.
            start = first if offset is None else max(first, end - offset)
            rows.append({"strategy": strategy.name, **metrics(curve, trades, label, start, end)})
        horizon_rows.extend(rows)

        full = rows[0]  # HORIZONS starts with the full-history entry
        recent = [row["total_return"] for row in rows if row["horizon"] in {"3m", "30d", "14d"}]
        summary_rows.append(
            {
                "strategy": strategy.name,
                "description": strategy.description,
                **{key: value for key, value in full.items() if key not in {"horizon", "start", "end"}},
                "first_candle": first.strftime("%Y-%m-%d %H:%M"),
                "last_candle": end.strftime("%Y-%m-%d %H:%M"),
                "min_recent_return": min(float(value) for value in recent),
            }
        )

        if strategy.kind == "router":
            router_trades = pd.DataFrame(trades)
            if len(router_trades):
                grouped = (
                    router_trades.groupby(["regime", "routed_kind"])["net_return"]
                    .agg(["count", "mean", "sum"])
                    .reset_index()
                )
                renamed = grouped.rename(
                    columns={"count": "trades", "mean": "avg_trade_return", "sum": "sum_trade_return"}
                )
                regime_rows.extend(renamed.to_dict("records"))

    summary = pd.DataFrame(summary_rows).sort_values(
        ["min_recent_return", "calmar"], ascending=[False, False]
    )
    horizons = pd.DataFrame(horizon_rows)
    # Order horizons as declared in HORIZONS rather than alphabetically.
    horizon_order = [label for label, _ in HORIZONS]
    horizons["horizon"] = pd.Categorical(horizons["horizon"], categories=horizon_order, ordered=True)
    horizons = horizons.sort_values(["strategy", "horizon"])
    trades_frame = pd.DataFrame(trade_rows)
    if regime_rows:
        regime = pd.DataFrame(regime_rows)
    else:
        regime = pd.DataFrame(
            columns=["regime", "routed_kind", "trades", "avg_trade_return", "sum_trade_return"]
        )

    args.output_dir.mkdir(parents=True, exist_ok=True)
    style_path = args.output_dir / "recent-style.csv"
    summary_path = args.output_dir / "strategy-summary.csv"
    horizon_path = args.output_dir / "strategy-horizons.csv"
    trades_path = args.output_dir / "strategy-trades.csv"
    regime_path = args.output_dir / "router-regime-split.csv"
    json_path = args.output_dir / "summary.json"
    report_path = args.output_dir / "report.md"

    style.to_csv(style_path, index=False)
    summary.to_csv(summary_path, index=False)
    horizons.to_csv(horizon_path, index=False)
    trades_frame.to_csv(trades_path, index=False)
    regime.to_csv(regime_path, index=False)

    outputs = [style_path, summary_path, horizon_path, trades_path, regime_path, json_path, report_path]
    command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --output-dir {args.output_dir.as_posix()}"
    payload = {
        "command": command,
        "first_candle": first.isoformat(),
        "last_candle": end.isoformat(),
        "cost_model": {"roundtrip_cost_on_margin": ROUNDTRIP_COST_ON_MARGIN, "leverage": LEVERAGE},
        "top_summary": summary.to_dict("records"),
        "outputs": [str(output) for output in outputs],
    }
    json_path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
    write_report(report_path, command, style, summary, horizons, regime, outputs)

    print(summary.to_string(index=False))
    print(f"wrote {report_path}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())