from __future__ import annotations import argparse from dataclasses import dataclass from math import sqrt from pathlib import Path import pandas as pd from okx_codex_trader.models import Candle from okx_codex_trader.rsi2_report import _compute_rsi from okx_codex_trader.sampled_report import SegmentResult, mark_to_market, trade_equity SYMBOL = "ETH-USDT-SWAP" BAR = "15m" LEVERAGE = 3 INITIAL_EQUITY = 10_000.0 DATA_DIR = Path("data/okx-candles") OUTPUT_DIR = Path("reports/eth-exploration") COSTS = { "maker_maker": 0.0012, "maker_taker": 0.0021, "taker_taker": 0.0030, } HORIZONS = ( ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ) @dataclass(frozen=True) class RiskConfig: entry_mode: str trend_sma: int rsi_threshold: float exit_rsi: float stop_mode: str stop_loss_pct: float atr_length: int atr_multiple: float atr_floor_pct: float atr_ceiling_pct: float first_bar_stop_pct: float max_hold_bars: int take_profit_pct: float take_profit_fraction: float max_gap_pct: float max_range_pct: float entry_offsets: tuple[float, ...] entry_valid_bars: int def _format_ts(ts: int) -> str: return pd.to_datetime(ts, unit="ms", utc=True).strftime("%Y-%m-%d %H:%M") def _load_candles(symbol: str, bar: str) -> list[Candle]: path = DATA_DIR / symbol / f"{bar}.csv" frame = pd.read_csv(path) return [ Candle( symbol=symbol, ts=int(row.ts), open=float(row.open), high=float(row.high), low=float(row.low), close=float(row.close), volume=float(row.volume), ) for row in frame.itertuples(index=False) ] def _atr(candles: list[Candle], length: int) -> list[float]: rows = [] previous_close = None for candle in candles: if previous_close is None: rows.append(candle.high - candle.low) else: rows.append(max(candle.high - candle.low, abs(candle.high - previous_close), abs(candle.low - previous_close))) previous_close = candle.close return pd.Series(rows, dtype=float).rolling(length).mean().tolist() def _stop_pct(config: RiskConfig, atr_value: float, entry_price: float) -> float: if config.stop_mode == "none": return 0.0 if config.stop_mode == "fixed": return config.stop_loss_pct raw = config.atr_multiple * atr_value / entry_price return min(max(raw, config.atr_floor_pct), config.atr_ceiling_pct) def _append_trade( *, trades: list[dict[str, object]], exits: list[dict[str, object]], position: dict[str, object], candle: Candle, exit_price: float, margin_used: float, account_equity: float, ) -> tuple[float, bool]: exit_equity = trade_equity( side="long", margin_used=margin_used, entry_price=float(position["entry_price"]), exit_price=exit_price, leverage=LEVERAGE, ) pnl = exit_equity - margin_used trades.append( { "side": "Long", "entry_time": _format_ts(int(position["entry_time"])), "exit_time": _format_ts(candle.ts), "entry_price": round(float(position["entry_price"]), 4), "exit_price": round(exit_price, 4), "pnl": round(pnl, 4), "return_pct": round(pnl / margin_used * 100.0, 4), "cost_weight": round(margin_used / account_equity, 8), } ) exits.append({"ts": candle.ts, "price": exit_price, "side": "long"}) return account_equity + pnl, pnl > 0.0 def run_risk_exit_segment(candles: list[Candle], config: RiskConfig) -> SegmentResult: closes = pd.Series([candle.close for candle in candles], dtype=float) trend = closes.rolling(config.trend_sma).mean().tolist() rsi_values = _compute_rsi(closes, 2) atr_values = _atr(candles, config.atr_length) warmup_bars = max(config.trend_sma, config.atr_length, 3) equity = INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_market_entry = False pending_limits: list[dict[str, float | int]] = [] pending_exit = False for index in range(warmup_bars, len(candles)): candle = candles[index] if pending_exit and position is not None: equity, won = _append_trade( trades=trades, exits=exits, position=position, candle=candle, exit_price=candle.open, margin_used=float(position["margin_used"]), account_equity=equity, ) wins += int(won) position = None pending_exit = False pending_limits = [] if pending_market_entry and position is None and equity > 0.0: stop_pct = _stop_pct(config, atr_values[index], candle.open) position = { "entry_time": candle.ts, "entry_price": candle.open, "entry_index": index, "margin_used": equity, "stop_pct": stop_pct, "stop_price": candle.open * (1.0 - stop_pct) if stop_pct else 0.0, "first_bar_stop_price": candle.open * (1.0 - config.first_bar_stop_pct) if config.first_bar_stop_pct else 0.0, "took_profit": False, } entries.append({"ts": candle.ts, "price": candle.open, "side": "long"}) pending_market_entry = False active_limits: list[dict[str, float | int]] = [] for limit in pending_limits: if index > int(limit["expires_index"]): continue limit_price = float(limit["price"]) if candle.low <= limit_price and equity > 0.0: slice_margin = equity / len(config.entry_offsets) if position is None: stop_pct = _stop_pct(config, atr_values[index], limit_price) position = { "entry_time": candle.ts, "entry_price": limit_price, "entry_index": index, "margin_used": slice_margin, "stop_pct": stop_pct, "stop_price": limit_price * (1.0 - stop_pct) if stop_pct else 0.0, "first_bar_stop_price": limit_price * (1.0 - config.first_bar_stop_pct) if config.first_bar_stop_pct else 0.0, "took_profit": False, } else: old_margin = float(position["margin_used"]) new_margin = old_margin + slice_margin entry_price = (float(position["entry_price"]) * old_margin + limit_price * slice_margin) / new_margin stop_pct = _stop_pct(config, atr_values[index], entry_price) position["entry_price"] = entry_price position["margin_used"] = new_margin position["stop_pct"] = stop_pct position["stop_price"] = entry_price * (1.0 - stop_pct) if stop_pct else 0.0 position["first_bar_stop_price"] = entry_price * (1.0 - config.first_bar_stop_pct) if config.first_bar_stop_pct else 0.0 entries.append({"ts": candle.ts, "price": limit_price, "side": "long"}) else: active_limits.append(limit) pending_limits = active_limits current_equity = equity if position is not None: stop_price = float(position["stop_price"]) first_bar_stop = float(position["first_bar_stop_price"]) if first_bar_stop and index == int(position["entry_index"]): stop_price = max(stop_price, first_bar_stop) if stop_price else first_bar_stop take_profit_price = float(position["entry_price"]) * (1.0 + config.take_profit_pct) if config.take_profit_pct else 0.0 if stop_price and candle.low <= stop_price: equity, won = _append_trade( trades=trades, exits=exits, position=position, candle=candle, exit_price=stop_price, margin_used=float(position["margin_used"]), account_equity=equity, ) wins += int(won) current_equity = equity position = None pending_limits = [] elif take_profit_price and not bool(position["took_profit"]) and candle.high >= take_profit_price: close_margin = float(position["margin_used"]) * config.take_profit_fraction equity, won = _append_trade( trades=trades, exits=exits, position=position, candle=candle, exit_price=take_profit_price, margin_used=close_margin, account_equity=equity, ) wins += int(won) position["margin_used"] = float(position["margin_used"]) - close_margin position["took_profit"] = True current_equity = equity if float(position["margin_used"]) <= 0.0: position = None pending_limits = [] if position is not None: position_equity = mark_to_market( side="long", margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=LEVERAGE, ) current_equity = equity - float(position["margin_used"]) + position_equity peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(candles) - 1 or equity <= 0.0: continue current_rsi = rsi_values[index] current_trend = trend[index] current_atr = atr_values[index] if current_rsi != current_rsi or current_trend != current_trend or current_atr != current_atr: continue if position is not None: held_bars = index - int(position["entry_index"]) if current_rsi >= config.exit_rsi or held_bars >= config.max_hold_bars: pending_exit = True pending_limits = [] continue if pending_limits: continue previous_close = candles[index - 1].close gap_pct = abs(candle.open / previous_close - 1.0) range_pct = (candle.high - candle.low) / candle.close filter_passed = (not config.max_gap_pct or gap_pct <= config.max_gap_pct) and ( not config.max_range_pct or range_pct <= config.max_range_pct ) if filter_passed and candle.close > float(current_trend) and current_rsi <= config.rsi_threshold: if config.entry_mode == "market": pending_market_entry = True else: pending_limits = [ {"price": candle.close * (1.0 - offset), "expires_index": index + config.entry_valid_bars} for offset in config.entry_offsets ] return SegmentResult( trade_count=len(trades), total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY, win_rate=wins / len(trades) if trades else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=candles[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ) def cost_adjusted_equity_frame(result: SegmentResult, roundtrip_cost: float) -> pd.DataFrame: frame = pd.DataFrame(result.equity_curve) frame["ts"] = pd.to_datetime(frame["ts"], unit="ms", utc=True) frame["gross_equity"] = frame["equity"].astype(float) frame["cost_factor"] = 1.0 if result.trades: trade_frame = pd.DataFrame(result.trades) trade_frame["ts"] = pd.to_datetime(trade_frame["exit_time"], utc=True) trade_frame["factor"] = 1.0 - roundtrip_cost * trade_frame["cost_weight"].astype(float) cost_by_ts = trade_frame.groupby("ts")["factor"].prod() frame["cost_factor"] = frame["ts"].map(cost_by_ts).fillna(1.0) frame["cost_factor"] = frame["cost_factor"].cumprod() frame["equity"] = frame["gross_equity"] * frame["cost_factor"] return frame[["ts", "equity"]] def max_drawdown(values: list[float]) -> float: peak = values[0] drawdown = 0.0 for value in values: peak = max(peak, value) drawdown = max(drawdown, (peak - value) / peak) return drawdown def annualized_metrics(frame: pd.DataFrame) -> dict[str, float]: years = (frame["ts"].iloc[-1] - frame["ts"].iloc[0]).total_seconds() / 86_400 / 365 total_return = float(frame["equity"].iloc[-1] / frame["equity"].iloc[0] - 1.0) annualized_return = (1.0 + total_return) ** (1.0 / years) - 1.0 if total_return > -1.0 and years > 0.0 else 0.0 daily = frame.set_index("ts")["equity"].resample("1D").last().ffill() daily_returns = daily.pct_change().dropna() daily_std = float(daily_returns.std(ddof=1)) if len(daily_returns) > 1 else 0.0 sharpe = float(daily_returns.mean()) / daily_std * sqrt(365) if daily_std else 0.0 drawdown = max_drawdown([float(value) for value in frame["equity"]]) return { "net_total_return": total_return, "net_annualized_return": annualized_return, "net_max_drawdown": drawdown, "net_calmar": annualized_return / drawdown if drawdown else 0.0, "net_sharpe_daily": sharpe, } def horizon_metrics(frame: pd.DataFrame) -> list[dict[str, object]]: rows = [] end_time = frame["ts"].iloc[-1] for label, offset in HORIZONS: cutoff = end_time - offset before = frame[frame["ts"] <= cutoff] if len(before): start_equity = float(before["equity"].iloc[-1]) segment = pd.concat( [ pd.DataFrame([{"ts": cutoff, "equity": start_equity}]), frame[frame["ts"] > cutoff][["ts", "equity"]], ], ignore_index=True, ) else: segment = frame[["ts", "equity"]].copy() cutoff = segment["ts"].iloc[0] rows.append( { "horizon": label, "horizon_start": cutoff.strftime("%Y-%m-%d %H:%M"), "horizon_end": end_time.strftime("%Y-%m-%d %H:%M"), "horizon_days": (end_time - cutoff).total_seconds() / 86_400, **annualized_metrics(segment), } ) return rows def build_configs() -> list[RiskConfig]: base = { "trend_sma": 50, "rsi_threshold": 3.0, "atr_length": 14, "entry_offsets": (0.001, 0.002, 0.003), "entry_valid_bars": 4, } configs: list[RiskConfig] = [] for entry_mode in ("market", "price_twap"): configs.append( RiskConfig( entry_mode=entry_mode, exit_rsi=45.0, stop_mode="none", stop_loss_pct=0.0, atr_multiple=0.0, atr_floor_pct=0.0, atr_ceiling_pct=0.0, first_bar_stop_pct=0.0, max_hold_bars=10_000, take_profit_pct=0.0, take_profit_fraction=0.0, max_gap_pct=0.0, max_range_pct=0.0, **base, ) ) for exit_rsi in (35.0, 45.0): for max_hold in (24, 48, 96): for stop_mode, stop, atr_mult, floor, ceiling in ( ("fixed", 0.006, 0.0, 0.0, 0.0), ("atr", 0.0, 1.2, 0.004, 0.012), ("atr", 0.0, 1.6, 0.005, 0.016), ): configs.append( RiskConfig( entry_mode=entry_mode, exit_rsi=exit_rsi, stop_mode=stop_mode, stop_loss_pct=stop, atr_multiple=atr_mult, atr_floor_pct=floor, atr_ceiling_pct=ceiling, first_bar_stop_pct=0.0, max_hold_bars=max_hold, take_profit_pct=0.0, take_profit_fraction=0.0, max_gap_pct=0.0, max_range_pct=0.0, **base, ) ) for first_bar_stop in (0.004, 0.006): configs.append( RiskConfig( entry_mode=entry_mode, exit_rsi=45.0, stop_mode="atr", stop_loss_pct=0.0, atr_multiple=1.2, atr_floor_pct=0.004, atr_ceiling_pct=0.012, first_bar_stop_pct=first_bar_stop, max_hold_bars=48, take_profit_pct=0.0, take_profit_fraction=0.0, max_gap_pct=0.0, max_range_pct=0.0, **base, ) ) for take_profit in (0.008, 0.012): configs.append( RiskConfig( entry_mode=entry_mode, exit_rsi=45.0, stop_mode="atr", stop_loss_pct=0.0, atr_multiple=1.2, atr_floor_pct=0.004, atr_ceiling_pct=0.012, first_bar_stop_pct=0.004, max_hold_bars=48, take_profit_pct=take_profit, take_profit_fraction=0.5, max_gap_pct=0.0, max_range_pct=0.0, **base, ) ) for max_gap, max_range in ((0.006, 0.018), (0.010, 0.024)): configs.append( RiskConfig( entry_mode=entry_mode, exit_rsi=45.0, stop_mode="atr", stop_loss_pct=0.0, atr_multiple=1.2, atr_floor_pct=0.004, atr_ceiling_pct=0.012, first_bar_stop_pct=0.004, max_hold_bars=48, take_profit_pct=0.0, take_profit_fraction=0.0, max_gap_pct=max_gap, max_range_pct=max_range, **base, ) ) return configs def config_name(config: RiskConfig) -> str: stop = config.stop_mode if config.stop_mode == "fixed": stop += f"{config.stop_loss_pct:.4f}" if config.stop_mode == "atr": stop += f"{config.atr_multiple:.1f}-{config.atr_floor_pct:.4f}-{config.atr_ceiling_pct:.4f}" parts = [ config.entry_mode, f"t{config.trend_sma}", f"r{config.rsi_threshold:.1f}", f"x{config.exit_rsi:.1f}", f"stop-{stop}", f"mh{config.max_hold_bars}", ] if config.first_bar_stop_pct: parts.append(f"fb{config.first_bar_stop_pct:.4f}") if config.take_profit_pct: parts.append(f"tp{config.take_profit_pct:.4f}p{config.take_profit_fraction:.2f}") if config.max_gap_pct or config.max_range_pct: parts.append(f"gap{config.max_gap_pct:.4f}range{config.max_range_pct:.4f}") return "-".join(parts) def run_search() -> tuple[pd.DataFrame, pd.DataFrame]: candles = _load_candles(SYMBOL, BAR) total_rows: list[dict[str, object]] = [] horizon_rows: list[dict[str, object]] = [] configs = build_configs() for index, config in enumerate(configs, start=1): name = config_name(config) result = run_risk_exit_segment(candles, config) for cost_name, cost in COSTS.items(): equity = cost_adjusted_equity_frame(result, cost) metrics = annualized_metrics(equity) total_rows.append( { "symbol": SYMBOL, "bar": BAR, "name": name, "cost_scenario": cost_name, "roundtrip_cost_on_margin": cost, "first_candle": _format_ts(candles[0].ts), "last_candle": _format_ts(candles[-1].ts), "trades": result.trade_count, "gross_total_return": result.total_return, "gross_max_drawdown": result.max_drawdown, "win_rate": result.win_rate, **metrics, } ) for row in horizon_metrics(equity): horizon_rows.append( { "symbol": SYMBOL, "bar": BAR, "name": name, "cost_scenario": cost_name, "roundtrip_cost_on_margin": cost, "trades": result.trade_count, **row, } ) print(f"done {index}/{len(configs)} {name}") totals = pd.DataFrame(total_rows).sort_values( ["cost_scenario", "net_calmar", "net_annualized_return"], ascending=[True, False, False], ) horizons = pd.DataFrame(horizon_rows) horizons["horizon"] = pd.Categorical(horizons["horizon"], ["3y", "1y", "6m", "3m"], ordered=True) horizons = horizons.sort_values( ["cost_scenario", "horizon", "net_annualized_return"], ascending=[True, True, False], ) return totals, horizons def _markdown_table(frame: pd.DataFrame) -> str: columns = [str(column) for column in frame.columns] rows = [columns, ["---"] * len(columns)] for row in frame.itertuples(index=False): rows.append([str(value) for value in row]) return "\n".join("| " + " | ".join(row) + " |" for row in rows) def write_summary(totals: pd.DataFrame, horizons: pd.DataFrame, output_dir: Path) -> None: main = totals[totals["cost_scenario"] == "maker_taker"].copy() top = main.sort_values(["net_calmar", "net_annualized_return"], ascending=False).head(10) market_baseline = main[main["name"] == "market-t50-r3.0-x45.0-stop-none-mh10000"].iloc[0] twap_baseline = main[main["name"] == "price_twap-t50-r3.0-x45.0-stop-none-mh10000"].iloc[0] lines = [ "# ETH risk/exit exploration", "", "Fixed entry baseline: ETH 15m RSI2 long, trend SMA 50, RSI <= 3, exit RSI baseline 45. Variants only change risk/exit rules or optional gap/range risk filters. Main ranking uses maker_taker cost 0.0021.", "", "## Baselines", "", f"- market: net_annualized_return={market_baseline['net_annualized_return']:.6f}, net_max_drawdown={market_baseline['net_max_drawdown']:.6f}, net_calmar={market_baseline['net_calmar']:.6f}", f"- price_twap: net_annualized_return={twap_baseline['net_annualized_return']:.6f}, net_max_drawdown={twap_baseline['net_max_drawdown']:.6f}, net_calmar={twap_baseline['net_calmar']:.6f}", "", "## Top 10 maker_taker candidates", "", _markdown_table(top[ [ "name", "trades", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "gross_max_drawdown", "win_rate", ] ]), "", "## Drawdown/recent-return read", "", ] best = top.iloc[0] baseline = twap_baseline if str(best["name"]).startswith("price_twap") else market_baseline drawdown_lower = float(best["net_max_drawdown"]) < float(baseline["net_max_drawdown"]) recent = horizons[ (horizons["cost_scenario"] == "maker_taker") & (horizons["name"].isin([best["name"], baseline["name"]])) & (horizons["horizon"].isin(["1y", "6m", "3m"])) ][["name", "horizon", "net_total_return", "net_annualized_return", "net_max_drawdown"]] lines.append( f"Best candidate versus its entry-mode baseline: drawdown_lower={drawdown_lower}; " f"annualized_return_delta={float(best['net_annualized_return']) - float(baseline['net_annualized_return']):.6f}." ) lines.extend(["", _markdown_table(recent), ""]) output_dir.mkdir(parents=True, exist_ok=True) (output_dir / "eth-risk-exit-summary.md").write_text("\n".join(lines), encoding="utf-8") def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR) args = parser.parse_args() output_dir = args.output_dir output_dir.mkdir(parents=True, exist_ok=True) totals, horizons = run_search() totals.to_csv(output_dir / "eth-risk-exit-total.csv", index=False) horizons.to_csv(output_dir / "eth-risk-exit-horizon.csv", index=False) top = totals[totals["cost_scenario"] == "maker_taker"].sort_values( ["net_calmar", "net_annualized_return"], ascending=False ) top.head(10).to_csv(output_dir / "eth-risk-exit-top10.csv", index=False) write_summary(totals, horizons, output_dir) print(top.head(10)[["name", "trades", "net_annualized_return", "net_max_drawdown", "net_calmar"]].to_string(index=False)) return 0 if __name__ == "__main__": raise SystemExit(main())