| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790 |
- from __future__ import annotations
- import argparse
- import json
- import sys
- from dataclasses import dataclass
- from itertools import combinations
- from pathlib import Path
- from typing import Callable
- import pandas as pd
- sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
- from okx_codex_trader.bbmr_report import BBMRConfig, run_bbmr_segment
- from okx_codex_trader.bbsb_report import BBSBConfig, run_bbsb_segment
- from okx_codex_trader.models import Candle
- from scripts import explore_ultrashort as explore
- from scripts import search_eth_btc_nextgen_variants as nextgen
# Default directory for the generated artifacts (overridable with --output-dir).
OUTPUT_DIR = Path("reports") / "ultrashort"
# File-name prefix shared by every artifact written by main().
PREFIX = "high-frequency-portfolio"
# Default value of --years: how much cached history to load per symbol/bar.
YEARS = 10.0

# Round-trip cost subtracted from each trade's fractional return, per cost model.
COST_MODELS = {
    "maker_taker": 0.0021,
    "taker_taker": 0.0030,
}
# Cost model used for leg selection, risk weights and the headline ranking.
PRIMARY_COST_MODEL = "maker_taker"

# Thresholds applied by the "qualified" filters.
MIN_TRADES_PER_MONTH = 15.0
MAX_DRAWDOWN = 0.20

# Symbols whose candles are loaded; every portfolio must contain a leg on each.
PRIMARY_SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP")

# (label, look-back offset) pairs; None means the whole available history.
HORIZONS = (
    ("full", None),
    ("3y", pd.DateOffset(years=3)),
    ("1y", pd.DateOffset(years=1)),
    ("6m", pd.DateOffset(months=6)),
    ("3m", pd.DateOffset(months=3)),
)
@dataclass(frozen=True)
class LegSpec:
    """Immutable description of one backtestable strategy leg and how to run it."""

    # Unique leg identifier; the builders in this file use "<symbol>:<family>:<bar>:<candidate name>".
    key: str
    # Instrument the leg is attributed to (used for per-symbol selection in main()).
    symbol: str
    # Strategy family label (used to require family diversity inside a portfolio).
    family: str
    # Candle bar size, e.g. "15m"; part of the (symbol, bar) key into the candle mapping.
    bar: str
    # True when the leg was built by pair_leg (it consumes both ETH and BTC candles).
    pair: bool
    # Warm-up bar count forwarded to the candidate's run function.
    warmup_bars: int
    # Callable taking the (symbol, bar) -> candles mapping and returning the segment result.
    run: Callable[[dict[tuple[str, str], list[Candle]]], explore.SegmentResult]
@dataclass(frozen=True)
class LegReturn:
    """Immutable record of one closed trade's net fractional return for a leg."""

    # Key of the leg that produced the trade.
    leg: str
    # Symbol of that leg.
    symbol: str
    # Strategy family of that leg.
    family: str
    # Exit timestamp of the trade; trades are bucketed and filtered by this time.
    exit_time: pd.Timestamp
    # Fractional return of the trade (after costs, and after weighting where applied).
    value: float
def load_candles(symbol: str, bar: str, years: float) -> list[Candle]:
    """Load cached candles for ``symbol``/``bar``, trimmed to the most recent ``years``.

    Raises:
        FileNotFoundError: when the cache holds no candles for the pair.
    """
    cached, _ = explore.load_cached_candles(explore.CANDLE_CACHE_DIR, symbol, bar)
    if not cached:
        raise FileNotFoundError(f"missing cached candles for {symbol} {bar}")

    wanted = explore.history_bars_for_years(bar, years)
    # Only slice when the cache is longer than the requested window.
    if len(cached) <= wanted:
        return cached
    return cached[-wanted:]
def single_leg(symbol: str, family: str, bar: str, candidate: explore.Candidate) -> LegSpec:
    """Wrap a single-symbol candidate as a LegSpec keyed ``symbol:family:bar:name``."""

    def run_on(data: dict[tuple[str, str], list[Candle]]) -> explore.SegmentResult:
        # Pick this leg's candle series out of the shared mapping and run the candidate on it.
        return candidate.run(
            candles=data[(symbol, bar)],
            leverage=explore.LEVERAGE,
            warmup_bars=candidate.warmup_bars,
        )

    return LegSpec(
        key=f"{symbol}:{family}:{bar}:{candidate.name}",
        symbol=symbol,
        family=family,
        bar=bar,
        pair=False,
        warmup_bars=candidate.warmup_bars,
        run=run_on,
    )
def pair_leg(family: str, bar: str, candidate: explore.PairCandidate) -> LegSpec:
    """Wrap an ETH/BTC pair candidate as a LegSpec attributed to ETH-USDT-SWAP."""

    def run_on(data: dict[tuple[str, str], list[Candle]]) -> explore.SegmentResult:
        return run_pair_candidate(candidate, data, bar)

    return LegSpec(
        key=f"ETH-USDT-SWAP:{family}:{bar}:{candidate.name}",
        symbol="ETH-USDT-SWAP",
        family=family,
        bar=bar,
        pair=True,
        warmup_bars=candidate.warmup_bars,
        run=run_on,
    )
def run_pair_candidate(
    candidate: explore.PairCandidate,
    data: dict[tuple[str, str], list[Candle]],
    bar: str,
) -> explore.SegmentResult:
    """Run a pair candidate on the aligned ETH and BTC candle series for ``bar``."""
    eth_raw = data[("ETH-USDT-SWAP", bar)]
    btc_raw = data[("BTC-USDT-SWAP", bar)]
    eth_aligned, btc_aligned = explore.align_pair_candles(eth_raw, btc_raw)
    return candidate.run(
        eth_candles=eth_aligned,
        btc_candles=btc_aligned,
        leverage=explore.LEVERAGE,
        warmup_bars=candidate.warmup_bars,
    )
def build_single_symbol_candidates() -> list[tuple[str, explore.Candidate]]:
    """Return the ``(family, candidate)`` pairs evaluated on each primary symbol.

    Order: bbmr, four rsi variants, two ma crosses, two vwap reversions, bbsb.
    """

    def run_bbmr(candles, leverage, warmup_bars):
        return run_bbmr_segment(
            candles=candles,
            leverage=leverage,
            warmup_bars=warmup_bars,
            config=BBMRConfig(),
        )

    def run_bbsb(candles, leverage, warmup_bars):
        return run_bbsb_segment(
            candles=candles,
            leverage=leverage,
            warmup_bars=warmup_bars,
            config=BBSBConfig(),
        )

    def vwap_candidate(window: int) -> explore.Candidate:
        # Each call closes over its own ``window``, so the runners do not share state.
        def run_vwap(candles, leverage, warmup_bars):
            return explore.run_vwap_reversion_segment(
                candles=candles,
                leverage=leverage,
                warmup_bars=warmup_bars,
                window=window,
                entry_z=1.5,
                exit_z=0.2,
                stop_loss_pct=0.006,
            )

        return explore.Candidate(f"vwap-revert-w{window}-z1.5-sl0.006", window * 2, run_vwap)

    rsi_thresholds = ((8.0, 92.0), (12.0, 88.0))
    ma_periods = ((8, 21), (13, 34))

    built: list[tuple[str, explore.Candidate]] = [
        ("bbmr", explore.Candidate("bbmr-default", 69, run_bbmr)),
    ]
    built.extend(
        ("rsi", explore.build_rsi2_side_candidate(trend, long_threshold, short_threshold, 50.0, "both"))
        for trend in (30, 50)
        for long_threshold, short_threshold in rsi_thresholds
    )
    built.extend(("ma", explore.build_ma_cross_candidate(fast, slow, "both")) for fast, slow in ma_periods)
    built.extend(("vwap", vwap_candidate(window)) for window in (24, 48))
    built.append(("bbsb", explore.Candidate("bbsb-default", 69, run_bbsb)))
    return built
def build_legs() -> list[LegSpec]:
    """Build every leg: single-symbol 15m candidates per symbol plus kept nextgen pair legs."""
    # Whitelist of nextgen strategies, keyed "<family>:<bar>:<candidate name>".
    keep = {
        "btc_trend_eth_rsi:15m:eth-btc-rsi-filter-et50-l3.0-x55.0-bt480-bm240-br0.0",
        "btc_shock_guard_eth_rsi:15m:eth-btc-shock-filter-et50-l3.0-x55.0-bt480-bm240-br0.01-sw96-sv0.01-sd0.05",
        "btc_lead_eth_lag:5m:btc-lead-eth-lag-lb16-br0.012-gap0.006-mh8-sl0.006-tp0.018",
        "btc_lead_eth_lag:5m:btc-lead-eth-lag-lb16-br0.012-gap0.006-mh32-sl0.006-tp0.018",
        "btc_lead_eth_lag:15m:btc-lead-eth-lag-lb8-br0.018-gap0.006-mh8-sl0.006-tp0.018",
        "btc_lead_eth_lag:15m:btc-lead-eth-lag-lb16-br0.024-gap0.006-mh32-sl0.006-tp0.018",
    }

    # Built once so both symbols share the same candidate objects.
    per_symbol = build_single_symbol_candidates()
    built = [
        single_leg(symbol, family, "15m", candidate)
        for symbol in PRIMARY_SYMBOLS
        for family, candidate in per_symbol
    ]
    built.extend(
        pair_leg(f"nextgen_{strategy.family}", strategy.bar, strategy.candidate)
        for strategy in nextgen.build_strategies()
        if f"{strategy.family}:{strategy.bar}:{strategy.candidate.name}" in keep
    )
    return built
def cost_trade_returns(spec: LegSpec, result: explore.SegmentResult, roundtrip_cost: float) -> list[LegReturn]:
    """Turn each trade of ``result`` into a LegReturn net of the round-trip cost.

    The trade's ``return_pct`` is read as a percentage; the cost is scaled by the
    trade's optional ``cost_weight`` (1.0 when the key is absent).
    """

    def to_row(trade) -> LegReturn:
        gross = float(trade["return_pct"]) / 100.0
        net = gross - roundtrip_cost * float(trade.get("cost_weight", 1.0))
        exited = pd.to_datetime(str(trade["exit_time"]), utc=True)
        return LegReturn(
            leg=spec.key,
            symbol=spec.symbol,
            family=spec.family,
            exit_time=exited,
            value=net,
        )

    return [to_row(trade) for trade in result.trades]
def returns_to_daily_equity(
    name: str,
    returns: list[LegReturn],
    start: pd.Timestamp,
    end: pd.Timestamp,
) -> pd.Series:
    """Build a daily equity curve named ``name`` from per-trade returns.

    Trade returns are summed per exit date and compounded day by day from
    ``explore.INITIAL_EQUITY``. Exit dates outside ``start``..``end`` are dropped
    by the reindex. The first point is always pinned to the initial equity.
    """
    days = pd.date_range(start.normalize(), end.normalize(), freq="1D", tz="UTC")
    if not returns:
        # No trades: a flat curve at the initial equity.
        return pd.Series(explore.INITIAL_EQUITY, index=days, name=name, dtype=float)

    per_trade = pd.DataFrame(
        {
            "date": [item.exit_time.normalize() for item in returns],
            "return": [item.value for item in returns],
        }
    )
    per_day = per_trade.groupby("date")["return"].sum().reindex(days, fill_value=0.0)
    curve = explore.INITIAL_EQUITY * (1.0 + per_day).cumprod()
    curve.iloc[0] = explore.INITIAL_EQUITY
    curve.name = name
    return curve
def metrics_from_daily(series: pd.Series) -> dict[str, float]:
    """Compute return, drawdown and ratio metrics from a daily equity curve.

    Years are measured on a 365-day basis between the first and last index value.
    ``risk_reward_ratio`` is mean/std of daily percentage changes scaled by sqrt(365).
    Ratios fall back to 0.0 when their denominator is zero.
    """
    span_years = (series.index[-1] - series.index[0]).total_seconds() / 86_400 / 365
    total_return = float(series.iloc[-1] / series.iloc[0] - 1.0)

    if total_return > -1.0 and span_years > 0.0:
        annualized_return = (1.0 + total_return) ** (1.0 / span_years) - 1.0
    else:
        annualized_return = 0.0

    max_drawdown = explore.max_drawdown_from_equity([float(point) for point in series])

    changes = series.pct_change().dropna()
    # Sample standard deviation needs at least two observations.
    volatility = float(changes.std(ddof=1)) if len(changes) > 1 else 0.0
    if volatility:
        risk_reward = float(changes.mean()) / volatility * (365**0.5)
    else:
        risk_reward = 0.0

    calmar = annualized_return / max_drawdown if max_drawdown else 0.0
    return {
        "total_return": total_return,
        "annualized_return": annualized_return,
        "max_drawdown": max_drawdown,
        "calmar": calmar,
        "risk_reward_ratio": risk_reward,
    }
def trade_stats(returns: list[LegReturn], start: pd.Timestamp, end: pd.Timestamp) -> dict[str, float]:
    """Summarise the trades whose exit time lies in ``[start, end]`` (inclusive).

    A month is 30.4375 days. Ratios whose denominator is zero are reported as 0.0.
    """
    in_window = [item.value for item in returns if start <= item.exit_time <= end]
    gains = [value for value in in_window if value > 0.0]
    setbacks = [value for value in in_window if value < 0.0]

    gross_profit = sum(gains)
    gross_loss_abs = abs(sum(setbacks))

    avg_win = 0.0
    if gains:
        avg_win = sum(gains) / len(gains)
    avg_loss_abs = 0.0
    if setbacks:
        avg_loss_abs = abs(sum(setbacks) / len(setbacks))

    # Floor the span so a zero-length window cannot divide by zero.
    months = max((end - start).total_seconds() / 86_400 / 30.4375, 1e-9)
    count = len(in_window)
    return {
        "trades": count,
        "trades_per_month": count / months,
        "win_rate": len(gains) / count if in_window else 0.0,
        "payoff_ratio": avg_win / avg_loss_abs if avg_loss_abs else 0.0,
        "profit_factor": gross_profit / gross_loss_abs if gross_loss_abs else 0.0,
    }
- def monthly_rows(name: str, series: pd.Series) -> pd.DataFrame:
- monthly = series.resample("ME").last()
- frame = pd.DataFrame(
- {
- "portfolio": name,
- "month": monthly.index.strftime("%Y-%m"),
- "start_equity": monthly.shift(1).fillna(series.iloc[0]).to_numpy(),
- "end_equity": monthly.to_numpy(),
- }
- )
- frame["return"] = frame["end_equity"] / frame["start_equity"] - 1.0
- return frame
def monthly_stability(monthly: pd.DataFrame) -> dict[str, float | int]:
    """Count months and the share/number with positive/negative ``return``."""
    month_count = len(monthly)
    if month_count:
        positive = int((monthly["return"] > 0.0).sum())
        negative = int((monthly["return"] < 0.0).sum())
        positive_rate = positive / month_count
    else:
        # An empty table reports zeros instead of dividing by zero.
        negative = 0
        positive_rate = 0.0
    return {
        "months": month_count,
        "positive_month_rate": positive_rate,
        "negative_months": negative,
    }
def horizon_metrics(
    portfolio: str,
    cost_model: str,
    series: pd.Series,
    returns: list[LegReturn],
    monthly: pd.DataFrame,
) -> list[dict[str, object]]:
    """Produce one metrics row per entry of HORIZONS for a portfolio equity curve.

    Each horizon is the tail of ``series`` starting at ``end - offset``. A tail
    with fewer than two points falls back to the full series.
    """
    rows: list[dict[str, object]] = []
    end = series.index[-1]
    for label, offset in HORIZONS:
        window = series
        if offset is not None:
            window = series[series.index >= end - offset]
        if len(window) < 2:
            window = series

        start = window.index[0]
        # "YYYY-MM" strings compare chronologically, so a string >= is enough here.
        window_monthly = monthly[monthly["month"] >= start.strftime("%Y-%m")]
        if len(window_monthly):
            worst = window_monthly.sort_values("return").iloc[0]
            worst_month = str(worst["month"])
            worst_return = float(worst["return"])
        else:
            worst_month = ""
            worst_return = 0.0

        rows.append(
            {
                "portfolio": portfolio,
                "cost_model": cost_model,
                "horizon": label,
                "horizon_start": start.strftime("%Y-%m-%d"),
                "horizon_end": end.strftime("%Y-%m-%d"),
                "worst_month": worst_month,
                "worst_month_return": worst_return,
                **metrics_from_daily(window),
                **trade_stats(returns, start, end),
                **monthly_stability(window_monthly),
            }
        )
    return rows
def split_weighted_returns(legs: tuple[str, ...], weights: pd.Series, leg_returns: dict[str, list[LegReturn]]) -> list[LegReturn]:
    """Return every leg's trades with ``value`` scaled by that leg's portfolio weight."""
    scaled: list[LegReturn] = []
    for leg in legs:
        factor = float(weights[leg])
        scaled.extend(
            LegReturn(item.leg, item.symbol, item.family, item.exit_time, item.value * factor)
            for item in leg_returns[leg]
        )
    return scaled
def leg_contribution_rows(
    portfolio: str,
    cost_model: str,
    legs: tuple[str, ...],
    weights: pd.Series,
    leg_returns: dict[str, list[LegReturn]],
    end: pd.Timestamp,
) -> list[dict[str, object]]:
    """For each horizon, report every leg's weighted return sum and its share of the total.

    Only trades exiting at or after the horizon start are counted. The share is
    0.0 when the horizon total is zero.
    """
    earliest = pd.Timestamp.min.tz_localize("UTC")
    output: list[dict[str, object]] = []
    for label, offset in HORIZONS:
        start = earliest if offset is None else end - offset

        contributions: dict[str, float] = {}
        for leg in legs:
            leg_weight = float(weights[leg])
            contributions[leg] = sum(
                item.value * leg_weight for item in leg_returns[leg] if item.exit_time >= start
            )
        # Accumulate in leg order from 0.0, as the running total did.
        total = sum(contributions.values(), 0.0)

        output.extend(
            {
                "portfolio": portfolio,
                "cost_model": cost_model,
                "horizon": label,
                "leg": leg,
                "weight": float(weights[leg]),
                "contribution_return_sum": contributions[leg],
                "contribution_share": contributions[leg] / total if total else 0.0,
            }
            for leg in legs
        )
    return output
def add_cost_columns(row: dict[str, object], cost_model: str, roundtrip_cost: float) -> None:
    """Annotate ``row`` in place with the cost model and ``net_*`` copies of its metrics."""
    row["cost_model"] = cost_model
    row["roundtrip_cost_on_margin"] = roundtrip_cost
    # The metrics were computed on cost-adjusted returns, so the net_* values are plain copies.
    for metric in ("total_return", "annualized_return", "max_drawdown", "calmar"):
        row[f"net_{metric}"] = row[metric]
def risk_qualified(frame: pd.DataFrame) -> pd.DataFrame:
    """Select portfolios passing the trade-frequency, drawdown and recent-return filters.

    The "full" horizon rows are joined with per-horizon return/drawdown/Calmar
    columns (``ret_*``, ``dd_*``, ``calmar_*``). The result gains a ``risk_rank``
    score and is sorted by cost model, then descending rank.
    """
    join_keys = ["cost_model", "portfolio"]
    merged = frame[frame["horizon"] == "full"].copy()
    for horizon in ("3y", "1y", "6m", "3m"):
        renamed = {
            "total_return": f"ret_{horizon}",
            "max_drawdown": f"dd_{horizon}",
            "calmar": f"calmar_{horizon}",
        }
        subset = frame.loc[frame["horizon"] == horizon, [*join_keys, "total_return", "max_drawdown", "calmar"]]
        merged = merged.merge(subset.rename(columns=renamed), on=join_keys, how="inner")

    frequent_enough = merged["trades_per_month"] >= MIN_TRADES_PER_MONTH
    shallow_drawdown = merged["max_drawdown"] < MAX_DRAWDOWN
    recently_positive = (merged["ret_1y"] > 0.0) & (merged["ret_6m"] > 0.0) & (merged["ret_3m"] > 0.0)
    qualified = merged[frequent_enough & shallow_drawdown & recently_positive].copy()

    qualified["risk_rank"] = (
        qualified["calmar"] * 2.0
        + qualified["annualized_return"]
        + qualified["ret_1y"] * 0.5
        - qualified["max_drawdown"]
    )
    return qualified.sort_values(["cost_model", "risk_rank"], ascending=[True, False])
def robust_survivors(frame: pd.DataFrame) -> pd.DataFrame:
    """Select portfolios with positive return and Calmar on every horizon.

    The "full" horizon rows are joined with ``ret_*``/``calmar_*`` columns for
    3y/1y/6m/3m. Survivors must also meet the minimum trades-per-month threshold.
    """
    join_keys = ["cost_model", "portfolio"]
    merged = frame[frame["horizon"] == "full"].copy()
    for horizon in ("3y", "1y", "6m", "3m"):
        renamed = {"total_return": f"ret_{horizon}", "calmar": f"calmar_{horizon}"}
        subset = frame.loc[frame["horizon"] == horizon, [*join_keys, "total_return", "calmar"]]
        merged = merged.merge(subset.rename(columns=renamed), on=join_keys, how="inner")

    must_be_positive = (
        "total_return",
        "calmar",
        "ret_3y",
        "ret_1y",
        "ret_6m",
        "ret_3m",
        "calmar_3y",
        "calmar_1y",
        "calmar_6m",
        "calmar_3m",
    )
    survives = merged["trades_per_month"] >= MIN_TRADES_PER_MONTH
    for column in must_be_positive:
        survives = survives & (merged[column] > 0.0)
    return merged[survives].copy()
def format_cell(value: object) -> str:
    """Render one table cell: floats with 6 significant digits, others via ``str``.

    Pipe characters in non-float values are backslash-escaped so they cannot
    split a Markdown table cell.
    """
    if not isinstance(value, float):
        return str(value).replace("|", "\\|")
    return format(value, ".6g")
def markdown_table(frame: pd.DataFrame) -> str:
    """Render ``frame`` as a pipe-delimited Markdown table; missing values become empty cells."""
    header = list(frame.columns)
    divider = ["---" for _ in frame.columns]
    body = frame.astype(object).where(pd.notna(frame), "").values.tolist()

    rendered = []
    for cells in [header, divider, *body]:
        joined = " | ".join(format_cell(cell) for cell in cells)
        rendered.append("| " + joined + " |")
    return "\n".join(rendered)
def write_report(
    command: str,
    output_files: list[Path],
    portfolio_total: pd.DataFrame,
    qualified: pd.DataFrame,
    robust: pd.DataFrame,
    horizon: pd.DataFrame,
    leg_contrib: pd.DataFrame,
) -> str:
    """Render the Markdown report and return it as a string (nothing is written here).

    Args:
        command: Command line recorded in the report.
        output_files: Artifact paths listed in the report.
        portfolio_total: Ranked portfolio rows; the first primary-cost-model row
            is treated as the best portfolio, so the frame must already be sorted.
        qualified: Output of the qualification filter, for all cost models.
        robust: Strict survivors; only their count is reported.
        horizon: Per-horizon metric rows.
        leg_contrib: Per-horizon leg contribution rows.

    Returns:
        The report text, terminated by a newline.
    """
    stress_cost_model = "taker_taker"

    # Derive the figures quoted in the prose from the module constants, so the
    # report cannot drift away from the thresholds that were actually applied.
    primary_cost_pct = COST_MODELS[PRIMARY_COST_MODEL] * 100.0
    stress_cost_pct = COST_MODELS[stress_cost_model] * 100.0
    max_drawdown_pct = MAX_DRAWDOWN * 100.0

    primary_total = portfolio_total[portfolio_total["cost_model"] == PRIMARY_COST_MODEL]
    primary_qualified = qualified[qualified["cost_model"] == PRIMARY_COST_MODEL]
    stress_qualified = qualified[qualified["cost_model"] == stress_cost_model]
    top = primary_total.head(10)
    top_qualified = primary_qualified.head(10)

    # An empty name matches no rows, which leaves the "best portfolio" tables empty.
    best_name = str(top.iloc[0]["portfolio"]) if len(top) else ""
    best_horizon = horizon[(horizon["cost_model"] == PRIMARY_COST_MODEL) & (horizon["portfolio"] == best_name)]
    best_contrib = leg_contrib[(leg_contrib["cost_model"] == PRIMARY_COST_MODEL) & (leg_contrib["portfolio"] == best_name)]

    top_columns = [
        "cost_model",
        "portfolio",
        "mode",
        "leg_count",
        "symbols",
        "families",
        "total_return",
        "annualized_return",
        "max_drawdown",
        "calmar",
        "risk_reward_ratio",
        "trades_per_month",
        "worst_month_return",
        "positive_month_rate",
        "recent_positive",
        "qualified",
    ]
    qualified_columns = [
        "cost_model",
        "portfolio",
        "mode",
        "leg_count",
        "symbols",
        "families",
        "total_return",
        "annualized_return",
        "max_drawdown",
        "calmar",
        "trades_per_month",
        "worst_month_return",
        "positive_month_rate",
        "ret_3y",
        "ret_1y",
        "ret_6m",
        "ret_3m",
    ]
    stress_columns = [
        "cost_model",
        "portfolio",
        "mode",
        "leg_count",
        "total_return",
        "annualized_return",
        "max_drawdown",
        "calmar",
        "trades_per_month",
        "positive_month_rate",
        "ret_3y",
        "ret_1y",
        "ret_6m",
        "ret_3m",
    ]
    horizon_columns = [
        "horizon",
        "total_return",
        "annualized_return",
        "max_drawdown",
        "calmar",
        "win_rate",
        "payoff_ratio",
        "profit_factor",
        "risk_reward_ratio",
        "trades",
        "trades_per_month",
        "worst_month",
        "worst_month_return",
        "positive_month_rate",
        "negative_months",
    ]

    if len(top_qualified):
        qualified_section = markdown_table(top_qualified[qualified_columns])
    else:
        qualified_section = "No portfolio cleared all target filters."

    if len(stress_qualified):
        stress_section = markdown_table(stress_qualified.head(10)[stress_columns])
    else:
        stress_section = "No taker/taker portfolio cleared all target filters."

    lines = [
        "# Cross-symbol high-frequency portfolio ranking",
        "",
        f"Run command: `{command}`",
        "",
        "No exchange API, order placement, or live trading path is used. The search reads local BTC/ETH candle data and existing local strategy modules only.",
        f"Primary ranking uses maker/taker roundtrip margin cost {primary_cost_pct:.2f}%. "
        f"Taker/taker stress uses {stress_cost_pct:.2f}%. Funding and slippage remain excluded.",
        "",
        "Output files:",
        *[f"- `{path}`" for path in output_files],
        "",
        f"Selection target: at least {MIN_TRADES_PER_MONTH:g} trades/month, "
        f"max DD below {max_drawdown_pct:g}%, and positive 1y/6m/3m return.",
        f"Strict robust survivors with positive full/3y/1y/6m/3m net return and Calmar: {len(robust)}.",
        "",
        "## Top portfolios: maker/taker",
        "",
        markdown_table(top[top_columns]),
        "",
        "## Qualified portfolios: maker/taker",
        "",
        qualified_section,
        "",
        "## Taker/taker stress survivors",
        "",
        stress_section,
        "",
        "## Best portfolio horizons",
        "",
        markdown_table(best_horizon[horizon_columns]),
        "",
        "## Best portfolio leg contribution",
        "",
        markdown_table(best_contrib[best_contrib["horizon"].isin(["full", "1y", "6m", "3m"])]),
    ]
    return "\n".join(lines) + "\n"
def main() -> int:
    """Run the portfolio search end to end and write every artifact.

    Steps:
        1. Build all legs and load cached candles for each (symbol, bar) they need.
        2. Run every leg once and derive cost-adjusted trade returns, a daily
           equity curve and summary metrics per cost model.
        3. Keep the best ``--max-legs-per-symbol`` legs per symbol, ranked under
           the primary cost model.
        4. Enumerate combinations of 2..``--max-leg-count`` selected legs that
           cover both primary symbols and at least two families, in equal-weight
           and inverse-drawdown ("risk") modes, under every cost model.
        5. Rank portfolios, keep detail rows for the top 50 per cost model, and
           write CSV/JSON/Markdown outputs into ``--output-dir``.

    Returns:
        0 on success.

    Raises:
        RuntimeError: when fewer than two legs are selected, or when no leg
            combination satisfies the symbol/family constraints.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--years", type=float, default=YEARS)
    parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
    parser.add_argument("--max-legs-per-symbol", type=int, default=12)
    parser.add_argument("--max-leg-count", type=int, default=4)
    args = parser.parse_args()

    legs = build_legs()
    bars = sorted({leg.bar for leg in legs})
    data = {(symbol, bar): load_candles(symbol, bar, args.years) for symbol in PRIMARY_SYMBOLS for bar in bars}

    # ---- Per-leg backtests: one run per leg, re-costed under every cost model.
    leg_returns: dict[str, list[LegReturn]] = {}
    leg_rows: list[dict[str, object]] = []
    daily: dict[str, pd.Series] = {}
    for index, leg in enumerate(legs, start=1):
        result = leg.run(data)
        start = pd.to_datetime(result.equity_curve[0]["ts"], unit="ms", utc=True)
        end = pd.to_datetime(result.equity_curve[-1]["ts"], unit="ms", utc=True)
        for cost_model, roundtrip_cost in COST_MODELS.items():
            cost_key = f"{cost_model}:{leg.key}"
            returns = cost_trade_returns(leg, result, roundtrip_cost)
            leg_returns[cost_key] = returns
            series = returns_to_daily_equity(cost_key, returns, start, end)
            monthly = monthly_rows(cost_key, series)
            worst_return = float(monthly["return"].min()) if len(monthly) else 0.0
            row = {
                "leg": leg.key,
                "symbol": leg.symbol,
                "family": leg.family,
                "bar": leg.bar,
                "pair_signal": leg.pair,
                "start": start.strftime("%Y-%m-%d %H:%M"),
                "end": end.strftime("%Y-%m-%d %H:%M"),
                "trades": len(returns),
                "worst_month_return": worst_return,
                **metrics_from_daily(series),
                **trade_stats(returns, series.index[0], series.index[-1]),
                **monthly_stability(monthly),
            }
            add_cost_columns(row, cost_model, roundtrip_cost)
            leg_rows.append(row)
            daily[cost_key] = series
        print(f"done leg {index}/{len(legs)} {leg.key}")

    leg_total_all = pd.DataFrame(leg_rows).sort_values(
        ["calmar", "annualized_return", "trades_per_month"],
        ascending=[False, False, False],
    )

    # ---- Leg selection: best legs per symbol under the primary cost model.
    primary_leg_total = leg_total_all[leg_total_all["cost_model"] == PRIMARY_COST_MODEL]
    selected: list[str] = []
    for symbol in PRIMARY_SYMBOLS:
        symbol_rows = primary_leg_total[primary_leg_total["symbol"] == symbol].head(args.max_legs_per_symbol)
        selected.extend(str(leg_key) for leg_key in symbol_rows["leg"])
    # De-duplicate while preserving ranking order.
    selected = list(dict.fromkeys(selected))
    if len(selected) < 2:
        raise RuntimeError("not enough selected BTC/ETH legs to build portfolios")

    key_to_leg = {leg.key: leg for leg in legs}

    # Portfolios are evaluated on the date range that every selected leg covers.
    common_start = max(daily[f"{PRIMARY_COST_MODEL}:{key}"].index[0] for key in selected)
    common_end = min(daily[f"{PRIMARY_COST_MODEL}:{key}"].index[-1] for key in selected)
    selected_daily: dict[str, dict[str, pd.Series]] = {}
    for cost_model in COST_MODELS:
        windowed: dict[str, pd.Series] = {}
        for key in selected:
            curve = daily[f"{cost_model}:{key}"]
            windowed[key] = curve[(curve.index >= common_start) & (curve.index <= common_end)]
        selected_daily[cost_model] = windowed

    # Risk weights always use primary-cost-model drawdowns, so a portfolio has
    # the same weights under every cost model.
    leg_metrics = {row["leg"]: row for row in leg_rows if row["cost_model"] == PRIMARY_COST_MODEL}

    # ---- Portfolio enumeration.
    portfolio_rows: list[dict[str, object]] = []
    horizon_rows_output: list[dict[str, object]] = []
    contribution_rows: list[dict[str, object]] = []
    # Detail frames are keyed by (cost_model, portfolio) so that only the top
    # portfolios need to be concatenated later.
    monthly_frames: dict[tuple[str, str], pd.DataFrame] = {}
    equity_frames: dict[tuple[str, str], pd.DataFrame] = {}
    combo_index = 0
    for leg_count in range(2, min(args.max_leg_count, len(selected)) + 1):
        for legs_tuple in combinations(selected, leg_count):
            symbols = {key_to_leg[key].symbol for key in legs_tuple}
            if not set(PRIMARY_SYMBOLS).issubset(symbols):
                continue
            families = {key_to_leg[key].family for key in legs_tuple}
            if len(families) < min(2, leg_count):
                continue
            for mode in ("equal", "risk"):
                combo_index += 1
                if mode == "equal":
                    weights = pd.Series(1.0 / leg_count, index=legs_tuple)
                else:
                    # Inverse max-drawdown weights; drawdown is floored at 2% so a
                    # near-zero drawdown cannot dominate the portfolio.
                    raw = pd.Series({key: 1.0 / max(float(leg_metrics[key]["max_drawdown"]), 0.02) for key in legs_tuple})
                    weights = raw / raw.sum()
                name = f"{mode}-{leg_count}-hf{combo_index:05d}"
                for cost_model, roundtrip_cost in COST_MODELS.items():
                    returns = pd.DataFrame({key: selected_daily[cost_model][key].pct_change().fillna(0.0) for key in legs_tuple}).dropna()
                    portfolio_returns = returns.mul(weights, axis=1).sum(axis=1)
                    series = explore.INITIAL_EQUITY * (1.0 + portfolio_returns).cumprod()
                    series.iloc[0] = explore.INITIAL_EQUITY
                    series.name = name
                    cost_leg_returns = {key: leg_returns[f"{cost_model}:{key}"] for key in legs_tuple}
                    weighted_trade_returns = split_weighted_returns(legs_tuple, weights, cost_leg_returns)
                    monthly = monthly_rows(name, series)
                    monthly["cost_model"] = cost_model
                    horizons = horizon_metrics(name, cost_model, series, weighted_trade_returns, monthly)
                    recent = {row["horizon"]: float(row["total_return"]) for row in horizons}
                    recent_positive = recent["1y"] > 0.0 and recent["6m"] > 0.0 and recent["3m"] > 0.0
                    stats = trade_stats(weighted_trade_returns, series.index[0], series.index[-1])
                    metrics = metrics_from_daily(series)
                    worst = monthly.sort_values("return").iloc[0]
                    is_qualified = (
                        stats["trades_per_month"] >= MIN_TRADES_PER_MONTH
                        and metrics["max_drawdown"] < MAX_DRAWDOWN
                        and recent_positive
                    )
                    row = {
                        "portfolio": name,
                        "mode": mode,
                        "leg_count": leg_count,
                        "legs": ";".join(legs_tuple),
                        "weights_json": json.dumps({key: float(weights[key]) for key in legs_tuple}, separators=(",", ":")),
                        "symbols": ",".join(sorted(symbols)),
                        "families": ",".join(sorted(families)),
                        "start": series.index[0].strftime("%Y-%m-%d"),
                        "end": series.index[-1].strftime("%Y-%m-%d"),
                        "worst_month": str(worst["month"]),
                        "worst_month_return": float(worst["return"]),
                        "recent_positive": recent_positive,
                        "qualified": is_qualified,
                        **metrics,
                        **stats,
                        **monthly_stability(monthly),
                    }
                    add_cost_columns(row, cost_model, roundtrip_cost)
                    portfolio_rows.append(row)
                    horizon_rows_output.extend(horizons)
                    monthly_frames[(cost_model, name)] = monthly
                    equity_frames[(cost_model, name)] = pd.DataFrame(
                        {
                            "portfolio": name,
                            "cost_model": cost_model,
                            "date": series.index.strftime("%Y-%m-%d"),
                            "equity": series.to_numpy(),
                        }
                    )
                    contribution_rows.extend(
                        leg_contribution_rows(name, cost_model, legs_tuple, weights, cost_leg_returns, series.index[-1])
                    )

    if not portfolio_rows:
        # Fail with a clear message instead of a KeyError from sorting an empty frame.
        raise RuntimeError("no leg combination satisfied the symbol/family constraints")

    portfolio_total = pd.DataFrame(portfolio_rows).sort_values(
        ["cost_model", "qualified", "calmar", "annualized_return", "trades_per_month", "worst_month_return"],
        ascending=[True, False, False, False, False, False],
    )

    # ---- Detail outputs are restricted to the top 50 portfolios per cost model.
    top_pairs: set[tuple[str, str]] = set()
    for cost_model in COST_MODELS:
        top_rows = portfolio_total[portfolio_total["cost_model"] == cost_model].head(50)
        top_pairs.update(zip(top_rows["cost_model"], top_rows["portfolio"]))

    def in_top(row: dict[str, object]) -> bool:
        return (row["cost_model"], row["portfolio"]) in top_pairs

    # Filtering before building/concatenating the frames avoids a row-wise
    # DataFrame.apply over every portfolio's rows.
    horizon = pd.DataFrame([row for row in horizon_rows_output if in_top(row)])
    horizon["horizon"] = pd.Categorical(horizon["horizon"], categories=[label for label, _ in HORIZONS], ordered=True)
    horizon = horizon.sort_values(["cost_model", "portfolio", "horizon"])

    portfolio_metadata = portfolio_total[
        ["cost_model", "portfolio", "mode", "leg_count", "legs", "weights_json", "symbols", "families", "recent_positive"]
    ]
    qualified = risk_qualified(horizon).merge(portfolio_metadata, on=["cost_model", "portfolio"], how="left")
    robust = robust_survivors(horizon).merge(portfolio_metadata, on=["cost_model", "portfolio"], how="left")

    monthly_all = pd.concat(
        [frame for pair, frame in monthly_frames.items() if pair in top_pairs],
        ignore_index=True,
    )
    worst_months = monthly_all.sort_values("return").head(100)
    equity = pd.concat(
        [frame for pair, frame in equity_frames.items() if pair in top_pairs],
        ignore_index=True,
    )
    leg_contrib = pd.DataFrame([row for row in contribution_rows if in_top(row)]).sort_values(
        ["cost_model", "portfolio", "horizon", "contribution_return_sum"],
        ascending=[True, True, True, False],
    )

    # ---- Write artifacts.
    args.output_dir.mkdir(parents=True, exist_ok=True)
    leg_path = args.output_dir / f"{PREFIX}-legs.csv"
    total_path = args.output_dir / f"{PREFIX}-total.csv"
    qualified_path = args.output_dir / f"{PREFIX}-qualified.csv"
    robust_path = args.output_dir / f"{PREFIX}-robust-survivors.csv"
    horizon_path = args.output_dir / f"{PREFIX}-horizon.csv"
    monthly_path = args.output_dir / f"{PREFIX}-monthly.csv"
    worst_path = args.output_dir / f"{PREFIX}-worst-months.csv"
    contrib_path = args.output_dir / f"{PREFIX}-leg-contribution.csv"
    equity_path = args.output_dir / f"{PREFIX}-equity.csv"
    summary_path = args.output_dir / f"{PREFIX}-summary.json"
    report_path = args.output_dir / f"{PREFIX}-report.md"

    leg_total_all.to_csv(leg_path, index=False)
    portfolio_total.to_csv(total_path, index=False)
    qualified.to_csv(qualified_path, index=False)
    robust.to_csv(robust_path, index=False)
    horizon.to_csv(horizon_path, index=False)
    monthly_all.to_csv(monthly_path, index=False)
    worst_months.to_csv(worst_path, index=False)
    leg_contrib.to_csv(contrib_path, index=False)
    equity.to_csv(equity_path, index=False)

    # Pick the best portfolio from the primary cost model explicitly rather
    # than relying on how the cost-model names happen to sort.
    primary_portfolios = portfolio_total[portfolio_total["cost_model"] == PRIMARY_COST_MODEL]
    summary = {
        "portfolio_count": int(len(portfolio_total)),
        "qualified_count": int(len(qualified)),
        "robust_survivor_count": int(len(robust)),
        "selected_leg_count": int(len(selected)),
        "best_portfolio": primary_portfolios.iloc[0].to_dict() if len(primary_portfolios) else {},
    }
    summary_path.write_text(json.dumps(summary, indent=2, default=str) + "\n", encoding="utf-8")

    # Record every option, including --output-dir, so the command reproduces this run.
    command = (
        f"rtk .venv/bin/python {Path(__file__).as_posix()}"
        f" --years {args.years}"
        f" --output-dir {args.output_dir.as_posix()}"
        f" --max-legs-per-symbol {args.max_legs_per_symbol}"
        f" --max-leg-count {args.max_leg_count}"
    )
    output_files = [leg_path, total_path, qualified_path, robust_path, horizon_path, monthly_path, worst_path, contrib_path, equity_path, summary_path, report_path]
    report_path.write_text(write_report(command, output_files, portfolio_total, qualified, robust, horizon, leg_contrib), encoding="utf-8")
    print(portfolio_total.head(20).to_string(index=False))
    return 0
# Script entry point: exit the process with main()'s return code.
if __name__ == "__main__":
    sys.exit(main())
|