| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436 |
- from __future__ import annotations
- import argparse
- import sys
- from dataclasses import dataclass
- from itertools import product
- from pathlib import Path
- import pandas as pd
- sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
- from okx_codex_trader.models import Candle
- from okx_codex_trader.okx_client import OkxClient
- from scripts import explore_ultrashort as explore
- OUTPUT_DIR = Path("reports/strategy-expansion")
- PREFIX = "rotation"
- SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP", "SOL-USDT-SWAP")
- BASE_BAR = "15m"
- BARS = ("1h", "4h", "1d")
- OKX_BARS = {"1h": "1H", "4h": "4H", "1d": "1Dutc"}
- BAR_MS = {"1h": 3_600_000, "4h": 14_400_000, "1d": 86_400_000}
- LEVERAGE = 3
- INITIAL_EQUITY = 10_000.0
- TAKER_FEE = 0.0004
- HORIZONS = (
- ("3y", pd.DateOffset(years=3)),
- ("1y", pd.DateOffset(years=1)),
- ("6m", pd.DateOffset(months=6)),
- ("3m", pd.DateOffset(months=3)),
- )
- @dataclass(frozen=True)
- class Params:
- family: str
- bar: str
- lookback: int
- trend: int
- btc_trend: int
- rebalance: int
- top_n: int
- min_momentum: float
- btc_min_momentum: float
- vol_lookback: int
- max_vol: float
- @property
- def name(self) -> str:
- return (
- f"{self.family}-{self.bar}-lb{self.lookback}-tr{self.trend}-bt{self.btc_trend}"
- f"-rb{self.rebalance}-top{self.top_n}-mm{self.min_momentum:.3f}"
- f"-bm{self.btc_min_momentum:.3f}-vw{self.vol_lookback}-vc{self.max_vol:.4f}"
- )
- def load_local_candles(symbol: str, bar: str) -> list[Candle]:
- candles, _ = explore.load_cached_candles(explore.CANDLE_CACHE_DIR, symbol, bar)
- return candles
- def frame_from_candles(candles: list[Candle]) -> pd.DataFrame:
- frame = pd.DataFrame(
- {
- "ts": [pd.to_datetime(candle.ts, unit="ms", utc=True) for candle in candles],
- "open": [candle.open for candle in candles],
- "high": [candle.high for candle in candles],
- "low": [candle.low for candle in candles],
- "close": [candle.close for candle in candles],
- "volume": [candle.volume for candle in candles],
- }
- )
- return frame.set_index("ts").sort_index()
- def aggregate_frame(frame: pd.DataFrame, bar: str) -> pd.DataFrame:
- rule = {"1h": "1h", "4h": "4h", "1d": "1D"}[bar]
- aggregated = frame.resample(rule, label="left", closed="left").agg(
- {"open": "first", "high": "max", "low": "min", "close": "last", "volume": "sum"}
- )
- return aggregated.dropna()
- def fetch_okx_frame(symbol: str, bar: str, years: float) -> pd.DataFrame:
- interval_ms = BAR_MS[bar]
- limit = int(years * 365 * 86_400_000 / interval_ms) + 500
- candles = OkxClient().get_candles(symbol, OKX_BARS[bar], limit)
- if not candles:
- raise FileNotFoundError(f"missing OKX candles for {symbol} {bar}")
- return frame_from_candles(candles)
- def load_symbol_bar_frames(years: float) -> dict[tuple[str, str], pd.DataFrame]:
- frames: dict[tuple[str, str], pd.DataFrame] = {}
- for symbol in SYMBOLS:
- local = load_local_candles(symbol, BASE_BAR)
- base = frame_from_candles(local) if local else None
- for bar in BARS:
- if base is not None:
- frame = aggregate_frame(base, bar)
- else:
- frame = fetch_okx_frame(symbol, bar, years)
- cutoff = frame.index[-1] - pd.DateOffset(years=years)
- frames[(symbol, bar)] = frame[frame.index >= cutoff]
- return frames
- def aligned_closes(frames: dict[tuple[str, str], pd.DataFrame], params: Params) -> pd.DataFrame:
- required = max(params.lookback, params.trend, params.btc_trend, params.vol_lookback) + max(params.rebalance * 6, 120)
- full = pd.DataFrame({symbol: frames[(symbol, params.bar)]["close"] for symbol in SYMBOLS}).dropna()
- if len(full) >= required:
- return full
- btc_eth = pd.DataFrame({symbol: frames[(symbol, params.bar)]["close"] for symbol in SYMBOLS[:2]}).dropna()
- if len(btc_eth) < required:
- raise ValueError(f"insufficient aligned candles for {params.name}")
- return btc_eth
- def build_params() -> list[Params]:
- params: list[Params] = []
- for bar in BARS:
- if bar == "1h":
- lookbacks = (24 * 14, 24 * 30)
- trends = (24 * 30, 24 * 60)
- btc_trends = (24 * 120,)
- rebalances = (24 * 3, 24 * 7)
- vol_lookbacks = (24 * 14,)
- elif bar == "4h":
- lookbacks = (6 * 30, 6 * 60)
- trends = (6 * 60, 6 * 120)
- btc_trends = (6 * 180,)
- rebalances = (6 * 7, 6 * 14)
- vol_lookbacks = (6 * 30,)
- else:
- lookbacks = (60, 120)
- trends = (120, 200)
- btc_trends = (200,)
- rebalances = (14, 30)
- vol_lookbacks = (30,)
- for family, lookback, trend, btc_trend, rebalance, top_n, min_momentum, btc_min_momentum, max_vol in product(
- ("dual_momentum", "trend_basket"),
- lookbacks,
- trends,
- btc_trends,
- rebalances,
- (1, 2),
- (0.0, 0.03),
- (0.0,),
- (0.055,),
- ):
- if family == "dual_momentum" and top_n != 1:
- continue
- params.append(
- Params(
- family=family,
- bar=bar,
- lookback=lookback,
- trend=trend,
- btc_trend=btc_trend,
- rebalance=rebalance,
- top_n=top_n,
- min_momentum=min_momentum,
- btc_min_momentum=btc_min_momentum,
- vol_lookback=vol_lookbacks[0],
- max_vol=max_vol,
- )
- )
- return params
- def target_weights(closes: pd.DataFrame, params: Params) -> pd.DataFrame:
- momentum = closes / closes.shift(params.lookback) - 1.0
- trend = closes > closes.rolling(params.trend).mean()
- btc_trend = closes["BTC-USDT-SWAP"] > closes["BTC-USDT-SWAP"].rolling(params.btc_trend).mean()
- btc_momentum = closes["BTC-USDT-SWAP"] / closes["BTC-USDT-SWAP"].shift(params.lookback) - 1.0
- btc_vol = closes["BTC-USDT-SWAP"].pct_change().rolling(params.vol_lookback).std(ddof=1)
- risk_on = btc_trend & (btc_momentum >= params.btc_min_momentum) & (btc_vol <= params.max_vol)
- weights = pd.DataFrame(0.0, index=closes.index, columns=closes.columns)
- for index in range(max(params.lookback, params.trend, params.btc_trend, params.vol_lookback), len(closes), params.rebalance):
- if not bool(risk_on.iloc[index]):
- continue
- current_momentum = momentum.iloc[index]
- eligible = current_momentum[(current_momentum >= params.min_momentum) & trend.iloc[index]]
- if eligible.empty:
- continue
- if params.family == "dual_momentum":
- selected = eligible.sort_values(ascending=False).head(1).index
- else:
- selected = eligible.sort_values(ascending=False).head(params.top_n).index
- weights.loc[closes.index[index], selected] = 1.0 / len(selected)
- return weights.replace(0.0, pd.NA).ffill(limit=max(params.rebalance - 1, 1)).fillna(0.0)
- def trade_stats(weights: pd.DataFrame, closes: pd.DataFrame) -> dict[str, float | int]:
- returns = closes.pct_change().shift(-1)
- wins = 0
- losses = 0
- gross_profit = 0.0
- gross_loss = 0.0
- trades = 0
- for symbol in closes.columns:
- active = weights[symbol] > 0.0
- group = (active != active.shift(1)).cumsum()
- for _, mask in active.groupby(group):
- if not bool(mask.iloc[0]):
- continue
- trade_return = float((1.0 + returns.loc[mask.index, symbol].dropna() * LEVERAGE).prod() - 1.0)
- trade_return -= TAKER_FEE * LEVERAGE * 2.0
- trades += 1
- if trade_return > 0.0:
- wins += 1
- gross_profit += trade_return
- else:
- losses += 1
- gross_loss += abs(trade_return)
- return {
- "trades": trades,
- "win_rate": wins / trades if trades else 0.0,
- "profit_factor": gross_profit / gross_loss if gross_loss else 0.0,
- }
- def equity_curve(closes: pd.DataFrame, weights: pd.DataFrame) -> pd.Series:
- returns = closes.pct_change().fillna(0.0)
- executed = weights.shift(1).fillna(0.0)
- turnover = executed.diff().abs().sum(axis=1).fillna(executed.abs().sum(axis=1))
- net_returns = (executed * returns * LEVERAGE).sum(axis=1) - turnover * TAKER_FEE * LEVERAGE
- equity = INITIAL_EQUITY * (1.0 + net_returns).cumprod()
- equity.name = "equity"
- return equity
- def metrics(series: pd.Series) -> dict[str, float]:
- years = (series.index[-1] - series.index[0]).total_seconds() / 86_400 / 365
- total = float(series.iloc[-1] / series.iloc[0] - 1.0)
- annualized = (1.0 + total) ** (1.0 / years) - 1.0 if total > -1.0 and years > 0.0 else 0.0
- drawdown = float((series.cummax() - series).div(series.cummax()).max())
- return {
- "total_return": total,
- "annualized_return": annualized,
- "max_drawdown": drawdown,
- "calmar": annualized / drawdown if drawdown else 0.0,
- }
- def horizon_rows(name: str, series: pd.Series) -> list[dict[str, object]]:
- rows: list[dict[str, object]] = []
- end = series.index[-1]
- for label, offset in HORIZONS:
- horizon = series[series.index >= end - offset]
- if len(horizon) < 2:
- horizon = series
- rows.append(
- {
- "strategy": name,
- "horizon": label,
- "start": horizon.index[0].strftime("%Y-%m-%d"),
- "end": horizon.index[-1].strftime("%Y-%m-%d"),
- **metrics(horizon),
- }
- )
- return rows
- def monthly_rows(name: str, series: pd.Series) -> pd.DataFrame:
- monthly = series.resample("ME").last()
- frame = pd.DataFrame(
- {
- "strategy": name,
- "month": monthly.index.strftime("%Y-%m"),
- "start_equity": monthly.shift(1).fillna(series.iloc[0]).to_numpy(),
- "end_equity": monthly.to_numpy(),
- }
- )
- frame["return"] = frame["end_equity"] / frame["start_equity"] - 1.0
- return frame
- def markdown_table(frame: pd.DataFrame) -> str:
- rows = [list(frame.columns), ["---" for _ in frame.columns]]
- rows.extend(frame.astype(object).where(pd.notna(frame), "").values.tolist())
- return "\n".join("| " + " | ".join(format_cell(value) for value in row) + " |" for row in rows)
- def format_cell(value: object) -> str:
- if isinstance(value, float):
- return f"{value:.6g}"
- return str(value).replace("|", "\\|")
- def report_text(command: str, paths: list[Path], top: pd.DataFrame, horizons: pd.DataFrame, monthly: pd.DataFrame) -> str:
- best_names = set(top.head(3)["strategy"])
- recent = horizons[horizons["strategy"].isin(best_names)]
- best_monthly = monthly[monthly["strategy"].isin(best_names)]
- return "\n".join(
- [
- "# Rotation strategy expansion",
- "",
- f"Run command: `{command}`",
- "",
- "Output files:",
- *[f"- `{path}`" for path in paths],
- "",
- "Cost model: 0.04% one-way taker fee, charged on leveraged notional at each portfolio weight change.",
- "Universe: BTC-USDT-SWAP, ETH-USDT-SWAP, SOL-USDT-SWAP OKX perpetual swaps when the aligned history is long enough for that candidate; otherwise the row records the BTC/ETH universe used. BTC/ETH use local 15m cache aggregated upward; SOL uses OKX historical candles in memory.",
- "",
- "## Top candidates",
- "",
- markdown_table(
- top.head(10)[
- [
- "strategy",
- "family",
- "bar",
- "universe",
- "total_return",
- "annualized_return",
- "max_drawdown",
- "calmar",
- "trades",
- "win_rate",
- "profit_factor",
- "turnover_per_year",
- ]
- ]
- ),
- "",
- "## Recent horizons for top 3",
- "",
- markdown_table(recent),
- "",
- "## Monthly returns for top 3",
- "",
- markdown_table(best_monthly.tail(36)),
- "",
- ]
- )
- def main() -> int:
- parser = argparse.ArgumentParser()
- parser.add_argument("--years", type=float, default=8.0)
- parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
- parser.add_argument("--top", type=int, default=30)
- args = parser.parse_args()
- frames = load_symbol_bar_frames(args.years)
- totals: list[dict[str, object]] = []
- horizon_output: list[dict[str, object]] = []
- monthly_output: list[pd.DataFrame] = []
- params = build_params()
- for index, param in enumerate(params, start=1):
- closes = aligned_closes(frames, param)
- weights = target_weights(closes, param)
- equity = equity_curve(closes, weights)
- stat = trade_stats(weights, closes)
- years = (equity.index[-1] - equity.index[0]).total_seconds() / 86_400 / 365
- turnover_per_year = float(weights.shift(1).fillna(0.0).diff().abs().sum(axis=1).sum() / years)
- row = {
- "strategy": param.name,
- "family": param.family,
- "bar": param.bar,
- "universe": ",".join(closes.columns),
- "lookback": param.lookback,
- "trend": param.trend,
- "btc_trend": param.btc_trend,
- "rebalance": param.rebalance,
- "top_n": param.top_n,
- "min_momentum": param.min_momentum,
- "btc_min_momentum": param.btc_min_momentum,
- "vol_lookback": param.vol_lookback,
- "max_vol": param.max_vol,
- "first_candle": equity.index[0].strftime("%Y-%m-%d %H:%M"),
- "last_candle": equity.index[-1].strftime("%Y-%m-%d %H:%M"),
- "years": years,
- "turnover_per_year": turnover_per_year,
- **metrics(equity),
- **stat,
- }
- totals.append(row)
- for horizon in horizon_rows(param.name, equity):
- horizon_output.append(horizon)
- monthly_output.append(monthly_rows(param.name, equity))
- if index % 100 == 0:
- print(f"done {index}/{len(params)}")
- total = pd.DataFrame(totals)
- positive_recent = pd.DataFrame(horizon_output).pivot(index="strategy", columns="horizon", values="total_return")
- stable = total[
- total["strategy"].isin(
- positive_recent[
- (positive_recent["3y"] > 0.0)
- & (positive_recent["1y"] > 0.0)
- & (positive_recent["6m"] > -0.05)
- & (positive_recent["3m"] > -0.05)
- ].index
- )
- ]
- ranked = stable.sort_values(
- ["calmar", "max_drawdown", "annualized_return", "turnover_per_year"],
- ascending=[False, True, False, True],
- )
- if ranked.empty:
- ranked = total.sort_values(["calmar", "max_drawdown", "annualized_return"], ascending=[False, True, False])
- top = ranked.head(args.top)
- top_names = set(top["strategy"])
- horizons = pd.DataFrame(horizon_output)
- horizons = horizons[horizons["strategy"].isin(top_names)]
- monthly = pd.concat(monthly_output, ignore_index=True)
- monthly = monthly[monthly["strategy"].isin(top_names)]
- args.output_dir.mkdir(parents=True, exist_ok=True)
- total_path = args.output_dir / f"{PREFIX}-total.csv"
- top_path = args.output_dir / f"{PREFIX}-top.csv"
- horizon_path = args.output_dir / f"{PREFIX}-horizons.csv"
- monthly_path = args.output_dir / f"{PREFIX}-monthly.csv"
- report_path = args.output_dir / f"{PREFIX}-report.md"
- paths = [total_path, top_path, horizon_path, monthly_path, report_path]
- total.to_csv(total_path, index=False)
- top.to_csv(top_path, index=False)
- horizons.to_csv(horizon_path, index=False)
- monthly.to_csv(monthly_path, index=False)
- command = f"rtk .venv/bin/python scripts/search_expansion_rotation.py --years {args.years} --top {args.top}"
- report_path.write_text(report_text(command, paths, top, horizons, monthly), encoding="utf-8")
- print(top.head(10).to_string(index=False))
- return 0
- if __name__ == "__main__":
- raise SystemExit(main())
|