from __future__ import annotations import argparse import sys from dataclasses import dataclass from pathlib import Path import pandas as pd sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from okx_codex_trader.candles import align_candles_by_ts, load_candles_csv from okx_codex_trader.models import Candle from okx_codex_trader.research_metrics import ( DEFAULT_COSTS, DEFAULT_INITIAL_EQUITY, DEFAULT_PRIMARY_COST, cost_equity_frame, equity_metrics, format_utc_ts, trade_stats, ) from okx_codex_trader.sampled_report import SegmentResult, mark_to_market, trade_equity from okx_codex_trader.time_rules import entry_allowed, is_us_open_window ETH_SYMBOL = "ETH-USDT-SWAP" BTC_SYMBOL = "BTC-USDT-SWAP" BAR = "15m" YEARS = 10.0 LEVERAGE = 3 INITIAL_EQUITY = DEFAULT_INITIAL_EQUITY DATA_DIR = Path("data/okx-candles") OUTPUT_DIR = Path("reports/eth-exploration") PRIMARY_COST = DEFAULT_PRIMARY_COST COSTS = DEFAULT_COSTS HORIZONS = ( ("full", None), ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ("21d", pd.DateOffset(days=21)), ) @dataclass(frozen=True) class ExitRule: breakeven_trigger_pct: float | None breakeven_lock_pct: float trail_trigger_pct: float | None trail_giveback_pct: float | None @property def name(self) -> str: be = "none" if self.breakeven_trigger_pct is None else f"{self.breakeven_trigger_pct:g}-{self.breakeven_lock_pct:g}" tr = "none" if self.trail_trigger_pct is None else f"{self.trail_trigger_pct:g}-{self.trail_giveback_pct:g}" return f"be{be}-trail{tr}" def _format_ts(ts: int) -> str: return format_utc_ts(ts) def favorable_move(side: str, entry_price: float, candle: Candle) -> float: if side == "long": return candle.high / entry_price - 1.0 return entry_price / candle.low - 1.0 def stop_price_for_rule(position: dict[str, object], rule: ExitRule) -> tuple[float, str]: side = str(position["side"]) entry = float(position["entry_price"]) base_stop = float(position["stop_price"]) mfe = float(position["mfe_pct"]) protected = base_stop reason = "stop" if rule.breakeven_trigger_pct is not None and mfe >= rule.breakeven_trigger_pct: be_stop = entry * (1.0 + rule.breakeven_lock_pct if side == "long" else 1.0 - rule.breakeven_lock_pct) protected = max(protected, be_stop) if side == "long" else min(protected, be_stop) reason = "breakeven" if rule.trail_trigger_pct is not None and rule.trail_giveback_pct is not None and mfe >= rule.trail_trigger_pct: trail_stop = entry * (1.0 + mfe - rule.trail_giveback_pct if side == "long" else 1.0 - mfe + rule.trail_giveback_pct) improved = trail_stop > protected if side == "long" else trail_stop < protected if improved: protected = trail_stop reason = "trailing" return protected, reason if protected != base_stop else "stop" def exit_price_and_reason(position: dict[str, object], candle: Candle, rule: ExitRule) -> tuple[float, str] | None: side = str(position["side"]) protected_stop, stop_reason = stop_price_for_rule(position, rule) take = float(position["take_price"]) if side == "long": if candle.open <= protected_stop: return candle.open, f"{stop_reason}_gap" if stop_reason != "stop" else "stop_gap" if candle.open >= take: return candle.open, "take_gap" if candle.low <= protected_stop: return protected_stop, stop_reason if candle.high >= take: return take, "take_profit" else: if candle.open >= protected_stop: return candle.open, f"{stop_reason}_gap" if stop_reason != "stop" else "stop_gap" if candle.open <= take: return candle.open, "take_gap" if candle.high >= protected_stop: return protected_stop, stop_reason if candle.low <= take: return take, "take_profit" return None def close_position( *, trades: list[dict[str, object]], exits: list[dict[str, object]], position: dict[str, object], candle: Candle, exit_price: float, reason: str, ) -> tuple[float, bool]: margin_used = float(position["margin_used"]) exit_equity = trade_equity( side=str(position["side"]), margin_used=margin_used, entry_price=float(position["entry_price"]), exit_price=exit_price, leverage=LEVERAGE, ) pnl = exit_equity - margin_used trades.append( { "side": "Long" if position["side"] == "long" else "Short", "entry_time": _format_ts(int(position["entry_time"])), "exit_time": _format_ts(candle.ts), "exit_ts": candle.ts, "entry_price": round(float(position["entry_price"]), 4), "exit_price": round(exit_price, 4), "pnl": round(pnl, 4), "return_pct": round(pnl / margin_used * 100.0, 4), "cost_weight": 1.0, "exit_reason": reason, "mfe_pct": round(float(position["mfe_pct"]) * 100.0, 4), } ) exits.append({"ts": candle.ts, "price": exit_price, "side": position["side"]}) return exit_equity, pnl > 0.0 def run_variant(eth: list[Candle], btc: list[Candle], rule: ExitRule) -> SegmentResult: eth_close = pd.Series([candle.close for candle in eth], dtype=float) btc_close = pd.Series([candle.close for candle in btc], dtype=float) middle_series = eth_close.rolling(96).mean() stdev_series = eth_close.rolling(96).std(ddof=0) upper_values = middle_series + 2.0 * stdev_series lower_values = middle_series - 2.0 * stdev_series bandwidth = (upper_values - lower_values) / middle_series threshold = bandwidth.rolling(960).quantile(0.25) btc_sma = btc_close.rolling(480).mean() eth_vol = eth_close.pct_change().rolling(96).std(ddof=0) warmup_bars = 960 equity = INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_entry_side: str | None = None pending_exit = False middle_exit_streak = 0 cooldown_until = -1 for index in range(warmup_bars, len(eth)): candle = eth[index] if pending_exit and position is not None: equity, won = close_position( trades=trades, exits=exits, position=position, candle=candle, exit_price=candle.open, reason="signal_middle", ) wins += int(won) position = None pending_exit = False middle_exit_streak = 0 cooldown_until = index + 24 if pending_entry_side is not None and position is None and equity > 0.0: entry_price = candle.open position = { "side": pending_entry_side, "entry_time": candle.ts, "entry_price": entry_price, "margin_used": equity, "stop_price": entry_price * (0.99 if pending_entry_side == "long" else 1.01), "take_price": entry_price * (1.03 if pending_entry_side == "long" else 0.97), "mfe_pct": 0.0, } entries.append({"ts": candle.ts, "price": entry_price, "side": pending_entry_side}) pending_entry_side = None current_equity = equity if position is not None: risk_exit = exit_price_and_reason(position, candle, rule) if risk_exit is not None: exit_price, reason = risk_exit equity, won = close_position(trades=trades, exits=exits, position=position, candle=candle, exit_price=exit_price, reason=reason) wins += int(won) current_equity = equity position = None middle_exit_streak = 0 cooldown_until = index + 24 if position is not None: position["mfe_pct"] = max(float(position["mfe_pct"]), favorable_move(str(position["side"]), float(position["entry_price"]), candle)) current_equity = mark_to_market( side=str(position["side"]), margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=LEVERAGE, ) peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(eth) - 1 or equity <= 0.0: continue values = (middle_series.iloc[index], upper_values.iloc[index], lower_values.iloc[index], bandwidth.iloc[index], threshold.iloc[index], btc_sma.iloc[index], eth_vol.iloc[index]) if any(value != value for value in values): continue if position is not None: side = str(position["side"]) middle_exit = (side == "long" and candle.close < float(middle_series.iloc[index]) * 0.999) or ( side == "short" and candle.close > float(middle_series.iloc[index]) * 1.001 ) if middle_exit and is_us_open_window(candle.ts): middle_exit = False middle_exit_streak = middle_exit_streak + 1 if middle_exit else 0 if middle_exit_streak >= 1: pending_exit = True continue if index < cooldown_until: continue if float(eth_vol.iloc[index]) > 0.006: continue if not entry_allowed(candle.ts, "weekday"): continue if not btc_close.iloc[index] > float(btc_sma.iloc[index]): continue if bandwidth.iloc[index] <= threshold.iloc[index]: if candle.close > float(upper_values.iloc[index]): pending_entry_side = "long" elif candle.close < float(lower_values.iloc[index]): pending_entry_side = "short" return SegmentResult( trade_count=len(trades), total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY, win_rate=wins / len(trades) if trades else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=eth[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ) def build_rules() -> list[ExitRule]: rules = [ExitRule(None, 0.0, None, None)] for trigger in (0.003, 0.005, 0.008, 0.01): for lock in (0.0, 0.001): rules.append(ExitRule(trigger, lock, None, None)) for trigger in (0.003, 0.005, 0.008, 0.01): for giveback in (0.002, 0.003, 0.005): if giveback < trigger: rules.append(ExitRule(None, 0.0, trigger, giveback)) for be_trigger in (0.003, 0.005, 0.008): for trail_trigger in (0.008, 0.01, 0.015): if trail_trigger > be_trigger: for giveback in (0.003, 0.005): rules.append(ExitRule(be_trigger, 0.0, trail_trigger, giveback)) rules.append(ExitRule(be_trigger, 0.001, trail_trigger, giveback)) return sorted(set(rules), key=lambda rule: rule.name) def window_frame(frame: pd.DataFrame, label: str, offset: pd.DateOffset | None, last_ts: int) -> tuple[pd.DataFrame, int]: if offset is None: start_ts = int(pd.Timestamp(frame["ts"].iloc[0]).timestamp() * 1000) return frame[["ts", "equity"]].copy(), start_ts end_time = pd.to_datetime(last_ts, unit="ms", utc=True) cutoff = end_time - offset before = frame[frame["ts"] <= cutoff] if len(before): start_equity = float(before["equity"].iloc[-1]) after = frame[frame["ts"] > cutoff] scoped = pd.concat([pd.DataFrame([{"ts": cutoff, "equity": start_equity}]), after[["ts", "equity"]]], ignore_index=True) else: scoped = frame[["ts", "equity"]].copy() return scoped, int(pd.Timestamp(scoped["ts"].iloc[0]).timestamp() * 1000) def window_trades(trades: list[dict[str, object]], start_ts: int, last_ts: int) -> list[dict[str, object]]: return [trade for trade in trades if start_ts < int(trade["exit_ts"]) <= last_ts] def exit_reason_rows(name: str, trades: list[dict[str, object]]) -> list[dict[str, object]]: rows: list[dict[str, object]] = [] for reason, group in pd.DataFrame(trades).groupby("exit_reason") if trades else []: group_trades = group.to_dict("records") stats = trade_stats(group_trades) rows.append( { "name": name, "exit_reason": reason, "trades": len(group_trades), "wins": int((group["return_pct"].astype(float) > 0.0).sum()), "losses": int((group["return_pct"].astype(float) < 0.0).sum()), "sum_return_pct": float(group["return_pct"].astype(float).sum()), **stats, } ) return rows def format_cell(value: object) -> str: if isinstance(value, float): return f"{value:.6g}" return str(value).replace("|", "\\|") def markdown_table(frame: pd.DataFrame) -> str: rows = [list(frame.columns), ["---" for _ in frame.columns]] rows.extend(frame.astype(object).where(pd.notna(frame), "").values.tolist()) return "\n".join("| " + " | ".join(format_cell(value) for value in row) + " |" for row in rows) def write_report(summary: pd.DataFrame, horizons: pd.DataFrame, reasons: pd.DataFrame, command: str, first_ts: int, last_ts: int) -> str: primary = summary[summary["cost"] == PRIMARY_COST] top = primary.head(5) baseline = primary[primary["name"] == "benone-trailnone"].iloc[0] top_names = set(top["name"]) top_horizons = horizons[(horizons["cost"] == PRIMARY_COST) & (horizons["name"].isin(top_names))] top_reasons = reasons[reasons["name"].isin(top_names | {"benone-trailnone"})] return ( "# ETH BB squeeze exit management task C\n\n" f"Run command: `{command}`\n" f"Aligned local history: `{_format_ts(first_ts)}` to `{_format_ts(last_ts)}`.\n" "Entry logic is fixed to the current live baseline: ETH 15m, band 96, bandwidth 960 q0.25, 1% stop, 3% take, middle exit 0.1% x1, vol cap 0.006, cooldown 24, BTC-up, NY weekday entries, skip US-open middle exits, both sides.\n\n" "Exit priority: pending middle exit at next open, then entry fill; while holding, open gaps are checked before intrabar checks, protected stop/stop-loss is conservative before take-profit, then middle exit can schedule next-open exit. Dynamic protection uses MFE confirmed before the current candle.\n\n" "## Baseline\n\n" + markdown_table(pd.DataFrame([baseline])[["name", "trades", "win_rate", "avg_return_pct", "payoff_ratio", "profit_factor", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar"]]) + "\n\n## Top 5 by full maker_taker Calmar\n\n" + markdown_table(top[["name", "trades", "win_rate", "avg_return_pct", "payoff_ratio", "profit_factor", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar"]]) + "\n\n## Top 5 horizons\n\n" + markdown_table(top_horizons[["name", "horizon", "trades", "win_rate", "avg_return_pct", "payoff_ratio", "profit_factor", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar"]]) + "\n\n## Exit reason distribution\n\n" + markdown_table(top_reasons[["name", "exit_reason", "trades", "wins", "losses", "sum_return_pct", "avg_return_pct", "payoff_ratio", "profit_factor"]]) + "\n\n## Recommendation\n\n" "Do not update live from this task unless a candidate beats the baseline on full-window Calmar and does not degrade 1y, 6m, 3m, and 21d net total return versus baseline. See CSV outputs for the strict comparison.\n" ) def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--bar", default=BAR) parser.add_argument("--years", type=float, default=YEARS) parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR) args = parser.parse_args() eth = load_candles_csv(DATA_DIR, ETH_SYMBOL, args.bar) btc = load_candles_csv(DATA_DIR, BTC_SYMBOL, args.bar) eth, btc = align_candles_by_ts(eth, btc) requested_bars = int(args.years * 365 * 24 * 60 / 15) eth = eth[-requested_bars:] btc = btc[-requested_bars:] summary_rows: list[dict[str, object]] = [] horizon_rows: list[dict[str, object]] = [] reason_rows: list[dict[str, object]] = [] for index, rule in enumerate(build_rules(), start=1): result = run_variant(eth, btc, rule) stats = trade_stats(result.trades) reason_rows.extend(exit_reason_rows(rule.name, result.trades)) for cost_name, cost in COSTS: frame = cost_equity_frame(result, cost) full_metrics = equity_metrics(frame, eth[0].ts, eth[-1].ts) summary_rows.append( { "cost": cost_name, "symbol": ETH_SYMBOL, "signal_symbol": BTC_SYMBOL, "bar": args.bar, "name": rule.name, "breakeven_trigger_pct": rule.breakeven_trigger_pct, "breakeven_lock_pct": rule.breakeven_lock_pct, "trail_trigger_pct": rule.trail_trigger_pct, "trail_giveback_pct": rule.trail_giveback_pct, "first_candle": _format_ts(eth[0].ts), "last_candle": _format_ts(eth[-1].ts), "trades": result.trade_count, "win_rate": result.win_rate, "gross_total_return": result.total_return, "gross_max_drawdown_mark_to_market": result.max_drawdown, **stats, **full_metrics, } ) for label, offset in HORIZONS: scoped, start_ts = window_frame(frame, label, offset, eth[-1].ts) scoped_trades = window_trades(result.trades, start_ts, eth[-1].ts) horizon_rows.append( { "cost": cost_name, "symbol": ETH_SYMBOL, "bar": args.bar, "name": rule.name, "horizon": label, "horizon_start": pd.to_datetime(start_ts, unit="ms", utc=True).strftime("%Y-%m-%d %H:%M"), "horizon_end": _format_ts(eth[-1].ts), "trades": len(scoped_trades), "win_rate": sum(1 for trade in scoped_trades if float(trade["return_pct"]) > 0.0) / len(scoped_trades) if scoped_trades else 0.0, **trade_stats(scoped_trades), **equity_metrics(scoped, start_ts, eth[-1].ts), } ) print(f"done {index}/{len(build_rules())} {rule.name}", flush=True) summary = pd.DataFrame(summary_rows).sort_values(["cost", "net_calmar", "net_annualized_return"], ascending=[True, False, False]) primary = summary[summary["cost"] == PRIMARY_COST] summary = pd.concat([primary, summary[summary["cost"] != PRIMARY_COST]], ignore_index=True) horizons = pd.DataFrame(horizon_rows) horizons["horizon"] = pd.Categorical(horizons["horizon"], categories=[label for label, _ in HORIZONS], ordered=True) horizons = horizons.sort_values(["cost", "horizon", "net_calmar", "net_total_return"], ascending=[True, True, False, False]) reasons = pd.DataFrame(reason_rows).sort_values(["name", "exit_reason"]) args.output_dir.mkdir(parents=True, exist_ok=True) prefix = "eth-bb-squeeze-exit-management" summary_path = args.output_dir / f"{prefix}-summary.csv" horizon_path = args.output_dir / f"{prefix}-horizons.csv" reason_path = args.output_dir / f"{prefix}-exit-reasons.csv" report_path = args.output_dir / f"{prefix}-report.md" summary.to_csv(summary_path, index=False) horizons.to_csv(horizon_path, index=False) reasons.to_csv(reason_path, index=False) command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --bar {args.bar} --years {args.years} --output-dir {args.output_dir.as_posix()}" report_path.write_text(write_report(summary, horizons, reasons, command, eth[0].ts, eth[-1].ts), encoding="utf-8") print(primary.head(5).to_string(index=False)) return 0 if __name__ == "__main__": raise SystemExit(main())