| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361 |
- from __future__ import annotations
- import argparse
- import json
- from dataclasses import dataclass
- from pathlib import Path
- import pandas as pd
# --- Filesystem layout ------------------------------------------------------
# Candle CSVs are read from DATA_DIR / SYMBOL / "<bar>.csv"; reports are
# written below OUT_DIR unless --output-dir overrides it.
DATA_DIR = Path("data") / "okx-candles"
OUT_DIR = Path("reports") / "ultrashort"

# --- Simulation settings ----------------------------------------------------
SYMBOL = "ETH-USDT-SWAP"
INITIAL_EQUITY = 10_000.0  # starting account value for every backtest
LEVERAGE = 3.0  # multiplier applied to every trade's price return
TAKER_FEE = 0.0004  # fee rate charged per side on the leveraged notional

# --- Reporting horizons -----------------------------------------------------
# (label, look-back offset) pairs; the labels become column prefixes in the
# summary rows.  An offset of None means "use the whole equity curve".
HORIZONS = (
    ("full", None),
    ("3y", pd.DateOffset(years=3)),
    ("1y", pd.DateOffset(years=1)),
    ("3m", pd.DateOffset(months=3)),
    ("1m", pd.DateOffset(months=1)),
    ("2w", pd.DateOffset(weeks=2)),
)
@dataclass(frozen=True)
class Candidate:
    """One strategy configuration to backtest.

    The instance is frozen, but ``params`` is a plain dict and is therefore
    still mutable (and makes the instance unhashable).
    """

    # Strategy family identifier, e.g. "vwap_short" or "rsi_bidir".
    family: str
    # Candle interval label; also the CSV file stem used when loading data.
    bar: str
    # Tunable parameters of the family, in the order they should be displayed.
    params: dict[str, float | int | str]

    @property
    def name(self) -> str:
        """Return ``family-bar-<key><value>-...`` built from ``params``.

        Float values are rendered with the ``g`` format (``0.20`` -> ``0.2``);
        every other value uses its default string form.
        """
        pieces = []
        for key, value in self.params.items():
            if isinstance(value, float):
                pieces.append(f"{key}{value:g}")
            else:
                pieces.append(f"{key}{value}")
        body = "-".join(pieces)
        return f"{self.family}-{self.bar}-{body}"
def load_frame(bar: str, years: float) -> pd.DataFrame:
    """Load the candle CSV for ``bar`` and keep only the trailing ``years``.

    Reads ``DATA_DIR / SYMBOL / "<bar>.csv"``, adds a UTC ``dt`` column parsed
    from the millisecond-epoch ``ts`` column, orders rows by ``ts`` and drops
    repeated timestamps.

    Args:
        bar: Candle interval label; used as the CSV file stem.
        years: Length of the trailing window, converted to ``int(years * 365)``
            days counted back from the newest candle.

    Returns:
        The filtered candles with a fresh 0..n-1 index.
    """
    frame = pd.read_csv(DATA_DIR / SYMBOL / f"{bar}.csv")
    frame["dt"] = pd.to_datetime(frame["ts"], unit="ms", utc=True)
    # A stable sort keeps rows with the same timestamp in file order, so
    # keep="last" deterministically retains the row that appears last in the
    # CSV.  The default quicksort gives no such guarantee.
    frame = frame.sort_values("ts", kind="stable").drop_duplicates("ts", keep="last")
    cutoff = frame["dt"].iloc[-1] - pd.DateOffset(days=int(years * 365))
    return frame[frame["dt"] >= cutoff].reset_index(drop=True)
def rsi(close: pd.Series, length: int) -> pd.Series:
    """Return a simple-moving-average RSI of ``close`` on a 0-100 scale.

    Gains and losses are averaged with a plain rolling mean over ``length``
    bar-to-bar changes.  Windows without any loss evaluate to 100; windows
    with neither gains nor losses evaluate to NaN (0/0).
    """
    change = close.diff()
    up_moves = change.clip(lower=0.0)
    down_moves = -change.clip(upper=0.0)
    avg_gain = up_moves.rolling(length).mean()
    avg_loss = down_moves.rolling(length).mean()
    relative_strength = avg_gain / avg_loss
    return 100.0 - 100.0 / (1.0 + relative_strength)
def build_candidates(bars: list[str]) -> list[Candidate]:
    """Expand the fixed parameter grid into candidates for every bar.

    For each bar the grid yields, in this order: the VWAP candidates (a
    bidirectional and a short-only one per window), the RSI candidates (a
    short-only and a bidirectional one per trend length) and the breakdown
    candidates (one per lookback).

    Args:
        bars: Candle interval labels to generate candidates for.

    Returns:
        The candidates in generation order.
    """
    vwap_windows = (48, 96)
    vwap_entry_levels = (1.5,)
    rsi_trends = (96, 192)
    breakdown_lookbacks = (48, 96)
    hold_limits = (12,)

    grid: list[Candidate] = []
    for bar in bars:
        # VWAP z-score families: both variants receive the same params dict.
        for window in vwap_windows:
            for entry_z in vwap_entry_levels:
                for hold in hold_limits:
                    shared = {
                        "window": window,
                        "entry_z": entry_z,
                        "exit_z": 0.20,
                        "stop": 0.006,
                        "take": 0.009,
                        "hold": hold,
                    }
                    grid.extend(Candidate(family, bar, shared) for family in ("vwap_bidir", "vwap_short"))

        # RSI families: the short-only leg is emitted before the bidirectional one.
        for trend in rsi_trends:
            for entry in (90,):
                for hold in hold_limits:
                    short_params = {"trend": trend, "entry": entry, "exit": 45, "stop": 0.0075, "take": 0.010, "hold": hold}
                    grid.append(Candidate("rsi_short", bar, short_params))
            for entry in (10,):
                for hold in hold_limits:
                    bidir_params = {"trend": trend, "entry": entry, "exit": 55, "stop": 0.0075, "take": 0.010, "hold": hold}
                    grid.append(Candidate("rsi_bidir", bar, bidir_params))

        # Breakdown family.
        for lookback in breakdown_lookbacks:
            for hold in hold_limits:
                breakdown_params = {"lookback": lookback, "stop": 0.006, "take": 0.012, "hold": hold}
                grid.append(Candidate("breakdown_short", bar, breakdown_params))
    return grid
def signal_columns(frame: pd.DataFrame, candidate: Candidate) -> tuple[pd.Series, pd.Series]:
    """Compute per-bar entry labels and exit flags for ``candidate``.

    Args:
        frame: Candles; ``close`` is always read, ``volume`` for the VWAP
            families and ``low`` for the breakdown family.
        candidate: Supplies ``family`` and ``params``.

    Returns:
        ``(entry, exit)``, both indexed like ``frame``.  ``entry`` is an
        object Series holding ``""``, ``"short"`` or ``"long"``.  ``exit`` is
        a boolean Series and does not depend on the side of an open position.
        Any family other than the VWAP/RSI ones is handled by the breakdown
        rule, whose exit flags are all False.
    """
    family = candidate.family
    settings = candidate.params
    prices = frame["close"]

    def labelled(short_mask: pd.Series, long_mask: pd.Series | None = None) -> pd.Series:
        # Blank everywhere, then stamp the requested sides (long is written last).
        labels = pd.Series("", index=frame.index, dtype=object)
        labels.loc[short_mask] = "short"
        if long_mask is not None:
            labels.loc[long_mask] = "long"
        return labels

    if family in ("vwap_bidir", "vwap_short"):
        span = int(settings["window"])
        volume = frame["volume"]
        rolling_vwap = (prices * volume).rolling(span).sum() / volume.rolling(span).sum()
        dispersion = prices.rolling(span).std(ddof=0)
        zscore = (prices - rolling_vwap) / dispersion
        stretched_up = zscore >= float(settings["entry_z"])
        stretched_down = zscore <= -float(settings["entry_z"])
        entry = labelled(stretched_up, stretched_down if family == "vwap_bidir" else None)
        reverted = zscore.abs() <= float(settings["exit_z"])
        return entry, reverted

    if family in ("rsi_short", "rsi_bidir"):
        trend_line = prices.rolling(int(settings["trend"])).mean()
        fast_rsi = rsi(prices, 2)
        short_mask = (prices < trend_line) & (fast_rsi >= float(settings["entry"]))
        long_mask = None
        if family == "rsi_bidir":
            # NOTE(review): both legs compare against the same "entry" level,
            # so with the bidirectional grid value (10) the short leg fires
            # whenever RSI >= 10 below the trend line.  Possibly meant to be
            # mirrored (100 - entry); confirm intent before changing.
            long_mask = (prices > trend_line) & (fast_rsi <= float(settings["entry"]))
        entry = labelled(short_mask, long_mask)
        exit_level = float(settings["exit"])
        # NOTE(review): for any exit level <= 50 (the rsi_short grid uses 45)
        # the two halves overlap and this mask is True for every non-NaN RSI,
        # i.e. an exit is requested on every bar.  Confirm intended thresholds.
        leave = (fast_rsi <= 100.0 - exit_level) | (fast_rsi >= exit_level)
        return entry, leave

    # Breakdown: close below the lowest low of the preceding `lookback` bars.
    depth = int(settings["lookback"])
    prior_floor = frame["low"].shift(1).rolling(depth).min()
    never = pd.Series(False, index=frame.index)
    return labelled(prices < prior_floor), never
def _price_return(side: str, entry: float, price: float) -> float:
    """Return the unlevered P&L at ``price`` as a fraction of entry notional.

    SYMBOL is a USDT-margined (linear) swap, so P&L is proportional to the
    price difference: ``price / entry - 1`` for a long and ``1 - price / entry``
    for a short.  The former short formula ``entry / price - 1`` is the
    inverse-contract payoff; it overstated short gains and understated short
    losses, and was inconsistent with the linear exit-fee term below.
    """
    move = price / entry - 1.0
    return move if side == "long" else -move


def close_return(side: str, entry: float, exit_price: float) -> float:
    """Return the net equity return of a closed trade.

    Args:
        side: ``"long"``; any other value is treated as a short.
        entry: Entry fill price.
        exit_price: Exit fill price.

    Returns:
        Leveraged price return minus taker fees on both fills.  The exit fee
        is scaled by ``exit_price / entry`` because the exit notional has
        moved with price.
    """
    fees = LEVERAGE * TAKER_FEE * (1.0 + exit_price / entry)
    return LEVERAGE * _price_return(side, entry, exit_price) - fees


def mark_return(side: str, entry: float, close: float) -> float:
    """Return the mark-to-market equity return of an open trade.

    Same P&L convention as ``close_return``, but only the entry-side fee is
    deducted because the position has not been closed yet.
    """
    return LEVERAGE * _price_return(side, entry, close) - LEVERAGE * TAKER_FEE
def backtest(frame: pd.DataFrame, candidate: Candidate) -> tuple[pd.Series, pd.DataFrame]:
    """Simulate ``candidate`` bar by bar on ``frame``.

    Processing order within each bar:

    1. Orders queued on the previous bar fill at this bar's open (exit first,
       then entry, so a reversal closes and re-opens at the same price).
    2. An open position is checked against its stop and take levels using the
       bar's high/low.  When both are touched the stop is assumed to fill.
    3. The mark-to-market equity is recorded.
    4. Signals of this bar queue orders for the next open: exit flag,
       opposite-side entry (reversal) or reaching the ``hold`` bar limit close
       a position; an entry label opens one when flat.

    A position still open after the last bar is closed at the final close,
    and that value replaces the last mark in the curve.

    Args:
        frame: Candles with ``dt``, ``open``, ``high``, ``low``, ``close`` and
            whatever columns ``signal_columns`` needs for the family.
        candidate: Strategy configuration; ``stop``, ``take`` and ``hold`` are
            read from its params.

    Returns:
        ``(equity, trades)``: the equity curve indexed by bar time, and one
        row per closed trade with ``entry_time``, ``exit_time``, ``side`` and
        ``return``.
    """
    entry_signal, exit_signal = signal_columns(frame, candidate)
    params = candidate.params
    lookbacks = [int(params[key]) for key in params if key in {"window", "trend", "lookback"}]
    warmup = max(lookbacks) + 2  # skip bars whose indicators are still warming up

    rows = list(frame.itertuples(index=False))
    last_index = len(rows) - 1
    entry_labels = entry_signal.tolist()
    exit_flags = exit_signal.tolist()

    balance = INITIAL_EQUITY
    curve: list[tuple[pd.Timestamp, float]] = []
    trades: list[dict[str, object]] = []

    # State of the open position; `side is None` means flat.
    side: str | None = None
    fill = 0.0
    opened_index = -1
    opened_time: object = None

    # Orders queued for the next bar's open.
    queued_side = ""
    exit_queued = False

    for index, candle in enumerate(rows[warmup:], start=warmup):
        # 1) Fill queued orders at the open.
        if exit_queued and side is not None:
            net = close_return(side, fill, float(candle.open))
            balance *= 1.0 + net
            trades.append({"entry_time": opened_time, "exit_time": candle.dt, "side": side, "return": net})
            side = None
            exit_queued = False
        if queued_side and side is None and balance > 0.0:
            side = queued_side
            fill = float(candle.open)
            opened_index = index
            opened_time = candle.dt
            queued_side = ""

        # 2) Intrabar stop / take, 3) mark to market.
        marked = balance
        if side is not None:
            stop = float(params["stop"])
            take = float(params["take"])
            if side == "long":
                stop_price = fill * (1.0 - stop)
                take_price = fill * (1.0 + take)
                stop_hit = candle.low <= stop_price
                take_hit = candle.high >= take_price
            else:
                stop_price = fill * (1.0 + stop)
                take_price = fill * (1.0 - take)
                stop_hit = candle.high >= stop_price
                take_hit = candle.low <= take_price
            if stop_hit or take_hit:
                # The stop wins when both levels were touched in the same bar.
                net = close_return(side, fill, stop_price if stop_hit else take_price)
                balance *= 1.0 + net
                trades.append({"entry_time": opened_time, "exit_time": candle.dt, "side": side, "return": net})
                side = None
                marked = balance
            else:
                marked = balance * (1.0 + mark_return(side, fill, float(candle.close)))
        curve.append((candle.dt, marked))

        # 4) Queue orders for the next open (nothing to queue on the last bar
        #    or once the account is wiped out).
        if index == last_index or balance <= 0.0:
            continue
        wanted = str(entry_labels[index])
        if side is None:
            if wanted:
                queued_side = wanted
            continue
        reverse = bool(wanted) and wanted != side
        stale = index - opened_index >= int(params["hold"])
        if bool(exit_flags[index]) or reverse or stale:
            exit_queued = True
            queued_side = wanted if reverse else ""

    if side is not None:
        final = rows[-1]
        net = close_return(side, fill, float(final.close))
        balance *= 1.0 + net
        trades.append({"entry_time": opened_time, "exit_time": final.dt, "side": side, "return": net})
        # Same timestamp as the last mark: dict() below keeps this later value.
        curve.append((final.dt, balance))
    return pd.Series(dict(curve)).sort_index(), pd.DataFrame(trades)
def scoped(equity: pd.Series, trades: pd.DataFrame, offset: pd.DateOffset | None) -> tuple[pd.Series, pd.DataFrame]:
    """Restrict an equity curve and its trades to a trailing window.

    Args:
        equity: Equity curve indexed by time.
        trades: Trade log with an ``entry_time`` column (may be empty).
        offset: Window length counted back from the last equity point, or
            None to return both inputs unchanged.

    Returns:
        The windowed curve and the trades entered at or after its first
        point.  If the window would hold fewer than two equity points the
        full curve is used instead.
    """
    if offset is None:
        return equity, trades

    window_start = equity.index[-1] - offset
    recent = equity[equity.index >= window_start]
    if len(recent) < 2:
        recent = equity

    if not len(trades):
        return recent, trades
    return recent, trades[trades["entry_time"] >= recent.index[0]]
def metrics(equity: pd.Series, trades: pd.DataFrame) -> dict[str, float | int]:
    """Compute performance statistics for an equity curve and its trades.

    Args:
        equity: Equity curve with a datetime index and at least one point.
        trades: Trade log with ``return`` and ``side`` columns (may be empty).

    Returns:
        A dict with total and annualized return (365-day years), maximum
        drawdown, Calmar ratio, trade counts by side, profit factor and win
        rate.  Ratios fall back to 0.0 when their denominator is missing; the
        profit factor is 999.0 when there are wins but no losses.
    """
    total = float(equity.iloc[-1] / equity.iloc[0] - 1.0)
    span_years = (equity.index[-1] - equity.index[0]).total_seconds() / 31_536_000
    if total > -1.0 and span_years > 0:
        annual = (1.0 + total) ** (1.0 / span_years) - 1.0
    else:
        annual = 0.0

    running_peak = equity.cummax()
    drawdown = float(((running_peak - equity) / running_peak).max())

    has_trades = bool(len(trades))
    returns = trades["return"] if has_trades else pd.Series(dtype=float)
    wins = returns[returns > 0.0]
    losses = returns[returns < 0.0]
    if len(losses):
        profit_factor = float(wins.sum() / abs(losses.sum()))
    elif len(wins):
        profit_factor = 999.0
    else:
        profit_factor = 0.0

    return {
        "total_return": total,
        "annualized_return": annual,
        "max_drawdown": drawdown,
        "calmar": annual / drawdown if drawdown else 0.0,
        "trades": int(len(trades)),
        "short_trades": int((trades["side"] == "short").sum()) if has_trades else 0,
        "long_trades": int((trades["side"] == "long").sum()) if has_trades else 0,
        "profit_factor": profit_factor,
        "win_rate": float(len(wins) / len(returns)) if len(returns) else 0.0,
    }
def summarize(candidate: Candidate, equity: pd.Series, trades: pd.DataFrame) -> dict[str, object]:
    """Build one summary row for ``candidate``.

    The row holds identifying fields, then the ``metrics`` of every horizon in
    ``HORIZONS`` under ``<label>_<metric>`` keys, then two derived fields:

    * ``recent_trigger_score``: 3m trades + 2 x 1m trades + 4 x 2w trades.
    * ``readonly_observe``: ``"yes"`` when the candidate traded at least
      12 / 4 / 1 times over 3m / 1m / 2w, has a positive 3y and 1y return,
      and its 3y / 1y maximum drawdown is at most 35% / 25%; else ``"no"``.
    """
    stamp = "%Y-%m-%d %H:%M"
    row: dict[str, object] = {
        "symbol": SYMBOL,
        "bar": candidate.bar,
        "family": candidate.family,
        "name": candidate.name,
        "params_json": json.dumps(candidate.params, separators=(",", ":")),
        "first_time": equity.index[0].strftime(stamp),
        "last_time": equity.index[-1].strftime(stamp),
    }

    for label, offset in HORIZONS:
        window_equity, window_trades = scoped(equity, trades, offset)
        window_metrics = metrics(window_equity, window_trades)
        row.update({f"{label}_{key}": value for key, value in window_metrics.items()})

    trades_3m, trades_1m, trades_2w = (int(row[f"{label}_trades"]) for label in ("3m", "1m", "2w"))
    row["recent_trigger_score"] = trades_3m + trades_1m * 2 + trades_2w * 4

    active_enough = trades_3m >= 12 and trades_1m >= 4 and trades_2w >= 1
    profitable = float(row["3y_total_return"]) > 0.0 and float(row["1y_total_return"]) > 0.0
    contained = float(row["3y_max_drawdown"]) <= 0.35 and float(row["1y_max_drawdown"]) <= 0.25
    if active_enough and profitable and contained:
        row["readonly_observe"] = "yes"
    else:
        row["readonly_observe"] = "no"
    return row
def markdown_table(frame: pd.DataFrame) -> str:
    """Render ``frame`` as a pipe-delimited Markdown table.

    The header row comes from the column labels.  Floats are printed with
    four decimals, missing values as empty cells, and ``|`` inside any other
    value is backslash-escaped.  No trailing newline is added.
    """

    def cell(value: object) -> str:
        if not isinstance(value, float):
            return str(value).replace("|", "\\|")
        return f"{value:.4f}"

    def line(values: list[object]) -> str:
        return "| " + " | ".join(cell(value) for value in values) + " |"

    header = list(frame.columns)
    divider = ["---" for _ in frame.columns]
    # Cast to object first so blank strings can replace NaN in numeric columns.
    body = frame.astype(object).where(pd.notna(frame), "").values.tolist()
    return "\n".join(line(values) for values in [header, divider, *body])
def write_report(totals: pd.DataFrame, paths: list[Path], command: str) -> str:
    """Compose the Markdown report text (nothing is written to disk here).

    Args:
        totals: One summary row per candidate, in display order.
        paths: Output files to list in the report.
        command: Command line shown as the way the run was produced.

    Returns:
        The report, ending with a newline.  It has three tables of up to
        twelve rows each: candidates flagged ``readonly_observe``, rows ranked
        by 3y/1y/3m return and 3m trade count, and rows ranked by recent
        trigger score.
    """
    columns = [
        "family",
        "bar",
        "name",
        "full_total_return",
        "full_max_drawdown",
        "full_trades",
        "3y_total_return",
        "3y_max_drawdown",
        "3y_trades",
        "1y_total_return",
        "1y_max_drawdown",
        "1y_trades",
        "3m_trades",
        "1m_trades",
        "2w_trades",
        "readonly_observe",
    ]
    limit = 12

    observed = totals[totals["readonly_observe"] == "yes"].head(limit)
    most_active = totals.sort_values(
        ["recent_trigger_score", "3y_calmar", "1y_calmar"],
        ascending=False,
    ).head(limit)
    best_returns = totals.sort_values(
        ["3y_total_return", "1y_total_return", "3m_total_return", "3m_trades"],
        ascending=False,
    ).head(limit)

    if len(observed):
        observed_section = markdown_table(observed[columns])
    else:
        observed_section = "No candidates passed the read-only observation rule."

    lines = [
        "# ETH high-frequency short/bidirectional candidate search",
        "",
        f"Run command: `{command}`",
        "Scope: local OKX ETH candle CSV only; no live executor, deployment, private API, or order path touched.",
        f"Cost model: taker fee `{TAKER_FEE}` each side on `{LEVERAGE:g}x` notional; entries execute on next open.",
        "",
        "Output files:",
    ]
    lines.extend(f"- `{path}`" for path in paths)
    lines += [
        "",
        "Selection rule for `readonly_observe`: 3m >= 12 trades, 1m >= 4 trades, 2w >= 1 trade, positive 3y/1y return, 3y MDD <= 35%, 1y MDD <= 25%.",
        "",
        "## Read-only observation candidates",
        "",
        observed_section,
        "",
        "## Least-bad risk rows",
        "",
        markdown_table(best_returns[columns]),
        "",
        "## Most recently active candidates",
        "",
        markdown_table(most_active[columns]),
    ]
    return "\n".join(lines) + "\n"
def main() -> int:
    """Run the candidate search and write the CSV and Markdown outputs.

    Command-line options:
        --bars: Candle intervals to test (default ``3m 5m 15m``).
        --output-dir: Directory for the generated files (default ``OUT_DIR``).
        --years: Trailing history to load per bar, in years (default 3).

    Returns:
        0 after the outputs were written; 1 when no candidate produced an
        equity curve with at least two points, in which case nothing is
        written.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--bars", nargs="+", default=["3m", "5m", "15m"])
    parser.add_argument("--output-dir", type=Path, default=OUT_DIR)
    parser.add_argument("--years", type=float, default=3.0)
    args = parser.parse_args()

    frames = {bar: load_frame(bar, args.years) for bar in args.bars}
    rows: list[dict[str, object]] = []
    # Iterate the dict keys rather than args.bars so a bar passed twice on the
    # command line does not yield duplicate candidate rows (order is kept).
    for candidate in build_candidates(list(frames)):
        equity, trades = backtest(frames[candidate.bar], candidate)
        if len(equity) < 2:
            continue
        rows.append(summarize(candidate, equity, trades))

    if not rows:
        # A column-less DataFrame would make the sort below fail with KeyError.
        print("No candidate produced at least two equity points; nothing to report.")
        return 1

    totals = pd.DataFrame(rows).sort_values(
        ["readonly_observe", "3m_trades", "1m_trades", "2w_trades", "3y_calmar", "1y_calmar"],
        ascending=[False, False, False, False, False, False],
    )
    args.output_dir.mkdir(parents=True, exist_ok=True)
    totals_path = args.output_dir / "eth-highfreq-short-bidir-candidates.csv"
    top_path = args.output_dir / "eth-highfreq-short-bidir-top.csv"
    report_path = args.output_dir / "eth-highfreq-short-bidir-report.md"
    paths = [totals_path, top_path, report_path]
    totals.to_csv(totals_path, index=False)
    totals.head(50).to_csv(top_path, index=False)
    command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --bars {' '.join(args.bars)} --years {args.years:g}"
    report_path.write_text(write_report(totals, paths, command), encoding="utf-8")
    print(totals.head(20).to_string(index=False))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
|