from __future__ import annotations import csv import sys from dataclasses import dataclass from pathlib import Path import pandas as pd ROOT = Path(__file__).resolve().parents[1] sys.path.insert(0, str(ROOT)) from okx_codex_trader.models import Candle from scripts import explore_ultrashort as explore SYMBOL = "ETH-USDT-SWAP" BAR = "15m" STRATEGY = "ETH Robust Price TWAP 15m" OUTPUT_DIR = Path("reports/eth-exploration") PREFIX = "eth-robust-twap-validation" INITIAL_EQUITY = 10_000.0 LEVERAGE = 3 YEARS = 10.0 TREND_SMA = 60 RSI_THRESHOLD = 3.0 EXIT_RSI = 50.0 STOP_LOSS_PCT = 0.012 MAX_HOLD_BARS = 48 ENTRY_OFFSETS = (0.003, 0.006, 0.009) ENTRY_VALID_BARS = 4 FILL_BUFFER = 0.0 COSTS = { "maker_maker": 0.0012, "maker_taker": 0.0021, "taker_taker": 0.0030, } HORIZONS = ( ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ) @dataclass class ExternalResult: trades: list[dict[str, object]] equity_curve: list[dict[str, object]] trade_count: int total_return: float win_rate: float max_drawdown: float def fmt_ts(ts: int) -> str: return pd.to_datetime(ts, unit="ms", utc=True).strftime("%Y-%m-%d %H:%M") def pct(value: float) -> str: return f"{value * 100:.2f}%" def parse_pct(value: object) -> float: return float(str(value).rstrip("%")) / 100.0 def load_candles() -> list[Candle]: candles, _ = explore.load_cached_candles(explore.CANDLE_CACHE_DIR, SYMBOL, BAR) requested = explore.history_bars_for_years(BAR, YEARS) return candles[-requested:] if len(candles) > requested else candles def compute_rsi(closes: list[float], length: int) -> list[float]: series = pd.Series(closes, dtype=float) deltas = series.diff() gains = deltas.clip(lower=0.0) losses = -deltas.clip(upper=0.0) rsi = [float("nan")] * len(closes) if len(closes) <= length: return rsi average_gain = float(gains.iloc[1 : length + 1].mean()) average_loss = float(losses.iloc[1 : length + 1].mean()) for index in range(length, len(closes)): if index > length: average_gain = ((average_gain * (length - 1)) + float(gains.iloc[index])) / length average_loss = ((average_loss * (length - 1)) + float(losses.iloc[index])) / length if average_loss == 0.0: rsi[index] = 100.0 if average_gain > 0.0 else 50.0 else: relative_strength = average_gain / average_loss rsi[index] = 100.0 - (100.0 / (1.0 + relative_strength)) return rsi def trade_equity(entry_price: float, exit_price: float, margin_used: float) -> float: return margin_used + margin_used * LEVERAGE * ((exit_price - entry_price) / entry_price) def max_drawdown(values: list[float]) -> float: peak = values[0] drawdown = 0.0 for value in values: peak = max(peak, value) drawdown = max(drawdown, (peak - value) / peak if peak else 0.0) return drawdown def external_backtest(candles: list[Candle], warmup_bars: int) -> ExternalResult: closes = [candle.close for candle in candles] trend = pd.Series(closes, dtype=float).rolling(TREND_SMA).mean().tolist() rsi = compute_rsi(closes, 2) account_equity = INITIAL_EQUITY ending_equity = INITIAL_EQUITY peak_equity = INITIAL_EQUITY worst_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] equity_curve: list[dict[str, object]] = [] position: dict[str, object] | None = None pending_limits: list[dict[str, float | int]] = [] pending_exit = False for index in range(warmup_bars, len(candles)): candle = candles[index] if pending_exit and position is not None: margin_used = float(position["margin_used"]) exit_equity = trade_equity(float(position["entry_price"]), candle.open, margin_used) pnl = exit_equity - margin_used trades.append( { "side": "Long", "entry_time": fmt_ts(int(position["entry_time"])), "exit_time": fmt_ts(candle.ts), "entry_price": round(float(position["entry_price"]), 4), "exit_price": round(candle.open, 4), "pnl": round(pnl, 4), "return_pct": round(pnl / margin_used * 100.0, 4), } ) account_equity = exit_equity wins += 1 if pnl > 0.0 else 0 position = None pending_exit = False pending_limits = [] active_limits: list[dict[str, float | int]] = [] for limit in pending_limits: if index > int(limit["expires_index"]): continue limit_price = float(limit["price"]) if candle.low <= limit_price * (1.0 - FILL_BUFFER) and account_equity > 0.0: slice_margin = account_equity / len(ENTRY_OFFSETS) if position is None: position = { "entry_time": candle.ts, "entry_price": limit_price, "entry_index": index, "margin_used": slice_margin, "stop_price": limit_price * (1.0 - STOP_LOSS_PCT), } else: old_margin = float(position["margin_used"]) new_margin = old_margin + slice_margin entry_price = (float(position["entry_price"]) * old_margin + limit_price * slice_margin) / new_margin position["entry_price"] = entry_price position["margin_used"] = new_margin position["stop_price"] = entry_price * (1.0 - STOP_LOSS_PCT) else: active_limits.append(limit) pending_limits = active_limits current_equity = account_equity if position is not None and candle.low <= float(position["stop_price"]): margin_used = float(position["margin_used"]) exit_price = float(position["stop_price"]) exit_equity = trade_equity(float(position["entry_price"]), exit_price, margin_used) pnl = exit_equity - margin_used trades.append( { "side": "Long", "entry_time": fmt_ts(int(position["entry_time"])), "exit_time": fmt_ts(candle.ts), "entry_price": round(float(position["entry_price"]), 4), "exit_price": round(exit_price, 4), "pnl": round(pnl, 4), "return_pct": round(pnl / account_equity * 100.0, 4), "cost_weight": round(margin_used / account_equity, 8), } ) account_equity += pnl current_equity = account_equity wins += 1 if pnl > 0.0 else 0 position = None pending_limits = [] if position is not None: position_equity = trade_equity(float(position["entry_price"]), candle.close, float(position["margin_used"])) current_equity = account_equity - float(position["margin_used"]) + position_equity peak_equity = max(peak_equity, current_equity) worst_drawdown = max(worst_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(candles) - 1 or account_equity <= 0.0: continue if rsi[index] != rsi[index] or trend[index] != trend[index]: continue if position is not None: held_bars = index - int(position["entry_index"]) if rsi[index] >= EXIT_RSI or held_bars >= MAX_HOLD_BARS: pending_exit = True pending_limits = [] continue if not pending_limits and candle.close > float(trend[index]) and rsi[index] <= RSI_THRESHOLD: pending_limits = [ {"price": candle.close * (1.0 - offset), "expires_index": index + ENTRY_VALID_BARS} for offset in ENTRY_OFFSETS ] return ExternalResult( trades=trades, equity_curve=equity_curve, trade_count=len(trades), total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY, win_rate=wins / len(trades) if trades else 0.0, max_drawdown=worst_drawdown, ) def cost_equity_frame(trades: list[dict[str, object]], first_ts: int, roundtrip_cost: float) -> pd.DataFrame: rows = [{"ts": pd.to_datetime(first_ts, unit="ms", utc=True), "equity": INITIAL_EQUITY}] equity = INITIAL_EQUITY for trade in trades: equity *= 1.0 + float(trade["return_pct"]) / 100.0 - roundtrip_cost * float(trade.get("cost_weight", 1.0)) rows.append({"ts": pd.to_datetime(str(trade["exit_time"]), utc=True), "equity": equity}) return pd.DataFrame(rows) def annualized_metrics(frame: pd.DataFrame, first_ts: int, last_ts: int) -> dict[str, float]: years = (last_ts - first_ts) / 86_400_000 / 365 total_return = float(frame["equity"].iloc[-1] / frame["equity"].iloc[0] - 1.0) annualized = (1.0 + total_return) ** (1.0 / years) - 1.0 if total_return > -1.0 and years > 0.0 else 0.0 drawdown = max_drawdown([float(value) for value in frame["equity"]]) return { "return": total_return, "annualized": annualized, "max_dd": drawdown, "calmar": annualized / drawdown if drawdown else 0.0, } def normalize_from_cutoff(frame: pd.DataFrame, cutoff: pd.Timestamp) -> pd.DataFrame: recent = frame[frame["ts"] >= cutoff][["ts", "equity"]].copy() if recent.empty: recent = frame[["ts", "equity"]].copy() recent["equity"] = recent["equity"] / float(recent["equity"].iloc[0]) * INITIAL_EQUITY return recent def horizon_metrics(frame: pd.DataFrame, last_ts: int) -> list[dict[str, object]]: rows: list[dict[str, object]] = [] end = pd.to_datetime(last_ts, unit="ms", utc=True) for label, offset in HORIZONS: cutoff = end - offset horizon = normalize_from_cutoff(frame, cutoff) metrics = annualized_metrics(horizon, int(horizon["ts"].iloc[0].timestamp() * 1000), last_ts) rows.append({"horizon": label, "start": horizon["ts"].iloc[0].strftime("%Y-%m-%d %H:%M"), **metrics}) return rows def monthly_rows(frame: pd.DataFrame, label: str) -> list[dict[str, object]]: month_end = frame.set_index("ts")["equity"].resample("ME").last().ffill() month_start = month_end.shift(1) if len(month_end): month_start.iloc[0] = float(frame["equity"].iloc[0]) rows = [] for period, end_equity, start_equity in zip(month_end.index, month_end, month_start, strict=True): rows.append( { "cost": label, "period": period.tz_localize(None).to_period("M").strftime("%Y-%m"), "return": float(end_equity / start_equity - 1.0), "end_equity": float(end_equity), } ) return rows def trade_stats(trades: list[dict[str, object]]) -> dict[str, float | int]: returns = [float(trade["return_pct"]) / 100.0 for trade in trades] wins = [value for value in returns if value > 0.0] losses = [value for value in returns if value < 0.0] max_consecutive_losses = 0 current_losses = 0 for value in returns: if value < 0.0: current_losses += 1 max_consecutive_losses = max(max_consecutive_losses, current_losses) else: current_losses = 0 avg_win = sum(wins) / len(wins) if wins else 0.0 avg_loss = abs(sum(losses) / len(losses)) if losses else 0.0 return { "trades": len(trades), "win_rate": len(wins) / len(returns) if returns else 0.0, "avg_return": sum(returns) / len(returns) if returns else 0.0, "payoff_ratio": avg_win / avg_loss if avg_loss else 0.0, "profit_factor": sum(wins) / abs(sum(losses)) if losses else 0.0, "max_consecutive_losses": max_consecutive_losses, } def compare_report_rows(cost_rows: list[dict[str, object]], recent_metrics: dict[str, object]) -> list[dict[str, object]]: comparisons: list[dict[str, object]] = [] recent_summary = pd.read_csv("reports/ultrashort/ultrashort-recent-summary.csv") recent_row = recent_summary[recent_summary["strategy"] == STRATEGY].iloc[0] for key in ("3y_return", "3y_annualized", "3y_max_dd"): local = float(recent_metrics[key]) report = parse_pct(recent_row[key]) comparisons.append( { "source": "ultrashort-recent-summary.csv", "field": key, "local": local, "report": report, "diff": local - report, "flag": abs(local - report) > 0.001, } ) cost_summary = pd.read_csv("reports/ultrashort/ultrashort-cost-scenarios.csv") for local_row in cost_rows: report_row = cost_summary[(cost_summary["strategy"] == STRATEGY) & (cost_summary["cost"] == local_row["cost"])].iloc[0] for key in ("return", "annualized", "max_dd"): local = float(local_row[key]) report = parse_pct(report_row[key]) comparisons.append( { "source": "ultrashort-cost-scenarios.csv", "field": f"{local_row['cost']} {key}", "local": local, "report": report, "diff": local - report, "flag": abs(local - report) > 0.001, } ) return comparisons def main() -> int: candles = load_candles() candidate = explore.build_rsi2_long_guarded_price_twap_candidate( TREND_SMA, RSI_THRESHOLD, EXIT_RSI, STOP_LOSS_PCT, MAX_HOLD_BARS, ENTRY_OFFSETS, ENTRY_VALID_BARS, FILL_BUFFER, ) main_result = candidate.run(candles=candles, leverage=LEVERAGE, warmup_bars=candidate.warmup_bars) external_result = external_backtest(candles, candidate.warmup_bars) OUTPUT_DIR.mkdir(parents=True, exist_ok=True) cost_rows = [] horizon_rows_out = [] monthly_out = [] equity_rows = [ { "equity_type": "mark_to_market", "cost": "gross", "ts": pd.to_datetime(point["ts"], unit="ms", utc=True), "equity": point["equity"], "close": point["close"], } for point in main_result.equity_curve ] for cost_name, cost_value in COSTS.items(): frame = explore.cost_adjusted_trade_equity_frame(main_result, cost_value) equity_rows.extend( { "equity_type": "closed_trade_cost_adjusted", "cost": cost_name, "ts": row.ts, "equity": row.equity, "close": "", } for row in frame.itertuples(index=False) ) metrics = explore.annualized_metrics_from_equity(frame, int(frame["ts"].iloc[0].timestamp() * 1000), candles[-1].ts) cost_rows.append( { "cost": cost_name, "roundtrip_cost_on_margin": cost_value, "return": metrics["net_total_return"], "annualized": metrics["net_annualized_return"], "max_dd": metrics["net_max_drawdown"], "calmar": metrics["net_calmar"], } ) for row in horizon_metrics(frame, candles[-1].ts): horizon_rows_out.append({"cost": cost_name, **row}) monthly_out.extend(monthly_rows(frame, cost_name)) maker_taker = explore.cost_adjusted_trade_equity_frame(main_result, COSTS["maker_taker"]) cutoff_3y = pd.to_datetime(candles[-1].ts, unit="ms", utc=True) - pd.DateOffset(years=3) recent_3y = normalize_from_cutoff(maker_taker, cutoff_3y) recent_3y_metrics = annualized_metrics(recent_3y, int(recent_3y["ts"].iloc[0].timestamp() * 1000), candles[-1].ts) recent_metrics = { "3y_return": recent_3y_metrics["return"], "3y_annualized": recent_3y_metrics["annualized"], "3y_max_dd": recent_3y_metrics["max_dd"], } comparisons = compare_report_rows(cost_rows, recent_metrics) main_first = main_result.trades[:50] external_first = external_result.trades[:50] trade_compare_rows = [] for index, (main_trade, external_trade) in enumerate(zip(main_first, external_first, strict=True), start=1): fields = ("entry_time", "exit_time", "entry_price", "exit_price") trade_compare_rows.append( { "index": index, **{f"main_{field}": main_trade[field] for field in fields}, **{f"external_{field}": external_trade[field] for field in fields}, "match": all(main_trade[field] == external_trade[field] for field in fields), } ) stats = trade_stats(main_result.trades) gross_metrics = { "return": main_result.total_return, "annualized": (1.0 + main_result.total_return) ** (1.0 / ((candles[-1].ts - candles[0].ts) / 86_400_000 / 365)) - 1.0, "max_dd": main_result.max_drawdown, "calmar": 0.0, } gross_metrics["calmar"] = gross_metrics["annualized"] / gross_metrics["max_dd"] if gross_metrics["max_dd"] else 0.0 metrics_rows = [ {"section": "gross_10y", **gross_metrics}, {"section": "trades", **stats}, { "section": "external_delta", "trades": external_result.trade_count - main_result.trade_count, "return": external_result.total_return - main_result.total_return, "win_rate": external_result.win_rate - main_result.win_rate, "max_dd": external_result.max_drawdown - main_result.max_drawdown, }, ] worst_month = min(monthly_out, key=lambda row: row["return"]) main_trade_count_match = external_result.trade_count == main_result.trade_count first_50_match = all(bool(row["match"]) for row in trade_compare_rows) flagged = [row for row in comparisons if row["flag"]] pd.DataFrame(equity_rows).to_csv(OUTPUT_DIR / f"{PREFIX}-equity.csv", index=False) pd.DataFrame(metrics_rows).to_csv(OUTPUT_DIR / f"{PREFIX}-metrics.csv", index=False) pd.DataFrame(cost_rows).to_csv(OUTPUT_DIR / f"{PREFIX}-costs.csv", index=False) pd.DataFrame(horizon_rows_out).to_csv(OUTPUT_DIR / f"{PREFIX}-horizons.csv", index=False) pd.DataFrame(monthly_out).to_csv(OUTPUT_DIR / f"{PREFIX}-monthly.csv", index=False) pd.DataFrame(comparisons).to_csv(OUTPUT_DIR / f"{PREFIX}-comparison.csv", index=False) pd.DataFrame(trade_compare_rows).to_csv(OUTPUT_DIR / f"{PREFIX}-first50-trades.csv", index=False) summary = OUTPUT_DIR / f"{PREFIX}-summary.md" summary.write_text( "\n".join( [ "# ETH Robust Price TWAP 15m Validation", "", f"Candidate: `{candidate.name}`", f"Candles: {len(candles)} from {fmt_ts(candles[0].ts)} to {fmt_ts(candles[-1].ts)}", f"Params: trend={TREND_SMA}, rsi={RSI_THRESHOLD:g}, exit={EXIT_RSI:g}, stop={STOP_LOSS_PCT}, max_hold={MAX_HOLD_BARS}, offsets={ENTRY_OFFSETS}, valid={ENTRY_VALID_BARS}, fill_buffer={FILL_BUFFER:g}", "", "## Main Runner", "", f"- Mark-to-market 10y return: {pct(main_result.total_return)}", f"- Mark-to-market 10y annualized: {pct(gross_metrics['annualized'])}", f"- Mark-to-market max drawdown: {pct(main_result.max_drawdown)}", f"- Trades: {main_result.trade_count}", f"- Win rate: {pct(float(stats['win_rate']))}", f"- Average trade return: {pct(float(stats['avg_return']))}", f"- Payoff ratio: {float(stats['payoff_ratio']):.4f}", f"- Profit factor: {float(stats['profit_factor']):.4f}", f"- Max consecutive losses: {int(stats['max_consecutive_losses'])}", f"- Worst month: {worst_month['cost']} {worst_month['period']} {pct(float(worst_month['return']))}", "", "## Cost Scenarios", "", "| cost | return | annualized | max_dd | calmar |", "| --- | --- | --- | --- | --- |", *[ f"| {row['cost']} | {pct(float(row['return']))} | {pct(float(row['annualized']))} | {pct(float(row['max_dd']))} | {float(row['calmar']):.2f} |" for row in cost_rows ], "", "## Report Comparison", "", f"- Differences over 0.1%: {len(flagged)}", f"- First 50 independent trades match: {first_50_match}", f"- Independent full trade count matches: {main_trade_count_match}", f"- Note: report CSV rows are closed-trade cost-adjusted equity, not the runner's open-position mark-to-market `total_return`.", f"- Finding: the runner's mark-to-market `total_return` collapses to {pct(main_result.total_return)} while the closed-trade cost equity used by the report is positive; this is a separate accounting-path issue from the CSV reproduction.", "", "## Generated Files", "", f"- `{OUTPUT_DIR / f'{PREFIX}-summary.md'}`", f"- `{OUTPUT_DIR / f'{PREFIX}-metrics.csv'}`", f"- `{OUTPUT_DIR / f'{PREFIX}-costs.csv'}`", f"- `{OUTPUT_DIR / f'{PREFIX}-horizons.csv'}`", f"- `{OUTPUT_DIR / f'{PREFIX}-monthly.csv'}`", f"- `{OUTPUT_DIR / f'{PREFIX}-comparison.csv'}`", f"- `{OUTPUT_DIR / f'{PREFIX}-first50-trades.csv'}`", f"- `{OUTPUT_DIR / f'{PREFIX}-equity.csv'}`", "", ] ), encoding="utf-8", ) print(f"summary={summary}") print(f"flagged_differences={len(flagged)}") print(f"first_50_independent_trades_match={first_50_match}") print(f"independent_trade_count_match={main_trade_count_match}") return 1 if flagged or not first_50_match or not main_trade_count_match else 0 if __name__ == "__main__": raise SystemExit(main())