from __future__ import annotations import argparse import sys from dataclasses import dataclass from pathlib import Path import pandas as pd sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from okx_codex_trader.candles import align_candles_by_ts, load_candles_csv from okx_codex_trader.models import Candle from okx_codex_trader.research_metrics import ( DEFAULT_INITIAL_EQUITY, DEFAULT_PRIMARY_COST, equity_metrics, format_utc_ts, max_drawdown, trade_stats, ) from okx_codex_trader.sampled_report import SegmentResult, mark_to_market, trade_equity from okx_codex_trader.time_rules import entry_allowed, is_us_open_window ETH_SYMBOL = "ETH-USDT-SWAP" BTC_SYMBOL = "BTC-USDT-SWAP" BAR = "15m" YEARS = 10.0 LEVERAGE = 3 INITIAL_EQUITY = DEFAULT_INITIAL_EQUITY DATA_DIR = Path("data/okx-candles") OUTPUT_DIR = Path("reports/eth-exploration") PRIMARY_COST = DEFAULT_PRIMARY_COST PRIMARY_COST_RATE = 0.0021 HORIZONS = ( ("full", None), ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ("4w", pd.DateOffset(weeks=4)), ) @dataclass(frozen=True) class FastFailRule: kind: str bars: int threshold_pct: float | None = None @property def label(self) -> str: if self.kind == "none": return "baseline" if self.kind == "band_reclaim": return f"band-reclaim-n{self.bars}" threshold = 0.0 if self.threshold_pct is None else self.threshold_pct return f"entry-adverse-n{self.bars}-x{threshold:g}" @dataclass(frozen=True) class Variant: band_length: int = 96 bandwidth_lookback: int = 960 bandwidth_quantile: float = 0.25 stop_loss_pct: float = 0.01 reward_risk: float = 3.0 exit_mode: str = "hybrid_signal_rr" side_mode: str = "both" btc_filter: str = "btc-up" eth_vol_cap: float = 0.006 dd_overlay: float = 0.25 cooldown_bars: int = 24 middle_exit_buffer_pct: float = 0.001 middle_exit_confirm_bars: int = 1 fast_fail: FastFailRule = FastFailRule("none", 0) @property def take_profit_pct(self) -> float: return self.stop_loss_pct * self.reward_risk @property def name(self) -> str: return ( f"bb-squeeze-rr-l{self.band_length}-bw{self.bandwidth_lookback}" f"-q{self.bandwidth_quantile:g}-sl{self.stop_loss_pct:g}-rr{self.reward_risk:g}" f"-{self.exit_mode}-{self.side_mode}-{self.btc_filter}-vc{self.eth_vol_cap:g}" f"-dd{self.dd_overlay:g}-cd{self.cooldown_bars}-mxbuf{self.middle_exit_buffer_pct:g}" f"-mxc{self.middle_exit_confirm_bars}-{self.fast_fail.label}" ) def _format_ts(ts: int) -> str: return format_utc_ts(ts) def close_position( *, trades: list[dict[str, object]], exits: list[dict[str, object]], position: dict[str, object], candle: Candle, exit_price: float, reason: str, ) -> tuple[float, bool]: margin_used = float(position["margin_used"]) exit_equity = trade_equity( side=str(position["side"]), margin_used=margin_used, entry_price=float(position["entry_price"]), exit_price=exit_price, leverage=LEVERAGE, ) pnl = exit_equity - margin_used trades.append( { "side": "Long" if position["side"] == "long" else "Short", "entry_time": _format_ts(int(position["entry_time"])), "exit_time": _format_ts(candle.ts), "entry_ts": int(position["entry_time"]), "exit_ts": candle.ts, "entry_price": round(float(position["entry_price"]), 4), "exit_price": round(exit_price, 4), "pnl": round(pnl, 4), "return_pct": round(pnl / margin_used * 100.0, 4), "cost_weight": 1.0, "exit_reason": reason, "bars_held": int(position["bars_held"]), } ) exits.append({"ts": candle.ts, "price": exit_price, "side": position["side"]}) return exit_equity, pnl > 0.0 def risk_exit_price(position: dict[str, object], candle: Candle) -> tuple[float, str] | None: side = str(position["side"]) stop = float(position["stop_price"]) take = float(position["take_price"]) if side == "long": if candle.open <= stop: return candle.open, "stop_gap" if candle.open >= take: return candle.open, "take_gap" stop_hit = candle.low <= stop take_hit = candle.high >= take else: if candle.open >= stop: return candle.open, "stop_gap" if candle.open <= take: return candle.open, "take_gap" stop_hit = candle.high >= stop take_hit = candle.low <= take if stop_hit: return stop, "stop" if take_hit: return take, "take_profit" return None def fast_fail_exit_price( *, position: dict[str, object], candle: Candle, upper: float, lower: float, rule: FastFailRule, ) -> tuple[float, str] | None: if rule.kind == "none" or int(position["bars_held"]) > rule.bars: return None side = str(position["side"]) entry_price = float(position["entry_price"]) if rule.kind == "band_reclaim": if side == "long" and candle.close < upper: return candle.close, "fast_fail_band" if side == "short" and candle.close > lower: return candle.close, "fast_fail_band" return None threshold = 0.0 if rule.threshold_pct is None else rule.threshold_pct if side == "long" and candle.close < entry_price * (1.0 - threshold): return candle.close, "fast_fail_price" if side == "short" and candle.close > entry_price * (1.0 + threshold): return candle.close, "fast_fail_price" return None def run_variant(eth: list[Candle], btc: list[Candle], variant: Variant) -> tuple[SegmentResult, dict[str, int]]: eth_close = pd.Series([candle.close for candle in eth], dtype=float) btc_close = pd.Series([candle.close for candle in btc], dtype=float) middle_series = eth_close.rolling(variant.band_length).mean() stdev_series = eth_close.rolling(variant.band_length).std(ddof=0) upper_values = middle_series + 2.0 * stdev_series lower_values = middle_series - 2.0 * stdev_series middle = middle_series.tolist() upper = upper_values.tolist() lower = lower_values.tolist() bandwidth = ((upper_values - lower_values) / middle_series).tolist() threshold = pd.Series(bandwidth, dtype=float).rolling(variant.bandwidth_lookback).quantile(variant.bandwidth_quantile).tolist() btc_sma = btc_close.rolling(480).mean().tolist() eth_realized_vol = eth_close.pct_change().rolling(96).std(ddof=0).tolist() warmup_bars = max(variant.band_length, variant.bandwidth_lookback, 480, 96) equity = INITIAL_EQUITY ending_equity = equity peak_equity = equity gross_max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_entry_side: str | None = None pending_exit = False middle_exit_streak = 0 cooldown_until = -1 exit_counts = { "stop_exits": 0, "take_profit_exits": 0, "signal_exits": 0, "fast_fail_exits": 0, "fast_fail_band_exits": 0, "fast_fail_price_exits": 0, } for index in range(warmup_bars, len(eth)): candle = eth[index] if pending_exit and position is not None: equity, won = close_position( trades=trades, exits=exits, position=position, candle=candle, exit_price=candle.open, reason="signal_middle", ) wins += int(won) exit_counts["signal_exits"] += 1 position = None pending_exit = False middle_exit_streak = 0 cooldown_until = index + variant.cooldown_bars if pending_entry_side is not None and position is None and equity > 0.0: entry_price = candle.open position = { "side": pending_entry_side, "entry_time": candle.ts, "entry_price": entry_price, "margin_used": equity, "stop_price": entry_price * (1.0 - variant.stop_loss_pct if pending_entry_side == "long" else 1.0 + variant.stop_loss_pct), "take_price": entry_price * (1.0 + variant.take_profit_pct if pending_entry_side == "long" else 1.0 - variant.take_profit_pct), "bars_held": 1, } entries.append({"ts": candle.ts, "price": entry_price, "side": pending_entry_side}) pending_entry_side = None current_equity = equity if position is not None: risk_exit = risk_exit_price(position, candle) if risk_exit is not None: exit_price, reason = risk_exit equity, won = close_position( trades=trades, exits=exits, position=position, candle=candle, exit_price=exit_price, reason=reason, ) wins += int(won) if reason.startswith("stop"): exit_counts["stop_exits"] += 1 else: exit_counts["take_profit_exits"] += 1 current_equity = equity position = None middle_exit_streak = 0 cooldown_until = index + variant.cooldown_bars if position is not None: values = (upper[index], lower[index]) if all(value == value for value in values): fast_fail_exit = fast_fail_exit_price( position=position, candle=candle, upper=float(upper[index]), lower=float(lower[index]), rule=variant.fast_fail, ) if fast_fail_exit is not None: exit_price, reason = fast_fail_exit equity, won = close_position( trades=trades, exits=exits, position=position, candle=candle, exit_price=exit_price, reason=reason, ) wins += int(won) exit_counts["fast_fail_exits"] += 1 if reason == "fast_fail_band": exit_counts["fast_fail_band_exits"] += 1 else: exit_counts["fast_fail_price_exits"] += 1 current_equity = equity position = None middle_exit_streak = 0 cooldown_until = index + variant.cooldown_bars if position is not None: current_equity = mark_to_market( side=str(position["side"]), margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=LEVERAGE, ) peak_equity = max(peak_equity, current_equity) gross_max_drawdown = max(gross_max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(eth) - 1 or equity <= 0.0: continue values = (middle[index], upper[index], lower[index], bandwidth[index], threshold[index], btc_sma[index], eth_realized_vol[index]) if any(value != value for value in values): if position is not None: position["bars_held"] = int(position["bars_held"]) + 1 continue if position is not None: middle_exit = ( position["side"] == "long" and candle.close < float(middle[index]) * (1.0 - variant.middle_exit_buffer_pct) ) or ( position["side"] == "short" and candle.close > float(middle[index]) * (1.0 + variant.middle_exit_buffer_pct) ) if middle_exit and is_us_open_window(candle.ts): middle_exit = False middle_exit_streak = middle_exit_streak + 1 if middle_exit else 0 if middle_exit_streak >= variant.middle_exit_confirm_bars: pending_exit = True position["bars_held"] = int(position["bars_held"]) + 1 continue if index < cooldown_until: continue if float(eth_realized_vol[index]) > variant.eth_vol_cap: continue if (peak_equity - current_equity) / peak_equity > variant.dd_overlay: continue if not entry_allowed(candle.ts, "weekday"): continue if not (btc_close.iloc[index] > float(btc_sma[index])): continue if bandwidth[index] <= threshold[index]: if candle.close > float(upper[index]): pending_entry_side = "long" elif candle.close < float(lower[index]): pending_entry_side = "short" trade_count = len(trades) result = SegmentResult( trade_count=trade_count, total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY, win_rate=wins / trade_count if trade_count else 0.0, max_drawdown=gross_max_drawdown, trades=trades, open_position=position, candles=eth[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ) return result, exit_counts def build_variants() -> list[Variant]: rules = [FastFailRule("none", 0)] rules.extend(FastFailRule("band_reclaim", bars) for bars in (1, 2, 3, 4, 6)) rules.extend(FastFailRule("entry_adverse", bars, threshold) for bars in (1, 2, 3, 4, 6) for threshold in (0.0, 0.001, 0.002, 0.003)) return [Variant(fast_fail=rule) for rule in rules] def net_equity_frame(result: SegmentResult, cost: float) -> pd.DataFrame: rows = [{"ts": pd.to_datetime(result.equity_curve[0]["ts"], unit="ms", utc=True), "equity": INITIAL_EQUITY}] equity = INITIAL_EQUITY for trade in result.trades: equity *= 1.0 + float(trade["return_pct"]) / 100.0 - cost * float(trade.get("cost_weight", 1.0)) rows.append({"ts": pd.to_datetime(int(trade["exit_ts"]), unit="ms", utc=True), "equity": equity}) return pd.DataFrame(rows) def rows_for_horizons(*, result: SegmentResult, frame: pd.DataFrame, variant: Variant, first_ts: int, last_ts: int) -> list[dict[str, object]]: rows: list[dict[str, object]] = [] end_time = pd.to_datetime(last_ts, unit="ms", utc=True) for label, offset in HORIZONS: start_time = pd.to_datetime(first_ts, unit="ms", utc=True) if offset is None else end_time - offset before = frame[frame["ts"] <= start_time] start_equity = float(before["equity"].iloc[-1]) if len(before) else float(frame["equity"].iloc[0]) after = frame[frame["ts"] > start_time] horizon_frame = pd.concat([pd.DataFrame([{"ts": start_time, "equity": start_equity}]), after[["ts", "equity"]]], ignore_index=True) trades = [trade for trade in result.trades if int(trade["exit_ts"]) > int(start_time.timestamp() * 1000)] stats = trade_stats(trades) returns = [float(trade["return_pct"]) for trade in trades] rows.append( { "family": "bb_squeeze_fast_fail_exit", "cost": PRIMARY_COST, "symbol": ETH_SYMBOL, "signal_symbol": BTC_SYMBOL, "bar": BAR, "name": variant.name, "fast_fail_rule": variant.fast_fail.label, "fast_fail_kind": variant.fast_fail.kind, "fast_fail_bars": variant.fast_fail.bars, "fast_fail_threshold_pct": variant.fast_fail.threshold_pct, "horizon": label, "horizon_start": start_time.strftime("%Y-%m-%d %H:%M"), "horizon_end": end_time.strftime("%Y-%m-%d %H:%M"), "trades": len(trades), "win_rate": sum(1 for value in returns if value > 0.0) / len(returns) if returns else 0.0, **stats, **equity_metrics(horizon_frame, int(start_time.timestamp() * 1000), last_ts), } ) return rows def exit_reason_rows(*, result: SegmentResult, variant: Variant) -> list[dict[str, object]]: rows: list[dict[str, object]] = [] for reason, trades in pd.DataFrame(result.trades).groupby("exit_reason"): records = trades.to_dict("records") stats = trade_stats(records) returns = [float(trade["return_pct"]) for trade in records] rows.append( { "family": "bb_squeeze_fast_fail_exit", "name": variant.name, "fast_fail_rule": variant.fast_fail.label, "exit_reason": reason, "trades": len(records), "win_rate": sum(1 for value in returns if value > 0.0) / len(returns) if returns else 0.0, "gross_return_sum_pct": sum(returns), **stats, } ) return rows def format_cell(value: object) -> str: if isinstance(value, float): return f"{value:.6g}" return str(value).replace("|", "\\|") def markdown_table(frame: pd.DataFrame) -> str: columns = list(frame.columns) rows = [columns, ["---" for _ in columns]] for record in frame.to_dict("records"): rows.append([record[column] for column in columns]) return "\n".join("| " + " | ".join(format_cell(value) for value in row) + " |" for row in rows) def write_report(*, horizon: pd.DataFrame, exits: pd.DataFrame, first_ts: int, last_ts: int, command: str) -> str: full = horizon[horizon["horizon"] == "full"].sort_values(["net_calmar", "net_annualized_return"], ascending=[False, False]) baseline = horizon[(horizon["horizon"] == "full") & (horizon["fast_fail_rule"] == "baseline")] top = full.head(5) compare_cols = [ "fast_fail_rule", "trades", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "win_rate", "avg_return_pct", "payoff_ratio", "profit_factor", ] top_rules = set(top["fast_fail_rule"].tolist()) | {"baseline"} top_horizons = horizon[horizon["fast_fail_rule"].isin(top_rules)].sort_values(["fast_fail_rule", "horizon"]) fast_fail_exits = exits[exits["exit_reason"].astype(str).str.startswith("fast_fail")].sort_values( ["name", "gross_return_sum_pct"], ascending=[True, True] ) lines = [ "# ETH BB squeeze fast-fail exit exploration", "", f"Run command: `{command}`", f"Actual continuous local history: `{_format_ts(first_ts)}` to `{_format_ts(last_ts)}`.", "", "Baseline parameters: ETH 15m, band_length=96, bandwidth_lookback=960, bandwidth_quantile=0.25, stop_loss=1%, take_profit=3%, middle_exit_buffer=0.1%, middle_confirm=1, eth_vol_cap=0.006, cooldown=24, btc_up filter, weekday entry, us_open_exit skip, both sides.", "", "Top 5 full-history maker_taker results:", markdown_table(top[compare_cols]), "", "Baseline full-history row:", markdown_table(baseline[compare_cols]), "", "Full/3y/1y/6m/3m/4w metrics for Top 5 plus baseline:", markdown_table( top_horizons[ [ "fast_fail_rule", "horizon", "trades", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "win_rate", "avg_return_pct", "payoff_ratio", "profit_factor", ] ] ), "", "Fast-fail exit reason contribution:", markdown_table( fast_fail_exits[ [ "fast_fail_rule", "exit_reason", "trades", "win_rate", "avg_return_pct", "gross_return_sum_pct", "profit_factor", ] ].head(20) ), ] return "\n".join(lines) + "\n" def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--bar", default=BAR) parser.add_argument("--years", type=float, default=YEARS) parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR) args = parser.parse_args() eth = load_candles_csv(DATA_DIR, ETH_SYMBOL, args.bar) btc = load_candles_csv(DATA_DIR, BTC_SYMBOL, args.bar) eth, btc = align_candles_by_ts(eth, btc) requested_bars = int(args.years * 365 * 24 * 60 / 15) eth = eth[-requested_bars:] btc = btc[-requested_bars:] summary_rows: list[dict[str, object]] = [] horizon_rows_out: list[dict[str, object]] = [] exit_rows: list[dict[str, object]] = [] variants = build_variants() for index, variant in enumerate(variants, start=1): result, exit_counts = run_variant(eth, btc, variant) frame = net_equity_frame(result, PRIMARY_COST_RATE) metrics = equity_metrics(frame, eth[0].ts, eth[-1].ts) stats = trade_stats(result.trades) summary_rows.append( { "family": "bb_squeeze_fast_fail_exit", "cost": PRIMARY_COST, "symbol": ETH_SYMBOL, "signal_symbol": BTC_SYMBOL, "bar": args.bar, "name": variant.name, "fast_fail_rule": variant.fast_fail.label, "fast_fail_kind": variant.fast_fail.kind, "fast_fail_bars": variant.fast_fail.bars, "fast_fail_threshold_pct": variant.fast_fail.threshold_pct, "band_length": variant.band_length, "bandwidth_lookback": variant.bandwidth_lookback, "bandwidth_quantile": variant.bandwidth_quantile, "stop_loss_pct": variant.stop_loss_pct, "take_profit_pct": variant.take_profit_pct, "reward_risk": variant.reward_risk, "exit_mode": variant.exit_mode, "side_mode": variant.side_mode, "btc_filter": variant.btc_filter, "eth_vol_cap": variant.eth_vol_cap, "dd_overlay": variant.dd_overlay, "cooldown_bars": variant.cooldown_bars, "middle_exit_buffer_pct": variant.middle_exit_buffer_pct, "middle_exit_confirm_bars": variant.middle_exit_confirm_bars, "first_candle": _format_ts(eth[0].ts), "last_candle": _format_ts(eth[-1].ts), "trades": result.trade_count, "win_rate": result.win_rate, "gross_total_return": result.total_return, "gross_max_drawdown_mark_to_market": result.max_drawdown, **stats, **exit_counts, **metrics, } ) horizon_rows_out.extend(rows_for_horizons(result=result, frame=frame, variant=variant, first_ts=eth[0].ts, last_ts=eth[-1].ts)) exit_rows.extend(exit_reason_rows(result=result, variant=variant)) print(f"done {index}/{len(variants)} {variant.name}") summary = pd.DataFrame(summary_rows).sort_values(["net_calmar", "net_annualized_return"], ascending=[False, False]) horizon = pd.DataFrame(horizon_rows_out) horizon["horizon"] = pd.Categorical(horizon["horizon"], categories=[label for label, _ in HORIZONS], ordered=True) horizon = horizon.sort_values(["horizon", "net_calmar", "net_annualized_return"], ascending=[True, False, False]) exits = pd.DataFrame(exit_rows).sort_values(["fast_fail_rule", "exit_reason"]) args.output_dir.mkdir(parents=True, exist_ok=True) summary_path = args.output_dir / "eth-bb-squeeze-fast-fail-exit-summary.csv" horizon_path = args.output_dir / "eth-bb-squeeze-fast-fail-exit-horizon.csv" exits_path = args.output_dir / "eth-bb-squeeze-fast-fail-exit-reasons.csv" report_path = args.output_dir / "eth-bb-squeeze-fast-fail-exit-report.md" summary.to_csv(summary_path, index=False) horizon.to_csv(horizon_path, index=False) exits.to_csv(exits_path, index=False) command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --bar {args.bar} --years {args.years}" report_path.write_text(write_report(horizon=horizon, exits=exits, first_ts=eth[0].ts, last_ts=eth[-1].ts, command=command), encoding="utf-8") print(summary.head(10).to_string(index=False)) return 0 if __name__ == "__main__": raise SystemExit(main())