from __future__ import annotations import argparse import sys from dataclasses import dataclass from pathlib import Path import pandas as pd sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from okx_codex_trader.models import Candle from okx_codex_trader.sampled_report import SegmentResult, mark_to_market, trade_equity ETH_SYMBOL = "ETH-USDT-SWAP" BTC_SYMBOL = "BTC-USDT-SWAP" BAR = "15m" YEARS = 10.0 LEVERAGE = 3 INITIAL_EQUITY = 10_000.0 DATA_DIR = Path("data/okx-candles") OUTPUT_DIR = Path("reports/eth-exploration") PRIMARY_COST = "maker_taker" COSTS = ( ("maker_maker", 0.0012), ("maker_taker", 0.0021), ("taker_taker", 0.0030), ) @dataclass(frozen=True) class Variant: band_length: int bandwidth_lookback: int bandwidth_quantile: float stop_loss_pct: float extreme_take_profit_pct: float side_mode: str entry_btc_filter: str entry_eth_vol_cap: float | None dd_overlay: float | None cooldown_bars: int reentry_bars: int gate_mode: str middle_exit_buffer_pct: float middle_exit_confirm_bars: int high_eth_vol_floor: float far_band_extension: float recent_profit_threshold: float @property def name(self) -> str: vol_cap = "none" if self.entry_eth_vol_cap is None else f"{self.entry_eth_vol_cap:g}" dd = "none" if self.dd_overlay is None else f"{self.dd_overlay:g}" return ( f"bb-squeeze-t-l{self.band_length}-bw{self.bandwidth_lookback}" f"-q{self.bandwidth_quantile:g}-sl{self.stop_loss_pct:g}" f"-xtp{self.extreme_take_profit_pct:g}-{self.side_mode}-{self.entry_btc_filter}" f"-vc{vol_cap}-dd{dd}-cd{self.cooldown_bars}" f"-tre{self.reentry_bars}-{self.gate_mode}" f"-mxbuf{self.middle_exit_buffer_pct:g}-mxc{self.middle_exit_confirm_bars}" f"-hv{self.high_eth_vol_floor:g}-fb{self.far_band_extension:g}" f"-rp{self.recent_profit_threshold:g}" ) def _format_ts(ts: int) -> str: return pd.to_datetime(ts, unit="ms", utc=True).strftime("%Y-%m-%d %H:%M") def _load_candles(symbol: str, bar: str) -> list[Candle]: frame = pd.read_csv(DATA_DIR / symbol / f"{bar}.csv") return [ Candle( symbol=symbol, ts=int(row.ts), open=float(row.open), high=float(row.high), low=float(row.low), close=float(row.close), volume=float(row.volume), ) for row in frame.itertuples(index=False) ] def _align_pair(left: list[Candle], right: list[Candle]) -> tuple[list[Candle], list[Candle]]: right_by_ts = {candle.ts: candle for candle in right} left_out: list[Candle] = [] right_out: list[Candle] = [] for candle in left: other = right_by_ts.get(candle.ts) if other is not None: left_out.append(candle) right_out.append(other) return left_out, right_out def _close_position( *, trades: list[dict[str, object]], exits: list[dict[str, object]], position: dict[str, object], candle: Candle, exit_price: float, reason: str, ) -> tuple[float, bool]: margin_used = float(position["margin_used"]) exit_equity = trade_equity( side=str(position["side"]), margin_used=margin_used, entry_price=float(position["entry_price"]), exit_price=exit_price, leverage=LEVERAGE, ) pnl = exit_equity - margin_used trades.append( { "side": "Long" if position["side"] == "long" else "Short", "entry_time": _format_ts(int(position["entry_time"])), "exit_time": _format_ts(candle.ts), "entry_price": round(float(position["entry_price"]), 4), "exit_price": round(exit_price, 4), "pnl": round(pnl, 4), "return_pct": round(pnl / margin_used * 100.0, 4), "cost_weight": 1.0, "entry_kind": position["entry_kind"], "exit_reason": reason, } ) exits.append({"ts": candle.ts, "price": exit_price, "side": position["side"]}) return exit_equity, pnl > 0.0 def _gate_passes( *, variant: Variant, side: str, anchor_price: float, candle: Candle, btc_momentum: float, eth_realized_vol: float, upper: float, lower: float, middle: float, ) -> tuple[bool, dict[str, bool]]: band_half_width = upper - middle btc_against = (side == "long" and btc_momentum < 0.0) or (side == "short" and btc_momentum > 0.0) high_eth_vol = eth_realized_vol >= variant.high_eth_vol_floor far_band_close = (side == "long" and candle.close >= upper + band_half_width * variant.far_band_extension) or ( side == "short" and candle.close <= lower - band_half_width * variant.far_band_extension ) recent_profit_spike = (side == "long" and candle.close / anchor_price - 1.0 >= variant.recent_profit_threshold) or ( side == "short" and anchor_price / candle.close - 1.0 >= variant.recent_profit_threshold ) flags = { "btc_against": btc_against, "high_eth_vol": high_eth_vol, "far_band_close": far_band_close, "recent_profit_spike": recent_profit_spike, } required = variant.gate_mode.split("+") return all(flags[name] for name in required), flags def run_variant(eth: list[Candle], btc: list[Candle], variant: Variant) -> tuple[SegmentResult, dict[str, int]]: eth_close = pd.Series([candle.close for candle in eth], dtype=float) btc_close = pd.Series([candle.close for candle in btc], dtype=float) middle_series = eth_close.rolling(variant.band_length).mean() stdev_series = eth_close.rolling(variant.band_length).std(ddof=0) upper_values = middle_series + 2.0 * stdev_series lower_values = middle_series - 2.0 * stdev_series middle = middle_series.tolist() upper = upper_values.tolist() lower = lower_values.tolist() bandwidth = ((upper_values - lower_values) / middle_series).tolist() threshold = pd.Series(bandwidth, dtype=float).rolling(variant.bandwidth_lookback).quantile(variant.bandwidth_quantile).tolist() btc_sma = btc_close.rolling(480).mean().tolist() btc_momentum = (btc_close / btc_close.shift(96) - 1.0).tolist() eth_realized_vol = eth_close.pct_change().rolling(96).std(ddof=0).tolist() warmup_bars = max(variant.band_length, variant.bandwidth_lookback, 480, 96) equity = INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_entry: dict[str, str] | None = None pending_exit = False middle_exit_streak = 0 reentry: dict[str, object] | None = None cooldown_until = -1 gate_stats = { "extreme_take_exits": 0, "reentry_windows": 0, "reentry_entries": 0, "btc_against_hits": 0, "high_eth_vol_hits": 0, "far_band_close_hits": 0, "recent_profit_spike_hits": 0, "all_gate_hits": 0, } for index in range(warmup_bars, len(eth)): candle = eth[index] if pending_exit and position is not None: equity, won = _close_position( trades=trades, exits=exits, position=position, candle=candle, exit_price=candle.open, reason="middle_exit", ) wins += int(won) position = None pending_exit = False middle_exit_streak = 0 cooldown_until = index + variant.cooldown_bars if pending_entry is not None and position is None and equity > 0.0: side = pending_entry["side"] position = { "side": side, "entry_kind": pending_entry["kind"], "entry_time": candle.ts, "entry_price": candle.open, "margin_used": equity, "stop_price": candle.open * (1.0 - variant.stop_loss_pct if side == "long" else 1.0 + variant.stop_loss_pct), "extreme_take_price": candle.open * (1.0 + variant.extreme_take_profit_pct if side == "long" else 1.0 - variant.extreme_take_profit_pct), } entries.append({"ts": candle.ts, "price": candle.open, "side": side}) if pending_entry["kind"] == "reentry": gate_stats["reentry_entries"] += 1 pending_entry = None current_equity = equity if position is not None: side = str(position["side"]) stop_hit = (side == "long" and candle.low <= float(position["stop_price"])) or ( side == "short" and candle.high >= float(position["stop_price"]) ) extreme_take_hit = (side == "long" and candle.high >= float(position["extreme_take_price"])) or ( side == "short" and candle.low <= float(position["extreme_take_price"]) ) if stop_hit or extreme_take_hit: reason = "stop" if stop_hit else "extreme_take_profit" exit_price = float(position["stop_price"] if stop_hit else position["extreme_take_price"]) exit_side = side equity, won = _close_position( trades=trades, exits=exits, position=position, candle=candle, exit_price=exit_price, reason=reason, ) wins += int(won) current_equity = equity position = None middle_exit_streak = 0 if extreme_take_hit and not stop_hit: gate_stats["extreme_take_exits"] += 1 gate_stats["reentry_windows"] += 1 reentry = { "side": exit_side, "until": index + variant.reentry_bars, "anchor_price": exit_price, } else: cooldown_until = index + variant.cooldown_bars if position is not None: current_equity = mark_to_market( side=str(position["side"]), margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=LEVERAGE, ) peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(eth) - 1 or equity <= 0.0: continue values = (middle[index], upper[index], lower[index], bandwidth[index], threshold[index], btc_sma[index], btc_momentum[index], eth_realized_vol[index]) if any(value != value for value in values): continue if position is None and reentry is not None: if index > int(reentry["until"]): reentry = None else: passed, flags = _gate_passes( variant=variant, side=str(reentry["side"]), anchor_price=float(reentry["anchor_price"]), candle=candle, btc_momentum=float(btc_momentum[index]), eth_realized_vol=float(eth_realized_vol[index]), upper=float(upper[index]), lower=float(lower[index]), middle=float(middle[index]), ) for key, value in flags.items(): gate_stats[f"{key}_hits"] += int(value) gate_stats["all_gate_hits"] += int(passed) if passed: pending_entry = {"side": str(reentry["side"]), "kind": "reentry"} reentry = None continue if position is not None: middle_exit = ( position["side"] == "long" and candle.close < float(middle[index]) * (1.0 - variant.middle_exit_buffer_pct) ) or ( position["side"] == "short" and candle.close > float(middle[index]) * (1.0 + variant.middle_exit_buffer_pct) ) middle_exit_streak = middle_exit_streak + 1 if middle_exit else 0 if middle_exit_streak >= variant.middle_exit_confirm_bars: pending_exit = True continue if reentry is not None or index < cooldown_until: continue if variant.entry_eth_vol_cap is not None and float(eth_realized_vol[index]) > variant.entry_eth_vol_cap: continue if variant.dd_overlay is not None and (peak_equity - current_equity) / peak_equity > variant.dd_overlay: continue if variant.entry_btc_filter == "btc-up" and not (btc_close.iloc[index] > float(btc_sma[index])): continue if variant.entry_btc_filter == "btc-up-momo" and not ( btc_close.iloc[index] > float(btc_sma[index]) and float(btc_momentum[index]) > 0.0 ): continue if bandwidth[index] <= threshold[index]: if candle.close > float(upper[index]): pending_entry = {"side": "long", "kind": "initial"} elif variant.side_mode == "both" and candle.close < float(lower[index]): pending_entry = {"side": "short", "kind": "initial"} trade_count = len(trades) result = SegmentResult( trade_count=trade_count, total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY, win_rate=wins / trade_count if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=eth[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ) return result, gate_stats def cost_equity_frame(result: SegmentResult, cost: float) -> pd.DataFrame: rows = [{"ts": pd.to_datetime(result.equity_curve[0]["ts"], unit="ms", utc=True), "equity": INITIAL_EQUITY}] equity = INITIAL_EQUITY for trade in result.trades: equity *= 1.0 + float(trade["return_pct"]) / 100.0 - cost * float(trade.get("cost_weight", 1.0)) rows.append({"ts": pd.to_datetime(str(trade["exit_time"]), utc=True), "equity": equity}) return pd.DataFrame(rows) def max_drawdown(values: list[float]) -> float: peak = values[0] dd = 0.0 for value in values: peak = max(peak, value) dd = max(dd, (peak - value) / peak if peak else 0.0) return dd def equity_metrics(frame: pd.DataFrame, first_ts: int, last_ts: int) -> dict[str, float]: years = (last_ts - first_ts) / 86_400_000 / 365 total_return = float(frame["equity"].iloc[-1] / frame["equity"].iloc[0] - 1.0) annualized = (1.0 + total_return) ** (1.0 / years) - 1.0 if total_return > -1.0 and years > 0.0 else 0.0 dd = max_drawdown([float(value) for value in frame["equity"]]) return { "net_total_return": total_return, "net_annualized_return": annualized, "net_max_drawdown": dd, "net_calmar": annualized / dd if dd else 0.0, } def worst_month(frame: pd.DataFrame) -> tuple[str, float]: monthly = frame.set_index("ts")["equity"].resample("ME").last().ffill().pct_change().dropna() if not len(monthly): return "", 0.0 idx = monthly.idxmin() return idx.strftime("%Y-%m"), float(monthly.loc[idx]) def build_variants() -> list[Variant]: bases = ( (96, 960, 0.25, "long", "btc-up-momo", 0.006, 0.25), (96, 960, 0.25, "both", "btc-up-momo", 0.006, 0.25), (96, 960, 0.25, "both", "btc-up", 0.006, 0.25), (48, 960, 0.25, "long", "btc-up-momo", None, 0.25), (48, 960, 0.25, "long", "btc-up", 0.006, 0.25), (96, 480, 0.15, "both", "none", 0.006, None), ) gate_modes = ( "btc_against", "high_eth_vol", "far_band_close", "recent_profit_spike", "btc_against+high_eth_vol", "far_band_close+recent_profit_spike", "btc_against+far_band_close+recent_profit_spike", ) variants: list[Variant] = [] for length, bandwidth_lookback, quantile, side_mode, btc_filter, vol_cap, dd_overlay in bases: for extreme_take_profit_pct in (0.025, 0.035): for reentry_bars in (48, 96, 192): for gate_mode in gate_modes: for middle_exit_buffer_pct, middle_exit_confirm_bars in ( (0.0, 1), (0.001, 1), (0.002, 1), (0.0, 2), (0.001, 2), ): variants.append( Variant( band_length=length, bandwidth_lookback=bandwidth_lookback, bandwidth_quantile=quantile, stop_loss_pct=0.01, extreme_take_profit_pct=extreme_take_profit_pct, side_mode=side_mode, entry_btc_filter=btc_filter, entry_eth_vol_cap=vol_cap, dd_overlay=dd_overlay, cooldown_bars=24, reentry_bars=reentry_bars, gate_mode=gate_mode, middle_exit_buffer_pct=middle_exit_buffer_pct, middle_exit_confirm_bars=middle_exit_confirm_bars, high_eth_vol_floor=0.006, far_band_extension=0.25, recent_profit_threshold=0.008, ) ) return variants def format_cell(value: object) -> str: if isinstance(value, float): return f"{value:.6g}" return str(value).replace("|", "\\|") def markdown_table(frame: pd.DataFrame) -> str: columns = list(frame.columns) rows = [columns, ["---" for _ in columns]] for record in frame.to_dict("records"): rows.append([record[column] for column in columns]) return "\n".join("| " + " | ".join(format_cell(value) for value in row) + " |" for row in rows) def write_report(*, summary: pd.DataFrame, command: str, first_ts: int, last_ts: int, requested_years: float) -> str: primary = summary[summary["cost"] == PRIMARY_COST] top_calmar = primary.head(10) top_reentry = primary.sort_values( ["reentry_entries", "net_calmar", "net_annualized_return"], ascending=[False, False, False], ).head(10) best = primary.iloc[0] if len(primary) else None lines = [ "# ETH BB squeeze conditional T gate exploration", "", f"Run command: `{command}`", f"Requested years: {requested_years:g}", f"Actual continuous local history: `{_format_ts(first_ts)}` to `{_format_ts(last_ts)}`.", "", "Output files:", "- `reports/eth-exploration/eth-bb-squeeze-t-gates-summary.csv`", "- `reports/eth-exploration/eth-bb-squeeze-t-gates-report.md`", "", "Gate semantics: after an extreme take-profit exit, the script opens a same-side reentry window. A reentry is placed only when the selected gate expression is true.", "", "Top 10 by maker_taker Calmar:", markdown_table( top_calmar[ [ "name", "trades", "reentry_entries", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "worst_month", "worst_month_return", ] ] ), "", "Top 10 by actual reentry count:", markdown_table( top_reentry[ [ "name", "trades", "reentry_windows", "reentry_entries", "all_gate_hits", "net_annualized_return", "net_max_drawdown", "net_calmar", ] ] ), "", "Verdict:", ] if best is None: lines.append("- No result rows were produced.") else: lines.append( f"- Best maker_taker Calmar: `{best['name']}` with Calmar {format_cell(best['net_calmar'])}, " f"annualized {format_cell(best['net_annualized_return'])}, MDD {format_cell(best['net_max_drawdown'])}, " f"worst month {best['worst_month']} {format_cell(best['worst_month_return'])}, " f"reentries {best['reentry_entries']}." ) return "\n".join(lines) + "\n" def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--bar", default=BAR) parser.add_argument("--years", type=float, default=YEARS) parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR) args = parser.parse_args() eth = _load_candles(ETH_SYMBOL, args.bar) btc = _load_candles(BTC_SYMBOL, args.bar) eth, btc = _align_pair(eth, btc) requested_bars = int(args.years * 365 * 24 * 60 / 15) eth = eth[-requested_bars:] btc = btc[-requested_bars:] summary_rows: list[dict[str, object]] = [] variants = build_variants() for index, variant in enumerate(variants, start=1): result, gate_stats = run_variant(eth, btc, variant) if not result.equity_curve: print(f"skip {index}/{len(variants)} {variant.name}") continue for cost_name, cost in COSTS: frame = cost_equity_frame(result, cost) metrics = equity_metrics(frame, eth[0].ts, eth[-1].ts) month, month_return = worst_month(frame) summary_rows.append( { "family": "bb_squeeze_conditional_t_gates", "cost": cost_name, "symbol": ETH_SYMBOL, "signal_symbol": BTC_SYMBOL, "bar": args.bar, "name": variant.name, "band_length": variant.band_length, "bandwidth_lookback": variant.bandwidth_lookback, "bandwidth_quantile": variant.bandwidth_quantile, "stop_loss_pct": variant.stop_loss_pct, "extreme_take_profit_pct": variant.extreme_take_profit_pct, "side_mode": variant.side_mode, "entry_btc_filter": variant.entry_btc_filter, "entry_eth_vol_cap": variant.entry_eth_vol_cap, "dd_overlay": variant.dd_overlay, "cooldown_bars": variant.cooldown_bars, "reentry_bars": variant.reentry_bars, "gate_mode": variant.gate_mode, "middle_exit_buffer_pct": variant.middle_exit_buffer_pct, "middle_exit_confirm_bars": variant.middle_exit_confirm_bars, "high_eth_vol_floor": variant.high_eth_vol_floor, "far_band_extension": variant.far_band_extension, "recent_profit_threshold": variant.recent_profit_threshold, "first_candle": _format_ts(eth[0].ts), "last_candle": _format_ts(eth[-1].ts), "years": (eth[-1].ts - eth[0].ts) / 86_400_000 / 365, "trades": result.trade_count, "gross_total_return": result.total_return, "gross_max_drawdown_mark_to_market": result.max_drawdown, "worst_month": month, "worst_month_return": month_return, **gate_stats, **metrics, } ) print(f"done {index}/{len(variants)} {variant.name}") summary = pd.DataFrame(summary_rows).sort_values( ["cost", "net_calmar", "worst_month_return", "net_annualized_return"], ascending=[True, False, False, False], ) primary = summary[summary["cost"] == PRIMARY_COST] summary = pd.concat([primary, summary[summary["cost"] != PRIMARY_COST]], ignore_index=True) args.output_dir.mkdir(parents=True, exist_ok=True) summary_path = args.output_dir / "eth-bb-squeeze-t-gates-summary.csv" report_path = args.output_dir / "eth-bb-squeeze-t-gates-report.md" summary.to_csv(summary_path, index=False) command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --bar {args.bar} --years {args.years}" report_path.write_text( write_report( summary=summary, command=command, first_ts=eth[0].ts, last_ts=eth[-1].ts, requested_years=args.years, ), encoding="utf-8", ) print(primary.head(10).to_string(index=False)) return 0 if __name__ == "__main__": raise SystemExit(main())