from __future__ import annotations import argparse import multiprocessing import sys from concurrent.futures import ProcessPoolExecutor, as_completed from itertools import product from pathlib import Path import pandas as pd ROOT = Path(__file__).resolve().parents[1] sys.path.insert(0, str(ROOT)) from okx_codex_trader.models import Candle from okx_codex_trader.sampled_report import SegmentResult, mark_to_market from scripts.explore_ultrashort import ( CANDLE_BAR_MS, CANDLE_CACHE_DIR, INITIAL_EQUITY, LEVERAGE, _compute_rsi, _format_ts, annualized_metrics_from_equity, cost_adjusted_trade_equity_frame, history_bars_for_years, load_cached_candles, ) SYMBOL = "ETH-USDT-SWAP" BAR = "15m" YEARS = 10.0 MAX_HOLD_BARS = 48 ENTRY_SLIPPAGE = 0.0005 OUTPUT_DIR = Path("reports/eth-exploration") PREFIX = "eth-twap-taker-entry" COSTS = { "maker_taker": 0.0021, "taker_taker": 0.0030, } HORIZONS = ( ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ) CANDLES = None def init_worker(candles: list[Candle]) -> None: global CANDLES CANDLES = candles def candidate_specs() -> list[dict[str, object]]: specs: list[dict[str, object]] = [] for trend_sma, rsi_threshold, exit_rsi, stop_loss_pct, entry_steps in product( (50, 60, 80), (2.0, 3.0, 4.0, 5.0), (45.0, 50.0, 55.0), (0.008, 0.010, 0.012, 0.015), (1, 2, 3), ): base = { "trend_sma": trend_sma, "rsi_threshold": rsi_threshold, "exit_rsi": exit_rsi, "stop_loss_pct": stop_loss_pct, "max_hold_bars": MAX_HOLD_BARS, "entry_steps": entry_steps, "entry_slippage": ENTRY_SLIPPAGE, } specs.append({**base, "entry_mode": "time_twap"}) specs.append({**base, "entry_mode": "price_trigger"}) return specs def strategy_name(spec: dict[str, object]) -> str: return ( f"rsi2-long-guarded-taker-{spec['entry_mode']}{spec['entry_steps']}" f"-slip{float(spec['entry_slippage']):.4f}" f"-t{spec['trend_sma']}-l{spec['rsi_threshold']}-x{spec['exit_rsi']}" f"-sl{spec['stop_loss_pct']}-mh{spec['max_hold_bars']}" ) def close_position( *, trades: list[dict[str, object]], exits: list[dict[str, object]], position: dict[str, object], account_equity: float, candle: Candle, exit_price: float, ) -> tuple[float, bool]: margin_used = float(position["margin_used"]) exit_equity = mark_to_market( side="long", margin_used=margin_used, entry_price=float(position["entry_price"]), mark_price=exit_price, leverage=LEVERAGE, ) pnl = exit_equity - margin_used trades.append( { "side": "Long", "entry_time": _format_ts(int(position["entry_time"])), "exit_time": _format_ts(candle.ts), "entry_price": round(float(position["entry_price"]), 4), "exit_price": round(exit_price, 4), "pnl": round(pnl, 4), "return_pct": round(pnl / account_equity * 100, 4), "cost_weight": round(margin_used / account_equity, 8), } ) exits.append({"ts": candle.ts, "price": exit_price, "side": "long"}) return account_equity + pnl, pnl > 0.0 def add_entry( *, entries: list[dict[str, object]], position: dict[str, object] | None, equity: float, candle: Candle, entry_price: float, entry_steps: int, stop_loss_pct: float, ) -> dict[str, object]: slice_margin = equity / entry_steps if position is None: position = { "side": "long", "entry_time": candle.ts, "entry_price": entry_price, "entry_index": int(candle.ts), "margin_used": slice_margin, "stop_price": entry_price * (1.0 - stop_loss_pct), } else: old_margin = float(position["margin_used"]) new_margin = old_margin + slice_margin weighted_entry = (float(position["entry_price"]) * old_margin + entry_price * slice_margin) / new_margin position["entry_price"] = weighted_entry position["margin_used"] = new_margin position["stop_price"] = weighted_entry * (1.0 - stop_loss_pct) entries.append({"ts": candle.ts, "price": entry_price, "side": "long"}) return position def run_taker_entry_segment(candles: list[Candle], spec: dict[str, object]) -> SegmentResult: closes = pd.Series([candle.close for candle in candles], dtype=float) trend = closes.rolling(int(spec["trend_sma"])).mean().tolist() rsi_values = _compute_rsi(closes, 2) equity = INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_exit = False pending_time_slices = 0 pending_trigger: dict[str, float | int] | None = None warmup_bars = max(int(spec["trend_sma"]), 3) entry_steps = int(spec["entry_steps"]) entry_slippage = float(spec["entry_slippage"]) stop_loss_pct = float(spec["stop_loss_pct"]) for index in range(warmup_bars, len(candles)): candle = candles[index] if pending_exit and position is not None: equity, won = close_position( trades=trades, exits=exits, position=position, account_equity=equity, candle=candle, exit_price=candle.open, ) wins += 1 if won else 0 position = None pending_exit = False pending_time_slices = 0 pending_trigger = None if pending_time_slices and equity > 0.0: entry_price = candle.open * (1.0 + entry_slippage) position = add_entry( entries=entries, position=position, equity=equity, candle=candle, entry_price=entry_price, entry_steps=entry_steps, stop_loss_pct=stop_loss_pct, ) position["entry_index"] = index pending_time_slices -= 1 if pending_trigger is not None and equity > 0.0: if index <= int(pending_trigger["expires_index"]) and candle.low <= float(pending_trigger["price"]): entry_price = float(pending_trigger["price"]) * (1.0 + entry_slippage) position = add_entry( entries=entries, position=position, equity=equity, candle=candle, entry_price=entry_price, entry_steps=1, stop_loss_pct=stop_loss_pct, ) position["entry_index"] = index pending_trigger = None elif index > int(pending_trigger["expires_index"]): pending_trigger = None current_equity = equity if position is not None and candle.low <= float(position["stop_price"]): equity, won = close_position( trades=trades, exits=exits, position=position, account_equity=equity, candle=candle, exit_price=float(position["stop_price"]), ) wins += 1 if won else 0 current_equity = equity position = None pending_time_slices = 0 pending_trigger = None if position is not None: position_equity = mark_to_market( side="long", margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=LEVERAGE, ) current_equity = equity - float(position["margin_used"]) + position_equity peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(candles) - 1 or equity <= 0.0: continue current_rsi = rsi_values[index] current_trend = trend[index] if current_rsi != current_rsi or current_trend != current_trend: continue if position is not None: held_bars = index - int(position["entry_index"]) if current_rsi >= float(spec["exit_rsi"]) or held_bars >= int(spec["max_hold_bars"]): pending_exit = True pending_time_slices = 0 pending_trigger = None continue if pending_time_slices == 0 and pending_trigger is None and candle.close > float(current_trend) and current_rsi <= float(spec["rsi_threshold"]): if spec["entry_mode"] == "time_twap": pending_time_slices = entry_steps else: pending_trigger = {"price": candle.close, "expires_index": index + entry_steps} trade_count = len(trades) return SegmentResult( trade_count=trade_count, total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY, win_rate=(wins / trade_count) if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=candles[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ) def horizon_metrics(frame: pd.DataFrame, last_ts: int) -> list[dict[str, object]]: rows: list[dict[str, object]] = [] end_time = pd.to_datetime(last_ts, unit="ms", utc=True) for label, offset in HORIZONS: cutoff = end_time - offset before_cutoff = frame[frame["ts"] <= cutoff] if len(before_cutoff): start_equity = float(before_cutoff["equity"].iloc[-1]) start_time = cutoff horizon_frame = pd.concat( [ pd.DataFrame([{"ts": start_time, "equity": start_equity}]), frame[frame["ts"] > cutoff][["ts", "equity"]], ], ignore_index=True, ) else: horizon_frame = frame[["ts", "equity"]].copy() start_time = pd.Timestamp(horizon_frame["ts"].iloc[0]) rows.append( { "horizon": label, "horizon_start": start_time.strftime("%Y-%m-%d %H:%M"), "horizon_end": end_time.strftime("%Y-%m-%d %H:%M"), **annualized_metrics_from_equity(horizon_frame, int(start_time.timestamp() * 1000), last_ts), } ) return rows def evaluate_spec(spec: dict[str, object]) -> tuple[list[dict[str, object]], list[dict[str, object]], str, int]: if CANDLES is None: raise RuntimeError("candles are not initialized") candles = CANDLES result = run_taker_entry_segment(candles, spec) gross_years = (candles[-1].ts - candles[0].ts) / 86_400_000 / 365 gross_annualized = (1.0 + result.total_return) ** (1.0 / gross_years) - 1.0 if result.total_return > -1.0 else 0.0 name = strategy_name(spec) total_rows: list[dict[str, object]] = [] horizon_rows: list[dict[str, object]] = [] for cost_label, roundtrip_cost in COSTS.items(): net_equity = cost_adjusted_trade_equity_frame(result, roundtrip_cost) total_metrics = annualized_metrics_from_equity(net_equity, candles[0].ts, candles[-1].ts) base_row = { "symbol": SYMBOL, "bar": BAR, "cost_model": cost_label, "roundtrip_cost_on_margin": roundtrip_cost, "name": name, "first_candle": _format_ts(candles[0].ts), "last_candle": _format_ts(candles[-1].ts), "actual_bars": len(candles), "actual_years": gross_years, "trades": result.trade_count, "gross_total_return": result.total_return, "gross_annualized_return": gross_annualized, "gross_max_drawdown_mark_to_market": result.max_drawdown, **spec, } total_rows.append({**base_row, **total_metrics}) for horizon_row in horizon_metrics(net_equity, candles[-1].ts): horizon_rows.append({**base_row, **horizon_row}) return total_rows, horizon_rows, name, result.trade_count def load_10y_cached_candles() -> list[Candle]: candles, _ = load_cached_candles(CANDLE_CACHE_DIR, SYMBOL, BAR) if not candles: raise RuntimeError(f"missing cached candles for {SYMBOL} {BAR}") candles = sorted(candles, key=lambda candle: candle.ts) candles = candles[-history_bars_for_years(BAR, YEARS) :] interval = CANDLE_BAR_MS[BAR] gaps = [(left.ts, right.ts) for left, right in zip(candles, candles[1:]) if right.ts - left.ts != interval] if gaps: first_gap = gaps[0] raise RuntimeError(f"non-continuous candle cache: {_format_ts(first_gap[0])} -> {_format_ts(first_gap[1])}") return candles def run_search(max_candidates: int | None, workers: int) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: candles = load_10y_cached_candles() specs = candidate_specs() if max_candidates is not None: specs = specs[:max_candidates] total_rows: list[dict[str, object]] = [] horizon_rows: list[dict[str, object]] = [] with ProcessPoolExecutor(max_workers=workers, mp_context=multiprocessing.get_context("fork"), initializer=init_worker, initargs=(candles,)) as executor: futures = [executor.submit(evaluate_spec, spec) for spec in specs] for index, future in enumerate(as_completed(futures), start=1): spec_totals, spec_horizons, name, trade_count = future.result() total_rows.extend(spec_totals) horizon_rows.extend(spec_horizons) print(f"{index}/{len(specs)} {name} trades={trade_count}", flush=True) totals = pd.DataFrame(total_rows) horizons = pd.DataFrame(horizon_rows) horizons["horizon"] = pd.Categorical(horizons["horizon"], categories=["3y", "1y", "6m", "3m"], ordered=True) primary_horizons = horizons[horizons["cost_model"] == "taker_taker"].pivot_table( index="name", columns="horizon", values="net_calmar", aggfunc="first", observed=False, ) eligible_names = primary_horizons[ (primary_horizons["3y"] > 0.0) & (primary_horizons["1y"] > 0.0) & (primary_horizons["6m"] > 0.0) & (primary_horizons["3m"] > 0.0) ].index ranked = horizons[(horizons["cost_model"] == "taker_taker") & (horizons["horizon"] == "3y")].copy() ranked["eligible_all_horizons_positive"] = ranked["name"].isin(eligible_names) ranked = ranked[ranked["eligible_all_horizons_positive"]].sort_values( ["net_calmar", "net_annualized_return"], ascending=False, ) totals = totals.sort_values(["cost_model", "net_calmar", "net_annualized_return"], ascending=[True, False, False]) horizons = horizons.sort_values(["cost_model", "horizon", "net_calmar", "net_annualized_return"], ascending=[True, True, False, False]) return totals, horizons, ranked def markdown_table(frame: pd.DataFrame, columns: list[str]) -> str: if frame.empty: return "_empty_" rows = [["" if pd.isna(value) else str(value) for value in row] for row in frame[columns].itertuples(index=False, name=None)] return "\n".join( [ "| " + " | ".join(columns) + " |", "| " + " | ".join("---" for _ in columns) + " |", *["| " + " | ".join(row) + " |" for row in rows], ] ) def markdown_summary(totals: pd.DataFrame, horizons: pd.DataFrame, ranked: pd.DataFrame) -> str: top = ranked.head(15) top_names = set(top["name"]) horizon_top = horizons[(horizons["cost_model"].isin(("taker_taker", "maker_taker"))) & (horizons["name"].isin(top_names))].sort_values(["name", "cost_model", "horizon"]) total_top = totals[(totals["cost_model"].isin(("taker_taker", "maker_taker"))) & (totals["name"].isin(top_names))].sort_values(["name", "cost_model"]) best_noneligible = horizons[(horizons["cost_model"] == "taker_taker") & (horizons["horizon"] == "3y")].head(10) best_maker_taker = horizons[(horizons["cost_model"] == "maker_taker") & (horizons["horizon"] == "3y")].head(10) columns = [ "name", "trades", "entry_mode", "entry_steps", "trend_sma", "rsi_threshold", "exit_rsi", "stop_loss_pct", "entry_slippage", "net_annualized_return", "net_max_drawdown", "net_calmar", "net_sharpe_daily", ] verdict = ( "Found taker_taker candidates with positive 3y/1y/6m/3m net Calmar." if len(top) else "No ETH taker-entry TWAP candidate in this grid had positive taker_taker net Calmar across 3y/1y/6m/3m." ) actual = totals.iloc[0] return "\n".join( [ "# ETH TWAP taker entry search", "", f"Verdict: {verdict}", "", f"Data: cached continuous `{SYMBOL}` `{BAR}` candles from {actual.first_candle} to {actual.last_candle}; actual span {float(actual.actual_years):.2f} years, requested horizon cap {YEARS:.1f} years.", "", "Execution: RSI2 long with trend guard. Time TWAP enters over the next 1/2/3 bar opens plus 0.05% entry slippage. Price trigger arms at the signal close for 1/2/3 bars and fills as taker at trigger price plus 0.05% entry slippage. Exits use next open for signal/time exit and stop price for stop exit. Costs are applied after gross trades; primary sort is taker_taker=0.003 roundtrip cost on margin, with maker_taker=0.0021 retained only for comparison.", "", "Eligibility: taker_taker 3y/1y/6m/3m net Calmar all strictly positive. Ranking: taker_taker 3y net Calmar, then 3y net annualized return.", "", "## Eligible taker_taker top 15", "", markdown_table(top, columns), "", "## Total metrics for eligible names", "", markdown_table(total_top, ["cost_model", *columns]), "", "## Horizon metrics for eligible names", "", markdown_table( horizon_top, [ "cost_model", "name", "horizon", "horizon_start", "horizon_end", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "net_sharpe_daily", ], ), "", "## Best taker_taker 3y rows before all-horizon filter", "", markdown_table(best_noneligible, columns), "", "## Best maker_taker 3y rows for comparison", "", markdown_table(best_maker_taker, columns), "", ] ) def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--max-candidates", type=int, default=None) parser.add_argument("--workers", type=int, default=8) args = parser.parse_args() OUTPUT_DIR.mkdir(parents=True, exist_ok=True) totals, horizons, ranked = run_search(args.max_candidates, args.workers) all_path = OUTPUT_DIR / f"{PREFIX}-all.csv" horizon_path = OUTPUT_DIR / f"{PREFIX}-horizons.csv" top_path = OUTPUT_DIR / f"{PREFIX}-top15.csv" summary_path = OUTPUT_DIR / f"{PREFIX}-summary.md" totals.to_csv(all_path, index=False) horizons.to_csv(horizon_path, index=False) ranked.head(15).to_csv(top_path, index=False) summary_path.write_text(markdown_summary(totals, horizons, ranked), encoding="utf-8") print(f"wrote {all_path}") print(f"wrote {horizon_path}") print(f"wrote {top_path}") print(f"wrote {summary_path}") print(ranked.head(15).to_string(index=False)) return 0 if __name__ == "__main__": raise SystemExit(main())