| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505 |
- #!/usr/bin/env python3
- from __future__ import annotations
- import argparse
- import json
- from dataclasses import dataclass
- from pathlib import Path
- import pandas as pd
# Filesystem locations: cached candle CSVs are read from DATA_DIR and the
# generated tables/report are written under OUTPUT_DIR by default.
DATA_DIR = Path("data") / "okx-candles"
OUTPUT_DIR = Path("reports") / "recent-market-adaptation"

# Instrument identifiers (used as directory names under DATA_DIR) and the
# candle interval (used as the CSV file stem).
ETH = "ETH-USDT-SWAP"
BTC = "BTC-USDT-SWAP"
BAR = "15m"

# Account model: starting equity, the multiplier applied to each trade's gross
# return, and the fixed amount subtracted from every trade's leveraged return.
INITIAL_EQUITY = 10_000.0
LEVERAGE = 3.0
ROUNDTRIP_COST_ON_MARGIN = 0.0021

# Reporting horizons as (label, look-back offset from the last candle).
# An offset of None means "from the first available candle".
HORIZONS = (
    ("full", None),
    ("3y", pd.DateOffset(years=3)),
    ("1y", pd.DateOffset(years=1)),
    ("6m", pd.DateOffset(months=6)),
    ("3m", pd.DateOffset(months=3)),
    ("30d", pd.DateOffset(days=30)),
    ("14d", pd.DateOffset(days=14)),
)
@dataclass(frozen=True)
class Strategy:
    """Immutable description of one backtested strategy variant."""

    # Identifier recorded on every trade and shown in the report tables.
    name: str
    # Human-readable summary of the entry and exit rules.
    description: str
    # Selector handed to ``params_for`` to pick the signal column and exits.
    kind: str
# The fixed set of strategies evaluated by ``main``, in reporting order.
STRATEGIES = (
    Strategy(
        name="btc-lead-momentum",
        description="Trade ETH in the direction of BTC 2h momentum when ETH confirms over 1h; fixed stop, take-profit, or 4h max hold.",
        kind="btc_lead",
    ),
    Strategy(
        name="eth-compression-breakout",
        description="Trade ETH 15m close breakouts from a compressed 12h range; fixed stop, take-profit, or 6h max hold.",
        kind="breakout",
    ),
    Strategy(
        name="eth-btc-relative-weakness-short",
        description="Short ETH when BTC is rising over 6h while ETH/BTC keeps falling; fixed stop, take-profit, or 6h max hold.",
        kind="relative_weak",
    ),
    Strategy(
        name="recent-style-router",
        description="Use prior 30d ETH-vs-BTC relative return and prior 90d volatility: relative weakness routes short, trend/high-vol routes breakout, otherwise BTC lead momentum.",
        kind="router",
    ),
)
def load_frame(symbol: str, bar: str) -> pd.DataFrame:
    """Load one instrument's cached candle CSV as a timestamp-indexed frame.

    Reads ``DATA_DIR / symbol / "<bar>.csv"``, converts the ``ts`` column from
    epoch milliseconds to UTC timestamps, sorts by time, keeps the last row of
    any duplicated timestamp and uses ``ts`` as the index.
    """
    candles = pd.read_csv(DATA_DIR / symbol / f"{bar}.csv")
    candles["ts"] = pd.to_datetime(candles["ts"], unit="ms", utc=True)
    candles = candles.sort_values("ts")
    candles = candles.drop_duplicates("ts", keep="last")
    return candles.set_index("ts")
def load_market() -> pd.DataFrame:
    """Build the joined ETH/BTC candle frame with per-bar close-to-close returns.

    Columns of each instrument are prefixed with ``eth_`` / ``btc_``; only
    timestamps present for both instruments are kept. Rows containing any
    missing value (including the first row, whose returns are undefined) are
    dropped.
    """
    eth_candles = load_frame(ETH, BAR).add_prefix("eth_")
    btc_candles = load_frame(BTC, BAR).add_prefix("btc_")
    market = eth_candles.join(btc_candles, how="inner")
    for asset in ("eth", "btc"):
        market[f"{asset}_ret"] = market[f"{asset}_close"].pct_change()
    return market.dropna().copy()
def rsi(series: pd.Series, length: int) -> pd.Series:
    """Return a relative-strength index based on simple rolling means.

    Average up-moves and average down-moves are taken over ``length`` bars
    with an unweighted rolling mean; the result is ``100 - 100 / (1 + up/down)``.
    Positions without a full window are NaN.
    """
    change = series.diff()
    up_moves = change.clip(lower=0.0)
    down_moves = -change.clip(upper=0.0)
    average_up = up_moves.rolling(length).mean()
    average_down = down_moves.rolling(length).mean()
    strength = average_up / average_down
    return 100.0 - 100.0 / (1.0 + strength)
def add_signals(frame: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``frame`` with all strategy signal columns added.

    For each family (``btc_lead``, ``breakout``, ``reversion``,
    ``relative_weak``) a ``<family>_raw`` column holds the direction implied
    by the current bar (1 long, -1 short, 0 none) and ``<family>_signal``
    keeps that value only on bars where it differs from the previous bar, so
    a persisting condition fires once. ``router_signal`` / ``router_regime``
    pick one family per bar from the 30-day returns and 90-day volatility.

    All look-backs are expressed in bars (``shift`` / ``rolling`` counts).
    """
    out = frame.copy()
    eth_close = out["eth_close"]
    btc_close = out["btc_close"]

    def lookback_return(prices: pd.Series, bars: int) -> pd.Series:
        # Simple return versus the value ``bars`` rows earlier.
        return prices / prices.shift(bars) - 1.0

    def store_direction(family: str, long_mask, short_mask) -> None:
        # Write the raw direction and its edge-triggered signal for one family.
        # The short mask is applied last, matching the assignment order of the
        # long/short rules.
        raw = pd.Series(0, index=out.index)
        if long_mask is not None:
            raw.loc[long_mask] = 1
        raw.loc[short_mask] = -1
        out[f"{family}_raw"] = raw
        out[f"{family}_signal"] = raw.where(raw != raw.shift(1), 0)

    # BTC lead: BTC move over 8 bars confirmed by an ETH move over 4 bars.
    out["btc_2h_return"] = lookback_return(btc_close, 8)
    out["eth_1h_return"] = lookback_return(eth_close, 4)
    store_direction(
        "btc_lead",
        (out["btc_2h_return"] >= 0.008) & (out["eth_1h_return"] >= 0.002),
        (out["btc_2h_return"] <= -0.008) & (out["eth_1h_return"] <= -0.002),
    )

    # Breakout: close beyond the prior 48-bar range (current bar excluded via
    # shift(1)) while that range is narrow relative to its own last 384 values.
    prior_high = out["eth_high"].shift(1).rolling(48).max()
    prior_low = out["eth_low"].shift(1).rolling(48).min()
    width = (prior_high - prior_low) / eth_close
    compressed = width <= width.rolling(384).quantile(0.30)
    store_direction(
        "breakout",
        compressed & (eth_close > prior_high),
        compressed & (eth_close < prior_low),
    )

    # Reversion: extreme 2-bar RSI while the absolute 96-bar return is small.
    out["eth_rsi2"] = rsi(eth_close, 2)
    out["eth_24h_return_abs"] = lookback_return(eth_close, 96).abs()
    weak_trend = out["eth_24h_return_abs"] <= 0.035
    store_direction(
        "reversion",
        weak_trend & (out["eth_rsi2"] <= 8.0),
        weak_trend & (out["eth_rsi2"] >= 92.0),
    )

    # Relative weakness (short only): BTC up over 24 bars while ETH/BTC falls.
    out["eth_btc_ratio"] = eth_close / btc_close
    out["btc_6h_return"] = lookback_return(btc_close, 24)
    out["ratio_6h_return"] = lookback_return(out["eth_btc_ratio"], 24)
    store_direction(
        "relative_weak",
        None,
        (out["btc_6h_return"] >= 0.006) & (out["ratio_6h_return"] <= -0.010),
    )

    # Regime inputs for the router.
    out["eth_30d_return"] = lookback_return(eth_close, 96 * 30)
    out["btc_30d_return"] = lookback_return(btc_close, 96 * 30)
    out["eth_90d_vol"] = out["eth_ret"].rolling(96 * 90).std(ddof=0) * (96 * 365) ** 0.5

    relative_weak_style = (out["eth_30d_return"] <= out["btc_30d_return"] - 0.06) & (
        out["eth_90d_vol"] <= 0.75
    )
    trend_style = (
        (out["eth_30d_return"].abs() >= 0.12)
        | (out["btc_30d_return"].abs() >= 0.10)
        | (out["eth_90d_vol"] >= 0.88)
    )

    # Default is BTC lead; trend/high-vol overrides it, and relative weakness
    # is applied last so it wins when both regime conditions hold.
    router_signal = out["btc_lead_signal"].mask(trend_style, out["breakout_signal"])
    out["router_signal"] = router_signal.mask(relative_weak_style, out["relative_weak_signal"])

    regime = pd.Series("btc_lead_momentum", index=out.index)
    regime = regime.mask(trend_style, "trend_or_high_vol_breakout")
    out["router_regime"] = regime.mask(relative_weak_style, "relative_weakness_short")
    return out
def exit_hit(side: int, entry: float, row: object, stop_pct: float, take_pct: float) -> float | None:
    """Return the fill price if the bar hits the stop or take-profit, else None.

    ``side == 1`` is treated as long; any other value as short. Stop and
    take-profit levels sit ``stop_pct`` / ``take_pct`` away from ``entry``.
    If the bar opens at or beyond either level the fill is the open price.
    Otherwise the stop is checked before the take-profit, so a bar touching
    both levels is filled at the stop.

    ``row`` must expose ``eth_open``, ``eth_high`` and ``eth_low``.
    """
    is_long = side == 1
    stop = entry * (1.0 - stop_pct if is_long else 1.0 + stop_pct)
    take = entry * (1.0 + take_pct if is_long else 1.0 - take_pct)

    bar_open = float(row.eth_open)
    bar_high = float(row.eth_high)
    bar_low = float(row.eth_low)

    if is_long:
        opened_beyond = bar_open <= stop or bar_open >= take
        stop_touched = bar_low <= stop
        take_touched = bar_high >= take
    else:
        opened_beyond = bar_open >= stop or bar_open <= take
        stop_touched = bar_high >= stop
        take_touched = bar_low <= take

    if opened_beyond:
        return bar_open
    if stop_touched:
        return stop
    if take_touched:
        return take
    return None
def exit_by_rule(kind: str, side: int, row: object) -> bool:
    """Report whether a strategy-specific exit rule fires on this bar.

    Only the ``"reversion"`` kind has a rule: a long (``side == 1``) exits
    once ``row.eth_rsi2`` is at least 52, a short (``side == -1``) once it is
    at most 48. Every other kind returns False.
    """
    if kind != "reversion":
        return False
    if side == 1:
        return float(row.eth_rsi2) >= 52.0
    if side == -1:
        return float(row.eth_rsi2) <= 48.0
    return False
# Parameter tuples are (signal column, max hold in bars, stop fraction,
# take-profit fraction, fifth value). The fifth value is not used by
# ``run_strategy``; presumably a warm-up length in bars -- TODO confirm.
_ROUTER_PARAMS_BY_REGIME = {
    "relative_weakness_short": ("relative_weak_signal", 24, 0.007, 0.012, 96 * 90),
    "trend_or_high_vol_breakout": ("breakout_signal", 24, 0.008, 0.014, 48 + 384),
}
_ROUTER_FALLBACK_PARAMS = ("btc_lead_signal", 16, 0.007, 0.011, 48 + 384)
_PARAMS_BY_KIND = {
    "btc_lead": ("btc_lead_signal", 16, 0.007, 0.011, 8),
    "breakout": ("breakout_signal", 24, 0.008, 0.014, 48 + 384),
    "reversion": ("reversion_signal", 16, 0.006, 0.007, 96),
    "relative_weak": ("relative_weak_signal", 24, 0.007, 0.012, 96 * 30),
}


def params_for(kind: str, row: object) -> tuple[str, int, float, float, int]:
    """Return the trade parameters for a strategy kind on a given bar.

    For ``"router"`` the choice depends on ``row.router_regime``; any regime
    other than the two routed ones falls back to the BTC-lead parameters.
    For every other kind ``row`` is not inspected.

    Raises:
        ValueError: if ``kind`` is not a known strategy kind.
    """
    if kind == "router":
        regime = str(row.router_regime)
        return _ROUTER_PARAMS_BY_REGIME.get(regime, _ROUTER_FALLBACK_PARAMS)
    if kind in _PARAMS_BY_KIND:
        return _PARAMS_BY_KIND[kind]
    raise ValueError(f"unknown strategy kind: {kind}")
def run_strategy(frame: pd.DataFrame, strategy: Strategy) -> tuple[pd.DataFrame, list[dict[str, object]]]:
    """Simulate one strategy bar by bar and return its equity curve and trades.

    While flat, the signal column chosen by ``params_for`` is read on the
    current bar; a non-zero value opens a position at the *next* bar's
    ``eth_open``. While in a position, each bar (starting with the entry bar)
    is checked in this order: stop / take-profit via ``exit_hit``, the
    kind-specific rule via ``exit_by_rule``, then the maximum hold. Rule and
    time exits fill at the bar's ``eth_close``. Every closed trade compounds
    equity by ``gross * LEVERAGE - ROUNDTRIP_COST_ON_MARGIN``.

    The gross return is the signed price change relative to the entry price,
    ``side * (exit / entry - 1)``, which is the return on notional of a
    linear (quote-margined) contract for both longs and shorts.

    Scanning starts at bar ``96 * 90`` for the router kind and ``48 + 384``
    otherwise. The last bar is never evaluated, so an entry always has a
    following bar to fill on; a position still open when the data ends is
    discarded without being recorded.

    Args:
        frame: Output of ``add_signals``; must be non-empty.
        strategy: Strategy whose ``kind`` selects signals and parameters.

    Returns:
        ``(curve, trades)`` where ``curve`` has ``ts`` / ``equity`` columns
        (one point at the first bar, one per closed trade, and one at the
        last bar if no trade closed there) and ``trades`` is a list of
        per-trade dictionaries.
    """
    routed_kind_by_signal = {
        "breakout_signal": "breakout",
        "relative_weak_signal": "relative_weak",
        "reversion_signal": "reversion",
    }
    is_router = strategy.kind == "router"

    rows = list(frame.itertuples())
    equity = INITIAL_EQUITY
    curve = [{"ts": frame.index[0], "equity": equity}]
    trades: list[dict[str, object]] = []
    position: dict[str, object] | None = None

    index = 96 * 90 if is_router else 48 + 384
    last_index = len(rows) - 1
    while index < last_index:
        row = rows[index]

        if position is None:
            signal_col, hold, stop_pct, take_pct, _ = params_for(strategy.kind, row)
            side = int(getattr(row, signal_col))
            if side == 0:
                index += 1
                continue
            # Enter on the open of the bar after the signal bar.
            entry_index = index + 1
            position = {
                "side": side,
                "entry_index": entry_index,
                "entry_time": frame.index[entry_index],
                "entry": float(rows[entry_index].eth_open),
                "hold": hold,
                "stop_pct": stop_pct,
                "take_pct": take_pct,
                "routed_kind": routed_kind_by_signal.get(signal_col, "btc_lead"),
                "regime": getattr(row, "router_regime", "") if is_router else "",
            }
            index = entry_index
            continue

        side = int(position["side"])
        entry = float(position["entry"])
        held = index - int(position["entry_index"])

        exit_price = exit_hit(side, entry, row, float(position["stop_pct"]), float(position["take_pct"]))
        if exit_price is None and exit_by_rule(str(position["routed_kind"]), side, row):
            exit_price = float(row.eth_close)
        if exit_price is None and held >= int(position["hold"]):
            exit_price = float(row.eth_close)
        if exit_price is None:
            index += 1
            continue

        # Linear contract P&L: a short earns the price drop relative to the
        # entry price, not relative to the exit price.
        gross = side * (exit_price / entry - 1.0)
        net_return = gross * LEVERAGE - ROUNDTRIP_COST_ON_MARGIN
        equity *= 1.0 + net_return
        trades.append(
            {
                "strategy": strategy.name,
                "side": "long" if side == 1 else "short",
                "entry_time": pd.Timestamp(position["entry_time"]).strftime("%Y-%m-%d %H:%M"),
                "exit_time": frame.index[index].strftime("%Y-%m-%d %H:%M"),
                "entry_price": entry,
                "exit_price": exit_price,
                "net_return": net_return,
                "routed_kind": position["routed_kind"],
                "regime": position["regime"],
            }
        )
        curve.append({"ts": frame.index[index], "equity": equity})
        position = None
        index += 1

    # Extend the curve to the last bar so horizon slices always reach the end.
    if pd.Timestamp(curve[-1]["ts"]) < frame.index[-1]:
        curve.append({"ts": frame.index[-1], "equity": equity})
    return pd.DataFrame(curve), trades
def max_drawdown(equity: pd.Series) -> float:
    """Return the largest fractional decline of ``equity`` from its running peak."""
    running_peak = equity.cummax()
    shortfall = running_peak - equity
    return float((shortfall / running_peak).max())
def horizon_curve(curve: pd.DataFrame, start: pd.Timestamp) -> pd.DataFrame:
    """Return the part of ``curve`` after ``start``, anchored at ``start``.

    The first row of the result carries ``start`` as its timestamp and the
    equity of the last curve point at or before ``start``. If no curve point
    lies at or before ``start``, a copy of the whole curve is returned.
    """
    at_or_before = curve["ts"] <= start
    if not at_or_before.any():
        return curve.copy()
    opening_equity = float(curve.loc[at_or_before, "equity"].iloc[-1])
    anchor = pd.DataFrame([{"ts": start, "equity": opening_equity}])
    later_points = curve[curve["ts"] > start]
    return pd.concat([anchor, later_points], ignore_index=True)
def trade_stats(trades: list[dict[str, object]], start: pd.Timestamp, end: pd.Timestamp) -> dict[str, object]:
    """Summarise the trades whose entry time is at or after ``start``.

    ``trades_per_30d`` divides the trade count by the length of
    ``start``..``end`` in 30-day units. ``win_rate`` and ``avg_trade_return``
    are 0.0 when there are no trades; ``profit_factor`` is 0.0 when there are
    no losing returns.
    """
    returns = [
        float(trade["net_return"])
        for trade in trades
        if pd.to_datetime(str(trade["entry_time"]), utc=True) >= start
    ]
    count = len(returns)
    winning = [value for value in returns if value > 0.0]
    gross_profit = sum(winning)
    gross_loss = abs(sum(value for value in returns if value < 0.0))
    # Guard against a zero-length window.
    span_in_30d = max((end - start).total_seconds() / 86_400.0 / 30.0, 1e-9)

    stats: dict[str, object] = {"trades": count}
    stats["trades_per_30d"] = count / span_in_30d
    stats["win_rate"] = len(winning) / count if returns else 0.0
    stats["profit_factor"] = gross_profit / gross_loss if gross_loss else 0.0
    stats["avg_trade_return"] = sum(returns) / count if returns else 0.0
    return stats
def metrics(curve: pd.DataFrame, trades: list[dict[str, object]], label: str, start: pd.Timestamp, end: pd.Timestamp) -> dict[str, object]:
    """Compute return, drawdown and trade statistics for one horizon.

    The equity curve is re-anchored at ``start`` with ``horizon_curve``.
    The annualised return compounds the total return over the window length
    in 365-day years and is pinned to -1.0 when the total return is -100% or
    worse. ``calmar`` is annualised return over max drawdown, or 0.0 when the
    drawdown is zero. The ``trade_stats`` fields are merged into the result.
    """
    window = horizon_curve(curve, start)
    equity = window["equity"]
    # Guard against a zero-length window.
    span_years = max((end - start).total_seconds() / 86_400.0 / 365.0, 1e-9)

    total_return = float(equity.iloc[-1] / equity.iloc[0] - 1.0)
    if total_return > -1.0:
        annualized = (1.0 + total_return) ** (1.0 / span_years) - 1.0
    else:
        annualized = -1.0
    drawdown = max_drawdown(window.set_index("ts")["equity"])

    result: dict[str, object] = {
        "horizon": label,
        "start": start.strftime("%Y-%m-%d %H:%M"),
        "end": end.strftime("%Y-%m-%d %H:%M"),
        "total_return": total_return,
        "annualized_return": annualized,
        "max_drawdown": drawdown,
        "calmar": annualized / drawdown if drawdown else 0.0,
    }
    result.update(trade_stats(trades, start, end))
    return result
def recent_style_rows(frame: pd.DataFrame) -> list[dict[str, object]]:
    """Describe the trailing 90-day and 30-day windows ending at the last bar.

    For each window the row holds the ETH and BTC close-to-close return over
    the window, the standard deviation of ``eth_ret`` scaled by
    ``sqrt(96 * 365)``, and the correlation between ``eth_ret`` and
    ``btc_ret``.
    """
    time_format = "%Y-%m-%d %H:%M"
    scale = (96 * 365) ** 0.5
    end = frame.index[-1]
    windows = (("90d", pd.DateOffset(days=90)), ("30d", pd.DateOffset(days=30)))

    described = []
    for label, offset in windows:
        start = end - offset
        recent = frame.loc[frame.index >= start]
        eth_close = recent["eth_close"]
        btc_close = recent["btc_close"]
        eth_ret = recent["eth_ret"]
        described.append(
            {
                "window": label,
                "start": start.strftime(time_format),
                "end": end.strftime(time_format),
                "eth_return": float(eth_close.iloc[-1] / eth_close.iloc[0] - 1.0),
                "btc_return": float(btc_close.iloc[-1] / btc_close.iloc[0] - 1.0),
                "eth_annualized_vol": float(eth_ret.std(ddof=0) * scale),
                "eth_btc_corr_15m": float(eth_ret.corr(recent["btc_ret"])),
            }
        )
    return described
def markdown_table(frame: pd.DataFrame) -> str:
    """Render ``frame`` as a pipe-delimited Markdown table.

    Floats are formatted with six significant digits, missing values become
    empty cells, and ``|`` inside any other value is backslash-escaped.
    """

    def render(value: object) -> str:
        if isinstance(value, float):
            return format(value, ".6g")
        return str(value).replace("|", "\\|")

    header = list(frame.columns)
    divider = ["---"] * len(header)
    body = frame.astype(object).where(pd.notna(frame), "").values.tolist()

    rendered = []
    for cells in [header, divider, *body]:
        rendered.append("| " + " | ".join(map(render, cells)) + " |")
    return "\n".join(rendered)
def write_report(
    path: Path,
    command: str,
    style: pd.DataFrame,
    summary: pd.DataFrame,
    horizons: pd.DataFrame,
    regime: pd.DataFrame,
    outputs: list[Path],
) -> None:
    """Write the Markdown report to ``path`` (UTF-8).

    The report contains, in order: the command line, scope and cost-model
    notes, the recent-style table, the fixed strategy set, the summary ranked
    by ``min_recent_return`` then ``calmar`` (both descending), the
    per-horizon table, the router regime split and the list of output files.
    """
    metric_columns = [
        "total_return",
        "annualized_return",
        "max_drawdown",
        "calmar",
        "trades",
        "trades_per_30d",
        "win_rate",
        "profit_factor",
    ]
    ranked = summary.sort_values(["min_recent_return", "calmar"], ascending=[False, False])

    # Each paragraph is followed by one blank line in the rendered report.
    paragraphs = [
        "# Recent Market Adaptation Exploration",
        f"Command: `{command}`",
        "Scope: local OKX ETH/BTC 15m candle cache only. No live executor changes, no deployment, no orders.",
        f"Cost model: {ROUNDTRIP_COST_ON_MARGIN:.4f} roundtrip cost on margin, leverage {LEVERAGE:g}x.",
        "## Recent 90d/30d Style",
        markdown_table(style),
        "## Fixed Strategy Set",
        markdown_table(pd.DataFrame([strategy.__dict__ for strategy in STRATEGIES])),
        "## Summary",
        markdown_table(ranked[["strategy", *metric_columns, "min_recent_return"]]),
        "## Required Horizons",
        markdown_table(horizons[["strategy", "horizon", *metric_columns]]),
        "## Router Regime Split",
        markdown_table(regime),
        "## Output Files",
    ]

    lines: list[str] = []
    for paragraph in paragraphs:
        lines.extend((paragraph, ""))
    lines.extend(f"- `{output}`" for output in outputs)
    lines.append("")
    path.write_text("\n".join(lines), encoding="utf-8")
def main() -> int:
    """Run every strategy, then write the CSV, JSON and Markdown outputs.

    Accepts ``--output-dir`` (default ``OUTPUT_DIR``). Prints the summary
    table and the report path, and returns 0.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
    args = parser.parse_args()
    output_dir = args.output_dir

    frame = add_signals(load_market())
    first_candle = frame.index[0]
    end = frame.index[-1]
    style = pd.DataFrame(recent_style_rows(frame))

    recent_labels = {"3m", "30d", "14d"}
    window_only_keys = {"horizon", "start", "end"}
    horizon_rows: list[dict[str, object]] = []
    summary_rows: list[dict[str, object]] = []
    trade_rows: list[dict[str, object]] = []
    regime_rows: list[dict[str, object]] = []

    for strategy in STRATEGIES:
        curve, trades = run_strategy(frame, strategy)
        trade_rows.extend(trades)

        per_horizon = []
        for label, offset in HORIZONS:
            # Never start a horizon before the first available candle.
            start = first_candle if offset is None else max(first_candle, end - offset)
            per_horizon.append({"strategy": strategy.name, **metrics(curve, trades, label, start, end)})
        horizon_rows.extend(per_horizon)

        # HORIZONS lists the full-history window first.
        full = per_horizon[0]
        recent_returns = [row["total_return"] for row in per_horizon if row["horizon"] in recent_labels]
        entry: dict[str, object] = {"strategy": strategy.name, "description": strategy.description}
        entry.update((key, value) for key, value in full.items() if key not in window_only_keys)
        entry["first_candle"] = first_candle.strftime("%Y-%m-%d %H:%M")
        entry["last_candle"] = end.strftime("%Y-%m-%d %H:%M")
        entry["min_recent_return"] = min(float(value) for value in recent_returns)
        summary_rows.append(entry)

        if strategy.kind != "router":
            continue
        router_trades = pd.DataFrame(trades)
        if len(router_trades) == 0:
            continue
        grouped = router_trades.groupby(["regime", "routed_kind"])["net_return"].agg(["count", "mean", "sum"])
        renamed = grouped.reset_index().rename(
            columns={"count": "trades", "mean": "avg_trade_return", "sum": "sum_trade_return"}
        )
        regime_rows.extend(renamed.to_dict("records"))

    summary = pd.DataFrame(summary_rows).sort_values(["min_recent_return", "calmar"], ascending=[False, False])

    horizons = pd.DataFrame(horizon_rows)
    # Sort horizons in the order declared in HORIZONS rather than alphabetically.
    horizon_order = [label for label, _ in HORIZONS]
    horizons["horizon"] = pd.Categorical(horizons["horizon"], categories=horizon_order, ordered=True)
    horizons = horizons.sort_values(["strategy", "horizon"])

    trades = pd.DataFrame(trade_rows)
    if regime_rows:
        regime = pd.DataFrame(regime_rows)
    else:
        regime = pd.DataFrame(columns=["regime", "routed_kind", "trades", "avg_trade_return", "sum_trade_return"])

    output_dir.mkdir(parents=True, exist_ok=True)
    style_path = output_dir / "recent-style.csv"
    summary_path = output_dir / "strategy-summary.csv"
    horizon_path = output_dir / "strategy-horizons.csv"
    trades_path = output_dir / "strategy-trades.csv"
    regime_path = output_dir / "router-regime-split.csv"
    json_path = output_dir / "summary.json"
    report_path = output_dir / "report.md"

    csv_outputs = (
        (style, style_path),
        (summary, summary_path),
        (horizons, horizon_path),
        (trades, trades_path),
        (regime, regime_path),
    )
    for table, csv_path in csv_outputs:
        table.to_csv(csv_path, index=False)

    outputs = [style_path, summary_path, horizon_path, trades_path, regime_path, json_path, report_path]
    command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --output-dir {output_dir.as_posix()}"

    payload = {
        "command": command,
        "first_candle": first_candle.isoformat(),
        "last_candle": end.isoformat(),
        "cost_model": {"roundtrip_cost_on_margin": ROUNDTRIP_COST_ON_MARGIN, "leverage": LEVERAGE},
        "top_summary": summary.to_dict("records"),
        "outputs": [str(output) for output in outputs],
    }
    json_path.write_text(json.dumps(payload, indent=2), encoding="utf-8")

    write_report(report_path, command, style, summary, horizons, regime, outputs)
    print(summary.to_string(index=False))
    print(f"wrote {report_path}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
|