run_bb_squeeze_executor.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. from __future__ import annotations
  2. import argparse
  3. import json
  4. import os
  5. import sys
  6. import time
  7. from dataclasses import asdict, dataclass
  8. from datetime import UTC, datetime
  9. from pathlib import Path
  10. import pandas as pd
  11. sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
  12. from okx_codex_trader.config import load_config
  13. from okx_codex_trader.live_execution import (
  14. TargetPosition,
  15. current_position_from_okx,
  16. plan_position_delta,
  17. render_market_order_bodies,
  18. )
  19. from okx_codex_trader.models import Candle
  20. from okx_codex_trader.okx_client import OkxClient
  21. ROOT = Path(__file__).resolve().parents[1]
  22. STATE_DIR = ROOT / "var" / "bb-squeeze-executor"
  23. STATE_FILE = "runtime-state.json"
  24. EVENTS_FILE = "events.jsonl"
  25. SYMBOL = "ETH-USDT-SWAP"
  26. BAR = "15m"
  27. LEVERAGE = 3
  28. BAND_LENGTH = 48
  29. BANDWIDTH_LOOKBACK = 960
  30. BANDWIDTH_QUANTILE = 0.25
  31. STOP_LOSS_PCT = 0.01
  32. ETH_VOL_CAP = 0.006
  33. COOLDOWN_BARS = 24
  34. LIVE_CANDLE_LIMIT = 1_200
  35. RECENT_CANDLE_LIMIT = 20
  36. @dataclass(frozen=True)
  37. class StrategyState:
  38. last_candle_ts: int | None
  39. active_side: str | None
  40. entry_price: float | None
  41. entry_candle_ts: int | None
  42. cooldown_until_ts: int | None
  43. EMPTY_STATE = StrategyState(None, None, None, None, None)
  44. def now_iso() -> str:
  45. return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
  46. def load_state(path: Path) -> StrategyState:
  47. if not path.exists():
  48. return EMPTY_STATE
  49. payload = json.loads(path.read_text(encoding="utf-8"))
  50. return StrategyState(
  51. last_candle_ts=payload["last_candle_ts"],
  52. active_side=payload["active_side"],
  53. entry_price=payload["entry_price"],
  54. entry_candle_ts=payload["entry_candle_ts"],
  55. cooldown_until_ts=payload["cooldown_until_ts"],
  56. )
  57. def save_state(path: Path, state: StrategyState) -> None:
  58. path.parent.mkdir(parents=True, exist_ok=True)
  59. path.write_text(json.dumps(asdict(state), indent=2, sort_keys=True) + "\n", encoding="utf-8")
  60. def append_event(state_dir: Path, payload: dict[str, object]) -> None:
  61. state_dir.mkdir(parents=True, exist_ok=True)
  62. with (state_dir / EVENTS_FILE).open("a", encoding="utf-8") as handle:
  63. handle.write(json.dumps(payload, sort_keys=True) + "\n")
  64. def frame_from_candles(candles: list[Candle]) -> pd.DataFrame:
  65. frame = pd.DataFrame([asdict(candle) for candle in candles])
  66. frame["time"] = pd.to_datetime(frame["ts"], unit="ms", utc=True)
  67. return frame.sort_values("ts").drop_duplicates("ts", keep="last").reset_index(drop=True)
  68. def load_live_frame(client: OkxClient) -> pd.DataFrame:
  69. return frame_from_candles(client.get_candles(SYMBOL, BAR, LIVE_CANDLE_LIMIT))
  70. def refresh_live_frame(client: OkxClient, frame: pd.DataFrame | None) -> pd.DataFrame:
  71. if frame is None or len(frame) < BANDWIDTH_LOOKBACK + 2:
  72. return load_live_frame(client)
  73. recent = frame_from_candles(client.get_recent_candles(SYMBOL, BAR, RECENT_CANDLE_LIMIT))
  74. merged = pd.concat([frame, recent], ignore_index=True)
  75. merged = merged.sort_values("ts").drop_duplicates("ts", keep="last").tail(LIVE_CANDLE_LIMIT)
  76. return merged.reset_index(drop=True)
  77. def signal_from_frame(frame: pd.DataFrame, state: StrategyState) -> tuple[StrategyState, dict[str, object]]:
  78. if len(frame) < BANDWIDTH_LOOKBACK + 2:
  79. raise ValueError("not enough candles")
  80. close = frame["close"].astype(float)
  81. middle = close.rolling(BAND_LENGTH).mean()
  82. stdev = close.rolling(BAND_LENGTH).std(ddof=0)
  83. upper = middle + (2.0 * stdev)
  84. lower = middle - (2.0 * stdev)
  85. bandwidth = (upper - lower) / middle
  86. threshold = bandwidth.rolling(BANDWIDTH_LOOKBACK).quantile(BANDWIDTH_QUANTILE)
  87. eth_vol = close.pct_change().rolling(96).std(ddof=0)
  88. decision_index = len(frame) - 1
  89. row = frame.iloc[decision_index]
  90. candle_ts = int(row["ts"])
  91. candle_time = pd.Timestamp(row["time"]).isoformat().replace("+00:00", "Z")
  92. indicators = {
  93. "eth_close": float(row["close"]),
  94. "middle": float(middle.iloc[decision_index]),
  95. "upper": float(upper.iloc[decision_index]),
  96. "lower": float(lower.iloc[decision_index]),
  97. "bandwidth": float(bandwidth.iloc[decision_index]),
  98. "bandwidth_threshold": float(threshold.iloc[decision_index]),
  99. "eth_vol_96": float(eth_vol.iloc[decision_index]),
  100. }
  101. if state.last_candle_ts is not None and candle_ts <= state.last_candle_ts:
  102. return state, {
  103. "decision_candle_ts": candle_ts,
  104. "decision_candle_time": candle_time,
  105. "signal": "state_replay",
  106. "target_side": state.active_side or "flat",
  107. "indicators": indicators,
  108. }
  109. next_state = StrategyState(candle_ts, state.active_side, state.entry_price, state.entry_candle_ts, state.cooldown_until_ts)
  110. signal = "hold"
  111. target_side = state.active_side or "flat"
  112. if state.active_side is not None:
  113. entry_price = float(state.entry_price)
  114. stop = entry_price * (1.0 - STOP_LOSS_PCT if state.active_side == "long" else 1.0 + STOP_LOSS_PCT)
  115. stop_hit = (state.active_side == "long" and float(row["low"]) <= stop) or (state.active_side == "short" and float(row["high"]) >= stop)
  116. middle_exit = (state.active_side == "long" and float(row["close"]) < indicators["middle"]) or (
  117. state.active_side == "short" and float(row["close"]) > indicators["middle"]
  118. )
  119. if stop_hit or middle_exit:
  120. signal = "exit_stop" if stop_hit else "exit_middle"
  121. target_side = "flat"
  122. next_state = StrategyState(
  123. candle_ts,
  124. None,
  125. None,
  126. None,
  127. candle_ts + (COOLDOWN_BARS * 900_000),
  128. )
  129. else:
  130. cooldown_ok = state.cooldown_until_ts is None or candle_ts >= state.cooldown_until_ts
  131. compressed = indicators["bandwidth"] <= indicators["bandwidth_threshold"]
  132. vol_ok = indicators["eth_vol_96"] <= ETH_VOL_CAP
  133. if cooldown_ok and compressed and vol_ok and float(row["close"]) > indicators["upper"]:
  134. signal = "entry_long"
  135. target_side = "long"
  136. next_state = StrategyState(candle_ts, "long", float(row["close"]), candle_ts, state.cooldown_until_ts)
  137. elif cooldown_ok and compressed and vol_ok and float(row["close"]) < indicators["lower"]:
  138. signal = "entry_short"
  139. target_side = "short"
  140. next_state = StrategyState(candle_ts, "short", float(row["close"]), candle_ts, state.cooldown_until_ts)
  141. return next_state, {
  142. "decision_candle_ts": candle_ts,
  143. "decision_candle_time": candle_time,
  144. "signal": signal,
  145. "target_side": target_side,
  146. "indicators": indicators,
  147. "params": {
  148. "band_length": BAND_LENGTH,
  149. "bandwidth_lookback": BANDWIDTH_LOOKBACK,
  150. "bandwidth_quantile": BANDWIDTH_QUANTILE,
  151. "stop_loss_pct": STOP_LOSS_PCT,
  152. "eth_vol_cap": ETH_VOL_CAP,
  153. "cooldown_bars": COOLDOWN_BARS,
  154. "side_mode": "both",
  155. },
  156. }
  157. def account_current_position(client: OkxClient, margin_per_unit_usdt: float) -> tuple[TargetPosition, dict[str, object]]:
  158. positions = client.get_positions(SYMBOL)
  159. metadata = client.get_instrument_meta(SYMBOL)
  160. mark_price = client.get_last_price(SYMBOL)
  161. current = current_position_from_okx(
  162. positions=positions,
  163. mark_price=mark_price,
  164. metadata=metadata,
  165. leverage=LEVERAGE,
  166. margin_per_unit_usdt=margin_per_unit_usdt,
  167. )
  168. return current, {
  169. "positions": [asdict(position) for position in positions],
  170. "instrument_meta": asdict(metadata),
  171. "mark_price": mark_price,
  172. }
  173. def target_position(signal: dict[str, object], current: TargetPosition) -> TargetPosition:
  174. side = str(signal["target_side"])
  175. if side == "flat":
  176. return TargetPosition(side="flat", unit=0.0, known=True, reason=str(signal["signal"]))
  177. if current.known and current.side == side and current.unit > 0.0:
  178. return TargetPosition(side=side, unit=current.unit, known=True, reason="keep existing same-side position")
  179. return TargetPosition(side=side, unit=1.0, known=True, reason=str(signal["signal"]))
  180. def risk_arg(value: float | None, env_name: str) -> float:
  181. if value is not None:
  182. return value
  183. raw = os.environ.get(env_name)
  184. if raw is None or raw == "":
  185. raise ValueError(f"{env_name} is required")
  186. return float(raw)
  187. def run_once(
  188. *,
  189. state_dir: Path,
  190. margin_per_unit_usdt: float,
  191. max_new_margin_usdt: float,
  192. max_total_margin_usdt: float,
  193. submit_live: bool,
  194. frame: pd.DataFrame | None = None,
  195. ) -> dict[str, object]:
  196. state_dir.mkdir(parents=True, exist_ok=True)
  197. state_path = state_dir / STATE_FILE
  198. previous_state = load_state(state_path)
  199. client = OkxClient(load_config())
  200. if frame is None:
  201. frame = load_live_frame(client)
  202. next_state, signal = signal_from_frame(frame, previous_state)
  203. current, account = account_current_position(client, margin_per_unit_usdt)
  204. target = target_position(signal, current)
  205. plan = plan_position_delta(current, target)
  206. orders = ()
  207. if current.known and target.known:
  208. from okx_codex_trader.models import InstrumentMeta
  209. meta = account["instrument_meta"]
  210. orders = render_market_order_bodies(
  211. plan=plan,
  212. symbol=SYMBOL,
  213. mark_price=float(account["mark_price"]),
  214. metadata=InstrumentMeta(ct_val=float(meta["ct_val"]), lot_sz=float(meta["lot_sz"]), min_sz=float(meta["min_sz"])),
  215. leverage=LEVERAGE,
  216. margin_per_unit_usdt=margin_per_unit_usdt,
  217. max_new_margin_usdt=max_new_margin_usdt,
  218. max_total_margin_usdt=max_total_margin_usdt,
  219. client_order_id_prefix=f"bbsq-{signal['decision_candle_ts']}",
  220. stop_loss_pct=STOP_LOSS_PCT,
  221. )
  222. snapshot = {
  223. "created_at": now_iso(),
  224. "mode": "bb_squeeze_live_executor" if submit_live else "bb_squeeze_dry_run_executor",
  225. "orders_submitted": 0,
  226. "strategy": "bb-squeeze-l48-bw960-q0.25-sl0.01-tpnone-both-none-vc0.006-ddnone-cd24",
  227. "previous_state": asdict(previous_state),
  228. "next_state": asdict(next_state),
  229. "signal": signal,
  230. "current_position": asdict(current),
  231. "target_position": asdict(target),
  232. "execution_plan": {
  233. "current": asdict(plan.current),
  234. "target": asdict(plan.target),
  235. "actions": [asdict(action) for action in plan.actions],
  236. },
  237. "rendered_orders": [asdict(order) for order in orders],
  238. "account": account,
  239. "risk_limits": {
  240. "submit_enabled": submit_live,
  241. "margin_per_unit_usdt": margin_per_unit_usdt,
  242. "max_new_margin_usdt": max_new_margin_usdt,
  243. "max_total_margin_usdt": max_total_margin_usdt,
  244. },
  245. }
  246. if submit_live:
  247. client.ensure_hedge_mode()
  248. submitted = []
  249. try:
  250. for rendered in orders:
  251. body = rendered.body
  252. client.set_leverage(symbol=SYMBOL, leverage=LEVERAGE, pos_side=body["posSide"])
  253. submitted.append(asdict(client.submit_market_order_body(body)))
  254. except ValueError as exc:
  255. snapshot["orders_submitted"] = len(submitted)
  256. snapshot["submitted_orders"] = submitted
  257. snapshot["execution_error"] = str(exc)
  258. append_event(state_dir, snapshot)
  259. raise
  260. snapshot["orders_submitted"] = len(submitted)
  261. snapshot["submitted_orders"] = submitted
  262. save_state(state_path, next_state)
  263. append_event(state_dir, snapshot)
  264. return snapshot
  265. def main() -> int:
  266. parser = argparse.ArgumentParser(description="Run BB squeeze live executor.")
  267. parser.add_argument("--state-dir", type=Path, default=STATE_DIR)
  268. parser.add_argument("--margin-per-unit-usdt", type=float)
  269. parser.add_argument("--max-new-margin-usdt", type=float)
  270. parser.add_argument("--max-total-margin-usdt", type=float)
  271. parser.add_argument("--submit-live", action="store_true")
  272. parser.add_argument("--confirm-live", action="store_true")
  273. parser.add_argument("--loop", action="store_true")
  274. parser.add_argument("--poll-seconds", type=float, default=10.0)
  275. args = parser.parse_args()
  276. if args.submit_live != args.confirm_live:
  277. raise ValueError("--submit-live and --confirm-live must be used together")
  278. margin_per_unit_usdt = risk_arg(args.margin_per_unit_usdt, "ETH_NEXTGEN_MARGIN_PER_UNIT_USDT")
  279. max_new_margin_usdt = risk_arg(args.max_new_margin_usdt, "ETH_NEXTGEN_MAX_NEW_MARGIN_USDT")
  280. max_total_margin_usdt = risk_arg(args.max_total_margin_usdt, "ETH_NEXTGEN_MAX_TOTAL_MARGIN_USDT")
  281. if not args.loop:
  282. snapshot = run_once(
  283. state_dir=args.state_dir,
  284. margin_per_unit_usdt=margin_per_unit_usdt,
  285. max_new_margin_usdt=max_new_margin_usdt,
  286. max_total_margin_usdt=max_total_margin_usdt,
  287. submit_live=args.submit_live,
  288. )
  289. print(json.dumps(snapshot, indent=2, sort_keys=True), flush=True)
  290. return 0
  291. frame: pd.DataFrame | None = None
  292. while True:
  293. frame = refresh_live_frame(OkxClient(), frame)
  294. state = load_state(args.state_dir / STATE_FILE)
  295. _, loop_signal = signal_from_frame(frame, state)
  296. if loop_signal["signal"] != "state_replay":
  297. snapshot = run_once(
  298. state_dir=args.state_dir,
  299. margin_per_unit_usdt=margin_per_unit_usdt,
  300. max_new_margin_usdt=max_new_margin_usdt,
  301. max_total_margin_usdt=max_total_margin_usdt,
  302. submit_live=args.submit_live,
  303. frame=frame,
  304. )
  305. print(json.dumps(snapshot, indent=2, sort_keys=True), flush=True)
  306. time.sleep(args.poll_seconds)
  307. return 0
  308. if __name__ == "__main__":
  309. raise SystemExit(main())