from __future__ import annotations from dataclasses import dataclass from pathlib import Path from statistics import median import pandas as pd from okx_codex_trader.models import Candle from okx_codex_trader.sampled_report import ( SegmentResult, generate_sampled_report, mark_to_market as _mark_to_market, trade_equity as _trade_equity, ) WARMUP_BARS = 69 @dataclass(frozen=True) class BBSBConfig: band_length: int = 20 std_multiplier: float = 2.0 bandwidth_lookback: int = 50 take_profit_pct: float = 0.01 stop_loss_pct: float = 0.005 initial_equity: float = 10_000.0 BBSB_STRATEGY_DESCRIPTION = ( "Bollinger Band squeeze breakout, bandwidth filter against previous 50 completed values, " "next-open entries, intrabar 1.0% take-profit, intrabar 0.5% stop-loss." ) def _format_ts(ts: int) -> str: return pd.to_datetime(ts, unit="ms", utc=True).strftime("%Y-%m-%d %H:%M") def run_bbsb_segment( *, candles: list[Candle], leverage: int, warmup_bars: int, config: BBSBConfig = BBSBConfig(), ) -> SegmentResult: closes = pd.Series([candle.close for candle in candles], dtype=float) middle = closes.rolling(config.band_length).mean().tolist() stdev = closes.rolling(config.band_length).std(ddof=0).tolist() upper = [ None if middle_value != middle_value or std_value != std_value else middle_value + config.std_multiplier * std_value for middle_value, std_value in zip(middle, stdev) ] lower = [ None if middle_value != middle_value or std_value != std_value else middle_value - config.std_multiplier * std_value for middle_value, std_value in zip(middle, stdev) ] bandwidth = [ None if upper_value is None or lower_value is None or middle_value in (None, 0) else (upper_value - lower_value) / middle_value for upper_value, lower_value, middle_value in zip(upper, lower, middle) ] equity = config.initial_equity ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_entry_side: str | None = None for index in range(warmup_bars, len(candles)): candle = candles[index] if pending_entry_side is not None and position is None: entry_price = candle.open margin_used = equity position = { "side": pending_entry_side, "entry_time": candle.ts, "entry_price": entry_price, "entry_index": index, "margin_used": margin_used, "stop_price": entry_price * (1 - config.stop_loss_pct if pending_entry_side == "long" else 1 + config.stop_loss_pct), "take_profit_price": entry_price * (1 + config.take_profit_pct if pending_entry_side == "long" else 1 - config.take_profit_pct), } entries.append({"ts": candle.ts, "price": entry_price, "side": pending_entry_side}) pending_entry_side = None current_equity = equity if position is not None: stop_hit = ( position["side"] == "long" and candle.low <= float(position["stop_price"]) ) or ( position["side"] == "short" and candle.high >= float(position["stop_price"]) ) if stop_hit: exit_price = float(position["stop_price"]) exit_equity = _trade_equity( side=str(position["side"]), margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), exit_price=exit_price, leverage=leverage, ) trades.append( { "side": "Long" if position["side"] == "long" else "Short", "entry_time": _format_ts(int(position["entry_time"])), "exit_time": _format_ts(candle.ts), "entry_price": round(float(position["entry_price"]), 4), "exit_price": round(exit_price, 4), "pnl": round(exit_equity - float(position["margin_used"]), 4), "return_pct": round((exit_equity - float(position["margin_used"])) / float(position["margin_used"]) * 100, 4), } ) exits.append({"ts": candle.ts, "price": exit_price, "side": position["side"]}) if exit_equity > float(position["margin_used"]): wins += 1 equity = exit_equity current_equity = exit_equity position = None if current_equity > peak_equity: peak_equity = current_equity max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity continue take_profit_hit = ( position["side"] == "long" and candle.high >= float(position["take_profit_price"]) ) or ( position["side"] == "short" and candle.low <= float(position["take_profit_price"]) ) if take_profit_hit: exit_price = float(position["take_profit_price"]) exit_equity = _trade_equity( side=str(position["side"]), margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), exit_price=exit_price, leverage=leverage, ) trades.append( { "side": "Long" if position["side"] == "long" else "Short", "entry_time": _format_ts(int(position["entry_time"])), "exit_time": _format_ts(candle.ts), "entry_price": round(float(position["entry_price"]), 4), "exit_price": round(exit_price, 4), "pnl": round(exit_equity - float(position["margin_used"]), 4), "return_pct": round((exit_equity - float(position["margin_used"])) / float(position["margin_used"]) * 100, 4), } ) exits.append({"ts": candle.ts, "price": exit_price, "side": position["side"]}) if exit_equity > float(position["margin_used"]): wins += 1 equity = exit_equity current_equity = exit_equity position = None if current_equity > peak_equity: peak_equity = current_equity max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity continue if position is not None: current_equity = _mark_to_market( side=str(position["side"]), margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=leverage, ) if current_equity > peak_equity: peak_equity = current_equity max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(candles) - 1 or position is not None: continue previous_bandwidths = [value for value in bandwidth[max(0, index - config.bandwidth_lookback) : index] if value is not None] if len(previous_bandwidths) < config.bandwidth_lookback: continue current_bandwidth = bandwidth[index] if current_bandwidth is None or current_bandwidth > median(previous_bandwidths): continue if upper[index] is not None and candle.close > float(upper[index]): pending_entry_side = "long" elif lower[index] is not None and candle.close < float(lower[index]): pending_entry_side = "short" trade_count = len(trades) return SegmentResult( trade_count=trade_count, total_return=(ending_equity - config.initial_equity) / config.initial_equity, win_rate=(wins / trade_count) if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=candles[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ) def generate_bbsb_sampled_report( *, candles: list[Candle], leverage: int, output_file: Path, symbol: str, bar: str, segments: int, window_size: int, ) -> dict[str, object]: return generate_sampled_report( candles=candles, leverage=leverage, output_file=output_file, symbol=symbol, bar=bar, segments=segments, window_size=window_size, report_title="BBSB Sampled Report", strategy_label="BBSB", strategy_description=BBSB_STRATEGY_DESCRIPTION, strategy_params={ "band_length": BBSBConfig().band_length, "std_multiplier": BBSBConfig().std_multiplier, "bandwidth_lookback": BBSBConfig().bandwidth_lookback, "take_profit_pct": BBSBConfig().take_profit_pct, "stop_loss_pct": BBSBConfig().stop_loss_pct, }, run_segment=run_bbsb_segment, warmup_bars=WARMUP_BARS, )