from __future__ import annotations import argparse import multiprocessing import sys from concurrent.futures import ProcessPoolExecutor, as_completed from pathlib import Path import pandas as pd ROOT = Path(__file__).resolve().parents[1] sys.path.insert(0, str(ROOT)) from scripts.explore_ultrashort import ( CANDLE_CACHE_DIR, LEVERAGE, _format_ts, annualized_metrics_from_equity, build_rsi2_long_guarded_price_twap_candidate, cost_adjusted_trade_equity_frame, history_bars_for_years, load_cached_candles, max_drawdown_from_equity, ) SYMBOL = "ETH-USDT-SWAP" BAR = "15m" YEARS = 10.0 MAX_HOLD_BARS = 48 OUTPUT_DIR = Path("reports/eth-exploration") COSTS = { "maker_maker": 0.0012, "maker_taker": 0.0021, "taker_taker": 0.0030, } FILL_BUFFERS = (0.0, 0.0002, 0.0005) HORIZONS = ( ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ) CANDLES = None def base_candidate_specs() -> list[dict[str, object]]: return [ { "candidate_id": "x45-v3", "trend_sma": 60, "rsi_threshold": 5.0, "exit_rsi": 45.0, "stop_loss_pct": 0.008, "max_hold_bars": MAX_HOLD_BARS, "entry_offsets": (0.0015, 0.004, 0.007), "entry_valid_bars": 3, }, { "candidate_id": "x45-v4", "trend_sma": 60, "rsi_threshold": 5.0, "exit_rsi": 45.0, "stop_loss_pct": 0.008, "max_hold_bars": MAX_HOLD_BARS, "entry_offsets": (0.0015, 0.004, 0.007), "entry_valid_bars": 4, }, { "candidate_id": "x55-v2", "trend_sma": 60, "rsi_threshold": 5.0, "exit_rsi": 55.0, "stop_loss_pct": 0.008, "max_hold_bars": MAX_HOLD_BARS, "entry_offsets": (0.0015, 0.004, 0.007), "entry_valid_bars": 2, }, { "candidate_id": "x55-v3", "trend_sma": 60, "rsi_threshold": 5.0, "exit_rsi": 55.0, "stop_loss_pct": 0.008, "max_hold_bars": MAX_HOLD_BARS, "entry_offsets": (0.0015, 0.004, 0.007), "entry_valid_bars": 3, }, { "candidate_id": "r3x50", "trend_sma": 60, "rsi_threshold": 3.0, "exit_rsi": 50.0, "stop_loss_pct": 0.012, "max_hold_bars": MAX_HOLD_BARS, "entry_offsets": (0.003, 0.006, 0.009), "entry_valid_bars": 4, }, ] def candidate_specs() -> list[dict[str, object]]: specs: list[dict[str, object]] = [] for base in base_candidate_specs(): for fill_buffer in FILL_BUFFERS: specs.append({**base, "fill_buffer": fill_buffer}) return specs def init_worker(candles: list[object]) -> None: global CANDLES CANDLES = candles def offset_label(entry_offsets: tuple[float, ...]) -> str: return "-".join(f"{value:.4f}" for value in entry_offsets) def markdown_table(frame: pd.DataFrame, columns: list[str]) -> str: rows = [["" if pd.isna(value) else str(value) for value in row] for row in frame[columns].itertuples(index=False, name=None)] return "\n".join( [ "| " + " | ".join(columns) + " |", "| " + " | ".join("---" for _ in columns) + " |", *["| " + " | ".join(row) + " |" for row in rows], ] ) def horizon_metrics(frame: pd.DataFrame, last_ts: int) -> list[dict[str, object]]: rows: list[dict[str, object]] = [] end_time = pd.to_datetime(last_ts, unit="ms", utc=True) for label, offset in HORIZONS: cutoff = end_time - offset before_cutoff = frame[frame["ts"] <= cutoff] if len(before_cutoff): start_equity = float(before_cutoff["equity"].iloc[-1]) start_time = cutoff horizon_frame = pd.concat( [ pd.DataFrame([{"ts": start_time, "equity": start_equity}]), frame[frame["ts"] > cutoff][["ts", "equity"]], ], ignore_index=True, ) else: horizon_frame = frame[["ts", "equity"]].copy() start_time = pd.Timestamp(horizon_frame["ts"].iloc[0]) rows.append( { "horizon": label, "horizon_start": start_time.strftime("%Y-%m-%d %H:%M"), "horizon_end": end_time.strftime("%Y-%m-%d %H:%M"), **annualized_metrics_from_equity( horizon_frame, int(start_time.timestamp() * 1000), last_ts, ), } ) return rows def rolling_window_stats(frame: pd.DataFrame, last_ts: int) -> list[dict[str, object]]: daily = frame.set_index("ts")["equity"].resample("1D").last().ffill().dropna() rows: list[dict[str, object]] = [] for label, days in (("rolling_1y", 365), ("rolling_30d", 30)): windows: list[dict[str, object]] = [] for end_index in range(days, len(daily)): window = daily.iloc[end_index - days : end_index + 1] total_return = float(window.iloc[-1] / window.iloc[0] - 1.0) max_drawdown = max_drawdown_from_equity([float(value) for value in window]) years = days / 365.0 annualized_return = (1.0 + total_return) ** (1.0 / years) - 1.0 if total_return > -1.0 else 0.0 windows.append( { "window_start": window.index[0].strftime("%Y-%m-%d"), "window_end": window.index[-1].strftime("%Y-%m-%d"), "rolling_total_return": total_return, "rolling_annualized_return": annualized_return, "rolling_max_drawdown": max_drawdown, "rolling_calmar": annualized_return / max_drawdown if max_drawdown else 0.0, } ) worst_return = min(windows, key=lambda row: row["rolling_total_return"]) worst_drawdown = max(windows, key=lambda row: row["rolling_max_drawdown"]) rows.append( { "window": label, "window_days": days, "sample_end": _format_ts(last_ts), "worst_return_start": worst_return["window_start"], "worst_return_end": worst_return["window_end"], "worst_rolling_total_return": worst_return["rolling_total_return"], "worst_rolling_annualized_return": worst_return["rolling_annualized_return"], "worst_return_window_max_drawdown": worst_return["rolling_max_drawdown"], "worst_drawdown_start": worst_drawdown["window_start"], "worst_drawdown_end": worst_drawdown["window_end"], "worst_rolling_max_drawdown": worst_drawdown["rolling_max_drawdown"], "worst_drawdown_window_total_return": worst_drawdown["rolling_total_return"], } ) return rows def evaluate_spec(spec: dict[str, object]) -> tuple[list[dict[str, object]], list[dict[str, object]], list[dict[str, object]], str, int]: if CANDLES is None: raise RuntimeError("candles are not initialized") candles = CANDLES entry_offsets = tuple(float(value) for value in spec["entry_offsets"]) candidate = build_rsi2_long_guarded_price_twap_candidate( int(spec["trend_sma"]), float(spec["rsi_threshold"]), float(spec["exit_rsi"]), float(spec["stop_loss_pct"]), int(spec["max_hold_bars"]), entry_offsets, int(spec["entry_valid_bars"]), float(spec["fill_buffer"]), ) result = candidate.run(candles=candles, leverage=LEVERAGE, warmup_bars=candidate.warmup_bars) gross_years = (candles[-1].ts - candles[0].ts) / 86_400_000 / 365 gross_annualized = (1.0 + result.total_return) ** (1.0 / gross_years) - 1.0 if result.total_return > -1.0 else 0.0 total_rows: list[dict[str, object]] = [] horizon_rows: list[dict[str, object]] = [] rolling_rows: list[dict[str, object]] = [] for cost_label, roundtrip_cost in COSTS.items(): net_equity = cost_adjusted_trade_equity_frame(result, roundtrip_cost) base_row = { "symbol": SYMBOL, "bar": BAR, "cost_model": cost_label, "roundtrip_cost_on_margin": roundtrip_cost, "candidate_id": spec["candidate_id"], "name": candidate.name, "first_candle": _format_ts(candles[0].ts), "last_candle": _format_ts(candles[-1].ts), "actual_bars": len(candles), "trades": result.trade_count, "gross_total_return": result.total_return, "gross_annualized_return": gross_annualized, "gross_max_drawdown_mark_to_market": result.max_drawdown, "trend_sma": spec["trend_sma"], "rsi_threshold": spec["rsi_threshold"], "exit_rsi": spec["exit_rsi"], "stop_loss_pct": spec["stop_loss_pct"], "max_hold_bars": spec["max_hold_bars"], "entry_offsets": offset_label(entry_offsets), "entry_valid_bars": spec["entry_valid_bars"], "fill_buffer": spec["fill_buffer"], } total_rows.append({**base_row, **annualized_metrics_from_equity(net_equity, candles[0].ts, candles[-1].ts)}) for horizon_row in horizon_metrics(net_equity, candles[-1].ts): horizon_rows.append({**base_row, **horizon_row}) for rolling_row in rolling_window_stats(net_equity, candles[-1].ts): rolling_rows.append({**base_row, **rolling_row}) return total_rows, horizon_rows, rolling_rows, candidate.name, result.trade_count def run_search(workers: int) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]: cached, _ = load_cached_candles(CANDLE_CACHE_DIR, SYMBOL, BAR) candles = cached[-history_bars_for_years(BAR, YEARS) :] specs = candidate_specs() total_rows: list[dict[str, object]] = [] horizon_rows: list[dict[str, object]] = [] rolling_rows: list[dict[str, object]] = [] with ProcessPoolExecutor(max_workers=workers, mp_context=multiprocessing.get_context("fork"), initializer=init_worker, initargs=(candles,)) as executor: futures = [executor.submit(evaluate_spec, spec) for spec in specs] for index, future in enumerate(as_completed(futures), start=1): spec_totals, spec_horizons, spec_rolling, name, trade_count = future.result() total_rows.extend(spec_totals) horizon_rows.extend(spec_horizons) rolling_rows.extend(spec_rolling) print(f"{index}/{len(specs)} {name} trades={trade_count}", flush=True) totals = pd.DataFrame(total_rows) horizons = pd.DataFrame(horizon_rows) rolling = pd.DataFrame(rolling_rows) ranked = rank_candidates(totals, horizons, rolling) totals = totals.sort_values(["cost_model", "net_calmar", "net_annualized_return"], ascending=[True, False, False]) horizons = horizons.sort_values(["cost_model", "horizon", "net_calmar", "net_annualized_return"], ascending=[True, True, False, False]) rolling = rolling.sort_values(["cost_model", "window", "worst_rolling_total_return"], ascending=[True, True, False]) return totals, horizons, rolling, ranked def rank_candidates(totals: pd.DataFrame, horizons: pd.DataFrame, rolling: pd.DataFrame) -> pd.DataFrame: key_columns = ["candidate_id", "name", "fill_buffer"] maker_taker_totals = totals[totals["cost_model"] == "maker_taker"].copy() horizon_pivot = horizons[horizons["cost_model"] == "maker_taker"].pivot_table( index=key_columns, columns="horizon", values=["net_annualized_return", "net_max_drawdown", "net_calmar"], aggfunc="first", observed=False, ) horizon_pivot.columns = [f"{metric}_{horizon}" for metric, horizon in horizon_pivot.columns] rolling_pivot = rolling[rolling["cost_model"] == "maker_taker"].pivot_table( index=key_columns, columns="window", values=["worst_rolling_total_return", "worst_rolling_max_drawdown"], aggfunc="first", observed=False, ) rolling_pivot.columns = [f"{metric}_{window}" for metric, window in rolling_pivot.columns] ranked = maker_taker_totals.merge(horizon_pivot.reset_index(), on=key_columns).merge(rolling_pivot.reset_index(), on=key_columns) ranked["min_recent_calmar"] = ranked[["net_calmar_1y", "net_calmar_6m", "net_calmar_3m"]].min(axis=1) ranked["min_recent_annualized"] = ranked[["net_annualized_return_1y", "net_annualized_return_6m", "net_annualized_return_3m"]].min(axis=1) ranked["max_recent_drawdown"] = ranked[["net_max_drawdown_1y", "net_max_drawdown_6m", "net_max_drawdown_3m"]].max(axis=1) ranked["rolling_1y_positive"] = ranked["worst_rolling_total_return_rolling_1y"] > 0.0 all_costs = totals.pivot_table( index=key_columns, columns="cost_model", values="net_annualized_return", aggfunc="first", observed=False, ) all_costs["all_cost_positive_10y"] = all_costs.min(axis=1) > 0.0 ranked = ranked.merge(all_costs[["all_cost_positive_10y"]].reset_index(), on=key_columns) ranked = ranked.sort_values( [ "rolling_1y_positive", "min_recent_calmar", "net_calmar_3y", "worst_rolling_total_return_rolling_1y", "net_annualized_return", ], ascending=False, ) return ranked def markdown_summary(totals: pd.DataFrame, horizons: pd.DataFrame, rolling: pd.DataFrame, ranked: pd.DataFrame) -> str: top = ranked.head(8) best = top.iloc[0] if len(top) else None top_names = set(top["name"]) top_buffers = set(zip(top["name"], top["fill_buffer"])) total_top = totals[(totals["cost_model"] == "maker_taker") & (totals["name"].isin(top_names))].sort_values(["name", "fill_buffer"]) horizon_top = horizons[(horizons["cost_model"] == "maker_taker") & (horizons.apply(lambda row: (row["name"], row["fill_buffer"]) in top_buffers, axis=1))].sort_values(["name", "fill_buffer", "horizon"]) rolling_top = rolling[(rolling["cost_model"] == "maker_taker") & (rolling.apply(lambda row: (row["name"], row["fill_buffer"]) in top_buffers, axis=1))].sort_values(["name", "fill_buffer", "window"]) should_include = bool( best is not None and float(best["min_recent_calmar"]) > 0.0 and float(best["worst_rolling_total_return_rolling_1y"]) > 0.0 and bool(best["all_cost_positive_10y"]) ) decision = "建议纳入主报告" if should_include else "暂不建议纳入主报告" if best is None: decision_line = "没有候选完成排名。" else: decision_line = ( f"{decision}: top 候选 `{best['name']}` fill_buffer={float(best['fill_buffer']):.4f}; " f"maker_taker 10y annualized={float(best['net_annualized_return']):.4f}, " f"10y maxDD={float(best['net_max_drawdown']):.4f}, " f"3y Calmar={float(best['net_calmar_3y']):.4f}, " f"min recent Calmar={float(best['min_recent_calmar']):.4f}, " f"worst rolling 1y return={float(best['worst_rolling_total_return_rolling_1y']):.4f}." ) return "\n".join( [ "# ETH TWAP robustness 10y", "", "Scope: targeted robustness review only. Base candidates are the ETH TWAP refine top candidates requested by parameter family; no broad grid search.", "", "Robustness dimensions: fill_buffer 0/0.0002/0.0005, three cost models, continuous 10y backtest sliced into 3y/1y/6m/3m, plus rolling 365D and 30D worst return/drawdown statistics.", "", f"Decision: {decision_line}", "", "## Top maker_taker robustness ranking", "", markdown_table( top, [ "candidate_id", "name", "trades", "fill_buffer", "net_annualized_return", "net_max_drawdown", "net_calmar", "net_calmar_3y", "min_recent_calmar", "max_recent_drawdown", "worst_rolling_total_return_rolling_1y", "worst_rolling_max_drawdown_rolling_1y", "worst_rolling_total_return_rolling_30d", "worst_rolling_max_drawdown_rolling_30d", "rolling_1y_positive", "all_cost_positive_10y", ], ), "", "## Top maker_taker 10y totals", "", markdown_table( total_top, [ "candidate_id", "name", "trades", "fill_buffer", "net_annualized_return", "net_max_drawdown", "net_calmar", "net_sharpe_daily", ], ), "", "## Top recent horizons", "", markdown_table( horizon_top, [ "candidate_id", "name", "fill_buffer", "horizon", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", ], ), "", "## Top rolling worst windows", "", markdown_table( rolling_top, [ "candidate_id", "name", "fill_buffer", "window", "worst_return_start", "worst_return_end", "worst_rolling_total_return", "worst_return_window_max_drawdown", "worst_drawdown_start", "worst_drawdown_end", "worst_rolling_max_drawdown", ], ), "", ] ) def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--workers", type=int, default=6) args = parser.parse_args() OUTPUT_DIR.mkdir(parents=True, exist_ok=True) totals, horizons, rolling, ranked = run_search(args.workers) totals_path = OUTPUT_DIR / "eth-twap-robustness-10y-totals.csv" horizons_path = OUTPUT_DIR / "eth-twap-robustness-10y-horizons.csv" rolling_path = OUTPUT_DIR / "eth-twap-robustness-10y-rolling.csv" ranked_path = OUTPUT_DIR / "eth-twap-robustness-10y-ranked.csv" summary_path = OUTPUT_DIR / "eth-twap-robustness-10y-summary.md" totals.to_csv(totals_path, index=False) horizons.to_csv(horizons_path, index=False) rolling.to_csv(rolling_path, index=False) ranked.to_csv(ranked_path, index=False) summary_path.write_text(markdown_summary(totals, horizons, rolling, ranked), encoding="utf-8") print(f"wrote {totals_path}") print(f"wrote {horizons_path}") print(f"wrote {rolling_path}") print(f"wrote {ranked_path}") print(f"wrote {summary_path}") print(ranked.head(8).to_string(index=False)) return 0 if __name__ == "__main__": raise SystemExit(main())