from __future__ import annotations import argparse import multiprocessing import sys from concurrent.futures import ProcessPoolExecutor, as_completed from itertools import product from pathlib import Path from typing import Iterable import pandas as pd ROOT = Path(__file__).resolve().parents[1] sys.path.insert(0, str(ROOT)) from okx_codex_trader.okx_client import OkxClient from scripts.explore_ultrashort import ( LEVERAGE, _format_ts, annualized_metrics_from_equity, build_rsi2_long_guarded_price_twap_candidate, cost_adjusted_trade_equity_frame, get_candles_cached, history_bars_for_years, ) SYMBOL = "ETH-USDT-SWAP" BAR = "15m" YEARS = 10.0 MAX_HOLD_BARS = 48 OUTPUT_DIR = Path("reports/eth-exploration") COSTS = { "maker_maker": 0.0012, "maker_taker": 0.0021, "taker_taker": 0.0030, } HORIZONS = ( ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ) ENTRY_OFFSET_SETS = ( (0.0015, 0.004, 0.007), (0.002, 0.005, 0.008), (0.003, 0.006, 0.009), (0.004, 0.007, 0.010), ) CANDLES = None def init_worker(candles: list[object]) -> None: global CANDLES CANDLES = candles def candidate_specs() -> Iterable[dict[str, object]]: for trend_sma, rsi_threshold, exit_rsi, stop_loss_pct, entry_offsets, entry_valid_bars, fill_buffer in product( (40, 50, 60, 80), (2.0, 3.0, 4.0, 5.0), (45.0, 50.0, 55.0), (0.008, 0.010, 0.012, 0.015), ENTRY_OFFSET_SETS, (2, 3, 4), (0.0, 0.0002), ): yield { "trend_sma": trend_sma, "rsi_threshold": rsi_threshold, "exit_rsi": exit_rsi, "stop_loss_pct": stop_loss_pct, "max_hold_bars": MAX_HOLD_BARS, "entry_offsets": entry_offsets, "entry_valid_bars": entry_valid_bars, "fill_buffer": fill_buffer, } def offset_label(entry_offsets: tuple[float, ...]) -> str: return "-".join(f"{value:.4f}" for value in entry_offsets) def markdown_table(frame: pd.DataFrame, columns: list[str]) -> str: rows = [["" if pd.isna(value) else str(value) for value in row] for row in frame[columns].itertuples(index=False, name=None)] return "\n".join( [ "| " + " | ".join(columns) + " |", "| " + " | ".join("---" for _ in columns) + " |", *["| " + " | ".join(row) + " |" for row in rows], ] ) def horizon_metrics(frame: pd.DataFrame, last_ts: int) -> list[dict[str, object]]: rows: list[dict[str, object]] = [] end_time = pd.to_datetime(last_ts, unit="ms", utc=True) for label, offset in HORIZONS: cutoff = end_time - offset before_cutoff = frame[frame["ts"] <= cutoff] if len(before_cutoff): start_equity = float(before_cutoff["equity"].iloc[-1]) start_time = cutoff horizon_frame = pd.concat( [ pd.DataFrame([{"ts": start_time, "equity": start_equity}]), frame[frame["ts"] > cutoff][["ts", "equity"]], ], ignore_index=True, ) else: horizon_frame = frame[["ts", "equity"]].copy() start_time = pd.Timestamp(horizon_frame["ts"].iloc[0]) rows.append( { "horizon": label, "horizon_start": start_time.strftime("%Y-%m-%d %H:%M"), "horizon_end": end_time.strftime("%Y-%m-%d %H:%M"), **annualized_metrics_from_equity( horizon_frame, int(start_time.timestamp() * 1000), last_ts, ), } ) return rows def evaluate_spec(spec: dict[str, object]) -> tuple[list[dict[str, object]], list[dict[str, object]], str, int]: if CANDLES is None: raise RuntimeError("candles are not initialized") candles = CANDLES entry_offsets = tuple(float(value) for value in spec["entry_offsets"]) candidate = build_rsi2_long_guarded_price_twap_candidate( int(spec["trend_sma"]), float(spec["rsi_threshold"]), float(spec["exit_rsi"]), float(spec["stop_loss_pct"]), int(spec["max_hold_bars"]), entry_offsets, int(spec["entry_valid_bars"]), float(spec["fill_buffer"]), ) result = candidate.run(candles=candles, leverage=LEVERAGE, warmup_bars=candidate.warmup_bars) gross_years = (candles[-1].ts - candles[0].ts) / 86_400_000 / 365 gross_annualized = (1.0 + result.total_return) ** (1.0 / gross_years) - 1.0 if result.total_return > -1.0 else 0.0 total_rows: list[dict[str, object]] = [] horizon_rows: list[dict[str, object]] = [] for cost_label, roundtrip_cost in COSTS.items(): net_equity = cost_adjusted_trade_equity_frame(result, roundtrip_cost) total_metrics = annualized_metrics_from_equity(net_equity, candles[0].ts, candles[-1].ts) base_row = { "symbol": SYMBOL, "bar": BAR, "cost_model": cost_label, "roundtrip_cost_on_margin": roundtrip_cost, "name": candidate.name, "first_candle": _format_ts(candles[0].ts), "last_candle": _format_ts(candles[-1].ts), "actual_bars": len(candles), "trades": result.trade_count, "gross_total_return": result.total_return, "gross_annualized_return": gross_annualized, "gross_max_drawdown_mark_to_market": result.max_drawdown, **spec, "entry_offsets": offset_label(entry_offsets), } total_rows.append({**base_row, **total_metrics}) for horizon_row in horizon_metrics(net_equity, candles[-1].ts): horizon_rows.append({**base_row, **horizon_row}) return total_rows, horizon_rows, candidate.name, result.trade_count def run_search(max_candidates: int | None, workers: int) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: candles = get_candles_cached(OkxClient(), SYMBOL, BAR, history_bars_for_years(BAR, YEARS)) specs = list(candidate_specs()) if max_candidates is not None: specs = specs[:max_candidates] total_rows: list[dict[str, object]] = [] horizon_rows: list[dict[str, object]] = [] with ProcessPoolExecutor(max_workers=workers, mp_context=multiprocessing.get_context("fork"), initializer=init_worker, initargs=(candles,)) as executor: futures = [executor.submit(evaluate_spec, spec) for spec in specs] for index, future in enumerate(as_completed(futures), start=1): spec_totals, spec_horizons, name, trade_count = future.result() total_rows.extend(spec_totals) horizon_rows.extend(spec_horizons) print(f"{index}/{len(specs)} {name} trades={trade_count}", flush=True) totals = pd.DataFrame(total_rows) horizons = pd.DataFrame(horizon_rows) horizons["horizon"] = pd.Categorical(horizons["horizon"], categories=["3y", "1y", "6m", "3m"], ordered=True) maker_taker_horizons = horizons[horizons["cost_model"] == "maker_taker"].pivot_table( index="name", columns="horizon", values="net_calmar", aggfunc="first", observed=False, ) eligible_names = maker_taker_horizons[ (maker_taker_horizons["1y"] >= 0.0) & (maker_taker_horizons["6m"] >= 0.0) & (maker_taker_horizons["3m"] >= 0.0) ].index ranked = horizons[(horizons["cost_model"] == "maker_taker") & (horizons["horizon"] == "3y")].copy() ranked["eligible_recent_nonnegative"] = ranked["name"].isin(eligible_names) ranked = ranked[ranked["eligible_recent_nonnegative"]].sort_values( ["net_calmar", "net_annualized_return"], ascending=False, ) totals = totals.sort_values(["cost_model", "net_calmar", "net_annualized_return"], ascending=[True, False, False]) horizons = horizons.sort_values(["cost_model", "horizon", "net_calmar", "net_annualized_return"], ascending=[True, True, False, False]) return totals, horizons, ranked def markdown_summary(totals: pd.DataFrame, horizons: pd.DataFrame, ranked: pd.DataFrame) -> str: top15 = ranked.head(15) top_names = set(top15["name"]) horizon_top = horizons[(horizons["cost_model"] == "maker_taker") & (horizons["name"].isin(top_names))].sort_values(["name", "horizon"]) total_top = totals[(totals["cost_model"] == "maker_taker") & (totals["name"].isin(top_names))].sort_values("name") columns = [ "name", "trades", "trend_sma", "rsi_threshold", "exit_rsi", "stop_loss_pct", "max_hold_bars", "entry_offsets", "entry_valid_bars", "fill_buffer", "net_annualized_return", "net_max_drawdown", "net_calmar", "net_sharpe_daily", ] candidate_lines = [ f"- `{row.name}`: 3y Calmar {float(row.net_calmar):.4f}, 3y annualized {float(row.net_annualized_return):.4f}, trades {int(row.trades)}" for row in top15.head(5).itertuples(index=False) ] return "\n".join( [ "# ETH TWAP 10y refine", "", "Evaluation: run 10 years through `scripts/explore_ultrashort.py` price-TWAP runner, then derive cost scenarios and 3y/1y/6m/3m horizons from the same cost-adjusted equity curve.", "", "Primary sort: maker_taker 3y net_calmar, then 3y net_annualized_return. Eligibility: maker_taker 1y/6m/3m net_calmar are all nonnegative.", "", "## Top 15 maker_taker 3y eligible candidates", "", markdown_table(top15, columns), "", "## 10y total metrics for top 15", "", markdown_table(total_top, columns), "", "## Recent horizons for top 15", "", markdown_table( horizon_top, [ "name", "horizon", "horizon_start", "horizon_end", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "net_sharpe_daily", ], ), "", "## Suggested ETH candidates for main report", "", *(candidate_lines if candidate_lines else ["No maker_taker candidate passed the nonnegative 1y/6m/3m Calmar filter."]), "", ] ) def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--max-candidates", type=int, default=None) parser.add_argument("--workers", type=int, default=8) args = parser.parse_args() OUTPUT_DIR.mkdir(parents=True, exist_ok=True) totals, horizons, ranked = run_search(args.max_candidates, args.workers) all_path = OUTPUT_DIR / "eth-twap-10y-refine-all.csv" horizon_path = OUTPUT_DIR / "eth-twap-10y-refine-horizons.csv" top_path = OUTPUT_DIR / "eth-twap-10y-refine-top15.csv" summary_path = OUTPUT_DIR / "eth-twap-10y-refine-summary.md" totals.to_csv(all_path, index=False) horizons.to_csv(horizon_path, index=False) ranked.head(15).to_csv(top_path, index=False) summary_path.write_text(markdown_summary(totals, horizons, ranked), encoding="utf-8") print(f"wrote {all_path}") print(f"wrote {horizon_path}") print(f"wrote {top_path}") print(f"wrote {summary_path}") print(ranked.head(15).to_string(index=False)) return 0 if __name__ == "__main__": raise SystemExit(main())