"""Offline revalidation of ETH short / bidirectional candidates on cached OKX candles.

Loads per-bar candle CSVs, backtests a fixed candidate grid, scores each
candidate over several trailing horizons, and writes two CSVs plus a Markdown
report into the output directory.
"""

from __future__ import annotations

import argparse
import json
from dataclasses import dataclass
from pathlib import Path

import pandas as pd

DATA_DIR = Path("data/okx-candles")
OUT_DIR = Path("reports/ultrashort")
SYMBOL = "ETH-USDT-SWAP"
INITIAL_EQUITY = 10_000.0
LEVERAGE = 3.0
TAKER_FEE = 0.0004
# (label, trailing offset) pairs; ``None`` means "use the whole equity curve".
HORIZONS = (
    ("full", None),
    ("3y", pd.DateOffset(years=3)),
    ("1y", pd.DateOffset(years=1)),
    ("6m", pd.DateOffset(months=6)),
    ("3m", pd.DateOffset(months=3)),
    ("30d", pd.DateOffset(days=30)),
    ("14d", pd.DateOffset(days=14)),
)


@dataclass(frozen=True)
class Candidate:
    """One strategy configuration: a signal family, a bar size and its parameters."""

    family: str
    bar: str
    params: dict[str, float | int | str]

    @property
    def name(self) -> str:
        """Return ``family-bar-key1value1-key2value2...``; floats use ``:g`` formatting."""
        body = "-".join(
            f"{key}{value:g}" if isinstance(value, float) else f"{key}{value}"
            for key, value in self.params.items()
        )
        return f"{self.family}-{self.bar}-{body}"


def load_frame(bar: str) -> pd.DataFrame:
    """Read the cached candle CSV for *bar*, add a UTC ``dt`` column and dedupe on ``ts``.

    Rows are sorted by ``ts``; for duplicate timestamps the last row is kept.
    """
    frame = pd.read_csv(DATA_DIR / SYMBOL / f"{bar}.csv")
    frame["dt"] = pd.to_datetime(frame["ts"], unit="ms", utc=True)
    return frame.sort_values("ts").drop_duplicates("ts", keep="last").reset_index(drop=True)


def load_cache_summary(bars: list[str]) -> pd.DataFrame:
    """Build one summary row per bar from its ``<bar>.meta.json`` sidecar file.

    Raises whatever ``Path.read_text`` / ``json.loads`` raise when a sidecar is
    missing or malformed, and ``KeyError`` when an expected key is absent.
    """
    rows = []
    for bar in bars:
        meta_path = DATA_DIR / SYMBOL / f"{bar}.meta.json"
        meta = json.loads(meta_path.read_text(encoding="utf-8"))
        rows.append(
            {
                "bar": bar,
                "rows": int(meta["rows"]),
                "first_time": pd.to_datetime(int(meta["first_ts"]), unit="ms", utc=True).strftime("%Y-%m-%d %H:%M"),
                "last_time": pd.to_datetime(int(meta["last_ts"]), unit="ms", utc=True).strftime("%Y-%m-%d %H:%M"),
                "history_exhausted": bool(meta["history_exhausted"]),
            }
        )
    return pd.DataFrame(rows)


def rsi(close: pd.Series, length: int) -> pd.Series:
    """Return an RSI built from plain rolling means of gains and losses over *length* bars."""
    delta = close.diff()
    gain = delta.clip(lower=0.0).rolling(length).mean()
    loss = (-delta.clip(upper=0.0)).rolling(length).mean()
    return 100.0 - 100.0 / (1.0 + gain / loss)


def build_candidates(bars: list[str]) -> list[Candidate]:
    """Return the fixed candidate grid (VWAP, RSI and breakdown families) for every bar."""
    candidates: list[Candidate] = []
    for bar in bars:
        for window in (48, 96):
            base = {"window": window, "entry_z": 1.5, "exit_z": 0.20, "stop": 0.006, "take": 0.009, "hold": 12}
            # Each candidate gets its own copy so that mutating one candidate's
            # params can never silently change its sibling's.
            candidates.append(Candidate("vwap_bidir", bar, dict(base)))
            candidates.append(Candidate("vwap_short", bar, dict(base)))
        for trend in (96, 192):
            candidates.append(
                Candidate(
                    "rsi_short",
                    bar,
                    {"trend": trend, "entry": 90, "exit": 45, "stop": 0.0075, "take": 0.010, "hold": 12},
                )
            )
            candidates.append(
                Candidate(
                    "rsi_bidir",
                    bar,
                    {"trend": trend, "entry": 10, "exit": 55, "stop": 0.0075, "take": 0.010, "hold": 12},
                )
            )
        for lookback in (48, 96):
            candidates.append(
                Candidate(
                    "breakdown_short",
                    bar,
                    {"lookback": lookback, "stop": 0.006, "take": 0.012, "hold": 12},
                )
            )
    return candidates


def signal_columns(frame: pd.DataFrame, candidate: Candidate) -> tuple[pd.Series, pd.Series]:
    """Compute per-bar entry and exit signals for *candidate*.

    Returns ``(entry, exit)`` where ``entry`` holds ``""``, ``"short"`` or
    ``"long"`` for each bar and ``exit`` is a boolean mask. Any family other
    than the VWAP and RSI ones is treated as the breakdown family.
    """
    close = frame["close"]
    params = candidate.params
    if candidate.family in ("vwap_bidir", "vwap_short"):
        window = int(params["window"])
        volume = frame["volume"]
        vwap = (close * volume).rolling(window).sum() / volume.rolling(window).sum()
        stdev = close.rolling(window).std(ddof=0)
        zscore = (close - vwap) / stdev
        entry = pd.Series("", index=frame.index, dtype=object)
        entry.loc[zscore >= float(params["entry_z"])] = "short"
        if candidate.family == "vwap_bidir":
            entry.loc[zscore <= -float(params["entry_z"])] = "long"
        return entry, zscore.abs() <= float(params["exit_z"])
    if candidate.family in ("rsi_short", "rsi_bidir"):
        trend = close.rolling(int(params["trend"])).mean()
        value = rsi(close, 2)
        entry = pd.Series("", index=frame.index, dtype=object)
        # NOTE(review): both legs compare against the same ``entry`` level. With
        # the rsi_bidir grid value (entry=10) the short leg fires on RSI >= 10;
        # confirm whether a mirrored ``100 - entry`` threshold was intended.
        entry.loc[(close < trend) & (value >= float(params["entry"]))] = "short"
        if candidate.family == "rsi_bidir":
            entry.loc[(close > trend) & (value <= float(params["entry"]))] = "long"
        # NOTE(review): for exit < 50 (the rsi_short grid uses 45) the two
        # half-lines overlap, so this mask is True for every non-NaN RSI.
        return entry, (value <= 100.0 - float(params["exit"])) | (value >= float(params["exit"]))
    lookback = int(params["lookback"])
    # shift(1) keeps the current bar's own low out of the reference window.
    prior_low = frame["low"].shift(1).rolling(lookback).min()
    entry = pd.Series("", index=frame.index, dtype=object)
    entry.loc[close < prior_low] = "short"
    return entry, pd.Series(False, index=frame.index)


def close_return(side: str, entry: float, exit_price: float) -> float:
    """Return the leveraged net return of a closed trade, fees charged on both legs.

    Any *side* other than ``"long"`` is treated as a short.
    """
    # NOTE(review): the short leg uses entry / exit - 1 rather than
    # 1 - exit / entry; confirm this matches the instrument's PnL convention.
    price_return = exit_price / entry - 1.0 if side == "long" else entry / exit_price - 1.0
    return LEVERAGE * price_return - LEVERAGE * TAKER_FEE * (1.0 + exit_price / entry)


def mark_return(side: str, entry: float, close: float) -> float:
    """Return the leveraged mark-to-market return of an open trade, less one taker fee."""
    price_return = close / entry - 1.0 if side == "long" else entry / close - 1.0
    return LEVERAGE * price_return - LEVERAGE * TAKER_FEE


def backtest(frame: pd.DataFrame, candidate: Candidate) -> tuple[pd.Series, pd.DataFrame]:
    """Simulate *candidate* bar by bar and return ``(equity_curve, trades)``.

    Signals read on one bar are executed at the next bar's open. Inside a bar
    the stop is checked before the take-profit, so a bar touching both is
    booked as a stop. A position still open after the last bar is closed at
    that bar's close. The equity curve is indexed by the ``dt`` column.
    """
    entry_signal, exit_signal = signal_columns(frame, candidate)
    warmup = max(int(value) for key, value in candidate.params.items() if key in {"window", "trend", "lookback"}) + 2
    equity = INITIAL_EQUITY
    position: dict[str, object] | None = None
    pending_entry = ""
    pending_exit = False
    curve: list[tuple[pd.Timestamp, float]] = []
    trades: list[dict[str, object]] = []
    rows = list(frame.itertuples(index=False))
    for index in range(warmup, len(rows)):
        candle = rows[index]

        # 1) Execute the exit scheduled on the previous bar at this bar's open.
        if pending_exit and position is not None:
            net = close_return(str(position["side"]), float(position["entry"]), float(candle.open))
            equity *= 1.0 + net
            trades.append({"entry_time": position["entry_time"], "exit_time": candle.dt, "side": position["side"], "return": net})
            position = None
        pending_exit = False

        # 2) Execute the entry scheduled on the previous bar at this bar's open.
        if pending_entry and position is None and equity > 0.0:
            position = {"side": pending_entry, "entry": float(candle.open), "entry_index": index, "entry_time": candle.dt}
        pending_entry = ""

        # 3) Intrabar stop / take check, otherwise mark the position to the close.
        mark = equity
        if position is not None:
            side = str(position["side"])
            entry = float(position["entry"])
            stop = float(candidate.params["stop"])
            take = float(candidate.params["take"])
            stop_price = entry * (1.0 - stop if side == "long" else 1.0 + stop)
            take_price = entry * (1.0 + take if side == "long" else 1.0 - take)
            stop_hit = candle.low <= stop_price if side == "long" else candle.high >= stop_price
            take_hit = candle.high >= take_price if side == "long" else candle.low <= take_price
            if stop_hit or take_hit:
                exit_price = stop_price if stop_hit else take_price
                net = close_return(side, entry, exit_price)
                equity *= 1.0 + net
                trades.append({"entry_time": position["entry_time"], "exit_time": candle.dt, "side": side, "return": net})
                position = None
                mark = equity
            else:
                mark = equity * (1.0 + mark_return(side, entry, float(candle.close)))
        curve.append((candle.dt, mark))

        # 4) Schedule orders for the next bar; nothing to schedule on the last
        #    bar or once the account is wiped out.
        if index == len(rows) - 1 or equity <= 0.0:
            continue
        next_entry = str(entry_signal.iloc[index])
        if position is not None:
            reverse = bool(next_entry) and next_entry != position["side"]
            stale = index - int(position["entry_index"]) >= int(candidate.params["hold"])
            if bool(exit_signal.iloc[index]) or reverse or stale:
                pending_exit = True
                pending_entry = next_entry if reverse else ""
        elif next_entry:
            pending_entry = next_entry

    if position is not None:
        final = rows[-1]
        net = close_return(str(position["side"]), float(position["entry"]), float(final.close))
        equity *= 1.0 + net
        trades.append({"entry_time": position["entry_time"], "exit_time": final.dt, "side": position["side"], "return": net})
        # Same timestamp as the last curve point; dict() below keeps this realised value.
        curve.append((final.dt, equity))
    return pd.Series(dict(curve)).sort_index(), pd.DataFrame(trades)


def scoped(equity: pd.Series, trades: pd.DataFrame, offset: pd.DateOffset | None) -> tuple[pd.Series, pd.DataFrame]:
    """Restrict *equity* and *trades* to the trailing window given by *offset*.

    With ``offset=None`` both inputs are returned unchanged. Trades are kept
    when their ``entry_time`` is at or after the first retained equity point.
    """
    if offset is None:
        return equity, trades
    start = equity.index[-1] - offset
    scoped_equity = equity[equity.index >= start]
    scoped_trades = trades[trades["entry_time"] >= scoped_equity.index[0]] if len(trades) else trades
    return scoped_equity, scoped_trades


def metrics(equity: pd.Series, trades: pd.DataFrame) -> dict[str, float | int]:
    """Compute return, drawdown and trade statistics for one equity curve slice."""
    total = float(equity.iloc[-1] / equity.iloc[0] - 1.0)
    # 31_536_000 = seconds in a 365-day year.
    years = (equity.index[-1] - equity.index[0]).total_seconds() / 31_536_000
    annual = (1.0 + total) ** (1.0 / years) - 1.0 if total > -1.0 and years > 0 else 0.0
    drawdown = float(((equity.cummax() - equity) / equity.cummax()).max())
    returns = trades["return"] if len(trades) else pd.Series(dtype=float)
    wins = returns[returns > 0.0]
    losses = returns[returns < 0.0]
    return {
        "total_return": total,
        "annualized_return": annual,
        "max_drawdown": drawdown,
        "calmar": annual / drawdown if drawdown else 0.0,
        "trades": int(len(trades)),
        "short_trades": int((trades["side"] == "short").sum()) if len(trades) else 0,
        "long_trades": int((trades["side"] == "long").sum()) if len(trades) else 0,
        # 999.0 is the sentinel for "wins but no losing trade".
        "profit_factor": float(wins.sum() / abs(losses.sum())) if len(losses) else (999.0 if len(wins) else 0.0),
        "win_rate": float(len(wins) / len(returns)) if len(returns) else 0.0,
    }


def summarize(candidate: Candidate, equity: pd.Series, trades: pd.DataFrame) -> dict[str, object]:
    """Flatten per-horizon metrics for *candidate* into one row and apply the observe rule."""
    row: dict[str, object] = {
        "symbol": SYMBOL,
        "bar": candidate.bar,
        "family": candidate.family,
        "name": candidate.name,
        "params_json": json.dumps(candidate.params, separators=(",", ":")),
        "first_time": equity.index[0].strftime("%Y-%m-%d %H:%M"),
        "last_time": equity.index[-1].strftime("%Y-%m-%d %H:%M"),
    }
    for label, offset in HORIZONS:
        part_equity, part_trades = scoped(equity, trades, offset)
        for key, value in metrics(part_equity, part_trades).items():
            row[f"{label}_{key}"] = value
    # Shorter windows get a larger weight in the activity score.
    row["recent_trigger_score"] = int(row["3m_trades"]) + int(row["30d_trades"]) * 2 + int(row["14d_trades"]) * 4
    observe = (
        int(row["3m_trades"]) >= 12
        and int(row["30d_trades"]) >= 4
        and int(row["14d_trades"]) >= 1
        and float(row["full_total_return"]) > 0.0
        and float(row["3y_total_return"]) > 0.0
        and float(row["1y_total_return"]) > 0.0
        and float(row["3y_max_drawdown"]) <= 0.35
        and float(row["1y_max_drawdown"]) <= 0.25
        and float(row["1y_profit_factor"]) >= 1.05
    )
    row["readonly_observe"] = "yes" if observe else "no"
    return row


def horizon_summary(totals: pd.DataFrame) -> pd.DataFrame:
    """Aggregate the candidate table into one row of cross-candidate stats per horizon."""
    rows = []
    for label, _ in HORIZONS:
        returns = totals[f"{label}_total_return"]
        drawdowns = totals[f"{label}_max_drawdown"]
        trades = totals[f"{label}_trades"]
        best_index = returns.idxmax()
        rows.append(
            {
                "horizon": label,
                "positive_candidates": int((returns > 0.0).sum()),
                "non_disaster_candidates": int(((returns > -0.50) & (drawdowns < 0.60)).sum()),
                "best_total_return": float(returns.max()),
                "median_total_return": float(returns.median()),
                "worst_total_return": float(returns.min()),
                "median_max_drawdown": float(drawdowns.median()),
                "max_trades": int(trades.max()),
                "best_name": str(totals.loc[best_index, "name"]),
            }
        )
    return pd.DataFrame(rows)


def markdown_table(frame: pd.DataFrame) -> str:
    """Render *frame* as a pipe-delimited Markdown table.

    Floats are printed with four decimals, missing values become empty cells
    and literal ``|`` characters in other values are backslash-escaped.
    """

    def cell(value: object) -> str:
        if isinstance(value, float):
            return f"{value:.4f}"
        return str(value).replace("|", "\\|")

    rows = [list(frame.columns), ["---" for _ in frame.columns]]
    rows.extend(frame.astype(object).where(pd.notna(frame), "").values.tolist())
    return "\n".join("| " + " | ".join(cell(value) for value in row) + " |" for row in rows)


def write_report(totals: pd.DataFrame, summary: pd.DataFrame, cache: pd.DataFrame, paths: list[Path], command: str) -> str:
    """Return the Markdown report text; despite the name, nothing is written to disk here."""
    selected = totals[totals["readonly_observe"] == "yes"].head(12)
    least_bad = totals.sort_values(
        ["full_total_return", "3y_total_return", "1y_total_return", "6m_total_return", "3m_total_return"],
        ascending=[False, False, False, False, False],
    ).head(12)
    recent = totals.sort_values(["recent_trigger_score", "3y_calmar", "1y_calmar"], ascending=[False, False, False]).head(12)
    cols = [
        "family",
        "bar",
        "name",
        "full_total_return",
        "full_max_drawdown",
        "full_trades",
        "3y_total_return",
        "1y_total_return",
        "6m_total_return",
        "3m_total_return",
        "30d_total_return",
        "14d_total_return",
        "3m_trades",
        "30d_trades",
        "14d_trades",
        "readonly_observe",
    ]
    observation = (
        "No read-only observation candidates passed the rule."
        if not len(selected)
        else "At least one read-only observation candidate passed the rule."
    )
    disaster = (
        "Long-term status: still disastrous. The full, 3y, and 1y windows have zero positive candidates under the original high-frequency short/bidir candidate set."
        if all(int(summary.loc[summary["horizon"] == label, "positive_candidates"].iloc[0]) == 0 for label in ("full", "3y", "1y"))
        else "Long-term status: not uniformly disastrous; at least one of full/3y/1y has a positive candidate."
    )
    return "\n".join(
        [
            "# ETH high-frequency short/bidirectional revalidation",
            "",
            f"Run command: `{command}`",
            "Scope: offline only; local refreshed 3m/5m/15m OKX ETH candle cache; no live executor, deployment, private API, or order path touched.",
            f"Cost model: taker fee `{TAKER_FEE}` each side on `{LEVERAGE:g}x` notional; entries execute on next open.",
            "",
            "Output files:",
            *[f"- `{path}`" for path in paths],
            "",
            "Cache used:",
            "",
            markdown_table(cache),
            "",
            "Windows compared: full, 3y, 1y, 6m, 3m, 30d, 14d.",
            "Read-only observation rule: 3m >= 12 trades, 30d >= 4 trades, 14d >= 1 trade, positive full/3y/1y return, 3y MDD <= 35%, 1y MDD <= 25%, and 1y profit factor >= 1.05.",
            "",
            f"Conclusion: {disaster} {observation}",
            "",
            "## Horizon Summary",
            "",
            markdown_table(summary),
            "",
            "## Read-only Observation Candidates",
            "",
            markdown_table(selected[cols]) if len(selected) else "No candidates passed the read-only observation rule.",
            "",
            "## Least-bad Long-term Rows",
            "",
            markdown_table(least_bad[cols]),
            "",
            "## Most Recently Active Rows",
            "",
            markdown_table(recent[cols]),
        ]
    ) + "\n"


def format_command(bars: list[str], output_dir: Path) -> str:
    """Return the command line recorded in the report for this run.

    ``--output-dir`` is appended only when it differs from ``OUT_DIR`` so the
    recorded command reproduces the run without cluttering the default case.
    """
    command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --bars {' '.join(bars)}"
    if Path(output_dir) != OUT_DIR:
        command += f" --output-dir {Path(output_dir).as_posix()}"
    return command


def main() -> int:
    """Run every candidate, write the CSV/Markdown outputs and print the horizon summary.

    Returns 0 on success. Exits with an error message when no candidate
    produced a usable equity curve.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--bars", nargs="+", default=["3m", "5m", "15m"])
    parser.add_argument("--output-dir", type=Path, default=OUT_DIR)
    args = parser.parse_args()

    rows: list[dict[str, object]] = []
    frames = {bar: load_frame(bar) for bar in args.bars}
    for candidate in build_candidates(args.bars):
        equity, trades = backtest(frames[candidate.bar], candidate)
        # A single equity point cannot yield a return or a drawdown.
        if len(equity) >= 2:
            rows.append(summarize(candidate, equity, trades))
    if not rows:
        # Sorting an empty frame by named columns would fail with an opaque KeyError.
        raise SystemExit("No candidate produced at least two equity points; check that the candle cache is long enough.")

    totals = pd.DataFrame(rows).sort_values(
        ["readonly_observe", "full_total_return", "3y_total_return", "1y_total_return", "recent_trigger_score"],
        ascending=[False, False, False, False, False],
    )
    summary = horizon_summary(totals)
    cache = load_cache_summary(args.bars)

    args.output_dir.mkdir(parents=True, exist_ok=True)
    totals_path = args.output_dir / "eth-highfreq-short-bidir-revalidation-candidates.csv"
    summary_path = args.output_dir / "eth-highfreq-short-bidir-revalidation-summary.csv"
    report_path = args.output_dir / "eth-highfreq-short-bidir-revalidation-report.md"
    paths = [totals_path, summary_path, report_path]
    totals.to_csv(totals_path, index=False)
    summary.to_csv(summary_path, index=False)
    command = format_command(args.bars, args.output_dir)
    report_path.write_text(write_report(totals, summary, cache, paths, command), encoding="utf-8")
    print(summary.to_string(index=False))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())