"""Backtest a 15m Bollinger-squeeze ETH signal with optional 3m execution.

Signals are computed on aligned 15m ETH/BTC candles.  Three execution
variants are simulated (15m only, 3m risk management, 3m refined entries)
and the results are written as CSV files plus a Markdown summary.
"""

from __future__ import annotations

import argparse
from collections import defaultdict
from collections.abc import Callable
from dataclasses import asdict, dataclass, replace
from pathlib import Path

import pandas as pd

from okx_codex_trader.candles import align_candles_by_ts, load_candles_csv
from okx_codex_trader.models import Candle
from okx_codex_trader.research_metrics import (
    DEFAULT_COSTS,
    DEFAULT_INITIAL_EQUITY,
    DEFAULT_PRIMARY_COST,
    cost_equity_frame,
    equity_metrics,
    format_utc_ts,
    trade_stats,
)
from okx_codex_trader.sampled_report import SegmentResult, mark_to_market, trade_equity
from okx_codex_trader.time_rules import entry_allowed, is_us_open_window

ETH_SYMBOL = "ETH-USDT-SWAP"
BTC_SYMBOL = "BTC-USDT-SWAP"
DATA_DIR = Path("data/okx-candles")
OUTPUT_DIR = Path("reports/eth-exploration")
INITIAL_EQUITY = DEFAULT_INITIAL_EQUITY
PRIMARY_COST = DEFAULT_PRIMARY_COST
COSTS = DEFAULT_COSTS
LEVERAGE = 3
HORIZONS = (
    ("full", None),
    ("3y", pd.DateOffset(years=3)),
    ("1y", pd.DateOffset(years=1)),
    ("6m", pd.DateOffset(months=6)),
    ("3m", pd.DateOffset(months=3)),
    ("30d", pd.DateOffset(days=30)),
    ("21d", pd.DateOffset(days=21)),
)

# Simulation parameters (all bar counts are in 15m bars).
WINDOW_MS = 900_000  # length of one 15m bar in milliseconds
WARMUP_BARS = 960  # bars skipped before the simulation starts
COOLDOWN_BARS = 24  # bars without new entries after any exit
BAND_WINDOW = 96  # rolling window of the Bollinger bands and ETH volatility
SQUEEZE_LOOKBACK = 960  # rolling window of the bandwidth quantile
BTC_TREND_WINDOW = 480  # rolling window of the BTC moving average
MAX_ETH_VOL = 0.006  # entries are skipped above this rolling return stdev
BREAKEVEN_TRIGGER = 0.008  # favorable excursion that arms the breakeven stop

# Entry refinement modes understood by ``refined_entry``.
PULLBACK_RATES = {
    "pullback_0005": 0.0005,
    "pullback_001": 0.001,
    "pullback_002": 0.002,
    "pullback_003": 0.003,
}
TWAP_SLICES = {
    "twap_2x3m": 2,
    "twap_3x3m": 3,
    "twap_4x3m": 4,
    "twap_5x3m": 5,
}
ENTRY_MODES = ("confirm_1", *PULLBACK_RATES, *TWAP_SLICES)

EntryChooser = Callable[[str, "list[Candle]"], "tuple[int, float] | None"]


@dataclass(frozen=True)
class Position:
    """Immutable snapshot of an open position."""

    side: str  # "long"; any other value is treated as short
    entry_ts: int  # timestamp of the candle the position was opened on
    entry_price: float
    margin_used: float  # equity committed when the position was opened
    stop_price: float
    take_price: float
    mfe_pct: float  # best favorable move seen so far, as a fraction


def fmt(ts: int) -> str:
    """Format a timestamp with ``format_utc_ts``."""
    return format_utc_ts(ts)


def close_trade(
    trades: list[dict[str, object]],
    position: Position,
    candle: Candle,
    exit_price: float,
    reason: str,
) -> tuple[float, bool]:
    """Close ``position`` at ``exit_price`` and append a record to ``trades``.

    Returns:
        ``(exit_equity, won)`` where ``exit_equity`` is the value returned by
        ``trade_equity`` and ``won`` is True when the trade's PnL is positive.
    """
    exit_equity = trade_equity(
        side=position.side,
        margin_used=position.margin_used,
        entry_price=position.entry_price,
        exit_price=exit_price,
        leverage=LEVERAGE,
    )
    pnl = exit_equity - position.margin_used
    # A position opened with zero margin has no meaningful percentage return;
    # report 0.0 instead of raising ZeroDivisionError.
    if position.margin_used:
        return_pct = pnl / position.margin_used * 100.0
    else:
        return_pct = 0.0
    trades.append(
        {
            "side": "Long" if position.side == "long" else "Short",
            "entry_time": fmt(position.entry_ts),
            "exit_time": fmt(candle.ts),
            "exit_ts": candle.ts,
            "entry_price": round(position.entry_price, 4),
            "exit_price": round(exit_price, 4),
            "pnl": round(pnl, 4),
            "return_pct": round(return_pct, 4),
            "cost_weight": 1.0,
            "exit_reason": reason,
            "mfe_pct": round(position.mfe_pct * 100.0, 4),
        }
    )
    return exit_equity, pnl > 0.0


def favorable_move(side: str, entry_price: float, candle: Candle) -> float:
    """Return the best move in the position's favor within ``candle``.

    Longs measure the candle high against the entry price; every other side
    measures the entry price against the candle low.
    """
    if side == "long":
        return candle.high / entry_price - 1.0
    return entry_price / candle.low - 1.0


def risk_exit(position: Position, candle: Candle, use_breakeven: bool) -> tuple[float, str] | None:
    """Check whether ``candle`` triggers the stop or the take-profit.

    When ``use_breakeven`` is set and the position's favorable excursion has
    reached ``BREAKEVEN_TRIGGER``, the stop is tightened to 0.1% beyond the
    entry price and the exit reason becomes ``"breakeven"``.

    Checks are made in this order: gap through the stop at the open, gap
    through the target at the open, intrabar stop, intrabar target.  The stop
    therefore wins when both levels are touched inside the same candle.

    Returns:
        ``(exit_price, reason)`` or ``None`` when nothing is triggered.
    """
    is_long = position.side == "long"
    stop = position.stop_price
    reason = "stop"
    if use_breakeven and position.mfe_pct >= BREAKEVEN_TRIGGER:
        breakeven = position.entry_price * (1.001 if is_long else 0.999)
        stop = max(stop, breakeven) if is_long else min(stop, breakeven)
        reason = "breakeven"

    if is_long:
        if candle.open <= stop:
            return candle.open, f"{reason}_gap"
        if candle.open >= position.take_price:
            return candle.open, "take_gap"
        if candle.low <= stop:
            return stop, reason
        if candle.high >= position.take_price:
            return position.take_price, "take_profit"
    else:
        if candle.open >= stop:
            return candle.open, f"{reason}_gap"
        if candle.open <= position.take_price:
            return candle.open, "take_gap"
        if candle.high >= stop:
            return stop, reason
        if candle.low <= position.take_price:
            return position.take_price, "take_profit"
    return None


def build_15m_signals(eth: list[Candle], btc: list[Candle]) -> pd.DataFrame:
    """Build the per-bar indicator frame used by the simulations.

    The frame has one row per ETH candle (positional index) with the Bollinger
    middle/upper/lower bands (2 population standard deviations), the relative
    bandwidth, its rolling 25% quantile, the BTC close and its moving average,
    and the rolling standard deviation of ETH close-to-close returns.

    ``btc`` is paired with ``eth`` by position, so both lists are expected to
    be aligned bar for bar.
    """
    eth_close = pd.Series([c.close for c in eth], dtype=float)
    btc_close = pd.Series([c.close for c in btc], dtype=float)

    middle = eth_close.rolling(BAND_WINDOW).mean()
    stdev = eth_close.rolling(BAND_WINDOW).std(ddof=0)
    upper = middle + 2.0 * stdev
    lower = middle - 2.0 * stdev
    bandwidth = (upper - lower) / middle
    threshold = bandwidth.rolling(SQUEEZE_LOOKBACK).quantile(0.25)
    btc_sma = btc_close.rolling(BTC_TREND_WINDOW).mean()
    eth_vol = eth_close.pct_change().rolling(BAND_WINDOW).std(ddof=0)

    return pd.DataFrame(
        {
            "ts": [c.ts for c in eth],
            "middle": middle,
            "upper": upper,
            "lower": lower,
            "bandwidth": bandwidth,
            "threshold": threshold,
            "btc_close": btc_close,
            "btc_sma": btc_sma,
            "eth_vol": eth_vol,
        }
    )


def _group_by_window(eth3: list[Candle]) -> dict[int, list[Candle]]:
    """Group 3m candles by the start timestamp of their enclosing 15m bar."""
    windows: dict[int, list[Candle]] = defaultdict(list)
    for candle in eth3:
        windows[candle.ts - candle.ts % WINDOW_MS].append(candle)
    return windows


def _window_subcandles(candle15: Candle, windows: dict[int, list[Candle]] | None) -> list[Candle]:
    """Return the execution candles for one 15m bar (never empty).

    Falls back to the 15m candle itself when no finer candles are available.
    Every candle stored under a key already lies inside that 15m window, so no
    further timestamp filtering is required.
    """
    if windows is None:
        return [candle15]
    return windows.get(candle15.ts) or [candle15]


def _immediate_entry(side: str, subcandles: list[Candle]) -> tuple[int, float]:
    """Enter at the open of the first execution candle of the bar."""
    return subcandles[0].ts, subcandles[0].open


def _open_position(side: str, entry_ts: int, entry_price: float, margin: float) -> Position:
    """Create a position with a 1% stop and a 3% target around the entry."""
    is_long = side == "long"
    return Position(
        side=side,
        entry_ts=entry_ts,
        entry_price=entry_price,
        margin_used=margin,
        stop_price=entry_price * (0.99 if is_long else 1.01),
        take_price=entry_price * (1.03 if is_long else 0.97),
        mfe_pct=0.0,
    )


def _simulate(
    eth15: list[Candle],
    btc15: list[Candle],
    *,
    windows: dict[int, list[Candle]] | None,
    choose_entry: EntryChooser,
    reason_prefix: str,
    skip_pre_entry: bool,
) -> SegmentResult:
    """Run the squeeze strategy; shared engine behind the ``run_*`` variants.

    Args:
        eth15: 15m ETH candles, aligned with ``btc15``.
        btc15: 15m BTC candles used for the trend filter.
        windows: 3m candles grouped per 15m bar, or ``None`` to execute on the
            15m candles themselves.
        choose_entry: maps ``(side, subcandles)`` to ``(entry_ts, entry_price)``
            or ``None`` when the pending signal is not filled.
        reason_prefix: prefix added to stop/take exit reasons.
        skip_pre_entry: ignore execution candles older than the entry candle
            (needed when the entry happens later than the bar's first candle).
    """
    indicators = build_15m_signals(eth15, btc15)
    equity = INITIAL_EQUITY
    peak = equity
    max_dd = 0.0
    wins = 0
    trades: list[dict[str, object]] = []
    equity_curve: list[dict[str, float | int]] = []
    position: Position | None = None
    pending_side: str | None = None
    pending_middle_exit = False
    cooldown_until = -1
    last_index = len(eth15) - 1

    for index in range(WARMUP_BARS, len(eth15)):
        candle15 = eth15[index]
        subcandles = _window_subcandles(candle15, windows)
        first = subcandles[0]

        # Decisions taken on the previous bar's close are executed first.
        if pending_middle_exit and position is not None:
            equity, won = close_trade(trades, position, first, first.open, "signal_middle")
            wins += int(won)
            position = None
            pending_middle_exit = False
            cooldown_until = index + COOLDOWN_BARS

        if pending_side is not None and position is None:
            entry = choose_entry(pending_side, subcandles)
            if entry is not None:
                entry_ts, entry_price = entry
                position = _open_position(pending_side, entry_ts, entry_price, equity)
            # A signal is valid for one bar only, filled or not.
            pending_side = None

        current = equity
        if position is not None:
            for sub in subcandles:
                if skip_pre_entry and sub.ts < position.entry_ts:
                    continue
                out = risk_exit(position, sub, True)
                if out is not None:
                    price, reason = out
                    equity, won = close_trade(trades, position, sub, price, f"{reason_prefix}{reason}")
                    wins += int(won)
                    current = equity
                    position = None
                    cooldown_until = index + COOLDOWN_BARS
                    break
                # The excursion is updated only after the candle survived the
                # risk check, so it cannot arm the breakeven stop on itself.
                position = replace(
                    position,
                    mfe_pct=max(position.mfe_pct, favorable_move(position.side, position.entry_price, sub)),
                )

        if position is not None:
            current = mark_to_market(
                side=position.side,
                margin_used=position.margin_used,
                entry_price=position.entry_price,
                mark_price=candle15.close,
                leverage=LEVERAGE,
            )
        peak = max(peak, current)
        max_dd = max(max_dd, (peak - current) / peak)
        equity_curve.append({"ts": candle15.ts, "equity": current, "close": candle15.close})

        # No orders can be queued on the final bar: there is no next open.
        if index == last_index:
            continue
        row = indicators.iloc[index]
        # ``value != value`` is true only for NaN (indicator still warming up).
        if any(
            value != value
            for value in (row.middle, row.upper, row.lower, row.bandwidth, row.threshold, row.btc_sma, row.eth_vol)
        ):
            continue

        if position is not None:
            middle_exit = (position.side == "long" and candle15.close < row.middle * 0.999) or (
                position.side == "short" and candle15.close > row.middle * 1.001
            )
            if middle_exit and not is_us_open_window(candle15.ts):
                pending_middle_exit = True
            continue

        if (
            index < cooldown_until
            or row.eth_vol > MAX_ETH_VOL
            or not entry_allowed(candle15.ts, "weekday")
            or not row.btc_close > row.btc_sma
        ):
            continue
        if row.bandwidth <= row.threshold:
            if candle15.close > row.upper:
                pending_side = "long"
            elif candle15.close < row.lower:
                pending_side = "short"

    # With no bars past the warm-up the curve is empty; report a flat result
    # instead of failing on ``equity_curve[-1]``.
    final_equity = equity_curve[-1]["equity"] if equity_curve else INITIAL_EQUITY
    return SegmentResult(
        len(trades),
        (final_equity - INITIAL_EQUITY) / INITIAL_EQUITY,
        wins / len(trades) if trades else 0.0,
        max_dd,
        trades,
        asdict(position) if position is not None else None,
        eth15[WARMUP_BARS:],
        equity_curve,
        [],
        [],
    )


def run_15m_baseline(eth: list[Candle], btc: list[Candle]) -> SegmentResult:
    """Simulate the strategy with entries, exits and risk checks on 15m bars."""
    return _simulate(
        eth,
        btc,
        windows=None,
        choose_entry=_immediate_entry,
        reason_prefix="",
        skip_pre_entry=False,
    )


def run_3m_execution(eth15: list[Candle], btc15: list[Candle], eth3: list[Candle]) -> SegmentResult:
    """Simulate 15m signals with stop/take checks evaluated on 3m candles.

    Entries are taken at the open of the first 3m candle of the bar.  Bars
    without 3m data fall back to the 15m candle.  Stop/take exit reasons are
    prefixed with ``"3m_"``.
    """
    return _simulate(
        eth15,
        btc15,
        windows=_group_by_window(eth3),
        choose_entry=_immediate_entry,
        reason_prefix="3m_",
        skip_pre_entry=False,
    )


def refined_entry(side: str, subcandles: list[Candle], mode: str) -> tuple[int, float] | None:
    """Pick an entry inside a 15m bar from its execution candles.

    Modes:
        ``confirm_1``: enter at the second candle's open when the first candle
            closed in the signal's direction.
        ``pullback_*``: enter at the open of the candle following the first
            one that retraces the given rate from the first candle's open.
        ``twap_*``: enter at the mean open of the first N candles, timestamped
            at the last candle used.

    Returns:
        ``(entry_ts, entry_price)`` or ``None`` when the mode's condition is
        not met (including when there are too few candles).

    Raises:
        ValueError: if ``mode`` is not a known entry mode.
    """
    if mode == "confirm_1":
        if len(subcandles) < 2:
            return None
        first = subcandles[0]
        confirmed = (side == "long" and first.close > first.open) or (
            side == "short" and first.close < first.open
        )
        if confirmed:
            return subcandles[1].ts, subcandles[1].open
        return None

    if mode in PULLBACK_RATES:
        if not subcandles:
            return None
        rate = PULLBACK_RATES[mode]
        target = subcandles[0].open * (1.0 - rate if side == "long" else 1.0 + rate)
        # The last candle is excluded: a fill needs a following candle's open.
        for offset, candle in enumerate(subcandles[:-1]):
            touched = (side == "long" and candle.low <= target) or (
                side == "short" and candle.high >= target
            )
            if touched:
                following = subcandles[offset + 1]
                return following.ts, following.open
        return None

    if mode in TWAP_SLICES:
        if not subcandles:
            return None
        slices = subcandles[: TWAP_SLICES[mode]]
        return slices[-1].ts, sum(candle.open for candle in slices) / len(slices)

    raise ValueError("entry mode is invalid")


def run_3m_entry_execution(
    eth15: list[Candle], btc15: list[Candle], eth3: list[Candle], entry_mode: str
) -> SegmentResult:
    """Simulate 15m signals with 3m risk checks and a refined 3m entry.

    The entry is chosen by ``refined_entry`` using ``entry_mode``.  A signal
    that is not filled within its bar is dropped.  Risk checks start at the
    entry candle.
    """

    def choose(side: str, subcandles: list[Candle]) -> tuple[int, float] | None:
        return refined_entry(side, subcandles, entry_mode)

    return _simulate(
        eth15,
        btc15,
        windows=_group_by_window(eth3),
        choose_entry=choose,
        reason_prefix="3m_",
        skip_pre_entry=True,
    )


def scoped_trades(trades: list[dict[str, object]], start: pd.Timestamp, end_ts: int) -> list[dict[str, object]]:
    """Return the trades whose exit time lies in ``(start, end_ts]``.

    ``end_ts`` is a millisecond timestamp; exit times are parsed from each
    trade's ``exit_time`` string as UTC.
    """
    end = pd.to_datetime(end_ts, unit="ms", utc=True)
    return [trade for trade in trades if start < pd.to_datetime(str(trade["exit_time"]), utc=True) <= end]


def write_outputs(results: dict[str, SegmentResult], last_ts: int, output_dir: Path) -> None:
    """Write horizon, exit-reason and trade CSVs plus a Markdown report.

    For every result, cost setting and horizon, one metrics row is computed
    on the equity frame restricted to the horizon ending at ``last_ts``.
    """
    rows: list[dict[str, object]] = []
    reason_rows: list[dict[str, object]] = []
    trade_rows: list[dict[str, object]] = []

    for name, result in results.items():
        trade_rows.extend({"name": name, **trade} for trade in result.trades)

        for cost_name, cost in COSTS:
            frame = cost_equity_frame(result, cost)
            for label, offset in HORIZONS:
                if offset is None:
                    scoped = frame[["ts", "equity"]].copy()
                    start = pd.Timestamp(scoped["ts"].iloc[0])
                else:
                    end = pd.to_datetime(last_ts, unit="ms", utc=True)
                    start = end - offset
                    # Seed the window with the last equity known at its start.
                    before = frame[frame["ts"] <= start]
                    start_equity = float(before["equity"].iloc[-1]) if len(before) else float(frame["equity"].iloc[0])
                    scoped = pd.concat(
                        [
                            pd.DataFrame([{"ts": start, "equity": start_equity}]),
                            frame[frame["ts"] > start][["ts", "equity"]],
                        ],
                        ignore_index=True,
                    )
                trades = scoped_trades(result.trades, start, last_ts)
                winners = sum(1 for trade in trades if float(trade["return_pct"]) > 0.0)
                rows.append(
                    {
                        "name": name,
                        "cost": cost_name,
                        "horizon": label,
                        "trades": len(trades),
                        "win_rate": winners / len(trades) if trades else 0.0,
                        **trade_stats(trades),
                        **equity_metrics(scoped, int(start.timestamp() * 1000), last_ts),
                    }
                )

        if result.trades:
            for reason, group in pd.DataFrame(result.trades).groupby("exit_reason"):
                reason_rows.append(
                    {
                        "name": name,
                        "exit_reason": reason,
                        "trades": len(group),
                        **trade_stats(group.to_dict("records")),
                    }
                )

    output_dir.mkdir(parents=True, exist_ok=True)
    primary = pd.DataFrame(rows)
    primary.to_csv(output_dir / "bb-squeeze-15m-signal-3m-execution-horizons.csv", index=False)
    pd.DataFrame(reason_rows).to_csv(output_dir / "bb-squeeze-15m-signal-3m-execution-exits.csv", index=False)
    pd.DataFrame(trade_rows).to_csv(output_dir / "bb-squeeze-15m-signal-3m-execution-trades.csv", index=False)

    report = "# BB squeeze 15m signal plus 3m execution test\n\n"
    report += markdown_table(
        primary[
            (primary["cost"] == PRIMARY_COST)
            & (primary["horizon"].isin(["full", "1y", "6m", "3m", "30d", "21d"]))
        ]
    )
    report += "\n"
    (output_dir / "bb-squeeze-15m-signal-3m-execution-report.md").write_text(report, encoding="utf-8")


def markdown_table(frame: pd.DataFrame) -> str:
    """Render ``frame`` as a pipe-delimited Markdown table (no trailing newline)."""
    columns = list(frame.columns)
    lines = [
        "| " + " | ".join(columns) + " |",
        "| " + " | ".join(["---"] * len(columns)) + " |",
    ]
    lines.extend(
        "| " + " | ".join(str(value) for value in row) + " |" for row in frame.itertuples(index=False)
    )
    return "\n".join(lines)


def main() -> int:
    """Load candles, run every execution variant and write the reports."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--years", type=float, default=10.0)
    parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
    args = parser.parse_args()

    eth15 = load_candles_csv(DATA_DIR, ETH_SYMBOL, "15m")
    btc15 = load_candles_csv(DATA_DIR, BTC_SYMBOL, "15m")
    eth15, btc15 = align_candles_by_ts(eth15, btc15)
    eth3 = load_candles_csv(DATA_DIR, ETH_SYMBOL, "3m")

    requested = min(int(args.years * 365 * 24 * 60 / 15), len(eth15))
    # ``eth15[-0]`` is ``eth15[0]``, so a zero count would silently select the
    # whole history; negative counts or empty data would index incorrectly.
    if requested < 1:
        parser.error("--years must cover at least one 15m candle and candle data must not be empty")
    first_ts = eth15[-requested].ts
    eth15 = [c for c in eth15 if c.ts >= first_ts]
    btc15 = [c for c in btc15 if c.ts >= first_ts]
    eth3 = [c for c in eth3 if first_ts <= c.ts <= eth15[-1].ts + WINDOW_MS]

    results = {
        "15m_baseline": run_15m_baseline(eth15, btc15),
        "15m_signal_3m_risk": run_3m_execution(eth15, btc15, eth3),
    }
    for mode in ENTRY_MODES:
        results[f"15m_signal_3m_{mode}"] = run_3m_entry_execution(eth15, btc15, eth3, mode)

    write_outputs(results, eth15[-1].ts, args.output_dir)
    print((args.output_dir / "bb-squeeze-15m-signal-3m-execution-report.md").as_posix())
    return 0


if __name__ == "__main__":
    raise SystemExit(main())