| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529 |
- from __future__ import annotations
- import argparse
- import sys
- from dataclasses import dataclass
- from pathlib import Path
- import pandas as pd
- sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
- from okx_codex_trader.models import Candle
- from okx_codex_trader.sampled_report import SegmentResult, mark_to_market, trade_equity
- from scripts.search_live_bb_squeeze_exit_variants import (
- COSTS,
- DATA_DIR,
- INITIAL_EQUITY,
- LEVERAGE,
- OUTPUT_DIR,
- PRIMARY_COST,
- _format_ts,
- _load_candles,
- cost_equity_frame,
- equity_metrics,
- worst_month,
- )
# Instruments: the strategy trades the ETH swap; the BTC swap only feeds risk filters.
ETH_SYMBOL = "ETH-USDT-SWAP"
BTC_SYMBOL = "BTC-USDT-SWAP"

# Default candle interval passed to the candle loader.
BAR = "15m"

# Minimum closed trades per year a variant needs in every window to be ranked.
MIN_TRADES_PER_YEAR = 60.0

# Evaluation windows as (label, look-back offset); None means the whole history.
WINDOWS = (
    ("full", None),
    ("1y", pd.DateOffset(years=1)),
    ("6m", pd.DateOffset(months=6)),
    ("3m", pd.DateOffset(months=3)),
)

# Labels of the bounded (trailing) windows, in the same order as WINDOWS.
RECENT_WINDOWS = tuple(label for label, offset in WINDOWS if offset is not None)
@dataclass(frozen=True)
class Variant:
    """Immutable, hashable parameter set for one backtest configuration.

    Being frozen makes instances usable in sets, which the variant builder
    relies on to drop duplicates.
    """

    # Fractional buffer applied to the middle band before a middle exit counts.
    middle_exit_buffer_pct: float
    # Consecutive bars beyond the buffered middle band needed to queue the exit.
    middle_exit_confirm_bars: int
    # Stop distance from the entry price, as a fraction of that price.
    stop_loss_pct: float
    # Take-profit distance as a fraction of the entry price; None disables it.
    take_profit_pct: float | None
    # Upper bound on the rolling ETH volatility for new entries.
    eth_vol_cap: float
    # Identifier of the entry risk filter (for example "none").
    risk_filter: str
    # Maximum bars to hold a position; None disables the time exit.
    max_hold_bars: int | None

    @property
    def name(self) -> str:
        """Return the unique, human-readable identifier of this variant.

        Floats are rendered with the ``g`` format and disabled optional
        parameters are rendered as ``none``.
        """
        if self.take_profit_pct is None:
            take_profit_label = "none"
        else:
            take_profit_label = f"{self.take_profit_pct:g}"
        if self.max_hold_bars is None:
            max_hold_label = "none"
        else:
            max_hold_label = str(self.max_hold_bars)
        parts = (
            "live-bb-squeeze-risk-exit",
            f"mxbuf{self.middle_exit_buffer_pct:g}",
            f"mxc{self.middle_exit_confirm_bars}",
            f"sl{self.stop_loss_pct:g}",
            f"tp{take_profit_label}",
            f"vc{self.eth_vol_cap:g}",
            f"{self.risk_filter}",
            f"mh{max_hold_label}",
        )
        return "-".join(parts)
def align_pair(left: list[Candle], right: list[Candle]) -> tuple[list[Candle], list[Candle]]:
    """Keep only the candles whose ``ts`` occurs in both inputs.

    The output follows the order of ``left``. Both returned lists have the
    same length and matching timestamps position by position. When ``right``
    repeats a timestamp, its last candle for that timestamp is used.
    """
    lookup = {candle.ts: candle for candle in right}
    candidates = [(candle, lookup.get(candle.ts)) for candle in left]
    matched = [(mine, theirs) for mine, theirs in candidates if theirs is not None]
    return [mine for mine, _ in matched], [theirs for _, theirs in matched]
def close_position(
    trades: list[dict[str, object]],
    exits: list[dict[str, object]],
    position: dict[str, object],
    candle: Candle,
    exit_price: float,
    reason: str,
) -> tuple[float, bool]:
    """Close ``position`` at ``exit_price`` and record the outcome.

    Appends one trade record to ``trades`` and one exit marker to ``exits``
    (both lists are mutated in place).

    Returns:
        ``(equity_after_exit, won)`` where ``won`` is True for a strictly
        positive PnL. The equity value comes from ``trade_equity``.
    """
    margin = float(position["margin_used"])
    side = position["side"]
    entry_price = float(position["entry_price"])

    equity_after_exit = trade_equity(
        side=str(side),
        margin_used=margin,
        entry_price=entry_price,
        exit_price=exit_price,
        leverage=LEVERAGE,
    )
    profit = equity_after_exit - margin

    # Key order is kept stable because downstream consumers may rely on it.
    record: dict[str, object] = {
        "side": "Long" if side == "long" else "Short",
        "entry_time": _format_ts(int(position["entry_time"])),
        "exit_time": _format_ts(candle.ts),
        "exit_ts": candle.ts,
        "entry_price": round(entry_price, 4),
        "exit_price": round(exit_price, 4),
        "pnl": round(profit, 4),
        "return_pct": round(profit / margin * 100.0, 4),
        "cost_weight": 1.0,
        "exit_reason": reason,
    }
    trades.append(record)
    exits.append({"ts": candle.ts, "price": exit_price, "side": side})
    return equity_after_exit, profit > 0.0
def _trailing_mean(series: pd.Series, index: int, window: int) -> float:
    """Return the mean of the ``window`` values of ``series`` ending at ``index``.

    Mirrors ``series.rolling(window).mean().iloc[index]``: the result is NaN
    when fewer than ``window`` values precede (and include) ``index`` or when
    any value inside the window is NaN.
    """
    position = index if index >= 0 else index + len(series)
    first = position - window + 1
    if first < 0:
        return float("nan")
    return float(series.iloc[first : position + 1].mean(skipna=False))


def risk_filter_passes(
    *,
    variant: Variant,
    side: str,
    index: int,
    eth_close: pd.Series,
    btc_close: pd.Series,
    eth_24h_return: list[float],
    btc_24h_return: list[float],
    btc_sma: list[float],
) -> bool:
    """Decide whether the variant's risk filter allows an entry at ``index``.

    Args:
        variant: Supplies ``risk_filter``, the name of the filter to apply.
        side: ``"long"`` selects the long-side trend rule; any other value is
            treated as the short side by the trend filters.
        index: Bar position to evaluate in every series/list.
        eth_close: ETH closing prices.
        btc_close: BTC closing prices.
        eth_24h_return: Precomputed ETH returns; must be >= -0.03 for
            ``eth_24h_not_crash``.
        btc_24h_return: Precomputed BTC returns; must be >= -0.02 for
            ``btc_24h_not_crash``.
        btc_sma: Precomputed BTC moving average used by ``btc_trend_aligned``.

    Returns:
        True when the entry is allowed. Comparisons against NaN are False,
        so a filter whose input is not yet available blocks the entry.

    Raises:
        ValueError: If ``variant.risk_filter`` is not a known filter name.
    """
    mode = variant.risk_filter
    if mode == "none":
        return True
    if mode == "eth_24h_not_crash":
        return float(eth_24h_return[index]) >= -0.03
    if mode == "btc_24h_not_crash":
        return float(btc_24h_return[index]) >= -0.02
    if mode == "trend_aligned":
        close = float(eth_close.iloc[index])
        # Average only the trailing 192 bars. Building a rolling mean over the
        # whole series for every candidate signal costs O(len(eth_close)) per
        # call just to read a single element.
        trend = _trailing_mean(eth_close, index, 192)
        return close >= trend if side == "long" else close <= trend
    if mode == "btc_trend_aligned":
        close = float(btc_close.iloc[index])
        trend = float(btc_sma[index])
        return close >= trend if side == "long" else close <= trend
    raise ValueError(f"unknown risk filter: {variant.risk_filter}")
def run_variant(eth: list[Candle], btc: list[Candle], variant: Variant) -> SegmentResult:
    """Backtest one variant of the Bollinger-squeeze breakout on ETH candles.

    ``eth`` and ``btc`` are expected to be aligned index by index; BTC data is
    only consumed by the risk filters. Signals are evaluated on a bar's close
    and acted on at the next bar's open. Stops and take-profits are checked
    against the bar's low/high and fill at their own price level, with the
    stop taking precedence when both are touched in the same bar.

    Returns:
        A ``SegmentResult`` covering the bars from index 960 onwards. Its
        total return uses the last mark-to-market equity and its drawdown is
        measured on the mark-to-market curve.
    """
    lookback = 960  # first simulated bar; also the window of the bandwidth quantile
    cooldown_bars = 24  # bars after any exit during which new signals are ignored

    eth_close = pd.Series([bar.close for bar in eth], dtype=float)
    btc_close = pd.Series([bar.close for bar in btc], dtype=float)

    # Bollinger bands: 48-bar mean +/- 2 population standard deviations.
    band_middle = eth_close.rolling(48).mean()
    band_dev = eth_close.rolling(48).std(ddof=0)
    upper = (band_middle + 2.0 * band_dev).tolist()
    lower = (band_middle - 2.0 * band_dev).tolist()
    middle = band_middle.tolist()
    bandwidth = ((pd.Series(upper) - pd.Series(lower)) / band_middle).tolist()
    # A squeeze is a bandwidth at or below its own rolling 25th percentile.
    threshold = pd.Series(bandwidth, dtype=float).rolling(lookback).quantile(0.25).tolist()
    eth_vol = eth_close.pct_change().rolling(96).std(ddof=0).tolist()
    eth_24h_return = (eth_close / eth_close.shift(96) - 1.0).tolist()
    btc_24h_return = (btc_close / btc_close.shift(96) - 1.0).tolist()
    btc_sma = btc_close.rolling(480).mean().tolist()

    def entry_allowed(side: str, at_index: int) -> bool:
        """Apply the variant's risk filter for an entry on ``side``."""
        return risk_filter_passes(
            variant=variant,
            side=side,
            index=at_index,
            eth_close=eth_close,
            btc_close=btc_close,
            eth_24h_return=eth_24h_return,
            btc_24h_return=btc_24h_return,
            btc_sma=btc_sma,
        )

    equity = INITIAL_EQUITY
    ending_equity = equity
    peak_equity = equity
    max_drawdown_mark = 0.0
    wins = 0
    trades: list[dict[str, object]] = []
    entries: list[dict[str, object]] = []
    exits: list[dict[str, object]] = []
    equity_curve: list[dict[str, float | int]] = []
    position: dict[str, object] | None = None
    pending_entry_side: str | None = None
    pending_exit_reason: str | None = None
    middle_exit_streak = 0
    cooldown_until = -1

    bar_count = len(eth)
    final_index = bar_count - 1
    for index in range(lookback, bar_count):
        candle = eth[index]

        # Exit queued on the previous bar: fill at this bar's open.
        if position is not None and pending_exit_reason is not None:
            equity, won = close_position(trades, exits, position, candle, candle.open, pending_exit_reason)
            if won:
                wins += 1
            position = None
            pending_exit_reason = None
            middle_exit_streak = 0
            cooldown_until = index + cooldown_bars

        # Entry queued on the previous bar: open at this bar's open with all equity.
        if pending_entry_side is not None and position is None and equity > 0.0:
            going_long = pending_entry_side == "long"
            stop_factor = 1.0 - variant.stop_loss_pct if going_long else 1.0 + variant.stop_loss_pct
            if variant.take_profit_pct is None:
                take_level = None
            else:
                take_factor = 1.0 + variant.take_profit_pct if going_long else 1.0 - variant.take_profit_pct
                take_level = candle.open * take_factor
            position = {
                "side": pending_entry_side,
                "entry_time": candle.ts,
                "entry_index": index,
                "entry_price": candle.open,
                "margin_used": equity,
                "stop_price": candle.open * stop_factor,
                "take_price": take_level,
            }
            entries.append({"ts": candle.ts, "price": candle.open, "side": pending_entry_side})
        # A queued entry is only ever set while flat with positive equity, so
        # the guard above always consumes it; clearing here is unconditional.
        pending_entry_side = None

        current_equity = equity
        if position is not None:
            side = str(position["side"])
            is_long = side == "long"
            is_short = side == "short"
            stop_level = float(position["stop_price"])
            stop_hit = (is_long and candle.low <= stop_level) or (is_short and candle.high >= stop_level)
            take_price = position["take_price"]
            take_hit = take_price is not None and (
                (is_long and candle.high >= float(take_price)) or (is_short and candle.low <= float(take_price))
            )
            if stop_hit or take_hit:
                # The stop wins when both levels are touched within the same bar.
                exit_price = float(position["stop_price"] if stop_hit else take_price)
                exit_label = "stop" if stop_hit else "take_profit"
                equity, won = close_position(trades, exits, position, candle, exit_price, exit_label)
                if won:
                    wins += 1
                current_equity = equity
                position = None
                middle_exit_streak = 0
                cooldown_until = index + cooldown_bars

        if position is not None:
            current_equity = mark_to_market(
                side=str(position["side"]),
                margin_used=float(position["margin_used"]),
                entry_price=float(position["entry_price"]),
                mark_price=candle.close,
                leverage=LEVERAGE,
            )

        peak_equity = max(peak_equity, current_equity)
        max_drawdown_mark = max(max_drawdown_mark, (peak_equity - current_equity) / peak_equity)
        equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close})
        ending_equity = current_equity

        # No new signals on the last bar (nothing could fill) or once equity is gone.
        if index == final_index or equity <= 0.0:
            continue

        indicators = (middle[index], upper[index], lower[index], bandwidth[index], threshold[index], eth_vol[index])
        # NaN is the only value that differs from itself: skip bars with incomplete indicators.
        if any(value != value for value in indicators):
            continue

        if position is not None:
            mid_price = float(middle[index])
            crossed_middle = (
                position["side"] == "long" and candle.close < mid_price * (1.0 - variant.middle_exit_buffer_pct)
            ) or (
                position["side"] == "short" and candle.close > mid_price * (1.0 + variant.middle_exit_buffer_pct)
            )
            middle_exit_streak = middle_exit_streak + 1 if crossed_middle else 0
            held_too_long = (
                variant.max_hold_bars is not None
                and index - int(position["entry_index"]) >= variant.max_hold_bars
            )
            # The confirmed middle-band exit takes priority over the time exit.
            if middle_exit_streak >= variant.middle_exit_confirm_bars:
                pending_exit_reason = "middle_exit"
            elif held_too_long:
                pending_exit_reason = "max_hold"
            continue

        if index < cooldown_until:
            continue
        if float(eth_vol[index]) > variant.eth_vol_cap:
            continue
        if not (bandwidth[index] <= threshold[index]):
            continue

        # Breakout out of a squeeze: queue an entry in the breakout direction.
        if candle.close > float(upper[index]) and entry_allowed("long", index):
            pending_entry_side = "long"
        elif candle.close < float(lower[index]) and entry_allowed("short", index):
            pending_entry_side = "short"

    closed_trades = len(trades)
    return SegmentResult(
        trade_count=closed_trades,
        total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY,
        win_rate=wins / closed_trades if closed_trades else 0.0,
        max_drawdown=max_drawdown_mark,
        trades=trades,
        open_position=position,
        candles=eth[lookback:],
        equity_curve=equity_curve,
        entries=entries,
        exits=exits,
    )
def build_variants() -> list[Variant]:
    """Return the de-duplicated search grid, sorted by variant name.

    The grid is the union of four sweeps around the default parameters
    (stop 0.01, no take-profit, volatility cap 0.006, no risk filter, no
    maximum hold): middle-exit buffer/confirmation, stop/take-profit, risk
    filter with two volatility caps, and maximum holding time.
    """
    focus_exits = ((0.0005, 1), (0.001, 1), (0.001, 3), (0.0015, 2))
    hold_exits = ((0.0005, 1), (0.001, 1), (0.0015, 2))
    filters = ("eth_24h_not_crash", "btc_24h_not_crash", "trend_aligned", "btc_trend_aligned")

    pool: set[Variant] = set()

    # Sweep 1: middle-exit buffer and confirmation count.
    pool.update(
        Variant(buffer, confirm, 0.01, None, 0.006, "none", None)
        for buffer in (0.0, 0.0005, 0.001, 0.0015)
        for confirm in (1, 2, 3)
    )
    # Sweep 2: stop-loss and take-profit distances.
    pool.update(
        Variant(buffer, confirm, stop_loss, take_profit, 0.006, "none", None)
        for buffer, confirm in focus_exits
        for stop_loss in (0.008, 0.01, 0.012)
        for take_profit in (None, 0.02, 0.03)
    )
    # Sweep 3: each risk filter, at both volatility caps.
    pool.update(
        Variant(buffer, confirm, 0.01, None, vol_cap, risk_filter, None)
        for buffer, confirm in focus_exits
        for risk_filter in filters
        for vol_cap in (0.006, 0.005)
    )
    # Sweep 4: maximum holding time.
    pool.update(
        Variant(buffer, confirm, 0.01, None, 0.006, "none", max_hold)
        for buffer, confirm in hold_exits
        for max_hold in (96, 192)
    )
    return sorted(pool, key=lambda item: item.name)
def window_frame(frame: pd.DataFrame, label: str, offset: pd.DateOffset | None, last_ts: int) -> tuple[pd.DataFrame, int]:
    """Cut the equity curve down to the trailing window ending at ``last_ts``.

    Args:
        frame: Equity curve with at least ``ts`` and ``equity`` columns.
        label: Window label; accepted for call-site symmetry but not used here.
        offset: Look-back from ``last_ts``; ``None`` selects the whole frame.
        last_ts: End of the window in epoch milliseconds.

    Returns:
        ``(scoped, start_ms)``. When rows exist at or before the cutoff, the
        scoped frame starts with a synthetic row at the cutoff carrying the
        last equity seen up to it, followed by the rows after the cutoff.
        Otherwise the whole frame is returned. ``start_ms`` is the first
        ``ts`` of the scoped frame in epoch milliseconds.
    """
    wanted = ["ts", "equity"]
    if offset is not None:
        cutoff = pd.to_datetime(last_ts, unit="ms", utc=True) - offset
        earlier = frame[frame["ts"] <= cutoff]
        if len(earlier):
            # Anchor the window at the equity level in force at the cutoff.
            anchor_equity = float(earlier["equity"].iloc[-1])
            anchor = pd.DataFrame([{"ts": cutoff, "equity": anchor_equity}])
            later = frame[frame["ts"] > cutoff]
            scoped = pd.concat([anchor, later[wanted]], ignore_index=True)
        else:
            scoped = frame[wanted].copy()
    else:
        scoped = frame[wanted].copy()
    first_stamp = pd.Timestamp(scoped["ts"].iloc[0])
    return scoped, int(first_stamp.timestamp() * 1000)
def trade_count_in_window(trades: list[dict[str, object]], start_ts: int, last_ts: int) -> int:
    """Count trades whose ``exit_ts`` lies in the half-open range ``(start_ts, last_ts]``."""
    counted = 0
    for trade in trades:
        exit_ts = int(trade["exit_ts"])
        if start_ts < exit_ts and exit_ts <= last_ts:
            counted += 1
    return counted
def format_cell(value: object) -> str:
    """Render one table cell for Markdown.

    Floats use six significant digits (``.6g``). Everything else is converted
    with ``str`` and has ``|`` escaped so it cannot end the cell early.
    """
    if not isinstance(value, float):
        return str(value).replace("|", "\\|")
    return format(value, ".6g")
def markdown_table(frame: pd.DataFrame) -> str:
    """Render ``frame`` as a pipe-delimited Markdown table.

    The output is a header row, a ``---`` divider row and one row per record.
    Missing values become empty cells and every cell goes through
    ``format_cell``.
    """
    header = list(frame.columns)
    divider = ["---"] * len(header)
    records = frame.astype(object).where(pd.notna(frame), "").values.tolist()

    lines = []
    for cells in [header, divider, *records]:
        rendered = " | ".join(format_cell(cell) for cell in cells)
        lines.append("| " + rendered + " |")
    return "\n".join(lines)
def write_report(
    *,
    summary: pd.DataFrame,
    windows: pd.DataFrame,
    output_files: list[Path],
    command: str,
    first_ts: int,
    last_ts: int,
) -> str:
    """Build the Markdown report text; nothing is written to disk here.

    Args:
        summary: One row per variant, including ``eligible_frequency`` and the
            ``recent_*`` / ``full_net_*`` metrics used for ranking.
        windows: One row per (variant, window) with the ``net_*`` metrics.
        output_files: Paths listed under "Output files".
        command: Command line echoed into the report.
        first_ts: First candle timestamp, formatted with ``_format_ts``.
        last_ts: Last candle timestamp, formatted with ``_format_ts``.

    Returns:
        The report as a single string. The verdict is positive only when the
        top recent-ranked eligible variant beats the baseline's net Calmar in
        every recent window, does not fall below its net return there, and
        does not exceed its full-window net drawdown.
    """
    baseline_name = "live-bb-squeeze-risk-exit-mxbuf0.0005-mxc1-sl0.01-tpnone-vc0.006-none-mhnone"
    baseline = windows[windows["name"] == baseline_name].copy()
    eligible = summary[summary["eligible_frequency"]].copy()

    recent_rank = eligible.sort_values(
        ["recent_min_calmar", "recent_min_return", "full_net_max_drawdown", "full_trades_per_year"],
        ascending=[False, False, True, False],
    ).head(10)
    full_rank = eligible.sort_values(
        ["full_net_calmar", "recent_min_calmar", "full_net_max_drawdown"],
        ascending=[False, False, True],
    ).head(10)

    baseline_columns = [
        "window",
        "window_trades",
        "trades_per_year",
        "net_total_return",
        "net_max_drawdown",
        "net_calmar",
    ]
    recent_columns = [
        "name",
        "full_trades_per_year",
        "recent_min_return",
        "recent_min_calmar",
        "full_net_total_return",
        "full_net_max_drawdown",
        "full_net_calmar",
    ]
    full_columns = [
        "name",
        "full_trades_per_year",
        "full_net_total_return",
        "full_net_max_drawdown",
        "full_net_calmar",
        "recent_min_return",
        "recent_min_calmar",
    ]
    baseline_pivot = baseline[baseline_columns]

    verdict = "No"
    reason = "No frequency-eligible variant was found."
    if len(recent_rank):
        candidate = recent_rank.iloc[0]
        candidate_windows = windows[windows["name"] == candidate["name"]].set_index("window")
        baseline_windows = baseline.set_index("window")

        def beats_baseline(window: str) -> bool:
            """Strictly better Calmar and no worse return than the baseline."""
            return float(candidate_windows.loc[window, "net_calmar"]) > float(
                baseline_windows.loc[window, "net_calmar"]
            ) and float(candidate_windows.loc[window, "net_total_return"]) >= float(
                baseline_windows.loc[window, "net_total_return"]
            )

        recent_better = all(beats_baseline(window) for window in RECENT_WINDOWS)
        baseline_drawdown = summary.loc[summary["name"] == baseline_name, "full_net_max_drawdown"].iloc[0]
        full_not_worse = float(candidate["full_net_max_drawdown"]) <= float(baseline_drawdown)
        if recent_better and full_not_worse:
            verdict = "Yes, for paper observation only"
            reason = "Best eligible candidate improved all recent windows without increasing full-window closed-equity drawdown."
        else:
            reason = "Best eligible candidate did not clear every recent window versus the current live parameter set."

    file_list = "\n".join(f"- `{path}`" for path in output_files)
    pieces = [
        "# Live BB squeeze recent risk/exit exploration\n\n",
        "Scope: offline local-candle research only. Live executor, deploy files, and order paths were not changed.\n\n",
        f"Run command: `{command}`\n",
        f"Local aligned history: `{_format_ts(first_ts)}` to `{_format_ts(last_ts)}`.\n",
        f"Primary cost model: `{PRIMARY_COST}` from existing BB squeeze evaluation helpers.\n",
        f"Frequency rule: eligible variants require at least {MIN_TRADES_PER_YEAR:g} closed trades/year in full, 1y, 6m, and 3m windows.\n\n",
        "Output files:\n",
        file_list,
        "\n\n",
        "## Current live baseline\n\n",
        markdown_table(baseline_pivot),
        "\n\n",
        "## Top recent-ranked eligible variants\n\n",
        markdown_table(recent_rank[recent_columns]),
        "\n\n",
        "## Top full-window eligible variants\n\n",
        markdown_table(full_rank[full_columns]),
        "\n\n",
        "## Replacement verdict\n\n",
        f"- Recommend replacing live strategy now: {verdict}.\n",
        f"- Reason: {reason}\n",
    ]
    return "".join(pieces)
def main() -> int:
    """Run every variant, then write the summary CSV, windows CSV and report.

    Command-line options: ``--bar`` (candle interval, default ``BAR``) and
    ``--output-dir`` (default ``OUTPUT_DIR``). Progress and the top ten
    summary rows are printed to stdout.

    Returns:
        Process exit code, always 0 on completion.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--bar", default=BAR)
    parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
    args = parser.parse_args()

    eth_raw = _load_candles(ETH_SYMBOL, args.bar)
    btc_raw = _load_candles(BTC_SYMBOL, args.bar)
    eth, btc = align_pair(eth_raw, btc_raw)
    cost = dict(COSTS)[PRIMARY_COST]
    first_ts = eth[0].ts
    last_ts = eth[-1].ts
    window_labels = [label for label, _ in WINDOWS]

    summary_rows: list[dict[str, object]] = []
    window_rows: list[dict[str, object]] = []
    variants = build_variants()
    total = len(variants)

    for ordinal, variant in enumerate(variants, start=1):
        result = run_variant(eth, btc, variant)
        frame = cost_equity_frame(result, cost)
        month, month_return = worst_month(frame)

        per_window: dict[str, dict[str, float | int | str]] = {}
        for label, offset in WINDOWS:
            scoped_frame, start_ts = window_frame(frame, label, offset, last_ts)
            metrics = equity_metrics(scoped_frame, start_ts, last_ts)
            closed = trade_count_in_window(result.trades, start_ts, last_ts)
            # Window length in 365-day years, from epoch-millisecond bounds.
            span_years = (last_ts - start_ts) / 86_400_000 / 365
            row = {
                "cost": PRIMARY_COST,
                "symbol": ETH_SYMBOL,
                "bar": args.bar,
                "name": variant.name,
                "window": label,
                "window_start": pd.to_datetime(start_ts, unit="ms", utc=True).strftime("%Y-%m-%d %H:%M"),
                "window_end": _format_ts(last_ts),
                "window_trades": closed,
                "trades_per_year": closed / span_years if span_years > 0.0 else 0.0,
                **metrics,
            }
            window_rows.append(row)
            per_window[label] = row

        # A variant is eligible only when it trades often enough in every window.
        eligible_frequency = all(
            float(per_window[label]["trades_per_year"]) >= MIN_TRADES_PER_YEAR for label in window_labels
        )
        recent_returns = [float(per_window[label]["net_total_return"]) for label in RECENT_WINDOWS]
        recent_calmars = [float(per_window[label]["net_calmar"]) for label in RECENT_WINDOWS]
        full_row = per_window["full"]
        summary_rows.append(
            {
                "cost": PRIMARY_COST,
                "symbol": ETH_SYMBOL,
                "bar": args.bar,
                "name": variant.name,
                "middle_exit_buffer_pct": variant.middle_exit_buffer_pct,
                "middle_exit_confirm_bars": variant.middle_exit_confirm_bars,
                "stop_loss_pct": variant.stop_loss_pct,
                "take_profit_pct": variant.take_profit_pct,
                "eth_vol_cap": variant.eth_vol_cap,
                "risk_filter": variant.risk_filter,
                "max_hold_bars": variant.max_hold_bars,
                "first_candle": _format_ts(first_ts),
                "last_candle": _format_ts(last_ts),
                "trades": result.trade_count,
                "full_trades_per_year": float(full_row["trades_per_year"]),
                "eligible_frequency": eligible_frequency,
                "gross_total_return": result.total_return,
                "gross_max_drawdown_mark_to_market": result.max_drawdown,
                "worst_month": month,
                "worst_month_return": month_return,
                "recent_min_return": min(recent_returns),
                "recent_min_calmar": min(recent_calmars),
                "recent_avg_return": sum(recent_returns) / len(recent_returns),
                "full_net_total_return": full_row["net_total_return"],
                "full_net_annualized_return": full_row["net_annualized_return"],
                "full_net_max_drawdown": full_row["net_max_drawdown"],
                "full_net_calmar": full_row["net_calmar"],
            }
        )
        print(f"done {ordinal}/{total} {variant.name}")

    summary = pd.DataFrame(summary_rows).sort_values(
        ["eligible_frequency", "recent_min_calmar", "recent_min_return", "full_net_calmar"],
        ascending=[False, False, False, False],
    )
    windows = pd.DataFrame(window_rows)
    # An ordered categorical makes the sort follow WINDOWS order, not alphabetical order.
    windows["window"] = pd.Categorical(windows["window"], categories=window_labels, ordered=True)
    windows = windows.sort_values(["window", "net_calmar", "net_total_return"], ascending=[True, False, False])

    out_dir = args.output_dir
    out_dir.mkdir(parents=True, exist_ok=True)
    prefix = "live-bb-squeeze-recent-risk-exit"
    summary_path = out_dir / f"{prefix}-summary.csv"
    window_path = out_dir / f"{prefix}-windows.csv"
    report_path = out_dir / f"{prefix}-report.md"
    output_files = [summary_path, window_path, report_path]

    summary.to_csv(summary_path, index=False)
    windows.to_csv(window_path, index=False)

    command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --bar {args.bar} --output-dir {out_dir.as_posix()}"
    report_text = write_report(
        summary=summary,
        windows=windows,
        output_files=output_files,
        command=command,
        first_ts=first_ts,
        last_ts=last_ts,
    )
    report_path.write_text(report_text, encoding="utf-8")

    print(summary.head(10).to_string(index=False))
    return 0
# Script entry point: propagate main()'s return value as the process exit code.
if __name__ == "__main__":
    sys.exit(main())
|