from __future__ import annotations import argparse import sys from dataclasses import dataclass from pathlib import Path import pandas as pd sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from okx_codex_trader.models import Candle from okx_codex_trader.sampled_report import trade_equity ETH_SYMBOL = "ETH-USDT-SWAP" BTC_SYMBOL = "BTC-USDT-SWAP" BAR = "15m" YEARS = 10.0 LEVERAGE = 3 INITIAL_EQUITY = 10_000.0 ROUNDTRIP_COST = 0.0021 DATA_DIR = Path("data/okx-candles") OUTPUT_DIR = Path("reports/eth-exploration") PREFIX = "eth-partial-take-runner" HORIZONS = ( ("full", None), ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ("30d", pd.DateOffset(days=30)), ) @dataclass(frozen=True) class Variant: name: str band_length: int bandwidth_lookback: int bandwidth_quantile: float side_mode: str btc_filter: str eth_vol_cap: float stop_loss_pct: float cooldown_bars: int take_trigger_pct: float take_fraction: float trail_trigger_pct: float trail_giveback_pct: float readd_drawdown_pct: float | None reentry_after_exit: bool def _format_ts(ts: int) -> str: return pd.to_datetime(ts, unit="ms", utc=True).strftime("%Y-%m-%d %H:%M") def load_candles(symbol: str, bar: str) -> list[Candle]: frame = pd.read_csv(DATA_DIR / symbol / f"{bar}.csv") return [ Candle( symbol=symbol, ts=int(row.ts), open=float(row.open), high=float(row.high), low=float(row.low), close=float(row.close), volume=float(row.volume), ) for row in frame.itertuples(index=False) ] def align_pair(eth: list[Candle], btc: list[Candle]) -> tuple[list[Candle], list[Candle]]: btc_by_ts = {candle.ts: candle for candle in btc} eth_out: list[Candle] = [] btc_out: list[Candle] = [] for candle in eth: other = btc_by_ts.get(candle.ts) if other is not None: eth_out.append(candle) btc_out.append(other) return eth_out, btc_out def build_frame(eth: list[Candle], btc: list[Candle], variant: Variant) -> pd.DataFrame: frame = pd.DataFrame( { "ts": [candle.ts for candle in eth], "open": [candle.open for candle in eth], "high": [candle.high for candle in eth], "low": [candle.low for candle in eth], "close": [candle.close for candle in eth], "btc_close": [candle.close for candle in btc], } ) close = frame["close"].astype(float) btc_close = frame["btc_close"].astype(float) middle = close.rolling(variant.band_length).mean() stdev = close.rolling(variant.band_length).std(ddof=0) upper = middle + 2.0 * stdev lower = middle - 2.0 * stdev bandwidth = (upper - lower) / middle frame["middle"] = middle frame["upper"] = upper frame["lower"] = lower frame["bandwidth"] = bandwidth frame["bandwidth_threshold"] = bandwidth.rolling(variant.bandwidth_lookback).quantile(variant.bandwidth_quantile) frame["btc_sma"] = btc_close.rolling(480).mean() frame["btc_momentum"] = btc_close / btc_close.shift(96) - 1.0 frame["eth_vol_96"] = close.pct_change().rolling(96).std(ddof=0) return frame def position_margin(position: dict[str, object] | None) -> float: if position is None: return 0.0 return sum(float(leg["margin"]) for leg in position["legs"]) # type: ignore[index] def weighted_entry(position: dict[str, object]) -> float: margin = position_margin(position) if margin <= 0.0: return 0.0 return sum(float(leg["margin"]) * float(leg["entry_price"]) for leg in position["legs"]) / margin # type: ignore[index] def position_value(position: dict[str, object] | None, price: float) -> float: if position is None: return 0.0 side = str(position["side"]) return sum( trade_equity( side=side, margin_used=float(leg["margin"]), entry_price=float(leg["entry_price"]), exit_price=price, leverage=LEVERAGE, ) for leg in position["legs"] # type: ignore[index] ) def account_equity(cash: float, position: dict[str, object] | None, price: float) -> float: return cash + position_value(position, price) def close_fraction( *, cash: float, position: dict[str, object], fraction: float, exit_price: float, ts: int, reason: str, trades: list[dict[str, object]], ) -> tuple[float, dict[str, object] | None]: side = str(position["side"]) entry_price = weighted_entry(position) closed_margin = 0.0 gross_exit = 0.0 remaining: list[dict[str, float]] = [] for leg in position["legs"]: # type: ignore[index] margin = float(leg["margin"]) close_margin = margin * fraction keep_margin = margin - close_margin if close_margin > 0.0: closed_margin += close_margin gross_exit += trade_equity( side=side, margin_used=close_margin, entry_price=float(leg["entry_price"]), exit_price=exit_price, leverage=LEVERAGE, ) if keep_margin > 1e-9: remaining.append({"margin": keep_margin, "entry_price": float(leg["entry_price"])}) if closed_margin <= 0.0: return cash, position net_exit = gross_exit - closed_margin * ROUNDTRIP_COST trades.append( { "side": "Long" if side == "long" else "Short", "entry_time": _format_ts(int(position["entry_ts"])), "exit_time": _format_ts(ts), "exit_ts": ts, "entry_price": round(entry_price, 4), "exit_price": round(exit_price, 4), "margin": round(closed_margin, 4), "pnl": round(net_exit - closed_margin, 4), "return_pct": (net_exit / closed_margin - 1.0) * 100.0, "exit_reason": reason, } ) cash += net_exit if not remaining: return cash, None updated = dict(position) updated["legs"] = remaining return cash, updated def add_leg(position: dict[str, object], margin: float, price: float) -> dict[str, object]: updated = dict(position) legs = list(updated["legs"]) # type: ignore[arg-type] legs.append({"margin": margin, "entry_price": price}) updated["legs"] = legs updated["partial_taken"] = False updated["peak_profit_pct"] = 0.0 return updated def profit_pct(position: dict[str, object], price: float) -> float: margin = position_margin(position) if margin <= 0.0: return 0.0 return position_value(position, price) / margin - 1.0 def intrabar_profit_pct(position: dict[str, object], row: pd.Series) -> float: side = str(position["side"]) price = float(row["high"]) if side == "long" else float(row["low"]) return profit_pct(position, price) def stop_hit(position: dict[str, object], row: pd.Series) -> tuple[bool, float]: side = str(position["side"]) stop = float(position["stop_price"]) if side == "long": if float(row["open"]) <= stop: return True, float(row["open"]) return float(row["low"]) <= stop, stop if float(row["open"]) >= stop: return True, float(row["open"]) return float(row["high"]) >= stop, stop def trail_hit(position: dict[str, object], row: pd.Series, variant: Variant) -> tuple[bool, float]: peak = float(position["peak_profit_pct"]) if peak < variant.trail_trigger_pct: return False, 0.0 side = str(position["side"]) entry = weighted_entry(position) if side == "long": trail_price = entry * (1.0 + (peak - variant.trail_giveback_pct) / LEVERAGE) if float(row["open"]) <= trail_price: return True, float(row["open"]) return float(row["low"]) <= trail_price, trail_price trail_price = entry * (1.0 - (peak - variant.trail_giveback_pct) / LEVERAGE) if float(row["open"]) >= trail_price: return True, float(row["open"]) return float(row["high"]) >= trail_price, trail_price def pass_btc_filter(row: pd.Series, variant: Variant) -> bool: if variant.btc_filter == "none": return True if variant.btc_filter == "btc-up": return float(row["btc_close"]) > float(row["btc_sma"]) if variant.btc_filter == "btc-up-momo": return float(row["btc_close"]) > float(row["btc_sma"]) and float(row["btc_momentum"]) > 0.0 raise ValueError(f"unknown btc_filter: {variant.btc_filter}") def run_variant(frame: pd.DataFrame, variant: Variant) -> dict[str, object]: warmup = max(variant.band_length, variant.bandwidth_lookback, 480, 96) cash = INITIAL_EQUITY position: dict[str, object] | None = None pending_entry_side: str | None = None pending_partial_take = False pending_readd = False pending_reentry_side: str | None = None cooldown_until = -1 trades: list[dict[str, object]] = [] events: list[dict[str, object]] = [] equity_rows: list[dict[str, object]] = [] for index in range(warmup, len(frame)): row = frame.iloc[index] ts = int(row["ts"]) open_price = float(row["open"]) close_price = float(row["close"]) acted = False if pending_reentry_side is not None and position is None and index >= cooldown_until and cash > 1.0: margin = cash cash -= margin position = { "side": pending_reentry_side, "entry_ts": ts, "legs": [{"margin": margin, "entry_price": open_price}], "stop_price": open_price * (1.0 - variant.stop_loss_pct if pending_reentry_side == "long" else 1.0 + variant.stop_loss_pct), "partial_taken": False, "peak_profit_pct": 0.0, } events.append({"ts": ts, "event": f"reentry_{pending_reentry_side}", "margin": margin}) pending_reentry_side = None acted = True if not acted and pending_partial_take and position is not None: before = position_margin(position) cash, position = close_fraction( cash=cash, position=position, fraction=variant.take_fraction, exit_price=open_price, ts=ts, reason="partial_take", trades=trades, ) if position is not None: position["partial_taken"] = True events.append({"ts": ts, "event": "partial_take", "margin": before * variant.take_fraction}) pending_partial_take = False acted = True if not acted and pending_readd and position is not None: equity = account_equity(cash, position, open_price) missing = max(0.0, equity - position_margin(position)) add = min(cash, missing) if add > 1.0: cash -= add position = add_leg(position, add, open_price) events.append({"ts": ts, "event": "t_readd", "margin": add}) pending_readd = False acted = True if not acted and pending_entry_side is not None and position is None and cash > 1.0: margin = cash cash -= margin position = { "side": pending_entry_side, "entry_ts": ts, "legs": [{"margin": margin, "entry_price": open_price}], "stop_price": open_price * (1.0 - variant.stop_loss_pct if pending_entry_side == "long" else 1.0 + variant.stop_loss_pct), "partial_taken": False, "peak_profit_pct": 0.0, } events.append({"ts": ts, "event": f"entry_{pending_entry_side}", "margin": margin}) pending_entry_side = None acted = True if not acted and position is not None: hit, exit_price = stop_hit(position, row) if hit: side = str(position["side"]) before = position_margin(position) cash, position = close_fraction( cash=cash, position=position, fraction=1.0, exit_price=exit_price, ts=ts, reason="stop", trades=trades, ) events.append({"ts": ts, "event": "stop", "margin": before}) cooldown_until = index + variant.cooldown_bars if variant.reentry_after_exit: pending_reentry_side = side acted = True if not acted and position is not None: position["peak_profit_pct"] = max(float(position["peak_profit_pct"]), intrabar_profit_pct(position, row)) hit, exit_price = trail_hit(position, row, variant) if hit: side = str(position["side"]) before = position_margin(position) cash, position = close_fraction( cash=cash, position=position, fraction=1.0, exit_price=exit_price, ts=ts, reason="trail", trades=trades, ) events.append({"ts": ts, "event": "trail", "margin": before}) cooldown_until = index + variant.cooldown_bars if variant.reentry_after_exit: pending_reentry_side = side acted = True equity = account_equity(cash, position, close_price) equity_rows.append( { "ts": ts, "time": pd.to_datetime(ts, unit="ms", utc=True), "equity": equity, "cash": cash, "position_margin": position_margin(position), "position_side": "flat" if position is None else str(position["side"]), } ) if index == len(frame) - 1 or equity <= 0.0: continue needed = ("middle", "upper", "lower", "bandwidth", "bandwidth_threshold", "btc_sma", "btc_momentum", "eth_vol_96") if any(float(row[key]) != float(row[key]) for key in needed): continue if acted: continue if position is not None: if not bool(position["partial_taken"]) and profit_pct(position, close_price) >= variant.take_trigger_pct: pending_partial_take = True continue if ( variant.readd_drawdown_pct is not None and bool(position["partial_taken"]) and profit_pct(position, close_price) <= float(position["peak_profit_pct"]) - variant.readd_drawdown_pct ): pending_readd = True continue if index < cooldown_until or float(row["eth_vol_96"]) > variant.eth_vol_cap or not pass_btc_filter(row, variant): continue if float(row["bandwidth"]) > float(row["bandwidth_threshold"]): continue if close_price > float(row["upper"]): pending_entry_side = "long" elif variant.side_mode == "both" and close_price < float(row["lower"]): pending_entry_side = "short" return { "variant": variant, "equity": pd.DataFrame(equity_rows), "trades": pd.DataFrame(trades), "events": pd.DataFrame(events), } def max_drawdown(equity: pd.Series) -> float: if equity.empty: return 0.0 peak = equity.cummax() return float(((peak - equity) / peak).max()) def scoped_metrics(equity: pd.DataFrame, trades: pd.DataFrame, label: str, offset: pd.DateOffset | None) -> dict[str, object]: end = pd.Timestamp(equity["time"].iloc[-1]) if offset is None: scoped = equity[["time", "equity"]].copy() else: cutoff = end - offset before = equity[equity["time"] <= cutoff] start_equity = float(before["equity"].iloc[-1]) if len(before) else float(equity["equity"].iloc[0]) after = equity[equity["time"] > cutoff] scoped = pd.concat([pd.DataFrame([{"time": cutoff, "equity": start_equity}]), after[["time", "equity"]]], ignore_index=True) start = pd.Timestamp(scoped["time"].iloc[0]) years = max((end - start).total_seconds() / 86_400 / 365, 1e-9) total_return = float(scoped["equity"].iloc[-1] / scoped["equity"].iloc[0] - 1.0) annualized = (1.0 + total_return) ** (1.0 / years) - 1.0 if total_return > -1.0 else -1.0 scoped_trades = trades.iloc[0:0] if not trades.empty: exit_times = pd.to_datetime(trades["exit_ts"], unit="ms", utc=True) scoped_trades = trades[(exit_times > start) & (exit_times <= end)] returns = scoped_trades["return_pct"].astype(float) / 100.0 if len(scoped_trades) else pd.Series(dtype=float) wins = returns[returns > 0.0] losses = returns[returns < 0.0] gains = float(wins.sum()) loss_sum = abs(float(losses.sum())) avg_win = float(wins.mean()) if len(wins) else 0.0 avg_loss = abs(float(losses.mean())) if len(losses) else 0.0 dd = max_drawdown(scoped["equity"].astype(float)) reasons = scoped_trades["exit_reason"].value_counts().to_dict() if len(scoped_trades) else {} return { "horizon": label, "start": start.strftime("%Y-%m-%d %H:%M"), "end": end.strftime("%Y-%m-%d %H:%M"), "total_return": total_return, "annualized_return": annualized, "max_drawdown": dd, "calmar": annualized / dd if dd else 0.0, "trades": int(len(scoped_trades)), "win_rate": float((returns > 0.0).mean()) if len(returns) else 0.0, "profit_factor": gains / loss_sum if loss_sum > 0.0 else 0.0, "payoff_ratio": avg_win / avg_loss if avg_loss > 0.0 else 0.0, "partial_take_exits": int(reasons.get("partial_take", 0)), "trail_exits": int(reasons.get("trail", 0)), "stop_exits": int(reasons.get("stop", 0)), } def build_variants() -> list[Variant]: variants: list[Variant] = [] entries = ( (48, 960, 0.25, "both", "none", 0.006, 0.010), (48, 960, 0.20, "both", "none", 0.006, 0.010), (96, 960, 0.25, "both", "btc-up", 0.006, 0.012), (96, 960, 0.25, "both", "btc-up-momo", 0.006, 0.012), ) overlays = ( (0.012, 0.30, 0.018, 0.010, None, False), (0.012, 0.30, 0.018, 0.010, 0.006, False), (0.012, 0.30, 0.018, 0.010, 0.006, True), (0.018, 0.25, 0.024, 0.012, None, False), (0.018, 0.25, 0.024, 0.012, 0.008, False), (0.018, 0.25, 0.024, 0.012, 0.008, True), (0.024, 0.20, 0.030, 0.015, None, False), (0.024, 0.20, 0.030, 0.015, 0.010, True), ) for band, lookback, quantile, side_mode, btc_filter, vol_cap, stop in entries: for take, fraction, trail, giveback, readd, reentry in overlays: name = ( f"bb-l{band}-bw{lookback}-q{quantile:g}-{side_mode}-{btc_filter}" f"-sl{stop:g}-pt{take:g}-f{fraction:g}-tr{trail:g}-{giveback:g}" f"-readd{'none' if readd is None else f'{readd:g}'}-reentry{int(reentry)}" ) variants.append( Variant( name=name, band_length=band, bandwidth_lookback=lookback, bandwidth_quantile=quantile, side_mode=side_mode, btc_filter=btc_filter, eth_vol_cap=vol_cap, stop_loss_pct=stop, cooldown_bars=24, take_trigger_pct=take, take_fraction=fraction, trail_trigger_pct=trail, trail_giveback_pct=giveback, readd_drawdown_pct=readd, reentry_after_exit=reentry, ) ) return variants def markdown_table(frame: pd.DataFrame) -> str: def cell(value: object) -> str: if isinstance(value, float): return f"{value:.6g}" return str(value).replace("|", "\\|") rows = [list(frame.columns), ["---" for _ in frame.columns]] rows.extend(frame.astype(object).where(pd.notna(frame), "").values.tolist()) return "\n".join("| " + " | ".join(cell(value) for value in row) + " |" for row in rows) def write_report(summary: pd.DataFrame, horizons: pd.DataFrame, events: pd.DataFrame, paths: list[Path], command: str) -> str: ranked = summary.sort_values(["recent_min_return", "recent_min_calmar", "full_calmar"], ascending=[False, False, False]) best_name = str(ranked.iloc[0]["variant"]) best_horizons = horizons[horizons["variant"] == best_name] return ( "# ETH partial-take runner exploration\n\n" "Scope: offline local OKX 15m candle research only. No live executor, private API, env, or service file was touched.\n\n" f"Run command: `{command}`\n\n" "Model: BB squeeze entry, single-position contract-equity approximation, partial profit take, retained runner with trailing exit, optional drawdown re-add, optional post-exit re-entry. Each candle can execute at most one pending or risk action.\n\n" "Output files:\n" + "\n".join(f"- `{path}`" for path in paths) + "\n\n" "## Ranking\n\n" + markdown_table( ranked[ [ "variant", "full_total_return", "full_annualized_return", "full_max_drawdown", "full_calmar", "recent_min_return", "recent_min_calmar", "trades", "win_rate", "profit_factor", "payoff_ratio", "partial_take_events", "t_readd_events", "reentry_events", ] ].head(12) ) + "\n\n" "## Best Variant Horizons\n\n" + markdown_table( best_horizons[ [ "horizon", "total_return", "annualized_return", "max_drawdown", "calmar", "trades", "win_rate", "profit_factor", "payoff_ratio", "partial_take_exits", "trail_exits", "stop_exits", ] ] ) + "\n\n" "## Event Counts\n\n" + markdown_table(events.sort_values(["partial_take_events", "t_readd_events"], ascending=[False, False]).head(12)) + "\n" ) def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--bar", default=BAR) parser.add_argument("--years", type=float, default=YEARS) parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR) args = parser.parse_args() eth, btc = align_pair(load_candles(ETH_SYMBOL, args.bar), load_candles(BTC_SYMBOL, args.bar)) requested_bars = int(args.years * 365 * 24 * 60 / 15) eth = eth[-requested_bars:] btc = btc[-requested_bars:] summary_rows: list[dict[str, object]] = [] horizon_rows: list[dict[str, object]] = [] event_rows: list[dict[str, object]] = [] trade_frames: list[pd.DataFrame] = [] variants = build_variants() for index, variant in enumerate(variants, start=1): frame = build_frame(eth, btc, variant) result = run_variant(frame, variant) equity = result["equity"] trades = result["trades"] events = result["events"] if equity.empty: continue if not trades.empty: trades = trades.copy() trades["variant"] = variant.name trade_frames.append(trades) horizon_metrics = [scoped_metrics(equity, trades, label, offset) for label, offset in HORIZONS] for row in horizon_metrics: horizon_rows.append({"variant": variant.name, **row}) by_horizon = {row["horizon"]: row for row in horizon_metrics} event_counts = events["event"].value_counts().to_dict() if not events.empty else {} event_row = { "variant": variant.name, "entry_events": int(sum(value for key, value in event_counts.items() if str(key).startswith("entry_"))), "partial_take_events": int(event_counts.get("partial_take", 0)), "t_readd_events": int(event_counts.get("t_readd", 0)), "reentry_events": int(sum(value for key, value in event_counts.items() if str(key).startswith("reentry_"))), "trail_events": int(event_counts.get("trail", 0)), "stop_events": int(event_counts.get("stop", 0)), } event_rows.append(event_row) recent_returns = [float(by_horizon[label]["total_return"]) for label in ("1y", "6m", "3m", "30d")] recent_calmars = [float(by_horizon[label]["calmar"]) for label in ("1y", "6m", "3m", "30d")] summary_rows.append( { "variant": variant.name, "bar": args.bar, "first_candle": _format_ts(eth[0].ts), "last_candle": _format_ts(eth[-1].ts), "band_length": variant.band_length, "bandwidth_lookback": variant.bandwidth_lookback, "bandwidth_quantile": variant.bandwidth_quantile, "side_mode": variant.side_mode, "btc_filter": variant.btc_filter, "eth_vol_cap": variant.eth_vol_cap, "stop_loss_pct": variant.stop_loss_pct, "take_trigger_pct": variant.take_trigger_pct, "take_fraction": variant.take_fraction, "trail_trigger_pct": variant.trail_trigger_pct, "trail_giveback_pct": variant.trail_giveback_pct, "readd_drawdown_pct": variant.readd_drawdown_pct, "reentry_after_exit": variant.reentry_after_exit, "full_total_return": by_horizon["full"]["total_return"], "full_annualized_return": by_horizon["full"]["annualized_return"], "full_max_drawdown": by_horizon["full"]["max_drawdown"], "full_calmar": by_horizon["full"]["calmar"], "trades": by_horizon["full"]["trades"], "win_rate": by_horizon["full"]["win_rate"], "profit_factor": by_horizon["full"]["profit_factor"], "payoff_ratio": by_horizon["full"]["payoff_ratio"], "recent_min_return": min(recent_returns), "recent_min_calmar": min(recent_calmars), **event_row, } ) print(f"done {index}/{len(variants)} {variant.name}", flush=True) summary = pd.DataFrame(summary_rows).sort_values(["recent_min_return", "recent_min_calmar", "full_calmar"], ascending=[False, False, False]) horizons = pd.DataFrame(horizon_rows) horizons["horizon"] = pd.Categorical(horizons["horizon"], categories=[label for label, _ in HORIZONS], ordered=True) horizons = horizons.sort_values(["variant", "horizon"]) events = pd.DataFrame(event_rows) trades_out = pd.concat(trade_frames, ignore_index=True) if trade_frames else pd.DataFrame() args.output_dir.mkdir(parents=True, exist_ok=True) summary_path = args.output_dir / f"{PREFIX}-summary.csv" horizons_path = args.output_dir / f"{PREFIX}-horizons.csv" events_path = args.output_dir / f"{PREFIX}-events.csv" trades_path = args.output_dir / f"{PREFIX}-trades.csv" report_path = args.output_dir / f"{PREFIX}-report.md" paths = [summary_path, horizons_path, events_path, trades_path, report_path] summary.to_csv(summary_path, index=False) horizons.to_csv(horizons_path, index=False) events.to_csv(events_path, index=False) trades_out.to_csv(trades_path, index=False) command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --bar {args.bar} --years {args.years} --output-dir {args.output_dir.as_posix()}" report_path.write_text(write_report(summary, horizons, events, paths, command), encoding="utf-8") print(summary.head(10).to_string(index=False)) return 0 if __name__ == "__main__": raise SystemExit(main())