|
|
@@ -0,0 +1,299 @@
|
|
|
+from __future__ import annotations
|
|
|
+
|
|
|
+import argparse
|
|
|
+import sys
|
|
|
+from dataclasses import dataclass
|
|
|
+from pathlib import Path
|
|
|
+
|
|
|
+import pandas as pd
|
|
|
+
|
|
|
+sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
|
|
|
+
|
|
|
+from okx_codex_trader.models import Candle
|
|
|
+from okx_codex_trader.sampled_report import SegmentResult, mark_to_market, trade_equity
|
|
|
+from scripts.search_live_bb_squeeze_exit_variants import (
|
|
|
+ COSTS,
|
|
|
+ DATA_DIR,
|
|
|
+ HORIZONS,
|
|
|
+ INITIAL_EQUITY,
|
|
|
+ LEVERAGE,
|
|
|
+ OUTPUT_DIR,
|
|
|
+ PRIMARY_COST,
|
|
|
+ _format_ts,
|
|
|
+ _load_candles,
|
|
|
+ cost_equity_frame,
|
|
|
+ equity_metrics,
|
|
|
+ horizon_rows,
|
|
|
+)
|
|
|
+
|
|
|
+
|
|
|
+BTC_SYMBOL = "BTC-USDT-SWAP"
|
|
|
+ETH_SYMBOL = "ETH-USDT-SWAP"
|
|
|
+BAR = "15m"
|
|
|
+
|
|
|
+
|
|
|
+@dataclass(frozen=True)
|
|
|
+class Variant:
|
|
|
+ crash_mode: str
|
|
|
+
|
|
|
+ @property
|
|
|
+ def name(self) -> str:
|
|
|
+ return f"live-bb-squeeze-mxbuf0.0005-crash-{self.crash_mode}"
|
|
|
+
|
|
|
+
|
|
|
+def align_pair(eth: list[Candle], btc: list[Candle]) -> tuple[list[Candle], list[Candle]]:
|
|
|
+ btc_by_ts = {candle.ts: candle for candle in btc}
|
|
|
+ left: list[Candle] = []
|
|
|
+ right: list[Candle] = []
|
|
|
+ for candle in eth:
|
|
|
+ other = btc_by_ts.get(candle.ts)
|
|
|
+ if other is not None:
|
|
|
+ left.append(candle)
|
|
|
+ right.append(other)
|
|
|
+ return left, right
|
|
|
+
|
|
|
+
|
|
|
+def crash_risk_state(eth: list[Candle], btc: list[Candle]) -> list[bool]:
|
|
|
+ frame = pd.DataFrame(
|
|
|
+ {
|
|
|
+ "ts": [candle.ts for candle in eth],
|
|
|
+ "open": [candle.open for candle in eth],
|
|
|
+ "high": [candle.high for candle in eth],
|
|
|
+ "low": [candle.low for candle in eth],
|
|
|
+ "close": [candle.close for candle in eth],
|
|
|
+ "volume": [candle.volume for candle in eth],
|
|
|
+ "btc_close": [candle.close for candle in btc],
|
|
|
+ }
|
|
|
+ )
|
|
|
+ frame["dt"] = pd.to_datetime(frame["ts"], unit="ms", utc=True)
|
|
|
+ hourly = (
|
|
|
+ frame.set_index("dt")
|
|
|
+ .resample("1h", label="left", closed="left")
|
|
|
+ .agg(close=("close", "last"), high=("high", "max"), low=("low", "min"), btc_close=("btc_close", "last"))
|
|
|
+ .dropna()
|
|
|
+ )
|
|
|
+ close = hourly["close"].astype(float)
|
|
|
+ high = hourly["high"].astype(float)
|
|
|
+ low = hourly["low"].astype(float)
|
|
|
+ btc_close = hourly["btc_close"].astype(float)
|
|
|
+ slow = close.ewm(span=120, adjust=False).mean()
|
|
|
+ ret = close / close.shift(8) - 1.0
|
|
|
+ range_pct = (high - low) / close
|
|
|
+ range_rank = range_pct.rolling(160).rank(pct=True)
|
|
|
+ btc_slow = btc_close.rolling(120).mean()
|
|
|
+ btc_drop = btc_close / btc_close.shift(24) - 1.0
|
|
|
+ hourly_risk = (btc_close < btc_slow) & (btc_drop < -0.015) & (close < slow) & (ret < -0.035) & (range_rank > 0.75)
|
|
|
+ risk_15m = hourly_risk.reindex(frame["dt"], method="ffill").fillna(False)
|
|
|
+ return [bool(value) for value in risk_15m.tolist()]
|
|
|
+
|
|
|
+
|
|
|
+def close_position(trades: list[dict[str, object]], exits: list[dict[str, object]], position: dict[str, object], candle: Candle, exit_price: float) -> tuple[float, bool]:
|
|
|
+ margin_used = float(position["margin_used"])
|
|
|
+ exit_equity = trade_equity(
|
|
|
+ side=str(position["side"]),
|
|
|
+ margin_used=margin_used,
|
|
|
+ entry_price=float(position["entry_price"]),
|
|
|
+ exit_price=exit_price,
|
|
|
+ leverage=LEVERAGE,
|
|
|
+ )
|
|
|
+ pnl = exit_equity - margin_used
|
|
|
+ trades.append(
|
|
|
+ {
|
|
|
+ "side": "Long" if position["side"] == "long" else "Short",
|
|
|
+ "entry_time": _format_ts(int(position["entry_time"])),
|
|
|
+ "exit_time": _format_ts(candle.ts),
|
|
|
+ "entry_price": round(float(position["entry_price"]), 4),
|
|
|
+ "exit_price": round(exit_price, 4),
|
|
|
+ "pnl": round(pnl, 4),
|
|
|
+ "return_pct": round(pnl / margin_used * 100.0, 4),
|
|
|
+ "cost_weight": 1.0,
|
|
|
+ }
|
|
|
+ )
|
|
|
+ exits.append({"ts": candle.ts, "price": exit_price, "side": position["side"]})
|
|
|
+ return exit_equity, pnl > 0.0
|
|
|
+
|
|
|
+
|
|
|
+def run_variant(eth: list[Candle], crash_state: list[bool], variant: Variant) -> SegmentResult:
|
|
|
+ close = pd.Series([candle.close for candle in eth], dtype=float)
|
|
|
+ middle_series = close.rolling(48).mean()
|
|
|
+ stdev = close.rolling(48).std(ddof=0)
|
|
|
+ upper = (middle_series + 2.0 * stdev).tolist()
|
|
|
+ lower = (middle_series - 2.0 * stdev).tolist()
|
|
|
+ middle = middle_series.tolist()
|
|
|
+ bandwidth = ((pd.Series(upper) - pd.Series(lower)) / middle_series).tolist()
|
|
|
+ threshold = pd.Series(bandwidth, dtype=float).rolling(960).quantile(0.25).tolist()
|
|
|
+ eth_vol = close.pct_change().rolling(96).std(ddof=0).tolist()
|
|
|
+
|
|
|
+ equity = INITIAL_EQUITY
|
|
|
+ ending_equity = equity
|
|
|
+ peak_equity = equity
|
|
|
+ max_drawdown = 0.0
|
|
|
+ wins = 0
|
|
|
+ trades: list[dict[str, object]] = []
|
|
|
+ entries: list[dict[str, object]] = []
|
|
|
+ exits: list[dict[str, object]] = []
|
|
|
+ equity_curve: list[dict[str, float | int]] = []
|
|
|
+ position: dict[str, object] | None = None
|
|
|
+ pending_entry_side: str | None = None
|
|
|
+ pending_exit = False
|
|
|
+ middle_exit_streak = 0
|
|
|
+ cooldown_until = -1
|
|
|
+
|
|
|
+ for index in range(960, len(eth)):
|
|
|
+ candle = eth[index]
|
|
|
+ if pending_exit and position is not None:
|
|
|
+ equity, won = close_position(trades, exits, position, candle, candle.open)
|
|
|
+ wins += int(won)
|
|
|
+ position = None
|
|
|
+ pending_exit = False
|
|
|
+ middle_exit_streak = 0
|
|
|
+ cooldown_until = index + 24
|
|
|
+ if pending_entry_side is not None and position is None and equity > 0.0:
|
|
|
+ position = {
|
|
|
+ "side": pending_entry_side,
|
|
|
+ "entry_time": candle.ts,
|
|
|
+ "entry_price": candle.open,
|
|
|
+ "margin_used": equity,
|
|
|
+ "stop_price": candle.open * (0.99 if pending_entry_side == "long" else 1.01),
|
|
|
+ }
|
|
|
+ entries.append({"ts": candle.ts, "price": candle.open, "side": pending_entry_side})
|
|
|
+ pending_entry_side = None
|
|
|
+
|
|
|
+ current_equity = equity
|
|
|
+ if position is not None:
|
|
|
+ side = str(position["side"])
|
|
|
+ force_crash_exit = variant.crash_mode == "exit_long" and side == "long" and crash_state[index]
|
|
|
+ stop_hit = (side == "long" and candle.low <= float(position["stop_price"])) or (
|
|
|
+ side == "short" and candle.high >= float(position["stop_price"])
|
|
|
+ )
|
|
|
+ if force_crash_exit:
|
|
|
+ equity, won = close_position(trades, exits, position, candle, candle.close)
|
|
|
+ wins += int(won)
|
|
|
+ current_equity = equity
|
|
|
+ position = None
|
|
|
+ middle_exit_streak = 0
|
|
|
+ cooldown_until = index + 24
|
|
|
+ elif stop_hit:
|
|
|
+ equity, won = close_position(trades, exits, position, candle, float(position["stop_price"]))
|
|
|
+ wins += int(won)
|
|
|
+ current_equity = equity
|
|
|
+ position = None
|
|
|
+ middle_exit_streak = 0
|
|
|
+ cooldown_until = index + 24
|
|
|
+ if position is not None:
|
|
|
+ current_equity = mark_to_market(
|
|
|
+ side=str(position["side"]),
|
|
|
+ margin_used=float(position["margin_used"]),
|
|
|
+ entry_price=float(position["entry_price"]),
|
|
|
+ mark_price=candle.close,
|
|
|
+ leverage=LEVERAGE,
|
|
|
+ )
|
|
|
+
|
|
|
+ peak_equity = max(peak_equity, current_equity)
|
|
|
+ max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity)
|
|
|
+ equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close})
|
|
|
+ ending_equity = current_equity
|
|
|
+ if index == len(eth) - 1 or equity <= 0.0:
|
|
|
+ continue
|
|
|
+
|
|
|
+ values = (middle[index], upper[index], lower[index], bandwidth[index], threshold[index], eth_vol[index])
|
|
|
+ if any(value != value for value in values):
|
|
|
+ continue
|
|
|
+ if position is not None:
|
|
|
+ middle_exit = (
|
|
|
+ position["side"] == "long" and candle.close < float(middle[index]) * (1.0 - 0.0005)
|
|
|
+ ) or (
|
|
|
+ position["side"] == "short" and candle.close > float(middle[index]) * (1.0 + 0.0005)
|
|
|
+ )
|
|
|
+ middle_exit_streak = middle_exit_streak + 1 if middle_exit else 0
|
|
|
+ if middle_exit_streak >= 1:
|
|
|
+ pending_exit = True
|
|
|
+ continue
|
|
|
+ if index < cooldown_until:
|
|
|
+ continue
|
|
|
+ if float(eth_vol[index]) > 0.006:
|
|
|
+ continue
|
|
|
+ if bandwidth[index] <= threshold[index]:
|
|
|
+ skip_long = variant.crash_mode in {"skip_long", "exit_long", "skip_all"} and crash_state[index]
|
|
|
+ skip_short = variant.crash_mode == "skip_all" and crash_state[index]
|
|
|
+ if candle.close > float(upper[index]) and not skip_long:
|
|
|
+ pending_entry_side = "long"
|
|
|
+ elif candle.close < float(lower[index]) and not skip_short:
|
|
|
+ pending_entry_side = "short"
|
|
|
+
|
|
|
+ trade_count = len(trades)
|
|
|
+ return SegmentResult(
|
|
|
+ trade_count=trade_count,
|
|
|
+ total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY,
|
|
|
+ win_rate=wins / trade_count if trade_count else 0.0,
|
|
|
+ max_drawdown=max_drawdown,
|
|
|
+ trades=trades,
|
|
|
+ open_position=position,
|
|
|
+ candles=eth[960:],
|
|
|
+ equity_curve=equity_curve,
|
|
|
+ entries=entries,
|
|
|
+ exits=exits,
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+def markdown_table(frame: pd.DataFrame) -> str:
|
|
|
+ def cell(value: object) -> str:
|
|
|
+ if isinstance(value, float):
|
|
|
+ return f"{value:.6g}"
|
|
|
+ return str(value).replace("|", "\\|")
|
|
|
+
|
|
|
+ rows = [list(frame.columns), ["---" for _ in frame.columns]]
|
|
|
+ rows.extend(frame.astype(object).where(pd.notna(frame), "").values.tolist())
|
|
|
+ return "\n".join("| " + " | ".join(cell(value) for value in row) + " |" for row in rows)
|
|
|
+
|
|
|
+
|
|
|
+def main() -> int:
|
|
|
+ parser = argparse.ArgumentParser()
|
|
|
+ parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
|
|
|
+ args = parser.parse_args()
|
|
|
+
|
|
|
+ eth, btc = align_pair(_load_candles(ETH_SYMBOL, BAR), _load_candles(BTC_SYMBOL, BAR))
|
|
|
+ crash_state = crash_risk_state(eth, btc)
|
|
|
+ variants = [Variant("none"), Variant("skip_long"), Variant("exit_long"), Variant("skip_all")]
|
|
|
+ primary_cost = dict(COSTS)[PRIMARY_COST]
|
|
|
+ rows: list[dict[str, object]] = []
|
|
|
+ horizon_detail: list[dict[str, object]] = []
|
|
|
+ for variant in variants:
|
|
|
+ result = run_variant(eth, crash_state, variant)
|
|
|
+ frame = cost_equity_frame(result, primary_cost)
|
|
|
+ metrics = equity_metrics(frame, eth[0].ts, eth[-1].ts)
|
|
|
+ rows.append(
|
|
|
+ {
|
|
|
+ "name": variant.name,
|
|
|
+ "crash_mode": variant.crash_mode,
|
|
|
+ "trades": result.trade_count,
|
|
|
+ **metrics,
|
|
|
+ }
|
|
|
+ )
|
|
|
+ for horizon in horizon_rows(frame, eth[-1].ts):
|
|
|
+ horizon_detail.append({"name": variant.name, "crash_mode": variant.crash_mode, **horizon})
|
|
|
+
|
|
|
+ summary = pd.DataFrame(rows).sort_values(["net_calmar", "net_total_return"], ascending=[False, False])
|
|
|
+ detail = pd.DataFrame(horizon_detail)
|
|
|
+ args.output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
+ summary_path = args.output_dir / "live-bb-squeeze-crash-filter-summary.csv"
|
|
|
+ detail_path = args.output_dir / "live-bb-squeeze-crash-filter-horizon.csv"
|
|
|
+ report_path = args.output_dir / "live-bb-squeeze-crash-filter-report.md"
|
|
|
+ summary.to_csv(summary_path, index=False)
|
|
|
+ detail.to_csv(detail_path, index=False)
|
|
|
+ report_path.write_text(
|
|
|
+ "# Live BB squeeze crash-filter evaluation\n\n"
|
|
|
+ "Scope: current live BB squeeze rules plus crash-follow risk-state filters. No live path touched.\n\n"
|
|
|
+ f"Output files:\n- `{summary_path}`\n- `{detail_path}`\n- `{report_path}`\n\n"
|
|
|
+ "## Summary\n\n"
|
|
|
+ f"{markdown_table(summary)}\n\n"
|
|
|
+ "## Horizon Detail\n\n"
|
|
|
+ f"{markdown_table(detail)}\n",
|
|
|
+ encoding="utf-8",
|
|
|
+ )
|
|
|
+ print(summary.to_string(index=False))
|
|
|
+ return 0
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ raise SystemExit(main())
|