| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796 |
- from __future__ import annotations
- import argparse
- import importlib.util
- import json
- import sys
- from dataclasses import asdict, dataclass
- from pathlib import Path
- from statistics import median
- import pandas as pd
- from okx_codex_trader.models import Candle
- from okx_codex_trader.sampled_report import SegmentResult, mark_to_market, trade_equity
- INITIAL_EQUITY = 10_000.0
- LEVERAGE = 3
- BAR = "15m"
- REPORT_DIR = Path("reports/ultrashort")
- OUTPUT_PREFIX = "bbmr-risk"
- HORIZONS = (
- ("full", None),
- ("3y", pd.DateOffset(years=3)),
- ("1y", pd.DateOffset(years=1)),
- ("6m", pd.DateOffset(months=6)),
- ("3m", pd.DateOffset(months=3)),
- )
- COST_MODELS = {
- "gross": 0.0,
- "maker_taker": 0.0021,
- "taker_taker": 0.0030,
- }
- PRIMARY_COST_MODEL = "maker_taker"
- @dataclass(frozen=True)
- class RiskConfig:
- band_length: int
- std_multiplier: float
- bandwidth_lookback: int
- stop_loss_pct: float
- take_profit_pct: float | None
- max_hold_bars: int | None
- cooldown_bars: int
- vol_lookback: int
- vol_cap: float | None
- trend_sma: int
- long_regime: str
- short_regime: str
- neutral_distance: float | None
- session: str
- corr_lookback: int
- corr_min: float | None
- corr_max: float | None
- side_mode: str
- @property
- def name(self) -> str:
- fields = [
- f"l{self.band_length}",
- f"m{self.std_multiplier:g}",
- f"sl{self.stop_loss_pct:g}",
- ]
- if self.take_profit_pct is not None:
- fields.append(f"tp{self.take_profit_pct:g}")
- if self.max_hold_bars is not None:
- fields.append(f"mh{self.max_hold_bars}")
- if self.cooldown_bars:
- fields.append(f"cd{self.cooldown_bars}")
- if self.vol_cap is not None:
- fields.append(f"vc{self.vol_cap:g}")
- if self.long_regime != "any" or self.short_regime != "any":
- fields.append(f"lr{self.long_regime}-sr{self.short_regime}")
- if self.neutral_distance is not None:
- fields.append(f"nd{self.neutral_distance:g}")
- if self.session != "all":
- fields.append(f"s{self.session}")
- if self.corr_min is not None or self.corr_max is not None:
- fields.append(f"corr{self.corr_min}_{self.corr_max}")
- if self.side_mode != "both":
- fields.append(self.side_mode)
- return "bbmr-" + "-".join(fields)
- def load_explore_module():
- path = Path(__file__).resolve().with_name("explore_ultrashort.py")
- spec = importlib.util.spec_from_file_location("explore_ultrashort", path)
- if spec is None or spec.loader is None:
- raise RuntimeError("cannot load explore_ultrashort.py")
- module = importlib.util.module_from_spec(spec)
- sys.modules[spec.name] = module
- spec.loader.exec_module(module)
- return module
- def load_cached_history(explore, symbol: str, bar: str, years: float) -> list[Candle]:
- requested = explore.history_bars_for_years(bar, years)
- candles, _ = explore.load_cached_candles(explore.CANDLE_CACHE_DIR, symbol, bar)
- if not candles:
- raise RuntimeError(f"missing cached candles: {symbol} {bar}")
- return candles[-requested:] if len(candles) > requested else candles
- def align_pair(left: list[Candle], right: list[Candle]) -> tuple[list[Candle], list[Candle]]:
- right_by_ts = {candle.ts: candle for candle in right}
- left_out: list[Candle] = []
- right_out: list[Candle] = []
- for candle in left:
- other = right_by_ts.get(candle.ts)
- if other is None:
- continue
- left_out.append(candle)
- right_out.append(other)
- return left_out, right_out
- def format_ts(ts: int) -> str:
- return pd.to_datetime(ts, unit="ms", utc=True).strftime("%Y-%m-%d %H:%M")
- def close_trade(
- *,
- trades: list[dict[str, object]],
- exits: list[dict[str, object]],
- position: dict[str, object],
- candle: Candle,
- exit_price: float,
- roundtrip_cost: float,
- ) -> tuple[float, bool]:
- gross_exit_equity = trade_equity(
- side=str(position["side"]),
- margin_used=float(position["margin_used"]),
- entry_price=float(position["entry_price"]),
- exit_price=exit_price,
- leverage=LEVERAGE,
- )
- cost = float(position["margin_used"]) * roundtrip_cost
- exit_equity = gross_exit_equity - cost
- pnl = exit_equity - float(position["margin_used"])
- trades.append(
- {
- "side": "Long" if position["side"] == "long" else "Short",
- "entry_time": format_ts(int(position["entry_time"])),
- "exit_time": format_ts(candle.ts),
- "entry_price": round(float(position["entry_price"]), 4),
- "exit_price": round(exit_price, 4),
- "pnl": round(pnl, 4),
- "return_pct": round(pnl / float(position["margin_used"]) * 100.0, 4),
- "cost": round(cost, 4),
- }
- )
- exits.append({"ts": candle.ts, "price": exit_price, "side": position["side"]})
- return exit_equity, exit_equity > float(position["margin_used"])
- def in_regime(regime: str, close: float, trend: float, neutral_distance: float | None) -> bool:
- if regime == "any":
- return True
- if regime == "above":
- return close > trend
- if regime == "below":
- return close < trend
- if regime == "neutral":
- if neutral_distance is None:
- return True
- return abs(close / trend - 1.0) <= neutral_distance
- raise ValueError(f"unknown regime: {regime}")
- def in_session(session: str, ts: int) -> bool:
- if session == "all":
- return True
- hour = pd.to_datetime(ts, unit="ms", utc=True).hour
- if session == "asia":
- return 0 <= hour < 8
- if session == "eu_us":
- return 8 <= hour < 24
- if session == "us":
- return 13 <= hour < 22
- raise ValueError(f"unknown session: {session}")
- def exit_price_for_risk_hit(position: dict[str, object], candle: Candle, take_profit_price: float | None) -> float | None:
- side = str(position["side"])
- stop_price = float(position["stop_price"])
- if side == "long":
- if candle.open <= stop_price:
- return candle.open
- if take_profit_price is not None and candle.open >= take_profit_price:
- return candle.open
- stop_hit = candle.low <= stop_price
- take_profit_hit = take_profit_price is not None and candle.high >= take_profit_price
- else:
- if candle.open >= stop_price:
- return candle.open
- if take_profit_price is not None and candle.open <= take_profit_price:
- return candle.open
- stop_hit = candle.high >= stop_price
- take_profit_hit = take_profit_price is not None and candle.low <= take_profit_price
- if stop_hit:
- return stop_price
- if take_profit_hit:
- return take_profit_price
- return None
- def run_bbmr_risk_segment(
- *,
- candles: list[Candle],
- peer_candles: list[Candle],
- config: RiskConfig,
- roundtrip_cost: float = 0.0,
- ) -> SegmentResult:
- closes = pd.Series([candle.close for candle in candles], dtype=float)
- peer_closes = pd.Series([candle.close for candle in peer_candles], dtype=float)
- middle = closes.rolling(config.band_length).mean().tolist()
- stdev = closes.rolling(config.band_length).std(ddof=0).tolist()
- upper = [
- float("nan") if avg != avg or std != std else avg + config.std_multiplier * std
- for avg, std in zip(middle, stdev)
- ]
- lower = [
- float("nan") if avg != avg or std != std else avg - config.std_multiplier * std
- for avg, std in zip(middle, stdev)
- ]
- bandwidth = [
- float("nan") if up != up or lo != lo or avg != avg or avg == 0.0 else (up - lo) / avg
- for up, lo, avg in zip(upper, lower, middle)
- ]
- returns = closes.pct_change()
- realized_vol = returns.rolling(config.vol_lookback).std(ddof=1).tolist()
- trend = closes.rolling(config.trend_sma).mean().tolist()
- corr = returns.rolling(config.corr_lookback).corr(peer_closes.pct_change()).tolist()
- warmup_bars = max(
- config.band_length + config.bandwidth_lookback,
- config.vol_lookback,
- config.trend_sma,
- config.corr_lookback if config.corr_min is not None or config.corr_max is not None else 0,
- )
- equity = INITIAL_EQUITY
- ending_equity = equity
- peak_equity = equity
- max_drawdown = 0.0
- wins = 0
- cooldown_until = -1
- pending_entry_side: str | None = None
- pending_exit = False
- position: dict[str, object] | None = None
- trades: list[dict[str, object]] = []
- entries: list[dict[str, object]] = []
- exits: list[dict[str, object]] = []
- equity_curve: list[dict[str, float | int]] = []
- for index in range(warmup_bars, len(candles)):
- candle = candles[index]
- if pending_exit and position is not None:
- equity, won = close_trade(
- trades=trades,
- exits=exits,
- position=position,
- candle=candle,
- exit_price=candle.open,
- roundtrip_cost=roundtrip_cost,
- )
- wins += 1 if won else 0
- position = None
- pending_exit = False
- cooldown_until = index + config.cooldown_bars
- if pending_entry_side is not None and position is None and equity > 0.0:
- entry_price = candle.open
- position = {
- "side": pending_entry_side,
- "entry_time": candle.ts,
- "entry_price": entry_price,
- "entry_index": index,
- "margin_used": equity,
- "stop_price": entry_price * (1.0 - config.stop_loss_pct if pending_entry_side == "long" else 1.0 + config.stop_loss_pct),
- }
- entries.append({"ts": candle.ts, "price": entry_price, "side": pending_entry_side})
- pending_entry_side = None
- current_equity = equity
- if position is not None:
- take_profit_price = None
- if config.take_profit_pct is not None:
- take_profit_price = float(position["entry_price"]) * (
- 1.0 + config.take_profit_pct if position["side"] == "long" else 1.0 - config.take_profit_pct
- )
- exit_price = exit_price_for_risk_hit(position, candle, take_profit_price)
- if exit_price is not None:
- equity, won = close_trade(
- trades=trades,
- exits=exits,
- position=position,
- candle=candle,
- exit_price=exit_price,
- roundtrip_cost=roundtrip_cost,
- )
- wins += 1 if won else 0
- current_equity = equity
- position = None
- cooldown_until = index + config.cooldown_bars
- if position is not None:
- current_equity = mark_to_market(
- side=str(position["side"]),
- margin_used=float(position["margin_used"]),
- entry_price=float(position["entry_price"]),
- mark_price=candle.close,
- leverage=LEVERAGE,
- )
- peak_equity = max(peak_equity, current_equity)
- max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity)
- equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close})
- ending_equity = current_equity
- if index == len(candles) - 1 or equity <= 0.0:
- continue
- current_middle = middle[index]
- if position is not None:
- middle_exit = (
- position["side"] == "long" and current_middle == current_middle and candle.close >= float(current_middle)
- ) or (
- position["side"] == "short" and current_middle == current_middle and candle.close <= float(current_middle)
- )
- max_hold_exit = config.max_hold_bars is not None and index - int(position["entry_index"]) >= config.max_hold_bars
- if middle_exit or max_hold_exit:
- pending_exit = True
- continue
- if index < cooldown_until or not in_session(config.session, candle.ts):
- continue
- current_vol = realized_vol[index]
- if config.vol_cap is not None and (current_vol != current_vol or current_vol > config.vol_cap):
- continue
- current_corr = corr[index]
- if config.corr_min is not None and (current_corr != current_corr or current_corr < config.corr_min):
- continue
- if config.corr_max is not None and (current_corr != current_corr or current_corr > config.corr_max):
- continue
- current_trend = trend[index]
- if current_trend != current_trend:
- continue
- previous_bandwidths = [value for value in bandwidth[max(0, index - config.bandwidth_lookback) : index] if value == value]
- current_bandwidth = bandwidth[index]
- if len(previous_bandwidths) < config.bandwidth_lookback or current_bandwidth != current_bandwidth:
- continue
- if current_bandwidth > median(previous_bandwidths):
- continue
- if (
- config.side_mode in ("both", "long")
- and lower[index] == lower[index]
- and candle.close < float(lower[index])
- and in_regime(config.long_regime, candle.close, float(current_trend), config.neutral_distance)
- ):
- pending_entry_side = "long"
- elif (
- config.side_mode in ("both", "short")
- and upper[index] == upper[index]
- and candle.close > float(upper[index])
- and in_regime(config.short_regime, candle.close, float(current_trend), config.neutral_distance)
- ):
- pending_entry_side = "short"
- trade_count = len(trades)
- return SegmentResult(
- trade_count=trade_count,
- total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY,
- win_rate=wins / trade_count if trade_count else 0.0,
- max_drawdown=max_drawdown,
- trades=trades,
- open_position=position,
- candles=candles[warmup_bars:],
- equity_curve=equity_curve,
- entries=entries,
- exits=exits,
- )
- def equity_frame(result: SegmentResult) -> pd.DataFrame:
- frame = pd.DataFrame(result.equity_curve)
- frame["ts"] = pd.to_datetime(frame["ts"], unit="ms", utc=True)
- return frame[["ts", "equity"]]
- def drawdown(values: pd.Series) -> float:
- peak = values.cummax()
- return float(((peak - values) / peak).max()) if len(values) else 0.0
- def monthly_returns(frame: pd.DataFrame) -> pd.DataFrame:
- month_end = frame.set_index("ts")["equity"].resample("ME").last().ffill()
- month_start = month_end.shift(1)
- if len(month_end):
- month_start.iloc[0] = float(frame["equity"].iloc[0])
- return pd.DataFrame(
- {
- "period": month_end.index.tz_localize(None).to_period("M").astype(str),
- "return": month_end.to_numpy() / month_start.to_numpy() - 1.0,
- "end_equity": month_end.to_numpy(),
- }
- )
- def horizon_frame(frame: pd.DataFrame, offset: pd.DateOffset | None) -> pd.DataFrame:
- if offset is None:
- out = frame.copy()
- else:
- cutoff = frame["ts"].iloc[-1] - offset
- out = frame[frame["ts"] >= cutoff].copy()
- if out.empty:
- out = frame.copy()
- out["equity"] = out["equity"] / float(out["equity"].iloc[0]) * INITIAL_EQUITY
- return out
- def parse_exit_time(trade: dict[str, object]) -> pd.Timestamp:
- return pd.Timestamp(str(trade["exit_time"]), tz="UTC")
- def parse_entry_time(trade: dict[str, object]) -> pd.Timestamp:
- return pd.Timestamp(str(trade["entry_time"]), tz="UTC")
- def summarize(label: str, symbol: str, frame: pd.DataFrame, trades: list[dict[str, object]], horizon: str) -> dict[str, object]:
- start = frame["ts"].iloc[0]
- end = frame["ts"].iloc[-1]
- years = max((end - start).total_seconds() / (365.0 * 24 * 60 * 60), 1e-9)
- total_return = float(frame["equity"].iloc[-1] / frame["equity"].iloc[0] - 1.0)
- annualized = (1.0 + total_return) ** (1.0 / years) - 1.0 if total_return > -1.0 else -1.0
- max_dd = drawdown(frame["equity"])
- scoped_trades = [trade for trade in trades if parse_entry_time(trade) >= start and parse_exit_time(trade) >= start]
- wins = [float(trade["pnl"]) for trade in scoped_trades if float(trade["pnl"]) > 0.0]
- losses = [float(trade["pnl"]) for trade in scoped_trades if float(trade["pnl"]) < 0.0]
- gross_profit = sum(wins)
- gross_loss_abs = abs(sum(losses))
- months = max((end - start).total_seconds() / (30.4375 * 24 * 60 * 60), 1e-9)
- monthly = monthly_returns(frame)
- worst = monthly.loc[monthly["return"].idxmin()] if len(monthly) else None
- positive_months = int((monthly["return"] > 0.0).sum()) if len(monthly) else 0
- negative_months = int((monthly["return"] < 0.0).sum()) if len(monthly) else 0
- return {
- "label": label,
- "symbol": symbol,
- "horizon": horizon,
- "start": start.strftime("%Y-%m-%d"),
- "end": end.strftime("%Y-%m-%d"),
- "total_return": total_return,
- "annualized_return": annualized,
- "max_drawdown": max_dd,
- "calmar": annualized / max_dd if max_dd else 0.0,
- "win_rate": len(wins) / len(scoped_trades) if scoped_trades else 0.0,
- "payoff_ratio": (sum(wins) / len(wins)) / (gross_loss_abs / len(losses)) if wins and losses else 0.0,
- "profit_factor": gross_profit / gross_loss_abs if gross_loss_abs else 0.0,
- "return_drawdown_ratio": total_return / max_dd if max_dd else 0.0,
- "trades": len(scoped_trades),
- "trades_per_month": len(scoped_trades) / months,
- "worst_month": str(worst["period"]) if worst is not None else "",
- "worst_month_return": float(worst["return"]) if worst is not None else 0.0,
- "months": len(monthly),
- "positive_month_rate": positive_months / len(monthly) if len(monthly) else 0.0,
- "negative_months": negative_months,
- }
- def summarize_all(label: str, symbol: str, frame: pd.DataFrame, trades: list[dict[str, object]]) -> list[dict[str, object]]:
- return [
- summarize(label, symbol, horizon_frame(frame, offset), trades, horizon)
- for horizon, offset in HORIZONS
- ]
- def add_cost_columns(row: dict[str, object], cost_model: str, roundtrip_cost: float) -> None:
- row["cost_model"] = cost_model
- row["roundtrip_cost_on_margin"] = roundtrip_cost
- row["net_total_return"] = row["total_return"]
- row["net_annualized_return"] = row["annualized_return"]
- row["net_max_drawdown"] = row["max_drawdown"]
- row["net_calmar"] = row["calmar"]
- def combine_equity_frames(frames: list[pd.DataFrame]) -> pd.DataFrame:
- combined = pd.concat(
- [frame.set_index("ts")["equity"].rename(str(index)) for index, frame in enumerate(frames)],
- axis=1,
- sort=True,
- ).sort_index().ffill().dropna()
- return pd.DataFrame({"ts": combined.index, "equity": combined.sum(axis=1).to_numpy()})
- def build_configs() -> list[RiskConfig]:
- base_params = [
- (20, 2.0, 0.005),
- (20, 2.5, 0.005),
- (20, 2.5, 0.008),
- (30, 2.0, 0.005),
- (30, 2.5, 0.005),
- ]
- recipes = [
- {},
- {"vol_cap": 0.006},
- {"vol_cap": 0.009},
- {"cooldown_bars": 8},
- {"cooldown_bars": 32},
- {"max_hold_bars": 96},
- {"max_hold_bars": 288},
- {"take_profit_pct": 0.006},
- {"take_profit_pct": 0.012},
- {"session": "asia"},
- {"session": "eu_us"},
- {"long_regime": "neutral", "short_regime": "neutral", "neutral_distance": 0.03},
- {"long_regime": "neutral", "short_regime": "neutral", "neutral_distance": 0.06},
- {"long_regime": "below", "short_regime": "above"},
- {"long_regime": "above", "short_regime": "below"},
- {"side_mode": "long", "long_regime": "below"},
- {"side_mode": "short", "short_regime": "above"},
- {"vol_cap": 0.009, "cooldown_bars": 8, "max_hold_bars": 96},
- {"vol_cap": 0.009, "take_profit_pct": 0.012, "max_hold_bars": 96},
- {"vol_cap": 0.006, "long_regime": "neutral", "short_regime": "neutral", "neutral_distance": 0.06},
- {"cooldown_bars": 8, "long_regime": "below", "short_regime": "above", "max_hold_bars": 288},
- {"corr_min": 0.35},
- {"corr_max": 0.75},
- {"corr_min": -0.20, "corr_max": 0.75, "vol_cap": 0.009},
- ]
- configs: list[RiskConfig] = []
- for band_length, multiplier, stop_loss in base_params:
- for recipe in recipes:
- configs.append(
- RiskConfig(
- band_length=band_length,
- std_multiplier=multiplier,
- bandwidth_lookback=50,
- stop_loss_pct=stop_loss,
- take_profit_pct=recipe.get("take_profit_pct"),
- max_hold_bars=recipe.get("max_hold_bars"),
- cooldown_bars=int(recipe.get("cooldown_bars", 0)),
- vol_lookback=96,
- vol_cap=recipe.get("vol_cap"),
- trend_sma=480,
- long_regime=str(recipe.get("long_regime", "any")),
- short_regime=str(recipe.get("short_regime", "any")),
- neutral_distance=recipe.get("neutral_distance"),
- session=str(recipe.get("session", "all")),
- corr_lookback=192,
- corr_min=recipe.get("corr_min"),
- corr_max=recipe.get("corr_max"),
- side_mode=str(recipe.get("side_mode", "both")),
- )
- )
- return configs
- def score(full: dict[str, object], recent: dict[str, object]) -> float:
- trades_per_month = float(full["trades_per_month"])
- if trades_per_month < 10.0:
- return -999.0
- if float(recent["total_return"]) < -0.05:
- return -999.0
- return (
- float(full["calmar"])
- + float(recent["calmar"]) * 0.5
- + min(trades_per_month, 60.0) / 100.0
- - float(full["max_drawdown"]) * 1.5
- )
- def pct(value: float) -> str:
- return f"{value * 100:.2f}%"
- def write_report(summary: pd.DataFrame, portfolio: pd.DataFrame, selected: list[dict[str, object]]) -> None:
- primary_summary = summary[summary["cost_model"] == PRIMARY_COST_MODEL]
- stress_summary = summary[summary["cost_model"] == "taker_taker"]
- primary_portfolio = portfolio[portfolio["cost_model"] == PRIMARY_COST_MODEL]
- stress_portfolio = portfolio[portfolio["cost_model"] == "taker_taker"]
- robust_single = robust_survivors(summary[summary["cost_model"].isin(("maker_taker", "taker_taker"))])
- robust_combo = robust_survivors(portfolio[portfolio["cost_model"].isin(("maker_taker", "taker_taker"))])
- lines = [
- "# BTC/ETH BBMR Risk Variant Search",
- "",
- "Primary ranking uses maker/taker roundtrip margin cost 0.21%. Taker/taker stress uses 0.30%. Funding and slippage remain excluded.",
- "",
- f"Strict robust survivors with positive full/3y/1y/6m/3m net return and Calmar: single-symbol {len(robust_single)}, portfolio {len(robust_combo)}.",
- "",
- "Selection rule: rank candidates with at least 10 trades/month, lower full-period drawdown, and non-collapsing recent 1y results.",
- "",
- "## Risk-Qualified Single-Symbol Rows: maker/taker",
- "",
- ]
- for row in risk_qualified(primary_summary).head(12).to_dict("records"):
- lines.append(
- f"- {row['symbol']} {row['label']}: total {pct(float(row['total_return']))}, annualized {pct(float(row['annualized_return']))}, "
- f"DD {pct(float(row['max_drawdown']))}, Calmar {float(row['calmar']):.2f}, trades/month {float(row['trades_per_month']):.1f}, "
- f"3y {pct(float(row['ret_3y']))}, 1y {pct(float(row['ret_1y']))}, 6m {pct(float(row['ret_6m']))}, 3m {pct(float(row['ret_3m']))}, "
- f"worst month {row['worst_month']} {pct(float(row['worst_month_return']))}, positive months {pct(float(row['positive_month_rate']))}"
- )
- lines.extend(["", "## Risk-Qualified Portfolio Rows: maker/taker", ""])
- for row in risk_qualified(primary_portfolio).head(10).to_dict("records"):
- lines.append(
- f"- {row['label']}: total {pct(float(row['total_return']))}, annualized {pct(float(row['annualized_return']))}, "
- f"DD {pct(float(row['max_drawdown']))}, Calmar {float(row['calmar']):.2f}, trades/month {float(row['trades_per_month']):.1f}, "
- f"3y {pct(float(row['ret_3y']))}, 1y {pct(float(row['ret_1y']))}, 6m {pct(float(row['ret_6m']))}, 3m {pct(float(row['ret_3m']))}, "
- f"worst month {row['worst_month']} {pct(float(row['worst_month_return']))}, positive months {pct(float(row['positive_month_rate']))}"
- )
- lines.extend(["", "## Taker/taker stress survivors", ""])
- for row in risk_qualified(stress_summary).head(8).to_dict("records"):
- lines.append(
- f"- {row['symbol']} {row['label']}: total {pct(float(row['total_return']))}, annualized {pct(float(row['annualized_return']))}, "
- f"DD {pct(float(row['max_drawdown']))}, Calmar {float(row['calmar']):.2f}, trades/month {float(row['trades_per_month']):.1f}, "
- f"3y {pct(float(row['ret_3y']))}, 1y {pct(float(row['ret_1y']))}, 6m {pct(float(row['ret_6m']))}, 3m {pct(float(row['ret_3m']))}"
- )
- for row in risk_qualified(stress_portfolio).head(5).to_dict("records"):
- lines.append(
- f"- portfolio {row['label']}: total {pct(float(row['total_return']))}, annualized {pct(float(row['annualized_return']))}, "
- f"DD {pct(float(row['max_drawdown']))}, Calmar {float(row['calmar']):.2f}, trades/month {float(row['trades_per_month']):.1f}, "
- f"3y {pct(float(row['ret_3y']))}, 1y {pct(float(row['ret_1y']))}, 6m {pct(float(row['ret_6m']))}, 3m {pct(float(row['ret_3m']))}"
- )
- lines.extend(["", "## Maker/taker near misses", ""])
- near_miss_cols = ["symbol", "label", "total_return", "annualized_return", "max_drawdown", "calmar", "trades_per_month", "positive_month_rate", "worst_month_return"]
- for row in primary_summary[primary_summary["horizon"] == "full"].sort_values("score", ascending=False).head(5)[near_miss_cols].to_dict("records"):
- lines.append(
- f"- {row['symbol']} {row['label']}: total {pct(float(row['total_return']))}, annualized {pct(float(row['annualized_return']))}, "
- f"DD {pct(float(row['max_drawdown']))}, Calmar {float(row['calmar']):.2f}, trades/month {float(row['trades_per_month']):.1f}, "
- f"positive months {pct(float(row['positive_month_rate']))}, worst month {pct(float(row['worst_month_return']))}"
- )
- for row in primary_portfolio[primary_portfolio["horizon"] == "full"].sort_values("score", ascending=False).head(5)[near_miss_cols].to_dict("records"):
- lines.append(
- f"- portfolio {row['label']}: total {pct(float(row['total_return']))}, annualized {pct(float(row['annualized_return']))}, "
- f"DD {pct(float(row['max_drawdown']))}, Calmar {float(row['calmar']):.2f}, trades/month {float(row['trades_per_month']):.1f}, "
- f"positive months {pct(float(row['positive_month_rate']))}, worst month {pct(float(row['worst_month_return']))}"
- )
- lines.extend(["", "## Selected Configs", ""])
- for item in selected:
- lines.append(f"- {item['symbol']} {item['label']} ({item['cost_model']}): `{json.dumps(item['config'], separators=(',', ':'))}`")
- (REPORT_DIR / f"{OUTPUT_PREFIX}-summary.md").write_text("\n".join(lines) + "\n", encoding="utf-8")
- def risk_qualified(frame: pd.DataFrame) -> pd.DataFrame:
- full = frame[frame["horizon"] == "full"].copy()
- recent = full
- for horizon in ("3y", "1y", "6m", "3m"):
- part = frame[frame["horizon"] == horizon][["cost_model", "symbol", "label", "total_return", "max_drawdown", "calmar"]].rename(
- columns={"total_return": f"ret_{horizon}", "max_drawdown": f"dd_{horizon}", "calmar": f"calmar_{horizon}"}
- )
- recent = recent.merge(part, on=["cost_model", "symbol", "label"], how="inner")
- dd_limit = recent["symbol"].map({"BTC": 0.5788, "ETH": 0.7199, "BTC+ETH": 0.5788}).fillna(0.5788)
- qualified = recent[
- (recent["trades_per_month"] >= 10.0)
- & (recent["max_drawdown"] < dd_limit)
- & (recent["ret_1y"] > -0.05)
- & (recent["ret_6m"] > -0.05)
- & (recent["ret_3m"] > -0.05)
- ].copy()
- qualified["risk_rank"] = (
- qualified["calmar"] * 2.0
- + qualified["annualized_return"]
- + qualified["ret_1y"] * 0.5
- - qualified["max_drawdown"]
- )
- return qualified.sort_values(["symbol", "risk_rank"], ascending=[True, False])
- def robust_survivors(frame: pd.DataFrame) -> pd.DataFrame:
- full = frame[frame["horizon"] == "full"].copy()
- recent = full
- for horizon in ("3y", "1y", "6m", "3m"):
- part = frame[frame["horizon"] == horizon][["cost_model", "symbol", "label", "total_return", "calmar"]].rename(
- columns={"total_return": f"ret_{horizon}", "calmar": f"calmar_{horizon}"}
- )
- recent = recent.merge(part, on=["cost_model", "symbol", "label"], how="inner")
- return recent[
- (recent["trades_per_month"] >= 10.0)
- & (recent["total_return"] > 0.0)
- & (recent["calmar"] > 0.0)
- & (recent["ret_3y"] > 0.0)
- & (recent["ret_1y"] > 0.0)
- & (recent["ret_6m"] > 0.0)
- & (recent["ret_3m"] > 0.0)
- & (recent["calmar_3y"] > 0.0)
- & (recent["calmar_1y"] > 0.0)
- & (recent["calmar_6m"] > 0.0)
- & (recent["calmar_3m"] > 0.0)
- ].copy()
- def main() -> int:
- parser = argparse.ArgumentParser()
- parser.add_argument("--years", type=float, default=10.0)
- parser.add_argument("--top-per-symbol", type=int, default=12)
- args = parser.parse_args()
- REPORT_DIR.mkdir(parents=True, exist_ok=True)
- explore = load_explore_module()
- btc_raw = load_cached_history(explore, "BTC-USDT-SWAP", BAR, args.years)
- eth_raw = load_cached_history(explore, "ETH-USDT-SWAP", BAR, args.years)
- btc, eth = align_pair(btc_raw, eth_raw)
- symbols = {
- "BTC": (btc, eth),
- "ETH": (eth, btc),
- }
- result_rows: list[dict[str, object]] = []
- selected_rows: list[dict[str, object]] = []
- selected_configs: dict[tuple[str, str], RiskConfig] = {}
- result_cache: dict[tuple[str, str, str], tuple[pd.DataFrame, list[dict[str, object]]]] = {}
- configs = build_configs()
- for symbol, (candles, peer) in symbols.items():
- ranked: list[dict[str, object]] = []
- for config in configs:
- for cost_model, roundtrip_cost in COST_MODELS.items():
- result = run_bbmr_risk_segment(candles=candles, peer_candles=peer, config=config, roundtrip_cost=roundtrip_cost)
- frame = equity_frame(result)
- result_cache[(symbol, config.name, cost_model)] = (frame, result.trades)
- rows = summarize_all(config.name, symbol, frame, result.trades)
- full = next(row for row in rows if row["horizon"] == "full")
- one_year = next(row for row in rows if row["horizon"] == "1y")
- candidate_score = score(full, one_year)
- for row in rows:
- add_cost_columns(row, cost_model, roundtrip_cost)
- row["score"] = candidate_score
- row.update(asdict(config))
- result_rows.append(row)
- if cost_model == PRIMARY_COST_MODEL:
- ranked.append({"score": candidate_score, "config": config, "result": result, "frame": frame, "rows": rows})
- ranked.sort(key=lambda item: float(item["score"]), reverse=True)
- for item in ranked[: args.top_per_symbol]:
- config = item["config"]
- key = (symbol, config.name)
- selected_configs[key] = config
- selected_rows.append({"symbol": symbol, "label": config.name, "cost_model": PRIMARY_COST_MODEL, "score": item["score"], "config": asdict(config)})
- summary = pd.DataFrame(result_rows)
- summary["horizon"] = pd.Categorical(summary["horizon"], categories=[name for name, _ in HORIZONS], ordered=True)
- summary = summary.sort_values(["symbol", "score", "horizon"], ascending=[True, False, True])
- summary.to_csv(REPORT_DIR / f"{OUTPUT_PREFIX}-all.csv", index=False)
- summary[summary["horizon"] == "full"].sort_values("score", ascending=False).head(40).to_csv(
- REPORT_DIR / f"{OUTPUT_PREFIX}-top.csv",
- index=False,
- )
- risk_qualified(summary).to_csv(REPORT_DIR / f"{OUTPUT_PREFIX}-qualified.csv", index=False)
- net_summary = summary[summary["cost_model"].isin(("maker_taker", "taker_taker"))]
- robust_survivors(net_summary).to_csv(REPORT_DIR / f"{OUTPUT_PREFIX}-robust-survivors.csv", index=False)
- portfolio_rows: list[dict[str, object]] = []
- btc_keys = [key for key in selected_configs if key[0] == "BTC"][: args.top_per_symbol]
- eth_keys = [key for key in selected_configs if key[0] == "ETH"][: args.top_per_symbol]
- for btc_key in btc_keys:
- for eth_key in eth_keys:
- label = f"{btc_key[1]} + {eth_key[1]}"
- for cost_model, roundtrip_cost in COST_MODELS.items():
- btc_frame, btc_trades = result_cache[(btc_key[0], btc_key[1], cost_model)]
- eth_frame, eth_trades = result_cache[(eth_key[0], eth_key[1], cost_model)]
- frame = combine_equity_frames([btc_frame, eth_frame])
- trades = btc_trades + eth_trades
- rows = summarize_all(label, "BTC+ETH", frame, trades)
- full = next(row for row in rows if row["horizon"] == "full")
- one_year = next(row for row in rows if row["horizon"] == "1y")
- portfolio_score = score(full, one_year)
- for row in rows:
- add_cost_columns(row, cost_model, roundtrip_cost)
- row["score"] = portfolio_score
- portfolio_rows.append(row)
- portfolio = pd.DataFrame(portfolio_rows)
- portfolio["horizon"] = pd.Categorical(portfolio["horizon"], categories=[name for name, _ in HORIZONS], ordered=True)
- portfolio = portfolio.sort_values(["score", "horizon"], ascending=[False, True])
- portfolio.to_csv(REPORT_DIR / f"{OUTPUT_PREFIX}-portfolio.csv", index=False)
- risk_qualified(portfolio).to_csv(REPORT_DIR / f"{OUTPUT_PREFIX}-portfolio-qualified.csv", index=False)
- net_portfolio = portfolio[portfolio["cost_model"].isin(("maker_taker", "taker_taker"))]
- robust_survivors(net_portfolio).to_csv(REPORT_DIR / f"{OUTPUT_PREFIX}-portfolio-robust-survivors.csv", index=False)
- Path(REPORT_DIR / f"{OUTPUT_PREFIX}-selected.json").write_text(
- json.dumps(selected_rows, indent=2),
- encoding="utf-8",
- )
- write_report(summary, portfolio, selected_rows)
- print(f"wrote {REPORT_DIR / f'{OUTPUT_PREFIX}-all.csv'}")
- print(f"wrote {REPORT_DIR / f'{OUTPUT_PREFIX}-top.csv'}")
- print(f"wrote {REPORT_DIR / f'{OUTPUT_PREFIX}-portfolio.csv'}")
- print(f"wrote {REPORT_DIR / f'{OUTPUT_PREFIX}-summary.md'}")
- return 0
- if __name__ == "__main__":
- raise SystemExit(main())
|