from __future__ import annotations import argparse from dataclasses import dataclass from pathlib import Path import pandas as pd CACHE_DIR = Path("data/okx-candles") OUTPUT_DIR = Path("reports/eth-exploration") PREFIX = "eth-relative-momentum" INITIAL_EQUITY = 10_000.0 TAKER_FEE = 0.0004 HORIZONS = ( ("full", None), ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ) @dataclass(frozen=True) class Params: bar: str lookback: int trend: int rel_entry: float vol_quantile: float short_weight: float long_weight: float @property def name(self) -> str: return ( f"eth_relmom-{self.bar}-lb{self.lookback}-tr{self.trend}" f"-re{self.rel_entry:.3f}-vq{self.vol_quantile:.1f}" f"-sw{self.short_weight:.2f}-lw{self.long_weight:.2f}" ) def load_15m(symbol: str) -> pd.DataFrame: path = CACHE_DIR / symbol / "15m.csv" frame = pd.read_csv(path) frame["ts"] = pd.to_datetime(frame["ts"], unit="ms", utc=True) return frame.sort_values("ts").drop_duplicates("ts", keep="last").set_index("ts") def resample(frame: pd.DataFrame, bar: str) -> pd.DataFrame: rule = {"1H": "1h", "4H": "4h"}[bar] out = frame.resample(rule, label="left", closed="left").agg( open=("open", "first"), high=("high", "max"), low=("low", "min"), close=("close", "last"), volume=("volume", "sum"), ) return out.dropna() def build_params() -> list[Params]: params: list[Params] = [] for bar, lookbacks, trends in ( ("1H", (24, 72, 168), (24 * 30, 24 * 60)), ("4H", (12, 42, 84), (6 * 30, 6 * 60)), ): for lookback in lookbacks: for trend in trends: for rel_entry in (0.015, 0.025, 0.04): for vol_quantile in (0.4, 0.7): for short_weight, long_weight in ((1.0, 0.0), (1.0, 0.25), (0.75, 0.25)): params.append(Params(bar, lookback, trend, rel_entry, vol_quantile, short_weight, long_weight)) return params def target_position(closes: pd.DataFrame, params: Params) -> pd.Series: eth = closes["ETH-USDT-SWAP"] btc = closes["BTC-USDT-SWAP"] eth_momentum = eth / eth.shift(params.lookback) - 1.0 btc_momentum = btc / btc.shift(params.lookback) - 1.0 relative = eth_momentum - btc_momentum trend = eth.ewm(span=params.trend, adjust=False).mean() btc_trend = btc.ewm(span=params.trend, adjust=False).mean() eth_vol = eth.pct_change().rolling(params.lookback).std(ddof=1) vol_gate = eth_vol >= eth_vol.rolling(params.trend).quantile(params.vol_quantile) position = pd.Series(0.0, index=closes.index) short_signal = (relative <= -params.rel_entry) & (eth < trend) & vol_gate long_signal = (relative >= params.rel_entry) & (eth > trend) & (btc > btc_trend) & vol_gate position.loc[short_signal] = -params.short_weight position.loc[long_signal] = params.long_weight return position.fillna(0.0) def equity_curve(closes: pd.DataFrame, position: pd.Series) -> pd.Series: eth_returns = closes["ETH-USDT-SWAP"].pct_change().fillna(0.0) executed = position.shift(1).fillna(0.0) turnover = executed.diff().abs().fillna(executed.abs()) net_returns = executed * eth_returns - turnover * TAKER_FEE equity = INITIAL_EQUITY * (1.0 + net_returns).cumprod() equity.name = "equity" return equity def trade_returns(closes: pd.DataFrame, position: pd.Series) -> list[dict[str, object]]: eth_returns = closes["ETH-USDT-SWAP"].pct_change().fillna(0.0) executed = position.shift(1).fillna(0.0) turnover = executed.diff().abs().fillna(executed.abs()) net_returns = executed * eth_returns - turnover * TAKER_FEE active = executed != 0.0 groups = (active.ne(active.shift(1)) | executed.ne(executed.shift(1))).cumsum() trades: list[dict[str, object]] = [] for _, mask in active.groupby(groups): if not bool(mask.iloc[0]): continue index = mask.index returns = net_returns.loc[index] value = float((1.0 + returns).prod() - 1.0) side = "short" if float(executed.loc[index[0]]) < 0.0 else "long" trades.append({"side": side, "entry_time": index[0], "exit_time": index[-1], "return": value}) return trades def series_metrics(series: pd.Series) -> dict[str, float]: years = (series.index[-1] - series.index[0]).total_seconds() / 86_400 / 365 total = float(series.iloc[-1] / series.iloc[0] - 1.0) annualized = (1.0 + total) ** (1.0 / years) - 1.0 if total > -1.0 and years > 0.0 else 0.0 drawdown = float((series.cummax() - series).div(series.cummax()).max()) return {"total_return": total, "annualized_return": annualized, "max_drawdown": drawdown} def trade_metrics(trades: list[dict[str, object]], start: pd.Timestamp, end: pd.Timestamp) -> dict[str, float | int]: scoped = [float(trade["return"]) for trade in trades if start <= pd.Timestamp(trade["exit_time"]) <= end] wins = [value for value in scoped if value > 0.0] losses = [value for value in scoped if value < 0.0] gross_profit = sum(wins) gross_loss = abs(sum(losses)) return { "win_rate": len(wins) / len(scoped) if scoped else 0.0, "profit_factor": gross_profit / gross_loss if gross_loss else 0.0, "trades": len(scoped), } def horizon_rows(name: str, params: Params, series: pd.Series, trades: list[dict[str, object]]) -> list[dict[str, object]]: rows: list[dict[str, object]] = [] end = series.index[-1] for horizon, offset in HORIZONS: scoped = series if offset is None else series[series.index >= end - offset] if len(scoped) < 2: scoped = series start = scoped.index[0] rows.append( { "name": name, "horizon": horizon, "start": start.strftime("%Y-%m-%d"), "end": scoped.index[-1].strftime("%Y-%m-%d"), "bar": params.bar, "lookback": params.lookback, "trend": params.trend, "rel_entry": params.rel_entry, "vol_quantile": params.vol_quantile, "short_weight": params.short_weight, "long_weight": params.long_weight, **series_metrics(scoped), **trade_metrics(trades, start, scoped.index[-1]), } ) return rows def markdown_table(frame: pd.DataFrame) -> str: values = [list(frame.columns), ["---" for _ in frame.columns]] values.extend(frame.astype(object).where(pd.notna(frame), "").values.tolist()) lines = [] for row in values: cells = [] for value in row: cells.append(f"{value:.6g}" if isinstance(value, float) else str(value).replace("|", "\\|")) lines.append("| " + " | ".join(cells) + " |") return "\n".join(lines) def report_text(command: str, paths: list[Path], selected: pd.DataFrame, horizons: pd.DataFrame, qualified_count: int) -> str: names = set(selected["name"]) selected_horizons = horizons[horizons["name"].isin(names)] conclusion = ( "Worth continuing: at least one candidate is positive across full/3y/1y/6m/3m with controlled drawdown." if qualified_count else "Not worth continuing as a standalone direction: no candidate passed the positive full/3y/1y/6m/3m filter." ) return "\n".join( [ "# ETH Relative Momentum Exploration", "", f"Run command: `{command}`", "", "Output files:", *[f"- `{path}`" for path in paths], "", "Scope: offline ETH-USDT-SWAP strategy using cached OKX 15m candles resampled to 1H/4H, with BTC-USDT-SWAP only as a relative-momentum filter. No live code or exchange API path was used.", "Direction: bidirectional but short-biased; tested short-only and small-long variants.", "Cost: 0.04% taker fee on absolute notional turnover.", "", f"Conclusion: {conclusion}", "", "## Selected Candidates", "", markdown_table(selected), "", "## Required Horizons", "", markdown_table(selected_horizons), "", ] ) def score(row: dict[str, object]) -> float: return ( float(row["annualized_return"]) - float(row["max_drawdown"]) + 0.7 * float(row["return_1y"]) + 0.4 * float(row["return_6m"]) + 0.2 * float(row["return_3m"]) ) def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR) parser.add_argument("--top", type=int, default=25) args = parser.parse_args() args.output_dir.mkdir(parents=True, exist_ok=True) source = {symbol: load_15m(symbol) for symbol in ("ETH-USDT-SWAP", "BTC-USDT-SWAP")} closes_by_bar = { bar: pd.DataFrame( { symbol: resample(frame, bar)["close"] for symbol, frame in source.items() } ).dropna() for bar in ("1H", "4H") } totals: list[dict[str, object]] = [] all_horizons: list[dict[str, object]] = [] for params in build_params(): closes = closes_by_bar[params.bar] position = target_position(closes, params) equity = equity_curve(closes, position) trades = trade_returns(closes, position) horizons = horizon_rows(params.name, params, equity, trades) by_horizon = {row["horizon"]: row for row in horizons} full = by_horizon["full"] row = { **full, "return_3y": float(by_horizon["3y"]["total_return"]), "return_1y": float(by_horizon["1y"]["total_return"]), "return_6m": float(by_horizon["6m"]["total_return"]), "return_3m": float(by_horizon["3m"]["total_return"]), } row["score"] = score(row) totals.append(row) all_horizons.extend(horizons) total = pd.DataFrame(totals).sort_values(["score", "annualized_return"], ascending=[False, False]) qualified = total[ (total["total_return"] > 0.0) & (total["return_3y"] > 0.0) & (total["return_1y"] > 0.0) & (total["return_6m"] > 0.0) & (total["return_3m"] > 0.0) & (total["max_drawdown"] <= 0.35) & (total["trades"] >= 20) & (total["profit_factor"] > 1.0) ].head(args.top) selected = qualified if len(qualified) else total.head(args.top) horizons = pd.DataFrame(all_horizons) selected_horizons = horizons[horizons["name"].isin(set(selected["name"]))] total_path = args.output_dir / f"{PREFIX}-totals.csv" selected_path = args.output_dir / f"{PREFIX}-selected.csv" horizon_path = args.output_dir / f"{PREFIX}-horizons.csv" report_path = args.output_dir / f"{PREFIX}-report.md" total.head(200).to_csv(total_path, index=False) selected.to_csv(selected_path, index=False) selected_horizons.to_csv(horizon_path, index=False) report_path.write_text( report_text( "rtk .venv/bin/python scripts/search_eth_relative_momentum.py", [total_path, selected_path, horizon_path, report_path], selected, selected_horizons, len(qualified), ), encoding="utf-8", ) print(report_path) print(selected.head(10).to_string(index=False)) return 0 if __name__ == "__main__": raise SystemExit(main())