from __future__ import annotations import argparse import json import sys from dataclasses import dataclass from itertools import combinations from pathlib import Path from typing import Callable import pandas as pd sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from okx_codex_trader.bbmr_report import BBMRConfig, run_bbmr_segment from okx_codex_trader.bbsb_report import BBSBConfig, run_bbsb_segment from okx_codex_trader.models import Candle from scripts import explore_ultrashort as explore from scripts import search_eth_btc_nextgen_variants as nextgen OUTPUT_DIR = Path("reports/ultrashort") PREFIX = "high-frequency-portfolio" YEARS = 10.0 COST_MODELS = { "maker_taker": 0.0021, "taker_taker": 0.0030, } PRIMARY_COST_MODEL = "maker_taker" MIN_TRADES_PER_MONTH = 15.0 MAX_DRAWDOWN = 0.20 PRIMARY_SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP") HORIZONS = ( ("full", None), ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ) @dataclass(frozen=True) class LegSpec: key: str symbol: str family: str bar: str pair: bool warmup_bars: int run: Callable[[dict[tuple[str, str], list[Candle]]], explore.SegmentResult] @dataclass(frozen=True) class LegReturn: leg: str symbol: str family: str exit_time: pd.Timestamp value: float def load_candles(symbol: str, bar: str, years: float) -> list[Candle]: candles, _ = explore.load_cached_candles(explore.CANDLE_CACHE_DIR, symbol, bar) if not candles: raise FileNotFoundError(f"missing cached candles for {symbol} {bar}") requested = explore.history_bars_for_years(bar, years) return candles[-requested:] if len(candles) > requested else candles def single_leg(symbol: str, family: str, bar: str, candidate: explore.Candidate) -> LegSpec: key = f"{symbol}:{family}:{bar}:{candidate.name}" return LegSpec( key=key, symbol=symbol, family=family, bar=bar, pair=False, warmup_bars=candidate.warmup_bars, run=lambda data, symbol=symbol, bar=bar, candidate=candidate: candidate.run( candles=data[(symbol, bar)], leverage=explore.LEVERAGE, warmup_bars=candidate.warmup_bars, ), ) def pair_leg(family: str, bar: str, candidate: explore.PairCandidate) -> LegSpec: key = f"ETH-USDT-SWAP:{family}:{bar}:{candidate.name}" return LegSpec( key=key, symbol="ETH-USDT-SWAP", family=family, bar=bar, pair=True, warmup_bars=candidate.warmup_bars, run=lambda data, bar=bar, candidate=candidate: run_pair_candidate(candidate, data, bar), ) def run_pair_candidate( candidate: explore.PairCandidate, data: dict[tuple[str, str], list[Candle]], bar: str, ) -> explore.SegmentResult: eth, btc = explore.align_pair_candles(data[("ETH-USDT-SWAP", bar)], data[("BTC-USDT-SWAP", bar)]) return candidate.run( eth_candles=eth, btc_candles=btc, leverage=explore.LEVERAGE, warmup_bars=candidate.warmup_bars, ) def build_single_symbol_candidates() -> list[tuple[str, explore.Candidate]]: candidates: list[tuple[str, explore.Candidate]] = [ ( "bbmr", explore.Candidate( "bbmr-default", 69, lambda candles, leverage, warmup_bars: run_bbmr_segment( candles=candles, leverage=leverage, warmup_bars=warmup_bars, config=BBMRConfig(), ), ), ), ] for trend in (30, 50): for long_threshold, short_threshold in ((8.0, 92.0), (12.0, 88.0)): candidates.append(("rsi", explore.build_rsi2_side_candidate(trend, long_threshold, short_threshold, 50.0, "both"))) for fast, slow in ((8, 21), (13, 34)): candidates.append(("ma", explore.build_ma_cross_candidate(fast, slow, "both"))) for window in (24, 48): candidates.append( ( "vwap", explore.Candidate( f"vwap-revert-w{window}-z1.5-sl0.006", window * 2, lambda candles, leverage, warmup_bars, window=window: explore.run_vwap_reversion_segment( candles=candles, leverage=leverage, warmup_bars=warmup_bars, window=window, entry_z=1.5, exit_z=0.2, stop_loss_pct=0.006, ), ), ) ) candidates.append( ( "bbsb", explore.Candidate( "bbsb-default", 69, lambda candles, leverage, warmup_bars: run_bbsb_segment( candles=candles, leverage=leverage, warmup_bars=warmup_bars, config=BBSBConfig(), ), ), ) ) return candidates def build_legs() -> list[LegSpec]: legs: list[LegSpec] = [] single_candidates = build_single_symbol_candidates() for symbol in PRIMARY_SYMBOLS: for family, candidate in single_candidates: legs.append(single_leg(symbol, family, "15m", candidate)) nextgen_keep = { "btc_trend_eth_rsi:15m:eth-btc-rsi-filter-et50-l3.0-x55.0-bt480-bm240-br0.0", "btc_shock_guard_eth_rsi:15m:eth-btc-shock-filter-et50-l3.0-x55.0-bt480-bm240-br0.01-sw96-sv0.01-sd0.05", "btc_lead_eth_lag:5m:btc-lead-eth-lag-lb16-br0.012-gap0.006-mh8-sl0.006-tp0.018", "btc_lead_eth_lag:5m:btc-lead-eth-lag-lb16-br0.012-gap0.006-mh32-sl0.006-tp0.018", "btc_lead_eth_lag:15m:btc-lead-eth-lag-lb8-br0.018-gap0.006-mh8-sl0.006-tp0.018", "btc_lead_eth_lag:15m:btc-lead-eth-lag-lb16-br0.024-gap0.006-mh32-sl0.006-tp0.018", } for strategy in nextgen.build_strategies(): key = f"{strategy.family}:{strategy.bar}:{strategy.candidate.name}" if key in nextgen_keep: legs.append(pair_leg(f"nextgen_{strategy.family}", strategy.bar, strategy.candidate)) return legs def cost_trade_returns(spec: LegSpec, result: explore.SegmentResult, roundtrip_cost: float) -> list[LegReturn]: rows: list[LegReturn] = [] for trade in result.trades: value = float(trade["return_pct"]) / 100.0 - roundtrip_cost * float(trade.get("cost_weight", 1.0)) rows.append( LegReturn( leg=spec.key, symbol=spec.symbol, family=spec.family, exit_time=pd.to_datetime(str(trade["exit_time"]), utc=True), value=value, ) ) return rows def returns_to_daily_equity( name: str, returns: list[LegReturn], start: pd.Timestamp, end: pd.Timestamp, ) -> pd.Series: index = pd.date_range(start.normalize(), end.normalize(), freq="1D", tz="UTC") if not returns: series = pd.Series(explore.INITIAL_EQUITY, index=index, name=name, dtype=float) return series frame = pd.DataFrame({"date": [row.exit_time.normalize() for row in returns], "return": [row.value for row in returns]}) daily_returns = frame.groupby("date")["return"].sum().reindex(index, fill_value=0.0) equity = explore.INITIAL_EQUITY * (1.0 + daily_returns).cumprod() equity.iloc[0] = explore.INITIAL_EQUITY equity.name = name return equity def metrics_from_daily(series: pd.Series) -> dict[str, float]: years = (series.index[-1] - series.index[0]).total_seconds() / 86_400 / 365 total_return = float(series.iloc[-1] / series.iloc[0] - 1.0) annualized_return = (1.0 + total_return) ** (1.0 / years) - 1.0 if total_return > -1.0 and years > 0.0 else 0.0 max_drawdown = explore.max_drawdown_from_equity([float(value) for value in series]) daily_returns = series.pct_change().dropna() daily_std = float(daily_returns.std(ddof=1)) if len(daily_returns) > 1 else 0.0 risk_reward = float(daily_returns.mean()) / daily_std * (365**0.5) if daily_std else 0.0 return { "total_return": total_return, "annualized_return": annualized_return, "max_drawdown": max_drawdown, "calmar": annualized_return / max_drawdown if max_drawdown else 0.0, "risk_reward_ratio": risk_reward, } def trade_stats(returns: list[LegReturn], start: pd.Timestamp, end: pd.Timestamp) -> dict[str, float]: values = [row.value for row in returns if start <= row.exit_time <= end] wins = [value for value in values if value > 0.0] losses = [value for value in values if value < 0.0] avg_win = sum(wins) / len(wins) if wins else 0.0 avg_loss_abs = abs(sum(losses) / len(losses)) if losses else 0.0 gross_profit = sum(wins) gross_loss_abs = abs(sum(losses)) months = max((end - start).total_seconds() / 86_400 / 30.4375, 1e-9) return { "trades": len(values), "trades_per_month": len(values) / months, "win_rate": len(wins) / len(values) if values else 0.0, "payoff_ratio": avg_win / avg_loss_abs if avg_loss_abs else 0.0, "profit_factor": gross_profit / gross_loss_abs if gross_loss_abs else 0.0, } def monthly_rows(name: str, series: pd.Series) -> pd.DataFrame: monthly = series.resample("ME").last() frame = pd.DataFrame( { "portfolio": name, "month": monthly.index.strftime("%Y-%m"), "start_equity": monthly.shift(1).fillna(series.iloc[0]).to_numpy(), "end_equity": monthly.to_numpy(), } ) frame["return"] = frame["end_equity"] / frame["start_equity"] - 1.0 return frame def monthly_stability(monthly: pd.DataFrame) -> dict[str, float | int]: positive_months = int((monthly["return"] > 0.0).sum()) if len(monthly) else 0 negative_months = int((monthly["return"] < 0.0).sum()) if len(monthly) else 0 return { "months": len(monthly), "positive_month_rate": positive_months / len(monthly) if len(monthly) else 0.0, "negative_months": negative_months, } def horizon_metrics( portfolio: str, cost_model: str, series: pd.Series, returns: list[LegReturn], monthly: pd.DataFrame, ) -> list[dict[str, object]]: output: list[dict[str, object]] = [] end = series.index[-1] for label, offset in HORIZONS: horizon = series if offset is None else series[series.index >= end - offset] if len(horizon) < 2: horizon = series start = horizon.index[0] horizon_monthly = monthly[monthly["month"] >= start.strftime("%Y-%m")] worst = horizon_monthly.sort_values("return").iloc[0] if len(horizon_monthly) else None output.append( { "portfolio": portfolio, "cost_model": cost_model, "horizon": label, "horizon_start": start.strftime("%Y-%m-%d"), "horizon_end": end.strftime("%Y-%m-%d"), "worst_month": "" if worst is None else str(worst["month"]), "worst_month_return": 0.0 if worst is None else float(worst["return"]), **metrics_from_daily(horizon), **trade_stats(returns, start, end), **monthly_stability(horizon_monthly), } ) return output def split_weighted_returns(legs: tuple[str, ...], weights: pd.Series, leg_returns: dict[str, list[LegReturn]]) -> list[LegReturn]: rows: list[LegReturn] = [] for leg in legs: weight = float(weights[leg]) for row in leg_returns[leg]: rows.append(LegReturn(row.leg, row.symbol, row.family, row.exit_time, row.value * weight)) return rows def leg_contribution_rows( portfolio: str, cost_model: str, legs: tuple[str, ...], weights: pd.Series, leg_returns: dict[str, list[LegReturn]], end: pd.Timestamp, ) -> list[dict[str, object]]: rows: list[dict[str, object]] = [] for label, offset in HORIZONS: start = pd.Timestamp.min.tz_localize("UTC") if offset is None else end - offset total = 0.0 by_leg: dict[str, float] = {} for leg in legs: contribution = sum(row.value * float(weights[leg]) for row in leg_returns[leg] if row.exit_time >= start) by_leg[leg] = contribution total += contribution for leg in legs: rows.append( { "portfolio": portfolio, "cost_model": cost_model, "horizon": label, "leg": leg, "weight": float(weights[leg]), "contribution_return_sum": by_leg[leg], "contribution_share": by_leg[leg] / total if total else 0.0, } ) return rows def add_cost_columns(row: dict[str, object], cost_model: str, roundtrip_cost: float) -> None: row["cost_model"] = cost_model row["roundtrip_cost_on_margin"] = roundtrip_cost row["net_total_return"] = row["total_return"] row["net_annualized_return"] = row["annualized_return"] row["net_max_drawdown"] = row["max_drawdown"] row["net_calmar"] = row["calmar"] def risk_qualified(frame: pd.DataFrame) -> pd.DataFrame: full = frame[frame["horizon"] == "full"].copy() recent = full for horizon in ("3y", "1y", "6m", "3m"): part = frame[frame["horizon"] == horizon][["cost_model", "portfolio", "total_return", "max_drawdown", "calmar"]].rename( columns={"total_return": f"ret_{horizon}", "max_drawdown": f"dd_{horizon}", "calmar": f"calmar_{horizon}"} ) recent = recent.merge(part, on=["cost_model", "portfolio"], how="inner") qualified = recent[ (recent["trades_per_month"] >= MIN_TRADES_PER_MONTH) & (recent["max_drawdown"] < MAX_DRAWDOWN) & (recent["ret_1y"] > 0.0) & (recent["ret_6m"] > 0.0) & (recent["ret_3m"] > 0.0) ].copy() qualified["risk_rank"] = ( qualified["calmar"] * 2.0 + qualified["annualized_return"] + qualified["ret_1y"] * 0.5 - qualified["max_drawdown"] ) return qualified.sort_values(["cost_model", "risk_rank"], ascending=[True, False]) def robust_survivors(frame: pd.DataFrame) -> pd.DataFrame: full = frame[frame["horizon"] == "full"].copy() recent = full for horizon in ("3y", "1y", "6m", "3m"): part = frame[frame["horizon"] == horizon][["cost_model", "portfolio", "total_return", "calmar"]].rename( columns={"total_return": f"ret_{horizon}", "calmar": f"calmar_{horizon}"} ) recent = recent.merge(part, on=["cost_model", "portfolio"], how="inner") return recent[ (recent["trades_per_month"] >= MIN_TRADES_PER_MONTH) & (recent["total_return"] > 0.0) & (recent["calmar"] > 0.0) & (recent["ret_3y"] > 0.0) & (recent["ret_1y"] > 0.0) & (recent["ret_6m"] > 0.0) & (recent["ret_3m"] > 0.0) & (recent["calmar_3y"] > 0.0) & (recent["calmar_1y"] > 0.0) & (recent["calmar_6m"] > 0.0) & (recent["calmar_3m"] > 0.0) ].copy() def format_cell(value: object) -> str: if isinstance(value, float): return f"{value:.6g}" return str(value).replace("|", "\\|") def markdown_table(frame: pd.DataFrame) -> str: rows = [list(frame.columns), ["---" for _ in frame.columns]] rows.extend(frame.astype(object).where(pd.notna(frame), "").values.tolist()) return "\n".join("| " + " | ".join(format_cell(value) for value in row) + " |" for row in rows) def write_report( command: str, output_files: list[Path], portfolio_total: pd.DataFrame, qualified: pd.DataFrame, robust: pd.DataFrame, horizon: pd.DataFrame, leg_contrib: pd.DataFrame, ) -> str: primary_total = portfolio_total[portfolio_total["cost_model"] == PRIMARY_COST_MODEL] primary_qualified = qualified[qualified["cost_model"] == PRIMARY_COST_MODEL] stress_qualified = qualified[qualified["cost_model"] == "taker_taker"] top = primary_total.head(10) top_qualified = primary_qualified.head(10) best_name = str(top.iloc[0]["portfolio"]) if len(top) else "" best_horizon = horizon[(horizon["cost_model"] == PRIMARY_COST_MODEL) & (horizon["portfolio"] == best_name)] best_contrib = leg_contrib[(leg_contrib["cost_model"] == PRIMARY_COST_MODEL) & (leg_contrib["portfolio"] == best_name)] lines = [ "# Cross-symbol high-frequency portfolio ranking", "", f"Run command: `{command}`", "", "No exchange API, order placement, or live trading path is used. The search reads local BTC/ETH candle data and existing local strategy modules only.", "Primary ranking uses maker/taker roundtrip margin cost 0.21%. Taker/taker stress uses 0.30%. Funding and slippage remain excluded.", "", "Output files:", *[f"- `{path}`" for path in output_files], "", "Selection target: at least 15 trades/month, max DD below 20%, and positive 1y/6m/3m return.", f"Strict robust survivors with positive full/3y/1y/6m/3m net return and Calmar: {len(robust)}.", "", "## Top portfolios: maker/taker", "", markdown_table( top[ [ "cost_model", "portfolio", "mode", "leg_count", "symbols", "families", "total_return", "annualized_return", "max_drawdown", "calmar", "risk_reward_ratio", "trades_per_month", "worst_month_return", "positive_month_rate", "recent_positive", "qualified", ] ] ), "", "## Qualified portfolios: maker/taker", "", markdown_table( top_qualified[ [ "cost_model", "portfolio", "mode", "leg_count", "symbols", "families", "total_return", "annualized_return", "max_drawdown", "calmar", "trades_per_month", "worst_month_return", "positive_month_rate", "ret_3y", "ret_1y", "ret_6m", "ret_3m", ] ] ) if len(top_qualified) else "No portfolio cleared all target filters.", "", "## Taker/taker stress survivors", "", markdown_table( stress_qualified.head(10)[ [ "cost_model", "portfolio", "mode", "leg_count", "total_return", "annualized_return", "max_drawdown", "calmar", "trades_per_month", "positive_month_rate", "ret_3y", "ret_1y", "ret_6m", "ret_3m", ] ] ) if len(stress_qualified) else "No taker/taker portfolio cleared all target filters.", "", "## Best portfolio horizons", "", markdown_table( best_horizon[ [ "horizon", "total_return", "annualized_return", "max_drawdown", "calmar", "win_rate", "payoff_ratio", "profit_factor", "risk_reward_ratio", "trades", "trades_per_month", "worst_month", "worst_month_return", "positive_month_rate", "negative_months", ] ] ), "", "## Best portfolio leg contribution", "", markdown_table(best_contrib[best_contrib["horizon"].isin(["full", "1y", "6m", "3m"])]), ] return "\n".join(lines) + "\n" def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--years", type=float, default=YEARS) parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR) parser.add_argument("--max-legs-per-symbol", type=int, default=12) parser.add_argument("--max-leg-count", type=int, default=4) args = parser.parse_args() legs = build_legs() bars = sorted({leg.bar for leg in legs}) data = {(symbol, bar): load_candles(symbol, bar, args.years) for symbol in PRIMARY_SYMBOLS for bar in bars} leg_returns: dict[str, list[LegReturn]] = {} leg_rows: list[dict[str, object]] = [] daily: dict[str, pd.Series] = {} for index, leg in enumerate(legs, start=1): result = leg.run(data) start = pd.to_datetime(result.equity_curve[0]["ts"], unit="ms", utc=True) end = pd.to_datetime(result.equity_curve[-1]["ts"], unit="ms", utc=True) for cost_model, roundtrip_cost in COST_MODELS.items(): cost_key = f"{cost_model}:{leg.key}" returns = cost_trade_returns(leg, result, roundtrip_cost) leg_returns[cost_key] = returns series = returns_to_daily_equity(cost_key, returns, start, end) monthly = monthly_rows(cost_key, series) worst = float(monthly["return"].min()) if len(monthly) else 0.0 row = { "leg": leg.key, "symbol": leg.symbol, "family": leg.family, "bar": leg.bar, "pair_signal": leg.pair, "start": start.strftime("%Y-%m-%d %H:%M"), "end": end.strftime("%Y-%m-%d %H:%M"), "trades": len(returns), "worst_month_return": worst, **metrics_from_daily(series), **trade_stats(returns, series.index[0], series.index[-1]), **monthly_stability(monthly), } add_cost_columns(row, cost_model, roundtrip_cost) leg_rows.append(row) daily[cost_key] = series print(f"done leg {index}/{len(legs)} {leg.key}") leg_total_all = pd.DataFrame(leg_rows).sort_values( ["calmar", "annualized_return", "trades_per_month"], ascending=[False, False, False], ) primary_leg_total = leg_total_all[leg_total_all["cost_model"] == PRIMARY_COST_MODEL] selected: list[str] = [] for symbol in PRIMARY_SYMBOLS: symbol_rows = primary_leg_total[primary_leg_total["symbol"] == symbol].head(args.max_legs_per_symbol) selected.extend(str(leg) for leg in symbol_rows["leg"]) if len(set(selected)) < 2: raise RuntimeError("not enough selected BTC/ETH legs to build portfolios") selected = list(dict.fromkeys(selected)) key_to_leg = {leg.key: leg for leg in legs} primary_daily_key = lambda key: f"{PRIMARY_COST_MODEL}:{key}" common_start = max(daily[primary_daily_key(key)].index[0] for key in selected) common_end = min(daily[primary_daily_key(key)].index[-1] for key in selected) selected_daily = { cost_model: { key: daily[f"{cost_model}:{key}"][(daily[f"{cost_model}:{key}"].index >= common_start) & (daily[f"{cost_model}:{key}"].index <= common_end)] for key in selected } for cost_model in COST_MODELS } leg_metrics = {row["leg"]: row for row in leg_rows if row["cost_model"] == PRIMARY_COST_MODEL} portfolio_rows: list[dict[str, object]] = [] horizon_rows_output: list[dict[str, object]] = [] monthly_frames: list[pd.DataFrame] = [] equity_frames: list[pd.DataFrame] = [] contribution_rows: list[dict[str, object]] = [] combo_index = 0 for leg_count in range(2, min(args.max_leg_count, len(selected)) + 1): for legs_tuple in combinations(selected, leg_count): symbols = {key_to_leg[key].symbol for key in legs_tuple} if not set(PRIMARY_SYMBOLS).issubset(symbols): continue if len({key_to_leg[key].family for key in legs_tuple}) < min(2, leg_count): continue for mode in ("equal", "risk"): combo_index += 1 if mode == "equal": weights = pd.Series(1.0 / leg_count, index=legs_tuple) else: raw = pd.Series({key: 1.0 / max(float(leg_metrics[key]["max_drawdown"]), 0.02) for key in legs_tuple}) weights = raw / raw.sum() name = f"{mode}-{leg_count}-hf{combo_index:05d}" for cost_model, roundtrip_cost in COST_MODELS.items(): returns = pd.DataFrame({key: selected_daily[cost_model][key].pct_change().fillna(0.0) for key in legs_tuple}).dropna() portfolio_returns = returns.mul(weights, axis=1).sum(axis=1) series = explore.INITIAL_EQUITY * (1.0 + portfolio_returns).cumprod() series.iloc[0] = explore.INITIAL_EQUITY series.name = name cost_leg_returns = {key: leg_returns[f"{cost_model}:{key}"] for key in legs_tuple} weighted_trade_returns = split_weighted_returns(legs_tuple, weights, cost_leg_returns) monthly = monthly_rows(name, series) monthly["cost_model"] = cost_model horizons = horizon_metrics(name, cost_model, series, weighted_trade_returns, monthly) recent = {row["horizon"]: float(row["total_return"]) for row in horizons} recent_positive = recent["1y"] > 0.0 and recent["6m"] > 0.0 and recent["3m"] > 0.0 stats = trade_stats(weighted_trade_returns, series.index[0], series.index[-1]) metrics = metrics_from_daily(series) worst = monthly.sort_values("return").iloc[0] qualified = ( stats["trades_per_month"] >= MIN_TRADES_PER_MONTH and metrics["max_drawdown"] < MAX_DRAWDOWN and recent_positive ) row = { "portfolio": name, "mode": mode, "leg_count": leg_count, "legs": ";".join(legs_tuple), "weights_json": json.dumps({key: float(weights[key]) for key in legs_tuple}, separators=(",", ":")), "symbols": ",".join(sorted(symbols)), "families": ",".join(sorted({key_to_leg[key].family for key in legs_tuple})), "start": series.index[0].strftime("%Y-%m-%d"), "end": series.index[-1].strftime("%Y-%m-%d"), "worst_month": str(worst["month"]), "worst_month_return": float(worst["return"]), "recent_positive": recent_positive, "qualified": qualified, **metrics, **stats, **monthly_stability(monthly), } add_cost_columns(row, cost_model, roundtrip_cost) portfolio_rows.append(row) horizon_rows_output.extend(horizons) monthly_frames.append(monthly) equity_frames.append( pd.DataFrame( { "portfolio": name, "cost_model": cost_model, "date": series.index.strftime("%Y-%m-%d"), "equity": series.to_numpy(), } ) ) contribution_rows.extend( leg_contribution_rows(name, cost_model, legs_tuple, weights, cost_leg_returns, series.index[-1]) ) portfolio_total = pd.DataFrame(portfolio_rows).sort_values( ["cost_model", "qualified", "calmar", "annualized_return", "trades_per_month", "worst_month_return"], ascending=[True, False, False, False, False, False], ) primary_top = portfolio_total[portfolio_total["cost_model"] == PRIMARY_COST_MODEL].head(50) top_pairs = set(zip(primary_top["cost_model"], primary_top["portfolio"])) top_pairs.update(zip(portfolio_total[portfolio_total["cost_model"] == "taker_taker"].head(50)["cost_model"], portfolio_total[portfolio_total["cost_model"] == "taker_taker"].head(50)["portfolio"])) horizon = pd.DataFrame(horizon_rows_output) horizon["horizon"] = pd.Categorical(horizon["horizon"], categories=[label for label, _ in HORIZONS], ordered=True) horizon = horizon[ horizon.apply(lambda row: (row["cost_model"], row["portfolio"]) in top_pairs, axis=1) ].sort_values(["cost_model", "portfolio", "horizon"]) portfolio_metadata = portfolio_total[ ["cost_model", "portfolio", "mode", "leg_count", "legs", "weights_json", "symbols", "families", "recent_positive"] ] qualified = risk_qualified(horizon).merge(portfolio_metadata, on=["cost_model", "portfolio"], how="left") robust = robust_survivors(horizon).merge(portfolio_metadata, on=["cost_model", "portfolio"], how="left") monthly_all = pd.concat(monthly_frames, ignore_index=True) monthly_all = monthly_all[monthly_all.apply(lambda row: (row["cost_model"], row["portfolio"]) in top_pairs, axis=1)] worst_months = monthly_all.sort_values("return").head(100) equity = pd.concat(equity_frames, ignore_index=True) equity = equity[equity.apply(lambda row: (row["cost_model"], row["portfolio"]) in top_pairs, axis=1)] leg_contrib = pd.DataFrame(contribution_rows) leg_contrib = leg_contrib[ leg_contrib.apply(lambda row: (row["cost_model"], row["portfolio"]) in top_pairs, axis=1) ].sort_values(["cost_model", "portfolio", "horizon", "contribution_return_sum"], ascending=[True, True, True, False]) args.output_dir.mkdir(parents=True, exist_ok=True) leg_path = args.output_dir / f"{PREFIX}-legs.csv" total_path = args.output_dir / f"{PREFIX}-total.csv" qualified_path = args.output_dir / f"{PREFIX}-qualified.csv" robust_path = args.output_dir / f"{PREFIX}-robust-survivors.csv" horizon_path = args.output_dir / f"{PREFIX}-horizon.csv" monthly_path = args.output_dir / f"{PREFIX}-monthly.csv" worst_path = args.output_dir / f"{PREFIX}-worst-months.csv" contrib_path = args.output_dir / f"{PREFIX}-leg-contribution.csv" equity_path = args.output_dir / f"{PREFIX}-equity.csv" summary_path = args.output_dir / f"{PREFIX}-summary.json" report_path = args.output_dir / f"{PREFIX}-report.md" leg_total_all.to_csv(leg_path, index=False) portfolio_total.to_csv(total_path, index=False) qualified.to_csv(qualified_path, index=False) robust.to_csv(robust_path, index=False) horizon.to_csv(horizon_path, index=False) monthly_all.to_csv(monthly_path, index=False) worst_months.to_csv(worst_path, index=False) leg_contrib.to_csv(contrib_path, index=False) equity.to_csv(equity_path, index=False) summary_path.write_text( json.dumps( { "portfolio_count": int(len(portfolio_total)), "qualified_count": int(len(qualified)), "robust_survivor_count": int(len(robust)), "selected_leg_count": int(len(selected)), "best_portfolio": portfolio_total.iloc[0].to_dict() if len(portfolio_total) else {}, }, indent=2, default=str, ) + "\n", encoding="utf-8", ) command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --years {args.years} --max-legs-per-symbol {args.max_legs_per_symbol} --max-leg-count {args.max_leg_count}" output_files = [leg_path, total_path, qualified_path, robust_path, horizon_path, monthly_path, worst_path, contrib_path, equity_path, summary_path, report_path] report_path.write_text(write_report(command, output_files, portfolio_total, qualified, robust, horizon, leg_contrib), encoding="utf-8") print(portfolio_total.head(20).to_string(index=False)) return 0 if __name__ == "__main__": raise SystemExit(main())