| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241 |
- from __future__ import annotations
- from dataclasses import dataclass
- from pathlib import Path
- from statistics import median
- import pandas as pd
- from okx_codex_trader.models import Candle
- from okx_codex_trader.sampled_report import (
- SegmentResult,
- generate_sampled_report,
- mark_to_market as _mark_to_market,
- trade_equity as _trade_equity,
- )
# Human-readable summary of the strategy; handed to the report generator as
# ``strategy_description``.  The "50" and "0.5%" quoted in the text match the
# ``BBMRConfig`` defaults for ``bandwidth_lookback`` and ``stop_loss_pct``.
_BBMR_DESCRIPTION_PARTS = (
    "Bollinger Band mean reversion, bandwidth filter against previous 50 completed values, ",
    "close-based return-to-middle exits at next open, intrabar 0.5% stop-loss.",
)
BBMR_STRATEGY_DESCRIPTION = _BBMR_DESCRIPTION_PARTS[0] + _BBMR_DESCRIPTION_PARTS[1]
@dataclass(frozen=True)
class BBMRConfig:
    """Immutable parameter set for the BBMR (Bollinger Band mean reversion) backtest.

    Raises:
        ValueError: On construction, if a value would make the simulation
            undefined (non-positive window lengths, a negative stop distance,
            or non-positive starting equity).
    """

    # Rolling window, in bars, for the middle band (mean) and its standard deviation.
    band_length: int = 20
    # Number of standard deviations between the middle band and each outer band.
    std_multiplier: float = 2.0
    # How many previous bandwidth values the current bandwidth is compared against.
    bandwidth_lookback: int = 50
    # Stop distance from the entry price, as a fraction (0.005 == 0.5%).
    stop_loss_pct: float = 0.005
    # Equity each simulated segment starts with.
    initial_equity: float = 10_000.0

    def __post_init__(self) -> None:
        """Reject values that would otherwise fail deep inside the simulation."""
        if self.band_length < 1:
            raise ValueError(f"band_length must be >= 1, got {self.band_length!r}")
        if self.bandwidth_lookback < 1:
            # A zero lookback would ask for the median of an empty sample.
            raise ValueError(
                f"bandwidth_lookback must be >= 1, got {self.bandwidth_lookback!r}"
            )
        if self.stop_loss_pct < 0:
            # A negative distance would place the stop on the profit side of the entry.
            raise ValueError(f"stop_loss_pct must be >= 0, got {self.stop_loss_pct!r}")
        if self.initial_equity <= 0:
            # Returns and drawdown are expressed as fractions of equity.
            raise ValueError(f"initial_equity must be > 0, got {self.initial_equity!r}")
- def _format_ts(ts: int) -> str:
- return pd.to_datetime(ts, unit="ms", utc=True).strftime("%Y-%m-%d %H:%M")
def _bbmr_bands(
    closes: list[float],
    config: BBMRConfig,
) -> tuple[list[float | None], list[float | None], list[float | None], list[float | None]]:
    """Return ``(middle, upper, lower, bandwidth)`` lists aligned with *closes*.

    Every list holds ``None`` at positions where the rolling mean or standard
    deviation is NaN (e.g. before a full ``band_length`` window exists).
    ``bandwidth`` is additionally ``None`` where the middle band is exactly
    zero, since it is expressed relative to the middle band.
    """
    rolling = pd.Series(closes, dtype=float).rolling(config.band_length)
    middle: list[float | None] = []
    upper: list[float | None] = []
    lower: list[float | None] = []
    bandwidth: list[float | None] = []
    for mean_value, std_value in zip(rolling.mean().tolist(), rolling.std(ddof=0).tolist()):
        # NaN is the only float that is not equal to itself.
        if mean_value != mean_value or std_value != std_value:
            middle.append(None)
            upper.append(None)
            lower.append(None)
            bandwidth.append(None)
            continue
        band_top = mean_value + config.std_multiplier * std_value
        band_bottom = mean_value - config.std_multiplier * std_value
        middle.append(mean_value)
        upper.append(band_top)
        lower.append(band_bottom)
        bandwidth.append(None if mean_value == 0 else (band_top - band_bottom) / mean_value)
    return middle, upper, lower, bandwidth


def _settle_bbmr_position(
    *,
    position: dict[str, object],
    exit_price: float,
    exit_ts: int,
    leverage: int,
) -> tuple[dict[str, object], dict[str, object], float, bool]:
    """Close *position* at *exit_price*.

    Returns ``(trade_record, exit_marker, exit_equity, is_win)`` where
    ``is_win`` is true when the equity after the trade exceeds the margin
    that was committed to it.
    """
    side = str(position["side"])
    margin_used = float(position["margin_used"])
    entry_price = float(position["entry_price"])
    exit_equity = _trade_equity(
        side=side,
        margin_used=margin_used,
        entry_price=entry_price,
        exit_price=exit_price,
        leverage=leverage,
    )
    pnl = exit_equity - margin_used
    # With no margin committed (equity already wiped out) a percentage return
    # is undefined; report 0.0 instead of raising ZeroDivisionError.
    return_pct = pnl / margin_used * 100 if margin_used else 0.0
    trade_record: dict[str, object] = {
        "side": "Long" if side == "long" else "Short",
        "entry_time": _format_ts(int(position["entry_time"])),
        "exit_time": _format_ts(exit_ts),
        "entry_price": round(entry_price, 4),
        "exit_price": round(exit_price, 4),
        "pnl": round(pnl, 4),
        "return_pct": round(return_pct, 4),
    }
    exit_marker: dict[str, object] = {"ts": exit_ts, "price": exit_price, "side": position["side"]}
    return trade_record, exit_marker, exit_equity, exit_equity > margin_used


def run_bbmr_segment(
    *,
    candles: list[Candle],
    leverage: int,
    warmup_bars: int,
    config: BBMRConfig = BBMRConfig(),
) -> SegmentResult:
    """Backtest the BBMR strategy over one candle segment.

    Per simulated bar, in order:

    1. An exit queued by the previous bar is filled at this bar's open.
    2. An entry queued by the previous bar is filled at this bar's open,
       committing the whole current equity as margin, with a stop placed
       ``stop_loss_pct`` away from the entry price.
    3. If the bar's low (long) or high (short) reaches the stop, the position
       is closed at the stop price and no signal is evaluated on this bar.
    4. Equity is recorded: marked to the bar's close while a position is
       open, otherwise the realised equity.
    5. Signals on the bar's close, except on the final bar: with a position
       open, a close at or beyond the middle band queues an exit; when flat,
       a close outside the outer bands queues an entry, provided the current
       bandwidth does not exceed the median of the previous
       ``bandwidth_lookback`` bandwidth values (all of which must exist).

    Args:
        candles: Bars in chronological order; the code reads ``open``,
            ``high``, ``low``, ``close`` and ``ts`` from each.
        leverage: Passed through to the equity helpers.
        warmup_bars: Index of the first simulated bar. Earlier candles only
            feed the indicator windows and are excluded from the result.
        config: Strategy parameters.

    Returns:
        A ``SegmentResult``. A position still open after the last bar is
        reported as ``open_position`` and is reflected in ``total_return``
        at its mark-to-market value.
    """
    middle, upper, lower, bandwidth = _bbmr_bands([candle.close for candle in candles], config)

    equity = config.initial_equity
    ending_equity = equity
    peak_equity = equity
    max_drawdown = 0.0
    wins = 0
    trades: list[dict[str, object]] = []
    entries: list[dict[str, object]] = []
    exits: list[dict[str, object]] = []
    equity_curve: list[dict[str, float | int]] = []
    position: dict[str, object] | None = None
    pending_entry_side: str | None = None
    pending_exit = False
    last_index = len(candles) - 1

    for index in range(warmup_bars, len(candles)):
        candle = candles[index]

        # 1. Fill the exit signalled on the previous bar's close.
        if pending_exit and position is not None:
            trade, exit_marker, equity, is_win = _settle_bbmr_position(
                position=position,
                exit_price=candle.open,
                exit_ts=candle.ts,
                leverage=leverage,
            )
            trades.append(trade)
            exits.append(exit_marker)
            if is_win:
                wins += 1
            position = None
            pending_exit = False

        # 2. Fill the entry signalled on the previous bar's close.
        if pending_entry_side is not None and position is None:
            entry_price = candle.open
            stop_multiplier = 1 - config.stop_loss_pct if pending_entry_side == "long" else 1 + config.stop_loss_pct
            position = {
                "side": pending_entry_side,
                "entry_time": candle.ts,
                "entry_price": entry_price,
                "entry_index": index,
                "margin_used": equity,
                "stop_price": entry_price * stop_multiplier,
            }
            entries.append({"ts": candle.ts, "price": entry_price, "side": pending_entry_side})
            pending_entry_side = None

        # 3. Intrabar stop-loss, filled at the stop price itself.
        stopped_out = False
        if position is not None:
            stop_price = float(position["stop_price"])
            stop_hit = (position["side"] == "long" and candle.low <= stop_price) or (
                position["side"] == "short" and candle.high >= stop_price
            )
            if stop_hit:
                trade, exit_marker, equity, is_win = _settle_bbmr_position(
                    position=position,
                    exit_price=stop_price,
                    exit_ts=candle.ts,
                    leverage=leverage,
                )
                trades.append(trade)
                exits.append(exit_marker)
                if is_win:
                    wins += 1
                position = None
                stopped_out = True

        # 4. Record equity for this bar and update the drawdown statistics.
        if position is not None:
            current_equity = _mark_to_market(
                side=str(position["side"]),
                margin_used=float(position["margin_used"]),
                entry_price=float(position["entry_price"]),
                mark_price=candle.close,
                leverage=leverage,
            )
        else:
            current_equity = equity
        if current_equity > peak_equity:
            peak_equity = current_equity
        max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity)
        equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close})
        ending_equity = current_equity

        # 5. Signals: none after a stop-out, and none on the final bar because
        #    there is no following open to fill them at.
        if stopped_out or index == last_index:
            continue

        if position is not None:
            middle_value = middle[index]
            if middle_value is not None:
                if position["side"] == "long":
                    pending_exit = candle.close >= middle_value
                elif position["side"] == "short":
                    pending_exit = candle.close <= middle_value
            # Never look for a new entry while a position is open.
            continue

        previous_bandwidths = [
            value
            for value in bandwidth[max(0, index - config.bandwidth_lookback) : index]
            if value is not None
        ]
        if len(previous_bandwidths) < config.bandwidth_lookback:
            continue
        current_bandwidth = bandwidth[index]
        if current_bandwidth is None or current_bandwidth > median(previous_bandwidths):
            continue

        lower_value = lower[index]
        upper_value = upper[index]
        if lower_value is not None and candle.close < lower_value:
            pending_entry_side = "long"
        elif upper_value is not None and candle.close > upper_value:
            pending_entry_side = "short"

    trade_count = len(trades)
    return SegmentResult(
        trade_count=trade_count,
        total_return=(ending_equity - config.initial_equity) / config.initial_equity,
        win_rate=(wins / trade_count) if trade_count else 0.0,
        max_drawdown=max_drawdown,
        trades=trades,
        open_position=position,
        candles=candles[warmup_bars:],
        equity_curve=equity_curve,
        entries=entries,
        exits=exits,
    )
def generate_bbmr_sampled_report(
    *,
    candles: list[Candle],
    leverage: int,
    output_file: Path,
    symbol: str,
    bar: str,
    segments: int,
    window_size: int,
) -> dict[str, object]:
    """Produce the sampled report for the BBMR strategy.

    All arguments are forwarded unchanged to ``generate_sampled_report``,
    together with the BBMR title, label, description, the default
    ``BBMRConfig`` parameter values and ``run_bbmr_segment`` as the
    per-segment runner. Returns whatever ``generate_sampled_report`` returns.
    """
    defaults = BBMRConfig()
    reported_params = {
        "band_length": defaults.band_length,
        "std_multiplier": defaults.std_multiplier,
        "bandwidth_lookback": defaults.bandwidth_lookback,
        "stop_loss_pct": defaults.stop_loss_pct,
    }
    return generate_sampled_report(
        candles=candles,
        leverage=leverage,
        output_file=output_file,
        symbol=symbol,
        bar=bar,
        segments=segments,
        window_size=window_size,
        report_title="BBMR Sampled Report",
        strategy_label="BBMR",
        strategy_description=BBMR_STRATEGY_DESCRIPTION,
        strategy_params=reported_params,
        run_segment=run_bbmr_segment,
    )
|