search_short_bias_intraday.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. from __future__ import annotations
  2. from dataclasses import dataclass
  3. from math import sqrt
  4. from pathlib import Path
  5. import pandas as pd
  6. SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP")
  7. BARS = ("3m", "5m", "15m")
  8. DATA_DIR = Path("data/okx-candles")
  9. OUT_DIR = Path("reports/short-bias")
  10. INITIAL_EQUITY = 10_000.0
  11. LEVERAGE = 3.0
  12. TAKER_FEE = 0.0004
  13. MIN_TRADES = 30
  14. BACKTEST_MONTHS = 36
  15. @dataclass(frozen=True)
  16. class Candidate:
  17. family: str
  18. name: str
  19. warmup: int
  20. params: dict[str, float | int]
  21. @dataclass(frozen=True)
  22. class BacktestResult:
  23. equity: pd.DataFrame
  24. trades: pd.DataFrame
  25. def load_frame(symbol: str, bar: str) -> pd.DataFrame:
  26. frame = pd.read_csv(DATA_DIR / symbol / f"{bar}.csv")
  27. frame["dt"] = pd.to_datetime(frame["ts"], unit="ms", utc=True)
  28. frame = frame.sort_values("ts").drop_duplicates("ts", keep="last").reset_index(drop=True)
  29. cutoff = frame["dt"].iloc[-1] - pd.DateOffset(months=BACKTEST_MONTHS)
  30. return frame[frame["dt"] >= cutoff].reset_index(drop=True)
  31. def rsi(series: pd.Series, length: int) -> pd.Series:
  32. delta = series.diff()
  33. gain = delta.clip(lower=0.0).rolling(length).mean()
  34. loss = (-delta.clip(upper=0.0)).rolling(length).mean()
  35. rs = gain / loss
  36. return 100.0 - (100.0 / (1.0 + rs))
  37. def build_candidates() -> list[Candidate]:
  38. candidates: list[Candidate] = []
  39. for trend in (80, 160):
  40. for entry in (90, 94):
  41. for exit_rsi in (35,):
  42. for max_hold in (12, 32):
  43. params = {"trend": trend, "entry": entry, "exit": exit_rsi, "max_hold": max_hold}
  44. candidates.append(Candidate("rsi2_short", f"rsi2-short-t{trend}-e{entry}-x{exit_rsi}-h{max_hold}", trend + 3, params))
  45. for lookback in (16, 32, 64):
  46. for stop, take in ((0.005, 0.010),):
  47. for max_hold in (12, 32):
  48. params = {"lookback": lookback, "stop": stop, "take": take, "max_hold": max_hold}
  49. candidates.append(Candidate("breakdown_short", f"breakdown-short-l{lookback}-sl{stop}-tp{take}-h{max_hold}", lookback + 1, params))
  50. for window in (48, 96):
  51. for entry_z in (1.8, 2.2):
  52. for exit_z in (0.0,):
  53. for max_hold in (12, 32):
  54. params = {"window": window, "entry_z": entry_z, "exit_z": exit_z, "max_hold": max_hold}
  55. candidates.append(Candidate("vwap_revert_short", f"vwap-short-w{window}-e{entry_z}-x{exit_z}-h{max_hold}", window * 2, params))
  56. for length in (40, 80):
  57. for std in (2.0,):
  58. for trend in (160,):
  59. for max_hold in (12, 32):
  60. params = {"length": length, "std": std, "trend": trend, "max_hold": max_hold}
  61. candidates.append(Candidate("bb_short", f"bb-short-l{length}-s{std}-t{trend}-h{max_hold}", max(length, trend) + 1, params))
  62. return candidates
  63. def exit_equity(start_equity: float, entry: float, exit_price: float) -> tuple[float, float]:
  64. entry_notional = start_equity * LEVERAGE
  65. exit_notional = entry_notional * (exit_price / entry)
  66. pnl = start_equity * LEVERAGE * ((entry - exit_price) / entry)
  67. fees = TAKER_FEE * (entry_notional + exit_notional)
  68. return start_equity + pnl - fees, pnl - fees
  69. def mark_equity(start_equity: float, entry: float, mark: float) -> float:
  70. entry_fee = TAKER_FEE * start_equity * LEVERAGE
  71. pnl = start_equity * LEVERAGE * ((entry - mark) / entry)
  72. return start_equity + pnl - entry_fee
  73. def signal_columns(frame: pd.DataFrame, candidate: Candidate) -> tuple[pd.Series, pd.Series, pd.Series]:
  74. close = frame["close"]
  75. false = pd.Series(False, index=frame.index)
  76. if candidate.family == "rsi2_short":
  77. trend = close.rolling(int(candidate.params["trend"])).mean()
  78. value = rsi(close, 2)
  79. entry = (close < trend) & (value >= float(candidate.params["entry"]))
  80. exit_signal = value <= float(candidate.params["exit"])
  81. return entry, exit_signal, false
  82. if candidate.family == "breakdown_short":
  83. lookback = int(candidate.params["lookback"])
  84. low = frame["low"].shift(1).rolling(lookback).min()
  85. entry = close < low
  86. return entry, false, false
  87. if candidate.family == "vwap_revert_short":
  88. window = int(candidate.params["window"])
  89. volume = frame["volume"]
  90. vwap = (close * volume).rolling(window).sum() / volume.rolling(window).sum()
  91. deviation = (close - vwap) / vwap
  92. zscore = deviation / deviation.rolling(window).std(ddof=0)
  93. entry = zscore >= float(candidate.params["entry_z"])
  94. exit_signal = zscore <= float(candidate.params["exit_z"])
  95. return entry, exit_signal, false
  96. length = int(candidate.params["length"])
  97. middle = close.rolling(length).mean()
  98. upper = middle + float(candidate.params["std"]) * close.rolling(length).std(ddof=0)
  99. trend = close.rolling(int(candidate.params["trend"])).mean()
  100. entry = (close > upper) & (close < trend)
  101. exit_signal = close <= middle
  102. return entry, exit_signal, false
  103. def backtest(frame: pd.DataFrame, candidate: Candidate) -> BacktestResult:
  104. entry_signal, exit_signal, _ = signal_columns(frame, candidate)
  105. ts_values = frame["ts"].to_numpy()
  106. dt_values = frame["dt"].to_numpy()
  107. open_values = frame["open"].to_numpy(dtype=float)
  108. high_values = frame["high"].to_numpy(dtype=float)
  109. low_values = frame["low"].to_numpy(dtype=float)
  110. close_values = frame["close"].to_numpy(dtype=float)
  111. entry_values = entry_signal.fillna(False).to_numpy(dtype=bool)
  112. exit_values = exit_signal.fillna(False).to_numpy(dtype=bool)
  113. equity = INITIAL_EQUITY
  114. peak = equity
  115. equity_rows: list[dict[str, object]] = []
  116. trade_rows: list[dict[str, object]] = []
  117. position: dict[str, float | int] | None = None
  118. pending_entry = False
  119. pending_exit = False
  120. for index in range(candidate.warmup, len(frame)):
  121. if pending_exit and position is not None:
  122. new_equity, pnl = exit_equity(equity, float(position["entry"]), float(open_values[index]))
  123. trade_rows.append(
  124. {
  125. "entry_time": dt_values[int(position["entry_index"])],
  126. "exit_time": dt_values[index],
  127. "side": "short",
  128. "entry": float(position["entry"]),
  129. "exit": float(open_values[index]),
  130. "pnl": pnl,
  131. "return": new_equity / equity - 1.0,
  132. "bars": index - int(position["entry_index"]),
  133. }
  134. )
  135. equity = new_equity
  136. position = None
  137. pending_exit = False
  138. if pending_entry and position is None and equity > 0.0:
  139. position = {"entry": float(open_values[index]), "entry_index": index}
  140. pending_entry = False
  141. current_equity = equity
  142. if position is not None:
  143. entry = float(position["entry"])
  144. stop = float(candidate.params.get("stop", 0.0))
  145. take = float(candidate.params.get("take", 0.0))
  146. exit_price: float | None = None
  147. if stop and float(high_values[index]) >= entry * (1.0 + stop):
  148. exit_price = entry * (1.0 + stop)
  149. elif take and float(low_values[index]) <= entry * (1.0 - take):
  150. exit_price = entry * (1.0 - take)
  151. if exit_price is not None:
  152. new_equity, pnl = exit_equity(equity, entry, exit_price)
  153. trade_rows.append(
  154. {
  155. "entry_time": dt_values[int(position["entry_index"])],
  156. "exit_time": dt_values[index],
  157. "side": "short",
  158. "entry": entry,
  159. "exit": exit_price,
  160. "pnl": pnl,
  161. "return": new_equity / equity - 1.0,
  162. "bars": index - int(position["entry_index"]),
  163. }
  164. )
  165. equity = new_equity
  166. current_equity = equity
  167. position = None
  168. else:
  169. current_equity = mark_equity(equity, entry, float(close_values[index]))
  170. peak = max(peak, current_equity)
  171. equity_rows.append({"ts": ts_values[index], "dt": dt_values[index], "equity": current_equity, "drawdown": (peak - current_equity) / peak})
  172. if index == len(frame) - 1 or equity <= 0.0:
  173. continue
  174. if position is not None:
  175. held = index - int(position["entry_index"])
  176. if bool(exit_values[index]) or held >= int(candidate.params["max_hold"]):
  177. pending_exit = True
  178. continue
  179. if bool(entry_values[index]):
  180. pending_entry = True
  181. return BacktestResult(pd.DataFrame(equity_rows), pd.DataFrame(trade_rows))
  182. def annualized(total_return: float, start: pd.Timestamp, end: pd.Timestamp) -> float:
  183. years = (end - start).total_seconds() / 31_536_000
  184. if years <= 0.0:
  185. return 0.0
  186. if total_return <= -1.0:
  187. return -1.0
  188. return (1.0 + total_return) ** (1.0 / years) - 1.0
  189. def horizon_return(equity: pd.DataFrame, months: int) -> float:
  190. end = equity["dt"].iloc[-1]
  191. cutoff = end - pd.DateOffset(months=months)
  192. before = equity[equity["dt"] <= cutoff]
  193. start_equity = float(before["equity"].iloc[-1]) if len(before) else float(equity["equity"].iloc[0])
  194. return float(equity["equity"].iloc[-1]) / start_equity - 1.0
  195. def monthly_returns(equity: pd.DataFrame, key: dict[str, object]) -> pd.DataFrame:
  196. monthly = equity.set_index("dt")["equity"].resample("ME").last().ffill().pct_change().dropna()
  197. rows = [{**key, "month": index.strftime("%Y-%m"), "return": value} for index, value in monthly.items()]
  198. return pd.DataFrame(rows)
  199. def summarize(symbol: str, bar: str, candidate: Candidate, result: BacktestResult) -> dict[str, object]:
  200. equity = result.equity
  201. trades = result.trades
  202. total_return = float(equity["equity"].iloc[-1] / equity["equity"].iloc[0] - 1.0)
  203. ann = annualized(total_return, equity["dt"].iloc[0], equity["dt"].iloc[-1])
  204. max_dd = float(equity["drawdown"].max())
  205. wins = trades[trades["return"] > 0.0] if len(trades) else trades
  206. losses = trades[trades["return"] < 0.0] if len(trades) else trades
  207. gross_profit = float(wins["return"].sum()) if len(wins) else 0.0
  208. gross_loss = abs(float(losses["return"].sum())) if len(losses) else 0.0
  209. avg_win = float(wins["return"].mean()) if len(wins) else 0.0
  210. avg_loss = abs(float(losses["return"].mean())) if len(losses) else 0.0
  211. return {
  212. "symbol": symbol,
  213. "bar": bar,
  214. "family": candidate.family,
  215. "candidate": candidate.name,
  216. "total_return": total_return,
  217. "annualized_return": ann,
  218. "max_drawdown": max_dd,
  219. "calmar": ann / max_dd if max_dd else 0.0,
  220. "trades": int(len(trades)),
  221. "win_rate": float(len(wins) / len(trades)) if len(trades) else 0.0,
  222. "payoff_ratio": avg_win / avg_loss if avg_loss else 0.0,
  223. "profit_factor": gross_profit / gross_loss if gross_loss else 0.0,
  224. "short_trade_ratio": 1.0 if len(trades) else 0.0,
  225. "return_3y": horizon_return(equity, 36),
  226. "return_1y": horizon_return(equity, 12),
  227. "return_6m": horizon_return(equity, 6),
  228. "return_3m": horizon_return(equity, 3),
  229. "first_time": equity["dt"].iloc[0].strftime("%Y-%m-%d %H:%M"),
  230. "last_time": equity["dt"].iloc[-1].strftime("%Y-%m-%d %H:%M"),
  231. }
  232. def worth_label(row: pd.Series) -> str:
  233. if row["trades"] >= MIN_TRADES and row["total_return"] > 0.0 and row["calmar"] > 0.25 and row["return_1y"] > 0.0:
  234. return "continue"
  235. return "stop"
  236. def main() -> int:
  237. OUT_DIR.mkdir(parents=True, exist_ok=True)
  238. candidates = build_candidates()
  239. total_rows: list[dict[str, object]] = []
  240. monthly_frames: list[pd.DataFrame] = []
  241. for symbol in SYMBOLS:
  242. for bar in BARS:
  243. frame = load_frame(symbol, bar)
  244. for candidate in candidates:
  245. result = backtest(frame, candidate)
  246. if len(result.equity) == 0:
  247. continue
  248. row = summarize(symbol, bar, candidate, result)
  249. if row["trades"] < MIN_TRADES:
  250. continue
  251. total_rows.append(row)
  252. monthly_frames.append(monthly_returns(result.equity, {key: row[key] for key in ("symbol", "bar", "family", "candidate")}))
  253. totals = pd.DataFrame(total_rows)
  254. totals = totals.sort_values(["calmar", "annualized_return", "profit_factor"], ascending=False).reset_index(drop=True)
  255. totals["worth_continuing"] = totals.apply(worth_label, axis=1)
  256. top3 = totals.head(3).copy()
  257. monthly = pd.concat(monthly_frames, ignore_index=True) if monthly_frames else pd.DataFrame()
  258. top_monthly = monthly.merge(top3[["symbol", "bar", "family", "candidate"]], on=["symbol", "bar", "family", "candidate"], how="inner")
  259. totals.to_csv(OUT_DIR / "intraday-totals.csv", index=False)
  260. top3.to_csv(OUT_DIR / "intraday-top3.csv", index=False)
  261. monthly.to_csv(OUT_DIR / "intraday-monthly.csv", index=False)
  262. top_monthly.to_csv(OUT_DIR / "intraday-top3-monthly.csv", index=False)
  263. lines = [
  264. "# Short-Bias Intraday Search",
  265. "",
  266. f"Universe: {', '.join(SYMBOLS)} on {', '.join(BARS)}. All candidates are short-only; short trade ratio is therefore 1.0 for traded rows.",
  267. f"Cost model: {TAKER_FEE:.2%} single-side taker fee on leveraged notional, charged on entry and exit; leverage {LEVERAGE:g}x.",
  268. "",
  269. "## Top 3",
  270. "",
  271. ]
  272. for index, row in top3.iterrows():
  273. lines.append(
  274. f"{index + 1}. {row.symbol} {row.bar} {row.candidate}: total {row.total_return:.2%}, annualized {row.annualized_return:.2%}, "
  275. f"max DD {row.max_drawdown:.2%}, Calmar {row.calmar:.2f}, trades {int(row.trades)}, win {row.win_rate:.2%}, "
  276. f"PF {row.profit_factor:.2f}, 3y {row.return_3y:.2%}, 1y {row.return_1y:.2%}, 6m {row.return_6m:.2%}, 3m {row.return_3m:.2%}, "
  277. f"decision {row.worth_continuing}."
  278. )
  279. lines.extend(
  280. [
  281. "",
  282. "## Continue",
  283. "",
  284. "Continue only candidates marked `continue`; the rest fail the direct requirement of positive full-period and recent 1y net return with enough trades.",
  285. ]
  286. )
  287. (OUT_DIR / "intraday-report.md").write_text("\n".join(lines) + "\n", encoding="utf-8")
  288. print(top3.to_string(index=False))
  289. return 0
  290. if __name__ == "__main__":
  291. raise SystemExit(main())