from __future__ import annotations import argparse import sys from itertools import product from pathlib import Path from typing import Iterable import pandas as pd sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from scripts.explore_ultrashort import ( INITIAL_EQUITY, LEVERAGE, _format_ts, _compute_rsi, annualized_metrics_from_equity, get_candles_cached, history_bars_for_years, recent_horizon_metrics_from_equity, ) from okx_codex_trader.models import Candle from okx_codex_trader.okx_client import OkxClient from okx_codex_trader.sampled_report import SegmentResult, mark_to_market SYMBOL = "ETH-USDT-SWAP" BAR = "15m" YEARS = 3.0 OUTPUT_DIR = Path("reports/eth-exploration") COSTS = { "maker_maker": 0.0012, "maker_taker": 0.0021, "taker_taker": 0.0030, } HORIZONS = ( ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ) ENTRY_OFFSET_SETS = ( (0.001, 0.003, 0.005), (0.003, 0.006, 0.009), ) def candidate_specs() -> Iterable[dict[str, object]]: for ( trend_sma, rsi_threshold, exit_rsi, stop_loss_pct, max_hold_bars, entry_offsets, entry_valid_bars, fill_buffer, ) in product( (80, 160), (5.0, 8.0, 10.0), (50.0, 55.0), (0.008, 0.012), (48, 96), ENTRY_OFFSET_SETS, (2, 4), (0.0, 0.0002), ): yield { "trend_sma": trend_sma, "rsi_threshold": rsi_threshold, "exit_rsi": exit_rsi, "stop_loss_pct": stop_loss_pct, "max_hold_bars": max_hold_bars, "entry_offsets": entry_offsets, "entry_valid_bars": entry_valid_bars, "fill_buffer": fill_buffer, } def annualized_return(total_return: float, first_ts: int, last_ts: int) -> float: years = (last_ts - first_ts) / 86_400_000 / 365 return (1.0 + total_return) ** (1.0 / years) - 1.0 if total_return > -1.0 and years > 0.0 else 0.0 def strategy_name(spec: dict[str, object]) -> str: offsets = "-".join(f"{offset:.4f}" for offset in tuple(spec["entry_offsets"])) buffer_label = f"-fb{float(spec['fill_buffer']):.4f}" if float(spec["fill_buffer"]) else "" return ( f"rsi2-long-guarded-price-twap-o{offsets}-v{spec['entry_valid_bars']}{buffer_label}" f"-t{spec['trend_sma']}-l{spec['rsi_threshold']}-x{spec['exit_rsi']}" f"-sl{spec['stop_loss_pct']}-mh{spec['max_hold_bars']}" ) def close_position( *, trades: list[dict[str, object]], exits: list[dict[str, object]], position: dict[str, object], account_equity: float, candle: Candle, exit_price: float, roundtrip_cost_on_margin: float, ) -> tuple[float, bool]: margin_used = float(position["margin_used"]) exit_equity = mark_to_market( side="long", margin_used=margin_used, entry_price=float(position["entry_price"]), mark_price=exit_price, leverage=LEVERAGE, ) pnl = exit_equity - margin_used cost = margin_used * roundtrip_cost_on_margin net_pnl = pnl - cost trades.append( { "side": "Long", "entry_time": _format_ts(int(position["entry_time"])), "exit_time": _format_ts(candle.ts), "entry_price": round(float(position["entry_price"]), 4), "exit_price": round(exit_price, 4), "pnl": round(net_pnl, 4), "return_pct": round(net_pnl / account_equity * 100, 4), "cost_weight": round(margin_used / account_equity, 8), } ) exits.append({"ts": candle.ts, "price": exit_price, "side": "long"}) return account_equity + net_pnl, net_pnl > 0.0 def run_price_twap_segment( *, candles: list[Candle], spec: dict[str, object], roundtrip_cost_on_margin: float, ) -> SegmentResult: closes = pd.Series([candle.close for candle in candles], dtype=float) trend = closes.rolling(int(spec["trend_sma"])).mean().tolist() rsi_values = _compute_rsi(closes, 2) equity = INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_limits: list[dict[str, float | int]] = [] pending_exit = False warmup_bars = max(int(spec["trend_sma"]), 3) entry_offsets = tuple(float(value) for value in spec["entry_offsets"]) for index in range(warmup_bars, len(candles)): candle = candles[index] if pending_exit and position is not None: equity, won = close_position( trades=trades, exits=exits, position=position, account_equity=equity, candle=candle, exit_price=candle.open, roundtrip_cost_on_margin=roundtrip_cost_on_margin, ) wins += 1 if won else 0 position = None pending_exit = False pending_limits = [] active_limits: list[dict[str, float | int]] = [] for limit in pending_limits: if index > int(limit["expires_index"]): continue limit_price = float(limit["price"]) if candle.low <= limit_price * (1.0 - float(spec["fill_buffer"])) and equity > 0.0: slice_margin = equity / len(entry_offsets) if position is None: position = { "side": "long", "entry_time": candle.ts, "entry_price": limit_price, "entry_index": index, "margin_used": slice_margin, "stop_price": limit_price * (1 - float(spec["stop_loss_pct"])), } else: old_margin = float(position["margin_used"]) new_margin = old_margin + slice_margin entry_price = (float(position["entry_price"]) * old_margin + limit_price * slice_margin) / new_margin position["entry_price"] = entry_price position["margin_used"] = new_margin position["stop_price"] = entry_price * (1 - float(spec["stop_loss_pct"])) entries.append({"ts": candle.ts, "price": limit_price, "side": "long"}) else: active_limits.append(limit) pending_limits = active_limits current_equity = equity if position is not None and candle.low <= float(position["stop_price"]): equity, won = close_position( trades=trades, exits=exits, position=position, account_equity=equity, candle=candle, exit_price=float(position["stop_price"]), roundtrip_cost_on_margin=roundtrip_cost_on_margin, ) wins += 1 if won else 0 current_equity = equity position = None pending_limits = [] if position is not None: position_equity = mark_to_market( side="long", margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=LEVERAGE, ) current_equity = equity - float(position["margin_used"]) + position_equity peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(candles) - 1 or equity <= 0.0: continue current_rsi = rsi_values[index] current_trend = trend[index] if current_rsi != current_rsi or current_trend != current_trend: continue if position is not None: held_bars = index - int(position["entry_index"]) if current_rsi >= float(spec["exit_rsi"]) or held_bars >= int(spec["max_hold_bars"]): pending_exit = True pending_limits = [] continue if not pending_limits and candle.close > float(current_trend) and current_rsi <= float(spec["rsi_threshold"]): pending_limits = [ { "price": candle.close * (1.0 - offset), "expires_index": index + int(spec["entry_valid_bars"]), } for offset in entry_offsets ] trade_count = len(trades) return SegmentResult( trade_count=trade_count, total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY, win_rate=(wins / trade_count) if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=candles[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ) def equity_frame(result: SegmentResult) -> pd.DataFrame: frame = pd.DataFrame(result.equity_curve) frame["ts"] = pd.to_datetime(frame["ts"], unit="ms", utc=True) return frame[["ts", "equity"]] def markdown_table(frame: pd.DataFrame, columns: list[str]) -> str: rows = [["" if pd.isna(value) else str(value) for value in row] for row in frame[columns].itertuples(index=False, name=None)] header = "| " + " | ".join(columns) + " |" separator = "| " + " | ".join("---" for _ in columns) + " |" body = ["| " + " | ".join(row) + " |" for row in rows] return "\n".join([header, separator, *body]) def run_search(max_candidates: int | None) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: client = OkxClient() candles = get_candles_cached(client, SYMBOL, BAR, history_bars_for_years(BAR, YEARS)) rows: list[dict[str, object]] = [] horizon_rows: list[dict[str, object]] = [] specs = list(candidate_specs()) if max_candidates is not None: specs = specs[:max_candidates] for index, spec in enumerate(specs, start=1): name = strategy_name(spec) gross_result = run_price_twap_segment(candles=candles, spec=spec, roundtrip_cost_on_margin=0.0) gross_annualized = annualized_return(gross_result.total_return, candles[0].ts, candles[-1].ts) for cost_label, roundtrip_cost in COSTS.items(): result = run_price_twap_segment(candles=candles, spec=spec, roundtrip_cost_on_margin=roundtrip_cost) net_equity = equity_frame(result) metrics = annualized_metrics_from_equity(net_equity, candles[0].ts, candles[-1].ts) rows.append( { "symbol": SYMBOL, "bar": BAR, "cost_model": cost_label, "roundtrip_cost_on_margin": roundtrip_cost, "name": name, "first_candle": _format_ts(candles[0].ts), "last_candle": _format_ts(candles[-1].ts), "actual_bars": len(candles), "trades": result.trade_count, "gross_total_return": gross_result.total_return, "gross_annualized_return": gross_annualized, "gross_max_drawdown_mark_to_market": gross_result.max_drawdown, **spec, "entry_offsets": "-".join(f"{value:.4f}" for value in tuple(spec["entry_offsets"])), **metrics, } ) horizon_frame = recent_horizon_metrics_from_equity(net_equity, candles[-1].ts, HORIZONS) for horizon_row in horizon_frame.to_dict("records"): horizon_rows.append( { "symbol": SYMBOL, "bar": BAR, "cost_model": cost_label, "roundtrip_cost_on_margin": roundtrip_cost, "name": name, "trades": result.trade_count, **spec, "entry_offsets": "-".join(f"{value:.4f}" for value in tuple(spec["entry_offsets"])), **horizon_row, } ) print(f"{index}/{len(specs)} {name} trades={gross_result.trade_count}") all_results = pd.DataFrame(rows) horizons = pd.DataFrame(horizon_rows) all_results = all_results.sort_values( ["cost_model", "net_calmar", "net_annualized_return"], ascending=[True, False, False], ) maker_taker = all_results[all_results["cost_model"] == "maker_taker"].sort_values( ["net_calmar", "net_annualized_return"], ascending=False, ) horizons["horizon"] = pd.Categorical(horizons["horizon"], categories=["3y", "1y", "6m", "3m"], ordered=True) horizons = horizons.sort_values( ["cost_model", "horizon", "net_calmar", "net_annualized_return"], ascending=[True, True, False, False], ) return all_results, maker_taker, horizons def markdown_summary(maker_taker: pd.DataFrame, horizons: pd.DataFrame) -> str: top10 = maker_taker.head(10) top_names = set(top10["name"]) horizon_top = horizons[ (horizons["cost_model"] == "maker_taker") & (horizons["name"].isin(top_names)) ].sort_values(["name", "horizon"]) columns = [ "name", "trades", "trend_sma", "rsi_threshold", "exit_rsi", "stop_loss_pct", "max_hold_bars", "entry_offsets", "entry_valid_bars", "fill_buffer", "net_annualized_return", "net_max_drawdown", "net_calmar", "net_sharpe_daily", ] return "\n".join( [ "# ETH price-TWAP variant search", "", "Primary sort: maker_taker by net_calmar, then net_annualized_return.", "", "## Top 10 maker_taker candidates", "", markdown_table(top10, columns), "", "## Recent horizons for top 10", "", markdown_table( horizon_top, [ "name", "horizon", "horizon_start", "horizon_end", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "net_sharpe_daily", ], ), "", "## Next narrowing direction", "", "Favor the parameter cluster shared by the top maker_taker rows: keep the strongest trend_sma/rsi_threshold/offset combinations, then rerun a narrower grid around adjacent stop_loss_pct, max_hold_bars, entry_valid_bars, and fill_buffer values.", "", ] ) def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--max-candidates", type=int, default=None) args = parser.parse_args() OUTPUT_DIR.mkdir(parents=True, exist_ok=True) all_results, maker_taker, horizons = run_search(args.max_candidates) all_path = OUTPUT_DIR / "eth-price-twap-search.csv" top_path = OUTPUT_DIR / "eth-price-twap-top10.csv" horizon_path = OUTPUT_DIR / "eth-price-twap-horizons.csv" summary_path = OUTPUT_DIR / "eth-price-twap-summary.md" all_results.to_csv(all_path, index=False) maker_taker.head(10).to_csv(top_path, index=False) horizons.to_csv(horizon_path, index=False) summary_path.write_text(markdown_summary(maker_taker, horizons), encoding="utf-8") print(f"wrote {all_path}") print(f"wrote {top_path}") print(f"wrote {horizon_path}") print(f"wrote {summary_path}") print(maker_taker.head(10).to_string(index=False)) return 0 if __name__ == "__main__": raise SystemExit(main())