| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357 |
- from __future__ import annotations
- import argparse
- import sys
- from dataclasses import dataclass, replace
- from itertools import product
- from pathlib import Path
- import pandas as pd
- sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
- from scripts import search_expansion_rotation as rotation
- OUTPUT_DIR = Path("reports/strategy-expansion")
- PREFIX = "rotation-risk"
- INITIAL_EQUITY = rotation.INITIAL_EQUITY
- TAKER_FEE = rotation.TAKER_FEE
- rotation.SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP")
- @dataclass(frozen=True)
- class RiskParams:
- base: rotation.Params
- leverage: float
- exposure: float
- vol_target: float
- @property
- def name(self) -> str:
- return (
- f"{self.base.name}-lev{self.leverage:.2f}"
- f"-exp{self.exposure:.2f}-vt{self.vol_target:.4f}"
- )
- def params_from_row(row: pd.Series) -> rotation.Params:
- return rotation.Params(
- family=str(row["family"]),
- bar=str(row["bar"]),
- lookback=int(row["lookback"]),
- trend=int(row["trend"]),
- btc_trend=int(row["btc_trend"]),
- rebalance=int(row["rebalance"]),
- top_n=int(row["top_n"]),
- min_momentum=float(row["min_momentum"]),
- btc_min_momentum=float(row["btc_min_momentum"]),
- vol_lookback=int(row["vol_lookback"]),
- max_vol=float(row["max_vol"]),
- )
- def bar_scale(bar: str) -> float:
- return {"1h": 24.0 * 365.0, "4h": 6.0 * 365.0, "1d": 365.0}[bar] ** 0.5
- def source_bases(limit: int) -> list[rotation.Params]:
- report_path = OUTPUT_DIR / "rotation-top.csv"
- frame = pd.read_csv(report_path).head(limit)
- return [params_from_row(row) for _, row in frame.iterrows()]
- def build_signal_params(limit: int) -> list[rotation.Params]:
- params: dict[str, rotation.Params] = {}
- for base in source_bases(limit):
- if base.bar == "1h":
- rebalances = (base.rebalance,)
- btc_trends = tuple(sorted({base.btc_trend, 24 * 180}))
- max_vols = (0.030,)
- elif base.bar == "4h":
- rebalances = (base.rebalance,)
- btc_trends = tuple(sorted({base.btc_trend, 6 * 180}))
- max_vols = (0.055,)
- else:
- rebalances = (base.rebalance,)
- btc_trends = tuple(sorted({base.btc_trend, 240}))
- max_vols = (0.090,)
- top_ns = (1,) if base.family == "dual_momentum" else (1, 2)
- for rebalance, top_n, btc_trend, btc_min_momentum, max_vol in product(
- rebalances,
- top_ns,
- btc_trends,
- (0.00, 0.05),
- max_vols,
- ):
- tuned = replace(
- base,
- rebalance=rebalance,
- top_n=top_n,
- btc_trend=btc_trend,
- btc_min_momentum=btc_min_momentum,
- max_vol=max_vol,
- )
- params[tuned.name] = tuned
- return list(params.values())
- def risk_variants(base: rotation.Params) -> list[RiskParams]:
- return [
- RiskParams(base, leverage, exposure, vol_target)
- for leverage, exposure, vol_target in product(
- (0.75, 1.0, 1.25, 1.5, 2.0),
- (0.50, 0.65, 0.80),
- (0.0, 0.20, 0.30),
- )
- ]
- def apply_risk_controls(closes: pd.DataFrame, weights: pd.DataFrame, params: RiskParams) -> pd.DataFrame:
- controlled = weights * params.exposure
- if params.vol_target <= 0.0:
- return controlled
- returns = closes.pct_change().fillna(0.0)
- portfolio_vol = (controlled.shift(1).fillna(0.0) * returns).sum(axis=1).rolling(params.base.vol_lookback).std(ddof=1)
- target_bar_vol = params.vol_target / bar_scale(params.base.bar)
- scale = (target_bar_vol / portfolio_vol).clip(upper=1.0).fillna(1.0)
- return controlled.mul(scale, axis=0)
- def equity_curve(closes: pd.DataFrame, weights: pd.DataFrame, params: RiskParams) -> pd.Series:
- returns = closes.pct_change().fillna(0.0)
- executed = weights.shift(1).fillna(0.0)
- turnover = executed.diff().abs().sum(axis=1).fillna(executed.abs().sum(axis=1))
- net_returns = (executed * returns * params.leverage).sum(axis=1) - turnover * TAKER_FEE * params.leverage
- equity = INITIAL_EQUITY * (1.0 + net_returns).cumprod()
- equity.name = "equity"
- return equity
- def trade_stats(weights: pd.DataFrame, closes: pd.DataFrame, leverage: float) -> dict[str, float | int]:
- returns = closes.pct_change().fillna(0.0)
- executed = weights.shift(1).fillna(0.0)
- turnover = executed.diff().abs().fillna(executed.abs())
- wins = 0
- gross_profit = 0.0
- gross_loss = 0.0
- trades = 0
- for symbol in closes.columns:
- active = executed[symbol] > 0.0
- group = (active != active.shift(1)).cumsum()
- for _, mask in active.groupby(group):
- if not bool(mask.iloc[0]):
- continue
- segment_index = mask.index
- net_returns = (
- returns.loc[segment_index, symbol] * executed.loc[segment_index, symbol] * leverage
- - turnover.loc[segment_index, symbol] * TAKER_FEE * leverage
- )
- trade_equity = float((1.0 + net_returns).prod())
- last_position = executed.index.get_loc(segment_index[-1])
- if last_position + 1 < len(executed.index):
- close_index = executed.index[last_position + 1]
- if executed.loc[close_index, symbol] == 0.0:
- trade_equity *= 1.0 - turnover.loc[close_index, symbol] * TAKER_FEE * leverage
- trade_return = trade_equity - 1.0
- trades += 1
- if trade_return > 0.0:
- wins += 1
- gross_profit += trade_return
- else:
- gross_loss += abs(trade_return)
- return {
- "trades": trades,
- "win_rate": wins / trades if trades else 0.0,
- "profit_factor": gross_profit / gross_loss if gross_loss else 0.0,
- }
- def markdown_table(frame: pd.DataFrame) -> str:
- rows = [list(frame.columns), ["---" for _ in frame.columns]]
- rows.extend(frame.astype(object).where(pd.notna(frame), "").values.tolist())
- return "\n".join("| " + " | ".join(rotation.format_cell(value) for value in row) + " |" for row in rows)
- def report_text(command: str, paths: list[Path], top: pd.DataFrame, horizons: pd.DataFrame, monthly: pd.DataFrame) -> str:
- best_names = set(top.head(3)["strategy"])
- recent = horizons[horizons["strategy"].isin(best_names)]
- best_monthly = monthly[monthly["strategy"].isin(best_names)]
- next_step = top.head(3)[
- [
- "strategy",
- "total_return",
- "annualized_return",
- "max_drawdown",
- "calmar",
- "h3y_return",
- "h1y_return",
- "h6m_return",
- "h3m_return",
- "next_validation",
- ]
- ]
- return "\n".join(
- [
- "# Rotation risk refinement",
- "",
- f"Run command: `{command}`",
- "",
- "Output files:",
- *[f"- `{path}`" for path in paths],
- "",
- "Objective: reduce maximum drawdown to <=50%, preferably <=35%, while keeping annualized return positive and recent 3y/1y/6m/3m returns from deteriorating excessively.",
- "Cost model: 0.04% one-way taker fee, charged on leveraged notional at each portfolio weight change.",
- "",
- "## Top 3 low-drawdown candidates",
- "",
- markdown_table(next_step),
- "",
- "## Ranked candidates",
- "",
- markdown_table(
- top.head(20)[
- [
- "strategy",
- "family",
- "bar",
- "universe",
- "leverage",
- "exposure",
- "vol_target",
- "total_return",
- "annualized_return",
- "max_drawdown",
- "calmar",
- "trades",
- "win_rate",
- "profit_factor",
- ]
- ]
- ),
- "",
- "## Recent horizons for top 3",
- "",
- markdown_table(recent),
- "",
- "## Monthly returns for top 3",
- "",
- markdown_table(best_monthly),
- "",
- ]
- )
- def main() -> int:
- parser = argparse.ArgumentParser()
- parser.add_argument("--years", type=float, default=8.0)
- parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
- parser.add_argument("--top", type=int, default=30)
- parser.add_argument("--base-limit", type=int, default=6)
- args = parser.parse_args()
- frames = rotation.load_symbol_bar_frames(args.years)
- totals: list[dict[str, object]] = []
- horizon_output: list[dict[str, object]] = []
- monthly_output: list[pd.DataFrame] = []
- signal_params = build_signal_params(args.base_limit)
- total_runs = sum(len(risk_variants(base)) for base in signal_params)
- run_index = 0
- for base in signal_params:
- closes = rotation.aligned_closes(frames, base)
- signal_weights = rotation.target_weights(closes, base)
- for param in risk_variants(base):
- run_index += 1
- weights = apply_risk_controls(closes, signal_weights, param)
- equity = equity_curve(closes, weights, param)
- stat = trade_stats(weights, closes, param.leverage)
- years = (equity.index[-1] - equity.index[0]).total_seconds() / 86_400 / 365
- turnover_per_year = float(weights.shift(1).fillna(0.0).diff().abs().sum(axis=1).sum() / years)
- horizon_rows = rotation.horizon_rows(param.name, equity)
- horizons_by_label = {row["horizon"]: row for row in horizon_rows}
- row = {
- "strategy": param.name,
- "family": base.family,
- "bar": base.bar,
- "universe": ",".join(closes.columns),
- "lookback": base.lookback,
- "trend": base.trend,
- "btc_trend": base.btc_trend,
- "rebalance": base.rebalance,
- "top_n": base.top_n,
- "min_momentum": base.min_momentum,
- "btc_min_momentum": base.btc_min_momentum,
- "vol_lookback": base.vol_lookback,
- "max_vol": base.max_vol,
- "leverage": param.leverage,
- "exposure": param.exposure,
- "vol_target": param.vol_target,
- "first_candle": equity.index[0].strftime("%Y-%m-%d %H:%M"),
- "last_candle": equity.index[-1].strftime("%Y-%m-%d %H:%M"),
- "years": years,
- "turnover_per_year": turnover_per_year,
- "h3y_return": horizons_by_label["3y"]["total_return"],
- "h1y_return": horizons_by_label["1y"]["total_return"],
- "h6m_return": horizons_by_label["6m"]["total_return"],
- "h3m_return": horizons_by_label["3m"]["total_return"],
- **rotation.metrics(equity),
- **stat,
- }
- totals.append(row)
- horizon_output.extend(horizon_rows)
- monthly_output.append(rotation.monthly_rows(param.name, equity))
- if run_index % 1000 == 0:
- print(f"done {run_index}/{total_runs}")
- total = pd.DataFrame(totals)
- viable = total[
- (total["annualized_return"] > 0.0)
- & (total["max_drawdown"] <= 0.50)
- & (total["h3y_return"] > 0.0)
- & (total["h1y_return"] > 0.0)
- & (total["h6m_return"] > -0.10)
- & (total["h3m_return"] > -0.10)
- ].copy()
- if viable.empty:
- viable = total[(total["annualized_return"] > 0.0) & (total["max_drawdown"] <= 0.50)].copy()
- if viable.empty:
- viable = total.copy()
- viable["next_validation"] = viable.apply(
- lambda row: "yes" if row["max_drawdown"] <= 0.50 and row["annualized_return"] > 0.0 else "no",
- axis=1,
- )
- ranked = viable.sort_values(
- ["max_drawdown", "calmar", "annualized_return", "h1y_return"],
- ascending=[True, False, False, False],
- )
- top = ranked.head(args.top)
- top_names = set(top["strategy"])
- horizons = pd.DataFrame(horizon_output)
- horizons = horizons[horizons["strategy"].isin(top_names)]
- monthly = pd.concat(monthly_output, ignore_index=True)
- monthly = monthly[monthly["strategy"].isin(top_names)]
- args.output_dir.mkdir(parents=True, exist_ok=True)
- total_path = args.output_dir / f"{PREFIX}-total.csv"
- top_path = args.output_dir / f"{PREFIX}-top.csv"
- horizon_path = args.output_dir / f"{PREFIX}-horizons.csv"
- monthly_path = args.output_dir / f"{PREFIX}-monthly.csv"
- report_path = args.output_dir / f"{PREFIX}-report.md"
- paths = [total_path, top_path, horizon_path, monthly_path, report_path]
- total.to_csv(total_path, index=False)
- top.to_csv(top_path, index=False)
- horizons.to_csv(horizon_path, index=False)
- monthly.to_csv(monthly_path, index=False)
- command = (
- "rtk .venv/bin/python scripts/refine_expansion_rotation_risk.py"
- f" --years {args.years} --top {args.top} --base-limit {args.base_limit}"
- )
- report_path.write_text(report_text(command, paths, top, horizons, monthly), encoding="utf-8")
- print(top.head(10).to_string(index=False))
- return 0
- if __name__ == "__main__":
- raise SystemExit(main())
|