from __future__ import annotations import argparse import multiprocessing import sys from concurrent.futures import ProcessPoolExecutor, as_completed from dataclasses import dataclass from itertools import product from pathlib import Path import pandas as pd ROOT = Path(__file__).resolve().parents[1] sys.path.insert(0, str(ROOT)) from okx_codex_trader.models import Candle from okx_codex_trader.sampled_report import SegmentResult, mark_to_market from scripts.explore_ultrashort import ( CANDLE_CACHE_DIR, INITIAL_EQUITY, LEVERAGE, _compute_rsi, _format_ts, annualized_metrics_from_equity, cost_adjusted_trade_equity_frame, history_bars_for_years, load_cached_candles, max_drawdown_from_equity, ) SYMBOL = "ETH-USDT-SWAP" BAR = "15m" YEARS = 10.0 MAX_HOLD_BARS = 48 OUTPUT_DIR = Path("reports/eth-exploration") PREFIX = "eth-twap-conservative" COSTS = { "maker_maker": 0.0012, "maker_taker": 0.0021, "taker_taker": 0.0030, } HORIZONS = ( ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ) TREND_SMAS = (50, 60, 80, 120) RSI_THRESHOLDS = (2.0, 3.0, 4.0) EXIT_RSIS = (45.0, 50.0, 55.0) STOP_LOSSES = (0.010, 0.012, 0.015, 0.018) ENTRY_OFFSET_SETS = ( (0.004, 0.008, 0.012), (0.005, 0.009, 0.013), (0.006, 0.010, 0.015), (0.005, 0.010), ) ENTRY_VALID_BARS = (2, 3, 4) FILL_BUFFER = 0.001 PRICE_SLIPPAGE = 0.0005 MAKER_MISS_RATIO = 0.25 CANDLES = None @dataclass(frozen=True) class ConservativeResult: result: SegmentResult missed_fills: int fill_attempts: int def init_worker(candles: list[Candle]) -> None: global CANDLES CANDLES = candles def offset_label(entry_offsets: tuple[float, ...]) -> str: return "-".join(f"{value:.4f}" for value in entry_offsets) def strategy_name(spec: dict[str, object]) -> str: return ( f"rsi2-long-guarded-price-twap-o{offset_label(tuple(spec['entry_offsets']))}" f"-v{spec['entry_valid_bars']}-t{spec['trend_sma']}-l{spec['rsi_threshold']}" f"-x{spec['exit_rsi']}-sl{spec['stop_loss_pct']}-mh{MAX_HOLD_BARS}" f"-fb{FILL_BUFFER:.4f}-ps{PRICE_SLIPPAGE:.4f}-mm25" ) def candidate_specs() -> list[dict[str, object]]: specs: list[dict[str, object]] = [] for trend_sma, rsi_threshold, exit_rsi, stop_loss_pct, entry_offsets, entry_valid_bars in product( TREND_SMAS, RSI_THRESHOLDS, EXIT_RSIS, STOP_LOSSES, ENTRY_OFFSET_SETS, ENTRY_VALID_BARS, ): specs.append( { "trend_sma": trend_sma, "rsi_threshold": rsi_threshold, "exit_rsi": exit_rsi, "stop_loss_pct": stop_loss_pct, "max_hold_bars": MAX_HOLD_BARS, "entry_offsets": entry_offsets, "entry_valid_bars": entry_valid_bars, "fill_buffer": FILL_BUFFER, "price_slippage": PRICE_SLIPPAGE, "maker_miss_ratio": MAKER_MISS_RATIO, } ) return specs def should_miss_fill(fill_attempt: int) -> bool: return fill_attempt % 4 == 0 def close_position( *, trades: list[dict[str, object]], exits: list[dict[str, object]], position: dict[str, object], account_equity: float, candle: Candle, exit_price: float, ) -> tuple[float, bool]: margin_used = float(position["margin_used"]) exit_equity = mark_to_market( side="long", margin_used=margin_used, entry_price=float(position["entry_price"]), mark_price=exit_price, leverage=LEVERAGE, ) pnl = exit_equity - margin_used trades.append( { "side": "Long", "entry_time": _format_ts(int(position["entry_time"])), "exit_time": _format_ts(candle.ts), "entry_price": round(float(position["entry_price"]), 4), "exit_price": round(exit_price, 4), "pnl": round(pnl, 4), "return_pct": round(pnl / account_equity * 100, 4), "cost_weight": round(margin_used / account_equity, 8), } ) exits.append({"ts": candle.ts, "price": exit_price, "side": "long"}) return account_equity + pnl, pnl > 0.0 def run_conservative_twap_segment(candles: list[Candle], spec: dict[str, object]) -> ConservativeResult: closes = pd.Series([candle.close for candle in candles], dtype=float) trend = closes.rolling(int(spec["trend_sma"])).mean().tolist() rsi_values = _compute_rsi(closes, 2) equity = INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_limits: list[dict[str, float | int]] = [] pending_exit = False warmup_bars = max(int(spec["trend_sma"]), 3) entry_offsets = tuple(float(value) for value in spec["entry_offsets"]) fill_attempts = 0 missed_fills = 0 for index in range(warmup_bars, len(candles)): candle = candles[index] if pending_exit and position is not None: equity, won = close_position( trades=trades, exits=exits, position=position, account_equity=equity, candle=candle, exit_price=candle.open * (1.0 - PRICE_SLIPPAGE), ) wins += 1 if won else 0 position = None pending_exit = False pending_limits = [] active_limits: list[dict[str, float | int]] = [] for limit in pending_limits: if index > int(limit["expires_index"]): continue limit_price = float(limit["price"]) if candle.low <= limit_price * (1.0 - FILL_BUFFER) and equity > 0.0: fill_attempts += 1 if should_miss_fill(fill_attempts): missed_fills += 1 continue slice_margin = equity / len(entry_offsets) actual_entry_price = limit_price * (1.0 + PRICE_SLIPPAGE) if position is None: position = { "side": "long", "entry_time": candle.ts, "entry_price": actual_entry_price, "entry_index": index, "margin_used": slice_margin, "stop_price": actual_entry_price * (1.0 - float(spec["stop_loss_pct"])), } else: old_margin = float(position["margin_used"]) new_margin = old_margin + slice_margin entry_price = (float(position["entry_price"]) * old_margin + actual_entry_price * slice_margin) / new_margin position["entry_price"] = entry_price position["margin_used"] = new_margin position["stop_price"] = entry_price * (1.0 - float(spec["stop_loss_pct"])) entries.append({"ts": candle.ts, "price": actual_entry_price, "side": "long"}) else: active_limits.append(limit) pending_limits = active_limits current_equity = equity if position is not None and candle.low <= float(position["stop_price"]): equity, won = close_position( trades=trades, exits=exits, position=position, account_equity=equity, candle=candle, exit_price=float(position["stop_price"]) * (1.0 - PRICE_SLIPPAGE), ) wins += 1 if won else 0 current_equity = equity position = None pending_limits = [] if position is not None: position_equity = mark_to_market( side="long", margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=LEVERAGE, ) current_equity = equity - float(position["margin_used"]) + position_equity peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(candles) - 1 or equity <= 0.0: continue current_rsi = rsi_values[index] current_trend = trend[index] if current_rsi != current_rsi or current_trend != current_trend: continue if position is not None: held_bars = index - int(position["entry_index"]) if current_rsi >= float(spec["exit_rsi"]) or held_bars >= MAX_HOLD_BARS: pending_exit = True pending_limits = [] continue if not pending_limits and candle.close > float(current_trend) and current_rsi <= float(spec["rsi_threshold"]): pending_limits = [ { "price": candle.close * (1.0 - offset), "expires_index": index + int(spec["entry_valid_bars"]), } for offset in entry_offsets ] trade_count = len(trades) return ConservativeResult( result=SegmentResult( trade_count=trade_count, total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY, win_rate=(wins / trade_count) if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=candles[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ), missed_fills=missed_fills, fill_attempts=fill_attempts, ) def horizon_metrics(frame: pd.DataFrame, last_ts: int) -> list[dict[str, object]]: rows: list[dict[str, object]] = [] end_time = pd.to_datetime(last_ts, unit="ms", utc=True) for label, offset in HORIZONS: cutoff = end_time - offset before_cutoff = frame[frame["ts"] <= cutoff] if len(before_cutoff): start_equity = float(before_cutoff["equity"].iloc[-1]) start_time = cutoff horizon_frame = pd.concat( [ pd.DataFrame([{"ts": start_time, "equity": start_equity}]), frame[frame["ts"] > cutoff][["ts", "equity"]], ], ignore_index=True, ) else: horizon_frame = frame[["ts", "equity"]].copy() start_time = pd.Timestamp(horizon_frame["ts"].iloc[0]) rows.append( { "horizon": label, "horizon_start": start_time.strftime("%Y-%m-%d %H:%M"), "horizon_end": end_time.strftime("%Y-%m-%d %H:%M"), **annualized_metrics_from_equity(horizon_frame, int(start_time.timestamp() * 1000), last_ts), } ) return rows def rolling_365_worst(frame: pd.DataFrame, last_ts: int) -> dict[str, object]: daily = frame.set_index("ts")["equity"].resample("1D").last().ffill().dropna() windows: list[dict[str, object]] = [] for end_index in range(365, len(daily)): window = daily.iloc[end_index - 365 : end_index + 1] total_return = float(window.iloc[-1] / window.iloc[0] - 1.0) max_drawdown = max_drawdown_from_equity([float(value) for value in window]) windows.append( { "worst_365_start": window.index[0].strftime("%Y-%m-%d"), "worst_365_end": window.index[-1].strftime("%Y-%m-%d"), "worst_365_total_return": total_return, "worst_365_max_drawdown": max_drawdown, } ) worst = min(windows, key=lambda row: row["worst_365_total_return"]) return {"sample_end": _format_ts(last_ts), **worst} def evaluate_spec(spec: dict[str, object]) -> tuple[list[dict[str, object]], list[dict[str, object]], list[dict[str, object]], str, int]: if CANDLES is None: raise RuntimeError("candles are not initialized") candles = CANDLES conservative = run_conservative_twap_segment(candles, spec) result = conservative.result gross_years = (candles[-1].ts - candles[0].ts) / 86_400_000 / 365 gross_annualized = (1.0 + result.total_return) ** (1.0 / gross_years) - 1.0 if result.total_return > -1.0 else 0.0 entry_offsets = tuple(float(value) for value in spec["entry_offsets"]) name = strategy_name(spec) base_row = { "symbol": SYMBOL, "bar": BAR, "name": name, "first_candle": _format_ts(candles[0].ts), "last_candle": _format_ts(candles[-1].ts), "actual_bars": len(candles), "trades": result.trade_count, "fill_attempts": conservative.fill_attempts, "missed_fills": conservative.missed_fills, "actual_miss_ratio": conservative.missed_fills / conservative.fill_attempts if conservative.fill_attempts else 0.0, "gross_total_return": result.total_return, "gross_annualized_return": gross_annualized, "gross_max_drawdown_mark_to_market": result.max_drawdown, **spec, "entry_offsets": offset_label(entry_offsets), } total_rows: list[dict[str, object]] = [] horizon_rows: list[dict[str, object]] = [] rolling_rows: list[dict[str, object]] = [] for cost_label, roundtrip_cost in COSTS.items(): net_equity = cost_adjusted_trade_equity_frame(result, roundtrip_cost) cost_row = { **base_row, "cost_model": cost_label, "roundtrip_cost_on_margin": roundtrip_cost, } total_rows.append({**cost_row, **annualized_metrics_from_equity(net_equity, candles[0].ts, candles[-1].ts)}) for horizon_row in horizon_metrics(net_equity, candles[-1].ts): horizon_rows.append({**cost_row, **horizon_row}) rolling_rows.append({**cost_row, **rolling_365_worst(net_equity, candles[-1].ts)}) return total_rows, horizon_rows, rolling_rows, name, result.trade_count def run_search(workers: int) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]: cached, _ = load_cached_candles(CANDLE_CACHE_DIR, SYMBOL, BAR) candles = cached[-history_bars_for_years(BAR, YEARS) :] specs = candidate_specs() total_rows: list[dict[str, object]] = [] horizon_rows: list[dict[str, object]] = [] rolling_rows: list[dict[str, object]] = [] with ProcessPoolExecutor(max_workers=workers, mp_context=multiprocessing.get_context("fork"), initializer=init_worker, initargs=(candles,)) as executor: futures = [executor.submit(evaluate_spec, spec) for spec in specs] for index, future in enumerate(as_completed(futures), start=1): spec_totals, spec_horizons, spec_rolling, name, trade_count = future.result() total_rows.extend(spec_totals) horizon_rows.extend(spec_horizons) rolling_rows.extend(spec_rolling) print(f"{index}/{len(specs)} {name} trades={trade_count}", flush=True) totals = pd.DataFrame(total_rows) horizons = pd.DataFrame(horizon_rows) rolling = pd.DataFrame(rolling_rows) horizons["horizon"] = pd.Categorical(horizons["horizon"], categories=["3y", "1y", "6m", "3m"], ordered=True) ranked = rank_candidates(totals, horizons, rolling) totals = totals.sort_values(["cost_model", "net_calmar", "net_annualized_return"], ascending=[True, False, False]) horizons = horizons.sort_values(["cost_model", "horizon", "net_calmar", "net_annualized_return"], ascending=[True, True, False, False]) rolling = rolling.sort_values(["cost_model", "worst_365_total_return"], ascending=[True, False]) return totals, horizons, rolling, ranked def rank_candidates(totals: pd.DataFrame, horizons: pd.DataFrame, rolling: pd.DataFrame) -> pd.DataFrame: key_columns = ["name"] maker_totals = totals[totals["cost_model"] == "maker_taker"].copy() horizon_pivot = horizons[horizons["cost_model"] == "maker_taker"].pivot_table( index=key_columns, columns="horizon", values=["net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar"], aggfunc="first", observed=False, ) horizon_pivot.columns = [f"{metric}_{horizon}" for metric, horizon in horizon_pivot.columns] maker_rolling = rolling[rolling["cost_model"] == "maker_taker"][ key_columns + ["worst_365_start", "worst_365_end", "worst_365_total_return", "worst_365_max_drawdown"] ] ranked = maker_totals.merge(horizon_pivot.reset_index(), on=key_columns).merge(maker_rolling, on=key_columns) ranked["all_horizons_positive"] = ranked[ ["net_total_return_3y", "net_total_return_1y", "net_total_return_6m", "net_total_return_3m"] ].min(axis=1) > 0.0 ranked["rolling_365_pass"] = ranked["worst_365_total_return"] >= -0.05 ranked["qualified"] = ranked["all_horizons_positive"] & ranked["rolling_365_pass"] ranked["max_horizon_drawdown"] = ranked[ ["net_max_drawdown_3y", "net_max_drawdown_1y", "net_max_drawdown_6m", "net_max_drawdown_3m"] ].max(axis=1) ranked["min_horizon_total_return"] = ranked[ ["net_total_return_3y", "net_total_return_1y", "net_total_return_6m", "net_total_return_3m"] ].min(axis=1) ranked = ranked.sort_values( [ "qualified", "all_horizons_positive", "rolling_365_pass", "min_horizon_total_return", "net_max_drawdown", "max_horizon_drawdown", "worst_365_max_drawdown", "worst_365_total_return", "net_annualized_return", ], ascending=[False, False, False, False, True, True, True, False, False], ) return ranked def markdown_table(frame: pd.DataFrame, columns: list[str]) -> str: rows = [["" if pd.isna(value) else str(value) for value in row] for row in frame[columns].itertuples(index=False, name=None)] return "\n".join( [ "| " + " | ".join(columns) + " |", "| " + " | ".join("---" for _ in columns) + " |", *["| " + " | ".join(row) + " |" for row in rows], ] ) def markdown_summary(totals: pd.DataFrame, horizons: pd.DataFrame, rolling: pd.DataFrame, ranked: pd.DataFrame) -> str: qualified = ranked[ranked["qualified"]].copy() top = qualified.head(10) if len(qualified) else ranked.head(10) top_names = set(top["name"]) horizon_top = horizons[(horizons["cost_model"] == "maker_taker") & (horizons["name"].isin(top_names))].sort_values(["name", "horizon"]) rolling_top = rolling[(rolling["cost_model"] == "maker_taker") & (rolling["name"].isin(top_names))].sort_values("name") if len(qualified): best = qualified.iloc[0] decision = ( f"Found {len(qualified)} qualified maker_taker candidates. Top candidate `{best['name']}`: " f"10y annualized={float(best['net_annualized_return']):.4f}, 10y maxDD={float(best['net_max_drawdown']):.4f}, " f"worst rolling365={float(best['worst_365_total_return']):.4f}, max horizon DD={float(best['max_horizon_drawdown']):.4f}." ) else: all_horizon_count = int(ranked["all_horizons_positive"].sum()) rolling_count = int(ranked["rolling_365_pass"].sum()) best = ranked.iloc[0] decision = ( "No maker_taker candidate met both requirements: 3y/1y/6m/3m all positive and worst rolling365 >= -5%. " f"Counts: all-horizon-positive={all_horizon_count}, rolling365-pass={rolling_count}. " f"Nearest miss `{best['name']}` has min horizon total return={float(best['min_horizon_total_return']):.4f}, " f"worst rolling365={float(best['worst_365_total_return']):.4f}, 10y maxDD={float(best['net_max_drawdown']):.4f}." ) return "\n".join( [ "# ETH TWAP Conservative Variant Search", "", "Scope: continuous 10y ETH 15m backtest, sliced into 3y/1y/6m/3m from one equity curve.", "", "Fixed conservative fill scenario: fill_buffer=0.001, price_slippage=0.0005 on entries/exits, deterministic maker_miss=25% by skipping every fourth fill attempt.", "", "Grid: trend 50/60/80/120; rsi 2/3/4; exit 45/50/55; stop 0.010/0.012/0.015/0.018; offsets 0.004/0.008/0.012, 0.005/0.009/0.013, 0.006/0.010/0.015, and 2-slice 0.005/0.010; valid 2/3/4.", "", f"Decision: {decision}", "", "## Top maker_taker candidates", "", markdown_table( top, [ "qualified", "name", "trades", "fill_attempts", "missed_fills", "actual_miss_ratio", "trend_sma", "rsi_threshold", "exit_rsi", "stop_loss_pct", "entry_offsets", "entry_valid_bars", "net_annualized_return", "net_max_drawdown", "net_calmar", "min_horizon_total_return", "max_horizon_drawdown", "worst_365_start", "worst_365_end", "worst_365_total_return", "worst_365_max_drawdown", ], ), "", "## Top maker_taker horizons", "", markdown_table( horizon_top, [ "name", "horizon", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", ], ), "", "## Top maker_taker rolling365", "", markdown_table( rolling_top, [ "name", "worst_365_start", "worst_365_end", "worst_365_total_return", "worst_365_max_drawdown", ], ), "", ] ) def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--workers", type=int, default=6) args = parser.parse_args() OUTPUT_DIR.mkdir(parents=True, exist_ok=True) totals, horizons, rolling, ranked = run_search(args.workers) totals_path = OUTPUT_DIR / f"{PREFIX}-totals.csv" horizons_path = OUTPUT_DIR / f"{PREFIX}-horizons.csv" rolling_path = OUTPUT_DIR / f"{PREFIX}-rolling365.csv" ranked_path = OUTPUT_DIR / f"{PREFIX}-ranked.csv" summary_path = OUTPUT_DIR / f"{PREFIX}-summary.md" totals.to_csv(totals_path, index=False) horizons.to_csv(horizons_path, index=False) rolling.to_csv(rolling_path, index=False) ranked.to_csv(ranked_path, index=False) summary_path.write_text(markdown_summary(totals, horizons, rolling, ranked), encoding="utf-8") print(f"wrote {totals_path}") print(f"wrote {horizons_path}") print(f"wrote {rolling_path}") print(f"wrote {ranked_path}") print(f"wrote {summary_path}") print(ranked.head(10).to_string(index=False)) return 0 if __name__ == "__main__": raise SystemExit(main())