| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651 |
- #!/usr/bin/env python3
- from __future__ import annotations
- import argparse
- import json
- import sys
- from dataclasses import dataclass
- from itertools import product
- from pathlib import Path
- import pandas as pd
- ROOT = Path(__file__).resolve().parents[1]
- sys.path.insert(0, str(ROOT))
- from scripts import explore_ultrashort as explore
- ETH_SYMBOL = "ETH-USDT-SWAP"
- BTC_SYMBOL = "BTC-USDT-SWAP"
- PREFIX = "eth-regime-router"
- OUTPUT_DIR = Path("reports/eth-exploration")
- YEARS = 10.0
- BAR = "15m"
- PRIMARY_COST = "maker_taker"
- COSTS = {
- "maker_taker": 0.0021,
- "taker_taker": 0.0030,
- }
- HORIZONS = (
- ("all", None),
- ("10y", pd.DateOffset(years=10)),
- ("3y", pd.DateOffset(years=3)),
- ("1y", pd.DateOffset(years=1)),
- ("6m", pd.DateOffset(months=6)),
- ("3m", pd.DateOffset(months=3)),
- )
- @dataclass(frozen=True)
- class RouterSpec:
- name: str
- trend_sma: int
- vol_lookback: int
- corr_lookback: int
- ratio_lookback: int
- lead_lookback: int
- eth_trend_min: float
- btc_trend_min: float
- max_eth_vol: float
- min_corr: float
- ratio_z_entry: float
- lead_return_min: float
- lag_gap_min: float
- min_volume_ratio: float
- stop_loss_pct: float
- take_profit_pct: float
- max_hold_bars: int
- def load_candles(symbol: str, bar: str, years: float) -> list[explore.Candle]:
- candles, _ = explore.load_cached_candles(explore.CANDLE_CACHE_DIR, symbol, bar)
- if not candles:
- raise FileNotFoundError(f"missing cached candles for {symbol} {bar}")
- requested = explore.history_bars_for_years(bar, years)
- return candles[-requested:] if len(candles) > requested else candles
- def close_position(
- *,
- trades: list[dict[str, object]],
- exits: list[dict[str, object]],
- position: dict[str, object],
- account_equity: float,
- candle: explore.Candle,
- exit_price: float,
- leverage: int,
- ) -> tuple[float, bool]:
- margin_used = float(position["margin_used"])
- exit_equity = explore.trade_equity(
- side="long",
- margin_used=margin_used,
- entry_price=float(position["entry_price"]),
- exit_price=exit_price,
- leverage=leverage,
- )
- pnl = exit_equity - margin_used
- trades.append(
- {
- "side": "Long",
- "entry_time": explore._format_ts(int(position["entry_time"])),
- "exit_time": explore._format_ts(candle.ts),
- "entry_price": round(float(position["entry_price"]), 4),
- "exit_price": round(exit_price, 4),
- "pnl": round(pnl, 4),
- "return_pct": round(pnl / account_equity * 100, 6),
- "cost_weight": 1.0,
- "regime": str(position["regime"]),
- }
- )
- exits.append({"ts": candle.ts, "price": exit_price, "side": "long", "regime": str(position["regime"])})
- return account_equity + pnl, pnl > 0.0
- def regime_action(
- *,
- index: int,
- eth: list[explore.Candle],
- btc: list[explore.Candle],
- eth_close: pd.Series,
- btc_close: pd.Series,
- eth_volume: pd.Series,
- eth_sma: pd.Series,
- btc_sma: pd.Series,
- eth_vol: pd.Series,
- btc_vol: pd.Series,
- corr: pd.Series,
- ratio_z: pd.Series,
- volume_ratio: pd.Series,
- eth_rsi: list[float],
- spec: RouterSpec,
- ) -> str:
- values = (
- eth_sma.iloc[index],
- btc_sma.iloc[index],
- eth_vol.iloc[index],
- btc_vol.iloc[index],
- corr.iloc[index],
- ratio_z.iloc[index],
- volume_ratio.iloc[index],
- )
- if any(value != value for value in values):
- return "cash"
- eth_trend = eth_close.iloc[index] / eth_sma.iloc[index] - 1.0
- btc_trend = btc_close.iloc[index] / btc_sma.iloc[index] - 1.0
- eth_ret = eth_close.iloc[index] / eth_close.iloc[index - spec.lead_lookback] - 1.0
- btc_ret = btc_close.iloc[index] / btc_close.iloc[index - spec.lead_lookback] - 1.0
- current_rsi = eth_rsi[index]
- if current_rsi != current_rsi:
- return "cash"
- hour = pd.to_datetime(eth[index].ts, unit="ms", utc=True).hour
- liquid = volume_ratio.iloc[index] >= spec.min_volume_ratio and hour not in (0, 1, 2, 3)
- calm = eth_vol.iloc[index] <= spec.max_eth_vol and btc_vol.iloc[index] <= spec.max_eth_vol * 0.85
- coupled = corr.iloc[index] >= spec.min_corr
- if not liquid:
- return "cash"
- if coupled and calm and eth_trend >= spec.eth_trend_min and btc_trend >= spec.btc_trend_min and current_rsi <= 8.0:
- return "trend_follow"
- if coupled and btc_ret >= spec.lead_return_min and btc_ret - eth_ret >= spec.lag_gap_min and calm and current_rsi <= 35.0:
- return "btc_lead"
- if abs(eth_trend) < spec.eth_trend_min and ratio_z.iloc[index] <= -spec.ratio_z_entry and calm and current_rsi <= 15.0:
- return "mean_reversion"
- return "cash"
- def run_router_segment(
- *,
- eth: list[explore.Candle],
- btc: list[explore.Candle],
- spec: RouterSpec,
- leverage: int,
- ) -> explore.SegmentResult:
- eth_close = pd.Series([candle.close for candle in eth], dtype=float)
- btc_close = pd.Series([candle.close for candle in btc], dtype=float)
- eth_volume = pd.Series([candle.volume for candle in eth], dtype=float)
- eth_rsi = explore._compute_rsi(eth_close, 2)
- eth_ret = eth_close.pct_change()
- btc_ret = btc_close.pct_change()
- ratio = eth_close / btc_close
- ratio_mean = ratio.rolling(spec.ratio_lookback).mean()
- ratio_std = ratio.rolling(spec.ratio_lookback).std(ddof=0)
- ratio_z = (ratio - ratio_mean) / ratio_std
- eth_sma = eth_close.rolling(spec.trend_sma).mean()
- btc_sma = btc_close.rolling(spec.trend_sma).mean()
- eth_vol = eth_ret.rolling(spec.vol_lookback).std(ddof=0)
- btc_vol = btc_ret.rolling(spec.vol_lookback).std(ddof=0)
- corr = eth_ret.rolling(spec.corr_lookback).corr(btc_ret)
- volume_ratio = eth_volume / eth_volume.rolling(spec.vol_lookback).median()
- warmup = max(spec.trend_sma, spec.vol_lookback, spec.corr_lookback, spec.ratio_lookback, spec.lead_lookback + 1)
- equity = explore.INITIAL_EQUITY
- ending_equity = equity
- peak_equity = equity
- max_drawdown = 0.0
- wins = 0
- trades: list[dict[str, object]] = []
- entries: list[dict[str, object]] = []
- exits: list[dict[str, object]] = []
- equity_curve: list[dict[str, float | int]] = []
- position: dict[str, object] | None = None
- pending_entry: str | None = None
- pending_exit = False
- for index in range(warmup, len(eth)):
- candle = eth[index]
- if pending_exit and position is not None:
- equity, won = close_position(
- trades=trades,
- exits=exits,
- position=position,
- account_equity=equity,
- candle=candle,
- exit_price=candle.open,
- leverage=leverage,
- )
- wins += 1 if won else 0
- position = None
- pending_exit = False
- if pending_entry is not None and position is None and equity > 0.0:
- position = {
- "side": "long",
- "regime": pending_entry,
- "entry_time": candle.ts,
- "entry_price": candle.open,
- "entry_index": index,
- "margin_used": equity,
- "stop_price": candle.open * (1.0 - spec.stop_loss_pct),
- "take_profit_price": candle.open * (1.0 + spec.take_profit_pct),
- }
- entries.append({"ts": candle.ts, "price": candle.open, "side": "long", "regime": pending_entry})
- pending_entry = None
- current_equity = equity
- if position is not None and candle.low <= float(position["stop_price"]):
- equity, won = close_position(
- trades=trades,
- exits=exits,
- position=position,
- account_equity=equity,
- candle=candle,
- exit_price=float(position["stop_price"]),
- leverage=leverage,
- )
- wins += 1 if won else 0
- current_equity = equity
- position = None
- if position is not None and candle.high >= float(position["take_profit_price"]):
- equity, won = close_position(
- trades=trades,
- exits=exits,
- position=position,
- account_equity=equity,
- candle=candle,
- exit_price=float(position["take_profit_price"]),
- leverage=leverage,
- )
- wins += 1 if won else 0
- current_equity = equity
- position = None
- if position is not None:
- current_equity = explore.mark_to_market(
- side="long",
- margin_used=float(position["margin_used"]),
- entry_price=float(position["entry_price"]),
- mark_price=candle.close,
- leverage=leverage,
- )
- peak_equity = max(peak_equity, current_equity)
- max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity)
- equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close})
- ending_equity = current_equity
- if index == len(eth) - 1 or equity <= 0.0:
- continue
- if position is not None:
- held_bars = index - int(position["entry_index"])
- current_rsi = eth_rsi[index]
- current_ratio_z = ratio_z.iloc[index]
- current_sma = eth_sma.iloc[index]
- regime = str(position["regime"])
- if held_bars >= spec.max_hold_bars:
- pending_exit = True
- elif regime == "trend_follow" and current_rsi == current_rsi and current_rsi >= 55.0:
- pending_exit = True
- elif regime == "trend_follow" and current_sma == current_sma and candle.close < float(current_sma):
- pending_exit = True
- elif regime == "mean_reversion" and current_rsi == current_rsi and current_rsi >= 50.0:
- pending_exit = True
- elif regime == "mean_reversion" and current_ratio_z == current_ratio_z and current_ratio_z >= 0.0:
- pending_exit = True
- elif regime == "btc_lead" and current_rsi == current_rsi and current_rsi >= 60.0:
- pending_exit = True
- continue
- action = regime_action(
- index=index,
- eth=eth,
- btc=btc,
- eth_close=eth_close,
- btc_close=btc_close,
- eth_volume=eth_volume,
- eth_sma=eth_sma,
- btc_sma=btc_sma,
- eth_vol=eth_vol,
- btc_vol=btc_vol,
- corr=corr,
- ratio_z=ratio_z,
- volume_ratio=volume_ratio,
- eth_rsi=eth_rsi,
- spec=spec,
- )
- if action != "cash":
- pending_entry = action
- trade_count = len(trades)
- return explore.SegmentResult(
- trade_count=trade_count,
- total_return=(ending_equity - explore.INITIAL_EQUITY) / explore.INITIAL_EQUITY,
- win_rate=wins / trade_count if trade_count else 0.0,
- max_drawdown=max_drawdown,
- trades=trades,
- open_position=position,
- candles=eth[warmup:],
- equity_curve=equity_curve,
- entries=entries,
- exits=exits,
- )
- def specs() -> list[RouterSpec]:
- out: list[RouterSpec] = []
- for trend_sma, max_eth_vol, min_corr, ratio_z_entry in product(
- (96, 192),
- (0.018, 0.024),
- (0.45, 0.60),
- (1.0, 1.5),
- ):
- eth_trend_min = 0.006
- lead_return_min = 0.016
- min_volume_ratio = 0.90
- out.append(
- RouterSpec(
- name=(
- f"router-t{trend_sma}-et{eth_trend_min}-v{max_eth_vol}-c{min_corr}"
- f"-rz{ratio_z_entry}-br{lead_return_min}-liq{min_volume_ratio}"
- ),
- trend_sma=trend_sma,
- vol_lookback=96,
- corr_lookback=192,
- ratio_lookback=192,
- lead_lookback=16,
- eth_trend_min=eth_trend_min,
- btc_trend_min=eth_trend_min * 0.6,
- max_eth_vol=max_eth_vol,
- min_corr=min_corr,
- ratio_z_entry=ratio_z_entry,
- lead_return_min=lead_return_min,
- lag_gap_min=0.006,
- min_volume_ratio=min_volume_ratio,
- stop_loss_pct=0.010,
- take_profit_pct=0.022,
- max_hold_bars=48,
- )
- )
- return out
- def cost_frame(result: explore.SegmentResult, cost: float) -> pd.DataFrame:
- rows = [{"ts": pd.to_datetime(result.equity_curve[0]["ts"], unit="ms", utc=True), "equity": explore.INITIAL_EQUITY}]
- equity = explore.INITIAL_EQUITY
- for trade in result.trades:
- equity *= 1.0 + float(trade["return_pct"]) / 100.0 - cost * float(trade.get("cost_weight", 1.0))
- rows.append({"ts": pd.to_datetime(str(trade["exit_time"]), utc=True), "equity": equity})
- return pd.DataFrame(rows)
- def trade_stats(result: explore.SegmentResult, cost: float) -> dict[str, float]:
- returns = [float(trade["return_pct"]) / 100.0 - cost * float(trade.get("cost_weight", 1.0)) for trade in result.trades]
- wins = [value for value in returns if value > 0.0]
- losses = [value for value in returns if value < 0.0]
- avg_win = sum(wins) / len(wins) if wins else 0.0
- avg_loss = abs(sum(losses) / len(losses)) if losses else 0.0
- gross_profit = sum(wins)
- gross_loss = abs(sum(losses))
- return {
- "win_rate": len(wins) / len(returns) if returns else 0.0,
- "profit_loss_ratio": avg_win / avg_loss if avg_loss else 0.0,
- "profit_factor": gross_profit / gross_loss if gross_loss else 0.0,
- }
- def horizon_rows(name: str, frame: pd.DataFrame, last_ts: int) -> list[dict[str, object]]:
- rows: list[dict[str, object]] = []
- end_time = pd.to_datetime(last_ts, unit="ms", utc=True)
- for label, offset in HORIZONS:
- if offset is None:
- horizon = frame[["ts", "equity"]].copy()
- start_time = pd.Timestamp(horizon["ts"].iloc[0])
- else:
- cutoff = end_time - offset
- before_cutoff = frame[frame["ts"] <= cutoff]
- if len(before_cutoff):
- start_equity = float(before_cutoff["equity"].iloc[-1])
- after_cutoff = frame[frame["ts"] > cutoff]
- horizon = pd.concat(
- [pd.DataFrame([{"ts": cutoff, "equity": start_equity}]), after_cutoff[["ts", "equity"]]],
- ignore_index=True,
- )
- start_time = cutoff
- else:
- horizon = frame[["ts", "equity"]].copy()
- start_time = pd.Timestamp(horizon["ts"].iloc[0])
- metrics = explore.annualized_metrics_from_equity(horizon, int(start_time.timestamp() * 1000), last_ts)
- rows.append(
- {
- "name": name,
- "horizon": label,
- "horizon_start": start_time.strftime("%Y-%m-%d %H:%M"),
- "horizon_end": end_time.strftime("%Y-%m-%d %H:%M"),
- **metrics,
- "risk_return_ratio": metrics["net_total_return"] / metrics["net_max_drawdown"] if metrics["net_max_drawdown"] else 0.0,
- }
- )
- return rows
- def monthly_rows(name: str, frame: pd.DataFrame) -> pd.DataFrame:
- series = frame.set_index("ts")["equity"].resample("ME").last().ffill()
- out = pd.DataFrame(
- {
- "name": name,
- "month": series.index.strftime("%Y-%m"),
- "start_equity": series.shift(1).fillna(frame["equity"].iloc[0]).to_numpy(),
- "end_equity": series.to_numpy(),
- }
- )
- out["return"] = out["end_equity"] / out["start_equity"] - 1.0
- return out
- def regime_rows(name: str, result: explore.SegmentResult, cost: float) -> list[dict[str, object]]:
- rows: list[dict[str, object]] = []
- for regime, group in pd.DataFrame(result.trades).groupby("regime") if result.trades else []:
- net_returns = group["return_pct"].astype(float) / 100.0 - cost * group.get("cost_weight", 1.0).astype(float)
- wins = net_returns[net_returns > 0.0]
- losses = net_returns[net_returns < 0.0]
- rows.append(
- {
- "name": name,
- "regime": regime,
- "trades": len(group),
- "net_return_contribution": float(net_returns.sum()),
- "win_rate": float(len(wins) / len(group)) if len(group) else 0.0,
- "profit_factor": float(wins.sum() / abs(losses.sum())) if len(losses) and abs(losses.sum()) > 0.0 else 0.0,
- }
- )
- if not rows:
- rows.append({"name": name, "regime": "none", "trades": 0, "net_return_contribution": 0.0, "win_rate": 0.0, "profit_factor": 0.0})
- return rows
- def markdown_table(frame: pd.DataFrame) -> str:
- columns = list(frame.columns)
- rows = [columns, ["---" for _ in columns]]
- rows.extend(frame.astype(object).where(pd.notna(frame), "").values.tolist())
- return "\n".join("| " + " | ".join(format_cell(value) for value in row) + " |" for row in rows)
- def format_cell(value: object) -> str:
- if isinstance(value, float):
- return f"{value:.6g}"
- return str(value).replace("|", "\\|")
- def markdown_report(command: str, output_files: list[Path], total: pd.DataFrame, horizon: pd.DataFrame, regime: pd.DataFrame, monthly: pd.DataFrame) -> str:
- primary = total[total["cost_model"] == PRIMARY_COST].head(10)
- names = set(primary["name"])
- horizon_top = horizon[(horizon["cost_model"] == PRIMARY_COST) & horizon["name"].isin(names)].copy()
- regime_top = regime[(regime["cost_model"] == PRIMARY_COST) & regime["name"].isin(names)].copy()
- worst_months = monthly[(monthly["cost_model"] == PRIMARY_COST) & monthly["name"].isin(names)].sort_values("return").head(20)
- best = primary.iloc[0] if len(primary) else pd.Series(dtype=object)
- decision = "No router variants produced trades."
- if len(best):
- decision = (
- f"Best maker_taker router `{best['name']}`: annualized={best['net_annualized_return']:.4f}, "
- f"DD={best['net_max_drawdown']:.4f}, Calmar={best['net_calmar']:.4f}, "
- f"win_rate={best['win_rate']:.4f}, profit_factor={best['profit_factor']:.4f}."
- )
- lines = [
- "# ETH regime router variants",
- "",
- f"Run command: `{command}`",
- "",
- "Output files:",
- *[f"- `{path}`" for path in output_files],
- "",
- "Router actions: trend_follow, mean_reversion, btc_lead, or cash. Regime inputs: BTC/ETH volatility, trend distance from SMA, rolling correlation, ETH/BTC ratio z-score, UTC hour, and ETH volume ratio.",
- f"Costs: {', '.join(COSTS)}. Primary sort uses {PRIMARY_COST}, qualified horizons, Calmar, annualized return, and drawdown.",
- "",
- f"Decision: {decision}",
- "",
- "## Top maker_taker routers",
- "",
- markdown_table(
- primary[
- [
- "name",
- "trades",
- "net_total_return",
- "net_annualized_return",
- "net_max_drawdown",
- "net_calmar",
- "win_rate",
- "profit_loss_ratio",
- "profit_factor",
- "risk_return_ratio",
- "min_horizon_total_return",
- ]
- ]
- ),
- "",
- "## Horizon checks",
- "",
- markdown_table(
- horizon_top[
- [
- "name",
- "horizon",
- "net_total_return",
- "net_annualized_return",
- "net_max_drawdown",
- "net_calmar",
- "risk_return_ratio",
- ]
- ]
- ),
- "",
- "## Regime contribution",
- "",
- markdown_table(regime_top[["name", "regime", "trades", "net_return_contribution", "win_rate", "profit_factor"]]),
- "",
- "## Worst months among top routers",
- "",
- markdown_table(worst_months[["name", "month", "return"]]),
- ]
- return "\n".join(lines) + "\n"
- def main() -> int:
- parser = argparse.ArgumentParser()
- parser.add_argument("--bar", default=BAR)
- parser.add_argument("--years", type=float, default=YEARS)
- parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
- parser.add_argument("--max-candidates", type=int)
- args = parser.parse_args()
- eth_raw = load_candles(ETH_SYMBOL, args.bar, args.years)
- btc_raw = load_candles(BTC_SYMBOL, args.bar, args.years)
- eth, btc = explore.align_pair_candles(eth_raw, btc_raw)
- if not eth or not btc:
- raise RuntimeError("no aligned ETH/BTC candles")
- candidates = specs()
- if args.max_candidates is not None:
- candidates = candidates[: args.max_candidates]
- total_rows: list[dict[str, object]] = []
- horizon_output: list[dict[str, object]] = []
- monthly_frames: list[pd.DataFrame] = []
- regime_output: list[dict[str, object]] = []
- for index, spec in enumerate(candidates, start=1):
- result = run_router_segment(eth=eth, btc=btc, spec=spec, leverage=explore.LEVERAGE)
- print(f"done {index}/{len(candidates)} {spec.name} trades={result.trade_count}", flush=True)
- for cost_model, cost in COSTS.items():
- frame = cost_frame(result, cost)
- metric_start_ts = int(pd.Timestamp(frame["ts"].iloc[0]).timestamp() * 1000)
- metric_end_ts = int(pd.Timestamp(frame["ts"].iloc[-1]).timestamp() * 1000)
- metrics = explore.annualized_metrics_from_equity(frame, metric_start_ts, metric_end_ts)
- stats = trade_stats(result, cost)
- current_horizons = horizon_rows(spec.name, frame, eth[-1].ts)
- min_horizon_return = min(float(row["net_total_return"]) for row in current_horizons if row["horizon"] != "all")
- total_rows.append(
- {
- "name": spec.name,
- "cost_model": cost_model,
- "symbol": ETH_SYMBOL,
- "signal_symbol": BTC_SYMBOL,
- "bar": args.bar,
- "first_candle": pd.Timestamp(frame["ts"].iloc[0]).strftime("%Y-%m-%d %H:%M"),
- "last_candle": pd.Timestamp(frame["ts"].iloc[-1]).strftime("%Y-%m-%d %H:%M"),
- "years": (metric_end_ts - metric_start_ts) / 86_400_000 / 365,
- "trades": result.trade_count,
- "gross_total_return": result.total_return,
- "gross_max_drawdown_mark_to_market": result.max_drawdown,
- "risk_return_ratio": metrics["net_total_return"] / metrics["net_max_drawdown"] if metrics["net_max_drawdown"] else 0.0,
- "min_horizon_total_return": min_horizon_return,
- **spec.__dict__,
- **metrics,
- **stats,
- }
- )
- for row in current_horizons:
- horizon_output.append({"cost_model": cost_model, **row})
- monthly_frames.append(monthly_rows(spec.name, frame).assign(cost_model=cost_model))
- for row in regime_rows(spec.name, result, cost):
- regime_output.append({"cost_model": cost_model, **row})
- total = pd.DataFrame(total_rows).sort_values(
- ["cost_model", "min_horizon_total_return", "net_calmar", "net_annualized_return", "net_max_drawdown"],
- ascending=[True, False, False, False, True],
- )
- horizon = pd.DataFrame(horizon_output)
- horizon["horizon"] = pd.Categorical(horizon["horizon"], categories=[label for label, _ in HORIZONS], ordered=True)
- horizon = horizon.sort_values(["cost_model", "name", "horizon"])
- monthly = pd.concat(monthly_frames, ignore_index=True)
- regime = pd.DataFrame(regime_output).sort_values(["cost_model", "name", "net_return_contribution"], ascending=[True, True, False])
- args.output_dir.mkdir(parents=True, exist_ok=True)
- total_path = args.output_dir / f"{PREFIX}-total.csv"
- horizon_path = args.output_dir / f"{PREFIX}-horizon.csv"
- monthly_path = args.output_dir / f"{PREFIX}-monthly.csv"
- regime_path = args.output_dir / f"{PREFIX}-regime-contribution.csv"
- top_path = args.output_dir / f"{PREFIX}-top10.csv"
- json_path = args.output_dir / f"{PREFIX}-summary.json"
- report_path = args.output_dir / f"{PREFIX}-report.md"
- total.to_csv(total_path, index=False)
- horizon.to_csv(horizon_path, index=False)
- monthly.to_csv(monthly_path, index=False)
- regime.to_csv(regime_path, index=False)
- total[total["cost_model"] == PRIMARY_COST].head(10).to_csv(top_path, index=False)
- summary = {
- "report": PREFIX,
- "command": f"rtk .venv/bin/python {Path(__file__).as_posix()} --bar {args.bar} --years {args.years}",
- "primary_cost": PRIMARY_COST,
- "candidate_count": len(candidates),
- "top_maker_taker": total[total["cost_model"] == PRIMARY_COST].head(10).to_dict("records"),
- "output_files": [str(path) for path in [total_path, horizon_path, monthly_path, regime_path, top_path, json_path, report_path]],
- }
- json_path.write_text(json.dumps(summary, indent=2), encoding="utf-8")
- report_path.write_text(
- markdown_report(
- command=summary["command"],
- output_files=[total_path, horizon_path, monthly_path, regime_path, top_path, json_path, report_path],
- total=total,
- horizon=horizon,
- regime=regime,
- monthly=monthly,
- ),
- encoding="utf-8",
- )
- print(total[total["cost_model"] == PRIMARY_COST].head(20).to_string(index=False))
- return 0
- if __name__ == "__main__":
- raise SystemExit(main())
|