from __future__ import annotations import argparse import json import sys from dataclasses import dataclass from pathlib import Path import pandas as pd sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from scripts import explore_ultrashort as explore from scripts import search_eth_btc_nextgen_variants as nextgen from scripts import search_eth_microstructure_variants as micro OUTPUT_DIR = Path("reports/eth-exploration") PREFIX = "eth-nextgen-micro-portfolio" PRIMARY_COST = "maker_taker" COST_MODELS = { "maker_taker": 0.0021, "taker_taker": 0.0030, } NEXTGEN_BASELINE = "equal-2-c0003" NEXTGEN_LEGS = ( "btc_trend_eth_rsi:15m:eth-btc-rsi-filter-et50-l3.0-x55.0-bt480-bm240-br0.0", "btc_shock_guard_eth_rsi:15m:eth-btc-shock-filter-et50-l3.0-x55.0-bt480-bm240-br0.01-sw96-sv0.01-sd0.05", ) MICRO_NAMES = ( "atr-compress-expand-r48-q0.15-sl0.008-tp0.016-mf0.25-us", "atr-compress-expand-r48-q0.15-sl0.008-tp0.016-mf0.4-us", "atr-compress-expand-r96-q0.15-sl0.008-tp0.016-mf0.25-us", "atr-compress-expand-r96-q0.15-sl0.008-tp0.016-mf0.4-us", ) HORIZONS = ( ("full", None), ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ) @dataclass(frozen=True) class LegReturn: source: str exit_date: pd.Timestamp value: float def daily_equity_from_frame(frame: pd.DataFrame, index: pd.DatetimeIndex) -> pd.Series: series = frame.set_index("ts")["equity"].sort_index() daily = series.reindex(index.union(series.index)).sort_index().ffill().reindex(index).ffill() daily.iloc[0] = explore.INITIAL_EQUITY return daily def metrics_from_daily(series: pd.Series) -> dict[str, float]: years = (series.index[-1] - series.index[0]).total_seconds() / 86_400 / 365 total = float(series.iloc[-1] / series.iloc[0] - 1.0) annualized = (1.0 + total) ** (1.0 / years) - 1.0 if total > -1.0 and years > 0.0 else 0.0 drawdown = explore.max_drawdown_from_equity([float(value) for value in series]) returns = series.pct_change().dropna() daily_std = float(returns.std(ddof=1)) if len(returns) > 1 else 0.0 risk_reward = float(returns.mean()) / daily_std * (365**0.5) if daily_std else 0.0 return { "net_total_return": total, "net_annualized_return": annualized, "net_max_drawdown": drawdown, "net_calmar": annualized / drawdown if drawdown else 0.0, "risk_reward_ratio": risk_reward, } def monthly_rows(name: str, cost_model: str, series: pd.Series) -> pd.DataFrame: monthly = series.resample("ME").last() frame = pd.DataFrame( { "name": name, "cost_model": cost_model, "month": monthly.index.strftime("%Y-%m"), "start_equity": monthly.shift(1).fillna(series.iloc[0]).to_numpy(), "end_equity": monthly.to_numpy(), } ) frame["return"] = frame["end_equity"] / frame["start_equity"] - 1.0 return frame def horizon_rows(name: str, cost_model: str, series: pd.Series, trades: list[LegReturn], monthly: pd.DataFrame) -> list[dict[str, object]]: end = series.index[-1] rows: list[dict[str, object]] = [] for label, offset in HORIZONS: horizon = series if offset is None else series[series.index >= end - offset] if len(horizon) < 2: horizon = series start = horizon.index[0] horizon_trades = [trade for trade in trades if trade.exit_date >= start] horizon_monthly = monthly[monthly["month"] >= start.strftime("%Y-%m")] worst = horizon_monthly.sort_values("return").iloc[0] if len(horizon_monthly) else None rows.append( { "name": name, "cost_model": cost_model, "horizon": label, "horizon_start": horizon.index[0].strftime("%Y-%m-%d"), "horizon_end": horizon.index[-1].strftime("%Y-%m-%d"), "worst_month": "" if worst is None else str(worst["month"]), "worst_month_return": 0.0 if worst is None else float(worst["return"]), **trade_return_stats(horizon_trades), **metrics_from_daily(horizon), } ) return rows def returns_to_equity(name: str, returns: pd.Series) -> pd.Series: equity = explore.INITIAL_EQUITY * (1.0 + returns.fillna(0.0)).cumprod() equity.iloc[0] = explore.INITIAL_EQUITY equity.name = name return equity def trade_return_stats(returns: list[LegReturn]) -> dict[str, float]: values = [row.value for row in returns] wins = [value for value in values if value > 0.0] losses = [value for value in values if value < 0.0] avg_win = sum(wins) / len(wins) if wins else 0.0 avg_loss_abs = abs(sum(losses) / len(losses)) if losses else 0.0 gross_profit = sum(wins) gross_loss_abs = abs(sum(losses)) return { "trades": len(values), "win_rate": len(wins) / len(values) if values else 0.0, "payoff_ratio": avg_win / avg_loss_abs if avg_loss_abs else 0.0, "profit_factor": gross_profit / gross_loss_abs if gross_loss_abs else 0.0, } def nextgen_trade_returns(result: object, weight: float, roundtrip_cost: float) -> list[LegReturn]: rows: list[LegReturn] = [] for trade in result.trades: value = (float(trade["return_pct"]) / 100.0 - roundtrip_cost * float(trade.get("cost_weight", 1.0))) * weight rows.append(LegReturn("nextgen", pd.to_datetime(str(trade["exit_time"]), utc=True).normalize(), value)) return rows def micro_trade_returns(result: micro.SegmentResult, weight: float, roundtrip_cost: float) -> list[LegReturn]: rows: list[LegReturn] = [] for trade in result.trades: value = (float(trade["return_pct"]) / 100.0 - roundtrip_cost * float(trade["cost_weight"])) * weight rows.append(LegReturn("micro", pd.to_datetime(str(trade["exit_time"]), utc=True).normalize(), value)) return rows def combine_trade_returns( *, nextgen_returns: list[LegReturn], micro_returns: list[LegReturn], nextgen_weight: float, micro_weight: float, micro_mask: pd.Series | None = None, ) -> list[LegReturn]: values = [LegReturn(row.source, row.exit_date, row.value * nextgen_weight) for row in nextgen_returns] for row in micro_returns: if micro_mask is None or bool(micro_mask.reindex([row.exit_date]).fillna(False).iloc[0]): values.append(LegReturn(row.source, row.exit_date, row.value * micro_weight)) return values def load_nextgen(index: pd.DatetimeIndex, roundtrip_cost: float) -> tuple[pd.Series, list[LegReturn]]: strategies = { f"{strategy.family}:{strategy.bar}:{strategy.candidate.name}": strategy for strategy in nextgen.build_strategies() } missing = [key for key in NEXTGEN_LEGS if key not in strategies] if missing: raise KeyError(f"missing nextgen legs: {missing}") data = { (symbol, "15m"): nextgen.load_candles(symbol, "15m", 20.0) for symbol in ("ETH-USDT-SWAP", "BTC-USDT-SWAP") } leg_series: list[pd.Series] = [] trade_returns: list[LegReturn] = [] for key in NEXTGEN_LEGS: result = nextgen.run_strategy(strategies[key], data) frame = explore.cost_adjusted_trade_equity_frame(result, roundtrip_cost) leg_series.append(daily_equity_from_frame(frame, index)) trade_returns.extend(nextgen_trade_returns(result, 0.5, roundtrip_cost)) returns = pd.DataFrame({key: series.pct_change().fillna(0.0) for key, series in zip(NEXTGEN_LEGS, leg_series)}).mean(axis=1) return returns_to_equity(NEXTGEN_BASELINE, returns), trade_returns def load_micro_candidates(index: pd.DatetimeIndex, roundtrip_cost: float) -> dict[str, tuple[pd.Series, list[LegReturn]]]: candles = micro._load_candles(micro.SYMBOL, micro.BAR) requested = int(20.0 * 365 * 24 * 60 / 15) candles = candles[-requested:] variants = {variant.name: variant for variant in micro.build_variants()} missing = [name for name in MICRO_NAMES if name not in variants] if missing: raise KeyError(f"missing micro variants: {missing}") output: dict[str, tuple[pd.Series, list[LegReturn]]] = {} for name in MICRO_NAMES: result = variants[name].run(candles) frame = micro.cost_equity_frame(result, roundtrip_cost) output[name] = (daily_equity_from_frame(frame, index), micro_trade_returns(result, 1.0, roundtrip_cost)) return output def evaluate_portfolio( *, name: str, cost_model: str, roundtrip_cost: float, kind: str, series: pd.Series, nextgen_weight: float, micro_weight: float, micro_name: str, trade_returns: list[LegReturn], ) -> tuple[dict[str, object], list[dict[str, object]], pd.DataFrame]: monthly = monthly_rows(name, cost_model, series) worst = monthly.sort_values("return").iloc[0] horizons = horizon_rows(name, cost_model, series, trade_returns, monthly) min_recent = min(float(row["net_total_return"]) for row in horizons if row["horizon"] != "full") row = { "name": name, "cost_model": cost_model, "roundtrip_cost_on_margin": roundtrip_cost, "kind": kind, "micro_name": micro_name, "nextgen_weight": nextgen_weight, "micro_weight": micro_weight, "worst_month": str(worst["month"]), "worst_month_return": float(worst["return"]), "min_recent_total_return": min_recent, **trade_return_stats(trade_returns), **metrics_from_daily(series), } return row, horizons, monthly def build_portfolios( *, cost_model: str, roundtrip_cost: float, nextgen_series: pd.Series, nextgen_trade_returns: list[LegReturn], micro_candidates: dict[str, tuple[pd.Series, list[LegReturn]]], ) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]: rows: list[dict[str, object]] = [] horizon_output: list[dict[str, object]] = [] monthly_frames: list[pd.DataFrame] = [] equity_frames: list[pd.DataFrame] = [] nextgen_returns = nextgen_series.pct_change().fillna(0.0) baseline_row, baseline_horizons, baseline_monthly = evaluate_portfolio( name=NEXTGEN_BASELINE, cost_model=cost_model, roundtrip_cost=roundtrip_cost, kind="baseline", series=nextgen_series, nextgen_weight=1.0, micro_weight=0.0, micro_name="", trade_returns=nextgen_trade_returns, ) rows.append(baseline_row) horizon_output.extend(baseline_horizons) monthly_frames.append(baseline_monthly) equity_frames.append(equity_frame(NEXTGEN_BASELINE, cost_model, nextgen_series)) for micro_name, (micro_series, micro_trade_rows) in micro_candidates.items(): micro_returns = micro_series.pct_change().fillna(0.0) for micro_weight in (0.10, 0.15, 0.20, 0.25, 0.30): nextgen_weight = 1.0 - micro_weight returns = nextgen_returns * nextgen_weight + micro_returns * micro_weight name = f"blend-ng{nextgen_weight:.2f}-{short_micro_name(micro_name)}" series = returns_to_equity(name, returns) trades = combine_trade_returns( nextgen_returns=nextgen_trade_returns, micro_returns=micro_trade_rows, nextgen_weight=nextgen_weight, micro_weight=micro_weight, ) add_result(rows, horizon_output, monthly_frames, equity_frames, name, cost_model, roundtrip_cost, "equity_blend", micro_name, nextgen_weight, micro_weight, series, trades) flat_mask = nextgen_returns.abs() < 1e-12 for micro_weight in (0.25, 0.40): returns = nextgen_returns + micro_returns.where(flat_mask, 0.0) * micro_weight name = f"nonoverlap-m{micro_weight:.2f}-{short_micro_name(micro_name)}" series = returns_to_equity(name, returns) trades = combine_trade_returns( nextgen_returns=nextgen_trade_returns, micro_returns=micro_trade_rows, nextgen_weight=1.0, micro_weight=micro_weight, micro_mask=flat_mask, ) add_result(rows, horizon_output, monthly_frames, equity_frames, name, cost_model, roundtrip_cost, "signal_non_overlap", micro_name, 1.0, micro_weight, series, trades) for lookback in (30, 60, 90, 120): nextgen_regime = nextgen_series / nextgen_series.shift(lookback) - 1.0 micro_regime = micro_series / micro_series.shift(lookback) - 1.0 active = ((nextgen_regime < 0.0) & (micro_regime > 0.0)).shift(1).fillna(False).astype(bool) switch_returns = nextgen_returns.where(~active, micro_returns) switch_name = f"switch-l{lookback}-{short_micro_name(micro_name)}" switch_series = returns_to_equity(switch_name, switch_returns) switch_trades = combine_trade_returns( nextgen_returns=[row for row in nextgen_trade_returns if not bool(active.reindex([row.exit_date]).fillna(False).iloc[0])], micro_returns=micro_trade_rows, nextgen_weight=1.0, micro_weight=1.0, micro_mask=active, ) add_result(rows, horizon_output, monthly_frames, equity_frames, switch_name, cost_model, roundtrip_cost, "recent_regime_switch", micro_name, 1.0, 1.0, switch_series, switch_trades) for micro_weight in (0.25, 0.40): overlay_returns = nextgen_returns + micro_returns.where(active, 0.0) * micro_weight overlay_name = f"riskoff-overlay-l{lookback}-m{micro_weight:.2f}-{short_micro_name(micro_name)}" overlay_series = returns_to_equity(overlay_name, overlay_returns) overlay_trades = combine_trade_returns( nextgen_returns=nextgen_trade_returns, micro_returns=micro_trade_rows, nextgen_weight=1.0, micro_weight=micro_weight, micro_mask=active, ) add_result( rows, horizon_output, monthly_frames, equity_frames, overlay_name, cost_model, roundtrip_cost, "riskoff_overlay", micro_name, 1.0, micro_weight, overlay_series, overlay_trades, ) summary = pd.DataFrame(rows).sort_values( ["cost_model", "net_calmar", "net_annualized_return", "min_recent_total_return", "worst_month_return"], ascending=[True, False, False, False, False], ) horizons = pd.DataFrame(horizon_output) horizons["horizon"] = pd.Categorical(horizons["horizon"], categories=[label for label, _ in HORIZONS], ordered=True) horizons = horizons.sort_values(["cost_model", "name", "horizon"]) monthly = pd.concat(monthly_frames, ignore_index=True) equity = pd.concat(equity_frames, ignore_index=True) return summary, horizons, monthly, equity def add_result( rows: list[dict[str, object]], horizon_output: list[dict[str, object]], monthly_frames: list[pd.DataFrame], equity_frames: list[pd.DataFrame], name: str, cost_model: str, roundtrip_cost: float, kind: str, micro_name: str, nextgen_weight: float, micro_weight: float, series: pd.Series, trades: list[LegReturn], ) -> None: row, horizons, monthly = evaluate_portfolio( name=name, cost_model=cost_model, roundtrip_cost=roundtrip_cost, kind=kind, series=series, nextgen_weight=nextgen_weight, micro_weight=micro_weight, micro_name=micro_name, trade_returns=trades, ) rows.append(row) horizon_output.extend(horizons) monthly_frames.append(monthly) equity_frames.append(equity_frame(name, cost_model, series)) def equity_frame(name: str, cost_model: str, series: pd.Series) -> pd.DataFrame: return pd.DataFrame({"name": name, "cost_model": cost_model, "date": series.index.strftime("%Y-%m-%d"), "equity": series.to_numpy()}) def short_micro_name(name: str) -> str: return name.replace("atr-compress-expand-", "").replace("sl0.008-tp0.016-", "").replace("-", "_") def markdown_table(frame: pd.DataFrame) -> str: rows = [list(frame.columns), ["---" for _ in frame.columns]] rows.extend(frame.astype(object).where(pd.notna(frame), "").values.tolist()) return "\n".join("| " + " | ".join(format_cell(value) for value in row) + " |" for row in rows) def format_cell(value: object) -> str: if isinstance(value, float): return f"{value:.6g}" return str(value).replace("|", "\\|") def monthly_stability(monthly: pd.DataFrame) -> pd.DataFrame: return ( monthly.groupby(["cost_model", "name"], as_index=False) .agg( months=("return", "count"), positive_month_rate=("return", lambda values: float((values > 0.0).mean())), avg_month_return=("return", "mean"), median_month_return=("return", "median"), worst_month_return=("return", "min"), best_month_return=("return", "max"), ) .sort_values(["cost_model", "positive_month_rate", "worst_month_return"], ascending=[True, False, False]) ) def robust_survivors(horizons: pd.DataFrame) -> pd.DataFrame: full = horizons[horizons["horizon"] == "full"].copy() recent = full for horizon in ("3y", "1y", "6m", "3m"): part = horizons[horizons["horizon"] == horizon][["cost_model", "name", "net_total_return", "net_calmar"]].rename( columns={"net_total_return": f"ret_{horizon}", "net_calmar": f"calmar_{horizon}"} ) recent = recent.merge(part, on=["cost_model", "name"], how="inner") return recent[ (recent["net_total_return"] > 0.0) & (recent["net_calmar"] > 0.0) & (recent["ret_3y"] > 0.0) & (recent["ret_1y"] > 0.0) & (recent["ret_6m"] > 0.0) & (recent["ret_3m"] > 0.0) & (recent["calmar_3y"] > 0.0) & (recent["calmar_1y"] > 0.0) & (recent["calmar_6m"] > 0.0) & (recent["calmar_3m"] > 0.0) ].copy() def write_report( *, command: str, output_files: list[Path], summary: pd.DataFrame, horizons: pd.DataFrame, stability: pd.DataFrame, worst_months: pd.DataFrame, survivors: pd.DataFrame, ) -> str: primary_summary = summary[summary["cost_model"] == PRIMARY_COST] stress_summary = summary[summary["cost_model"] == "taker_taker"] baseline = primary_summary[primary_summary["name"] == NEXTGEN_BASELINE].iloc[0] best = primary_summary.iloc[0] best_horizons = horizons[(horizons["cost_model"] == PRIMARY_COST) & (horizons["name"] == best["name"])] baseline_horizons = horizons[(horizons["cost_model"] == PRIMARY_COST) & (horizons["name"] == NEXTGEN_BASELINE)] dilution = ( "The best combination improves risk-adjusted return versus nextgen equal-2-c0003." if float(best["net_calmar"]) > float(baseline["net_calmar"]) and float(best["net_total_return"]) >= float(baseline["net_total_return"]) * 0.98 else "The best combination is mainly return dilution unless its lower drawdown or better worst month is preferred." ) lines = [ "# ETH nextgen + microstructure portfolio exploration", "", f"Run command: `{command}`", "", "No order placement or exchange API path is used; this script reads local candle/report data only.", "", "Output files:", *[f"- `{path}`" for path in output_files], "", "Base nextgen: `equal-2-c0003` with the two documented maker_taker legs.", "Micro candidates: ATR compression/expansion US-session robust candidates only.", "Costs: maker_taker=0.0021 and taker_taker=0.0030 roundtrip on margin. Funding and slippage remain excluded.", "", "## Conclusion", "", dilution, f"Strict robust survivors with positive full/3y/1y/6m/3m net return and Calmar: {len(survivors)}.", "", "## Top maker_taker combinations", "", markdown_table( primary_summary.head(12)[ [ "name", "cost_model", "kind", "micro_name", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "risk_reward_ratio", "worst_month", "worst_month_return", "min_recent_total_return", "trades", "win_rate", "payoff_ratio", "profit_factor", ] ] ), "", "## Taker/taker stress combinations", "", markdown_table( stress_summary.head(12)[ [ "name", "cost_model", "kind", "micro_name", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "risk_reward_ratio", "worst_month", "worst_month_return", "min_recent_total_return", "trades", ] ] ), "", "## Strict robust survivors", "", markdown_table( survivors.head(20)[ [ "name", "cost_model", "net_total_return", "net_calmar", "ret_3y", "ret_1y", "ret_6m", "ret_3m", "calmar_3y", "calmar_1y", "calmar_6m", "calmar_3m", ] ] ), "", "## Best horizon metrics", "", markdown_table( best_horizons[ [ "horizon", "horizon_start", "horizon_end", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "trades", "win_rate", "payoff_ratio", "profit_factor", "risk_reward_ratio", "worst_month", "worst_month_return", ] ] ), "", "## Baseline horizon metrics", "", markdown_table( baseline_horizons[ [ "horizon", "horizon_start", "horizon_end", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "trades", "win_rate", "payoff_ratio", "profit_factor", "risk_reward_ratio", "worst_month", "worst_month_return", ] ] ), "", "## Monthly stability", "", markdown_table(stability[stability["cost_model"] == PRIMARY_COST].head(20)), "", "## Worst months", "", markdown_table(worst_months.head(20)[["name", "cost_model", "month", "return"]]), ] return "\n".join(lines) + "\n" def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR) args = parser.parse_args() existing_equity = pd.read_csv(args.output_dir / "eth-btc-nextgen-equity.csv") base = existing_equity[(existing_equity["cost_model"] == PRIMARY_COST) & (existing_equity["name"] == NEXTGEN_BASELINE)].copy() if base.empty: raise KeyError(f"missing existing nextgen equity for {NEXTGEN_BASELINE}") index = pd.DatetimeIndex(pd.to_datetime(base["date"], utc=True)) summary_frames: list[pd.DataFrame] = [] horizon_frames: list[pd.DataFrame] = [] monthly_frames: list[pd.DataFrame] = [] equity_frames: list[pd.DataFrame] = [] for cost_model, roundtrip_cost in COST_MODELS.items(): nextgen_series, nextgen_returns = load_nextgen(index, roundtrip_cost) micro_candidates = load_micro_candidates(index, roundtrip_cost) summary, horizons, monthly, equity = build_portfolios( cost_model=cost_model, roundtrip_cost=roundtrip_cost, nextgen_series=nextgen_series, nextgen_trade_returns=nextgen_returns, micro_candidates=micro_candidates, ) summary_frames.append(summary) horizon_frames.append(horizons) monthly_frames.append(monthly) equity_frames.append(equity) summary = pd.concat(summary_frames, ignore_index=True) primary = summary[summary["cost_model"] == PRIMARY_COST] stress = summary[summary["cost_model"] != PRIMARY_COST] summary = pd.concat([primary, stress], ignore_index=True) top_pairs = set(zip(primary.head(25)["cost_model"], primary.head(25)["name"])) top_pairs.update(zip(stress.head(25)["cost_model"], stress.head(25)["name"])) top_pairs.add((PRIMARY_COST, NEXTGEN_BASELINE)) for cost_model in COST_MODELS: top_pairs.add((cost_model, NEXTGEN_BASELINE)) horizons = pd.concat(horizon_frames, ignore_index=True) monthly = pd.concat(monthly_frames, ignore_index=True) equity = pd.concat(equity_frames, ignore_index=True) survivors = robust_survivors(horizons).merge( summary[ [ "cost_model", "name", "kind", "micro_name", "nextgen_weight", "micro_weight", "roundtrip_cost_on_margin", ] ], on=["cost_model", "name"], how="left", ).sort_values(["cost_model", "net_calmar", "net_total_return"], ascending=[True, False, False]) stability = monthly_stability(monthly) worst_months = monthly.sort_values("return").head(100) horizons = horizons[horizons.apply(lambda row: (row["cost_model"], row["name"]) in top_pairs, axis=1)] monthly = monthly[monthly.apply(lambda row: (row["cost_model"], row["name"]) in top_pairs, axis=1)] equity = equity[equity.apply(lambda row: (row["cost_model"], row["name"]) in top_pairs, axis=1)] args.output_dir.mkdir(parents=True, exist_ok=True) summary_path = args.output_dir / f"{PREFIX}-summary.csv" horizon_path = args.output_dir / f"{PREFIX}-horizons.csv" monthly_path = args.output_dir / f"{PREFIX}-monthly.csv" stability_path = args.output_dir / f"{PREFIX}-monthly-stability.csv" worst_path = args.output_dir / f"{PREFIX}-worst-months.csv" robust_path = args.output_dir / f"{PREFIX}-robust-survivors.csv" equity_path = args.output_dir / f"{PREFIX}-equity.csv" json_path = args.output_dir / f"{PREFIX}-top.json" report_path = args.output_dir / f"{PREFIX}-report.md" output_files = [summary_path, horizon_path, monthly_path, stability_path, worst_path, robust_path, equity_path, json_path, report_path] summary.to_csv(summary_path, index=False) horizons.to_csv(horizon_path, index=False) monthly.to_csv(monthly_path, index=False) stability.to_csv(stability_path, index=False) worst_months.to_csv(worst_path, index=False) survivors.to_csv(robust_path, index=False) equity.to_csv(equity_path, index=False) json_path.write_text(json.dumps(primary.head(20).to_dict("records"), indent=2), encoding="utf-8") command = f"rtk .venv/bin/python {Path(__file__).as_posix()}" report_path.write_text( write_report( command=command, output_files=output_files, summary=summary, horizons=horizons, stability=stability, worst_months=worst_months, survivors=survivors, ), encoding="utf-8", ) print(primary.head(12).to_string(index=False)) return 0 if __name__ == "__main__": raise SystemExit(main())