| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308 |
- from __future__ import annotations
- import argparse
- from dataclasses import dataclass
- from pathlib import Path
- import pandas as pd
- CACHE_DIR = Path("data/okx-candles")
- OUTPUT_DIR = Path("reports/eth-exploration")
- PREFIX = "eth-relative-momentum"
- INITIAL_EQUITY = 10_000.0
- TAKER_FEE = 0.0004
- HORIZONS = (
- ("full", None),
- ("3y", pd.DateOffset(years=3)),
- ("1y", pd.DateOffset(years=1)),
- ("6m", pd.DateOffset(months=6)),
- ("3m", pd.DateOffset(months=3)),
- )
- @dataclass(frozen=True)
- class Params:
- bar: str
- lookback: int
- trend: int
- rel_entry: float
- vol_quantile: float
- short_weight: float
- long_weight: float
- @property
- def name(self) -> str:
- return (
- f"eth_relmom-{self.bar}-lb{self.lookback}-tr{self.trend}"
- f"-re{self.rel_entry:.3f}-vq{self.vol_quantile:.1f}"
- f"-sw{self.short_weight:.2f}-lw{self.long_weight:.2f}"
- )
- def load_15m(symbol: str) -> pd.DataFrame:
- path = CACHE_DIR / symbol / "15m.csv"
- frame = pd.read_csv(path)
- frame["ts"] = pd.to_datetime(frame["ts"], unit="ms", utc=True)
- return frame.sort_values("ts").drop_duplicates("ts", keep="last").set_index("ts")
- def resample(frame: pd.DataFrame, bar: str) -> pd.DataFrame:
- rule = {"1H": "1h", "4H": "4h"}[bar]
- out = frame.resample(rule, label="left", closed="left").agg(
- open=("open", "first"),
- high=("high", "max"),
- low=("low", "min"),
- close=("close", "last"),
- volume=("volume", "sum"),
- )
- return out.dropna()
- def build_params() -> list[Params]:
- params: list[Params] = []
- for bar, lookbacks, trends in (
- ("1H", (24, 72, 168), (24 * 30, 24 * 60)),
- ("4H", (12, 42, 84), (6 * 30, 6 * 60)),
- ):
- for lookback in lookbacks:
- for trend in trends:
- for rel_entry in (0.015, 0.025, 0.04):
- for vol_quantile in (0.4, 0.7):
- for short_weight, long_weight in ((1.0, 0.0), (1.0, 0.25), (0.75, 0.25)):
- params.append(Params(bar, lookback, trend, rel_entry, vol_quantile, short_weight, long_weight))
- return params
- def target_position(closes: pd.DataFrame, params: Params) -> pd.Series:
- eth = closes["ETH-USDT-SWAP"]
- btc = closes["BTC-USDT-SWAP"]
- eth_momentum = eth / eth.shift(params.lookback) - 1.0
- btc_momentum = btc / btc.shift(params.lookback) - 1.0
- relative = eth_momentum - btc_momentum
- trend = eth.ewm(span=params.trend, adjust=False).mean()
- btc_trend = btc.ewm(span=params.trend, adjust=False).mean()
- eth_vol = eth.pct_change().rolling(params.lookback).std(ddof=1)
- vol_gate = eth_vol >= eth_vol.rolling(params.trend).quantile(params.vol_quantile)
- position = pd.Series(0.0, index=closes.index)
- short_signal = (relative <= -params.rel_entry) & (eth < trend) & vol_gate
- long_signal = (relative >= params.rel_entry) & (eth > trend) & (btc > btc_trend) & vol_gate
- position.loc[short_signal] = -params.short_weight
- position.loc[long_signal] = params.long_weight
- return position.fillna(0.0)
- def equity_curve(closes: pd.DataFrame, position: pd.Series) -> pd.Series:
- eth_returns = closes["ETH-USDT-SWAP"].pct_change().fillna(0.0)
- executed = position.shift(1).fillna(0.0)
- turnover = executed.diff().abs().fillna(executed.abs())
- net_returns = executed * eth_returns - turnover * TAKER_FEE
- equity = INITIAL_EQUITY * (1.0 + net_returns).cumprod()
- equity.name = "equity"
- return equity
- def trade_returns(closes: pd.DataFrame, position: pd.Series) -> list[dict[str, object]]:
- eth_returns = closes["ETH-USDT-SWAP"].pct_change().fillna(0.0)
- executed = position.shift(1).fillna(0.0)
- turnover = executed.diff().abs().fillna(executed.abs())
- net_returns = executed * eth_returns - turnover * TAKER_FEE
- active = executed != 0.0
- groups = (active.ne(active.shift(1)) | executed.ne(executed.shift(1))).cumsum()
- trades: list[dict[str, object]] = []
- for _, mask in active.groupby(groups):
- if not bool(mask.iloc[0]):
- continue
- index = mask.index
- returns = net_returns.loc[index]
- value = float((1.0 + returns).prod() - 1.0)
- side = "short" if float(executed.loc[index[0]]) < 0.0 else "long"
- trades.append({"side": side, "entry_time": index[0], "exit_time": index[-1], "return": value})
- return trades
- def series_metrics(series: pd.Series) -> dict[str, float]:
- years = (series.index[-1] - series.index[0]).total_seconds() / 86_400 / 365
- total = float(series.iloc[-1] / series.iloc[0] - 1.0)
- annualized = (1.0 + total) ** (1.0 / years) - 1.0 if total > -1.0 and years > 0.0 else 0.0
- drawdown = float((series.cummax() - series).div(series.cummax()).max())
- return {"total_return": total, "annualized_return": annualized, "max_drawdown": drawdown}
- def trade_metrics(trades: list[dict[str, object]], start: pd.Timestamp, end: pd.Timestamp) -> dict[str, float | int]:
- scoped = [float(trade["return"]) for trade in trades if start <= pd.Timestamp(trade["exit_time"]) <= end]
- wins = [value for value in scoped if value > 0.0]
- losses = [value for value in scoped if value < 0.0]
- gross_profit = sum(wins)
- gross_loss = abs(sum(losses))
- return {
- "win_rate": len(wins) / len(scoped) if scoped else 0.0,
- "profit_factor": gross_profit / gross_loss if gross_loss else 0.0,
- "trades": len(scoped),
- }
- def horizon_rows(name: str, params: Params, series: pd.Series, trades: list[dict[str, object]]) -> list[dict[str, object]]:
- rows: list[dict[str, object]] = []
- end = series.index[-1]
- for horizon, offset in HORIZONS:
- scoped = series if offset is None else series[series.index >= end - offset]
- if len(scoped) < 2:
- scoped = series
- start = scoped.index[0]
- rows.append(
- {
- "name": name,
- "horizon": horizon,
- "start": start.strftime("%Y-%m-%d"),
- "end": scoped.index[-1].strftime("%Y-%m-%d"),
- "bar": params.bar,
- "lookback": params.lookback,
- "trend": params.trend,
- "rel_entry": params.rel_entry,
- "vol_quantile": params.vol_quantile,
- "short_weight": params.short_weight,
- "long_weight": params.long_weight,
- **series_metrics(scoped),
- **trade_metrics(trades, start, scoped.index[-1]),
- }
- )
- return rows
- def markdown_table(frame: pd.DataFrame) -> str:
- values = [list(frame.columns), ["---" for _ in frame.columns]]
- values.extend(frame.astype(object).where(pd.notna(frame), "").values.tolist())
- lines = []
- for row in values:
- cells = []
- for value in row:
- cells.append(f"{value:.6g}" if isinstance(value, float) else str(value).replace("|", "\\|"))
- lines.append("| " + " | ".join(cells) + " |")
- return "\n".join(lines)
- def report_text(command: str, paths: list[Path], selected: pd.DataFrame, horizons: pd.DataFrame, qualified_count: int) -> str:
- names = set(selected["name"])
- selected_horizons = horizons[horizons["name"].isin(names)]
- conclusion = (
- "Worth continuing: at least one candidate is positive across full/3y/1y/6m/3m with controlled drawdown."
- if qualified_count
- else "Not worth continuing as a standalone direction: no candidate passed the positive full/3y/1y/6m/3m filter."
- )
- return "\n".join(
- [
- "# ETH Relative Momentum Exploration",
- "",
- f"Run command: `{command}`",
- "",
- "Output files:",
- *[f"- `{path}`" for path in paths],
- "",
- "Scope: offline ETH-USDT-SWAP strategy using cached OKX 15m candles resampled to 1H/4H, with BTC-USDT-SWAP only as a relative-momentum filter. No live code or exchange API path was used.",
- "Direction: bidirectional but short-biased; tested short-only and small-long variants.",
- "Cost: 0.04% taker fee on absolute notional turnover.",
- "",
- f"Conclusion: {conclusion}",
- "",
- "## Selected Candidates",
- "",
- markdown_table(selected),
- "",
- "## Required Horizons",
- "",
- markdown_table(selected_horizons),
- "",
- ]
- )
- def score(row: dict[str, object]) -> float:
- return (
- float(row["annualized_return"])
- - float(row["max_drawdown"])
- + 0.7 * float(row["return_1y"])
- + 0.4 * float(row["return_6m"])
- + 0.2 * float(row["return_3m"])
- )
- def main() -> int:
- parser = argparse.ArgumentParser()
- parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
- parser.add_argument("--top", type=int, default=25)
- args = parser.parse_args()
- args.output_dir.mkdir(parents=True, exist_ok=True)
- source = {symbol: load_15m(symbol) for symbol in ("ETH-USDT-SWAP", "BTC-USDT-SWAP")}
- closes_by_bar = {
- bar: pd.DataFrame(
- {
- symbol: resample(frame, bar)["close"]
- for symbol, frame in source.items()
- }
- ).dropna()
- for bar in ("1H", "4H")
- }
- totals: list[dict[str, object]] = []
- all_horizons: list[dict[str, object]] = []
- for params in build_params():
- closes = closes_by_bar[params.bar]
- position = target_position(closes, params)
- equity = equity_curve(closes, position)
- trades = trade_returns(closes, position)
- horizons = horizon_rows(params.name, params, equity, trades)
- by_horizon = {row["horizon"]: row for row in horizons}
- full = by_horizon["full"]
- row = {
- **full,
- "return_3y": float(by_horizon["3y"]["total_return"]),
- "return_1y": float(by_horizon["1y"]["total_return"]),
- "return_6m": float(by_horizon["6m"]["total_return"]),
- "return_3m": float(by_horizon["3m"]["total_return"]),
- }
- row["score"] = score(row)
- totals.append(row)
- all_horizons.extend(horizons)
- total = pd.DataFrame(totals).sort_values(["score", "annualized_return"], ascending=[False, False])
- qualified = total[
- (total["total_return"] > 0.0)
- & (total["return_3y"] > 0.0)
- & (total["return_1y"] > 0.0)
- & (total["return_6m"] > 0.0)
- & (total["return_3m"] > 0.0)
- & (total["max_drawdown"] <= 0.35)
- & (total["trades"] >= 20)
- & (total["profit_factor"] > 1.0)
- ].head(args.top)
- selected = qualified if len(qualified) else total.head(args.top)
- horizons = pd.DataFrame(all_horizons)
- selected_horizons = horizons[horizons["name"].isin(set(selected["name"]))]
- total_path = args.output_dir / f"{PREFIX}-totals.csv"
- selected_path = args.output_dir / f"{PREFIX}-selected.csv"
- horizon_path = args.output_dir / f"{PREFIX}-horizons.csv"
- report_path = args.output_dir / f"{PREFIX}-report.md"
- total.head(200).to_csv(total_path, index=False)
- selected.to_csv(selected_path, index=False)
- selected_horizons.to_csv(horizon_path, index=False)
- report_path.write_text(
- report_text(
- "rtk .venv/bin/python scripts/search_eth_relative_momentum.py",
- [total_path, selected_path, horizon_path, report_path],
- selected,
- selected_horizons,
- len(qualified),
- ),
- encoding="utf-8",
- )
- print(report_path)
- print(selected.head(10).to_string(index=False))
- return 0
- if __name__ == "__main__":
- raise SystemExit(main())
|