run_bb_squeeze_executor.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. from __future__ import annotations
  2. import argparse
  3. import json
  4. import os
  5. import sys
  6. from dataclasses import asdict, dataclass
  7. from datetime import UTC, datetime
  8. from pathlib import Path
  9. import pandas as pd
  10. sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
  11. from okx_codex_trader.config import load_config
  12. from okx_codex_trader.live_execution import (
  13. TargetPosition,
  14. current_position_from_okx,
  15. plan_position_delta,
  16. render_market_order_bodies,
  17. )
  18. from okx_codex_trader.okx_client import OkxClient
# Repository root: the parent of the directory that contains this script.
ROOT = Path(__file__).resolve().parents[1]
# Default directory holding the persisted strategy state and the event log.
STATE_DIR = ROOT / "var" / "bb-squeeze-executor"
# File name (inside the state directory) of the JSON-serialised StrategyState.
STATE_FILE = "runtime-state.json"
# File name (inside the state directory) of the append-only log, one JSON object per line.
EVENTS_FILE = "events.jsonl"
# Instrument id passed to every OkxClient call and used in the candle path.
SYMBOL = "ETH-USDT-SWAP"
# Candle interval; selects which CSV is read. The cooldown arithmetic in
# signal_from_frame hard-codes 900_000 ms per bar, which matches 15 minutes.
BAR = "15m"
# Leverage passed to position sizing, order rendering and client.set_leverage.
LEVERAGE = 3
# CSV of candles consumed by load_frame.
CANDLES_PATH = ROOT / "data" / "okx-candles" / SYMBOL / f"{BAR}.csv"
# Rolling window (in candles) for the band mean and standard deviation.
BAND_LENGTH = 48
# Rolling window (in candles) over which the bandwidth quantile is taken.
BANDWIDTH_LOOKBACK = 960
# Quantile of recent bandwidth at or below which the bands count as compressed.
BANDWIDTH_QUANTILE = 0.25
# Stop distance as a fraction of the entry price (applied below a long, above a short).
STOP_LOSS_PCT = 0.01
# Entries are skipped when the 96-candle rolling std of close-to-close returns exceeds this.
ETH_VOL_CAP = 0.006
# Number of bars after an exit during which new entries are blocked.
COOLDOWN_BARS = 24
  33. @dataclass(frozen=True)
  34. class StrategyState:
  35. last_candle_ts: int | None
  36. active_side: str | None
  37. entry_price: float | None
  38. entry_candle_ts: int | None
  39. cooldown_until_ts: int | None
  40. EMPTY_STATE = StrategyState(None, None, None, None, None)
  41. def now_iso() -> str:
  42. return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
  43. def load_state(path: Path) -> StrategyState:
  44. if not path.exists():
  45. return EMPTY_STATE
  46. payload = json.loads(path.read_text(encoding="utf-8"))
  47. return StrategyState(
  48. last_candle_ts=payload["last_candle_ts"],
  49. active_side=payload["active_side"],
  50. entry_price=payload["entry_price"],
  51. entry_candle_ts=payload["entry_candle_ts"],
  52. cooldown_until_ts=payload["cooldown_until_ts"],
  53. )
  54. def save_state(path: Path, state: StrategyState) -> None:
  55. path.parent.mkdir(parents=True, exist_ok=True)
  56. path.write_text(json.dumps(asdict(state), indent=2, sort_keys=True) + "\n", encoding="utf-8")
  57. def append_event(state_dir: Path, payload: dict[str, object]) -> None:
  58. state_dir.mkdir(parents=True, exist_ok=True)
  59. with (state_dir / EVENTS_FILE).open("a", encoding="utf-8") as handle:
  60. handle.write(json.dumps(payload, sort_keys=True) + "\n")
  61. def load_frame() -> pd.DataFrame:
  62. frame = pd.read_csv(CANDLES_PATH)
  63. frame["time"] = pd.to_datetime(frame["ts"], unit="ms", utc=True)
  64. return frame.sort_values("ts").drop_duplicates("ts", keep="last").reset_index(drop=True)
  65. def signal_from_frame(frame: pd.DataFrame, state: StrategyState) -> tuple[StrategyState, dict[str, object]]:
  66. if len(frame) < BANDWIDTH_LOOKBACK + 3:
  67. raise ValueError("not enough candles")
  68. close = frame["close"].astype(float)
  69. middle = close.rolling(BAND_LENGTH).mean()
  70. stdev = close.rolling(BAND_LENGTH).std(ddof=0)
  71. upper = middle + (2.0 * stdev)
  72. lower = middle - (2.0 * stdev)
  73. bandwidth = (upper - lower) / middle
  74. threshold = bandwidth.rolling(BANDWIDTH_LOOKBACK).quantile(BANDWIDTH_QUANTILE)
  75. eth_vol = close.pct_change().rolling(96).std(ddof=0)
  76. decision_index = len(frame) - 2
  77. row = frame.iloc[decision_index]
  78. candle_ts = int(row["ts"])
  79. candle_time = pd.Timestamp(row["time"]).isoformat().replace("+00:00", "Z")
  80. indicators = {
  81. "eth_close": float(row["close"]),
  82. "middle": float(middle.iloc[decision_index]),
  83. "upper": float(upper.iloc[decision_index]),
  84. "lower": float(lower.iloc[decision_index]),
  85. "bandwidth": float(bandwidth.iloc[decision_index]),
  86. "bandwidth_threshold": float(threshold.iloc[decision_index]),
  87. "eth_vol_96": float(eth_vol.iloc[decision_index]),
  88. }
  89. if state.last_candle_ts is not None and candle_ts <= state.last_candle_ts:
  90. return state, {
  91. "decision_candle_ts": candle_ts,
  92. "decision_candle_time": candle_time,
  93. "signal": "state_replay",
  94. "target_side": state.active_side or "flat",
  95. "indicators": indicators,
  96. }
  97. next_state = StrategyState(candle_ts, state.active_side, state.entry_price, state.entry_candle_ts, state.cooldown_until_ts)
  98. signal = "hold"
  99. target_side = state.active_side or "flat"
  100. if state.active_side is not None:
  101. entry_price = float(state.entry_price)
  102. stop = entry_price * (1.0 - STOP_LOSS_PCT if state.active_side == "long" else 1.0 + STOP_LOSS_PCT)
  103. stop_hit = (state.active_side == "long" and float(row["low"]) <= stop) or (state.active_side == "short" and float(row["high"]) >= stop)
  104. middle_exit = (state.active_side == "long" and float(row["close"]) < indicators["middle"]) or (
  105. state.active_side == "short" and float(row["close"]) > indicators["middle"]
  106. )
  107. if stop_hit or middle_exit:
  108. signal = "exit_stop" if stop_hit else "exit_middle"
  109. target_side = "flat"
  110. next_state = StrategyState(
  111. candle_ts,
  112. None,
  113. None,
  114. None,
  115. candle_ts + (COOLDOWN_BARS * 900_000),
  116. )
  117. else:
  118. cooldown_ok = state.cooldown_until_ts is None or candle_ts >= state.cooldown_until_ts
  119. compressed = indicators["bandwidth"] <= indicators["bandwidth_threshold"]
  120. vol_ok = indicators["eth_vol_96"] <= ETH_VOL_CAP
  121. if cooldown_ok and compressed and vol_ok and float(row["close"]) > indicators["upper"]:
  122. signal = "entry_long"
  123. target_side = "long"
  124. next_state = StrategyState(candle_ts, "long", float(row["close"]), candle_ts, state.cooldown_until_ts)
  125. elif cooldown_ok and compressed and vol_ok and float(row["close"]) < indicators["lower"]:
  126. signal = "entry_short"
  127. target_side = "short"
  128. next_state = StrategyState(candle_ts, "short", float(row["close"]), candle_ts, state.cooldown_until_ts)
  129. return next_state, {
  130. "decision_candle_ts": candle_ts,
  131. "decision_candle_time": candle_time,
  132. "signal": signal,
  133. "target_side": target_side,
  134. "indicators": indicators,
  135. "params": {
  136. "band_length": BAND_LENGTH,
  137. "bandwidth_lookback": BANDWIDTH_LOOKBACK,
  138. "bandwidth_quantile": BANDWIDTH_QUANTILE,
  139. "stop_loss_pct": STOP_LOSS_PCT,
  140. "eth_vol_cap": ETH_VOL_CAP,
  141. "cooldown_bars": COOLDOWN_BARS,
  142. "side_mode": "both",
  143. },
  144. }
  145. def account_current_position(client: OkxClient, margin_per_unit_usdt: float) -> tuple[TargetPosition, dict[str, object]]:
  146. positions = client.get_positions(SYMBOL)
  147. metadata = client.get_instrument_meta(SYMBOL)
  148. mark_price = client.get_last_price(SYMBOL)
  149. current = current_position_from_okx(
  150. positions=positions,
  151. mark_price=mark_price,
  152. metadata=metadata,
  153. leverage=LEVERAGE,
  154. margin_per_unit_usdt=margin_per_unit_usdt,
  155. )
  156. return current, {
  157. "positions": [asdict(position) for position in positions],
  158. "instrument_meta": asdict(metadata),
  159. "mark_price": mark_price,
  160. }
  161. def target_position(signal: dict[str, object], current: TargetPosition) -> TargetPosition:
  162. side = str(signal["target_side"])
  163. if side == "flat":
  164. return TargetPosition(side="flat", unit=0.0, known=True, reason=str(signal["signal"]))
  165. if current.known and current.side == side and current.unit > 0.0:
  166. return TargetPosition(side=side, unit=current.unit, known=True, reason="keep existing same-side position")
  167. return TargetPosition(side=side, unit=1.0, known=True, reason=str(signal["signal"]))
  168. def risk_arg(value: float | None, env_name: str) -> float:
  169. if value is not None:
  170. return value
  171. raw = os.environ.get(env_name)
  172. if raw is None or raw == "":
  173. raise ValueError(f"{env_name} is required")
  174. return float(raw)
  175. def run_once(
  176. *,
  177. state_dir: Path,
  178. margin_per_unit_usdt: float,
  179. max_new_margin_usdt: float,
  180. max_total_margin_usdt: float,
  181. submit_live: bool,
  182. ) -> dict[str, object]:
  183. state_dir.mkdir(parents=True, exist_ok=True)
  184. state_path = state_dir / STATE_FILE
  185. previous_state = load_state(state_path)
  186. next_state, signal = signal_from_frame(load_frame(), previous_state)
  187. client = OkxClient(load_config())
  188. current, account = account_current_position(client, margin_per_unit_usdt)
  189. target = target_position(signal, current)
  190. plan = plan_position_delta(current, target)
  191. orders = ()
  192. if current.known and target.known:
  193. from okx_codex_trader.models import InstrumentMeta
  194. meta = account["instrument_meta"]
  195. orders = render_market_order_bodies(
  196. plan=plan,
  197. symbol=SYMBOL,
  198. mark_price=float(account["mark_price"]),
  199. metadata=InstrumentMeta(ct_val=float(meta["ct_val"]), lot_sz=float(meta["lot_sz"]), min_sz=float(meta["min_sz"])),
  200. leverage=LEVERAGE,
  201. margin_per_unit_usdt=margin_per_unit_usdt,
  202. max_new_margin_usdt=max_new_margin_usdt,
  203. max_total_margin_usdt=max_total_margin_usdt,
  204. client_order_id_prefix=f"bbsq-{signal['decision_candle_ts']}",
  205. )
  206. snapshot = {
  207. "created_at": now_iso(),
  208. "mode": "bb_squeeze_live_executor" if submit_live else "bb_squeeze_dry_run_executor",
  209. "orders_submitted": 0,
  210. "strategy": "bb-squeeze-l48-bw960-q0.25-sl0.01-tpnone-both-none-vc0.006-ddnone-cd24",
  211. "previous_state": asdict(previous_state),
  212. "next_state": asdict(next_state),
  213. "signal": signal,
  214. "current_position": asdict(current),
  215. "target_position": asdict(target),
  216. "execution_plan": {
  217. "current": asdict(plan.current),
  218. "target": asdict(plan.target),
  219. "actions": [asdict(action) for action in plan.actions],
  220. },
  221. "rendered_orders": [asdict(order) for order in orders],
  222. "account": account,
  223. "risk_limits": {
  224. "submit_enabled": submit_live,
  225. "margin_per_unit_usdt": margin_per_unit_usdt,
  226. "max_new_margin_usdt": max_new_margin_usdt,
  227. "max_total_margin_usdt": max_total_margin_usdt,
  228. },
  229. }
  230. if submit_live:
  231. client.ensure_hedge_mode()
  232. submitted = []
  233. try:
  234. for rendered in orders:
  235. body = rendered.body
  236. client.set_leverage(symbol=SYMBOL, leverage=LEVERAGE, pos_side=body["posSide"])
  237. submitted.append(asdict(client.submit_market_order_body(body)))
  238. except ValueError as exc:
  239. snapshot["orders_submitted"] = len(submitted)
  240. snapshot["submitted_orders"] = submitted
  241. snapshot["execution_error"] = str(exc)
  242. append_event(state_dir, snapshot)
  243. raise
  244. snapshot["orders_submitted"] = len(submitted)
  245. snapshot["submitted_orders"] = submitted
  246. save_state(state_path, next_state)
  247. append_event(state_dir, snapshot)
  248. return snapshot
  249. def main() -> int:
  250. parser = argparse.ArgumentParser(description="Run BB squeeze live executor.")
  251. parser.add_argument("--state-dir", type=Path, default=STATE_DIR)
  252. parser.add_argument("--margin-per-unit-usdt", type=float)
  253. parser.add_argument("--max-new-margin-usdt", type=float)
  254. parser.add_argument("--max-total-margin-usdt", type=float)
  255. parser.add_argument("--submit-live", action="store_true")
  256. parser.add_argument("--confirm-live", action="store_true")
  257. args = parser.parse_args()
  258. if args.submit_live != args.confirm_live:
  259. raise ValueError("--submit-live and --confirm-live must be used together")
  260. snapshot = run_once(
  261. state_dir=args.state_dir,
  262. margin_per_unit_usdt=risk_arg(args.margin_per_unit_usdt, "ETH_NEXTGEN_MARGIN_PER_UNIT_USDT"),
  263. max_new_margin_usdt=risk_arg(args.max_new_margin_usdt, "ETH_NEXTGEN_MAX_NEW_MARGIN_USDT"),
  264. max_total_margin_usdt=risk_arg(args.max_total_margin_usdt, "ETH_NEXTGEN_MAX_TOTAL_MARGIN_USDT"),
  265. submit_live=args.submit_live,
  266. )
  267. print(json.dumps(snapshot, indent=2, sort_keys=True))
  268. return 0
  269. if __name__ == "__main__":
  270. raise SystemExit(main())