"""Backtest and rank risk-filter variants of a rolling-band mean-reversion strategy.

The entry signal is a close outside a rolling mean +/- k * standard-deviation
band on 15m candles; the variants add stop/take-profit, holding-time, cooldown,
volatility, trend-regime, session and peer-correlation filters.
"""
from __future__ import annotations

import argparse
import importlib.util
import json
import sys
from dataclasses import asdict, dataclass
from pathlib import Path
from statistics import median

import pandas as pd

from okx_codex_trader.models import Candle
from okx_codex_trader.sampled_report import SegmentResult, mark_to_market, trade_equity

# Starting account equity for every simulated run.
INITIAL_EQUITY = 10_000.0
# Leverage passed to trade_equity / mark_to_market.
LEVERAGE = 3
# Candle interval loaded from the cache.
BAR = "15m"
REPORT_DIR = Path("reports/ultrashort")
OUTPUT_PREFIX = "bbmr-risk"
# (label, look-back offset) pairs; None means "use the whole equity curve".
HORIZONS = (
    ("full", None),
    ("3y", pd.DateOffset(years=3)),
    ("1y", pd.DateOffset(years=1)),
    ("6m", pd.DateOffset(months=6)),
    ("3m", pd.DateOffset(months=3)),
)
# Round-trip cost per trade expressed as a fraction of the margin used
# (close_trade charges margin_used * cost).
COST_MODELS = {
    "gross": 0.0,
    "maker_taker": 0.0021,
    "taker_taker": 0.0030,
}
PRIMARY_COST_MODEL = "maker_taker"


@dataclass(frozen=True)
class RiskConfig:
    """Immutable parameter set for one strategy variant."""

    # Rolling window (bars) for the band mean and standard deviation.
    band_length: int
    # Band half-width in standard deviations.
    std_multiplier: float
    # Number of previous bandwidth values whose median gates new entries.
    bandwidth_lookback: int
    # Stop distance from the entry price, as a fraction (0.005 == 0.5%).
    stop_loss_pct: float
    # Take-profit distance from the entry price as a fraction; None disables it.
    take_profit_pct: float | None
    # Maximum bars to hold a position before a forced exit; None disables it.
    max_hold_bars: int | None
    # Bars to wait after an exit before a new entry signal is accepted.
    cooldown_bars: int
    # Rolling window (bars) for the realized-volatility filter.
    vol_lookback: int
    # Maximum allowed rolling std of bar returns; None disables the filter.
    vol_cap: float | None
    # Rolling window (bars) of the trend moving average.
    trend_sma: int
    # Regime required for longs / shorts: "any", "above", "below" or "neutral".
    long_regime: str
    short_regime: str
    # Max relative distance |close / trend - 1| accepted by the "neutral" regime.
    neutral_distance: float | None
    # Trading session filter: "all", "asia", "eu_us" or "us" (UTC hours).
    session: str
    # Rolling window (bars) for the correlation with the peer instrument.
    corr_lookback: int
    # Inclusive bounds on that correlation; None disables the respective bound.
    corr_min: float | None
    corr_max: float | None
    # Which sides may be traded: "both", "long" or "short".
    side_mode: str

    @property
    def name(self) -> str:
        """Return a compact label built from the parameters that differ from defaults.

        Note that bandwidth_lookback, vol_lookback, trend_sma and corr_lookback
        are not encoded, so configs differing only in those share a name.
        """
        fields = [
            f"l{self.band_length}",
            f"m{self.std_multiplier:g}",
            f"sl{self.stop_loss_pct:g}",
        ]
        if self.take_profit_pct is not None:
            fields.append(f"tp{self.take_profit_pct:g}")
        if self.max_hold_bars is not None:
            fields.append(f"mh{self.max_hold_bars}")
        if self.cooldown_bars:
            fields.append(f"cd{self.cooldown_bars}")
        if self.vol_cap is not None:
            fields.append(f"vc{self.vol_cap:g}")
        if self.long_regime != "any" or self.short_regime != "any":
            fields.append(f"lr{self.long_regime}-sr{self.short_regime}")
        if self.neutral_distance is not None:
            fields.append(f"nd{self.neutral_distance:g}")
        if self.session != "all":
            fields.append(f"s{self.session}")
        if self.corr_min is not None or self.corr_max is not None:
            fields.append(f"corr{self.corr_min}_{self.corr_max}")
        if self.side_mode != "both":
            fields.append(self.side_mode)
        return "bbmr-" + "-".join(fields)


def load_explore_module():
    """Import the sibling script ``explore_ultrashort.py`` as a module.

    The module is registered in ``sys.modules`` before it is executed.

    Raises:
        RuntimeError: if no import spec or loader can be created for the file.
    """
    path = Path(__file__).resolve().with_name("explore_ultrashort.py")
    spec = importlib.util.spec_from_file_location("explore_ultrashort", path)
    if spec is None or spec.loader is None:
        raise RuntimeError("cannot load explore_ultrashort.py")
    module = importlib.util.module_from_spec(spec)
    sys.modules[spec.name] = module
    spec.loader.exec_module(module)
    return module


def load_cached_history(explore, symbol: str, bar: str, years: float) -> list[Candle]:
    """Return the most recent cached candles for ``symbol`` covering ``years``.

    Args:
        explore: module returned by ``load_explore_module``.
        symbol: instrument identifier passed to the cache loader.
        bar: candle interval passed to the cache loader.
        years: requested history length; converted to a bar count by
            ``explore.history_bars_for_years``.

    Raises:
        RuntimeError: if the cache holds no candles for the symbol/bar.
    """
    requested = explore.history_bars_for_years(bar, years)
    candles, _ = explore.load_cached_candles(explore.CANDLE_CACHE_DIR, symbol, bar)
    if not candles:
        raise RuntimeError(f"missing cached candles: {symbol} {bar}")
    # Keep only the tail when the cache holds more than was asked for.
    return candles[-requested:] if len(candles) > requested else candles


def align_pair(left: list[Candle], right: list[Candle]) -> tuple[list[Candle], list[Candle]]:
    """Keep only candles whose timestamp exists in both series.

    Returns two equally long lists, ordered like ``left``, where element ``i``
    of each list shares the same ``ts``.
    """
    right_by_ts = {candle.ts: candle for candle in right}
    left_out: list[Candle] = []
    right_out: list[Candle] = []
    for candle in left:
        other = right_by_ts.get(candle.ts)
        if other is None:
            continue
        left_out.append(candle)
        right_out.append(other)
    return left_out, right_out


def format_ts(ts: int) -> str:
    """Format a millisecond epoch timestamp as ``YYYY-MM-DD HH:MM`` in UTC."""
    return pd.to_datetime(ts, unit="ms", utc=True).strftime("%Y-%m-%d %H:%M")


def close_trade(
    *,
    trades: list[dict[str, object]],
    exits: list[dict[str, object]],
    position: dict[str, object],
    candle: Candle,
    exit_price: float,
    roundtrip_cost: float,
) -> tuple[float, bool]:
    """Close ``position`` at ``exit_price`` and record the trade.

    Appends a trade record to ``trades`` and an exit marker to ``exits``
    (both lists are mutated in place).

    Args:
        trades: list receiving the rounded trade record.
        exits: list receiving ``{"ts", "price", "side"}`` for the exit.
        position: open position dict with ``side``, ``margin_used``,
            ``entry_price`` and ``entry_time``.
        candle: bar on which the exit happens; its ``ts`` is the exit time.
        exit_price: fill price of the exit.
        roundtrip_cost: cost as a fraction of ``margin_used``.

    Returns:
        ``(exit_equity, won)`` where ``exit_equity`` is the equity after costs
        and ``won`` is True when it exceeds the margin that was committed.
    """
    gross_exit_equity = trade_equity(
        side=str(position["side"]),
        margin_used=float(position["margin_used"]),
        entry_price=float(position["entry_price"]),
        exit_price=exit_price,
        leverage=LEVERAGE,
    )
    # Costs are charged once per round trip, proportional to the margin.
    cost = float(position["margin_used"]) * roundtrip_cost
    exit_equity = gross_exit_equity - cost
    pnl = exit_equity - float(position["margin_used"])
    trades.append(
        {
            "side": "Long" if position["side"] == "long" else "Short",
            "entry_time": format_ts(int(position["entry_time"])),
            "exit_time": format_ts(candle.ts),
            "entry_price": round(float(position["entry_price"]), 4),
            "exit_price": round(exit_price, 4),
            "pnl": round(pnl, 4),
            "return_pct": round(pnl / float(position["margin_used"]) * 100.0, 4),
            "cost": round(cost, 4),
        }
    )
    exits.append({"ts": candle.ts, "price": exit_price, "side": position["side"]})
    return exit_equity, exit_equity > float(position["margin_used"])


def in_regime(regime: str, close: float, trend: float, neutral_distance: float | None) -> bool:
    """Return True when ``close`` satisfies ``regime`` relative to ``trend``.

    ``"neutral"`` accepts closes within ``neutral_distance`` (relative) of the
    trend value, or everything when ``neutral_distance`` is None.

    Raises:
        ValueError: for an unrecognised regime name.
    """
    if regime == "any":
        return True
    if regime == "above":
        return close > trend
    if regime == "below":
        return close < trend
    if regime == "neutral":
        if neutral_distance is None:
            return True
        return abs(close / trend - 1.0) <= neutral_distance
    raise ValueError(f"unknown regime: {regime}")


def in_session(session: str, ts: int) -> bool:
    """Return True when the UTC hour of millisecond timestamp ``ts`` is in ``session``.

    Raises:
        ValueError: for an unrecognised session name.
    """
    if session == "all":
        return True
    hour = pd.to_datetime(ts, unit="ms", utc=True).hour
    if session == "asia":
        return 0 <= hour < 8
    if session == "eu_us":
        return 8 <= hour < 24
    if session == "us":
        return 13 <= hour < 22
    raise ValueError(f"unknown session: {session}")


def exit_price_for_risk_hit(position: dict[str, object], candle: Candle, take_profit_price: float | None) -> float | None:
    """Return the fill price if the stop or take-profit triggers on ``candle``.

    A bar that opens beyond either level fills at the open. Otherwise the
    bar's high/low is tested, and when both levels are touched within the
    same bar the stop takes precedence. Returns None when nothing triggers.
    """
    side = str(position["side"])
    stop_price = float(position["stop_price"])
    if side == "long":
        # Gap through a level: fill at the open rather than at the level.
        if candle.open <= stop_price:
            return candle.open
        if take_profit_price is not None and candle.open >= take_profit_price:
            return candle.open
        stop_hit = candle.low <= stop_price
        take_profit_hit = take_profit_price is not None and candle.high >= take_profit_price
    else:
        if candle.open >= stop_price:
            return candle.open
        if take_profit_price is not None and candle.open <= take_profit_price:
            return candle.open
        stop_hit = candle.high >= stop_price
        take_profit_hit = take_profit_price is not None and candle.low <= take_profit_price
    # Stop is checked first, so it wins when both levels lie inside the bar.
    if stop_hit:
        return stop_price
    if take_profit_hit:
        return take_profit_price
    return None


def run_bbmr_risk_segment(
    *,
    candles: list[Candle],
    peer_candles: list[Candle],
    config: RiskConfig,
    roundtrip_cost: float = 0.0,
) -> SegmentResult:
    """Simulate one strategy variant bar by bar over ``candles``.

    Signals are evaluated on a bar's close and executed at the next bar's
    open; stop-loss / take-profit are evaluated intrabar. The whole current
    equity is committed as margin on every entry.

    Args:
        candles: traded instrument's candles.
        peer_candles: candles of the peer instrument used for the correlation
            filter. NOTE(review): assumed to be index-aligned with ``candles``
            (see ``align_pair``) — confirm at call sites.
        config: strategy parameters.
        roundtrip_cost: per-trade cost as a fraction of margin.

    Returns:
        A ``SegmentResult`` with trades, entry/exit markers, the per-bar
        equity curve (starting after the warm-up bars), win rate, total
        return, maximum drawdown and any still-open position.
    """
    closes = pd.Series([candle.close for candle in candles], dtype=float)
    peer_closes = pd.Series([candle.close for candle in peer_candles], dtype=float)
    middle = closes.rolling(config.band_length).mean().tolist()
    stdev = closes.rolling(config.band_length).std(ddof=0).tolist()
    # `x != x` is the NaN test used throughout: bands are NaN until the
    # rolling window is full.
    upper = [
        float("nan") if avg != avg or std != std else avg + config.std_multiplier * std
        for avg, std in zip(middle, stdev)
    ]
    lower = [
        float("nan") if avg != avg or std != std else avg - config.std_multiplier * std
        for avg, std in zip(middle, stdev)
    ]
    # Band width relative to the band mean.
    bandwidth = [
        float("nan") if up != up or lo != lo or avg != avg or avg == 0.0 else (up - lo) / avg
        for up, lo, avg in zip(upper, lower, middle)
    ]
    returns = closes.pct_change()
    realized_vol = returns.rolling(config.vol_lookback).std(ddof=1).tolist()
    trend = closes.rolling(config.trend_sma).mean().tolist()
    corr = returns.rolling(config.corr_lookback).corr(peer_closes.pct_change()).tolist()
    # Skip bars until every active indicator has a full window; the
    # correlation window only counts when a correlation bound is configured.
    warmup_bars = max(
        config.band_length + config.bandwidth_lookback,
        config.vol_lookback,
        config.trend_sma,
        config.corr_lookback if config.corr_min is not None or config.corr_max is not None else 0,
    )
    equity = INITIAL_EQUITY
    ending_equity = equity
    peak_equity = equity
    max_drawdown = 0.0
    wins = 0
    cooldown_until = -1
    pending_entry_side: str | None = None
    pending_exit = False
    position: dict[str, object] | None = None
    trades: list[dict[str, object]] = []
    entries: list[dict[str, object]] = []
    exits: list[dict[str, object]] = []
    equity_curve: list[dict[str, float | int]] = []
    for index in range(warmup_bars, len(candles)):
        candle = candles[index]
        # 1) Execute an exit signalled on the previous bar at this bar's open.
        if pending_exit and position is not None:
            equity, won = close_trade(
                trades=trades,
                exits=exits,
                position=position,
                candle=candle,
                exit_price=candle.open,
                roundtrip_cost=roundtrip_cost,
            )
            wins += 1 if won else 0
            position = None
            pending_exit = False
            cooldown_until = index + config.cooldown_bars
        # 2) Execute an entry signalled on the previous bar at this bar's open.
        if pending_entry_side is not None and position is None and equity > 0.0:
            entry_price = candle.open
            position = {
                "side": pending_entry_side,
                "entry_time": candle.ts,
                "entry_price": entry_price,
                "entry_index": index,
                "margin_used": equity,
                "stop_price": entry_price * (1.0 - config.stop_loss_pct if pending_entry_side == "long" else 1.0 + config.stop_loss_pct),
            }
            entries.append({"ts": candle.ts, "price": entry_price, "side": pending_entry_side})
        pending_entry_side = None
        current_equity = equity
        # 3) Intrabar stop-loss / take-profit check (includes the entry bar).
        if position is not None:
            take_profit_price = None
            if config.take_profit_pct is not None:
                take_profit_price = float(position["entry_price"]) * (
                    1.0 + config.take_profit_pct if position["side"] == "long" else 1.0 - config.take_profit_pct
                )
            exit_price = exit_price_for_risk_hit(position, candle, take_profit_price)
            if exit_price is not None:
                equity, won = close_trade(
                    trades=trades,
                    exits=exits,
                    position=position,
                    candle=candle,
                    exit_price=exit_price,
                    roundtrip_cost=roundtrip_cost,
                )
                wins += 1 if won else 0
                current_equity = equity
                position = None
                cooldown_until = index + config.cooldown_bars
        # 4) Mark an open position at the close and update drawdown stats.
        if position is not None:
            current_equity = mark_to_market(
                side=str(position["side"]),
                margin_used=float(position["margin_used"]),
                entry_price=float(position["entry_price"]),
                mark_price=candle.close,
                leverage=LEVERAGE,
            )
        peak_equity = max(peak_equity, current_equity)
        max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity)
        equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close})
        ending_equity = current_equity
        # No signal on the last bar (nothing to execute on) or once the
        # realized equity is exhausted.
        if index == len(candles) - 1 or equity <= 0.0:
            continue
        current_middle = middle[index]
        # 5a) With a position open, only look for exit signals: close back
        # at/through the band mean, or maximum holding time reached.
        if position is not None:
            middle_exit = (
                position["side"] == "long" and current_middle == current_middle and candle.close >= float(current_middle)
            ) or (
                position["side"] == "short" and current_middle == current_middle and candle.close <= float(current_middle)
            )
            max_hold_exit = config.max_hold_bars is not None and index - int(position["entry_index"]) >= config.max_hold_bars
            if middle_exit or max_hold_exit:
                pending_exit = True
            continue
        # 5b) Flat: apply the entry filters in turn.
        if index < cooldown_until or not in_session(config.session, candle.ts):
            continue
        current_vol = realized_vol[index]
        if config.vol_cap is not None and (current_vol != current_vol or current_vol > config.vol_cap):
            continue
        current_corr = corr[index]
        if config.corr_min is not None and (current_corr != current_corr or current_corr < config.corr_min):
            continue
        if config.corr_max is not None and (current_corr != current_corr or current_corr > config.corr_max):
            continue
        current_trend = trend[index]
        if current_trend != current_trend:
            continue
        # Require a full window of valid prior bandwidths and a current
        # bandwidth no wider than their median.
        previous_bandwidths = [value for value in bandwidth[max(0, index - config.bandwidth_lookback) : index] if value == value]
        current_bandwidth = bandwidth[index]
        if len(previous_bandwidths) < config.bandwidth_lookback or current_bandwidth != current_bandwidth:
            continue
        if current_bandwidth > median(previous_bandwidths):
            continue
        # Close below the lower band -> long; above the upper band -> short.
        if (
            config.side_mode in ("both", "long")
            and lower[index] == lower[index]
            and candle.close < float(lower[index])
            and in_regime(config.long_regime, candle.close, float(current_trend), config.neutral_distance)
        ):
            pending_entry_side = "long"
        elif (
            config.side_mode in ("both", "short")
            and upper[index] == upper[index]
            and candle.close > float(upper[index])
            and in_regime(config.short_regime, candle.close, float(current_trend), config.neutral_distance)
        ):
            pending_entry_side = "short"
    trade_count = len(trades)
    return SegmentResult(
        trade_count=trade_count,
        # Based on the last marked equity, so an open position is included.
        total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY,
        win_rate=wins / trade_count if trade_count else 0.0,
        max_drawdown=max_drawdown,
        trades=trades,
        open_position=position,
        candles=candles[warmup_bars:],
        equity_curve=equity_curve,
        entries=entries,
        exits=exits,
    )


def equity_frame(result: SegmentResult) -> pd.DataFrame:
    """Convert a result's equity curve to a frame with UTC ``ts`` and ``equity``.

    NOTE(review): an empty equity curve yields a frame without a ``ts``
    column, so the lookup below would raise — confirm callers never pass one.
    """
    frame = pd.DataFrame(result.equity_curve)
    frame["ts"] = pd.to_datetime(frame["ts"], unit="ms", utc=True)
    return frame[["ts", "equity"]]


def drawdown(values: pd.Series) -> float:
    """Return the maximum peak-to-trough decline of ``values`` as a fraction.

    Returns 0.0 for an empty series.
    """
    peak = values.cummax()
    return float(((peak - values) / peak).max()) if len(values) else 0.0
def monthly_returns(frame: pd.DataFrame) -> pd.DataFrame:
    """Compute per-calendar-month returns from an equity frame.

    Month-end equity is the last observation of each month (forward-filled
    over months without data). The first month's return is measured against
    the very first equity value in ``frame``.

    Returns:
        A frame with columns ``period`` (``YYYY-MM``), ``return`` and
        ``end_equity``.
    """
    equity_by_ts = frame.set_index("ts")["equity"]
    closing = equity_by_ts.resample("ME").last().ffill()
    opening = closing.shift(1)
    if len(closing) > 0:
        # The first month has no previous month-end; anchor it on the
        # first equity point instead.
        opening.iloc[0] = float(frame["equity"].iloc[0])
    closing_values = closing.to_numpy()
    period_labels = closing.index.tz_localize(None).to_period("M").astype(str)
    return pd.DataFrame(
        {
            "period": period_labels,
            "return": closing_values / opening.to_numpy() - 1.0,
            "end_equity": closing_values,
        }
    )


def horizon_frame(frame: pd.DataFrame, offset: pd.DateOffset | None) -> pd.DataFrame:
    """Return the trailing ``offset`` window of ``frame``, rebased to INITIAL_EQUITY.

    ``offset=None`` selects the whole frame. If the window turns out empty
    the whole frame is used instead. The input frame is never modified.
    """
    window = frame
    if offset is not None:
        cutoff = frame["ts"].iloc[-1] - offset
        trailing = frame[frame["ts"] >= cutoff]
        if not trailing.empty:
            window = trailing
    window = window.copy()
    base_equity = float(window["equity"].iloc[0])
    window["equity"] = window["equity"] / base_equity * INITIAL_EQUITY
    return window


def parse_exit_time(trade: dict[str, object]) -> pd.Timestamp:
    """Parse a trade's ``exit_time`` string as a UTC timestamp."""
    raw = str(trade["exit_time"])
    return pd.Timestamp(raw, tz="UTC")


def parse_entry_time(trade: dict[str, object]) -> pd.Timestamp:
    """Parse a trade's ``entry_time`` string as a UTC timestamp."""
    raw = str(trade["entry_time"])
    return pd.Timestamp(raw, tz="UTC")


def summarize(label: str, symbol: str, frame: pd.DataFrame, trades: list[dict[str, object]], horizon: str) -> dict[str, object]:
    """Build one summary row of performance statistics for ``frame``.

    Only trades that were both entered and exited at or after the first
    timestamp of ``frame`` are counted.

    Args:
        label: strategy label stored in the row.
        symbol: instrument label stored in the row.
        frame: equity frame with ``ts`` and ``equity`` columns.
        trades: trade records with ``entry_time``, ``exit_time`` and ``pnl``.
        horizon: horizon label stored in the row.

    Returns:
        A dict of return, drawdown, trade and monthly statistics.
    """
    timestamps = frame["ts"]
    equity = frame["equity"]
    start = timestamps.iloc[0]
    end = timestamps.iloc[-1]
    span_seconds = (end - start).total_seconds()
    # Floors keep the divisions below well-defined for one-row frames.
    years = max(span_seconds / (365.0 * 24 * 60 * 60), 1e-9)
    months = max(span_seconds / (30.4375 * 24 * 60 * 60), 1e-9)

    total_return = float(equity.iloc[-1] / equity.iloc[0] - 1.0)
    if total_return > -1.0:
        annualized = (1.0 + total_return) ** (1.0 / years) - 1.0
    else:
        annualized = -1.0
    max_dd = drawdown(equity)

    scoped = [
        item
        for item in trades
        if parse_entry_time(item) >= start and parse_exit_time(item) >= start
    ]
    pnls = [float(item["pnl"]) for item in scoped]
    wins = [value for value in pnls if value > 0.0]
    losses = [value for value in pnls if value < 0.0]
    gross_profit = sum(wins)
    gross_loss_abs = abs(sum(losses))

    if wins and losses:
        payoff_ratio = (gross_profit / len(wins)) / (gross_loss_abs / len(losses))
    else:
        payoff_ratio = 0.0

    monthly = monthly_returns(frame)
    month_count = len(monthly)
    if month_count:
        worst = monthly.loc[monthly["return"].idxmin()]
        worst_month = str(worst["period"])
        worst_month_return = float(worst["return"])
        positive_months = int((monthly["return"] > 0.0).sum())
        negative_months = int((monthly["return"] < 0.0).sum())
        positive_month_rate = positive_months / month_count
    else:
        worst_month = ""
        worst_month_return = 0.0
        negative_months = 0
        positive_month_rate = 0.0

    return {
        "label": label,
        "symbol": symbol,
        "horizon": horizon,
        "start": start.strftime("%Y-%m-%d"),
        "end": end.strftime("%Y-%m-%d"),
        "total_return": total_return,
        "annualized_return": annualized,
        "max_drawdown": max_dd,
        "calmar": annualized / max_dd if max_dd else 0.0,
        "win_rate": len(wins) / len(scoped) if scoped else 0.0,
        "payoff_ratio": payoff_ratio,
        "profit_factor": gross_profit / gross_loss_abs if gross_loss_abs else 0.0,
        "return_drawdown_ratio": total_return / max_dd if max_dd else 0.0,
        "trades": len(scoped),
        "trades_per_month": len(scoped) / months,
        "worst_month": worst_month,
        "worst_month_return": worst_month_return,
        "months": month_count,
        "positive_month_rate": positive_month_rate,
        "negative_months": negative_months,
    }


def summarize_all(label: str, symbol: str, frame: pd.DataFrame, trades: list[dict[str, object]]) -> list[dict[str, object]]:
    """Return one ``summarize`` row per entry in HORIZONS, in HORIZONS order."""
    rows: list[dict[str, object]] = []
    for horizon, offset in HORIZONS:
        window = horizon_frame(frame, offset)
        rows.append(summarize(label, symbol, window, trades, horizon))
    return rows


def add_cost_columns(row: dict[str, object], cost_model: str, roundtrip_cost: float) -> None:
    """Tag ``row`` in place with its cost model and ``net_*`` metric copies.

    The ``net_*`` values mirror the existing metrics of the row unchanged.
    """
    row["cost_model"] = cost_model
    row["roundtrip_cost_on_margin"] = roundtrip_cost
    for metric in ("total_return", "annualized_return", "max_drawdown", "calmar"):
        row[f"net_{metric}"] = row[metric]


def combine_equity_frames(frames: list[pd.DataFrame]) -> pd.DataFrame:
    """Sum several equity curves on their common time range.

    Curves are outer-joined on ``ts`` and forward-filled; leading rows where
    any curve has no value yet are dropped.
    """
    columns = [
        item.set_index("ts")["equity"].rename(str(position))
        for position, item in enumerate(frames)
    ]
    joined = pd.concat(columns, axis=1, sort=True)
    joined = joined.sort_index().ffill().dropna()
    total = joined.sum(axis=1).to_numpy()
    return pd.DataFrame({"ts": joined.index, "equity": total})


def build_configs() -> list[RiskConfig]:
    """Return the search grid: every base band setting crossed with every recipe.

    Configs are ordered by base setting first, then by recipe.
    """
    # (band_length, std_multiplier, stop_loss_pct)
    band_grid = [
        (20, 2.0, 0.005),
        (20, 2.5, 0.005),
        (20, 2.5, 0.008),
        (30, 2.0, 0.005),
        (30, 2.5, 0.005),
    ]
    # Each recipe overrides a subset of the filter defaults applied below.
    overrides = [
        {},
        {"vol_cap": 0.006},
        {"vol_cap": 0.009},
        {"cooldown_bars": 8},
        {"cooldown_bars": 32},
        {"max_hold_bars": 96},
        {"max_hold_bars": 288},
        {"take_profit_pct": 0.006},
        {"take_profit_pct": 0.012},
        {"session": "asia"},
        {"session": "eu_us"},
        {"long_regime": "neutral", "short_regime": "neutral", "neutral_distance": 0.03},
        {"long_regime": "neutral", "short_regime": "neutral", "neutral_distance": 0.06},
        {"long_regime": "below", "short_regime": "above"},
        {"long_regime": "above", "short_regime": "below"},
        {"side_mode": "long", "long_regime": "below"},
        {"side_mode": "short", "short_regime": "above"},
        {"vol_cap": 0.009, "cooldown_bars": 8, "max_hold_bars": 96},
        {"vol_cap": 0.009, "take_profit_pct": 0.012, "max_hold_bars": 96},
        {"vol_cap": 0.006, "long_regime": "neutral", "short_regime": "neutral", "neutral_distance": 0.06},
        {"cooldown_bars": 8, "long_regime": "below", "short_regime": "above", "max_hold_bars": 288},
        {"corr_min": 0.35},
        {"corr_max": 0.75},
        {"corr_min": -0.20, "corr_max": 0.75, "vol_cap": 0.009},
    ]

    def make(band_length: int, multiplier: float, stop_loss: float, recipe: dict) -> RiskConfig:
        lookup = recipe.get
        return RiskConfig(
            band_length=band_length,
            std_multiplier=multiplier,
            bandwidth_lookback=50,
            stop_loss_pct=stop_loss,
            take_profit_pct=lookup("take_profit_pct"),
            max_hold_bars=lookup("max_hold_bars"),
            cooldown_bars=int(lookup("cooldown_bars", 0)),
            vol_lookback=96,
            vol_cap=lookup("vol_cap"),
            trend_sma=480,
            long_regime=str(lookup("long_regime", "any")),
            short_regime=str(lookup("short_regime", "any")),
            neutral_distance=lookup("neutral_distance"),
            session=str(lookup("session", "all")),
            corr_lookback=192,
            corr_min=lookup("corr_min"),
            corr_max=lookup("corr_max"),
            side_mode=str(lookup("side_mode", "both")),
        )

    return [
        make(band_length, multiplier, stop_loss, recipe)
        for band_length, multiplier, stop_loss in band_grid
        for recipe in overrides
    ]
def score(full: dict[str, object], recent: dict[str, object]) -> float:
    """Rank a candidate from its full-history and recent (1y) summary rows.

    Candidates trading fewer than 10 times per month, or losing more than 5%
    over the recent horizon, are disqualified with a score of -999.

    Args:
        full: summary row for the full horizon.
        recent: summary row for the recent horizon.

    Returns:
        Full Calmar + half the recent Calmar + a capped activity bonus, minus
        1.5x the full-period maximum drawdown.
    """
    trades_per_month = float(full["trades_per_month"])
    if trades_per_month < 10.0:
        return -999.0
    if float(recent["total_return"]) < -0.05:
        return -999.0
    return (
        float(full["calmar"])
        + float(recent["calmar"]) * 0.5
        + min(trades_per_month, 60.0) / 100.0
        - float(full["max_drawdown"]) * 1.5
    )


def pct(value: float) -> str:
    """Format a fraction as a percentage with two decimals (0.1234 -> "12.34%")."""
    return f"{value * 100:.2f}%"


def _core_metrics(row: dict[str, object]) -> str:
    """Format the return / drawdown / activity fragment shared by all report lines."""
    return (
        f"total {pct(float(row['total_return']))}, annualized {pct(float(row['annualized_return']))}, "
        f"DD {pct(float(row['max_drawdown']))}, Calmar {float(row['calmar']):.2f}, trades/month {float(row['trades_per_month']):.1f}"
    )


def _horizon_returns(row: dict[str, object]) -> str:
    """Format the per-horizon return fragment of a risk-qualified report line."""
    return (
        f"3y {pct(float(row['ret_3y']))}, 1y {pct(float(row['ret_1y']))}, "
        f"6m {pct(float(row['ret_6m']))}, 3m {pct(float(row['ret_3m']))}"
    )


def _monthly_detail(row: dict[str, object]) -> str:
    """Format the worst-month / positive-month fragment of a primary report line."""
    return (
        f"worst month {row['worst_month']} {pct(float(row['worst_month_return']))}, "
        f"positive months {pct(float(row['positive_month_rate']))}"
    )


def _near_miss_detail(row: dict[str, object]) -> str:
    """Format the monthly fragment used by the near-miss report lines."""
    return (
        f"positive months {pct(float(row['positive_month_rate']))}, "
        f"worst month {pct(float(row['worst_month_return']))}"
    )


def write_report(summary: pd.DataFrame, portfolio: pd.DataFrame, selected: list[dict[str, object]]) -> None:
    """Write the Markdown summary report to ``REPORT_DIR``.

    Args:
        summary: all single-symbol summary rows (every horizon and cost model).
        portfolio: all portfolio summary rows, same schema plus ``score``.
        selected: selected configs with ``symbol``, ``label``, ``cost_model``
            and a JSON-serialisable ``config`` dict.
    """
    net_models = ("maker_taker", "taker_taker")
    primary_summary = summary[summary["cost_model"] == PRIMARY_COST_MODEL]
    stress_summary = summary[summary["cost_model"] == "taker_taker"]
    primary_portfolio = portfolio[portfolio["cost_model"] == PRIMARY_COST_MODEL]
    stress_portfolio = portfolio[portfolio["cost_model"] == "taker_taker"]
    robust_single = robust_survivors(summary[summary["cost_model"].isin(net_models)])
    robust_combo = robust_survivors(portfolio[portfolio["cost_model"].isin(net_models)])
    lines = [
        "# BTC/ETH BBMR Risk Variant Search",
        "",
        "Primary ranking uses maker/taker roundtrip margin cost 0.21%. Taker/taker stress uses 0.30%. Funding and slippage remain excluded.",
        "",
        f"Strict robust survivors with positive full/3y/1y/6m/3m net return and Calmar: single-symbol {len(robust_single)}, portfolio {len(robust_combo)}.",
        "",
        "Selection rule: rank candidates with at least 10 trades/month, lower full-period drawdown, and non-collapsing recent 1y results.",
        "",
        "## Risk-Qualified Single-Symbol Rows: maker/taker",
        "",
    ]
    for row in risk_qualified(primary_summary).head(12).to_dict("records"):
        lines.append(
            f"- {row['symbol']} {row['label']}: {_core_metrics(row)}, {_horizon_returns(row)}, {_monthly_detail(row)}"
        )
    lines.extend(["", "## Risk-Qualified Portfolio Rows: maker/taker", ""])
    for row in risk_qualified(primary_portfolio).head(10).to_dict("records"):
        lines.append(
            f"- {row['label']}: {_core_metrics(row)}, {_horizon_returns(row)}, {_monthly_detail(row)}"
        )
    lines.extend(["", "## Taker/taker stress survivors", ""])
    for row in risk_qualified(stress_summary).head(8).to_dict("records"):
        lines.append(f"- {row['symbol']} {row['label']}: {_core_metrics(row)}, {_horizon_returns(row)}")
    for row in risk_qualified(stress_portfolio).head(5).to_dict("records"):
        lines.append(f"- portfolio {row['label']}: {_core_metrics(row)}, {_horizon_returns(row)}")
    lines.extend(["", "## Maker/taker near misses", ""])
    near_miss_cols = [
        "symbol",
        "label",
        "total_return",
        "annualized_return",
        "max_drawdown",
        "calmar",
        "trades_per_month",
        "positive_month_rate",
        "worst_month_return",
    ]
    # Near misses are the best-scoring full-horizon rows regardless of
    # whether they passed the risk qualification above.
    single_near = primary_summary[primary_summary["horizon"] == "full"].sort_values("score", ascending=False).head(5)
    for row in single_near[near_miss_cols].to_dict("records"):
        lines.append(f"- {row['symbol']} {row['label']}: {_core_metrics(row)}, {_near_miss_detail(row)}")
    combo_near = primary_portfolio[primary_portfolio["horizon"] == "full"].sort_values("score", ascending=False).head(5)
    for row in combo_near[near_miss_cols].to_dict("records"):
        lines.append(f"- portfolio {row['label']}: {_core_metrics(row)}, {_near_miss_detail(row)}")
    lines.extend(["", "## Selected Configs", ""])
    for item in selected:
        lines.append(f"- {item['symbol']} {item['label']} ({item['cost_model']}): `{json.dumps(item['config'], separators=(',', ':'))}`")
    (REPORT_DIR / f"{OUTPUT_PREFIX}-summary.md").write_text("\n".join(lines) + "\n", encoding="utf-8")


def _with_horizon_columns(frame: pd.DataFrame, prefixes: dict[str, str]) -> pd.DataFrame:
    """Widen the full-horizon rows with per-horizon copies of selected metrics.

    For each of the 3y/1y/6m/3m horizons, every metric column named in
    ``prefixes`` is joined onto the full-horizon rows as ``<prefix>_<horizon>``.
    Rows lacking any horizon are dropped (inner join on cost model, symbol
    and label).
    """
    keys = ["cost_model", "symbol", "label"]
    merged = frame[frame["horizon"] == "full"].copy()
    for horizon in ("3y", "1y", "6m", "3m"):
        part = frame[frame["horizon"] == horizon][[*keys, *prefixes]].rename(
            columns={metric: f"{prefix}_{horizon}" for metric, prefix in prefixes.items()}
        )
        merged = merged.merge(part, on=keys, how="inner")
    return merged


def risk_qualified(frame: pd.DataFrame) -> pd.DataFrame:
    """Return full-horizon rows passing the activity, drawdown and recency gates.

    A row qualifies with at least 10 trades/month, a full-period drawdown
    below the per-symbol limit, and 1y/6m/3m returns each above -5%.
    The result carries a ``risk_rank`` column and is sorted by symbol, then
    by descending rank.
    """
    recent = _with_horizon_columns(
        frame, {"total_return": "ret", "max_drawdown": "dd", "calmar": "calmar"}
    )
    # Unknown symbols fall back to the BTC limit.
    dd_limit = recent["symbol"].map({"BTC": 0.5788, "ETH": 0.7199, "BTC+ETH": 0.5788}).fillna(0.5788)
    qualified = recent[
        (recent["trades_per_month"] >= 10.0)
        & (recent["max_drawdown"] < dd_limit)
        & (recent["ret_1y"] > -0.05)
        & (recent["ret_6m"] > -0.05)
        & (recent["ret_3m"] > -0.05)
    ].copy()
    qualified["risk_rank"] = (
        qualified["calmar"] * 2.0
        + qualified["annualized_return"]
        + qualified["ret_1y"] * 0.5
        - qualified["max_drawdown"]
    )
    return qualified.sort_values(["symbol", "risk_rank"], ascending=[True, False])


def robust_survivors(frame: pd.DataFrame) -> pd.DataFrame:
    """Return full-horizon rows that are positive on every horizon.

    A survivor has at least 10 trades/month and strictly positive total
    return and Calmar on the full, 3y, 1y, 6m and 3m horizons.
    """
    recent = _with_horizon_columns(frame, {"total_return": "ret", "calmar": "calmar"})
    return recent[
        (recent["trades_per_month"] >= 10.0)
        & (recent["total_return"] > 0.0)
        & (recent["calmar"] > 0.0)
        & (recent["ret_3y"] > 0.0)
        & (recent["ret_1y"] > 0.0)
        & (recent["ret_6m"] > 0.0)
        & (recent["ret_3m"] > 0.0)
        & (recent["calmar_3y"] > 0.0)
        & (recent["calmar_1y"] > 0.0)
        & (recent["calmar_6m"] > 0.0)
        & (recent["calmar_3m"] > 0.0)
    ].copy()


def main() -> int:
    """Run the full grid search and write CSV / JSON / Markdown reports.

    Command-line options:
        --years: history length to load from the candle cache (default 10).
        --top-per-symbol: number of best configs per symbol kept for the
            portfolio stage (default 12).

    Returns:
        0 on success.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--years", type=float, default=10.0)
    parser.add_argument("--top-per-symbol", type=int, default=12)
    args = parser.parse_args()
    REPORT_DIR.mkdir(parents=True, exist_ok=True)
    explore = load_explore_module()
    btc_raw = load_cached_history(explore, "BTC-USDT-SWAP", BAR, args.years)
    eth_raw = load_cached_history(explore, "ETH-USDT-SWAP", BAR, args.years)
    btc, eth = align_pair(btc_raw, eth_raw)
    # symbol -> (traded candles, peer candles for the correlation filter)
    symbols = {
        "BTC": (btc, eth),
        "ETH": (eth, btc),
    }
    result_rows: list[dict[str, object]] = []
    selected_rows: list[dict[str, object]] = []
    selected_configs: dict[tuple[str, str], RiskConfig] = {}
    result_cache: dict[tuple[str, str, str], tuple[pd.DataFrame, list[dict[str, object]]]] = {}
    configs = build_configs()
    for symbol, (candles, peer) in symbols.items():
        # Only the score and the config are needed for ranking; keeping the
        # full simulation result per config would pin every per-bar equity
        # curve in memory for no benefit.
        ranked: list[tuple[float, RiskConfig]] = []
        for config in configs:
            for cost_model, roundtrip_cost in COST_MODELS.items():
                result = run_bbmr_risk_segment(candles=candles, peer_candles=peer, config=config, roundtrip_cost=roundtrip_cost)
                frame = equity_frame(result)
                result_cache[(symbol, config.name, cost_model)] = (frame, result.trades)
                rows = summarize_all(config.name, symbol, frame, result.trades)
                full = next(row for row in rows if row["horizon"] == "full")
                one_year = next(row for row in rows if row["horizon"] == "1y")
                candidate_score = score(full, one_year)
                for row in rows:
                    add_cost_columns(row, cost_model, roundtrip_cost)
                    row["score"] = candidate_score
                    row.update(asdict(config))
                    result_rows.append(row)
                if cost_model == PRIMARY_COST_MODEL:
                    ranked.append((candidate_score, config))
        # Stable sort: equal scores keep their grid order.
        ranked.sort(key=lambda item: float(item[0]), reverse=True)
        for candidate_score, config in ranked[: args.top_per_symbol]:
            selected_configs[(symbol, config.name)] = config
            selected_rows.append({"symbol": symbol, "label": config.name, "cost_model": PRIMARY_COST_MODEL, "score": candidate_score, "config": asdict(config)})
        # The portfolio stage only reads cached curves of selected configs;
        # drop the rest now so they do not accumulate across symbols.
        selected_names = {name for cached_symbol, name in selected_configs if cached_symbol == symbol}
        stale_keys = [key for key in result_cache if key[0] == symbol and key[1] not in selected_names]
        for stale_key in stale_keys:
            del result_cache[stale_key]
    horizon_order = [name for name, _ in HORIZONS]
    summary = pd.DataFrame(result_rows)
    summary["horizon"] = pd.Categorical(summary["horizon"], categories=horizon_order, ordered=True)
    summary = summary.sort_values(["symbol", "score", "horizon"], ascending=[True, False, True])
    summary.to_csv(REPORT_DIR / f"{OUTPUT_PREFIX}-all.csv", index=False)
    summary[summary["horizon"] == "full"].sort_values("score", ascending=False).head(40).to_csv(
        REPORT_DIR / f"{OUTPUT_PREFIX}-top.csv",
        index=False,
    )
    risk_qualified(summary).to_csv(REPORT_DIR / f"{OUTPUT_PREFIX}-qualified.csv", index=False)
    net_summary = summary[summary["cost_model"].isin(("maker_taker", "taker_taker"))]
    robust_survivors(net_summary).to_csv(REPORT_DIR / f"{OUTPUT_PREFIX}-robust-survivors.csv", index=False)
    portfolio_rows: list[dict[str, object]] = []
    btc_keys = [key for key in selected_configs if key[0] == "BTC"][: args.top_per_symbol]
    eth_keys = [key for key in selected_configs if key[0] == "ETH"][: args.top_per_symbol]
    # Every selected BTC config is paired with every selected ETH config.
    for btc_key in btc_keys:
        for eth_key in eth_keys:
            label = f"{btc_key[1]} + {eth_key[1]}"
            for cost_model, roundtrip_cost in COST_MODELS.items():
                btc_frame, btc_trades = result_cache[(btc_key[0], btc_key[1], cost_model)]
                eth_frame, eth_trades = result_cache[(eth_key[0], eth_key[1], cost_model)]
                frame = combine_equity_frames([btc_frame, eth_frame])
                trades = btc_trades + eth_trades
                rows = summarize_all(label, "BTC+ETH", frame, trades)
                full = next(row for row in rows if row["horizon"] == "full")
                one_year = next(row for row in rows if row["horizon"] == "1y")
                portfolio_score = score(full, one_year)
                for row in rows:
                    add_cost_columns(row, cost_model, roundtrip_cost)
                    row["score"] = portfolio_score
                    portfolio_rows.append(row)
    portfolio = pd.DataFrame(portfolio_rows)
    portfolio["horizon"] = pd.Categorical(portfolio["horizon"], categories=horizon_order, ordered=True)
    portfolio = portfolio.sort_values(["score", "horizon"], ascending=[False, True])
    portfolio.to_csv(REPORT_DIR / f"{OUTPUT_PREFIX}-portfolio.csv", index=False)
    risk_qualified(portfolio).to_csv(REPORT_DIR / f"{OUTPUT_PREFIX}-portfolio-qualified.csv", index=False)
    net_portfolio = portfolio[portfolio["cost_model"].isin(("maker_taker", "taker_taker"))]
    robust_survivors(net_portfolio).to_csv(REPORT_DIR / f"{OUTPUT_PREFIX}-portfolio-robust-survivors.csv", index=False)
    (REPORT_DIR / f"{OUTPUT_PREFIX}-selected.json").write_text(
        json.dumps(selected_rows, indent=2),
        encoding="utf-8",
    )
    write_report(summary, portfolio, selected_rows)
    print(f"wrote {REPORT_DIR / f'{OUTPUT_PREFIX}-all.csv'}")
    print(f"wrote {REPORT_DIR / f'{OUTPUT_PREFIX}-top.csv'}")
    print(f"wrote {REPORT_DIR / f'{OUTPUT_PREFIX}-portfolio.csv'}")
    print(f"wrote {REPORT_DIR / f'{OUTPUT_PREFIX}-summary.md'}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())