| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541 |
- from __future__ import annotations
- import argparse
- from dataclasses import dataclass
- from pathlib import Path
- import pandas as pd
- from okx_codex_trader.candles import align_candles_by_ts, load_candles_csv
- from okx_codex_trader.models import Candle
- from okx_codex_trader.research_metrics import (
- DEFAULT_COSTS,
- DEFAULT_INITIAL_EQUITY,
- DEFAULT_PRIMARY_COST,
- cost_equity_frame,
- equity_metrics,
- format_utc_ts,
- trade_stats,
- )
- from okx_codex_trader.sampled_report import SegmentResult, mark_to_market, trade_equity
- from okx_codex_trader.time_rules import entry_allowed, is_us_open_window
- ETH_SYMBOL = "ETH-USDT-SWAP"
- BTC_SYMBOL = "BTC-USDT-SWAP"
- DATA_DIR = Path("data/okx-candles")
- OUTPUT_DIR = Path("reports/eth-exploration")
- INITIAL_EQUITY = DEFAULT_INITIAL_EQUITY
- PRIMARY_COST = DEFAULT_PRIMARY_COST
- COSTS = DEFAULT_COSTS
- LEVERAGE = 3
- HORIZONS = (
- ("full", None),
- ("3y", pd.DateOffset(years=3)),
- ("1y", pd.DateOffset(years=1)),
- ("6m", pd.DateOffset(months=6)),
- ("3m", pd.DateOffset(months=3)),
- ("30d", pd.DateOffset(days=30)),
- ("21d", pd.DateOffset(days=21)),
- )
- @dataclass(frozen=True)
- class Position:
- side: str
- entry_ts: int
- entry_price: float
- margin_used: float
- stop_price: float
- take_price: float
- mfe_pct: float
- def fmt(ts: int) -> str:
- return format_utc_ts(ts)
- def close_trade(trades: list[dict[str, object]], position: Position, candle: Candle, exit_price: float, reason: str) -> tuple[float, bool]:
- exit_equity = trade_equity(
- side=position.side,
- margin_used=position.margin_used,
- entry_price=position.entry_price,
- exit_price=exit_price,
- leverage=LEVERAGE,
- )
- pnl = exit_equity - position.margin_used
- trades.append(
- {
- "side": "Long" if position.side == "long" else "Short",
- "entry_time": fmt(position.entry_ts),
- "exit_time": fmt(candle.ts),
- "exit_ts": candle.ts,
- "entry_price": round(position.entry_price, 4),
- "exit_price": round(exit_price, 4),
- "pnl": round(pnl, 4),
- "return_pct": round(pnl / position.margin_used * 100.0, 4),
- "cost_weight": 1.0,
- "exit_reason": reason,
- "mfe_pct": round(position.mfe_pct * 100.0, 4),
- }
- )
- return exit_equity, pnl > 0.0
- def favorable_move(side: str, entry_price: float, candle: Candle) -> float:
- if side == "long":
- return candle.high / entry_price - 1.0
- return entry_price / candle.low - 1.0
- def risk_exit(position: Position, candle: Candle, use_breakeven: bool) -> tuple[float, str] | None:
- stop = position.stop_price
- reason = "stop"
- if use_breakeven and position.mfe_pct >= 0.008:
- breakeven = position.entry_price * (1.001 if position.side == "long" else 0.999)
- stop = max(stop, breakeven) if position.side == "long" else min(stop, breakeven)
- reason = "breakeven"
- if position.side == "long":
- if candle.open <= stop:
- return candle.open, f"{reason}_gap" if reason != "stop" else "stop_gap"
- if candle.open >= position.take_price:
- return candle.open, "take_gap"
- if candle.low <= stop:
- return stop, reason
- if candle.high >= position.take_price:
- return position.take_price, "take_profit"
- else:
- if candle.open >= stop:
- return candle.open, f"{reason}_gap" if reason != "stop" else "stop_gap"
- if candle.open <= position.take_price:
- return candle.open, "take_gap"
- if candle.high >= stop:
- return stop, reason
- if candle.low <= position.take_price:
- return position.take_price, "take_profit"
- return None
- def build_15m_signals(eth: list[Candle], btc: list[Candle]) -> pd.DataFrame:
- eth_close = pd.Series([c.close for c in eth], dtype=float)
- btc_close = pd.Series([c.close for c in btc], dtype=float)
- middle = eth_close.rolling(96).mean()
- stdev = eth_close.rolling(96).std(ddof=0)
- upper = middle + 2.0 * stdev
- lower = middle - 2.0 * stdev
- bandwidth = (upper - lower) / middle
- threshold = bandwidth.rolling(960).quantile(0.25)
- btc_sma = btc_close.rolling(480).mean()
- eth_vol = eth_close.pct_change().rolling(96).std(ddof=0)
- return pd.DataFrame(
- {
- "ts": [c.ts for c in eth],
- "middle": middle,
- "upper": upper,
- "lower": lower,
- "bandwidth": bandwidth,
- "threshold": threshold,
- "btc_close": btc_close,
- "btc_sma": btc_sma,
- "eth_vol": eth_vol,
- }
- )
- def run_15m_baseline(eth: list[Candle], btc: list[Candle]) -> SegmentResult:
- indicators = build_15m_signals(eth, btc)
- equity = INITIAL_EQUITY
- peak = equity
- max_dd = 0.0
- wins = 0
- trades: list[dict[str, object]] = []
- equity_curve: list[dict[str, float | int]] = []
- position: Position | None = None
- pending_side: str | None = None
- pending_middle_exit = False
- cooldown_until = -1
- warmup = 960
- for index in range(warmup, len(eth)):
- candle = eth[index]
- if pending_middle_exit and position is not None:
- equity, won = close_trade(trades, position, candle, candle.open, "signal_middle")
- wins += int(won)
- position = None
- pending_middle_exit = False
- cooldown_until = index + 24
- if pending_side is not None and position is None:
- entry = candle.open
- position = Position(
- side=pending_side,
- entry_ts=candle.ts,
- entry_price=entry,
- margin_used=equity,
- stop_price=entry * (0.99 if pending_side == "long" else 1.01),
- take_price=entry * (1.03 if pending_side == "long" else 0.97),
- mfe_pct=0.0,
- )
- pending_side = None
- current = equity
- if position is not None:
- out = risk_exit(position, candle, True)
- if out is not None:
- price, reason = out
- equity, won = close_trade(trades, position, candle, price, reason)
- wins += int(won)
- current = equity
- position = None
- cooldown_until = index + 24
- if position is not None:
- position = Position(**{**position.__dict__, "mfe_pct": max(position.mfe_pct, favorable_move(position.side, position.entry_price, candle))})
- current = mark_to_market(
- side=position.side,
- margin_used=position.margin_used,
- entry_price=position.entry_price,
- mark_price=candle.close,
- leverage=LEVERAGE,
- )
- peak = max(peak, current)
- max_dd = max(max_dd, (peak - current) / peak)
- equity_curve.append({"ts": candle.ts, "equity": current, "close": candle.close})
- if index == len(eth) - 1:
- continue
- row = indicators.iloc[index]
- if any(value != value for value in (row.middle, row.upper, row.lower, row.bandwidth, row.threshold, row.btc_sma, row.eth_vol)):
- continue
- if position is not None:
- middle_exit = (position.side == "long" and candle.close < row.middle * 0.999) or (position.side == "short" and candle.close > row.middle * 1.001)
- if middle_exit and not is_us_open_window(candle.ts):
- pending_middle_exit = True
- continue
- if index < cooldown_until or row.eth_vol > 0.006 or not entry_allowed(candle.ts, "weekday") or not row.btc_close > row.btc_sma:
- continue
- if row.bandwidth <= row.threshold:
- if candle.close > row.upper:
- pending_side = "long"
- elif candle.close < row.lower:
- pending_side = "short"
- return SegmentResult(len(trades), (equity_curve[-1]["equity"] - INITIAL_EQUITY) / INITIAL_EQUITY, wins / len(trades) if trades else 0.0, max_dd, trades, position.__dict__ if position else None, eth[warmup:], equity_curve, [], [])
- def run_3m_execution(eth15: list[Candle], btc15: list[Candle], eth3: list[Candle]) -> SegmentResult:
- indicators = build_15m_signals(eth15, btc15)
- three_by_window: dict[int, list[Candle]] = {}
- for candle in eth3:
- parent_ts = candle.ts - (candle.ts % 900_000)
- three_by_window.setdefault(parent_ts, []).append(candle)
- equity = INITIAL_EQUITY
- peak = equity
- max_dd = 0.0
- wins = 0
- trades: list[dict[str, object]] = []
- equity_curve: list[dict[str, float | int]] = []
- position: Position | None = None
- pending_side: str | None = None
- pending_middle_exit = False
- cooldown_until = -1
- warmup = 960
- for index in range(warmup, len(eth15)):
- candle15 = eth15[index]
- subcandles = [c for c in three_by_window.get(candle15.ts, []) if candle15.ts <= c.ts < candle15.ts + 900_000]
- if not subcandles:
- subcandles = [candle15]
- if pending_middle_exit and position is not None:
- equity, won = close_trade(trades, position, subcandles[0], subcandles[0].open, "signal_middle")
- wins += int(won)
- position = None
- pending_middle_exit = False
- cooldown_until = index + 24
- if pending_side is not None and position is None:
- entry = subcandles[0].open
- position = Position(
- side=pending_side,
- entry_ts=subcandles[0].ts,
- entry_price=entry,
- margin_used=equity,
- stop_price=entry * (0.99 if pending_side == "long" else 1.01),
- take_price=entry * (1.03 if pending_side == "long" else 0.97),
- mfe_pct=0.0,
- )
- pending_side = None
- current = equity
- if position is not None:
- for sub in subcandles:
- out = risk_exit(position, sub, True)
- if out is not None:
- price, reason = out
- equity, won = close_trade(trades, position, sub, price, f"3m_{reason}")
- wins += int(won)
- current = equity
- position = None
- cooldown_until = index + 24
- break
- position = Position(**{**position.__dict__, "mfe_pct": max(position.mfe_pct, favorable_move(position.side, position.entry_price, sub))})
- if position is not None:
- current = mark_to_market(
- side=position.side,
- margin_used=position.margin_used,
- entry_price=position.entry_price,
- mark_price=candle15.close,
- leverage=LEVERAGE,
- )
- peak = max(peak, current)
- max_dd = max(max_dd, (peak - current) / peak)
- equity_curve.append({"ts": candle15.ts, "equity": current, "close": candle15.close})
- if index == len(eth15) - 1:
- continue
- row = indicators.iloc[index]
- if any(value != value for value in (row.middle, row.upper, row.lower, row.bandwidth, row.threshold, row.btc_sma, row.eth_vol)):
- continue
- if position is not None:
- middle_exit = (position.side == "long" and candle15.close < row.middle * 0.999) or (position.side == "short" and candle15.close > row.middle * 1.001)
- if middle_exit and not is_us_open_window(candle15.ts):
- pending_middle_exit = True
- continue
- if index < cooldown_until or row.eth_vol > 0.006 or not entry_allowed(candle15.ts, "weekday") or not row.btc_close > row.btc_sma:
- continue
- if row.bandwidth <= row.threshold:
- if candle15.close > row.upper:
- pending_side = "long"
- elif candle15.close < row.lower:
- pending_side = "short"
- return SegmentResult(len(trades), (equity_curve[-1]["equity"] - INITIAL_EQUITY) / INITIAL_EQUITY, wins / len(trades) if trades else 0.0, max_dd, trades, position.__dict__ if position else None, eth15[warmup:], equity_curve, [], [])
- def refined_entry(side: str, subcandles: list[Candle], mode: str) -> tuple[int, float] | None:
- if mode == "confirm_1":
- if len(subcandles) < 2:
- return None
- first = subcandles[0]
- if side == "long" and first.close > first.open:
- return subcandles[1].ts, subcandles[1].open
- if side == "short" and first.close < first.open:
- return subcandles[1].ts, subcandles[1].open
- return None
- pullback_rates = {
- "pullback_0005": 0.0005,
- "pullback_001": 0.001,
- "pullback_002": 0.002,
- "pullback_003": 0.003,
- }
- if mode in pullback_rates:
- anchor = subcandles[0].open
- rate = pullback_rates[mode]
- target = anchor * (1.0 - rate if side == "long" else 1.0 + rate)
- for offset, candle in enumerate(subcandles[:-1]):
- if side == "long" and candle.low <= target:
- return subcandles[offset + 1].ts, subcandles[offset + 1].open
- if side == "short" and candle.high >= target:
- return subcandles[offset + 1].ts, subcandles[offset + 1].open
- return None
- twap_slices = {
- "twap_2x3m": 2,
- "twap_3x3m": 3,
- "twap_4x3m": 4,
- "twap_5x3m": 5,
- }
- if mode in twap_slices:
- if not subcandles:
- return None
- slices = subcandles[: twap_slices[mode]]
- return slices[-1].ts, sum(candle.open for candle in slices) / len(slices)
- raise ValueError("entry mode is invalid")
- def run_3m_entry_execution(eth15: list[Candle], btc15: list[Candle], eth3: list[Candle], entry_mode: str) -> SegmentResult:
- indicators = build_15m_signals(eth15, btc15)
- three_by_window: dict[int, list[Candle]] = {}
- for candle in eth3:
- parent_ts = candle.ts - (candle.ts % 900_000)
- three_by_window.setdefault(parent_ts, []).append(candle)
- equity = INITIAL_EQUITY
- peak = equity
- max_dd = 0.0
- wins = 0
- trades: list[dict[str, object]] = []
- equity_curve: list[dict[str, float | int]] = []
- position: Position | None = None
- pending_side: str | None = None
- pending_middle_exit = False
- cooldown_until = -1
- warmup = 960
- for index in range(warmup, len(eth15)):
- candle15 = eth15[index]
- subcandles = [c for c in three_by_window.get(candle15.ts, []) if candle15.ts <= c.ts < candle15.ts + 900_000]
- if not subcandles:
- subcandles = [candle15]
- if pending_middle_exit and position is not None:
- equity, won = close_trade(trades, position, subcandles[0], subcandles[0].open, "signal_middle")
- wins += int(won)
- position = None
- pending_middle_exit = False
- cooldown_until = index + 24
- if pending_side is not None and position is None:
- entry = refined_entry(pending_side, subcandles, entry_mode)
- if entry is not None:
- entry_ts, entry_price = entry
- position = Position(
- side=pending_side,
- entry_ts=entry_ts,
- entry_price=entry_price,
- margin_used=equity,
- stop_price=entry_price * (0.99 if pending_side == "long" else 1.01),
- take_price=entry_price * (1.03 if pending_side == "long" else 0.97),
- mfe_pct=0.0,
- )
- pending_side = None
- current = equity
- if position is not None:
- for sub in subcandles:
- if sub.ts < position.entry_ts:
- continue
- out = risk_exit(position, sub, True)
- if out is not None:
- price, reason = out
- equity, won = close_trade(trades, position, sub, price, f"3m_{reason}")
- wins += int(won)
- current = equity
- position = None
- cooldown_until = index + 24
- break
- position = Position(**{**position.__dict__, "mfe_pct": max(position.mfe_pct, favorable_move(position.side, position.entry_price, sub))})
- if position is not None:
- current = mark_to_market(
- side=position.side,
- margin_used=position.margin_used,
- entry_price=position.entry_price,
- mark_price=candle15.close,
- leverage=LEVERAGE,
- )
- peak = max(peak, current)
- max_dd = max(max_dd, (peak - current) / peak)
- equity_curve.append({"ts": candle15.ts, "equity": current, "close": candle15.close})
- if index == len(eth15) - 1:
- continue
- row = indicators.iloc[index]
- if any(value != value for value in (row.middle, row.upper, row.lower, row.bandwidth, row.threshold, row.btc_sma, row.eth_vol)):
- continue
- if position is not None:
- middle_exit = (position.side == "long" and candle15.close < row.middle * 0.999) or (position.side == "short" and candle15.close > row.middle * 1.001)
- if middle_exit and not is_us_open_window(candle15.ts):
- pending_middle_exit = True
- continue
- if index < cooldown_until or row.eth_vol > 0.006 or not entry_allowed(candle15.ts, "weekday") or not row.btc_close > row.btc_sma:
- continue
- if row.bandwidth <= row.threshold:
- if candle15.close > row.upper:
- pending_side = "long"
- elif candle15.close < row.lower:
- pending_side = "short"
- return SegmentResult(len(trades), (equity_curve[-1]["equity"] - INITIAL_EQUITY) / INITIAL_EQUITY, wins / len(trades) if trades else 0.0, max_dd, trades, position.__dict__ if position else None, eth15[warmup:], equity_curve, [], [])
- def scoped_trades(trades: list[dict[str, object]], start: pd.Timestamp, end_ts: int) -> list[dict[str, object]]:
- return [trade for trade in trades if start < pd.to_datetime(str(trade["exit_time"]), utc=True) <= pd.to_datetime(end_ts, unit="ms", utc=True)]
- def write_outputs(results: dict[str, SegmentResult], last_ts: int, output_dir: Path) -> None:
- rows: list[dict[str, object]] = []
- reason_rows: list[dict[str, object]] = []
- trade_rows: list[dict[str, object]] = []
- for name, result in results.items():
- for trade in result.trades:
- trade_rows.append({"name": name, **trade})
- for cost_name, cost in COSTS:
- frame = cost_equity_frame(result, cost)
- for label, offset in HORIZONS:
- if offset is None:
- scoped = frame[["ts", "equity"]].copy()
- start = pd.Timestamp(scoped["ts"].iloc[0])
- else:
- end = pd.to_datetime(last_ts, unit="ms", utc=True)
- start = end - offset
- before = frame[frame["ts"] <= start]
- start_equity = float(before["equity"].iloc[-1]) if len(before) else float(frame["equity"].iloc[0])
- scoped = pd.concat([pd.DataFrame([{"ts": start, "equity": start_equity}]), frame[frame["ts"] > start][["ts", "equity"]]], ignore_index=True)
- trades = scoped_trades(result.trades, start, last_ts)
- rows.append(
- {
- "name": name,
- "cost": cost_name,
- "horizon": label,
- "trades": len(trades),
- "win_rate": sum(1 for trade in trades if float(trade["return_pct"]) > 0.0) / len(trades) if trades else 0.0,
- **trade_stats(trades),
- **equity_metrics(scoped, int(start.timestamp() * 1000), last_ts),
- }
- )
- for reason, group in pd.DataFrame(result.trades).groupby("exit_reason") if result.trades else []:
- reason_rows.append({"name": name, "exit_reason": reason, "trades": len(group), **trade_stats(group.to_dict("records"))})
- output_dir.mkdir(parents=True, exist_ok=True)
- pd.DataFrame(rows).to_csv(output_dir / "bb-squeeze-15m-signal-3m-execution-horizons.csv", index=False)
- pd.DataFrame(reason_rows).to_csv(output_dir / "bb-squeeze-15m-signal-3m-execution-exits.csv", index=False)
- pd.DataFrame(trade_rows).to_csv(output_dir / "bb-squeeze-15m-signal-3m-execution-trades.csv", index=False)
- primary = pd.DataFrame(rows)
- report = "# BB squeeze 15m signal plus 3m execution test\n\n"
- report += markdown_table(primary[(primary["cost"] == PRIMARY_COST) & (primary["horizon"].isin(["full", "1y", "6m", "3m", "30d", "21d"]))])
- report += "\n"
- (output_dir / "bb-squeeze-15m-signal-3m-execution-report.md").write_text(report, encoding="utf-8")
- def markdown_table(frame: pd.DataFrame) -> str:
- columns = list(frame.columns)
- lines = [
- "| " + " | ".join(columns) + " |",
- "| " + " | ".join(["---"] * len(columns)) + " |",
- ]
- for row in frame.itertuples(index=False):
- lines.append("| " + " | ".join(str(value) for value in row) + " |")
- return "\n".join(lines)
- def main() -> int:
- parser = argparse.ArgumentParser()
- parser.add_argument("--years", type=float, default=10.0)
- parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
- args = parser.parse_args()
- eth15 = load_candles_csv(DATA_DIR, ETH_SYMBOL, "15m")
- btc15 = load_candles_csv(DATA_DIR, BTC_SYMBOL, "15m")
- eth15, btc15 = align_candles_by_ts(eth15, btc15)
- eth3 = load_candles_csv(DATA_DIR, ETH_SYMBOL, "3m")
- requested = min(int(args.years * 365 * 24 * 60 / 15), len(eth15))
- first_ts = eth15[-requested].ts
- eth15 = [c for c in eth15 if c.ts >= first_ts]
- btc15 = [c for c in btc15 if c.ts >= first_ts]
- eth3 = [c for c in eth3 if first_ts <= c.ts <= eth15[-1].ts + 900_000]
- results = {
- "15m_baseline": run_15m_baseline(eth15, btc15),
- "15m_signal_3m_risk": run_3m_execution(eth15, btc15, eth3),
- "15m_signal_3m_confirm_1": run_3m_entry_execution(eth15, btc15, eth3, "confirm_1"),
- "15m_signal_3m_pullback_0005": run_3m_entry_execution(eth15, btc15, eth3, "pullback_0005"),
- "15m_signal_3m_pullback_001": run_3m_entry_execution(eth15, btc15, eth3, "pullback_001"),
- "15m_signal_3m_pullback_002": run_3m_entry_execution(eth15, btc15, eth3, "pullback_002"),
- "15m_signal_3m_pullback_003": run_3m_entry_execution(eth15, btc15, eth3, "pullback_003"),
- "15m_signal_3m_twap_2x3m": run_3m_entry_execution(eth15, btc15, eth3, "twap_2x3m"),
- "15m_signal_3m_twap_3x3m": run_3m_entry_execution(eth15, btc15, eth3, "twap_3x3m"),
- "15m_signal_3m_twap_4x3m": run_3m_entry_execution(eth15, btc15, eth3, "twap_4x3m"),
- "15m_signal_3m_twap_5x3m": run_3m_entry_execution(eth15, btc15, eth3, "twap_5x3m"),
- }
- write_outputs(results, eth15[-1].ts, args.output_dir)
- print((args.output_dir / "bb-squeeze-15m-signal-3m-execution-report.md").as_posix())
- return 0
- if __name__ == "__main__":
- raise SystemExit(main())
|