from __future__ import annotations import argparse import json import sys from dataclasses import dataclass from pathlib import Path import pandas as pd sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from okx_codex_trader.models import Candle from okx_codex_trader.sampled_report import SegmentResult, mark_to_market, trade_equity from scripts import explore_ultrashort as explore OUTPUT_DIR = Path("reports/ultrashort") PREFIX = "short-overlay" YEARS = 10.0 INITIAL_EQUITY = explore.INITIAL_EQUITY LEVERAGE = explore.LEVERAGE PRIMARY_COST = "maker_taker" COSTS = { "maker_taker": 0.0021, "taker_taker": 0.0030, } HORIZONS = ( ("full", None), ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ) @dataclass(frozen=True) class Strategy: family: str symbol: str signal_symbol: str bar: str name: str warmup_bars: int params: dict[str, float | int | str] def load_candles(symbol: str, bar: str, years: float) -> list[Candle]: candles, _ = explore.load_cached_candles(explore.CANDLE_CACHE_DIR, symbol, bar) if not candles: raise FileNotFoundError(f"missing cached candles for {symbol} {bar}") requested = explore.history_bars_for_years(bar, years) return candles[-requested:] if len(candles) > requested else candles def frame_from_candles(candles: list[Candle], prefix: str = "") -> pd.DataFrame: return pd.DataFrame( { f"{prefix}ts": [candle.ts for candle in candles], f"{prefix}open": [candle.open for candle in candles], f"{prefix}high": [candle.high for candle in candles], f"{prefix}low": [candle.low for candle in candles], f"{prefix}close": [candle.close for candle in candles], f"{prefix}volume": [candle.volume for candle in candles], } ) def aligned_frame(symbol_candles: list[Candle], signal_candles: list[Candle] | None = None) -> pd.DataFrame: base = frame_from_candles(symbol_candles) if signal_candles is None: return base signal = frame_from_candles(signal_candles, "sig_").rename(columns={"sig_ts": "ts"}) return base.merge(signal, on="ts", how="inner") def rsi(series: pd.Series, length: int) -> pd.Series: diff = series.diff() gain = diff.clip(lower=0.0).rolling(length).mean() loss = (-diff.clip(upper=0.0)).rolling(length).mean() rs = gain / loss return 100.0 - 100.0 / (1.0 + rs) def true_range(frame: pd.DataFrame) -> pd.Series: prev_close = frame["close"].shift(1) return pd.concat( [ frame["high"] - frame["low"], (frame["high"] - prev_close).abs(), (frame["low"] - prev_close).abs(), ], axis=1, ).max(axis=1) def close_trade( *, trades: list[dict[str, object]], exits: list[dict[str, object]], position: dict[str, object], candle: Candle, exit_price: float, ) -> tuple[float, bool]: exit_equity = trade_equity( side="short", margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), exit_price=exit_price, leverage=LEVERAGE, ) pnl = exit_equity - float(position["margin_used"]) trades.append( { "side": "Short", "entry_time": explore._format_ts(int(position["entry_time"])), "exit_time": explore._format_ts(candle.ts), "entry_price": round(float(position["entry_price"]), 4), "exit_price": round(exit_price, 4), "pnl": round(pnl, 4), "return_pct": round(pnl / float(position["margin_used"]) * 100.0, 4), } ) exits.append({"ts": candle.ts, "price": exit_price, "side": "short"}) return exit_equity, pnl > 0.0 def run_short_segment(candles: list[Candle], entry_signal: pd.Series, exit_signal: pd.Series, max_hold_bars: int, stop_loss_pct: float, take_profit_pct: float) -> SegmentResult: equity = INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_entry = False pending_exit = False warmup_bars = max(int(entry_signal.first_valid_index() or 0), 1) for index in range(warmup_bars, len(candles)): candle = candles[index] if pending_exit and position is not None: equity, won = close_trade(trades=trades, exits=exits, position=position, candle=candle, exit_price=candle.open) wins += 1 if won else 0 position = None pending_exit = False if pending_entry and position is None and equity > 0.0: position = { "entry_time": candle.ts, "entry_price": candle.open, "entry_index": index, "margin_used": equity, "stop_price": candle.open * (1.0 + stop_loss_pct), "take_profit_price": candle.open * (1.0 - take_profit_pct), } entries.append({"ts": candle.ts, "price": candle.open, "side": "short"}) pending_entry = False current_equity = equity if position is not None: stop_hit = candle.high >= float(position["stop_price"]) take_hit = candle.low <= float(position["take_profit_price"]) if stop_hit or take_hit: exit_price = float(position["stop_price"] if stop_hit else position["take_profit_price"]) equity, won = close_trade(trades=trades, exits=exits, position=position, candle=candle, exit_price=exit_price) wins += 1 if won else 0 current_equity = equity position = None if position is not None: current_equity = mark_to_market( side="short", margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=LEVERAGE, ) current_equity = max(0.0, current_equity) peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(candles) - 1 or equity <= 0.0: continue if position is not None: held = index - int(position["entry_index"]) if bool(exit_signal.iloc[index]) or held >= max_hold_bars: pending_exit = True continue if bool(entry_signal.iloc[index]): pending_entry = True trade_count = len(trades) return SegmentResult( trade_count=trade_count, total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY, win_rate=wins / trade_count if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=candles[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ) def strategy_signals(strategy: Strategy, frame: pd.DataFrame) -> tuple[pd.Series, pd.Series]: close = frame["close"] open_ = frame["open"] high = frame["high"] low = frame["low"] p = strategy.params trend = close.rolling(int(p["trend"])).mean() fast = close.rolling(int(p.get("fast", max(2, int(p["trend"]) // 4)))).mean() atr = true_range(frame).rolling(int(p.get("atr", 48))).mean() below_trend = close < trend if strategy.family == "trend_bounce_fail": pullback = high >= fast * (1.0 + float(p["pullback"])) if float(p["pullback"]) >= 0.0 else high >= fast * (1.0 + float(p["pullback"])) reject = close < open_ entry = below_trend & pullback & reject & (close < fast) exit_ = (close > fast) | (close > trend) elif strategy.family == "crash_continuation": ret = close / close.shift(int(p["lookback"])) - 1.0 prior_low = low.shift(1).rolling(int(p["break_lookback"])).min() entry = below_trend & (ret <= -float(p["drop"])) & (close <= prior_low * (1.0 + float(p["break_buffer"]))) exit_ = (close > close.rolling(int(p["exit_fast"])).mean()) | (ret > 0.0) elif strategy.family == "vwap_deviation_fade": typical = (high + low + close) / 3.0 vwap = (typical * frame["volume"]).rolling(int(p["vwap"])).sum() / frame["volume"].rolling(int(p["vwap"])).sum() entry = below_trend & (high >= vwap * (1.0 + float(p["deviation"]))) & (close < vwap) exit_ = (close <= vwap * (1.0 - float(p["exit_deviation"]))) | (close > trend) elif strategy.family == "rsi_overbought_downtrend": value = rsi(close, int(p["rsi"])) entry = below_trend & (value >= float(p["entry_rsi"])) & (close < open_) exit_ = (value <= float(p["exit_rsi"])) | (close > trend) elif strategy.family == "bb_upper_rejection": mid = close.rolling(int(p["bb"])).mean() std = close.rolling(int(p["bb"])).std(ddof=0) upper = mid + std * float(p["std"]) entry = below_trend & (high >= upper) & (close < upper) & (close < open_) exit_ = (close <= mid) | (close > trend) elif strategy.family == "eth_by_btc_down": sig_close = frame["sig_close"] sig_trend = sig_close.rolling(int(p["btc_trend"])).mean() sig_ret = sig_close / sig_close.shift(int(p["btc_lookback"])) - 1.0 eth_ret = close / close.shift(int(p["eth_lookback"])) - 1.0 entry = (sig_close < sig_trend) & (sig_ret <= -float(p["btc_drop"])) & (eth_ret <= float(p["eth_max_rebound"])) exit_ = (sig_ret >= 0.0) | (close > fast) else: raise ValueError(f"unknown family {strategy.family}") enough_range = atr.notna() & (atr > 0.0) return (entry & enough_range).fillna(False), exit_.fillna(False) def make_strategy(family: str, symbol: str, signal_symbol: str, bar: str, params: dict[str, float | int | str]) -> Strategy: parts = [family, symbol.split("-")[0].lower(), bar] parts.extend(f"{key}{value}" for key, value in params.items()) warmup = max(int(value) for key, value in params.items() if key.endswith(("trend", "lookback", "fast", "atr", "vwap", "rsi", "bb")) and isinstance(value, int)) return Strategy(family, symbol, signal_symbol, bar, "-".join(parts), warmup, params) def build_strategies() -> list[Strategy]: strategies: list[Strategy] = [] for symbol in ("BTC-USDT-SWAP", "ETH-USDT-SWAP"): for bar in ("15m",): strategies.extend( make_strategy( "trend_bounce_fail", symbol, symbol, bar, {"trend": trend, "fast": fast, "atr": 48, "pullback": pullback, "stop": stop, "take": take, "hold": hold}, ) for trend in (96,) for fast in (16,) for pullback in (0.003,) for stop in (0.006, 0.009) for take in (0.010,) for hold in (12,) ) strategies.extend( make_strategy( "crash_continuation", symbol, symbol, bar, { "trend": trend, "lookback": lookback, "break_lookback": break_lookback, "break_buffer": 0.002, "exit_fast": 8, "atr": 48, "drop": drop, "stop": stop, "take": take, "hold": hold, }, ) for trend in (96, 192) if bar == "5m" or trend == 96 for lookback in (3, 6) for break_lookback in (12,) for drop in (0.010, 0.016) for stop in (0.006, 0.010) for take in (0.010,) for hold in (8,) ) strategies.extend( make_strategy( "vwap_deviation_fade", symbol, symbol, bar, {"trend": trend, "vwap": vwap, "atr": 48, "deviation": dev, "exit_deviation": 0.001, "stop": stop, "take": take, "hold": hold}, ) for trend in (96,) for vwap in (48,) for dev in (0.004, 0.007) for stop in (0.006,) for take in (0.010,) for hold in (12,) ) strategies.extend( make_strategy( "rsi_overbought_downtrend", symbol, symbol, bar, {"trend": trend, "fast": 16, "atr": 48, "rsi": rsi_length, "entry_rsi": entry_rsi, "exit_rsi": exit_rsi, "stop": stop, "take": take, "hold": hold}, ) for trend in (96,) for rsi_length in (14,) for entry_rsi in (60.0, 70.0) for exit_rsi in (45.0,) for stop in (0.006,) for take in (0.010,) for hold in (12,) ) strategies.extend( make_strategy( "bb_upper_rejection", symbol, symbol, bar, {"trend": trend, "bb": bb, "atr": 48, "std": std, "stop": stop, "take": take, "hold": hold}, ) for trend in (96,) for bb in (20, 48) for std in (1.5,) for stop in (0.006,) for take in (0.010,) for hold in (12,) ) for bar in ("15m",): strategies.extend( make_strategy( "eth_by_btc_down", "ETH-USDT-SWAP", "BTC-USDT-SWAP", bar, { "trend": 96, "fast": 16, "atr": 48, "btc_trend": btc_trend, "btc_lookback": btc_lookback, "eth_lookback": eth_lookback, "btc_drop": btc_drop, "eth_max_rebound": eth_max_rebound, "stop": stop, "take": take, "hold": hold, }, ) for btc_trend in (96,) for btc_lookback in (3, 6, 12) for eth_lookback in (3,) for btc_drop in (0.010, 0.015) for eth_max_rebound in (-0.002,) for stop in (0.006, 0.010) for take in (0.010,) for hold in (8,) ) return strategies def run_strategy(strategy: Strategy, data: dict[tuple[str, str], list[Candle]]) -> SegmentResult: symbol_candles = data[(strategy.symbol, strategy.bar)] signal_candles = None if strategy.signal_symbol == strategy.symbol else data[(strategy.signal_symbol, strategy.bar)] frame = aligned_frame(symbol_candles, signal_candles) candles = [ Candle(strategy.symbol, int(row.ts), float(row.open), float(row.high), float(row.low), float(row.close), float(row.volume)) for row in frame.itertuples(index=False) ] entry, exit_ = strategy_signals(strategy, frame) return run_short_segment(candles, entry, exit_, int(strategy.params["hold"]), float(strategy.params["stop"]), float(strategy.params["take"])) def daily_equity(frame: pd.DataFrame, start: pd.Timestamp, end: pd.Timestamp) -> pd.Series: series = frame.set_index("ts")["equity"].sort_index() series = pd.concat([pd.Series([INITIAL_EQUITY], index=[start.normalize()]), series]).sort_index() series = series.groupby(level=0).last() index = pd.date_range(start.normalize(), end.normalize(), freq="1D", tz="UTC") return series.reindex(index.union(series.index)).sort_index().ffill().reindex(index).ffill() def cost_adjusted_equity_frame(result: SegmentResult, cost: float) -> pd.DataFrame: frame = pd.DataFrame(result.equity_curve) if frame.empty: return pd.DataFrame({"ts": pd.Series(dtype="datetime64[ns, UTC]"), "equity": pd.Series(dtype=float)}) frame["ts"] = pd.to_datetime(frame["ts"], unit="ms", utc=True) adjustments: dict[pd.Timestamp, float] = {} for trade in result.trades: exit_time = pd.to_datetime(str(trade["exit_time"]), utc=True) gross_factor = 1.0 + float(trade["return_pct"]) / 100.0 net_factor = gross_factor - cost if gross_factor <= 0.0 or net_factor <= 0.0: factor = 0.0 else: factor = net_factor / gross_factor adjustments[exit_time] = adjustments.get(exit_time, 1.0) * factor if adjustments: factor_frame = pd.DataFrame({"ts": list(adjustments.keys()), "factor": list(adjustments.values())}).sort_values("ts") frame = frame.merge(factor_frame, on="ts", how="left") frame["factor"] = frame["factor"].fillna(1.0).cumprod() frame["equity"] = frame["equity"] * frame["factor"] return frame[["ts", "equity"]] def metrics_from_daily_equity(series: pd.Series) -> dict[str, float]: series = series.clip(lower=0.0) years = (series.index[-1] - series.index[0]).total_seconds() / 86_400 / 365 if float(series.iloc[0]) <= 0.0: total_return = -1.0 else: total_return = float(series.iloc[-1] / series.iloc[0] - 1.0) annualized_return = (1.0 + total_return) ** (1.0 / years) - 1.0 if total_return > -1.0 and years > 0.0 else 0.0 max_drawdown = explore.max_drawdown_from_equity([float(value) for value in series]) returns = series.where(series > 0.0).pct_change(fill_method=None).dropna() daily_std = float(returns.std(ddof=1)) if len(returns) > 1 else 0.0 risk_reward = float(returns.mean()) / daily_std * (365**0.5) if daily_std else 0.0 return { "net_total_return": total_return, "net_annualized_return": annualized_return, "net_max_drawdown": max_drawdown, "net_calmar": annualized_return / max_drawdown if max_drawdown else 0.0, "risk_reward_ratio": risk_reward, } def monthly_rows(name: str, series: pd.Series) -> pd.DataFrame: monthly = series.resample("ME").last() frame = pd.DataFrame( { "name": name, "month": monthly.index.strftime("%Y-%m"), "start_equity": monthly.shift(1).fillna(series.iloc[0]).to_numpy(), "end_equity": monthly.to_numpy(), } ) frame["return"] = frame["end_equity"] / frame["start_equity"] - 1.0 return frame def trade_stats_for_window(result: SegmentResult, cost: float, start: pd.Timestamp, end: pd.Timestamp) -> dict[str, float | int]: returns: list[float] = [] for trade in result.trades: exit_time = pd.to_datetime(str(trade["exit_time"]), utc=True) if start <= exit_time <= end: returns.append(float(trade["return_pct"]) / 100.0 - cost) wins = [value for value in returns if value > 0.0] losses = [value for value in returns if value < 0.0] avg_win = sum(wins) / len(wins) if wins else 0.0 avg_loss_abs = abs(sum(losses) / len(losses)) if losses else 0.0 gross_profit = sum(wins) gross_loss_abs = abs(sum(losses)) months = max((end - start).days / 30.4375, 1.0) return { "trades": len(returns), "trades_per_month": len(returns) / months, "win_rate": len(wins) / len(returns) if returns else 0.0, "payoff_ratio": avg_win / avg_loss_abs if avg_loss_abs else 0.0, "profit_factor": gross_profit / gross_loss_abs if gross_loss_abs else 0.0, } def horizon_rows(name: str, result: SegmentResult, cost: float, series: pd.Series) -> list[dict[str, object]]: rows: list[dict[str, object]] = [] end_time = series.index[-1] for label, offset in HORIZONS: horizon = series if offset is None else series[series.index >= end_time - offset] if len(horizon) < 2: horizon = series rows.append( { "name": name, "horizon": label, "horizon_start": horizon.index[0].strftime("%Y-%m-%d"), "horizon_end": horizon.index[-1].strftime("%Y-%m-%d"), **metrics_from_daily_equity(horizon), **trade_stats_for_window(result, cost, horizon.index[0], horizon.index[-1]), } ) return rows def format_cell(value: object) -> str: if isinstance(value, float): return f"{value:.6g}" return str(value).replace("|", "\\|") def markdown_table(frame: pd.DataFrame) -> str: rows = [list(frame.columns), ["---" for _ in frame.columns]] rows.extend(frame.astype(object).where(pd.notna(frame), "").values.tolist()) return "\n".join("| " + " | ".join(format_cell(value) for value in row) + " |" for row in rows) def markdown_report(command: str, paths: list[Path], totals: pd.DataFrame, horizons: pd.DataFrame, monthly_summary: pd.DataFrame, worst_months: pd.DataFrame, qualified: pd.DataFrame) -> str: primary = totals[totals["cost_model"] == PRIMARY_COST] top = qualified.head(10) if len(qualified) else primary.head(10) top_name = str(top.iloc[0]["name"]) if len(top) else "" top_horizon = horizons[(horizons["cost_model"] == PRIMARY_COST) & (horizons["name"] == top_name)] lines = [ "# BTC/ETH Short Overlay Search", "", f"Run command: `{command}`", "", "Output files:", *[f"- `{path}`" for path in paths], "", "Scope: cached BTC/ETH perpetual candles only. All candidates are short-only overlays; no live OKX calls and no order submission.", "Costs: maker_taker=0.0021 and taker_taker=0.0030 roundtrip on margin at 3x.", "Candidate families: trend bounce failure, crash continuation, VWAP deviation fade, RSI overbought under downtrend, BB upper rejection, and ETH short by BTC downside.", "", f"Qualified maker_taker candidates with >=8 trades/month and positive 1y/6m/3m: {len(qualified)}.", "", "## Top qualified or fallback candidates", "", markdown_table( top[ [ "name", "family", "symbol", "signal_symbol", "bar", "trades", "trades_per_month", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "win_rate", "payoff_ratio", "profit_factor", "risk_reward_ratio", "worst_month_return", ] ] ), "", "## Horizon metrics for top candidate", "", markdown_table( top_horizon[ [ "horizon", "horizon_start", "horizon_end", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "trades_per_month", "profit_factor", "risk_reward_ratio", ] ] ), "", "## Monthly summary", "", markdown_table(monthly_summary.head(20)), "", "## Worst months", "", markdown_table(worst_months.head(20)), ] return "\n".join(lines) + "\n" def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--years", type=float, default=YEARS) parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR) parser.add_argument("--prefix", default=PREFIX) args = parser.parse_args() strategies = build_strategies() bars = sorted({strategy.bar for strategy in strategies}) data = { (symbol, bar): load_candles(symbol, bar, args.years) for bar in bars for symbol in ("BTC-USDT-SWAP", "ETH-USDT-SWAP") } results: dict[str, tuple[Strategy, SegmentResult]] = {} for index, strategy in enumerate(strategies, start=1): results[strategy.name] = (strategy, run_strategy(strategy, data)) print(f"done {index}/{len(strategies)} {strategy.name}", flush=True) start = max(pd.to_datetime(result.equity_curve[0]["ts"], unit="ms", utc=True) for _, result in results.values()) end = min(pd.to_datetime(result.equity_curve[-1]["ts"], unit="ms", utc=True) for _, result in results.values()) total_rows: list[dict[str, object]] = [] horizon_output: list[dict[str, object]] = [] monthly_frames: list[pd.DataFrame] = [] for name, (strategy, result) in results.items(): for cost_model, cost_value in COSTS.items(): frame = cost_adjusted_equity_frame(result, cost_value) daily = daily_equity(frame, start, end) monthly = monthly_rows(name, daily) worst = monthly.loc[monthly["return"].idxmin()] stats = trade_stats_for_window(result, cost_value, start, end) total_rows.append( { "name": name, "cost_model": cost_model, "roundtrip_cost_on_margin": cost_value, "family": strategy.family, "symbol": strategy.symbol, "signal_symbol": strategy.signal_symbol, "bar": strategy.bar, "first_candle": start.strftime("%Y-%m-%d %H:%M"), "last_candle": end.strftime("%Y-%m-%d %H:%M"), "years": (end - start).total_seconds() / 86_400 / 365, "gross_total_return": result.total_return, "gross_max_drawdown_mark_to_market": result.max_drawdown, "worst_month": str(worst["month"]), "worst_month_return": float(worst["return"]), **stats, **metrics_from_daily_equity(daily), **strategy.params, } ) for row in horizon_rows(name, result, cost_value, daily): horizon_output.append({"cost_model": cost_model, "family": strategy.family, "symbol": strategy.symbol, "signal_symbol": strategy.signal_symbol, "bar": strategy.bar, **row}) monthly_frames.append(monthly.assign(cost_model=cost_model, family=strategy.family, symbol=strategy.symbol, signal_symbol=strategy.signal_symbol, bar=strategy.bar)) totals = pd.DataFrame(total_rows).sort_values( ["cost_model", "net_calmar", "net_annualized_return", "trades_per_month"], ascending=[True, False, False, False], ) horizons = pd.DataFrame(horizon_output).sort_values(["cost_model", "name", "horizon"]) monthly_all = pd.concat(monthly_frames, ignore_index=True) monthly_summary = ( monthly_all[monthly_all["cost_model"] == PRIMARY_COST] .groupby(["name", "family", "symbol", "signal_symbol", "bar"], as_index=False) .agg( positive_months=("return", lambda values: int((values > 0.0).sum())), negative_months=("return", lambda values: int((values < 0.0).sum())), avg_month_return=("return", "mean"), worst_month_return=("return", "min"), ) .sort_values(["avg_month_return", "worst_month_return"], ascending=False) ) worst_months = monthly_all.sort_values("return").head(50) primary_horizon = horizons[horizons["cost_model"] == PRIMARY_COST] recent = primary_horizon[primary_horizon["horizon"].isin(("1y", "6m", "3m"))].pivot_table(index="name", columns="horizon", values="net_total_return", aggfunc="min") recent_positive = set(recent[(recent.reindex(columns=["1y", "6m", "3m"]) > 0.0).all(axis=1)].index) qualified = totals[ (totals["cost_model"] == PRIMARY_COST) & (totals["trades_per_month"] >= 8.0) & (totals["name"].isin(recent_positive)) & (totals["net_total_return"] > -1.0) & totals["net_total_return"].notna() & totals["net_calmar"].notna() & (totals["net_max_drawdown"] <= 0.65) ].sort_values(["net_calmar", "net_annualized_return", "trades_per_month"], ascending=[False, False, False]) args.output_dir.mkdir(parents=True, exist_ok=True) total_path = args.output_dir / f"{args.prefix}-totals.csv" horizon_path = args.output_dir / f"{args.prefix}-horizons.csv" monthly_path = args.output_dir / f"{args.prefix}-monthly.csv" qualified_path = args.output_dir / f"{args.prefix}-qualified.csv" summary_path = args.output_dir / f"{args.prefix}-summary.json" report_path = args.output_dir / f"{args.prefix}-report.md" totals.to_csv(total_path, index=False) horizons.to_csv(horizon_path, index=False) monthly_all.to_csv(monthly_path, index=False) qualified.to_csv(qualified_path, index=False) summary = { "years_requested": args.years, "strategy_count": len(strategies), "qualified_count": int(len(qualified)), "top_name": str((qualified if len(qualified) else totals[totals["cost_model"] == PRIMARY_COST]).iloc[0]["name"]), "output_files": [str(path) for path in (total_path, horizon_path, monthly_path, qualified_path, summary_path, report_path)], } summary_path.write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n", encoding="utf-8") report_path.write_text( markdown_report( " ".join(sys.argv), [total_path, horizon_path, monthly_path, qualified_path, summary_path, report_path], totals, horizons, monthly_summary, worst_months, qualified, ), encoding="utf-8", ) print(f"wrote {report_path}") return 0 if __name__ == "__main__": raise SystemExit(main())