| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365 |
- from __future__ import annotations
- import argparse
- import sys
- from dataclasses import dataclass
- from pathlib import Path
- import pandas as pd
- sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
- from okx_codex_trader.models import Candle
- from okx_codex_trader.sampled_report import SegmentResult, mark_to_market, trade_equity
# Instrument traded and the default sampling window of the exploration run.
SYMBOL = "ETH-USDT-SWAP"
BAR = "15m"
YEARS = 10.0

# Account assumptions shared by every simulated variant.
LEVERAGE = 3
INITIAL_EQUITY = 10_000.0

# Candle CSVs are read from DATA_DIR/<symbol>/<bar>.csv; reports are written to OUTPUT_DIR.
DATA_DIR = Path("data/okx-candles")
OUTPUT_DIR = Path("reports/eth-exploration")

# Cost scenario whose ranking is printed at the end of the run.
PRIMARY_COST = "maker_taker"

# Cost scenarios as (label, fraction subtracted from each trade's return).
COSTS = (
    ("maker_maker", 0.0012),
    ("maker_taker", 0.0021),
    ("taker_taker", 0.0030),
)

# Trailing look-back windows as (label, offset counted back from the last candle).
HORIZONS = (
    ("3y", pd.DateOffset(years=3)),
    ("1y", pd.DateOffset(years=1)),
    ("6m", pd.DateOffset(months=6)),
    ("3m", pd.DateOffset(months=3)),
)
@dataclass(frozen=True)
class Variant:
    """Middle-band exit settings for one run of the squeeze strategy.

    ``middle_exit_buffer_pct`` is the fractional distance beyond the middle
    band that a close must reach to count towards an exit, and
    ``middle_exit_confirm_bars`` is how many consecutive qualifying bars are
    needed before the exit is queued.
    """

    middle_exit_buffer_pct: float
    middle_exit_confirm_bars: int

    @property
    def name(self) -> str:
        """Return the report label: the fixed base configuration plus both exit settings."""
        base = "live-bb-squeeze-l48-bw960-q0.25-sl0.01-tpnone-both-none-vc0.006-ddnone-cd24"
        suffix = f"-mxbuf{self.middle_exit_buffer_pct:g}-mxc{self.middle_exit_confirm_bars}"
        return base + suffix
def _format_ts(ts: int) -> str:
    """Render an epoch-millisecond timestamp as a UTC ``YYYY-MM-DD HH:MM`` string."""
    moment = pd.to_datetime(ts, unit="ms", utc=True)
    return moment.strftime("%Y-%m-%d %H:%M")
def _load_candles(symbol: str, bar: str) -> list[Candle]:
    """Read ``DATA_DIR/<symbol>/<bar>.csv`` and turn every row into a ``Candle``.

    The CSV is expected to provide ``ts``, ``open``, ``high``, ``low``,
    ``close`` and ``volume`` columns; rows are returned in file order.
    """
    csv_path = DATA_DIR / symbol / f"{bar}.csv"
    table = pd.read_csv(csv_path)
    loaded: list[Candle] = []
    for record in table.itertuples(index=False):
        loaded.append(
            Candle(
                symbol=symbol,
                ts=int(record.ts),
                open=float(record.open),
                high=float(record.high),
                low=float(record.low),
                close=float(record.close),
                volume=float(record.volume),
            )
        )
    return loaded
def _close_position(
    *,
    trades: list[dict[str, object]],
    exits: list[dict[str, object]],
    position: dict[str, object],
    candle: Candle,
    exit_price: float,
) -> tuple[float, bool]:
    """Settle ``position`` at ``exit_price`` and record the outcome.

    Appends one trade record to ``trades`` and one exit marker to ``exits``
    (both lists are mutated in place).

    Returns:
        ``(equity_after_exit, won)`` where ``won`` is True when the trade's
        profit is strictly positive.
    """
    side = position["side"]
    margin = float(position["margin_used"])
    settled = trade_equity(
        side=str(side),
        margin_used=margin,
        entry_price=float(position["entry_price"]),
        exit_price=exit_price,
        leverage=LEVERAGE,
    )
    profit = settled - margin

    record: dict[str, object] = {
        "side": "Long" if side == "long" else "Short",
        "entry_time": _format_ts(int(position["entry_time"])),
        "exit_time": _format_ts(candle.ts),
        "entry_price": round(float(position["entry_price"]), 4),
        "exit_price": round(exit_price, 4),
        "pnl": round(profit, 4),
        # Return is expressed relative to the margin committed to the trade.
        "return_pct": round(profit / margin * 100.0, 4),
        "cost_weight": 1.0,
    }
    trades.append(record)
    exits.append({"ts": candle.ts, "price": exit_price, "side": side})
    return settled, profit > 0.0
def run_variant(candles: list[Candle], variant: Variant) -> SegmentResult:
    """Simulate the Bollinger-squeeze breakout strategy with one middle-band exit variant.

    Indicators: 48-bar mean +/- 2 population standard deviations of the close,
    band width relative to the middle band, the rolling 960-bar 25% quantile
    of that width, and the 96-bar standard deviation of close-to-close returns.

    Signals are evaluated on a bar's close and filled at the next bar's open.
    Every entry commits the whole current equity as margin with a stop 1%
    away from the entry price; after any exit, new entries are blocked until
    24 bars later. The first 960 bars are used as warm-up only.

    Returns:
        A ``SegmentResult`` covering the bars after the warm-up, including the
        still-open position (if any) and the mark-to-market equity curve.
    """
    closes = pd.Series([bar.close for bar in candles], dtype=float)
    mid_band = closes.rolling(48).mean()
    deviation = closes.rolling(48).std(ddof=0)
    upper_band = mid_band + 2.0 * deviation
    lower_band = mid_band - 2.0 * deviation
    width = (upper_band - lower_band) / mid_band

    middle = mid_band.tolist()
    upper = upper_band.tolist()
    lower = lower_band.tolist()
    bandwidth = width.tolist()
    threshold = width.rolling(960).quantile(0.25).tolist()
    eth_vol = closes.pct_change().rolling(96).std(ddof=0).tolist()

    warmup_bars = 960
    final_index = len(candles) - 1

    equity = INITIAL_EQUITY
    ending_equity = equity
    peak_equity = equity
    max_drawdown = 0.0
    wins = 0

    trades: list[dict[str, object]] = []
    entries: list[dict[str, object]] = []
    exits: list[dict[str, object]] = []
    equity_curve: list[dict[str, float | int]] = []

    position: dict[str, object] | None = None
    pending_entry_side: str | None = None
    pending_exit = False
    middle_exit_streak = 0
    cooldown_until = -1

    for index in range(warmup_bars, len(candles)):
        candle = candles[index]

        # A middle-band exit confirmed on the previous bar fills at this open.
        if pending_exit and position is not None:
            equity, won = _close_position(
                trades=trades,
                exits=exits,
                position=position,
                candle=candle,
                exit_price=candle.open,
            )
            wins += int(won)
            position = None
            pending_exit = False
            middle_exit_streak = 0
            cooldown_until = index + 24

        # A breakout signalled on the previous bar fills at this open.
        if pending_entry_side is not None and position is None and equity > 0.0:
            stop_factor = 0.99 if pending_entry_side == "long" else 1.01
            position = {
                "side": pending_entry_side,
                "entry_time": candle.ts,
                "entry_price": candle.open,
                "margin_used": equity,
                "stop_price": candle.open * stop_factor,
            }
            entries.append({"ts": candle.ts, "price": candle.open, "side": pending_entry_side})
        pending_entry_side = None

        current_equity = equity
        if position is not None:
            side = str(position["side"])
            stop_price = float(position["stop_price"])
            stop_hit = False
            if side == "long":
                stop_hit = candle.low <= stop_price
            elif side == "short":
                stop_hit = candle.high >= stop_price
            if stop_hit:
                # The stop is assumed to fill exactly at the stop price.
                equity, won = _close_position(
                    trades=trades,
                    exits=exits,
                    position=position,
                    candle=candle,
                    exit_price=stop_price,
                )
                wins += int(won)
                current_equity = equity
                position = None
                middle_exit_streak = 0
                cooldown_until = index + 24

        # An open position is valued at the bar's close for the equity curve.
        if position is not None:
            current_equity = mark_to_market(
                side=str(position["side"]),
                margin_used=float(position["margin_used"]),
                entry_price=float(position["entry_price"]),
                mark_price=candle.close,
                leverage=LEVERAGE,
            )

        peak_equity = max(peak_equity, current_equity)
        max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity)
        equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close})
        ending_equity = current_equity

        # No signals on the final bar (nothing to fill against) or once equity is gone.
        if index == final_index or equity <= 0.0:
            continue

        indicators = (
            middle[index],
            upper[index],
            lower[index],
            bandwidth[index],
            threshold[index],
            eth_vol[index],
        )
        # NaN is the only value that differs from itself: skip bars whose
        # rolling windows are not filled yet.
        if any(reading != reading for reading in indicators):
            continue

        if position is not None:
            mid_price = float(middle[index])
            beyond_middle = False
            if position["side"] == "long":
                beyond_middle = candle.close < mid_price * (1.0 - variant.middle_exit_buffer_pct)
            elif position["side"] == "short":
                beyond_middle = candle.close > mid_price * (1.0 + variant.middle_exit_buffer_pct)
            if beyond_middle:
                middle_exit_streak += 1
            else:
                middle_exit_streak = 0
            if middle_exit_streak >= variant.middle_exit_confirm_bars:
                pending_exit = True
            # Entry signals are never evaluated while a position is open.
            continue

        if index < cooldown_until:
            continue
        # Volatility filter: no new entries while recent return volatility is high.
        if float(eth_vol[index]) > 0.006:
            continue
        # Squeeze: the band is narrow relative to its own recent history.
        if bandwidth[index] <= threshold[index]:
            if candle.close > float(upper[index]):
                pending_entry_side = "long"
            elif candle.close < float(lower[index]):
                pending_entry_side = "short"

    trade_count = len(trades)
    win_rate = wins / trade_count if trade_count else 0.0
    return SegmentResult(
        trade_count=trade_count,
        total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY,
        win_rate=win_rate,
        max_drawdown=max_drawdown,
        trades=trades,
        open_position=position,
        candles=candles[warmup_bars:],
        equity_curve=equity_curve,
        entries=entries,
        exits=exits,
    )
def cost_equity_frame(result: SegmentResult, cost: float) -> pd.DataFrame:
    """Build a per-trade equity curve net of trading costs.

    The curve starts at ``INITIAL_EQUITY`` on the first equity-curve timestamp
    and compounds each closed trade's ``return_pct`` minus ``cost`` scaled by
    the trade's ``cost_weight`` (1.0 when absent).

    Returns:
        A frame with UTC ``ts`` and ``equity`` columns: one starting row plus
        one row per trade, stamped with the trade's exit time.
    """
    start_ts = pd.to_datetime(result.equity_curve[0]["ts"], unit="ms", utc=True)
    running = INITIAL_EQUITY
    points = [{"ts": start_ts, "equity": running}]
    for trade in result.trades:
        gross = float(trade["return_pct"]) / 100.0
        charge = cost * float(trade.get("cost_weight", 1.0))
        running *= 1.0 + gross - charge
        exit_ts = pd.to_datetime(str(trade["exit_time"]), utc=True)
        points.append({"ts": exit_ts, "equity": running})
    return pd.DataFrame(points)
def max_drawdown(values: list[float]) -> float:
    """Return the largest peak-to-trough decline in ``values`` as a fraction of the running peak.

    An empty sequence has no drawdown and yields 0.0 instead of raising.
    Points where the running peak is zero contribute no drawdown, which
    avoids a division by zero.
    """
    if len(values) == 0:
        return 0.0
    peak = values[0]
    worst = 0.0
    for value in values:
        peak = max(peak, value)
        if peak:
            worst = max(worst, (peak - value) / peak)
    return worst
def equity_metrics(frame: pd.DataFrame, first_ts: int, last_ts: int) -> dict[str, float]:
    """Summarise an equity curve over the span ``first_ts``..``last_ts`` (epoch milliseconds).

    Returns:
        A dict with the total return, the return annualised over 365-day
        years (0.0 when the span is not positive or equity was wiped out),
        the maximum drawdown and the Calmar ratio (0.0 when there is no
        drawdown).
    """
    span_years = (last_ts - first_ts) / 86_400_000 / 365
    equity = frame["equity"]
    total_return = float(equity.iloc[-1] / equity.iloc[0] - 1.0)

    if total_return > -1.0 and span_years > 0.0:
        annualized = (1.0 + total_return) ** (1.0 / span_years) - 1.0
    else:
        annualized = 0.0

    drawdown = max_drawdown([float(point) for point in equity])
    calmar = annualized / drawdown if drawdown else 0.0
    return {
        "net_total_return": total_return,
        "net_annualized_return": annualized,
        "net_max_drawdown": drawdown,
        "net_calmar": calmar,
    }
def horizon_rows(frame: pd.DataFrame, last_ts: int) -> list[dict[str, object]]:
    """Compute equity metrics for each trailing window listed in ``HORIZONS``.

    Each window ends at ``last_ts`` (epoch milliseconds). When the curve has
    a point at or before the window start, the window begins exactly at the
    cutoff with the last equity known by then; otherwise the whole curve is
    used and the window starts at its first timestamp.
    """
    end_time = pd.to_datetime(last_ts, unit="ms", utc=True)
    end_label = end_time.strftime("%Y-%m-%d %H:%M")
    report: list[dict[str, object]] = []

    for label, offset in HORIZONS:
        cutoff = end_time - offset
        at_or_before = frame[frame["ts"] <= cutoff]
        if len(at_or_before) == 0:
            window = frame[["ts", "equity"]].copy()
            start_time = pd.Timestamp(window["ts"].iloc[0])
        else:
            start_equity = float(at_or_before["equity"].iloc[-1])
            start_time = cutoff
            later = frame[frame["ts"] > cutoff]
            # Anchor the window with a synthetic row at the cutoff itself.
            anchor = pd.DataFrame([{"ts": cutoff, "equity": start_equity}])
            window = pd.concat([anchor, later[["ts", "equity"]]], ignore_index=True)

        metrics = equity_metrics(window, int(start_time.timestamp() * 1000), last_ts)
        report.append(
            {
                "horizon": label,
                "horizon_start": start_time.strftime("%Y-%m-%d %H:%M"),
                "horizon_end": end_label,
                **metrics,
            }
        )
    return report
- def worst_month(frame: pd.DataFrame) -> tuple[str, float]:
- monthly = frame.set_index("ts")["equity"].resample("ME").last().ffill().pct_change().dropna()
- if not len(monthly):
- return "", 0.0
- idx = monthly.idxmin()
- return idx.strftime("%Y-%m"), float(monthly.loc[idx])
def build_variants() -> list[Variant]:
    """Return the full grid of exit variants: every buffer paired with every confirmation count.

    The list is ordered by buffer first, then by confirmation bars.
    """
    buffers = (0.0, 0.0005, 0.001, 0.0015, 0.002, 0.003)
    confirmations = (1, 2, 3)
    grid: list[Variant] = []
    for buffer in buffers:
        for confirm in confirmations:
            grid.append(Variant(buffer, confirm))
    return grid
def bar_minutes(bar: str) -> int:
    """Return the length in minutes of a bar label such as ``15m``, ``4H`` or ``1Dutc``.

    Supported units are minutes (``m``), hours (``H``), days (``D``) and
    weeks (``W``), optionally followed by a ``utc`` suffix. Month bars
    (``M``) have no fixed length and are rejected.

    Raises:
        ValueError: if the label cannot be interpreted.
    """
    label = bar[:-3] if bar.lower().endswith("utc") else bar
    count_text, unit = label[:-1], label[-1:]
    # "m" (minute) and "M" (month) differ only by case, so "M" is deliberately absent.
    unit_minutes = {"m": 1, "H": 60, "h": 60, "D": 1440, "d": 1440, "W": 10080, "w": 10080}.get(unit)
    if unit_minutes is None or not (count_text.isascii() and count_text.isdigit()) or int(count_text) < 1:
        raise ValueError(f"unsupported bar size: {bar!r}")
    return int(count_text) * unit_minutes


def main() -> int:
    """Run every exit variant under every cost scenario and write the CSV reports.

    Writes a summary CSV and a per-horizon CSV into ``--output-dir`` and
    prints the ten best rows of the primary cost scenario.

    Returns:
        0 on success. Invalid arguments or insufficient data terminate the
        process through ``argparse`` with a usage error.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--bar", default=BAR)
    parser.add_argument("--years", type=float, default=YEARS)
    parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
    args = parser.parse_args()

    # The number of bars per year depends on the bar size actually requested.
    try:
        minutes_per_bar = bar_minutes(args.bar)
    except ValueError as error:
        parser.error(str(error))
    if not 0.0 < args.years < float("inf"):
        parser.error("--years must be a positive, finite number")
    requested_bars = int(args.years * 365 * 24 * 60 / minutes_per_bar)
    if requested_bars < 1:
        # A zero count would make the slice below keep the entire history.
        parser.error("--years is too small to cover a single bar")

    candles = _load_candles(SYMBOL, args.bar)
    if not candles:
        parser.error(f"no candles found for {SYMBOL} {args.bar}")
    candles = candles[-requested_bars:]
    first_ts = candles[0].ts
    last_ts = candles[-1].ts

    summary_rows: list[dict[str, object]] = []
    horizon_rows_out: list[dict[str, object]] = []
    for variant in build_variants():
        result = run_variant(candles, variant)
        if not result.equity_curve:
            # Without any simulated bar there is no starting timestamp to report from.
            parser.error(f"only {len(candles)} candles available; not enough to get past the strategy warm-up")
        for cost_name, cost in COSTS:
            frame = cost_equity_frame(result, cost)
            metrics = equity_metrics(frame, first_ts, last_ts)
            month, month_return = worst_month(frame)
            summary_rows.append(
                {
                    "family": "live_bb_squeeze_exit_variant",
                    "cost": cost_name,
                    "symbol": SYMBOL,
                    "bar": args.bar,
                    "name": variant.name,
                    "middle_exit_buffer_pct": variant.middle_exit_buffer_pct,
                    "middle_exit_confirm_bars": variant.middle_exit_confirm_bars,
                    "first_candle": _format_ts(first_ts),
                    "last_candle": _format_ts(last_ts),
                    "years": (last_ts - first_ts) / 86_400_000 / 365,
                    "trades": result.trade_count,
                    "gross_total_return": result.total_return,
                    "gross_max_drawdown_mark_to_market": result.max_drawdown,
                    "worst_month": month,
                    "worst_month_return": month_return,
                    **metrics,
                }
            )
            for horizon_row in horizon_rows(frame, last_ts):
                horizon_rows_out.append(
                    {
                        "family": "live_bb_squeeze_exit_variant",
                        "cost": cost_name,
                        "symbol": SYMBOL,
                        "bar": args.bar,
                        "name": variant.name,
                        "trades": result.trade_count,
                        **horizon_row,
                    }
                )

    # Best variants first within each cost scenario.
    summary = pd.DataFrame(summary_rows).sort_values(
        ["cost", "net_calmar", "net_annualized_return", "worst_month_return"],
        ascending=[True, False, False, False],
    )
    horizon = pd.DataFrame(horizon_rows_out)
    # An ordered categorical keeps horizons in HORIZONS order instead of alphabetical order.
    horizon["horizon"] = pd.Categorical(
        horizon["horizon"],
        categories=[label for label, _ in HORIZONS],
        ordered=True,
    )
    horizon = horizon.sort_values(
        ["cost", "horizon", "net_calmar", "net_annualized_return"],
        ascending=[True, True, False, False],
    )

    args.output_dir.mkdir(parents=True, exist_ok=True)
    summary_path = args.output_dir / "live-bb-squeeze-exit-variants-summary.csv"
    horizon_path = args.output_dir / "live-bb-squeeze-exit-variants-horizon.csv"
    summary.to_csv(summary_path, index=False)
    horizon.to_csv(horizon_path, index=False)

    primary = summary[summary["cost"] == PRIMARY_COST]
    print(primary.head(10).to_string(index=False))
    return 0
# Script entry point: the return value of main() becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|