bbmr_report.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. from __future__ import annotations
  2. from dataclasses import dataclass
  3. from pathlib import Path
  4. from statistics import median
  5. import pandas as pd
  6. from okx_codex_trader.models import Candle
  7. from okx_codex_trader.sampled_report import (
  8. SegmentResult,
  9. generate_sampled_report,
  10. mark_to_market as _mark_to_market,
  11. trade_equity as _trade_equity,
  12. )
  13. BBMR_STRATEGY_DESCRIPTION = (
  14. "Bollinger Band mean reversion, bandwidth filter against previous 50 completed values, "
  15. "close-based return-to-middle exits at next open, intrabar 0.5% stop-loss."
  16. )
  17. @dataclass(frozen=True)
  18. class BBMRConfig:
  19. band_length: int = 20
  20. std_multiplier: float = 2.0
  21. bandwidth_lookback: int = 50
  22. stop_loss_pct: float = 0.005
  23. initial_equity: float = 10_000.0
  24. def _format_ts(ts: int) -> str:
  25. return pd.to_datetime(ts, unit="ms", utc=True).strftime("%Y-%m-%d %H:%M")
  26. def run_bbmr_segment(
  27. *,
  28. candles: list[Candle],
  29. leverage: int,
  30. warmup_bars: int,
  31. config: BBMRConfig = BBMRConfig(),
  32. ) -> SegmentResult:
  33. closes = pd.Series([candle.close for candle in candles], dtype=float)
  34. middle = closes.rolling(config.band_length).mean().tolist()
  35. stdev = closes.rolling(config.band_length).std(ddof=0).tolist()
  36. upper = [
  37. None if middle_value != middle_value or std_value != std_value else middle_value + config.std_multiplier * std_value
  38. for middle_value, std_value in zip(middle, stdev)
  39. ]
  40. lower = [
  41. None if middle_value != middle_value or std_value != std_value else middle_value - config.std_multiplier * std_value
  42. for middle_value, std_value in zip(middle, stdev)
  43. ]
  44. bandwidth = [
  45. None
  46. if upper_value is None or lower_value is None or middle_value in (None, 0)
  47. else (upper_value - lower_value) / middle_value
  48. for upper_value, lower_value, middle_value in zip(upper, lower, middle)
  49. ]
  50. equity = config.initial_equity
  51. ending_equity = equity
  52. peak_equity = equity
  53. max_drawdown = 0.0
  54. wins = 0
  55. trades: list[dict[str, object]] = []
  56. entries: list[dict[str, object]] = []
  57. exits: list[dict[str, object]] = []
  58. equity_curve: list[dict[str, float | int]] = []
  59. position: dict[str, object] | None = None
  60. pending_entry_side: str | None = None
  61. pending_exit = False
  62. for index in range(warmup_bars, len(candles)):
  63. candle = candles[index]
  64. if pending_exit and position is not None:
  65. exit_price = candle.open
  66. exit_equity = _trade_equity(
  67. side=str(position["side"]),
  68. margin_used=float(position["margin_used"]),
  69. entry_price=float(position["entry_price"]),
  70. exit_price=exit_price,
  71. leverage=leverage,
  72. )
  73. trades.append(
  74. {
  75. "side": "Long" if position["side"] == "long" else "Short",
  76. "entry_time": _format_ts(int(position["entry_time"])),
  77. "exit_time": _format_ts(candle.ts),
  78. "entry_price": round(float(position["entry_price"]), 4),
  79. "exit_price": round(exit_price, 4),
  80. "pnl": round(exit_equity - float(position["margin_used"]), 4),
  81. "return_pct": round((exit_equity - float(position["margin_used"])) / float(position["margin_used"]) * 100, 4),
  82. }
  83. )
  84. exits.append({"ts": candle.ts, "price": exit_price, "side": position["side"]})
  85. if exit_equity > float(position["margin_used"]):
  86. wins += 1
  87. equity = exit_equity
  88. position = None
  89. pending_exit = False
  90. if pending_entry_side is not None and position is None:
  91. entry_price = candle.open
  92. margin_used = equity
  93. stop_multiplier = 1 - config.stop_loss_pct if pending_entry_side == "long" else 1 + config.stop_loss_pct
  94. position = {
  95. "side": pending_entry_side,
  96. "entry_time": candle.ts,
  97. "entry_price": entry_price,
  98. "entry_index": index,
  99. "margin_used": margin_used,
  100. "stop_price": entry_price * stop_multiplier,
  101. }
  102. entries.append({"ts": candle.ts, "price": entry_price, "side": pending_entry_side})
  103. pending_entry_side = None
  104. current_equity = equity
  105. if position is not None:
  106. stop_hit = (
  107. position["side"] == "long" and candle.low <= float(position["stop_price"])
  108. ) or (
  109. position["side"] == "short" and candle.high >= float(position["stop_price"])
  110. )
  111. if stop_hit:
  112. exit_price = float(position["stop_price"])
  113. exit_equity = _trade_equity(
  114. side=str(position["side"]),
  115. margin_used=float(position["margin_used"]),
  116. entry_price=float(position["entry_price"]),
  117. exit_price=exit_price,
  118. leverage=leverage,
  119. )
  120. trades.append(
  121. {
  122. "side": "Long" if position["side"] == "long" else "Short",
  123. "entry_time": _format_ts(int(position["entry_time"])),
  124. "exit_time": _format_ts(candle.ts),
  125. "entry_price": round(float(position["entry_price"]), 4),
  126. "exit_price": round(exit_price, 4),
  127. "pnl": round(exit_equity - float(position["margin_used"]), 4),
  128. "return_pct": round((exit_equity - float(position["margin_used"])) / float(position["margin_used"]) * 100, 4),
  129. }
  130. )
  131. exits.append({"ts": candle.ts, "price": exit_price, "side": position["side"]})
  132. if exit_equity > float(position["margin_used"]):
  133. wins += 1
  134. equity = exit_equity
  135. current_equity = exit_equity
  136. position = None
  137. if current_equity > peak_equity:
  138. peak_equity = current_equity
  139. max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity)
  140. equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close})
  141. ending_equity = current_equity
  142. continue
  143. if position is not None:
  144. current_equity = _mark_to_market(
  145. side=str(position["side"]),
  146. margin_used=float(position["margin_used"]),
  147. entry_price=float(position["entry_price"]),
  148. mark_price=candle.close,
  149. leverage=leverage,
  150. )
  151. if current_equity > peak_equity:
  152. peak_equity = current_equity
  153. max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity)
  154. equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close})
  155. ending_equity = current_equity
  156. if index == len(candles) - 1:
  157. continue
  158. if position is not None:
  159. exit_signal = (
  160. position["side"] == "long" and middle[index] is not None and candle.close >= float(middle[index])
  161. ) or (
  162. position["side"] == "short" and middle[index] is not None and candle.close <= float(middle[index])
  163. )
  164. if exit_signal:
  165. pending_exit = True
  166. continue
  167. previous_bandwidths = [value for value in bandwidth[max(0, index - config.bandwidth_lookback) : index] if value is not None]
  168. if len(previous_bandwidths) < config.bandwidth_lookback:
  169. continue
  170. current_bandwidth = bandwidth[index]
  171. if current_bandwidth is None or current_bandwidth > median(previous_bandwidths):
  172. continue
  173. if lower[index] is not None and candle.close < float(lower[index]):
  174. pending_entry_side = "long"
  175. elif upper[index] is not None and candle.close > float(upper[index]):
  176. pending_entry_side = "short"
  177. trade_count = len(trades)
  178. return SegmentResult(
  179. trade_count=trade_count,
  180. total_return=(ending_equity - config.initial_equity) / config.initial_equity,
  181. win_rate=(wins / trade_count) if trade_count else 0.0,
  182. max_drawdown=max_drawdown,
  183. trades=trades,
  184. open_position=position,
  185. candles=candles[warmup_bars:],
  186. equity_curve=equity_curve,
  187. entries=entries,
  188. exits=exits,
  189. )
  190. def generate_bbmr_sampled_report(
  191. *,
  192. candles: list[Candle],
  193. leverage: int,
  194. output_file: Path,
  195. symbol: str,
  196. bar: str,
  197. segments: int,
  198. window_size: int,
  199. ) -> dict[str, object]:
  200. return generate_sampled_report(
  201. candles=candles,
  202. leverage=leverage,
  203. output_file=output_file,
  204. symbol=symbol,
  205. bar=bar,
  206. segments=segments,
  207. window_size=window_size,
  208. report_title="BBMR Sampled Report",
  209. strategy_label="BBMR",
  210. strategy_description=BBMR_STRATEGY_DESCRIPTION,
  211. strategy_params={
  212. "band_length": BBMRConfig().band_length,
  213. "std_multiplier": BBMRConfig().std_multiplier,
  214. "bandwidth_lookback": BBMRConfig().bandwidth_lookback,
  215. "stop_loss_pct": BBMRConfig().stop_loss_pct,
  216. },
  217. run_segment=run_bbmr_segment,
  218. )