"""Offline search over risk/exit variants of the live BB squeeze strategy.

The script replays locally stored ETH candles (with BTC candles aligned by
timestamp for the BTC-based filters), simulates every variant returned by
``build_variants``, evaluates each one over the windows in ``WINDOWS`` and
writes a summary CSV, a per-window CSV and a Markdown report.
"""

from __future__ import annotations

import argparse
import sys
from dataclasses import dataclass
from pathlib import Path

import pandas as pd

# The project imports below are resolved relative to the parent of this
# file's directory, so that directory has to be on sys.path first.
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))

from okx_codex_trader.models import Candle
from okx_codex_trader.sampled_report import SegmentResult, mark_to_market, trade_equity
from scripts.search_live_bb_squeeze_exit_variants import (
    COSTS,
    DATA_DIR,
    INITIAL_EQUITY,
    LEVERAGE,
    OUTPUT_DIR,
    PRIMARY_COST,
    _format_ts,
    _load_candles,
    cost_equity_frame,
    equity_metrics,
    worst_month,
)

ETH_SYMBOL = "ETH-USDT-SWAP"
BTC_SYMBOL = "BTC-USDT-SWAP"
BAR = "15m"
# Minimum closed trades per year a variant needs in every window to be ranked.
MIN_TRADES_PER_YEAR = 60.0
# Look-back of the ETH moving average used by the "trend_aligned" risk filter.
ETH_TREND_SMA_BARS = 192
RECENT_WINDOWS = ("1y", "6m", "3m")
# (label, look-back offset) pairs; ``None`` means the whole equity history.
WINDOWS = (
    ("full", None),
    ("1y", pd.DateOffset(years=1)),
    ("6m", pd.DateOffset(months=6)),
    ("3m", pd.DateOffset(months=3)),
)


@dataclass(frozen=True)
class Variant:
    """One parameter combination of the strategy under test.

    Instances are frozen (hashable), which ``build_variants`` relies on to
    de-duplicate combinations through a ``set``.
    """

    # Fractional distance beyond the middle band the close must reach before
    # it counts towards a middle-band exit.
    middle_exit_buffer_pct: float
    # Number of consecutive qualifying closes required for a middle-band exit.
    middle_exit_confirm_bars: int
    # Stop distance from the entry price, as a fraction of the entry price.
    stop_loss_pct: float
    # Take-profit distance as a fraction of the entry price; None disables it.
    take_profit_pct: float | None
    # Entries are skipped while the rolling ETH return volatility exceeds this.
    eth_vol_cap: float
    # Name of the entry filter understood by ``risk_filter_passes``.
    risk_filter: str
    # Maximum number of bars to hold a position; None disables the limit.
    max_hold_bars: int | None

    @property
    def name(self) -> str:
        """Return the identifier that encodes every parameter of the variant."""
        take_profit = "none" if self.take_profit_pct is None else f"{self.take_profit_pct:g}"
        max_hold = "none" if self.max_hold_bars is None else str(self.max_hold_bars)
        return (
            "live-bb-squeeze-risk-exit"
            f"-mxbuf{self.middle_exit_buffer_pct:g}-mxc{self.middle_exit_confirm_bars}"
            f"-sl{self.stop_loss_pct:g}-tp{take_profit}"
            f"-vc{self.eth_vol_cap:g}-{self.risk_filter}-mh{max_hold}"
        )


def align_pair(left: list[Candle], right: list[Candle]) -> tuple[list[Candle], list[Candle]]:
    """Keep only the candles whose timestamp exists in both series.

    Returns two equally long lists in the order of ``left``; element ``i`` of
    both lists shares the same ``ts``.
    """
    right_by_ts = {candle.ts: candle for candle in right}
    left_out: list[Candle] = []
    right_out: list[Candle] = []
    for candle in left:
        other = right_by_ts.get(candle.ts)
        if other is not None:
            left_out.append(candle)
            right_out.append(other)
    return left_out, right_out


def close_position(
    trades: list[dict[str, object]],
    exits: list[dict[str, object]],
    position: dict[str, object],
    candle: Candle,
    exit_price: float,
    reason: str,
) -> tuple[float, bool]:
    """Close ``position`` at ``exit_price`` and record the trade.

    Appends one record to ``trades`` and one marker to ``exits`` (both lists
    are mutated in place).

    Returns:
        ``(exit_equity, won)`` where ``exit_equity`` is the value returned by
        ``trade_equity`` and ``won`` is True when the trade's PnL is positive.
    """
    margin_used = float(position["margin_used"])
    exit_equity = trade_equity(
        side=str(position["side"]),
        margin_used=margin_used,
        entry_price=float(position["entry_price"]),
        exit_price=exit_price,
        leverage=LEVERAGE,
    )
    pnl = exit_equity - margin_used
    trades.append(
        {
            "side": "Long" if position["side"] == "long" else "Short",
            "entry_time": _format_ts(int(position["entry_time"])),
            "exit_time": _format_ts(candle.ts),
            "exit_ts": candle.ts,
            "entry_price": round(float(position["entry_price"]), 4),
            "exit_price": round(exit_price, 4),
            "pnl": round(pnl, 4),
            "return_pct": round(pnl / margin_used * 100.0, 4),
            # NOTE(review): presumably read by the cost helpers — confirm.
            "cost_weight": 1.0,
            "exit_reason": reason,
        }
    )
    exits.append({"ts": candle.ts, "price": exit_price, "side": position["side"]})
    return exit_equity, pnl > 0.0


def risk_filter_passes(
    *,
    variant: Variant,
    side: str,
    index: int,
    eth_close: pd.Series,
    btc_close: pd.Series,
    eth_24h_return: list[float],
    btc_24h_return: list[float],
    btc_sma: list[float],
    eth_sma: list[float] | None = None,
) -> bool:
    """Return True when ``variant.risk_filter`` allows an entry at ``index``.

    Args:
        variant: Variant whose ``risk_filter`` selects the rule.
        side: ``"long"`` or anything else (treated as short).
        index: Bar index into all of the series below.
        eth_close: ETH closing prices.
        btc_close: BTC closing prices.
        eth_24h_return: ETH return over the preceding 96 bars, per bar.
        btc_24h_return: BTC return over the preceding 96 bars, per bar.
        btc_sma: BTC moving average used by ``"btc_trend_aligned"``.
        eth_sma: Optional precomputed ``ETH_TREND_SMA_BARS``-bar rolling mean
            of ``eth_close``. When omitted the mean is computed on demand,
            which rescans the whole series on every call; callers that
            evaluate the filter in a loop should pass it.

    Raises:
        ValueError: If ``variant.risk_filter`` is not a known filter name.
    """
    if variant.risk_filter == "none":
        return True
    if variant.risk_filter == "eth_24h_not_crash":
        return float(eth_24h_return[index]) >= -0.03
    if variant.risk_filter == "btc_24h_not_crash":
        return float(btc_24h_return[index]) >= -0.02
    if variant.risk_filter == "trend_aligned":
        if eth_sma is not None:
            trend = float(eth_sma[index])
        else:
            trend = float(eth_close.rolling(ETH_TREND_SMA_BARS).mean().iloc[index])
        if side == "long":
            return float(eth_close.iloc[index]) >= trend
        return float(eth_close.iloc[index]) <= trend
    if variant.risk_filter == "btc_trend_aligned":
        if side == "long":
            return float(btc_close.iloc[index]) >= float(btc_sma[index])
        return float(btc_close.iloc[index]) <= float(btc_sma[index])
    raise ValueError(f"unknown risk filter: {variant.risk_filter}")


def run_variant(eth: list[Candle], btc: list[Candle], variant: Variant) -> SegmentResult:
    """Simulate ``variant`` bar by bar on ``eth`` and return the result.

    ``eth`` and ``btc`` are indexed in lockstep (see ``align_pair``). Signals
    are evaluated on a bar's close and executed at the next bar's open; stops
    and take-profits are checked against the high/low of every bar a position
    is open, starting with the entry bar. The first 960 bars only warm up the
    indicators.
    """
    eth_close = pd.Series([candle.close for candle in eth], dtype=float)
    btc_close = pd.Series([candle.close for candle in btc], dtype=float)

    # Bands: 48-bar mean +/- 2 population standard deviations.
    middle_series = eth_close.rolling(48).mean()
    stdev = eth_close.rolling(48).std(ddof=0)
    upper = (middle_series + 2.0 * stdev).tolist()
    lower = (middle_series - 2.0 * stdev).tolist()
    middle = middle_series.tolist()
    bandwidth = ((pd.Series(upper) - pd.Series(lower)) / middle_series).tolist()
    # A "squeeze" is a bandwidth at or below its rolling 960-bar 25th percentile.
    threshold = pd.Series(bandwidth, dtype=float).rolling(960).quantile(0.25).tolist()
    eth_vol = eth_close.pct_change().rolling(96).std(ddof=0).tolist()
    # 96 bars correspond to 24 hours on the default 15m bar.
    eth_24h_return = (eth_close / eth_close.shift(96) - 1.0).tolist()
    btc_24h_return = (btc_close / btc_close.shift(96) - 1.0).tolist()
    btc_sma = btc_close.rolling(480).mean().tolist()
    # Computed once here so the "trend_aligned" filter does not redo a full
    # rolling mean for every candidate entry.
    eth_sma = eth_close.rolling(ETH_TREND_SMA_BARS).mean().tolist()
    filter_inputs = {
        "variant": variant,
        "eth_close": eth_close,
        "btc_close": btc_close,
        "eth_24h_return": eth_24h_return,
        "btc_24h_return": btc_24h_return,
        "btc_sma": btc_sma,
        "eth_sma": eth_sma,
    }

    equity = INITIAL_EQUITY
    ending_equity = equity
    peak_equity = equity
    max_drawdown_mark = 0.0
    wins = 0
    trades: list[dict[str, object]] = []
    entries: list[dict[str, object]] = []
    exits: list[dict[str, object]] = []
    equity_curve: list[dict[str, float | int]] = []
    position: dict[str, object] | None = None
    pending_entry_side: str | None = None
    pending_exit_reason: str | None = None
    middle_exit_streak = 0
    cooldown_until = -1

    for index in range(960, len(eth)):
        candle = eth[index]

        # 1) Execute the exit decided on the previous bar at this bar's open.
        if pending_exit_reason is not None and position is not None:
            equity, won = close_position(trades, exits, position, candle, candle.open, pending_exit_reason)
            wins += int(won)
            position = None
            pending_exit_reason = None
            middle_exit_streak = 0
            cooldown_until = index + 24

        # 2) Execute the entry decided on the previous bar at this bar's open,
        #    committing the whole current equity as margin.
        if pending_entry_side is not None and position is None and equity > 0.0:
            position = {
                "side": pending_entry_side,
                "entry_time": candle.ts,
                "entry_index": index,
                "entry_price": candle.open,
                "margin_used": equity,
                "stop_price": candle.open
                * (1.0 - variant.stop_loss_pct if pending_entry_side == "long" else 1.0 + variant.stop_loss_pct),
                "take_price": None
                if variant.take_profit_pct is None
                else candle.open
                * (1.0 + variant.take_profit_pct if pending_entry_side == "long" else 1.0 - variant.take_profit_pct),
            }
            entries.append({"ts": candle.ts, "price": candle.open, "side": pending_entry_side})
        pending_entry_side = None

        # 3) Intrabar stop / take-profit; the stop wins when both are touched.
        current_equity = equity
        if position is not None:
            side = str(position["side"])
            stop_hit = (side == "long" and candle.low <= float(position["stop_price"])) or (
                side == "short" and candle.high >= float(position["stop_price"])
            )
            take_price = position["take_price"]
            take_hit = take_price is not None and (
                (side == "long" and candle.high >= float(take_price))
                or (side == "short" and candle.low <= float(take_price))
            )
            if stop_hit or take_hit:
                exit_price = float(position["stop_price"] if stop_hit else take_price)
                equity, won = close_position(
                    trades, exits, position, candle, exit_price, "stop" if stop_hit else "take_profit"
                )
                wins += int(won)
                current_equity = equity
                position = None
                middle_exit_streak = 0
                cooldown_until = index + 24

        # 4) Mark the open position at the close and update drawdown tracking.
        if position is not None:
            current_equity = mark_to_market(
                side=str(position["side"]),
                margin_used=float(position["margin_used"]),
                entry_price=float(position["entry_price"]),
                mark_price=candle.close,
                leverage=LEVERAGE,
            )
        peak_equity = max(peak_equity, current_equity)
        max_drawdown_mark = max(max_drawdown_mark, (peak_equity - current_equity) / peak_equity)
        equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close})
        ending_equity = current_equity

        # No decisions on the last bar (nothing could execute them) or once
        # the account is wiped out.
        if index == len(eth) - 1 or equity <= 0.0:
            continue
        values = (middle[index], upper[index], lower[index], bandwidth[index], threshold[index], eth_vol[index])
        # ``value != value`` is only true for NaN, i.e. indicators still warming up.
        if any(value != value for value in values):
            continue

        # 5) While in a position only exit signals are evaluated.
        if position is not None:
            middle_exit = (
                position["side"] == "long"
                and candle.close < float(middle[index]) * (1.0 - variant.middle_exit_buffer_pct)
            ) or (
                position["side"] == "short"
                and candle.close > float(middle[index]) * (1.0 + variant.middle_exit_buffer_pct)
            )
            middle_exit_streak = middle_exit_streak + 1 if middle_exit else 0
            max_hold_exit = (
                variant.max_hold_bars is not None
                and index - int(position["entry_index"]) >= variant.max_hold_bars
            )
            if middle_exit_streak >= variant.middle_exit_confirm_bars:
                pending_exit_reason = "middle_exit"
            elif max_hold_exit:
                pending_exit_reason = "max_hold"
            continue

        # 6) Flat: look for a breakout out of a squeeze, subject to the
        #    post-exit cooldown, the volatility cap and the risk filter.
        if index < cooldown_until:
            continue
        if float(eth_vol[index]) > variant.eth_vol_cap:
            continue
        if bandwidth[index] <= threshold[index]:
            if candle.close > float(upper[index]) and risk_filter_passes(side="long", index=index, **filter_inputs):
                pending_entry_side = "long"
            elif candle.close < float(lower[index]) and risk_filter_passes(
                side="short", index=index, **filter_inputs
            ):
                pending_entry_side = "short"

    trade_count = len(trades)
    return SegmentResult(
        trade_count=trade_count,
        total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY,
        win_rate=wins / trade_count if trade_count else 0.0,
        max_drawdown=max_drawdown_mark,
        trades=trades,
        open_position=position,
        candles=eth[960:],
        equity_curve=equity_curve,
        entries=entries,
        exits=exits,
    )


def build_variants() -> list[Variant]:
    """Return the de-duplicated variant grid, sorted by variant name.

    The grid is the union of four sweeps: middle-exit buffer/confirmation,
    stop-loss/take-profit, risk filter with two volatility caps, and maximum
    holding time.
    """
    variants: list[Variant] = []
    for buffer in (0.0, 0.0005, 0.001, 0.0015):
        for confirm in (1, 2, 3):
            variants.append(Variant(buffer, confirm, 0.01, None, 0.006, "none", None))
    for buffer, confirm in ((0.0005, 1), (0.001, 1), (0.001, 3), (0.0015, 2)):
        for stop_loss in (0.008, 0.01, 0.012):
            for take_profit in (None, 0.02, 0.03):
                variants.append(Variant(buffer, confirm, stop_loss, take_profit, 0.006, "none", None))
    for buffer, confirm in ((0.0005, 1), (0.001, 1), (0.001, 3), (0.0015, 2)):
        for risk_filter in ("eth_24h_not_crash", "btc_24h_not_crash", "trend_aligned", "btc_trend_aligned"):
            variants.append(Variant(buffer, confirm, 0.01, None, 0.006, risk_filter, None))
            variants.append(Variant(buffer, confirm, 0.01, None, 0.005, risk_filter, None))
    for buffer, confirm in ((0.0005, 1), (0.001, 1), (0.0015, 2)):
        for max_hold in (96, 192):
            variants.append(Variant(buffer, confirm, 0.01, None, 0.006, "none", max_hold))
    # The sweeps overlap (e.g. the baseline appears in several), hence the set.
    return sorted(set(variants), key=lambda variant: variant.name)


def window_frame(
    frame: pd.DataFrame, label: str, offset: pd.DateOffset | None, last_ts: int
) -> tuple[pd.DataFrame, int]:
    """Restrict an equity frame to the trailing window ending at ``last_ts``.

    Args:
        frame: Frame with ``ts`` and ``equity`` columns.
            NOTE(review): ``ts`` is compared against a tz-aware UTC timestamp,
            so it is presumably tz-aware datetimes — confirm against
            ``cost_equity_frame``.
        label: Window label; not used by this function.
        offset: Look-back from ``last_ts``; ``None`` selects the whole frame.
        last_ts: Window end as epoch milliseconds.

    Returns:
        ``(scoped, start_ts)`` where ``scoped`` holds the ``ts``/``equity``
        columns for the window and ``start_ts`` is its first timestamp in
        epoch milliseconds. When history exists at or before the cutoff, the
        window starts with a synthetic row at the cutoff carrying the last
        known equity; otherwise the whole frame is returned.
    """
    if offset is None:
        return frame[["ts", "equity"]].copy(), int(pd.Timestamp(frame["ts"].iloc[0]).timestamp() * 1000)
    end_time = pd.to_datetime(last_ts, unit="ms", utc=True)
    cutoff = end_time - offset
    before = frame[frame["ts"] <= cutoff]
    if len(before):
        start_equity = float(before["equity"].iloc[-1])
        after = frame[frame["ts"] > cutoff]
        scoped = pd.concat(
            [pd.DataFrame([{"ts": cutoff, "equity": start_equity}]), after[["ts", "equity"]]],
            ignore_index=True,
        )
    else:
        scoped = frame[["ts", "equity"]].copy()
    return scoped, int(pd.Timestamp(scoped["ts"].iloc[0]).timestamp() * 1000)


def trade_count_in_window(trades: list[dict[str, object]], start_ts: int, last_ts: int) -> int:
    """Count trades whose ``exit_ts`` lies in ``(start_ts, last_ts]``."""
    return sum(1 for trade in trades if start_ts < int(trade["exit_ts"]) <= last_ts)


def format_cell(value: object) -> str:
    """Render one table cell: floats with 6 significant digits, pipes escaped."""
    if isinstance(value, float):
        return f"{value:.6g}"
    return str(value).replace("|", "\\|")


def markdown_table(frame: pd.DataFrame) -> str:
    """Render ``frame`` as a Markdown pipe table; missing values become empty cells."""
    rows = [list(frame.columns), ["---" for _ in frame.columns]]
    rows.extend(frame.astype(object).where(pd.notna(frame), "").values.tolist())
    return "\n".join("| " + " | ".join(format_cell(value) for value in row) + " |" for row in rows)


def write_report(
    *,
    summary: pd.DataFrame,
    windows: pd.DataFrame,
    output_files: list[Path],
    command: str,
    first_ts: int,
    last_ts: int,
) -> str:
    """Build the Markdown report text (the caller writes it to disk).

    The best frequency-eligible variant by recent-window ranking is compared
    with the current live baseline. It is recommended (for paper observation
    only) when it beats the baseline's net Calmar and at least matches its net
    return in every recent window, without a larger full-window drawdown.

    Args:
        summary: One row per variant, as assembled in ``main``.
        windows: One row per variant and window, as assembled in ``main``.
        output_files: Paths listed in the report.
        command: Command line shown in the report.
        first_ts: First candle timestamp in epoch milliseconds.
        last_ts: Last candle timestamp in epoch milliseconds.
    """
    baseline_name = "live-bb-squeeze-risk-exit-mxbuf0.0005-mxc1-sl0.01-tpnone-vc0.006-none-mhnone"
    baseline = windows[windows["name"] == baseline_name].copy()
    eligible = summary[summary["eligible_frequency"]].copy()
    recent_rank = eligible.sort_values(
        ["recent_min_calmar", "recent_min_return", "full_net_max_drawdown", "full_trades_per_year"],
        ascending=[False, False, True, False],
    ).head(10)
    full_rank = eligible.sort_values(
        ["full_net_calmar", "recent_min_calmar", "full_net_max_drawdown"],
        ascending=[False, False, True],
    ).head(10)
    baseline_pivot = baseline[
        ["window", "window_trades", "trades_per_year", "net_total_return", "net_max_drawdown", "net_calmar"]
    ]
    candidate = recent_rank.iloc[0] if len(recent_rank) else None
    replace = "No"
    reason = "No frequency-eligible variant was found."
    if candidate is not None:
        candidate_windows = windows[windows["name"] == candidate["name"]].set_index("window")
        baseline_windows = baseline.set_index("window")
        recent_better = all(
            float(candidate_windows.loc[window, "net_calmar"]) > float(baseline_windows.loc[window, "net_calmar"])
            and float(candidate_windows.loc[window, "net_total_return"])
            >= float(baseline_windows.loc[window, "net_total_return"])
            for window in RECENT_WINDOWS
        )
        full_not_worse = float(candidate["full_net_max_drawdown"]) <= float(
            summary.loc[summary["name"] == baseline_name, "full_net_max_drawdown"].iloc[0]
        )
        if recent_better and full_not_worse:
            replace = "Yes, for paper observation only"
            reason = "Best eligible candidate improved all recent windows without increasing full-window closed-equity drawdown."
        else:
            reason = "Best eligible candidate did not clear every recent window versus the current live parameter set."
    return (
        "# Live BB squeeze recent risk/exit exploration\n\n"
        "Scope: offline local-candle research only. Live executor, deploy files, and order paths were not changed.\n\n"
        f"Run command: `{command}`\n"
        f"Local aligned history: `{_format_ts(first_ts)}` to `{_format_ts(last_ts)}`.\n"
        f"Primary cost model: `{PRIMARY_COST}` from existing BB squeeze evaluation helpers.\n"
        f"Frequency rule: eligible variants require at least {MIN_TRADES_PER_YEAR:g} closed trades/year in full, 1y, 6m, and 3m windows.\n\n"
        "Output files:\n"
        + "\n".join(f"- `{path}`" for path in output_files)
        + "\n\n"
        "## Current live baseline\n\n"
        + markdown_table(baseline_pivot)
        + "\n\n"
        "## Top recent-ranked eligible variants\n\n"
        + markdown_table(
            recent_rank[
                [
                    "name",
                    "full_trades_per_year",
                    "recent_min_return",
                    "recent_min_calmar",
                    "full_net_total_return",
                    "full_net_max_drawdown",
                    "full_net_calmar",
                ]
            ]
        )
        + "\n\n"
        "## Top full-window eligible variants\n\n"
        + markdown_table(
            full_rank[
                [
                    "name",
                    "full_trades_per_year",
                    "full_net_total_return",
                    "full_net_max_drawdown",
                    "full_net_calmar",
                    "recent_min_return",
                    "recent_min_calmar",
                ]
            ]
        )
        + "\n\n"
        "## Replacement verdict\n\n"
        f"- Recommend replacing live strategy now: {replace}.\n"
        f"- Reason: {reason}\n"
    )


def main() -> int:
    """Run every variant, write the CSV/Markdown outputs and return exit code 0."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--bar", default=BAR)
    parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
    args = parser.parse_args()

    eth, btc = align_pair(_load_candles(ETH_SYMBOL, args.bar), _load_candles(BTC_SYMBOL, args.bar))
    cost = dict(COSTS)[PRIMARY_COST]
    summary_rows: list[dict[str, object]] = []
    window_rows: list[dict[str, object]] = []
    variants = build_variants()
    for index, variant in enumerate(variants, start=1):
        result = run_variant(eth, btc, variant)
        frame = cost_equity_frame(result, cost)
        month, month_return = worst_month(frame)
        per_window: dict[str, dict[str, float | int | str]] = {}
        for label, offset in WINDOWS:
            scoped_frame, start_ts = window_frame(frame, label, offset, eth[-1].ts)
            # ``metrics`` is expected to supply the ``net_*`` keys read below.
            metrics = equity_metrics(scoped_frame, start_ts, eth[-1].ts)
            trades = trade_count_in_window(result.trades, start_ts, eth[-1].ts)
            years = (eth[-1].ts - start_ts) / 86_400_000 / 365
            row = {
                "cost": PRIMARY_COST,
                "symbol": ETH_SYMBOL,
                "bar": args.bar,
                "name": variant.name,
                "window": label,
                "window_start": pd.to_datetime(start_ts, unit="ms", utc=True).strftime("%Y-%m-%d %H:%M"),
                "window_end": _format_ts(eth[-1].ts),
                "window_trades": trades,
                "trades_per_year": trades / years if years > 0.0 else 0.0,
                **metrics,
            }
            window_rows.append(row)
            per_window[label] = row
        # A variant must trade often enough in every window, not just overall.
        eligible_frequency = all(
            float(per_window[label]["trades_per_year"]) >= MIN_TRADES_PER_YEAR for label, _ in WINDOWS
        )
        recent_returns = [float(per_window[label]["net_total_return"]) for label in RECENT_WINDOWS]
        recent_calmars = [float(per_window[label]["net_calmar"]) for label in RECENT_WINDOWS]
        summary_rows.append(
            {
                "cost": PRIMARY_COST,
                "symbol": ETH_SYMBOL,
                "bar": args.bar,
                "name": variant.name,
                "middle_exit_buffer_pct": variant.middle_exit_buffer_pct,
                "middle_exit_confirm_bars": variant.middle_exit_confirm_bars,
                "stop_loss_pct": variant.stop_loss_pct,
                "take_profit_pct": variant.take_profit_pct,
                "eth_vol_cap": variant.eth_vol_cap,
                "risk_filter": variant.risk_filter,
                "max_hold_bars": variant.max_hold_bars,
                "first_candle": _format_ts(eth[0].ts),
                "last_candle": _format_ts(eth[-1].ts),
                "trades": result.trade_count,
                "full_trades_per_year": float(per_window["full"]["trades_per_year"]),
                "eligible_frequency": eligible_frequency,
                "gross_total_return": result.total_return,
                "gross_max_drawdown_mark_to_market": result.max_drawdown,
                "worst_month": month,
                "worst_month_return": month_return,
                "recent_min_return": min(recent_returns),
                "recent_min_calmar": min(recent_calmars),
                "recent_avg_return": sum(recent_returns) / len(recent_returns),
                "full_net_total_return": per_window["full"]["net_total_return"],
                "full_net_annualized_return": per_window["full"]["net_annualized_return"],
                "full_net_max_drawdown": per_window["full"]["net_max_drawdown"],
                "full_net_calmar": per_window["full"]["net_calmar"],
            }
        )
        print(f"done {index}/{len(variants)} {variant.name}")

    summary = pd.DataFrame(summary_rows).sort_values(
        ["eligible_frequency", "recent_min_calmar", "recent_min_return", "full_net_calmar"],
        ascending=[False, False, False, False],
    )
    windows = pd.DataFrame(window_rows)
    # Ordered categorical so windows sort as full, 1y, 6m, 3m rather than alphabetically.
    windows["window"] = pd.Categorical(windows["window"], categories=[label for label, _ in WINDOWS], ordered=True)
    windows = windows.sort_values(["window", "net_calmar", "net_total_return"], ascending=[True, False, False])

    args.output_dir.mkdir(parents=True, exist_ok=True)
    prefix = "live-bb-squeeze-recent-risk-exit"
    summary_path = args.output_dir / f"{prefix}-summary.csv"
    window_path = args.output_dir / f"{prefix}-windows.csv"
    report_path = args.output_dir / f"{prefix}-report.md"
    output_files = [summary_path, window_path, report_path]
    summary.to_csv(summary_path, index=False)
    windows.to_csv(window_path, index=False)
    command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --bar {args.bar} --output-dir {args.output_dir.as_posix()}"
    report_path.write_text(
        write_report(
            summary=summary,
            windows=windows,
            output_files=output_files,
            command=command,
            first_ts=eth[0].ts,
            last_ts=eth[-1].ts,
        ),
        encoding="utf-8",
    )
    print(summary.head(10).to_string(index=False))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())