| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557 |
- from __future__ import annotations
- import argparse
- import sys
- from dataclasses import dataclass
- from pathlib import Path
- import pandas as pd
- sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
- from okx_codex_trader.candles import align_candles_by_ts, load_candles_csv
- from okx_codex_trader.models import Candle
- from okx_codex_trader.research_metrics import (
- DEFAULT_COSTS,
- DEFAULT_HORIZONS,
- DEFAULT_INITIAL_EQUITY,
- DEFAULT_PRIMARY_COST,
- cost_equity_frame,
- equity_metrics,
- format_utc_ts,
- horizon_rows,
- worst_month,
- )
- from okx_codex_trader.sampled_report import SegmentResult, mark_to_market, trade_equity
- ETH_SYMBOL = "ETH-USDT-SWAP"
- BTC_SYMBOL = "BTC-USDT-SWAP"
- BAR = "15m"
- YEARS = 10.0
- LEVERAGE = 3
- INITIAL_EQUITY = DEFAULT_INITIAL_EQUITY
- DATA_DIR = Path("data/okx-candles")
- OUTPUT_DIR = Path("reports/eth-exploration")
- PRIMARY_COST = DEFAULT_PRIMARY_COST
- COSTS = DEFAULT_COSTS
- HORIZONS = DEFAULT_HORIZONS
- @dataclass(frozen=True)
- class Variant:
- band_length: int
- bandwidth_lookback: int
- bandwidth_quantile: float
- side_mode: str
- btc_filter: str
- eth_vol_cap: float | None
- cooldown_bars: int
- stop_loss_pct: float
- middle_exit_buffer_pct: float
- middle_exit_confirm_bars: int
- breakeven_trigger_pct: float | None
- breakeven_lock_pct: float
- trail_trigger_pct: float | None
- trail_giveback_pct: float | None
- max_giveback_trigger_pct: float | None
- max_giveback_pct: float | None
- @property
- def name(self) -> str:
- vol = "none" if self.eth_vol_cap is None else f"{self.eth_vol_cap:g}"
- be = "none" if self.breakeven_trigger_pct is None else f"{self.breakeven_trigger_pct:g}-{self.breakeven_lock_pct:g}"
- trail = "none" if self.trail_trigger_pct is None else f"{self.trail_trigger_pct:g}-{self.trail_giveback_pct:g}"
- gb = "none" if self.max_giveback_trigger_pct is None else f"{self.max_giveback_trigger_pct:g}-{self.max_giveback_pct:g}"
- return (
- f"bb-squeeze-protect-l{self.band_length}-bw{self.bandwidth_lookback}"
- f"-q{self.bandwidth_quantile:g}-sl{self.stop_loss_pct:g}"
- f"-{self.side_mode}-{self.btc_filter}-vc{vol}-cd{self.cooldown_bars}"
- f"-mxbuf{self.middle_exit_buffer_pct:g}-mxc{self.middle_exit_confirm_bars}"
- f"-be{be}-tr{trail}-gb{gb}"
- )
- def _format_ts(ts: int) -> str:
- return format_utc_ts(ts)
- def close_position(
- *,
- trades: list[dict[str, object]],
- exits: list[dict[str, object]],
- position: dict[str, object],
- candle: Candle,
- exit_price: float,
- reason: str,
- ) -> tuple[float, bool]:
- margin_used = float(position["margin_used"])
- exit_equity = trade_equity(
- side=str(position["side"]),
- margin_used=margin_used,
- entry_price=float(position["entry_price"]),
- exit_price=exit_price,
- leverage=LEVERAGE,
- )
- pnl = exit_equity - margin_used
- trades.append(
- {
- "side": "Long" if position["side"] == "long" else "Short",
- "entry_time": _format_ts(int(position["entry_time"])),
- "exit_time": _format_ts(candle.ts),
- "entry_price": round(float(position["entry_price"]), 4),
- "exit_price": round(exit_price, 4),
- "pnl": round(pnl, 4),
- "return_pct": round(pnl / margin_used * 100.0, 4),
- "cost_weight": 1.0,
- "exit_reason": reason,
- "mfe_pct": round(float(position["mfe_pct"]) * 100.0, 4),
- }
- )
- exits.append({"ts": candle.ts, "price": exit_price, "side": position["side"]})
- return exit_equity, pnl > 0.0
- def favorable_move(side: str, entry_price: float, candle: Candle) -> float:
- if side == "long":
- return candle.high / entry_price - 1.0
- return entry_price / candle.low - 1.0
- def protection_exit(position: dict[str, object], candle: Candle, variant: Variant) -> tuple[float, str] | None:
- side = str(position["side"])
- entry_price = float(position["entry_price"])
- mfe = float(position["mfe_pct"])
- stop_price = float(position["stop_price"])
- if variant.breakeven_trigger_pct is not None and mfe >= variant.breakeven_trigger_pct:
- be_stop = entry_price * (1.0 + variant.breakeven_lock_pct if side == "long" else 1.0 - variant.breakeven_lock_pct)
- stop_price = max(stop_price, be_stop) if side == "long" else min(stop_price, be_stop)
- if variant.trail_trigger_pct is not None and variant.trail_giveback_pct is not None and mfe >= variant.trail_trigger_pct:
- if side == "long":
- trail_stop = entry_price * (1.0 + mfe - variant.trail_giveback_pct)
- stop_price = max(stop_price, trail_stop)
- else:
- trail_stop = entry_price * (1.0 - mfe + variant.trail_giveback_pct)
- stop_price = min(stop_price, trail_stop)
- if side == "long":
- if candle.open <= stop_price:
- return candle.open, "protect_gap" if stop_price != float(position["stop_price"]) else "stop_gap"
- if candle.low <= stop_price:
- return stop_price, "profit_protect" if stop_price != float(position["stop_price"]) else "stop"
- else:
- if candle.open >= stop_price:
- return candle.open, "protect_gap" if stop_price != float(position["stop_price"]) else "stop_gap"
- if candle.high >= stop_price:
- return stop_price, "profit_protect" if stop_price != float(position["stop_price"]) else "stop"
- if variant.max_giveback_trigger_pct is None or variant.max_giveback_pct is None or mfe < variant.max_giveback_trigger_pct:
- return None
- if side == "long":
- close_profit = candle.close / entry_price - 1.0
- else:
- close_profit = entry_price / candle.close - 1.0
- if close_profit <= mfe - variant.max_giveback_pct:
- return candle.close, "giveback_close"
- return None
- def run_variant(eth: list[Candle], btc: list[Candle], variant: Variant) -> tuple[SegmentResult, dict[str, int]]:
- eth_close = pd.Series([candle.close for candle in eth], dtype=float)
- btc_close = pd.Series([candle.close for candle in btc], dtype=float)
- middle_series = eth_close.rolling(variant.band_length).mean()
- stdev_series = eth_close.rolling(variant.band_length).std(ddof=0)
- upper_values = middle_series + 2.0 * stdev_series
- lower_values = middle_series - 2.0 * stdev_series
- middle = middle_series.tolist()
- upper = upper_values.tolist()
- lower = lower_values.tolist()
- bandwidth = ((upper_values - lower_values) / middle_series).tolist()
- threshold = pd.Series(bandwidth, dtype=float).rolling(variant.bandwidth_lookback).quantile(variant.bandwidth_quantile).tolist()
- btc_sma = btc_close.rolling(480).mean().tolist()
- btc_momentum = (btc_close / btc_close.shift(96) - 1.0).tolist()
- eth_realized_vol = eth_close.pct_change().rolling(96).std(ddof=0).tolist()
- warmup_bars = max(variant.band_length, variant.bandwidth_lookback, 480, 96)
- equity = INITIAL_EQUITY
- ending_equity = equity
- peak_equity = equity
- max_drawdown = 0.0
- wins = 0
- trades: list[dict[str, object]] = []
- entries: list[dict[str, object]] = []
- exits: list[dict[str, object]] = []
- equity_curve: list[dict[str, float | int]] = []
- position: dict[str, object] | None = None
- pending_entry_side: str | None = None
- pending_exit = False
- middle_exit_streak = 0
- cooldown_until = -1
- exit_counts = {"stop_exits": 0, "protect_exits": 0, "giveback_exits": 0, "signal_exits": 0}
- for index in range(warmup_bars, len(eth)):
- candle = eth[index]
- if pending_exit and position is not None:
- equity, won = close_position(
- trades=trades,
- exits=exits,
- position=position,
- candle=candle,
- exit_price=candle.open,
- reason="signal_middle",
- )
- wins += int(won)
- exit_counts["signal_exits"] += 1
- position = None
- pending_exit = False
- middle_exit_streak = 0
- cooldown_until = index + variant.cooldown_bars
- if pending_entry_side is not None and position is None and equity > 0.0:
- entry_price = candle.open
- position = {
- "side": pending_entry_side,
- "entry_time": candle.ts,
- "entry_price": entry_price,
- "margin_used": equity,
- "stop_price": entry_price * (1.0 - variant.stop_loss_pct if pending_entry_side == "long" else 1.0 + variant.stop_loss_pct),
- "mfe_pct": 0.0,
- }
- entries.append({"ts": candle.ts, "price": entry_price, "side": pending_entry_side})
- pending_entry_side = None
- current_equity = equity
- if position is not None:
- risk_exit = protection_exit(position, candle, variant)
- if risk_exit is not None:
- exit_price, reason = risk_exit
- equity, won = close_position(
- trades=trades,
- exits=exits,
- position=position,
- candle=candle,
- exit_price=exit_price,
- reason=reason,
- )
- wins += int(won)
- if reason.startswith("stop"):
- exit_counts["stop_exits"] += 1
- elif reason == "giveback_close":
- exit_counts["giveback_exits"] += 1
- else:
- exit_counts["protect_exits"] += 1
- current_equity = equity
- position = None
- middle_exit_streak = 0
- cooldown_until = index + variant.cooldown_bars
- if position is not None:
- position["mfe_pct"] = max(float(position["mfe_pct"]), favorable_move(str(position["side"]), float(position["entry_price"]), candle))
- if position is not None:
- current_equity = mark_to_market(
- side=str(position["side"]),
- margin_used=float(position["margin_used"]),
- entry_price=float(position["entry_price"]),
- mark_price=candle.close,
- leverage=LEVERAGE,
- )
- peak_equity = max(peak_equity, current_equity)
- max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity)
- equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close})
- ending_equity = current_equity
- if index == len(eth) - 1 or equity <= 0.0:
- continue
- values = (middle[index], upper[index], lower[index], bandwidth[index], threshold[index], btc_sma[index], btc_momentum[index], eth_realized_vol[index])
- if any(value != value for value in values):
- continue
- if position is not None:
- middle_exit = (
- position["side"] == "long" and candle.close < float(middle[index]) * (1.0 - variant.middle_exit_buffer_pct)
- ) or (
- position["side"] == "short" and candle.close > float(middle[index]) * (1.0 + variant.middle_exit_buffer_pct)
- )
- middle_exit_streak = middle_exit_streak + 1 if middle_exit else 0
- if middle_exit_streak >= variant.middle_exit_confirm_bars:
- pending_exit = True
- continue
- if index < cooldown_until:
- continue
- if variant.eth_vol_cap is not None and float(eth_realized_vol[index]) > variant.eth_vol_cap:
- continue
- if variant.btc_filter == "btc-up" and not (btc_close.iloc[index] > float(btc_sma[index])):
- continue
- if variant.btc_filter == "btc-up-momo" and not (
- btc_close.iloc[index] > float(btc_sma[index]) and float(btc_momentum[index]) > 0.0
- ):
- continue
- if bandwidth[index] <= threshold[index]:
- if candle.close > float(upper[index]):
- pending_entry_side = "long"
- elif variant.side_mode == "both" and candle.close < float(lower[index]):
- pending_entry_side = "short"
- result = SegmentResult(
- trade_count=len(trades),
- total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY,
- win_rate=wins / len(trades) if trades else 0.0,
- max_drawdown=max_drawdown,
- trades=trades,
- open_position=position,
- candles=eth[warmup_bars:],
- equity_curve=equity_curve,
- entries=entries,
- exits=exits,
- )
- return result, exit_counts
- def trade_stats(trades: list[dict[str, object]]) -> dict[str, float]:
- if not trades:
- return {"avg_mfe_pct": 0.0, "avg_return_pct": 0.0, "payoff_ratio": 0.0, "profit_factor": 0.0}
- returns = [float(trade["return_pct"]) for trade in trades]
- wins = [value for value in returns if value > 0.0]
- losses = [-value for value in returns if value < 0.0]
- return {
- "avg_mfe_pct": sum(float(trade["mfe_pct"]) for trade in trades) / len(trades),
- "avg_return_pct": sum(returns) / len(returns),
- "payoff_ratio": (sum(wins) / len(wins)) / (sum(losses) / len(losses)) if wins and losses else 0.0,
- "profit_factor": sum(wins) / sum(losses) if losses else 0.0,
- }
- def build_variants() -> list[Variant]:
- base_specs = (
- (48, 960, 0.25, "both", "none", 0.006, 0.01, 0.0005, 1),
- (48, 960, 0.25, "both", "none", 0.006, 0.01, 0.0010, 1),
- (48, 960, 0.25, "both", "none", 0.006, 0.012, 0.0005, 1),
- (96, 960, 0.25, "both", "btc-up", 0.006, 0.012, 0.001, 1),
- (96, 960, 0.25, "both", "btc-up-momo", 0.006, 0.012, 0.001, 1),
- (96, 480, 0.15, "both", "none", 0.006, 0.01, 0.001, 1),
- )
- protections = [
- (None, 0.0, None, None, None, None),
- (0.004, 0.000, None, None, None, None),
- (0.006, 0.000, None, None, None, None),
- (0.008, 0.001, None, None, None, None),
- (0.004, 0.000, 0.010, 0.006, None, None),
- (0.006, 0.000, 0.012, 0.006, None, None),
- (0.008, 0.001, 0.015, 0.008, None, None),
- (0.004, 0.000, None, None, 0.010, 0.006),
- (0.006, 0.000, None, None, 0.012, 0.007),
- (0.008, 0.001, None, None, 0.015, 0.010),
- (0.004, 0.000, 0.010, 0.006, 0.012, 0.008),
- (0.006, 0.000, 0.012, 0.006, 0.015, 0.010),
- (0.008, 0.001, 0.015, 0.008, 0.020, 0.012),
- ]
- variants: list[Variant] = []
- for band_length, lookback, quantile, side_mode, btc_filter, vol_cap, stop_loss, mxbuf, mxc in base_specs:
- for be_trigger, be_lock, trail_trigger, trail_giveback, gb_trigger, gb_pct in protections:
- variants.append(
- Variant(
- band_length=band_length,
- bandwidth_lookback=lookback,
- bandwidth_quantile=quantile,
- side_mode=side_mode,
- btc_filter=btc_filter,
- eth_vol_cap=vol_cap,
- cooldown_bars=24,
- stop_loss_pct=stop_loss,
- middle_exit_buffer_pct=mxbuf,
- middle_exit_confirm_bars=mxc,
- breakeven_trigger_pct=be_trigger,
- breakeven_lock_pct=be_lock,
- trail_trigger_pct=trail_trigger,
- trail_giveback_pct=trail_giveback,
- max_giveback_trigger_pct=gb_trigger,
- max_giveback_pct=gb_pct,
- )
- )
- return variants
- def format_cell(value: object) -> str:
- if isinstance(value, float):
- return f"{value:.6g}"
- return str(value).replace("|", "\\|")
- def markdown_table(frame: pd.DataFrame) -> str:
- columns = list(frame.columns)
- rows = [columns, ["---" for _ in columns]]
- for record in frame.to_dict("records"):
- rows.append([record[column] for column in columns])
- return "\n".join("| " + " | ".join(format_cell(value) for value in row) + " |" for row in rows)
- def write_report(summary: pd.DataFrame, horizon: pd.DataFrame, first_ts: int, last_ts: int, command: str) -> str:
- primary = summary[summary["cost"] == PRIMARY_COST]
- top = primary.head(10)
- baseline = primary[primary["breakeven_trigger_pct"].isna() & primary["trail_trigger_pct"].isna() & primary["max_giveback_trigger_pct"].isna()]
- baseline_top = baseline.sort_values(["net_calmar", "net_annualized_return"], ascending=[False, False]).head(5)
- horizon_top = (
- horizon[horizon["cost"] == PRIMARY_COST]
- .sort_values(["horizon", "net_calmar", "net_annualized_return"], ascending=[True, False, False])
- .groupby("horizon", observed=True)
- .head(3)
- )
- return "\n".join(
- [
- "# ETH BB squeeze profit-protection exploration",
- "",
- f"Run command: `{command}`",
- f"Actual continuous local history: `{_format_ts(first_ts)}` to `{_format_ts(last_ts)}`.",
- "",
- "The entry logic matches existing BB squeeze families. Variants only change exits after a trade has floating profit: breakeven stop, trailing protection, and close-based giveback.",
- "",
- "Top 10 by maker_taker Calmar:",
- markdown_table(
- top[
- [
- "name",
- "trades",
- "net_total_return",
- "net_annualized_return",
- "net_max_drawdown",
- "net_calmar",
- "avg_mfe_pct",
- "avg_return_pct",
- "profit_factor",
- "stop_exits",
- "protect_exits",
- "giveback_exits",
- "signal_exits",
- ]
- ]
- ),
- "",
- "Best no-protection baselines in same grid:",
- markdown_table(
- baseline_top[
- [
- "name",
- "trades",
- "net_annualized_return",
- "net_max_drawdown",
- "net_calmar",
- "avg_mfe_pct",
- "avg_return_pct",
- "profit_factor",
- ]
- ]
- ),
- "",
- "Recent horizon leaders:",
- markdown_table(
- horizon_top[
- [
- "horizon",
- "name",
- "trades",
- "net_total_return",
- "net_annualized_return",
- "net_max_drawdown",
- "net_calmar",
- ]
- ]
- ),
- ]
- ) + "\n"
- def main() -> int:
- parser = argparse.ArgumentParser()
- parser.add_argument("--bar", default=BAR)
- parser.add_argument("--years", type=float, default=YEARS)
- parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
- args = parser.parse_args()
- eth = load_candles_csv(DATA_DIR, ETH_SYMBOL, args.bar)
- btc = load_candles_csv(DATA_DIR, BTC_SYMBOL, args.bar)
- eth, btc = align_candles_by_ts(eth, btc)
- requested_bars = int(args.years * 365 * 24 * 60 / 15)
- eth = eth[-requested_bars:]
- btc = btc[-requested_bars:]
- summary_rows: list[dict[str, object]] = []
- horizon_rows_out: list[dict[str, object]] = []
- variants = build_variants()
- for index, variant in enumerate(variants, start=1):
- result, exit_counts = run_variant(eth, btc, variant)
- if not result.equity_curve:
- continue
- stats = trade_stats(result.trades)
- for cost_name, cost in COSTS:
- frame = cost_equity_frame(result, cost)
- metrics = equity_metrics(frame, eth[0].ts, eth[-1].ts)
- month, month_return = worst_month(frame)
- row = {
- "family": "bb_squeeze_profit_protection",
- "cost": cost_name,
- "symbol": ETH_SYMBOL,
- "signal_symbol": BTC_SYMBOL if variant.btc_filter != "none" else "",
- "bar": args.bar,
- "name": variant.name,
- "band_length": variant.band_length,
- "bandwidth_lookback": variant.bandwidth_lookback,
- "bandwidth_quantile": variant.bandwidth_quantile,
- "side_mode": variant.side_mode,
- "btc_filter": variant.btc_filter,
- "eth_vol_cap": variant.eth_vol_cap,
- "cooldown_bars": variant.cooldown_bars,
- "stop_loss_pct": variant.stop_loss_pct,
- "middle_exit_buffer_pct": variant.middle_exit_buffer_pct,
- "middle_exit_confirm_bars": variant.middle_exit_confirm_bars,
- "breakeven_trigger_pct": variant.breakeven_trigger_pct,
- "breakeven_lock_pct": variant.breakeven_lock_pct,
- "trail_trigger_pct": variant.trail_trigger_pct,
- "trail_giveback_pct": variant.trail_giveback_pct,
- "max_giveback_trigger_pct": variant.max_giveback_trigger_pct,
- "max_giveback_pct": variant.max_giveback_pct,
- "first_candle": _format_ts(eth[0].ts),
- "last_candle": _format_ts(eth[-1].ts),
- "years": (eth[-1].ts - eth[0].ts) / 86_400_000 / 365,
- "trades": result.trade_count,
- "gross_total_return": result.total_return,
- "gross_max_drawdown_mark_to_market": result.max_drawdown,
- "worst_month": month,
- "worst_month_return": month_return,
- **exit_counts,
- **stats,
- **metrics,
- }
- summary_rows.append(row)
- for horizon_row in horizon_rows(frame, eth[-1].ts, HORIZONS):
- horizon_rows_out.append(
- {
- "family": "bb_squeeze_profit_protection",
- "cost": cost_name,
- "symbol": ETH_SYMBOL,
- "bar": args.bar,
- "name": variant.name,
- "trades": result.trade_count,
- **horizon_row,
- }
- )
- print(f"done {index}/{len(variants)} {variant.name}", flush=True)
- summary = pd.DataFrame(summary_rows).sort_values(
- ["cost", "net_calmar", "net_annualized_return", "profit_factor"],
- ascending=[True, False, False, False],
- )
- primary = summary[summary["cost"] == PRIMARY_COST]
- summary = pd.concat([primary, summary[summary["cost"] != PRIMARY_COST]], ignore_index=True)
- horizon = pd.DataFrame(horizon_rows_out)
- horizon["horizon"] = pd.Categorical(horizon["horizon"], categories=[label for label, _ in HORIZONS], ordered=True)
- horizon = horizon.sort_values(["cost", "horizon", "net_calmar", "net_annualized_return"], ascending=[True, True, False, False])
- args.output_dir.mkdir(parents=True, exist_ok=True)
- summary_path = args.output_dir / "eth-bb-squeeze-profit-protection-summary.csv"
- horizon_path = args.output_dir / "eth-bb-squeeze-profit-protection-horizon.csv"
- report_path = args.output_dir / "eth-bb-squeeze-profit-protection-report.md"
- summary.to_csv(summary_path, index=False)
- horizon.to_csv(horizon_path, index=False)
- command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --bar {args.bar} --years {args.years}"
- report_path.write_text(write_report(summary, horizon, eth[0].ts, eth[-1].ts, command), encoding="utf-8")
- print(primary.head(10).to_string(index=False))
- return 0
- if __name__ == "__main__":
- raise SystemExit(main())
|