"""Accounting audit and small robustness retest for the ETH price-TWAP RSI candidate.

The script runs a legacy runner (from ``scripts.explore_ultrashort``) and a
locally implemented "corrected" runner over cached candles, compares their
accounting, and - if the corrected accounting checks pass - writes horizon
metrics, a ranking CSV and a markdown summary under ``OUTPUT_DIR``.
"""

from __future__ import annotations

import math
import sys
from dataclasses import dataclass, replace
from pathlib import Path

import pandas as pd

ROOT = Path(__file__).resolve().parents[1]
# Make the repository root importable so the project packages below resolve
# when this file is executed directly as a script.
sys.path.insert(0, str(ROOT))

from okx_codex_trader.models import Candle  # noqa: E402
from okx_codex_trader.sampled_report import SegmentResult, mark_to_market, trade_equity  # noqa: E402
from scripts import explore_ultrashort as explore  # noqa: E402

SYMBOL = "ETH-USDT-SWAP"
BAR = "15m"
YEARS = 10.0
LEVERAGE = 3
INITIAL_EQUITY = explore.INITIAL_EQUITY
OUTPUT_DIR = Path("reports/eth-exploration")
PREFIX = "eth-robust-twap-accounting-audit"

# Round-trip cost expressed as a fraction of the margin used by a trade.
COSTS = {
    "maker_taker": 0.0021,
    "taker_taker": 0.0030,
}

# (label, look-back offset); ``None`` means the full equity history.
HORIZONS = (
    ("full", None),
    ("3y", pd.DateOffset(years=3)),
    ("1y", pd.DateOffset(years=1)),
    ("6m", pd.DateOffset(months=6)),
    ("3m", pd.DateOffset(months=3)),
    ("30d", pd.DateOffset(days=30)),
    ("21d", pd.DateOffset(days=21)),
)


@dataclass(frozen=True)
class Spec:
    """Immutable parameter set for one strategy variant."""

    # Rolling-mean window (bars) used as the trend filter.
    trend_sma: int
    # RSI look-back length (bars).
    rsi_length: int
    # Entry ladder is armed when RSI is at or below this value.
    rsi_threshold: float
    # Exit is scheduled when RSI is at or above this value.
    exit_rsi: float
    # Stop distance below the (average) entry price, as a fraction.
    stop_loss_pct: float
    # Exit is scheduled once the position has been held this many bars.
    max_hold_bars: int
    # Fractions below the signal close at which limit orders are placed.
    entry_offsets: tuple[float, ...]
    # Number of bars after the signal bar during which the limits stay active.
    entry_valid_bars: int
    # Extra fraction the low must trade through a limit before it counts as filled.
    fill_buffer: float = 0.0

    @property
    def name(self) -> str:
        """Return the identifier string that encodes every parameter of the spec."""
        offsets = "-".join(f"{offset:.4f}" for offset in self.entry_offsets)
        buffer_label = f"-fb{self.fill_buffer:.4f}" if self.fill_buffer else ""
        return (
            f"rsi2-long-guarded-price-twap-o{offsets}-v{self.entry_valid_bars}{buffer_label}"
            f"-t{self.trend_sma}-r{self.rsi_length}-l{self.rsi_threshold}"
            f"-x{self.exit_rsi}-sl{self.stop_loss_pct}-mh{self.max_hold_bars}"
        )


BASE_SPEC = Spec(
    trend_sma=60,
    rsi_length=2,
    rsi_threshold=3.0,
    exit_rsi=50.0,
    stop_loss_pct=0.012,
    max_hold_bars=48,
    entry_offsets=(0.003, 0.006, 0.009),
    entry_valid_bars=4,
)


def fmt_ts(ts: int) -> str:
    """Format a millisecond epoch timestamp as ``YYYY-MM-DD HH:MM`` in UTC."""
    return pd.to_datetime(ts, unit="ms", utc=True).strftime("%Y-%m-%d %H:%M")


def pct(value: float) -> str:
    """Format a fraction as a percentage string with two decimals."""
    return f"{value * 100:.2f}%"


def load_candles() -> list[Candle]:
    """Load cached candles for ``SYMBOL``/``BAR``, keeping at most the last ``YEARS`` of bars."""
    cached, _ = explore.load_cached_candles(explore.CANDLE_CACHE_DIR, SYMBOL, BAR)
    requested = explore.history_bars_for_years(BAR, YEARS)
    return cached[-requested:] if len(cached) > requested else cached


def compute_rsi(closes: pd.Series, length: int) -> list[float]:
    """Return a recursively smoothed RSI for ``closes``.

    The first ``length`` entries are NaN. The value at index ``length`` is
    seeded with the plain mean of the first ``length`` gains/losses; later
    values update the averages as ``(prev * (length - 1) + current) / length``.
    When the average loss is zero the RSI is 100 if there was any gain and 50
    otherwise. If there are not more than ``length`` closes, every entry is NaN.
    """
    deltas = closes.diff()
    gains = deltas.clip(lower=0.0)
    losses = -deltas.clip(upper=0.0)
    rsi = [float("nan")] * len(closes)
    if len(closes) <= length:
        return rsi

    average_gain = float(gains.iloc[1 : length + 1].mean())
    average_loss = float(losses.iloc[1 : length + 1].mean())
    # Convert once to plain Python floats: per-bar ``Series.iloc`` lookups are
    # far slower than list indexing and yield the same values.
    gain_values = gains.tolist()
    loss_values = losses.tolist()
    for index in range(length, len(closes)):
        if index > length:
            average_gain = ((average_gain * (length - 1)) + gain_values[index]) / length
            average_loss = ((average_loss * (length - 1)) + loss_values[index]) / length
        if average_loss == 0.0:
            rsi[index] = 100.0 if average_gain > 0.0 else 50.0
        else:
            rsi[index] = 100.0 - (100.0 / (1.0 + average_gain / average_loss))
    return rsi


def close_position(
    trades: list[dict[str, object]],
    exits: list[dict[str, object]],
    position: dict[str, object],
    account_equity: float,
    candle: Candle,
    exit_price: float,
    reason: str,
) -> tuple[float, bool]:
    """Close ``position`` at ``exit_price`` and record the trade and the exit marker.

    Appends one row to ``trades`` and one to ``exits`` (both mutated in place).
    The trade's ``return_fraction`` and ``cost_weight`` are measured against
    ``account_equity`` (the whole account before the exit), not against the
    margin that was actually used.

    Returns:
        ``(account_equity + pnl, pnl > 0.0)``.
    """
    margin_used = float(position["margin_used"])
    entry_price = float(position["entry_price"])
    exit_margin_equity = trade_equity(
        side="long",
        margin_used=margin_used,
        entry_price=entry_price,
        exit_price=exit_price,
        leverage=LEVERAGE,
    )
    pnl = exit_margin_equity - margin_used
    cost_weight = margin_used / account_equity
    trades.append(
        {
            "side": "Long",
            "entry_time": fmt_ts(int(position["entry_time"])),
            "exit_time": fmt_ts(candle.ts),
            "entry_price": round(entry_price, 4),
            "exit_price": round(exit_price, 4),
            "pnl": round(pnl, 4),
            "return_pct": round(pnl / account_equity * 100.0, 4),
            "return_fraction": pnl / account_equity,
            "cost_weight": round(cost_weight, 8),
            "margin_used": round(margin_used, 8),
            "account_equity_before_exit": round(account_equity, 8),
            "reason": reason,
        }
    )
    exits.append({"ts": candle.ts, "price": exit_price, "side": "long", "reason": reason})
    return account_equity + pnl, pnl > 0.0


def run_corrected_price_twap(candles: list[Candle], spec: Spec) -> SegmentResult:
    """Simulate the long-only RSI price-TWAP strategy with account-level accounting.

    Per bar, in order:
      1. A pending signal/max-hold exit is executed at the bar's open.
      2. Pending limit orders that have not expired are filled at their limit
         price when the bar's low reaches ``limit * (1 - fill_buffer)``; each
         filled slice uses ``equity / len(entry_offsets)`` as margin and the
         stop is re-anchored to the margin-weighted average entry price.
      3. The stop is checked against the bar's low and filled at the stop price.
      4. The account is marked to market at the close and drawdown is updated.
      5. Signals are evaluated on the close: schedule an exit for an open
         position, or place a new limit ladder when flat.

    Returns:
        A ``SegmentResult`` whose trades carry ``return_fraction`` and
        ``cost_weight`` relative to the account equity before each exit.
    """
    closes = pd.Series([candle.close for candle in candles], dtype=float)
    trend = closes.rolling(spec.trend_sma).mean().tolist()
    rsi_values = compute_rsi(closes, spec.rsi_length)
    warmup_bars = max(spec.trend_sma, spec.rsi_length + 1)
    last_index = len(candles) - 1
    slice_count = len(spec.entry_offsets)

    equity = INITIAL_EQUITY
    ending_equity = equity
    peak_equity = equity
    max_drawdown = 0.0
    wins = 0
    trades: list[dict[str, object]] = []
    entries: list[dict[str, object]] = []
    exits: list[dict[str, object]] = []
    equity_curve: list[dict[str, float | int]] = []
    position: dict[str, object] | None = None
    pending_limits: list[dict[str, float | int]] = []
    pending_exit = False

    for index in range(warmup_bars, len(candles)):
        candle = candles[index]

        # 1. Exit scheduled on the previous bar executes at this bar's open.
        if pending_exit and position is not None:
            equity, won = close_position(
                trades, exits, position, equity, candle, candle.open, "signal_or_max_hold"
            )
            wins += int(won)
            position = None
            pending_exit = False
            pending_limits = []

        # 2. Work the limit ladder; expired limits are silently dropped.
        active_limits: list[dict[str, float | int]] = []
        filled_this_bar = 0
        for limit in pending_limits:
            if index > int(limit["expires_index"]):
                continue
            limit_price = float(limit["price"])
            if candle.low <= limit_price * (1.0 - spec.fill_buffer) and equity > 0.0:
                slice_margin = equity / slice_count
                filled_this_bar += 1
                if position is None:
                    position = {
                        "side": "long",
                        "entry_time": candle.ts,
                        "entry_price": limit_price,
                        "entry_index": index,
                        "margin_used": slice_margin,
                        "stop_price": limit_price * (1.0 - spec.stop_loss_pct),
                    }
                else:
                    # Blend the new slice into a margin-weighted average entry.
                    old_margin = float(position["margin_used"])
                    new_margin = old_margin + slice_margin
                    position["entry_price"] = (
                        float(position["entry_price"]) * old_margin + limit_price * slice_margin
                    ) / new_margin
                    position["margin_used"] = new_margin
                    position["stop_price"] = float(position["entry_price"]) * (1.0 - spec.stop_loss_pct)
                entries.append({"ts": candle.ts, "price": limit_price, "side": "long"})
            else:
                active_limits.append(limit)
        pending_limits = active_limits

        # 3. Stop check (also applies to slices filled on this same bar).
        current_equity = equity
        if position is not None and candle.low <= float(position["stop_price"]):
            equity, won = close_position(
                trades, exits, position, equity, candle, float(position["stop_price"]), "stop"
            )
            wins += int(won)
            current_equity = equity
            position = None
            pending_limits = []

        # 4. Mark to market at the close.
        if position is not None:
            margin_used = float(position["margin_used"])
            position_equity = mark_to_market(
                side="long",
                margin_used=margin_used,
                entry_price=float(position["entry_price"]),
                mark_price=candle.close,
                leverage=LEVERAGE,
            )
            current_equity = equity - margin_used + position_equity
            equity_curve.append(
                {
                    "ts": candle.ts,
                    "equity": current_equity,
                    "close": candle.close,
                    "cash": equity - margin_used,
                    "margin_used": margin_used,
                    "open_position_equity": position_equity,
                    "filled_slices_this_bar": filled_this_bar,
                }
            )
        else:
            equity_curve.append(
                {
                    "ts": candle.ts,
                    "equity": current_equity,
                    "close": candle.close,
                    "cash": equity,
                    "margin_used": 0.0,
                    "open_position_equity": 0.0,
                    "filled_slices_this_bar": filled_this_bar,
                }
            )
        peak_equity = max(peak_equity, current_equity)
        max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity)
        ending_equity = current_equity

        # 5. Signals. Nothing can be acted on after the last bar or once the
        # account is wiped out.
        if index == last_index or equity <= 0.0:
            continue
        current_rsi = rsi_values[index]
        current_trend = trend[index]
        if math.isnan(current_rsi) or math.isnan(current_trend):
            continue
        if position is not None:
            held_bars = index - int(position["entry_index"])
            if current_rsi >= spec.exit_rsi or held_bars >= spec.max_hold_bars:
                pending_exit = True
                pending_limits = []
            # NOTE(review): no new ladder is placed while a position is open,
            # which keeps total margin at or below account equity - confirm
            # this matches the intended sizing rule.
            continue
        if not pending_limits and candle.close > float(current_trend) and current_rsi <= spec.rsi_threshold:
            pending_limits = [
                {"price": candle.close * (1.0 - offset), "expires_index": index + spec.entry_valid_bars}
                for offset in spec.entry_offsets
            ]

    trade_count = len(trades)
    return SegmentResult(
        trade_count=trade_count,
        total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY,
        win_rate=wins / trade_count if trade_count else 0.0,
        max_drawdown=max_drawdown,
        trades=trades,
        open_position=position,
        candles=candles[warmup_bars:],
        equity_curve=equity_curve,
        entries=entries,
        exits=exits,
    )


def cost_adjusted_equity_frame(result: SegmentResult, roundtrip_cost_on_margin: float) -> pd.DataFrame:
    """Build a closed-trade equity curve with round-trip costs applied.

    Starts at ``INITIAL_EQUITY`` on the first equity-curve timestamp and
    compounds each trade as
    ``1 + return_fraction - roundtrip_cost_on_margin * cost_weight``.

    If the run ended with an open position, a final mark-to-market row is
    appended. That row is the cost-adjusted closed-trade equity scaled by the
    gross unrealised return of the open position (``mtm_equity /
    gross_closed_equity``), so costs already paid on closed trades are not
    discarded. With a zero cost this equals the gross mark-to-market equity.
    No cost is charged for the still-open position itself.

    Returns:
        DataFrame with columns ``ts`` (UTC timestamps) and ``equity``.
    """
    rows = [{"ts": pd.to_datetime(result.equity_curve[0]["ts"], unit="ms", utc=True), "equity": INITIAL_EQUITY}]
    equity = INITIAL_EQUITY
    # Gross (cost-free) closed-trade equity, tracked in parallel so the open
    # position's unrealised return can be isolated below.
    gross_equity = INITIAL_EQUITY
    for trade in result.trades:
        return_fraction = float(trade["return_fraction"])
        equity *= 1.0 + return_fraction - roundtrip_cost_on_margin * float(trade["cost_weight"])
        gross_equity *= 1.0 + return_fraction
        rows.append({"ts": pd.to_datetime(str(trade["exit_time"]), utc=True), "equity": equity})
    if result.open_position is not None:
        last_point = result.equity_curve[-1]
        mtm_equity = float(last_point["equity"])
        # Previously the gross mark-to-market equity was appended as-is, which
        # erased all accumulated costs from the final value of the curve.
        final_equity = equity * (mtm_equity / gross_equity) if gross_equity > 0.0 else equity
        rows.append({"ts": pd.to_datetime(last_point["ts"], unit="ms", utc=True), "equity": final_equity})
    return pd.DataFrame(rows)


def metrics_from_equity(frame: pd.DataFrame, first_ts: int, last_ts: int) -> dict[str, float]:
    """Compute total/annualised return, max drawdown and Calmar from an equity frame.

    ``first_ts`` and ``last_ts`` are millisecond epoch timestamps; a year is
    taken as 365 days. The annualised return is 0.0 when the span is not
    positive or the total return is -100% or worse; Calmar is 0.0 when the
    max drawdown is zero.
    """
    years = (last_ts - first_ts) / 86_400_000 / 365
    total_return = float(frame["equity"].iloc[-1] / frame["equity"].iloc[0] - 1.0)
    if total_return > -1.0 and years > 0.0:
        annualized_return = (1.0 + total_return) ** (1.0 / years) - 1.0
    else:
        annualized_return = 0.0
    max_dd = explore.max_drawdown_from_equity([float(value) for value in frame["equity"]])
    return {
        "total_return": total_return,
        "annualized_return": annualized_return,
        "max_drawdown": max_dd,
        "calmar": annualized_return / max_dd if max_dd else 0.0,
    }


def horizon_frame(frame: pd.DataFrame, last_ts: int, offset: pd.DateOffset | None) -> tuple[pd.DataFrame, int]:
    """Slice ``frame`` to the window ending at ``last_ts`` and spanning ``offset``.

    With ``offset=None`` the whole frame is returned. Otherwise the slice
    starts with a synthetic row at the cutoff carrying the last equity known
    at or before the cutoff, followed by all later rows. If the frame has no
    row at or before the cutoff, the whole frame is returned.

    Returns:
        ``(sliced_frame, start_timestamp_ms)``.
    """
    if offset is None:
        start_ts = int(frame["ts"].iloc[0].timestamp() * 1000)
        return frame[["ts", "equity"]].copy(), start_ts

    end = pd.to_datetime(last_ts, unit="ms", utc=True)
    cutoff = end - offset
    before = frame[frame["ts"] <= cutoff]
    if len(before):
        start_equity = float(before["equity"].iloc[-1])
        sliced = pd.concat(
            [
                pd.DataFrame([{"ts": cutoff, "equity": start_equity}]),
                frame[frame["ts"] > cutoff][["ts", "equity"]],
            ],
            ignore_index=True,
        )
        return sliced, int(cutoff.timestamp() * 1000)

    sliced = frame[["ts", "equity"]].copy()
    return sliced, int(sliced["ts"].iloc[0].timestamp() * 1000)


def horizon_rows(
    result: SegmentResult,
    spec: Spec,
    candles: list[Candle],
    cost_label: str,
    cost_value: float,
) -> list[dict[str, object]]:
    """Return one metrics row per entry in ``HORIZONS`` for a run under one cost model.

    ``trades`` is the run's total trade count on every row (it is not
    restricted to the horizon); ``main`` relies on it being constant per run
    because it is part of the pivot index.
    """
    frame = cost_adjusted_equity_frame(result, cost_value)
    end_ts = candles[-1].ts
    offsets_label = "-".join(f"{entry_offset:.4f}" for entry_offset in spec.entry_offsets)
    rows = []
    for label, offset in HORIZONS:
        sliced, start_ts = horizon_frame(frame, end_ts, offset)
        metrics = metrics_from_equity(sliced, start_ts, end_ts)
        rows.append(
            {
                "name": spec.name,
                "cost_model": cost_label,
                "roundtrip_cost_on_margin": cost_value,
                "horizon": label,
                "horizon_start": fmt_ts(start_ts),
                "horizon_end": fmt_ts(end_ts),
                "trades": result.trade_count,
                **metrics,
                "trend_sma": spec.trend_sma,
                "rsi_length": spec.rsi_length,
                "rsi_threshold": spec.rsi_threshold,
                "exit_rsi": spec.exit_rsi,
                "stop_loss_pct": spec.stop_loss_pct,
                "max_hold_bars": spec.max_hold_bars,
                "entry_offsets": offsets_label,
                "entry_valid_bars": spec.entry_valid_bars,
                "fill_buffer": spec.fill_buffer,
            }
        )
    return rows


def accounting_rows(candles: list[Candle], spec: Spec) -> tuple[list[dict[str, object]], SegmentResult, SegmentResult]:
    """Run the legacy and corrected runners and build the accounting check rows.

    Checks produced:
      * ``legacy_mtm_vs_legacy_closed_trade`` - informational, always
        ``pass=False``; records the gap in the legacy runner.
      * ``corrected_mtm_vs_corrected_closed_trade_gross`` - passes when the
        corrected run ends flat and its mark-to-market return matches the
        zero-cost closed-trade return within 1e-8.
      * ``position_sizing`` - passes when the corrected run ends flat and every
        trade's ``cost_weight`` lies in ``(0, 1]`` (with a small tolerance).

    Returns:
        ``(check_rows, legacy_result, corrected_result)``.
    """
    legacy_candidate = explore.build_rsi2_long_guarded_price_twap_candidate(
        spec.trend_sma,
        spec.rsi_threshold,
        spec.exit_rsi,
        spec.stop_loss_pct,
        spec.max_hold_bars,
        spec.entry_offsets,
        spec.entry_valid_bars,
        spec.fill_buffer,
    )
    legacy = legacy_candidate.run(candles=candles, leverage=LEVERAGE, warmup_bars=legacy_candidate.warmup_bars)
    corrected = run_corrected_price_twap(candles, spec)
    corrected_closed = cost_adjusted_equity_frame(corrected, 0.0)
    legacy_closed = explore.cost_adjusted_trade_equity_frame(legacy, 0.0)

    legacy_closed_return = float(legacy_closed["equity"].iloc[-1] / INITIAL_EQUITY - 1.0)
    corrected_closed_return = float(corrected_closed["equity"].iloc[-1] / INITIAL_EQUITY - 1.0)
    corrected_gap = abs(corrected.total_return - corrected_closed_return)
    corrected_flat = corrected.open_position is None
    cost_weights = [float(trade["cost_weight"]) for trade in corrected.trades]

    checks: list[dict[str, object]] = [
        {
            "check": "legacy_mtm_vs_legacy_closed_trade",
            "legacy_mtm_total_return": legacy.total_return,
            "legacy_closed_trade_total_return": legacy_closed_return,
            "absolute_gap": abs(legacy.total_return - legacy_closed_return),
            "pass": False,
            "reason": "regular exit replaces account equity with margin equity and drops unused cash when partial TWAP fills occur",
        },
        {
            "check": "corrected_mtm_vs_corrected_closed_trade_gross",
            "corrected_mtm_total_return": corrected.total_return,
            "corrected_closed_trade_total_return": corrected_closed_return,
            "absolute_gap": corrected_gap,
            "pass": corrected_flat and corrected_gap < 1e-8,
            "reason": "all closed-trade returns are measured against account equity and weighted by margin_used/account_equity",
        },
        {
            "check": "position_sizing",
            "max_cost_weight": max(cost_weights) if cost_weights else 0.0,
            "min_cost_weight": min(cost_weights) if cost_weights else 0.0,
            "open_position": not corrected_flat,
            "pass": corrected_flat and all(0.0 < weight <= 1.00000001 for weight in cost_weights),
            "reason": "TWAP sizing uses account_equity/number_of_offsets per filled slice; cost is charged only on filled margin",
        },
    ]
    return checks, legacy, corrected


def retest_specs() -> list[Spec]:
    """Return ``BASE_SPEC`` plus its one-parameter-at-a-time variants, de-duplicated by name."""
    variants: list[dict[str, object]] = [
        {"entry_offsets": (0.002, 0.005, 0.008)},
        {"entry_offsets": (0.004, 0.007, 0.010)},
        {"entry_valid_bars": 3},
        {"trend_sma": 50},
        {"trend_sma": 80},
        {"rsi_length": 3},
        {"exit_rsi": 45.0},
        {"stop_loss_pct": 0.010},
        {"max_hold_bars": 36},
    ]
    specs = {BASE_SPEC.name: BASE_SPEC}
    for overrides in variants:
        # ``replace`` copies the frozen base spec with only the listed fields changed.
        spec = replace(BASE_SPEC, **overrides)
        specs[spec.name] = spec
    return list(specs.values())


def markdown_table(frame: pd.DataFrame, columns: list[str]) -> str:
    """Render the given ``columns`` of ``frame`` as a markdown table; missing values become empty cells."""
    header = "| " + " | ".join(columns) + " |"
    divider = "| " + " | ".join("---" for _ in columns) + " |"
    body = [
        "| " + " | ".join("" if pd.isna(value) else str(value) for value in row) + " |"
        for row in frame[columns].itertuples(index=False, name=None)
    ]
    return "\n".join([header, divider, *body])


def main() -> int:
    """Run the audit, write the CSV/markdown reports and return a process exit code.

    Returns 0 when the corrected accounting checks pass and 1 otherwise
    (including when no cached candles are available).
    """
    candles = load_candles()
    if not candles:
        # Without candles every later step fails with an opaque IndexError.
        print(f"no cached candles available for {SYMBOL} {BAR}", file=sys.stderr)
        return 1

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    accounting, legacy, corrected = accounting_rows(candles, BASE_SPEC)
    # The legacy row is informational (always pass=False) and must not gate the retest.
    accounting_ok = all(
        bool(row["pass"]) for row in accounting if row["check"] != "legacy_mtm_vs_legacy_closed_trade"
    )

    all_rows: list[dict[str, object]] = []
    if accounting_ok:
        for spec in retest_specs():
            # The base spec was already simulated for the accounting check.
            result = corrected if spec == BASE_SPEC else run_corrected_price_twap(candles, spec)
            for cost_label, cost_value in COSTS.items():
                all_rows.extend(horizon_rows(result, spec, candles, cost_label, cost_value))

    accounting_path = OUTPUT_DIR / f"{PREFIX}-accounting.csv"
    horizons_path = OUTPUT_DIR / f"{PREFIX}-horizons.csv"
    ranked_path = OUTPUT_DIR / f"{PREFIX}-ranked.csv"
    summary_path = OUTPUT_DIR / f"{PREFIX}-summary.md"

    accounting_frame = pd.DataFrame(accounting)
    horizon_frame_out = pd.DataFrame(all_rows)
    accounting_frame.to_csv(accounting_path, index=False)
    horizon_frame_out.to_csv(horizons_path, index=False)

    top = pd.DataFrame()
    if len(horizon_frame_out):
        # One row per (spec, cost model) with a column per metric/horizon pair.
        pivot = horizon_frame_out.pivot_table(
            index=[
                "name",
                "cost_model",
                "trades",
                "trend_sma",
                "rsi_length",
                "rsi_threshold",
                "exit_rsi",
                "stop_loss_pct",
                "max_hold_bars",
                "entry_offsets",
                "entry_valid_bars",
                "fill_buffer",
            ],
            columns="horizon",
            values=["total_return", "annualized_return", "max_drawdown", "calmar"],
            aggfunc="first",
            observed=False,
        )
        pivot.columns = [f"{metric}_{horizon}" for metric, horizon in pivot.columns]
        ranked = pivot.reset_index()
        ranked["min_recent_calmar"] = ranked[
            ["calmar_1y", "calmar_6m", "calmar_3m", "calmar_30d", "calmar_21d"]
        ].min(axis=1)
        ranked["max_recent_drawdown"] = ranked[
            ["max_drawdown_1y", "max_drawdown_6m", "max_drawdown_3m", "max_drawdown_30d", "max_drawdown_21d"]
        ].max(axis=1)
        ranked["min_recent_return"] = ranked[
            ["total_return_1y", "total_return_6m", "total_return_3m", "total_return_30d", "total_return_21d"]
        ].min(axis=1)
        ranked = ranked.sort_values(
            ["cost_model", "min_recent_return", "min_recent_calmar", "calmar_3y", "annualized_return_full"],
            ascending=[True, False, False, False, False],
        )
        ranked.to_csv(ranked_path, index=False)
        top = ranked.groupby("cost_model", group_keys=False).head(5)
    else:
        pd.DataFrame().to_csv(ranked_path, index=False)

    if len(horizon_frame_out):
        base_horizons = horizon_frame_out[horizon_frame_out["name"] == BASE_SPEC.name]
    else:
        base_horizons = pd.DataFrame()

    if len(base_horizons):
        base_section = markdown_table(
            base_horizons.sort_values(["cost_model", "horizon"]),
            [
                "cost_model",
                "horizon",
                "total_return",
                "annualized_return",
                "max_drawdown",
                "calmar",
            ],
        )
    else:
        base_section = "Not ranked because accounting failed."

    if len(top):
        top_section = markdown_table(
            top,
            [
                "cost_model",
                "name",
                "trades",
                "total_return_full",
                "annualized_return_full",
                "max_drawdown_full",
                "total_return_3y",
                "total_return_1y",
                "total_return_6m",
                "total_return_3m",
                "total_return_30d",
                "total_return_21d",
                "min_recent_return",
            ],
        )
    else:
        top_section = "Not ranked because accounting failed."

    summary_path.write_text(
        "\n".join(
            [
                "# ETH Robust TWAP Accounting Audit",
                "",
                f"Candidate: `{BASE_SPEC.name}`",
                f"Candles: {len(candles)} from {fmt_ts(candles[0].ts)} to {fmt_ts(candles[-1].ts)}",
                "",
                "## Accounting",
                "",
                f"- Legacy runner mark-to-market return: {pct(legacy.total_return)}",
                f"- Corrected gross mark-to-market return: {pct(corrected.total_return)}",
                f"- Corrected open position at end: {corrected.open_position is not None}",
                f"- Corrected trades: {corrected.trade_count}",
                f"- Accounting status for corrected path: {'PASS' if accounting_ok else 'FAIL'}",
                "",
                markdown_table(accounting_frame, list(accounting_frame.columns)),
                "",
                "## Base Candidate Horizons",
                "",
                base_section,
                "",
                "## Small Retest Top 5 Per Cost",
                "",
                top_section,
                "",
                "## Files",
                "",
                f"- `{accounting_path}`",
                f"- `{horizons_path}`",
                f"- `{ranked_path}`",
                f"- `{summary_path}`",
                "",
            ]
        ),
        encoding="utf-8",
    )
    print(f"summary={summary_path}")
    print(f"legacy_total_return={legacy.total_return:.8f}")
    print(f"corrected_total_return={corrected.total_return:.8f}")
    print(f"accounting_ok={accounting_ok}")
    return 0 if accounting_ok else 1


if __name__ == "__main__":
    raise SystemExit(main())