| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528 |
- from __future__ import annotations
- import csv
- import sys
- from dataclasses import dataclass
- from pathlib import Path
- import pandas as pd
- ROOT = Path(__file__).resolve().parents[1]
- sys.path.insert(0, str(ROOT))
- from okx_codex_trader.models import Candle
- from scripts import explore_ultrashort as explore
# Instrument and bar size whose cached candles are loaded by load_candles().
SYMBOL = "ETH-USDT-SWAP"
BAR = "15m"

# Strategy label used to select rows from the previously generated report CSVs.
STRATEGY = "ETH Robust Price TWAP 15m"

# Directory and shared filename prefix for every artifact written by main().
OUTPUT_DIR = Path("reports/eth-exploration")
PREFIX = "eth-robust-twap-validation"

# Starting equity of every equity curve, and the multiplier applied to the
# price return in trade_equity().
INITIAL_EQUITY = 10_000.0
LEVERAGE = 3

# Amount of history (in years) requested from the candle cache.
YEARS = 10.0

# Strategy parameters shared by the main runner and the independent replay.
TREND_SMA = 60  # rolling window (bars) of the close-price trend average
RSI_THRESHOLD = 3.0  # entry ladder is armed when RSI is at or below this
EXIT_RSI = 50.0  # exit is scheduled when RSI is at or above this
STOP_LOSS_PCT = 0.012  # stop sits this fraction below the (average) entry price
MAX_HOLD_BARS = 48  # exit is scheduled once a position is held this many bars
ENTRY_OFFSETS = (0.003, 0.006, 0.009)  # limit prices as fractions below the signal close
ENTRY_VALID_BARS = 4  # number of bars a resting limit stays valid
FILL_BUFFER = 0.0  # extra fraction the low must trade through a limit to fill

# Round-trip cost per scenario; main() reports it as "roundtrip_cost_on_margin".
COSTS = {
    "maker_maker": 0.0012,
    "maker_taker": 0.0021,
    "taker_taker": 0.0030,
}

# Look-back windows (label, offset from the last candle) for horizon metrics.
HORIZONS = (
    ("3y", pd.DateOffset(years=3)),
    ("1y", pd.DateOffset(years=1)),
    ("6m", pd.DateOffset(months=6)),
    ("3m", pd.DateOffset(months=3)),
)
@dataclass
class ExternalResult:
    """Container for the outcome of the independent ``external_backtest`` replay."""

    # One dict per closed trade, in the order the trades were closed.
    trades: list[dict[str, object]]
    # One dict per processed bar with keys "ts", "equity" and "close".
    equity_curve: list[dict[str, object]]
    # Number of closed trades.
    trade_count: int
    # Ending equity relative to the starting equity, as a fraction.
    total_return: float
    # Share of closed trades with positive PnL (0.0 when there are no trades).
    win_rate: float
    # Largest peak-to-trough decline of the per-bar equity, as a fraction.
    max_drawdown: float
def fmt_ts(ts: int) -> str:
    """Render an epoch-millisecond timestamp as a UTC ``YYYY-MM-DD HH:MM`` string."""
    moment = pd.to_datetime(ts, unit="ms", utc=True)
    return moment.strftime("%Y-%m-%d %H:%M")
def pct(value: float) -> str:
    """Format a fraction as a percentage string with two decimals (0.5 -> "50.00%")."""
    return "{:.2f}%".format(value * 100)
def parse_pct(value: object) -> float:
    """Convert a percentage such as ``"12.5%"`` into a fraction (0.125).

    The value is stringified first, trailing ``%`` characters are removed and
    the remaining number is divided by 100.
    """
    number_text = str(value).rstrip("%")
    return float(number_text) / 100.0
def load_candles() -> list[Candle]:
    """Load cached candles for ``SYMBOL``/``BAR``, keeping only the most recent bars.

    The number of bars kept is whatever ``explore.history_bars_for_years``
    returns for ``BAR`` and ``YEARS``; a shorter cache is returned unchanged.
    """
    candles, _ = explore.load_cached_candles(explore.CANDLE_CACHE_DIR, SYMBOL, BAR)
    requested = explore.history_bars_for_years(BAR, YEARS)
    if len(candles) > requested:
        return candles[-requested:]
    return candles
def compute_rsi(closes: list[float], length: int) -> list[float]:
    """Compute an RSI series with recursively smoothed average gains and losses.

    Args:
        closes: Close prices in chronological order.
        length: Look-back length of the indicator.

    Returns:
        A list as long as ``closes``. Entries before index ``length`` are NaN;
        the whole list is NaN when there are not more than ``length`` closes.
    """
    prices = pd.Series(closes, dtype=float)
    changes = prices.diff()
    up_moves = changes.clip(lower=0.0)
    down_moves = -changes.clip(upper=0.0)

    total = len(closes)
    values = [float("nan")] * total
    if total <= length:
        return values

    # Seed the averages with the plain mean of the first `length` changes
    # (index 0 of the diff is NaN and is skipped).
    smoothed_up = float(up_moves.iloc[1 : length + 1].mean())
    smoothed_down = float(down_moves.iloc[1 : length + 1].mean())

    for position in range(length, total):
        if position > length:
            # Every later bar folds its own move into the running averages.
            smoothed_up = ((smoothed_up * (length - 1)) + float(up_moves.iloc[position])) / length
            smoothed_down = ((smoothed_down * (length - 1)) + float(down_moves.iloc[position])) / length
        if smoothed_down == 0.0:
            # No losses in the window: 100 if there were gains, 50 if flat.
            values[position] = 100.0 if smoothed_up > 0.0 else 50.0
            continue
        strength = smoothed_up / smoothed_down
        values[position] = 100.0 - (100.0 / (1.0 + strength))
    return values
def trade_equity(entry_price: float, exit_price: float, margin_used: float) -> float:
    """Return the margin plus leveraged PnL of a long from ``entry_price`` to ``exit_price``."""
    price_return = (exit_price - entry_price) / entry_price
    return margin_used + margin_used * LEVERAGE * price_return
def max_drawdown(values: list[float]) -> float:
    """Return the largest peak-to-trough decline of ``values`` as a fraction of the peak.

    Args:
        values: Equity values in chronological order.

    Returns:
        The maximum of ``(peak - value) / peak`` over the series, where ``peak``
        is the running maximum. An empty series has no drawdown and yields 0.0
        (previously this raised ``IndexError``).
    """
    if not values:
        return 0.0
    peak = values[0]
    worst = 0.0
    for value in values:
        peak = max(peak, value)
        # A zero peak would divide by zero; treat it as no drawdown.
        worst = max(worst, (peak - value) / peak if peak else 0.0)
    return worst
def external_backtest(candles: list[Candle], warmup_bars: int) -> ExternalResult:
    """Replay the long-only RSI(2) limit-ladder strategy bar by bar.

    Per bar, in this order: execute an exit scheduled on the previous bar at
    the open, try to fill resting limit orders against the bar's low, apply
    the stop, mark equity to the close, then evaluate exit/entry signals for
    the next bar.

    Args:
        candles: Candles in chronological order; ``open``, ``low``, ``close``
            and ``ts`` are read from each.
        warmup_bars: Index of the first bar that is simulated.

    Returns:
        An ``ExternalResult`` with closed trades, the per-bar equity curve and
        summary statistics.
    """
    # NOTE(review): the block structure was reconstructed from a flattened
    # listing; confirm that the `continue` after the exit check applies to
    # every bar with an open position (no new ladder while holding).
    closes = [candle.close for candle in candles]
    trend = pd.Series(closes, dtype=float).rolling(TREND_SMA).mean().tolist()
    rsi = compute_rsi(closes, 2)

    account_equity = INITIAL_EQUITY
    ending_equity = INITIAL_EQUITY
    peak_equity = INITIAL_EQUITY
    worst_drawdown = 0.0
    wins = 0
    trades: list[dict[str, object]] = []
    equity_curve: list[dict[str, object]] = []
    position: dict[str, object] | None = None
    pending_limits: list[dict[str, float | int]] = []
    pending_exit = False

    for bar_index in range(warmup_bars, len(candles)):
        candle = candles[bar_index]

        # 1) Signal exit scheduled on the previous bar: close at this open.
        if pending_exit and position is not None:
            margin_used = float(position["margin_used"])
            exit_equity = trade_equity(float(position["entry_price"]), candle.open, margin_used)
            pnl = exit_equity - margin_used
            trades.append(
                {
                    "side": "Long",
                    "entry_time": fmt_ts(int(position["entry_time"])),
                    "exit_time": fmt_ts(candle.ts),
                    "entry_price": round(float(position["entry_price"]), 4),
                    "exit_price": round(candle.open, 4),
                    "pnl": round(pnl, 4),
                    "return_pct": round(pnl / margin_used * 100.0, 4),
                }
            )
            # NOTE(review): this path sets equity to the position's exit value,
            # whereas the stop path below adds the PnL to the account equity.
            # Left unchanged; presumably it mirrors the runner being validated.
            account_equity = exit_equity
            if pnl > 0.0:
                wins += 1
            position = None
            pending_exit = False
            pending_limits = []

        # 2) Resting limits: drop expired ones, fill those the low reached.
        still_open: list[dict[str, float | int]] = []
        for order in pending_limits:
            if bar_index > int(order["expires_index"]):
                continue
            limit_price = float(order["price"])
            touched = candle.low <= limit_price * (1.0 - FILL_BUFFER)
            if not (touched and account_equity > 0.0):
                still_open.append(order)
                continue
            slice_margin = account_equity / len(ENTRY_OFFSETS)
            if position is None:
                position = {
                    "entry_time": candle.ts,
                    "entry_price": limit_price,
                    "entry_index": bar_index,
                    "margin_used": slice_margin,
                    "stop_price": limit_price * (1.0 - STOP_LOSS_PCT),
                }
            else:
                # Additional fill: margin-weighted average entry, stop re-anchored.
                old_margin = float(position["margin_used"])
                new_margin = old_margin + slice_margin
                entry_price = (float(position["entry_price"]) * old_margin + limit_price * slice_margin) / new_margin
                position["entry_price"] = entry_price
                position["margin_used"] = new_margin
                position["stop_price"] = entry_price * (1.0 - STOP_LOSS_PCT)
        pending_limits = still_open

        # 3) Stop: close at the stop price when the low reaches it.
        current_equity = account_equity
        if position is not None and candle.low <= float(position["stop_price"]):
            margin_used = float(position["margin_used"])
            exit_price = float(position["stop_price"])
            exit_equity = trade_equity(float(position["entry_price"]), exit_price, margin_used)
            pnl = exit_equity - margin_used
            trades.append(
                {
                    "side": "Long",
                    "entry_time": fmt_ts(int(position["entry_time"])),
                    "exit_time": fmt_ts(candle.ts),
                    "entry_price": round(float(position["entry_price"]), 4),
                    "exit_price": round(exit_price, 4),
                    "pnl": round(pnl, 4),
                    "return_pct": round(pnl / account_equity * 100.0, 4),
                    "cost_weight": round(margin_used / account_equity, 8),
                }
            )
            account_equity += pnl
            current_equity = account_equity
            if pnl > 0.0:
                wins += 1
            position = None
            pending_limits = []

        # 4) Mark any open position to the close and update equity statistics.
        if position is not None:
            position_equity = trade_equity(float(position["entry_price"]), candle.close, float(position["margin_used"]))
            current_equity = account_equity - float(position["margin_used"]) + position_equity
        peak_equity = max(peak_equity, current_equity)
        worst_drawdown = max(worst_drawdown, (peak_equity - current_equity) / peak_equity)
        equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close})
        ending_equity = current_equity

        # 5) Signals for the next bar; none on the last bar or once equity is gone.
        if bar_index == len(candles) - 1 or account_equity <= 0.0:
            continue
        # NaN never equals itself: skip bars whose indicators are not available.
        if rsi[bar_index] != rsi[bar_index] or trend[bar_index] != trend[bar_index]:
            continue
        if position is not None:
            held_bars = bar_index - int(position["entry_index"])
            if rsi[bar_index] >= EXIT_RSI or held_bars >= MAX_HOLD_BARS:
                pending_exit = True
                pending_limits = []
            continue
        if not pending_limits and candle.close > float(trend[bar_index]) and rsi[bar_index] <= RSI_THRESHOLD:
            pending_limits = [
                {"price": candle.close * (1.0 - offset), "expires_index": bar_index + ENTRY_VALID_BARS}
                for offset in ENTRY_OFFSETS
            ]

    return ExternalResult(
        trades=trades,
        equity_curve=equity_curve,
        trade_count=len(trades),
        total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY,
        win_rate=wins / len(trades) if trades else 0.0,
        max_drawdown=worst_drawdown,
    )
def cost_equity_frame(trades: list[dict[str, object]], first_ts: int, roundtrip_cost: float) -> pd.DataFrame:
    """Build a closed-trade equity curve with a round-trip cost charged per trade.

    Equity starts at ``INITIAL_EQUITY`` at ``first_ts`` (epoch milliseconds) and
    is compounded by each trade's ``return_pct`` minus ``roundtrip_cost`` scaled
    by the trade's ``cost_weight`` (1.0 when the key is absent).

    Returns:
        A frame with columns ``ts`` (UTC timestamps) and ``equity``; one row
        for the start plus one per trade at its ``exit_time``.
    """
    equity = INITIAL_EQUITY
    rows = [{"ts": pd.to_datetime(first_ts, unit="ms", utc=True), "equity": equity}]
    for trade in trades:
        gross_return = float(trade["return_pct"]) / 100.0
        cost = roundtrip_cost * float(trade.get("cost_weight", 1.0))
        equity *= 1.0 + gross_return - cost
        exit_ts = pd.to_datetime(str(trade["exit_time"]), utc=True)
        rows.append({"ts": exit_ts, "equity": equity})
    return pd.DataFrame(rows)
def annualized_metrics(frame: pd.DataFrame, first_ts: int, last_ts: int) -> dict[str, float]:
    """Summarise an equity frame as return, annualized return, drawdown and Calmar.

    Args:
        frame: Frame with an ``equity`` column in chronological order.
        first_ts: Start of the period in epoch milliseconds.
        last_ts: End of the period in epoch milliseconds.

    Returns:
        Dict with keys ``return``, ``annualized``, ``max_dd`` and ``calmar``.
        ``annualized`` is 0.0 when the return is -100% or worse or the period
        is not positive; ``calmar`` is 0.0 when there is no drawdown.
    """
    years = (last_ts - first_ts) / 86_400_000 / 365
    equity = frame["equity"]
    total_return = float(equity.iloc[-1] / equity.iloc[0] - 1.0)
    if total_return > -1.0 and years > 0.0:
        annualized = (1.0 + total_return) ** (1.0 / years) - 1.0
    else:
        annualized = 0.0
    drawdown = max_drawdown([float(value) for value in equity])
    calmar = annualized / drawdown if drawdown else 0.0
    return {
        "return": total_return,
        "annualized": annualized,
        "max_dd": drawdown,
        "calmar": calmar,
    }
def normalize_from_cutoff(frame: pd.DataFrame, cutoff: pd.Timestamp) -> pd.DataFrame:
    """Return the rows at or after ``cutoff`` with equity rebased to ``INITIAL_EQUITY``.

    When no row lies at or after ``cutoff`` the whole frame is used instead.
    Only the ``ts`` and ``equity`` columns are kept; the input is not modified.
    """
    columns = ["ts", "equity"]
    recent = frame.loc[frame["ts"] >= cutoff, columns].copy()
    if recent.empty:
        recent = frame[columns].copy()
    base = float(recent["equity"].iloc[0])
    recent["equity"] = recent["equity"] / base * INITIAL_EQUITY
    return recent
def horizon_metrics(frame: pd.DataFrame, last_ts: int) -> list[dict[str, object]]:
    """Compute ``annualized_metrics`` for each look-back window in ``HORIZONS``.

    Each window ends at ``last_ts`` (epoch milliseconds) and starts at the
    first equity row at or after the window's cutoff.

    Returns:
        One dict per horizon with ``horizon``, ``start`` and the metric keys.
    """
    end = pd.to_datetime(last_ts, unit="ms", utc=True)
    results: list[dict[str, object]] = []
    for label, offset in HORIZONS:
        window = normalize_from_cutoff(frame, end - offset)
        window_start = window["ts"].iloc[0]
        metrics = annualized_metrics(window, int(window_start.timestamp() * 1000), last_ts)
        results.append({"horizon": label, "start": window_start.strftime("%Y-%m-%d %H:%M"), **metrics})
    return results
- def monthly_rows(frame: pd.DataFrame, label: str) -> list[dict[str, object]]:
- month_end = frame.set_index("ts")["equity"].resample("ME").last().ffill()
- month_start = month_end.shift(1)
- if len(month_end):
- month_start.iloc[0] = float(frame["equity"].iloc[0])
- rows = []
- for period, end_equity, start_equity in zip(month_end.index, month_end, month_start, strict=True):
- rows.append(
- {
- "cost": label,
- "period": period.tz_localize(None).to_period("M").strftime("%Y-%m"),
- "return": float(end_equity / start_equity - 1.0),
- "end_equity": float(end_equity),
- }
- )
- return rows
def trade_stats(trades: list[dict[str, object]]) -> dict[str, float | int]:
    """Aggregate per-trade ``return_pct`` values into summary statistics.

    Returns:
        Dict with ``trades``, ``win_rate``, ``avg_return``, ``payoff_ratio``
        (average win over average absolute loss), ``profit_factor`` (sum of
        wins over absolute sum of losses) and ``max_consecutive_losses``.
        Ratios whose denominator is missing are reported as 0.0.
    """
    returns = [float(trade["return_pct"]) / 100.0 for trade in trades]
    wins = [value for value in returns if value > 0.0]
    losses = [value for value in returns if value < 0.0]

    # Longest run of strictly negative returns; zero or positive resets the run.
    longest_streak = 0
    streak = 0
    for value in returns:
        streak = streak + 1 if value < 0.0 else 0
        longest_streak = max(longest_streak, streak)

    count = len(returns)
    avg_win = sum(wins) / len(wins) if wins else 0.0
    avg_loss = abs(sum(losses) / len(losses)) if losses else 0.0
    return {
        "trades": len(trades),
        "win_rate": len(wins) / count if returns else 0.0,
        "avg_return": sum(returns) / count if returns else 0.0,
        "payoff_ratio": avg_win / avg_loss if avg_loss else 0.0,
        "profit_factor": sum(wins) / abs(sum(losses)) if losses else 0.0,
        "max_consecutive_losses": longest_streak,
    }
def compare_report_rows(cost_rows: list[dict[str, object]], recent_metrics: dict[str, object]) -> list[dict[str, object]]:
    """Compare locally computed metrics with the values stored in the report CSVs.

    Args:
        cost_rows: Per-cost-scenario rows with ``cost``, ``return``,
            ``annualized`` and ``max_dd``.
        recent_metrics: Dict with ``3y_return``, ``3y_annualized`` and
            ``3y_max_dd``.

    Returns:
        One row per compared field with ``source``, ``field``, ``local``,
        ``report``, ``diff`` and ``flag`` (True when the absolute difference
        exceeds 0.001).

    Raises:
        IndexError: If a report CSV has no row for ``STRATEGY`` (or for one of
            the cost scenarios).
    """

    def build_row(source: str, field: str, local: float, report: float) -> dict[str, object]:
        # Shared row shape for both report files.
        return {
            "source": source,
            "field": field,
            "local": local,
            "report": report,
            "diff": local - report,
            "flag": abs(local - report) > 0.001,
        }

    comparisons: list[dict[str, object]] = []

    recent_summary = pd.read_csv("reports/ultrashort/ultrashort-recent-summary.csv")
    recent_row = recent_summary[recent_summary["strategy"] == STRATEGY].iloc[0]
    for key in ("3y_return", "3y_annualized", "3y_max_dd"):
        comparisons.append(
            build_row(
                "ultrashort-recent-summary.csv",
                key,
                float(recent_metrics[key]),
                parse_pct(recent_row[key]),
            )
        )

    cost_summary = pd.read_csv("reports/ultrashort/ultrashort-cost-scenarios.csv")
    for local_row in cost_rows:
        selector = (cost_summary["strategy"] == STRATEGY) & (cost_summary["cost"] == local_row["cost"])
        report_row = cost_summary[selector].iloc[0]
        for key in ("return", "annualized", "max_dd"):
            comparisons.append(
                build_row(
                    "ultrashort-cost-scenarios.csv",
                    f"{local_row['cost']} {key}",
                    float(local_row[key]),
                    parse_pct(report_row[key]),
                )
            )
    return comparisons
def main() -> int:
    """Run the validation end to end and write all report artifacts.

    Steps: load candles, run the candidate through the main runner and through
    the independent ``external_backtest``, build cost-adjusted equity curves
    for every scenario in ``COSTS``, compare the results with the stored
    report CSVs, and write CSV/Markdown outputs into ``OUTPUT_DIR``.

    Returns:
        Process exit code: 1 when any report difference is flagged, the first
        50 trades differ, or the trade counts differ; otherwise 0.
    """
    candles = load_candles()
    last_ts = candles[-1].ts
    candidate = explore.build_rsi2_long_guarded_price_twap_candidate(
        TREND_SMA,
        RSI_THRESHOLD,
        EXIT_RSI,
        STOP_LOSS_PCT,
        MAX_HOLD_BARS,
        ENTRY_OFFSETS,
        ENTRY_VALID_BARS,
        FILL_BUFFER,
    )
    main_result = candidate.run(candles=candles, leverage=LEVERAGE, warmup_bars=candidate.warmup_bars)
    external_result = external_backtest(candles, candidate.warmup_bars)
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    cost_rows = []
    horizon_rows_out = []
    monthly_out = []
    # Gross mark-to-market curve from the main runner.
    equity_rows = [
        {
            "equity_type": "mark_to_market",
            "cost": "gross",
            "ts": pd.to_datetime(point["ts"], unit="ms", utc=True),
            "equity": point["equity"],
            "close": point["close"],
        }
        for point in main_result.equity_curve
    ]
    # One closed-trade, cost-adjusted curve per cost scenario.
    for cost_name, cost_value in COSTS.items():
        frame = explore.cost_adjusted_trade_equity_frame(main_result, cost_value)
        equity_rows.extend(
            {
                "equity_type": "closed_trade_cost_adjusted",
                "cost": cost_name,
                "ts": row.ts,
                "equity": row.equity,
                "close": "",
            }
            for row in frame.itertuples(index=False)
        )
        metrics = explore.annualized_metrics_from_equity(frame, int(frame["ts"].iloc[0].timestamp() * 1000), last_ts)
        cost_rows.append(
            {
                "cost": cost_name,
                "roundtrip_cost_on_margin": cost_value,
                "return": metrics["net_total_return"],
                "annualized": metrics["net_annualized_return"],
                "max_dd": metrics["net_max_drawdown"],
                "calmar": metrics["net_calmar"],
            }
        )
        for row in horizon_metrics(frame, last_ts):
            horizon_rows_out.append({"cost": cost_name, **row})
        monthly_out.extend(monthly_rows(frame, cost_name))

    # Recent three-year metrics under the maker/taker scenario, for the report comparison.
    maker_taker = explore.cost_adjusted_trade_equity_frame(main_result, COSTS["maker_taker"])
    cutoff_3y = pd.to_datetime(last_ts, unit="ms", utc=True) - pd.DateOffset(years=3)
    recent_3y = normalize_from_cutoff(maker_taker, cutoff_3y)
    recent_3y_metrics = annualized_metrics(recent_3y, int(recent_3y["ts"].iloc[0].timestamp() * 1000), last_ts)
    recent_metrics = {
        "3y_return": recent_3y_metrics["return"],
        "3y_annualized": recent_3y_metrics["annualized"],
        "3y_max_dd": recent_3y_metrics["max_dd"],
    }
    comparisons = compare_report_rows(cost_rows, recent_metrics)

    # Field-by-field comparison of the first 50 trades of both runs.
    main_first = main_result.trades[:50]
    external_first = external_result.trades[:50]
    fields = ("entry_time", "exit_time", "entry_price", "exit_price")
    trade_compare_rows = []
    # Deliberately not strict: when the two runs produce different numbers of
    # trades, a strict zip would raise before the mismatch could be reported.
    # The length difference is folded into `first_50_match` below instead.
    for index, (main_trade, external_trade) in enumerate(zip(main_first, external_first), start=1):
        trade_compare_rows.append(
            {
                "index": index,
                **{f"main_{field}": main_trade[field] for field in fields},
                **{f"external_{field}": external_trade[field] for field in fields},
                "match": all(main_trade[field] == external_trade[field] for field in fields),
            }
        )

    stats = trade_stats(main_result.trades)
    gross_metrics = {
        "return": main_result.total_return,
        "annualized": (1.0 + main_result.total_return) ** (1.0 / ((last_ts - candles[0].ts) / 86_400_000 / 365)) - 1.0,
        "max_dd": main_result.max_drawdown,
        "calmar": 0.0,
    }
    gross_metrics["calmar"] = gross_metrics["annualized"] / gross_metrics["max_dd"] if gross_metrics["max_dd"] else 0.0
    metrics_rows = [
        {"section": "gross_10y", **gross_metrics},
        {"section": "trades", **stats},
        {
            "section": "external_delta",
            "trades": external_result.trade_count - main_result.trade_count,
            "return": external_result.total_return - main_result.total_return,
            "win_rate": external_result.win_rate - main_result.win_rate,
            "max_dd": external_result.max_drawdown - main_result.max_drawdown,
        },
    ]
    worst_month = min(monthly_out, key=lambda row: row["return"])
    main_trade_count_match = external_result.trade_count == main_result.trade_count
    first_50_match = len(main_first) == len(external_first) and all(
        bool(row["match"]) for row in trade_compare_rows
    )
    flagged = [row for row in comparisons if row["flag"]]

    pd.DataFrame(equity_rows).to_csv(OUTPUT_DIR / f"{PREFIX}-equity.csv", index=False)
    pd.DataFrame(metrics_rows).to_csv(OUTPUT_DIR / f"{PREFIX}-metrics.csv", index=False)
    pd.DataFrame(cost_rows).to_csv(OUTPUT_DIR / f"{PREFIX}-costs.csv", index=False)
    pd.DataFrame(horizon_rows_out).to_csv(OUTPUT_DIR / f"{PREFIX}-horizons.csv", index=False)
    pd.DataFrame(monthly_out).to_csv(OUTPUT_DIR / f"{PREFIX}-monthly.csv", index=False)
    pd.DataFrame(comparisons).to_csv(OUTPUT_DIR / f"{PREFIX}-comparison.csv", index=False)
    pd.DataFrame(trade_compare_rows).to_csv(OUTPUT_DIR / f"{PREFIX}-first50-trades.csv", index=False)

    summary = OUTPUT_DIR / f"{PREFIX}-summary.md"
    summary.write_text(
        "\n".join(
            [
                "# ETH Robust Price TWAP 15m Validation",
                "",
                f"Candidate: `{candidate.name}`",
                f"Candles: {len(candles)} from {fmt_ts(candles[0].ts)} to {fmt_ts(candles[-1].ts)}",
                f"Params: trend={TREND_SMA}, rsi={RSI_THRESHOLD:g}, exit={EXIT_RSI:g}, stop={STOP_LOSS_PCT}, max_hold={MAX_HOLD_BARS}, offsets={ENTRY_OFFSETS}, valid={ENTRY_VALID_BARS}, fill_buffer={FILL_BUFFER:g}",
                "",
                "## Main Runner",
                "",
                f"- Mark-to-market 10y return: {pct(main_result.total_return)}",
                f"- Mark-to-market 10y annualized: {pct(gross_metrics['annualized'])}",
                f"- Mark-to-market max drawdown: {pct(main_result.max_drawdown)}",
                f"- Trades: {main_result.trade_count}",
                f"- Win rate: {pct(float(stats['win_rate']))}",
                f"- Average trade return: {pct(float(stats['avg_return']))}",
                f"- Payoff ratio: {float(stats['payoff_ratio']):.4f}",
                f"- Profit factor: {float(stats['profit_factor']):.4f}",
                f"- Max consecutive losses: {int(stats['max_consecutive_losses'])}",
                f"- Worst month: {worst_month['cost']} {worst_month['period']} {pct(float(worst_month['return']))}",
                "",
                "## Cost Scenarios",
                "",
                "| cost | return | annualized | max_dd | calmar |",
                "| --- | --- | --- | --- | --- |",
                *[
                    f"| {row['cost']} | {pct(float(row['return']))} | {pct(float(row['annualized']))} | {pct(float(row['max_dd']))} | {float(row['calmar']):.2f} |"
                    for row in cost_rows
                ],
                "",
                "## Report Comparison",
                "",
                f"- Differences over 0.1%: {len(flagged)}",
                f"- First 50 independent trades match: {first_50_match}",
                f"- Independent full trade count matches: {main_trade_count_match}",
                f"- Note: report CSV rows are closed-trade cost-adjusted equity, not the runner's open-position mark-to-market `total_return`.",
                f"- Finding: the runner's mark-to-market `total_return` collapses to {pct(main_result.total_return)} while the closed-trade cost equity used by the report is positive; this is a separate accounting-path issue from the CSV reproduction.",
                "",
                "## Generated Files",
                "",
                f"- `{OUTPUT_DIR / f'{PREFIX}-summary.md'}`",
                f"- `{OUTPUT_DIR / f'{PREFIX}-metrics.csv'}`",
                f"- `{OUTPUT_DIR / f'{PREFIX}-costs.csv'}`",
                f"- `{OUTPUT_DIR / f'{PREFIX}-horizons.csv'}`",
                f"- `{OUTPUT_DIR / f'{PREFIX}-monthly.csv'}`",
                f"- `{OUTPUT_DIR / f'{PREFIX}-comparison.csv'}`",
                f"- `{OUTPUT_DIR / f'{PREFIX}-first50-trades.csv'}`",
                f"- `{OUTPUT_DIR / f'{PREFIX}-equity.csv'}`",
                "",
            ]
        ),
        encoding="utf-8",
    )
    print(f"summary={summary}")
    print(f"flagged_differences={len(flagged)}")
    print(f"first_50_independent_trades_match={first_50_match}")
    print(f"independent_trade_count_match={main_trade_count_match}")
    return 1 if flagged or not first_50_match or not main_trade_count_match else 0
# Script entry point: the return value of main() becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|