from __future__ import annotations import argparse import multiprocessing import sys from concurrent.futures import ProcessPoolExecutor, as_completed from dataclasses import dataclass from itertools import product from pathlib import Path import pandas as pd ROOT = Path(__file__).resolve().parents[1] sys.path.insert(0, str(ROOT)) from okx_codex_trader.models import Candle from okx_codex_trader.sampled_report import SegmentResult, mark_to_market from scripts.explore_ultrashort import ( CANDLE_CACHE_DIR, INITIAL_EQUITY, LEVERAGE, _compute_rsi, _format_ts, annualized_metrics_from_equity, cost_adjusted_trade_equity_frame, history_bars_for_years, load_cached_candles, max_drawdown_from_equity, ) SYMBOL = "ETH-USDT-SWAP" BAR = "15m" YEARS = 10.0 OUTPUT_DIR = Path("reports/eth-exploration") PREFIX = "eth-robust-twap-fill-slippage" COSTS = { "maker_maker": 0.0012, "maker_taker": 0.0021, "taker_taker": 0.0030, } HORIZONS = ( ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ) BASE_SPEC = { "trend_sma": 60, "rsi_threshold": 3.0, "exit_rsi": 50.0, "stop_loss_pct": 0.012, "max_hold_bars": 48, "entry_offsets": (0.003, 0.006, 0.009), "entry_valid_bars": 4, } FILL_BUFFERS = (0.0, 0.0002, 0.0005, 0.001) PRICE_SLIPPAGES = (0.0, 0.0002, 0.0005) MAKER_MISS_RATIOS = (0.0, 0.10, 0.25) CANDLES = None @dataclass(frozen=True) class RobustResult: result: SegmentResult missed_fills: int fill_attempts: int def init_worker(candles: list[Candle]) -> None: global CANDLES CANDLES = candles def scenario_specs() -> list[dict[str, object]]: specs: list[dict[str, object]] = [] for fill_buffer, price_slippage, maker_miss_ratio in product(FILL_BUFFERS, PRICE_SLIPPAGES, MAKER_MISS_RATIOS): specs.append( { **BASE_SPEC, "fill_buffer": fill_buffer, "price_slippage": price_slippage, "maker_miss_ratio": maker_miss_ratio, } ) return specs def offset_label(entry_offsets: tuple[float, ...]) -> str: return "-".join(f"{value:.4f}" for value in entry_offsets) def scenario_id(spec: dict[str, object]) -> str: return ( f"fb{float(spec['fill_buffer']):.4f}" f"-ps{float(spec['price_slippage']):.4f}" f"-mm{int(round(float(spec['maker_miss_ratio']) * 100)):02d}" ) def strategy_name(spec: dict[str, object]) -> str: return ( f"rsi2-long-guarded-price-twap-o{offset_label(tuple(spec['entry_offsets']))}" f"-v{spec['entry_valid_bars']}-t{spec['trend_sma']}-l{spec['rsi_threshold']}" f"-x{spec['exit_rsi']}-sl{spec['stop_loss_pct']}-mh{spec['max_hold_bars']}" f"-{scenario_id(spec)}" ) def should_miss_fill(fill_attempt: int, maker_miss_ratio: float) -> bool: if maker_miss_ratio == 0.0: return False interval = round(1.0 / maker_miss_ratio) return fill_attempt % interval == 0 def close_position( *, trades: list[dict[str, object]], exits: list[dict[str, object]], position: dict[str, object], account_equity: float, candle: Candle, exit_price: float, ) -> tuple[float, bool]: margin_used = float(position["margin_used"]) exit_equity = mark_to_market( side="long", margin_used=margin_used, entry_price=float(position["entry_price"]), mark_price=exit_price, leverage=LEVERAGE, ) pnl = exit_equity - margin_used trades.append( { "side": "Long", "entry_time": _format_ts(int(position["entry_time"])), "exit_time": _format_ts(candle.ts), "entry_price": round(float(position["entry_price"]), 4), "exit_price": round(exit_price, 4), "pnl": round(pnl, 4), "return_pct": round(pnl / account_equity * 100, 4), "cost_weight": round(margin_used / account_equity, 8), } ) exits.append({"ts": candle.ts, "price": exit_price, "side": "long"}) return account_equity + pnl, pnl > 0.0 def run_robust_twap_segment(candles: list[Candle], spec: dict[str, object]) -> RobustResult: closes = pd.Series([candle.close for candle in candles], dtype=float) trend = closes.rolling(int(spec["trend_sma"])).mean().tolist() rsi_values = _compute_rsi(closes, 2) equity = INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_limits: list[dict[str, float | int]] = [] pending_exit = False warmup_bars = max(int(spec["trend_sma"]), 3) entry_offsets = tuple(float(value) for value in spec["entry_offsets"]) fill_buffer = float(spec["fill_buffer"]) price_slippage = float(spec["price_slippage"]) maker_miss_ratio = float(spec["maker_miss_ratio"]) fill_attempts = 0 missed_fills = 0 for index in range(warmup_bars, len(candles)): candle = candles[index] if pending_exit and position is not None: equity, won = close_position( trades=trades, exits=exits, position=position, account_equity=equity, candle=candle, exit_price=candle.open * (1.0 - price_slippage), ) wins += 1 if won else 0 position = None pending_exit = False pending_limits = [] active_limits: list[dict[str, float | int]] = [] for limit in pending_limits: if index > int(limit["expires_index"]): continue limit_price = float(limit["price"]) if candle.low <= limit_price * (1.0 - fill_buffer) and equity > 0.0: fill_attempts += 1 if should_miss_fill(fill_attempts, maker_miss_ratio): missed_fills += 1 continue slice_margin = equity / len(entry_offsets) actual_entry_price = limit_price * (1.0 + price_slippage) if position is None: position = { "side": "long", "entry_time": candle.ts, "entry_price": actual_entry_price, "entry_index": index, "margin_used": slice_margin, "stop_price": actual_entry_price * (1.0 - float(spec["stop_loss_pct"])), } else: old_margin = float(position["margin_used"]) new_margin = old_margin + slice_margin entry_price = (float(position["entry_price"]) * old_margin + actual_entry_price * slice_margin) / new_margin position["entry_price"] = entry_price position["margin_used"] = new_margin position["stop_price"] = entry_price * (1.0 - float(spec["stop_loss_pct"])) entries.append({"ts": candle.ts, "price": actual_entry_price, "side": "long"}) else: active_limits.append(limit) pending_limits = active_limits current_equity = equity if position is not None and candle.low <= float(position["stop_price"]): equity, won = close_position( trades=trades, exits=exits, position=position, account_equity=equity, candle=candle, exit_price=float(position["stop_price"]) * (1.0 - price_slippage), ) wins += 1 if won else 0 current_equity = equity position = None pending_limits = [] if position is not None: position_equity = mark_to_market( side="long", margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=LEVERAGE, ) current_equity = equity - float(position["margin_used"]) + position_equity peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(candles) - 1 or equity <= 0.0: continue current_rsi = rsi_values[index] current_trend = trend[index] if current_rsi != current_rsi or current_trend != current_trend: continue if position is not None: held_bars = index - int(position["entry_index"]) if current_rsi >= float(spec["exit_rsi"]) or held_bars >= int(spec["max_hold_bars"]): pending_exit = True pending_limits = [] continue if not pending_limits and candle.close > float(current_trend) and current_rsi <= float(spec["rsi_threshold"]): pending_limits = [ { "price": candle.close * (1.0 - offset), "expires_index": index + int(spec["entry_valid_bars"]), } for offset in entry_offsets ] trade_count = len(trades) return RobustResult( result=SegmentResult( trade_count=trade_count, total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY, win_rate=(wins / trade_count) if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=candles[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ), missed_fills=missed_fills, fill_attempts=fill_attempts, ) def horizon_metrics(frame: pd.DataFrame, last_ts: int) -> list[dict[str, object]]: rows: list[dict[str, object]] = [] end_time = pd.to_datetime(last_ts, unit="ms", utc=True) for label, offset in HORIZONS: cutoff = end_time - offset before_cutoff = frame[frame["ts"] <= cutoff] if len(before_cutoff): start_equity = float(before_cutoff["equity"].iloc[-1]) start_time = cutoff horizon_frame = pd.concat( [ pd.DataFrame([{"ts": start_time, "equity": start_equity}]), frame[frame["ts"] > cutoff][["ts", "equity"]], ], ignore_index=True, ) else: horizon_frame = frame[["ts", "equity"]].copy() start_time = pd.Timestamp(horizon_frame["ts"].iloc[0]) rows.append( { "horizon": label, "horizon_start": start_time.strftime("%Y-%m-%d %H:%M"), "horizon_end": end_time.strftime("%Y-%m-%d %H:%M"), **annualized_metrics_from_equity(horizon_frame, int(start_time.timestamp() * 1000), last_ts), } ) return rows def rolling_365_worst(frame: pd.DataFrame, last_ts: int) -> dict[str, object]: daily = frame.set_index("ts")["equity"].resample("1D").last().ffill().dropna() windows: list[dict[str, object]] = [] for end_index in range(365, len(daily)): window = daily.iloc[end_index - 365 : end_index + 1] total_return = float(window.iloc[-1] / window.iloc[0] - 1.0) max_drawdown = max_drawdown_from_equity([float(value) for value in window]) windows.append( { "worst_365_start": window.index[0].strftime("%Y-%m-%d"), "worst_365_end": window.index[-1].strftime("%Y-%m-%d"), "worst_365_total_return": total_return, "worst_365_max_drawdown": max_drawdown, "worst_365_calmar": total_return / max_drawdown if max_drawdown else 0.0, } ) worst = min(windows, key=lambda row: row["worst_365_total_return"]) return {"sample_end": _format_ts(last_ts), **worst} def worst_month(frame: pd.DataFrame) -> dict[str, object]: daily = frame.set_index("ts")["equity"].resample("1D").last().ffill().dropna() monthly = daily.resample("ME").last().pct_change().dropna() worst_ts = monthly.idxmin() return { "worst_month": worst_ts.strftime("%Y-%m"), "worst_month_return": float(monthly.loc[worst_ts]), } def evaluate_spec(spec: dict[str, object]) -> tuple[list[dict[str, object]], list[dict[str, object]], list[dict[str, object]], list[dict[str, object]], str, int]: if CANDLES is None: raise RuntimeError("candles are not initialized") candles = CANDLES robust = run_robust_twap_segment(candles, spec) result = robust.result gross_years = (candles[-1].ts - candles[0].ts) / 86_400_000 / 365 gross_annualized = (1.0 + result.total_return) ** (1.0 / gross_years) - 1.0 if result.total_return > -1.0 else 0.0 entry_offsets = tuple(float(value) for value in spec["entry_offsets"]) name = strategy_name(spec) total_rows: list[dict[str, object]] = [] horizon_rows: list[dict[str, object]] = [] rolling_rows: list[dict[str, object]] = [] month_rows: list[dict[str, object]] = [] base_row = { "symbol": SYMBOL, "bar": BAR, "scenario_id": scenario_id(spec), "name": name, "first_candle": _format_ts(candles[0].ts), "last_candle": _format_ts(candles[-1].ts), "actual_bars": len(candles), "trades": result.trade_count, "fill_attempts": robust.fill_attempts, "missed_fills": robust.missed_fills, "actual_miss_ratio": robust.missed_fills / robust.fill_attempts if robust.fill_attempts else 0.0, "gross_total_return": result.total_return, "gross_annualized_return": gross_annualized, "gross_max_drawdown_mark_to_market": result.max_drawdown, **spec, "entry_offsets": offset_label(entry_offsets), } for cost_label, roundtrip_cost in COSTS.items(): net_equity = cost_adjusted_trade_equity_frame(result, roundtrip_cost) cost_row = { **base_row, "cost_model": cost_label, "roundtrip_cost_on_margin": roundtrip_cost, } total_rows.append({**cost_row, **annualized_metrics_from_equity(net_equity, candles[0].ts, candles[-1].ts)}) for horizon_row in horizon_metrics(net_equity, candles[-1].ts): horizon_rows.append({**cost_row, **horizon_row}) rolling_rows.append({**cost_row, **rolling_365_worst(net_equity, candles[-1].ts)}) month_rows.append({**cost_row, **worst_month(net_equity)}) return total_rows, horizon_rows, rolling_rows, month_rows, name, result.trade_count def run_search(workers: int) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]: cached, _ = load_cached_candles(CANDLE_CACHE_DIR, SYMBOL, BAR) candles = cached[-history_bars_for_years(BAR, YEARS) :] specs = scenario_specs() total_rows: list[dict[str, object]] = [] horizon_rows: list[dict[str, object]] = [] rolling_rows: list[dict[str, object]] = [] month_rows: list[dict[str, object]] = [] with ProcessPoolExecutor(max_workers=workers, mp_context=multiprocessing.get_context("fork"), initializer=init_worker, initargs=(candles,)) as executor: futures = [executor.submit(evaluate_spec, spec) for spec in specs] for index, future in enumerate(as_completed(futures), start=1): spec_totals, spec_horizons, spec_rolling, spec_months, name, trade_count = future.result() total_rows.extend(spec_totals) horizon_rows.extend(spec_horizons) rolling_rows.extend(spec_rolling) month_rows.extend(spec_months) print(f"{index}/{len(specs)} {name} trades={trade_count}", flush=True) totals = pd.DataFrame(total_rows) horizons = pd.DataFrame(horizon_rows) rolling = pd.DataFrame(rolling_rows) months = pd.DataFrame(month_rows) horizons["horizon"] = pd.Categorical(horizons["horizon"], categories=["3y", "1y", "6m", "3m"], ordered=True) ranked = rank_scenarios(totals, horizons, rolling, months) totals = totals.sort_values(["cost_model", "fill_buffer", "price_slippage", "maker_miss_ratio"], ascending=True) horizons = horizons.sort_values(["cost_model", "horizon", "fill_buffer", "price_slippage", "maker_miss_ratio"], ascending=True) rolling = rolling.sort_values(["cost_model", "fill_buffer", "price_slippage", "maker_miss_ratio"], ascending=True) months = months.sort_values(["cost_model", "fill_buffer", "price_slippage", "maker_miss_ratio"], ascending=True) return totals, horizons, rolling, months, ranked def rank_scenarios(totals: pd.DataFrame, horizons: pd.DataFrame, rolling: pd.DataFrame, months: pd.DataFrame) -> pd.DataFrame: key_columns = ["scenario_id", "fill_buffer", "price_slippage", "maker_miss_ratio"] maker_totals = totals[totals["cost_model"] == "maker_taker"].copy() horizon_pivot = horizons[horizons["cost_model"] == "maker_taker"].pivot_table( index=key_columns, columns="horizon", values=["net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar"], aggfunc="first", observed=False, ) horizon_pivot.columns = [f"{metric}_{horizon}" for metric, horizon in horizon_pivot.columns] maker_rolling = rolling[rolling["cost_model"] == "maker_taker"][ key_columns + ["worst_365_start", "worst_365_end", "worst_365_total_return", "worst_365_max_drawdown"] ] maker_months = months[months["cost_model"] == "maker_taker"][key_columns + ["worst_month", "worst_month_return"]] ranked = maker_totals.merge(horizon_pivot.reset_index(), on=key_columns).merge(maker_rolling, on=key_columns).merge(maker_months, on=key_columns) ranked["min_recent_calmar"] = ranked[["net_calmar_1y", "net_calmar_6m", "net_calmar_3m"]].min(axis=1) ranked["min_recent_total_return"] = ranked[["net_total_return_1y", "net_total_return_6m", "net_total_return_3m"]].min(axis=1) ranked = ranked.sort_values( ["net_calmar_3y", "min_recent_calmar", "worst_365_total_return", "net_annualized_return"], ascending=False, ) return ranked def markdown_table(frame: pd.DataFrame, columns: list[str]) -> str: rows = [["" if pd.isna(value) else str(value) for value in row] for row in frame[columns].itertuples(index=False, name=None)] return "\n".join( [ "| " + " | ".join(columns) + " |", "| " + " | ".join("---" for _ in columns) + " |", *["| " + " | ".join(row) + " |" for row in rows], ] ) def markdown_summary(totals: pd.DataFrame, horizons: pd.DataFrame, rolling: pd.DataFrame, months: pd.DataFrame, ranked: pd.DataFrame) -> str: maker_totals = totals[totals["cost_model"] == "maker_taker"].copy() best = ranked.iloc[0] worst = ranked.iloc[-1] harsh = ranked[ (ranked["fill_buffer"] == max(FILL_BUFFERS)) & (ranked["price_slippage"] == max(PRICE_SLIPPAGES)) & (ranked["maker_miss_ratio"] == max(MAKER_MISS_RATIOS)) ].iloc[0] viable = ( float(harsh["net_annualized_return_3y"]) > 0.0 and float(harsh["min_recent_total_return"]) > 0.0 and float(harsh["worst_365_total_return"]) > 0.0 ) decision = "仍可考虑实盘小仓" if viable else "暂不值得实盘小仓" return "\n".join( [ "# ETH Robust Price TWAP fill/slippage stress", "", "Baseline: ETH 15m RSI2 long price-TWAP, trend=60, rsi=3, exit=50, stop=0.012, max_hold=48, offsets=(0.003,0.006,0.009), valid=4.", "", "Stress dimensions: fill_buffer 0/0.0002/0.0005/0.001; additional adverse price_slippage 0/0.0002/0.0005 on entries and exits; deterministic maker miss ratio 0/10%/25%. A missed maker fill removes that fill opportunity.", "", f"Decision: {decision}. Harshest maker_taker scenario {harsh['scenario_id']} has 3y annualized={float(harsh['net_annualized_return_3y']):.4f}, 1y/6m/3m min total return={float(harsh['min_recent_total_return']):.4f}, worst rolling 365D return={float(harsh['worst_365_total_return']):.4f}, worst month={harsh['worst_month']} {float(harsh['worst_month_return']):.4f}.", "", f"Best maker_taker scenario: {best['scenario_id']} 3y Calmar={float(best['net_calmar_3y']):.4f}, worst rolling 365D return={float(best['worst_365_total_return']):.4f}. Worst ranked maker_taker scenario: {worst['scenario_id']} 3y Calmar={float(worst['net_calmar_3y']):.4f}, worst rolling 365D return={float(worst['worst_365_total_return']):.4f}.", "", "## Maker_taker ranked scenarios", "", markdown_table( ranked, [ "scenario_id", "trades", "fill_attempts", "missed_fills", "actual_miss_ratio", "net_annualized_return_3y", "net_calmar_3y", "net_total_return_1y", "net_total_return_6m", "net_total_return_3m", "worst_365_start", "worst_365_end", "worst_365_total_return", "worst_month", "worst_month_return", ], ), "", "## Three cost total metrics", "", markdown_table( totals, [ "cost_model", "scenario_id", "trades", "net_annualized_return", "net_max_drawdown", "net_calmar", "net_sharpe_daily", ], ), "", "## Maker_taker horizons", "", markdown_table( horizons[horizons["cost_model"] == "maker_taker"], [ "scenario_id", "horizon", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", ], ), "", "## Maker_taker rolling 365D and worst month", "", markdown_table( rolling[rolling["cost_model"] == "maker_taker"].merge( months[months["cost_model"] == "maker_taker"][ ["scenario_id", "worst_month", "worst_month_return"] ], on="scenario_id", ), [ "scenario_id", "worst_365_start", "worst_365_end", "worst_365_total_return", "worst_365_max_drawdown", "worst_month", "worst_month_return", ], ), "", ] ) def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--workers", type=int, default=6) args = parser.parse_args() OUTPUT_DIR.mkdir(parents=True, exist_ok=True) totals, horizons, rolling, months, ranked = run_search(args.workers) totals_path = OUTPUT_DIR / f"{PREFIX}-totals.csv" horizons_path = OUTPUT_DIR / f"{PREFIX}-horizons.csv" rolling_path = OUTPUT_DIR / f"{PREFIX}-rolling365.csv" months_path = OUTPUT_DIR / f"{PREFIX}-worst-month.csv" ranked_path = OUTPUT_DIR / f"{PREFIX}-ranked.csv" summary_path = OUTPUT_DIR / f"{PREFIX}-summary.md" totals.to_csv(totals_path, index=False) horizons.to_csv(horizons_path, index=False) rolling.to_csv(rolling_path, index=False) months.to_csv(months_path, index=False) ranked.to_csv(ranked_path, index=False) summary_path.write_text(markdown_summary(totals, horizons, rolling, months, ranked), encoding="utf-8") print(f"wrote {totals_path}") print(f"wrote {horizons_path}") print(f"wrote {rolling_path}") print(f"wrote {months_path}") print(f"wrote {ranked_path}") print(f"wrote {summary_path}") print(ranked.to_string(index=False)) return 0 if __name__ == "__main__": raise SystemExit(main())