#!/usr/bin/env python3 from __future__ import annotations import argparse import json from dataclasses import dataclass from pathlib import Path import pandas as pd from okx_codex_trader.models import Candle from okx_codex_trader.sampled_report import SegmentResult, mark_to_market, trade_equity ETH_SYMBOL = "ETH-USDT-SWAP" BTC_SYMBOL = "BTC-USDT-SWAP" BAR = "15m" YEARS = 10.0 LEVERAGE = 3 INITIAL_EQUITY = 10_000.0 DATA_DIR = Path("data/okx-candles") OUTPUT_DIR = Path("reports/eth-exploration") PREFIX = "eth-recent-regime-router-v3" PRIMARY_COST = "maker_taker" COSTS = ( ("maker_maker", 0.0012), ("maker_taker", 0.0021), ("taker_taker", 0.0030), ) HORIZONS = ( ("full", None), ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ("30d", pd.DateOffset(days=30)), ) @dataclass(frozen=True) class RouterSpec: name: str band_length: int bandwidth_lookback: int bandwidth_quantile: float trend_lookback: int momentum_lookback: int vol_lookback: int btc_bear_momentum: float btc_bull_momentum: float eth_bear_momentum: float high_vol: float extreme_vol: float ratio_z_lookback: int ratio_z_abs_max: float stop_loss_pct: float middle_exit_buffer_pct: float middle_exit_confirm_bars: int breakeven_trigger_pct: float breakeven_lock_pct: float trail_trigger_pct: float trail_giveback_pct: float max_giveback_trigger_pct: float max_giveback_pct: float cooldown_bars: int def _format_ts(ts: int) -> str: return pd.to_datetime(ts, unit="ms", utc=True).strftime("%Y-%m-%d %H:%M") def _load_candles(symbol: str, bar: str, years: float) -> list[Candle]: frame = pd.read_csv(DATA_DIR / symbol / f"{bar}.csv") if years > 0.0: limit = int(years * 365 * 24 * 60 / 15) frame = frame.tail(limit) return [ Candle( symbol=symbol, ts=int(row.ts), open=float(row.open), high=float(row.high), low=float(row.low), close=float(row.close), volume=float(row.volume), ) for row in frame.itertuples(index=False) ] def _align_pair(left: list[Candle], right: list[Candle]) -> tuple[list[Candle], list[Candle]]: right_by_ts = {candle.ts: candle for candle in right} left_out: list[Candle] = [] right_out: list[Candle] = [] for candle in left: other = right_by_ts.get(candle.ts) if other is not None: left_out.append(candle) right_out.append(other) return left_out, right_out def _is_nan(value: float) -> bool: return value != value def favorable_move(side: str, entry_price: float, candle: Candle) -> float: if side == "long": return candle.high / entry_price - 1.0 return entry_price / candle.low - 1.0 def adverse_move(side: str, entry_price: float, candle: Candle) -> float: if side == "long": return 1.0 - candle.low / entry_price return candle.high / entry_price - 1.0 def route_state( *, index: int, frame: pd.DataFrame, spec: RouterSpec, position: dict[str, object] | None, ) -> str: values = ( frame.at[index, "eth_sma"], frame.at[index, "btc_sma"], frame.at[index, "eth_momentum"], frame.at[index, "btc_momentum"], frame.at[index, "eth_vol"], frame.at[index, "ratio_z"], ) if any(_is_nan(float(value)) for value in values): return "cash" if position is not None: mfe = float(position["mfe_pct"]) mae = float(position["mae_pct"]) giveback = mfe - float(position["open_profit_pct"]) if mfe >= spec.max_giveback_trigger_pct and giveback >= spec.max_giveback_pct: return "protected_bb" if mae >= spec.stop_loss_pct * 0.75: return "protected_bb" eth_close = float(frame.at[index, "eth_close"]) btc_close = float(frame.at[index, "btc_close"]) eth_sma = float(frame.at[index, "eth_sma"]) btc_sma = float(frame.at[index, "btc_sma"]) eth_momentum = float(frame.at[index, "eth_momentum"]) btc_momentum = float(frame.at[index, "btc_momentum"]) eth_vol = float(frame.at[index, "eth_vol"]) ratio_z = abs(float(frame.at[index, "ratio_z"])) if eth_vol >= spec.extreme_vol or ratio_z > spec.ratio_z_abs_max: return "cash" if btc_close < btc_sma and btc_momentum <= spec.btc_bear_momentum and eth_close < eth_sma and eth_momentum <= spec.eth_bear_momentum: return "short_bias" if eth_vol >= spec.high_vol: return "protected_bb" if btc_close > btc_sma and btc_momentum >= spec.btc_bull_momentum: return "baseline_bb" return "protected_bb" def close_position( *, trades: list[dict[str, object]], exits: list[dict[str, object]], position: dict[str, object], candle: Candle, exit_price: float, reason: str, ) -> tuple[float, bool]: margin_used = float(position["margin_used"]) exit_equity = trade_equity( side=str(position["side"]), margin_used=margin_used, entry_price=float(position["entry_price"]), exit_price=exit_price, leverage=LEVERAGE, ) pnl = exit_equity - margin_used trades.append( { "side": "Long" if position["side"] == "long" else "Short", "route": position["route"], "entry_time": _format_ts(int(position["entry_time"])), "exit_time": _format_ts(candle.ts), "entry_price": round(float(position["entry_price"]), 4), "exit_price": round(exit_price, 4), "pnl": round(pnl, 4), "return_pct": round(pnl / margin_used * 100.0, 4), "cost_weight": 1.0, "exit_reason": reason, "mfe_pct": round(float(position["mfe_pct"]) * 100.0, 4), "mae_pct": round(float(position["mae_pct"]) * 100.0, 4), } ) exits.append({"ts": candle.ts, "price": exit_price, "side": position["side"], "route": position["route"]}) return exit_equity, pnl > 0.0 def protection_exit(position: dict[str, object], candle: Candle, spec: RouterSpec) -> tuple[float, str] | None: side = str(position["side"]) entry_price = float(position["entry_price"]) mfe = float(position["mfe_pct"]) stop_price = float(position["stop_price"]) protected = str(position["route"]) == "protected_bb" if protected and mfe >= spec.breakeven_trigger_pct: be_stop = entry_price * (1.0 + spec.breakeven_lock_pct if side == "long" else 1.0 - spec.breakeven_lock_pct) stop_price = max(stop_price, be_stop) if side == "long" else min(stop_price, be_stop) if protected and mfe >= spec.trail_trigger_pct: if side == "long": stop_price = max(stop_price, entry_price * (1.0 + mfe - spec.trail_giveback_pct)) else: stop_price = min(stop_price, entry_price * (1.0 - mfe + spec.trail_giveback_pct)) if side == "long": if candle.open <= stop_price: return candle.open, "protect_gap" if stop_price != float(position["stop_price"]) else "stop_gap" if candle.low <= stop_price: return stop_price, "profit_protect" if stop_price != float(position["stop_price"]) else "stop" else: if candle.open >= stop_price: return candle.open, "protect_gap" if stop_price != float(position["stop_price"]) else "stop_gap" if candle.high >= stop_price: return stop_price, "profit_protect" if stop_price != float(position["stop_price"]) else "stop" if not protected or mfe < spec.max_giveback_trigger_pct: return None close_profit = candle.close / entry_price - 1.0 if side == "long" else entry_price / candle.close - 1.0 if close_profit <= mfe - spec.max_giveback_pct: return candle.close, "giveback_close" return None def indicator_frame(eth: list[Candle], btc: list[Candle], spec: RouterSpec) -> pd.DataFrame: frame = pd.DataFrame( { "ts": [candle.ts for candle in eth], "eth_open": [candle.open for candle in eth], "eth_high": [candle.high for candle in eth], "eth_low": [candle.low for candle in eth], "eth_close": [candle.close for candle in eth], "btc_close": [candle.close for candle in btc], } ) eth_close = frame["eth_close"] btc_close = frame["btc_close"] middle = eth_close.rolling(spec.band_length).mean() stdev = eth_close.rolling(spec.band_length).std(ddof=0) frame["middle"] = middle frame["upper"] = middle + 2.0 * stdev frame["lower"] = middle - 2.0 * stdev bandwidth = (frame["upper"] - frame["lower"]) / middle frame["bandwidth"] = bandwidth frame["bandwidth_threshold"] = bandwidth.rolling(spec.bandwidth_lookback).quantile(spec.bandwidth_quantile) frame["eth_sma"] = eth_close.rolling(spec.trend_lookback).mean() frame["btc_sma"] = btc_close.rolling(spec.trend_lookback).mean() frame["eth_momentum"] = eth_close / eth_close.shift(spec.momentum_lookback) - 1.0 frame["btc_momentum"] = btc_close / btc_close.shift(spec.momentum_lookback) - 1.0 frame["eth_vol"] = eth_close.pct_change().rolling(spec.vol_lookback).std(ddof=0) ratio = eth_close / btc_close frame["ratio_z"] = (ratio - ratio.rolling(spec.ratio_z_lookback).mean()) / ratio.rolling(spec.ratio_z_lookback).std(ddof=0) return frame def run_router(eth: list[Candle], btc: list[Candle], spec: RouterSpec) -> tuple[SegmentResult, dict[str, int]]: frame = indicator_frame(eth, btc, spec) warmup = max(spec.band_length, spec.bandwidth_lookback, spec.trend_lookback, spec.momentum_lookback + 1, spec.vol_lookback, spec.ratio_z_lookback) equity = INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_entry: tuple[str, str] | None = None pending_exit = False middle_exit_streak = 0 cooldown_until = -1 state_hits = {"baseline_bb": 0, "protected_bb": 0, "short_bias": 0, "cash": 0} entry_hits = {"baseline_bb_entries": 0, "protected_bb_entries": 0, "short_bias_entries": 0} exit_hits = {"stop_exits": 0, "protect_exits": 0, "giveback_exits": 0, "route_exits": 0, "middle_exits": 0} for index in range(warmup, len(eth)): candle = eth[index] if pending_exit and position is not None: equity, won = close_position(trades=trades, exits=exits, position=position, candle=candle, exit_price=candle.open, reason="route_or_middle") wins += int(won) exit_hits["route_exits"] += 1 position = None pending_exit = False middle_exit_streak = 0 cooldown_until = index + spec.cooldown_bars if pending_entry is not None and position is None and equity > 0.0: side, route = pending_entry entry_price = candle.open position = { "side": side, "route": route, "entry_time": candle.ts, "entry_price": entry_price, "margin_used": equity, "stop_price": entry_price * (1.0 - spec.stop_loss_pct if side == "long" else 1.0 + spec.stop_loss_pct), "mfe_pct": 0.0, "mae_pct": 0.0, "open_profit_pct": 0.0, } entries.append({"ts": candle.ts, "price": entry_price, "side": side, "route": route}) entry_hits[f"{route}_entries"] += 1 pending_entry = None if position is not None: side = str(position["side"]) entry_price = float(position["entry_price"]) position["mfe_pct"] = max(float(position["mfe_pct"]), favorable_move(side, entry_price, candle)) position["mae_pct"] = max(float(position["mae_pct"]), adverse_move(side, entry_price, candle)) position["open_profit_pct"] = candle.close / entry_price - 1.0 if side == "long" else entry_price / candle.close - 1.0 current_state = route_state(index=index, frame=frame, spec=spec, position=position) state_hits[current_state] += 1 current_equity = equity if position is not None: risk_exit = protection_exit(position, candle, spec) if risk_exit is not None: exit_price, reason = risk_exit equity, won = close_position(trades=trades, exits=exits, position=position, candle=candle, exit_price=exit_price, reason=reason) wins += int(won) if reason.startswith("stop"): exit_hits["stop_exits"] += 1 elif reason == "giveback_close": exit_hits["giveback_exits"] += 1 else: exit_hits["protect_exits"] += 1 current_equity = equity position = None middle_exit_streak = 0 cooldown_until = index + spec.cooldown_bars if position is not None: current_equity = mark_to_market( side=str(position["side"]), margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=LEVERAGE, ) peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(eth) - 1 or equity <= 0.0: continue values = ( frame.at[index, "middle"], frame.at[index, "upper"], frame.at[index, "lower"], frame.at[index, "bandwidth"], frame.at[index, "bandwidth_threshold"], ) if any(_is_nan(float(value)) for value in values): continue middle = float(frame.at[index, "middle"]) upper = float(frame.at[index, "upper"]) lower = float(frame.at[index, "lower"]) bandwidth = float(frame.at[index, "bandwidth"]) threshold = float(frame.at[index, "bandwidth_threshold"]) if position is not None: side = str(position["side"]) middle_exit = (side == "long" and candle.close < middle * (1.0 - spec.middle_exit_buffer_pct)) or ( side == "short" and candle.close > middle * (1.0 + spec.middle_exit_buffer_pct) ) route_exit = current_state == "cash" or (side == "long" and current_state == "short_bias") middle_exit_streak = middle_exit_streak + 1 if middle_exit else 0 if route_exit or middle_exit_streak >= spec.middle_exit_confirm_bars: pending_exit = True if middle_exit_streak >= spec.middle_exit_confirm_bars: exit_hits["middle_exits"] += 1 continue if index < cooldown_until or current_state == "cash" or bandwidth > threshold: continue if current_state in ("baseline_bb", "protected_bb") and candle.close > upper: pending_entry = ("long", current_state) elif current_state == "short_bias" and candle.close < lower: pending_entry = ("short", current_state) result = SegmentResult( trade_count=len(trades), total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY, win_rate=wins / len(trades) if trades else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=eth[warmup:], equity_curve=equity_curve, entries=entries, exits=exits, ) return result, {**state_hits, **entry_hits, **exit_hits} def cost_frame(result: SegmentResult, cost: float, last_ts: int) -> pd.DataFrame: if not result.equity_curve: return pd.DataFrame([{"ts": pd.to_datetime(last_ts, unit="ms", utc=True), "equity": INITIAL_EQUITY}]) rows = [{"ts": pd.to_datetime(result.equity_curve[0]["ts"], unit="ms", utc=True), "equity": INITIAL_EQUITY}] equity = INITIAL_EQUITY for trade in result.trades: equity *= 1.0 + float(trade["return_pct"]) / 100.0 - cost * float(trade.get("cost_weight", 1.0)) rows.append({"ts": pd.to_datetime(str(trade["exit_time"]), utc=True), "equity": equity}) end_time = pd.to_datetime(last_ts, unit="ms", utc=True) if pd.Timestamp(rows[-1]["ts"]) < end_time: rows.append({"ts": end_time, "equity": equity}) return pd.DataFrame(rows) def max_drawdown(values: list[float]) -> float: peak = values[0] drawdown = 0.0 for value in values: peak = max(peak, value) drawdown = max(drawdown, (peak - value) / peak if peak else 0.0) return drawdown def equity_metrics(frame: pd.DataFrame, start_ts: int, end_ts: int) -> dict[str, float]: years = (end_ts - start_ts) / 86_400_000 / 365 total_return = float(frame["equity"].iloc[-1] / frame["equity"].iloc[0] - 1.0) annualized = (1.0 + total_return) ** (1.0 / years) - 1.0 if total_return > -1.0 and years > 0.0 else 0.0 dd = max_drawdown([float(value) for value in frame["equity"]]) return { "total_return": total_return, "annualized_return": annualized, "max_drawdown": dd, "calmar": annualized / dd if dd else 0.0, } def scoped_trades(trades: list[dict[str, object]], start: pd.Timestamp | None) -> list[dict[str, object]]: if start is None: return trades return [trade for trade in trades if pd.to_datetime(str(trade["exit_time"]), utc=True) >= start] def trade_stats(trades: list[dict[str, object]], cost: float, start: pd.Timestamp | None = None) -> dict[str, float | int]: scoped = scoped_trades(trades, start) returns = [float(trade["return_pct"]) / 100.0 - cost * float(trade.get("cost_weight", 1.0)) for trade in scoped] wins = [value for value in returns if value > 0.0] losses = [-value for value in returns if value < 0.0] avg_win = sum(wins) / len(wins) if wins else 0.0 avg_loss = sum(losses) / len(losses) if losses else 0.0 return { "trades": len(returns), "win_rate": len(wins) / len(returns) if returns else 0.0, "profit_factor": sum(wins) / sum(losses) if losses else 0.0, "payoff_ratio": avg_win / avg_loss if avg_loss else 0.0, } def horizon_rows(name: str, frame: pd.DataFrame, trades: list[dict[str, object]], cost: float) -> list[dict[str, object]]: rows: list[dict[str, object]] = [] end_time = pd.Timestamp(frame["ts"].iloc[-1]) for label, offset in HORIZONS: if offset is None: current = frame[["ts", "equity"]].copy() start_time = pd.Timestamp(current["ts"].iloc[0]) else: cutoff = end_time - offset before = frame[frame["ts"] <= cutoff] if len(before): start_equity = float(before["equity"].iloc[-1]) after = frame[frame["ts"] > cutoff] current = pd.concat([pd.DataFrame([{"ts": cutoff, "equity": start_equity}]), after[["ts", "equity"]]], ignore_index=True) start_time = cutoff else: current = frame[["ts", "equity"]].copy() start_time = pd.Timestamp(current["ts"].iloc[0]) metrics = equity_metrics(current, int(start_time.timestamp() * 1000), int(end_time.timestamp() * 1000)) rows.append( { "name": name, "horizon": label, "start": start_time.strftime("%Y-%m-%d %H:%M"), "end": end_time.strftime("%Y-%m-%d %H:%M"), **metrics, **trade_stats(trades, cost, None if offset is None else start_time), } ) return rows def route_rows(name: str, trades: list[dict[str, object]], cost: float, stats: dict[str, int]) -> list[dict[str, object]]: rows: list[dict[str, object]] = [] trade_frame = pd.DataFrame(trades) for route in ("baseline_bb", "protected_bb", "short_bias", "cash"): route_trades = [] if trade_frame.empty or route == "cash" else trade_frame[trade_frame["route"] == route].to_dict("records") rows.append( { "name": name, "route": route, "bar_hits": stats.get(route, 0), "entry_hits": stats.get(f"{route}_entries", 0), **trade_stats(route_trades, cost), } ) return rows def specs() -> list[RouterSpec]: out: list[RouterSpec] = [] for high_vol, extreme_vol, bear_momo, bull_momo, ratio_cap, quantile in ( (0.0060, 0.0110, -0.010, 0.006, 2.5, 0.20), (0.0060, 0.0125, -0.014, 0.008, 3.0, 0.20), (0.0075, 0.0125, -0.014, 0.006, 3.0, 0.25), (0.0075, 0.0140, -0.018, 0.008, 3.5, 0.25), ): for band_length, bandwidth_lookback, trend_lookback, momentum_lookback in ( (48, 960, 480, 96), (72, 960, 480, 96), (96, 1440, 672, 192), ): name = ( f"{PREFIX}-l{band_length}-bw{bandwidth_lookback}-q{quantile:g}" f"-tr{trend_lookback}-m{momentum_lookback}-hv{high_vol:g}" f"-xv{extreme_vol:g}-bm{bear_momo:g}-um{bull_momo:g}-rz{ratio_cap:g}" ) out.append( RouterSpec( name=name, band_length=band_length, bandwidth_lookback=bandwidth_lookback, bandwidth_quantile=quantile, trend_lookback=trend_lookback, momentum_lookback=momentum_lookback, vol_lookback=96, btc_bear_momentum=bear_momo, btc_bull_momentum=bull_momo, eth_bear_momentum=bear_momo * 0.7, high_vol=high_vol, extreme_vol=extreme_vol, ratio_z_lookback=672, ratio_z_abs_max=ratio_cap, stop_loss_pct=0.012, middle_exit_buffer_pct=0.001, middle_exit_confirm_bars=2, breakeven_trigger_pct=0.006, breakeven_lock_pct=0.000, trail_trigger_pct=0.012, trail_giveback_pct=0.006, max_giveback_trigger_pct=0.014, max_giveback_pct=0.009, cooldown_bars=24, ) ) return out def format_cell(value: object) -> str: if isinstance(value, float): return f"{value:.6g}" return str(value).replace("|", "\\|") def markdown_table(frame: pd.DataFrame) -> str: columns = list(frame.columns) rows = [columns, ["---" for _ in columns]] rows.extend(frame.astype(object).where(pd.notna(frame), "").values.tolist()) return "\n".join("| " + " | ".join(format_cell(value) for value in row) + " |" for row in rows) def report_text(command: str, output_files: list[Path], total: pd.DataFrame, horizon: pd.DataFrame, route: pd.DataFrame) -> str: primary = total[total["cost_model"] == PRIMARY_COST].head(10) names = set(primary["name"]) horizon_top = horizon[(horizon["cost_model"] == PRIMARY_COST) & horizon["name"].isin(names)] route_top = route[(route["cost_model"] == PRIMARY_COST) & route["name"].isin(names)] return "\n".join( [ "# ETH recent regime router v3", "", f"Run command: `{command}`", "", "Scope: offline ETH/BTC 15m local OKX candle cache only. No live executor, private API, env, service, or order path is used.", "", "Output files:", *[f"- `{path}`" for path in output_files], "", "Router states are `baseline_bb`, `protected_bb`, `short_bias`, and `cash`. State choice uses recent ETH volatility, ETH/BTC trend and momentum, ETH/BTC ratio z-score, and open-position MFE/MAE giveback pressure.", "", "## Top maker_taker routers", "", markdown_table( primary[ [ "name", "trades", "total_return", "annualized_return", "max_drawdown", "calmar", "win_rate", "profit_factor", "payoff_ratio", "min_recent_total_return", ] ] ), "", "## Required horizons", "", markdown_table( horizon_top[ [ "name", "horizon", "total_return", "annualized_return", "max_drawdown", "calmar", "trades", "win_rate", "profit_factor", "payoff_ratio", ] ] ), "", "## State hit statistics", "", markdown_table(route_top[["name", "route", "bar_hits", "entry_hits", "trades", "win_rate", "profit_factor", "payoff_ratio"]]), ] ) + "\n" def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--years", type=float, default=YEARS) parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR) parser.add_argument("--max-candidates", type=int) args = parser.parse_args() eth_raw = _load_candles(ETH_SYMBOL, BAR, args.years) btc_raw = _load_candles(BTC_SYMBOL, BAR, args.years) eth, btc = _align_pair(eth_raw, btc_raw) if not eth: raise RuntimeError("no aligned ETH/BTC candles") candidates = specs() if args.max_candidates is not None: candidates = candidates[: args.max_candidates] total_rows: list[dict[str, object]] = [] horizon_output: list[dict[str, object]] = [] route_output: list[dict[str, object]] = [] for index, spec in enumerate(candidates, start=1): result, stats = run_router(eth, btc, spec) print(f"done {index}/{len(candidates)} {spec.name} trades={result.trade_count}", flush=True) for cost_model, cost in COSTS: frame = cost_frame(result, cost, eth[-1].ts) start_ts = int(pd.Timestamp(frame["ts"].iloc[0]).timestamp() * 1000) end_ts = int(pd.Timestamp(frame["ts"].iloc[-1]).timestamp() * 1000) metrics = equity_metrics(frame, start_ts, end_ts) current_horizons = horizon_rows(spec.name, frame, result.trades, cost) min_recent = min(float(row["total_return"]) for row in current_horizons if row["horizon"] != "full") total_rows.append( { "name": spec.name, "cost_model": cost_model, "symbol": ETH_SYMBOL, "signal_symbol": BTC_SYMBOL, "bar": BAR, "first_candle": pd.Timestamp(frame["ts"].iloc[0]).strftime("%Y-%m-%d %H:%M"), "last_candle": pd.Timestamp(frame["ts"].iloc[-1]).strftime("%Y-%m-%d %H:%M"), "years": (end_ts - start_ts) / 86_400_000 / 365, **metrics, "min_recent_total_return": min_recent, **trade_stats(result.trades, cost), **stats, **spec.__dict__, } ) for row in current_horizons: horizon_output.append({"cost_model": cost_model, **row}) for row in route_rows(spec.name, result.trades, cost, stats): route_output.append({"cost_model": cost_model, **row}) total = pd.DataFrame(total_rows).sort_values( ["cost_model", "min_recent_total_return", "calmar", "annualized_return", "trades"], ascending=[True, False, False, False, True], ) horizon = pd.DataFrame(horizon_output) horizon["horizon"] = pd.Categorical(horizon["horizon"], categories=[label for label, _ in HORIZONS], ordered=True) horizon = horizon.sort_values(["cost_model", "name", "horizon"]) route = pd.DataFrame(route_output).sort_values(["cost_model", "name", "bar_hits"], ascending=[True, True, False]) args.output_dir.mkdir(parents=True, exist_ok=True) total_path = args.output_dir / f"{PREFIX}-total.csv" horizon_path = args.output_dir / f"{PREFIX}-horizons.csv" route_path = args.output_dir / f"{PREFIX}-states.csv" top_path = args.output_dir / f"{PREFIX}-top10.csv" json_path = args.output_dir / f"{PREFIX}-summary.json" report_path = args.output_dir / f"{PREFIX}-report.md" total.to_csv(total_path, index=False) horizon.to_csv(horizon_path, index=False) route.to_csv(route_path, index=False) total[total["cost_model"] == PRIMARY_COST].head(10).to_csv(top_path, index=False) command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --years {args.years}" output_files = [total_path, horizon_path, route_path, top_path, json_path, report_path] summary = { "report": PREFIX, "command": command, "primary_cost": PRIMARY_COST, "candidate_count": len(candidates), "horizons": [label for label, _ in HORIZONS], "top_maker_taker": total[total["cost_model"] == PRIMARY_COST].head(10).to_dict("records"), "output_files": [str(path) for path in output_files], } json_path.write_text(json.dumps(summary, indent=2), encoding="utf-8") report_path.write_text(report_text(command, output_files, total, horizon, route), encoding="utf-8") print(total[total["cost_model"] == PRIMARY_COST].head(10).to_string(index=False)) return 0 if __name__ == "__main__": raise SystemExit(main())