# Grid-search long-only rotation strategies over OKX USDT perpetual swaps.
#
# For every parameter combination produced by ``build_params`` this script
# builds target weights, simulates a leveraged equity curve with taker fees,
# and writes total/top/horizon/monthly CSV files plus a Markdown report to
# ``--output-dir``.
from __future__ import annotations

import argparse
import sys
from dataclasses import dataclass
from itertools import product
from pathlib import Path

import pandas as pd

# Put the parent of this file's directory (presumably the repository root) on
# sys.path so the project packages below import when run as a plain script.
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))

from okx_codex_trader.models import Candle  # noqa: E402
from okx_codex_trader.okx_client import OkxClient  # noqa: E402
from scripts import explore_ultrashort as explore  # noqa: E402

OUTPUT_DIR = Path("reports/strategy-expansion")
PREFIX = "rotation"
SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP", "SOL-USDT-SWAP")
BASE_BAR = "15m"  # granularity of the local candle cache that is resampled upward
BARS = ("1h", "4h", "1d")
OKX_BARS = {"1h": "1H", "4h": "4H", "1d": "1Dutc"}  # bar codes passed to OkxClient.get_candles
BAR_MS = {"1h": 3_600_000, "4h": 14_400_000, "1d": 86_400_000}
LEVERAGE = 3
INITIAL_EQUITY = 10_000.0
TAKER_FEE = 0.0004  # one-way fee, charged on leveraged notional
HORIZONS = (
    ("3y", pd.DateOffset(years=3)),
    ("1y", pd.DateOffset(years=1)),
    ("6m", pd.DateOffset(months=6)),
    ("3m", pd.DateOffset(months=3)),
)


@dataclass(frozen=True)
class Params:
    """One point of the strategy parameter grid (window lengths are in bars of ``bar``)."""

    family: str  # "dual_momentum" or "trend_basket"
    bar: str  # one of BARS
    lookback: int  # momentum window
    trend: int  # per-asset SMA trend filter window
    btc_trend: int  # BTC SMA regime filter window
    rebalance: int  # bars between rebalance decisions
    top_n: int  # number of assets held by "trend_basket"
    min_momentum: float  # minimum per-asset momentum to be eligible
    btc_min_momentum: float  # minimum BTC momentum for risk-on
    vol_lookback: int  # window of the BTC return volatility filter
    max_vol: float  # maximum BTC per-bar return stdev for risk-on

    @property
    def name(self) -> str:
        """Unique, human-readable identifier encoding every parameter."""
        return (
            f"{self.family}-{self.bar}-lb{self.lookback}-tr{self.trend}-bt{self.btc_trend}"
            f"-rb{self.rebalance}-top{self.top_n}-mm{self.min_momentum:.3f}"
            f"-bm{self.btc_min_momentum:.3f}-vw{self.vol_lookback}-vc{self.max_vol:.4f}"
        )


def load_local_candles(symbol: str, bar: str) -> list[Candle]:
    """Return cached candles for *symbol*/*bar* from the explore_ultrashort cache."""
    candles, _ = explore.load_cached_candles(explore.CANDLE_CACHE_DIR, symbol, bar)
    return candles


def frame_from_candles(candles: list[Candle]) -> pd.DataFrame:
    """Convert candles to an OHLCV frame indexed by UTC timestamp, sorted ascending.

    Duplicate timestamps keep their last occurrence so that resampling does not
    double-count volume and per-symbol close columns can be aligned.
    """
    frame = pd.DataFrame(
        {
            "ts": [pd.to_datetime(candle.ts, unit="ms", utc=True) for candle in candles],
            "open": [candle.open for candle in candles],
            "high": [candle.high for candle in candles],
            "low": [candle.low for candle in candles],
            "close": [candle.close for candle in candles],
            "volume": [candle.volume for candle in candles],
        }
    )
    frame = frame.set_index("ts").sort_index(kind="stable")
    return frame[~frame.index.duplicated(keep="last")]


def aggregate_frame(frame: pd.DataFrame, bar: str) -> pd.DataFrame:
    """Resample a finer OHLCV frame to *bar*; bins with missing prices are dropped."""
    rule = {"1h": "1h", "4h": "4h", "1d": "1D"}[bar]
    aggregated = frame.resample(rule, label="left", closed="left").agg(
        {"open": "first", "high": "max", "low": "min", "close": "last", "volume": "sum"}
    )
    return aggregated.dropna()


def fetch_okx_frame(symbol: str, bar: str, years: float) -> pd.DataFrame:
    """Download roughly *years* of *bar* candles (plus 500 bars of slack) from OKX.

    Raises:
        FileNotFoundError: if OKX returns no candles.
    """
    interval_ms = BAR_MS[bar]
    limit = int(years * 365 * 86_400_000 / interval_ms) + 500
    candles = OkxClient().get_candles(symbol, OKX_BARS[bar], limit)
    if not candles:
        raise FileNotFoundError(f"missing OKX candles for {symbol} {bar}")
    return frame_from_candles(candles)


def _years_offset(years: float) -> pd.DateOffset | pd.Timedelta:
    """Offset spanning *years*: calendar years when integral, 365-day years otherwise.

    ``pd.DateOffset(years=...)`` rejects fractional values, but ``--years`` is a float.
    """
    if float(years).is_integer():
        return pd.DateOffset(years=int(years))
    return pd.Timedelta(days=float(years) * 365)


def load_symbol_bar_frames(years: float) -> dict[tuple[str, str], pd.DataFrame]:
    """Load an OHLCV frame for every (symbol, bar) pair, trimmed to the last *years*.

    Symbols with a local 15m cache are aggregated upward; the others are fetched
    from OKX at the target bar directly.
    """
    offset = _years_offset(years)
    frames: dict[tuple[str, str], pd.DataFrame] = {}
    for symbol in SYMBOLS:
        local = load_local_candles(symbol, BASE_BAR)
        base = frame_from_candles(local) if local else None
        for bar in BARS:
            if base is not None:
                frame = aggregate_frame(base, bar)
            else:
                frame = fetch_okx_frame(symbol, bar, years)
            cutoff = frame.index[-1] - offset
            frames[(symbol, bar)] = frame[frame.index >= cutoff]
    return frames


def aligned_closes(frames: dict[tuple[str, str], pd.DataFrame], params: Params) -> pd.DataFrame:
    """Return close prices aligned on common timestamps for the widest usable universe.

    Uses all SYMBOLS when their joint history is long enough for *params*,
    otherwise falls back to BTC/ETH only.

    Raises:
        ValueError: if even the BTC/ETH history is too short.
    """
    warmup = max(params.lookback, params.trend, params.btc_trend, params.vol_lookback)
    required = warmup + max(params.rebalance * 6, 120)
    full = pd.DataFrame({symbol: frames[(symbol, params.bar)]["close"] for symbol in SYMBOLS}).dropna()
    if len(full) >= required:
        return full
    btc_eth = pd.DataFrame({symbol: frames[(symbol, params.bar)]["close"] for symbol in SYMBOLS[:2]}).dropna()
    if len(btc_eth) < required:
        raise ValueError(f"insufficient aligned candles for {params.name}")
    return btc_eth


def build_params() -> list[Params]:
    """Enumerate the parameter grid; window grids are expressed in bars of each BARS entry."""
    params: list[Params] = []
    for bar in BARS:
        if bar == "1h":
            lookbacks = (24 * 14, 24 * 30)
            trends = (24 * 30, 24 * 60)
            btc_trends = (24 * 120,)
            rebalances = (24 * 3, 24 * 7)
            vol_lookbacks = (24 * 14,)
        elif bar == "4h":
            lookbacks = (6 * 30, 6 * 60)
            trends = (6 * 60, 6 * 120)
            btc_trends = (6 * 180,)
            rebalances = (6 * 7, 6 * 14)
            vol_lookbacks = (6 * 30,)
        else:
            lookbacks = (60, 120)
            trends = (120, 200)
            btc_trends = (200,)
            rebalances = (14, 30)
            vol_lookbacks = (30,)
        # vol_lookbacks is iterated last so the candidate order matches the
        # earlier single-value behaviour.
        for (
            family,
            lookback,
            trend,
            btc_trend,
            rebalance,
            top_n,
            min_momentum,
            btc_min_momentum,
            max_vol,
            vol_lookback,
        ) in product(
            ("dual_momentum", "trend_basket"),
            lookbacks,
            trends,
            btc_trends,
            rebalances,
            (1, 2),
            (0.0, 0.03),
            (0.0,),
            (0.055,),
            vol_lookbacks,
        ):
            # dual_momentum always holds a single asset, so only top_n=1 applies.
            # NOTE(review): trend_basket with top_n=1 produces identical weights.
            if family == "dual_momentum" and top_n != 1:
                continue
            params.append(
                Params(
                    family=family,
                    bar=bar,
                    lookback=lookback,
                    trend=trend,
                    btc_trend=btc_trend,
                    rebalance=rebalance,
                    top_n=top_n,
                    min_momentum=min_momentum,
                    btc_min_momentum=btc_min_momentum,
                    vol_lookback=vol_lookback,
                    max_vol=max_vol,
                )
            )
    return params


def target_weights(closes: pd.DataFrame, params: Params) -> pd.DataFrame:
    """Compute per-bar target weights (float64, each row sums to 0 or 1).

    Every ``rebalance`` bars (after the warm-up), when BTC is in an uptrend
    with enough momentum and low enough volatility, equal weight is given to
    the strongest-momentum assets that are above their own SMA. The weights of
    a rebalance bar are held until just before the next rebalance bar; a
    rebalance bar without a signal is flat.
    """
    btc = closes["BTC-USDT-SWAP"]
    momentum = closes / closes.shift(params.lookback) - 1.0
    trend = closes > closes.rolling(params.trend).mean()
    btc_trend = btc > btc.rolling(params.btc_trend).mean()
    btc_momentum = btc / btc.shift(params.lookback) - 1.0
    btc_vol = btc.pct_change().rolling(params.vol_lookback).std(ddof=1)
    risk_on = btc_trend & (btc_momentum >= params.btc_min_momentum) & (btc_vol <= params.max_vol)

    weights = pd.DataFrame(0.0, index=closes.index, columns=closes.columns)
    holdings = 1 if params.family == "dual_momentum" else params.top_n
    start = max(params.lookback, params.trend, params.btc_trend, params.vol_lookback)
    for index in range(start, len(closes), params.rebalance):
        if not bool(risk_on.iloc[index]):
            continue
        current_momentum = momentum.iloc[index]
        eligible = current_momentum[(current_momentum >= params.min_momentum) & trend.iloc[index]]
        if eligible.empty:
            continue
        selected = eligible.sort_values(ascending=False).head(holdings).index
        weights.loc[closes.index[index], selected] = 1.0 / len(selected)

    if params.rebalance <= 1:
        # Every bar is a rebalance bar: nothing to carry forward (and a fill
        # would leak the previous bar's picks into the current one).
        return weights
    # mask() keeps float64; replace(0.0, pd.NA) would degrade to object dtype.
    # limit=rebalance-1 stops the fill exactly before the next rebalance row.
    held = weights.mask(weights == 0.0).ffill(limit=params.rebalance - 1)
    return held.fillna(0.0)


def trade_stats(weights: pd.DataFrame, closes: pd.DataFrame) -> dict[str, float | int]:
    """Summarize the round-trip trades implied by *weights*.

    A trade is a maximal run of consecutive bars in which a symbol's weight is
    positive. It is scored as if fully invested in that symbol at ``LEVERAGE``
    (ignoring basket weight), compounding each bar's next-bar return, minus one
    entry and one exit taker fee on leveraged notional.

    Returns:
        ``trades``, ``win_rate`` and ``profit_factor`` (gross profit / gross
        loss; ``inf`` when there are winning trades but no losses, 0.0 when
        there is no gross profit).
    """
    returns = closes.pct_change().shift(-1)
    wins = 0
    losses = 0
    gross_profit = 0.0
    gross_loss = 0.0
    trades = 0
    for symbol in closes.columns:
        active = weights[symbol] > 0.0
        group = (active != active.shift(1)).cumsum()
        for _, mask in active.groupby(group):
            if not bool(mask.iloc[0]):
                continue
            trade_return = float((1.0 + returns.loc[mask.index, symbol].dropna() * LEVERAGE).prod() - 1.0)
            trade_return -= TAKER_FEE * LEVERAGE * 2.0
            trades += 1
            if trade_return > 0.0:
                wins += 1
                gross_profit += trade_return
            else:
                losses += 1
                gross_loss += abs(trade_return)
    if gross_loss:
        profit_factor = gross_profit / gross_loss
    else:
        profit_factor = float("inf") if gross_profit else 0.0
    return {
        "trades": trades,
        "win_rate": wins / trades if trades else 0.0,
        "profit_factor": profit_factor,
    }


def equity_curve(closes: pd.DataFrame, weights: pd.DataFrame) -> pd.Series:
    """Simulate leveraged portfolio equity starting from ``INITIAL_EQUITY``.

    Weights chosen on bar t are held over the close-to-close return from bar t
    to bar t+1. Each bar pays ``TAKER_FEE * LEVERAGE`` on the total absolute
    weight change. Equity is floored at zero: a bar whose net return is below
    -100% liquidates the account, which then stays at zero.
    """
    returns = closes.pct_change().fillna(0.0)
    executed = weights.shift(1).fillna(0.0)
    # The first row's "change" is the initial position itself.
    turnover = executed.diff().fillna(executed).abs().sum(axis=1)
    net_returns = (executed * returns * LEVERAGE).sum(axis=1) - turnover * TAKER_FEE * LEVERAGE
    growth = (1.0 + net_returns).clip(lower=0.0)
    equity = INITIAL_EQUITY * growth.cumprod()
    equity.name = "equity"
    return equity


def metrics(series: pd.Series) -> dict[str, float]:
    """Return total/annualized return, max drawdown (fraction of peak) and Calmar ratio.

    A series that starts at or falls to zero counts as a total loss (-100%).
    """
    years = (series.index[-1] - series.index[0]).total_seconds() / 86_400 / 365
    start = float(series.iloc[0])
    total = float(series.iloc[-1]) / start - 1.0 if start > 0.0 else -1.0
    if years <= 0.0:
        annualized = 0.0
    elif total <= -1.0:
        annualized = -1.0
    else:
        annualized = (1.0 + total) ** (1.0 / years) - 1.0
    peak = series.cummax()
    # A zero peak means the account was already wiped out: full drawdown.
    drawdown = float(((peak - series) / peak).where(peak > 0.0, 1.0).max())
    return {
        "total_return": total,
        "annualized_return": annualized,
        "max_drawdown": drawdown,
        "calmar": annualized / drawdown if drawdown else 0.0,
    }


def horizon_rows(name: str, series: pd.Series) -> list[dict[str, object]]:
    """Compute metrics over each trailing window in HORIZONS (full series if the window has <2 points)."""
    rows: list[dict[str, object]] = []
    end = series.index[-1]
    for label, offset in HORIZONS:
        horizon = series[series.index >= end - offset]
        if len(horizon) < 2:
            horizon = series
        rows.append(
            {
                "strategy": name,
                "horizon": label,
                "start": horizon.index[0].strftime("%Y-%m-%d"),
                "end": horizon.index[-1].strftime("%Y-%m-%d"),
                **metrics(horizon),
            }
        )
    return rows


def monthly_rows(name: str, series: pd.Series) -> pd.DataFrame:
    """Tabulate month-end equity and the month-over-month return of *series*."""
    monthly = series.resample("ME").last()
    frame = pd.DataFrame(
        {
            "strategy": name,
            "month": monthly.index.strftime("%Y-%m"),
            "start_equity": monthly.shift(1).fillna(series.iloc[0]).to_numpy(),
            "end_equity": monthly.to_numpy(),
        }
    )
    frame["return"] = frame["end_equity"] / frame["start_equity"] - 1.0
    return frame


def markdown_table(frame: pd.DataFrame) -> str:
    """Render *frame* as a GitHub-flavoured Markdown table (missing values become blanks)."""
    rows = [list(frame.columns), ["---" for _ in frame.columns]]
    rows.extend(frame.astype(object).where(pd.notna(frame), "").values.tolist())
    return "\n".join("| " + " | ".join(format_cell(value) for value in row) + " |" for row in rows)


def format_cell(value: object) -> str:
    """Format one table cell: floats to 6 significant digits, pipes escaped."""
    if isinstance(value, float):
        return f"{value:.6g}"
    return str(value).replace("|", "\\|")


def report_text(command: str, paths: list[Path], top: pd.DataFrame, horizons: pd.DataFrame, monthly: pd.DataFrame) -> str:
    """Build the Markdown report for the ranked candidates in *top*."""
    best_names = set(top.head(3)["strategy"])
    recent = horizons[horizons["strategy"].isin(best_names)]
    # Last 12 months of each top-3 strategy (3 x 12 = 36 rows); a plain
    # tail(36) on the concatenated frame would mostly show a single strategy.
    best_monthly = monthly[monthly["strategy"].isin(best_names)].groupby("strategy", sort=False).tail(12)
    return "\n".join(
        [
            "# Rotation strategy expansion",
            "",
            f"Run command: `{command}`",
            "",
            "Output files:",
            *[f"- `{path}`" for path in paths],
            "",
            "Cost model: 0.04% one-way taker fee, charged on leveraged notional at each portfolio weight change.",
            "Universe: BTC-USDT-SWAP, ETH-USDT-SWAP, SOL-USDT-SWAP OKX perpetual swaps when the aligned history is long enough for that candidate; otherwise the row records the BTC/ETH universe used. BTC/ETH use local 15m cache aggregated upward; SOL uses OKX historical candles in memory.",
            "",
            "## Top candidates",
            "",
            markdown_table(
                top.head(10)[
                    [
                        "strategy",
                        "family",
                        "bar",
                        "universe",
                        "total_return",
                        "annualized_return",
                        "max_drawdown",
                        "calmar",
                        "trades",
                        "win_rate",
                        "profit_factor",
                        "turnover_per_year",
                    ]
                ]
            ),
            "",
            "## Recent horizons for top 3",
            "",
            markdown_table(recent),
            "",
            "## Monthly returns for top 3",
            "",
            markdown_table(best_monthly),
            "",
        ]
    )


def main() -> int:
    """Run the grid search, rank candidates and write CSV outputs plus a Markdown report."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--years", type=float, default=8.0)
    parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
    parser.add_argument("--top", type=int, default=30)
    args = parser.parse_args()

    frames = load_symbol_bar_frames(args.years)
    totals: list[dict[str, object]] = []
    horizon_output: list[dict[str, object]] = []
    monthly_output: list[pd.DataFrame] = []
    params = build_params()
    for index, param in enumerate(params, start=1):
        closes = aligned_closes(frames, param)
        weights = target_weights(closes, param)
        equity = equity_curve(closes, weights)
        stat = trade_stats(weights, closes)
        years = (equity.index[-1] - equity.index[0]).total_seconds() / 86_400 / 365
        turnover = float(weights.shift(1).fillna(0.0).diff().abs().sum(axis=1).sum())
        turnover_per_year = turnover / years if years > 0.0 else 0.0
        totals.append(
            {
                "strategy": param.name,
                "family": param.family,
                "bar": param.bar,
                "universe": ",".join(closes.columns),
                "lookback": param.lookback,
                "trend": param.trend,
                "btc_trend": param.btc_trend,
                "rebalance": param.rebalance,
                "top_n": param.top_n,
                "min_momentum": param.min_momentum,
                "btc_min_momentum": param.btc_min_momentum,
                "vol_lookback": param.vol_lookback,
                "max_vol": param.max_vol,
                "first_candle": equity.index[0].strftime("%Y-%m-%d %H:%M"),
                "last_candle": equity.index[-1].strftime("%Y-%m-%d %H:%M"),
                "years": years,
                "turnover_per_year": turnover_per_year,
                **metrics(equity),
                **stat,
            }
        )
        horizon_output.extend(horizon_rows(param.name, equity))
        monthly_output.append(monthly_rows(param.name, equity))
        if index % 100 == 0:
            print(f"done {index}/{len(params)}")

    total = pd.DataFrame(totals)
    all_horizons = pd.DataFrame(horizon_output)
    positive_recent = all_horizons.pivot(index="strategy", columns="horizon", values="total_return")
    # Keep only candidates that made money over 3y and 1y and lost at most 5% over 6m/3m.
    stable_names = positive_recent[
        (positive_recent["3y"] > 0.0)
        & (positive_recent["1y"] > 0.0)
        & (positive_recent["6m"] > -0.05)
        & (positive_recent["3m"] > -0.05)
    ].index
    stable = total[total["strategy"].isin(stable_names)]
    ranked = stable.sort_values(
        ["calmar", "max_drawdown", "annualized_return", "turnover_per_year"],
        ascending=[False, True, False, True],
    )
    if ranked.empty:
        ranked = total.sort_values(["calmar", "max_drawdown", "annualized_return"], ascending=[False, True, False])
    top = ranked.head(args.top)
    top_names = set(top["strategy"])
    horizons = all_horizons[all_horizons["strategy"].isin(top_names)]
    monthly = pd.concat(monthly_output, ignore_index=True)
    monthly = monthly[monthly["strategy"].isin(top_names)]

    args.output_dir.mkdir(parents=True, exist_ok=True)
    total_path = args.output_dir / f"{PREFIX}-total.csv"
    top_path = args.output_dir / f"{PREFIX}-top.csv"
    horizon_path = args.output_dir / f"{PREFIX}-horizons.csv"
    monthly_path = args.output_dir / f"{PREFIX}-monthly.csv"
    report_path = args.output_dir / f"{PREFIX}-report.md"
    paths = [total_path, top_path, horizon_path, monthly_path, report_path]
    total.to_csv(total_path, index=False)
    top.to_csv(top_path, index=False)
    horizons.to_csv(horizon_path, index=False)
    monthly.to_csv(monthly_path, index=False)
    # Record every CLI argument so the report's command reproduces this run.
    command = (
        f"rtk .venv/bin/python scripts/search_expansion_rotation.py --years {args.years} --top {args.top}"
        f" --output-dir {args.output_dir}"
    )
    report_path.write_text(report_text(command, paths, top, horizons, monthly), encoding="utf-8")
    print(top.head(10).to_string(index=False))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())