# run_bb_squeeze_executor.py
  1. from __future__ import annotations
  2. import argparse
  3. import json
  4. import os
  5. import sys
  6. import time
  7. from dataclasses import asdict
  8. from datetime import UTC, datetime
  9. from pathlib import Path
  10. import pandas as pd
  11. sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
  12. from okx_codex_trader.config import load_config
  13. from okx_codex_trader.candles import align_candles_by_ts
  14. from okx_codex_trader.bb_squeeze_strategy import (
  15. BANDWIDTH_LOOKBACK,
  16. COOLDOWN_BARS,
  17. EMPTY_STATE,
  18. STOP_LOSS_PCT,
  19. TAKE_PROFIT_PCT,
  20. StrategyState,
  21. signal_from_frame,
  22. strategy_name,
  23. )
  24. from okx_codex_trader.live_execution import (
  25. TargetPosition,
  26. current_position_from_okx,
  27. plan_position_delta,
  28. render_market_order_bodies,
  29. )
  30. from okx_codex_trader.models import Candle
  31. from okx_codex_trader.okx_client import OkxClient
  32. ROOT = Path(__file__).resolve().parents[1]
  33. STATE_DIR = ROOT / "var" / "bb-squeeze-executor"
  34. STATE_FILE = "runtime-state.json"
  35. EVENTS_FILE = "events.jsonl"
  36. SYMBOL = "ETH-USDT-SWAP"
  37. BTC_SYMBOL = "BTC-USDT-SWAP"
  38. BAR = "15m"
  39. LEVERAGE = 3
  40. LIVE_CANDLE_LIMIT = 1_200
  41. RECENT_CANDLE_LIMIT = 20
  42. def now_iso() -> str:
  43. return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
  44. def load_state(path: Path) -> StrategyState:
  45. if not path.exists():
  46. return EMPTY_STATE
  47. payload = json.loads(path.read_text(encoding="utf-8"))
  48. return StrategyState(
  49. last_candle_ts=payload["last_candle_ts"],
  50. active_side=payload["active_side"],
  51. entry_price=payload["entry_price"],
  52. entry_candle_ts=payload["entry_candle_ts"],
  53. cooldown_until_ts=payload["cooldown_until_ts"],
  54. middle_exit_streak=int(payload.get("middle_exit_streak", 0)),
  55. max_favorable_move_pct=payload.get("max_favorable_move_pct"),
  56. )
  57. def save_state(path: Path, state: StrategyState) -> None:
  58. path.parent.mkdir(parents=True, exist_ok=True)
  59. path.write_text(json.dumps(asdict(state), indent=2, sort_keys=True) + "\n", encoding="utf-8")
  60. def append_event(state_dir: Path, payload: dict[str, object]) -> None:
  61. state_dir.mkdir(parents=True, exist_ok=True)
  62. with (state_dir / EVENTS_FILE).open("a", encoding="utf-8") as handle:
  63. handle.write(json.dumps(payload, sort_keys=True) + "\n")
  64. def append_loop_error(state_dir: Path, message: str) -> None:
  65. append_event(
  66. state_dir,
  67. {
  68. "created_at": now_iso(),
  69. "mode": "bb_squeeze_live_executor_error",
  70. "execution_error": message,
  71. },
  72. )
  73. def frame_from_candles(candles: list[Candle]) -> pd.DataFrame:
  74. frame = pd.DataFrame([asdict(candle) for candle in candles])
  75. frame["time"] = pd.to_datetime(frame["ts"], unit="ms", utc=True)
  76. return frame.sort_values("ts").drop_duplicates("ts", keep="last").reset_index(drop=True)
  77. def aligned_frame_from_candles(eth_candles: list[Candle], btc_candles: list[Candle]) -> pd.DataFrame:
  78. eth_candles, btc_candles = align_candles_by_ts(eth_candles, btc_candles)
  79. eth = frame_from_candles(eth_candles)
  80. btc = frame_from_candles(btc_candles)[["ts", "close"]].rename(columns={"close": "btc_close"})
  81. return eth.merge(btc, on="ts", how="inner").sort_values("ts").reset_index(drop=True)
  82. def load_live_frame(client: OkxClient) -> pd.DataFrame:
  83. return aligned_frame_from_candles(
  84. client.get_candles(SYMBOL, BAR, LIVE_CANDLE_LIMIT),
  85. client.get_candles(BTC_SYMBOL, BAR, LIVE_CANDLE_LIMIT),
  86. )
  87. def refresh_live_frame(client: OkxClient, frame: pd.DataFrame | None) -> pd.DataFrame:
  88. if frame is None or len(frame) < BANDWIDTH_LOOKBACK + 2:
  89. return load_live_frame(client)
  90. recent = aligned_frame_from_candles(
  91. client.get_recent_candles(SYMBOL, BAR, RECENT_CANDLE_LIMIT),
  92. client.get_recent_candles(BTC_SYMBOL, BAR, RECENT_CANDLE_LIMIT),
  93. )
  94. merged = pd.concat([frame, recent], ignore_index=True)
  95. merged = merged.sort_values("ts").drop_duplicates("ts", keep="last").tail(LIVE_CANDLE_LIMIT)
  96. return merged.reset_index(drop=True)
  97. def account_current_position(client: OkxClient, margin_per_unit_usdt: float) -> tuple[TargetPosition, dict[str, object]]:
  98. positions = client.get_positions(SYMBOL)
  99. metadata = client.get_instrument_meta(SYMBOL)
  100. mark_price = client.get_last_price(SYMBOL)
  101. current = current_position_from_okx(
  102. positions=positions,
  103. mark_price=mark_price,
  104. metadata=metadata,
  105. leverage=LEVERAGE,
  106. margin_per_unit_usdt=margin_per_unit_usdt,
  107. )
  108. return current, {
  109. "positions": [asdict(position) for position in positions],
  110. "instrument_meta": asdict(metadata),
  111. "mark_price": mark_price,
  112. }
  113. def target_position(signal: dict[str, object], current: TargetPosition) -> TargetPosition:
  114. side = str(signal["target_side"])
  115. if side == "flat":
  116. return TargetPosition(side="flat", unit=0.0, known=True, reason=str(signal["signal"]))
  117. if current.known and current.side == side and current.unit > 0.0:
  118. return TargetPosition(side=side, unit=current.unit, known=True, reason="keep existing same-side position")
  119. return TargetPosition(side=side, unit=1.0, known=True, reason=str(signal["signal"]))
  120. def risk_arg(value: float | None, env_name: str) -> float:
  121. if value is not None:
  122. return value
  123. raw = os.environ.get(env_name)
  124. if raw is None or raw == "":
  125. raise ValueError(f"{env_name} is required")
  126. return float(raw)
  127. def run_once(
  128. *,
  129. state_dir: Path,
  130. margin_per_unit_usdt: float,
  131. max_new_margin_usdt: float,
  132. max_total_margin_usdt: float,
  133. submit_live: bool,
  134. frame: pd.DataFrame | None = None,
  135. ) -> dict[str, object]:
  136. state_dir.mkdir(parents=True, exist_ok=True)
  137. state_path = state_dir / STATE_FILE
  138. previous_state = load_state(state_path)
  139. client = OkxClient(load_config())
  140. if frame is None:
  141. frame = load_live_frame(client)
  142. next_state, signal = signal_from_frame(frame, previous_state)
  143. current, account = account_current_position(client, margin_per_unit_usdt)
  144. if previous_state.active_side is not None and current.known and current.side == "flat":
  145. decision_candle_ts = int(signal["decision_candle_ts"])
  146. next_state = StrategyState(
  147. decision_candle_ts,
  148. None,
  149. None,
  150. None,
  151. decision_candle_ts + (COOLDOWN_BARS * 900_000),
  152. 0,
  153. None,
  154. )
  155. signal = {
  156. **signal,
  157. "signal": "external_flat_sync",
  158. "target_side": "flat",
  159. }
  160. target = target_position(signal, current)
  161. plan = plan_position_delta(current, target)
  162. orders = ()
  163. if current.known and target.known:
  164. from okx_codex_trader.models import InstrumentMeta
  165. meta = account["instrument_meta"]
  166. orders = render_market_order_bodies(
  167. plan=plan,
  168. symbol=SYMBOL,
  169. mark_price=float(account["mark_price"]),
  170. metadata=InstrumentMeta(ct_val=float(meta["ct_val"]), lot_sz=float(meta["lot_sz"]), min_sz=float(meta["min_sz"])),
  171. leverage=LEVERAGE,
  172. margin_per_unit_usdt=margin_per_unit_usdt,
  173. max_new_margin_usdt=max_new_margin_usdt,
  174. max_total_margin_usdt=max_total_margin_usdt,
  175. client_order_id_prefix=f"bbsq-{signal['decision_candle_ts']}",
  176. stop_loss_pct=STOP_LOSS_PCT,
  177. take_profit_pct=TAKE_PROFIT_PCT,
  178. )
  179. snapshot = {
  180. "created_at": now_iso(),
  181. "mode": "bb_squeeze_live_executor" if submit_live else "bb_squeeze_dry_run_executor",
  182. "orders_submitted": 0,
  183. "strategy": strategy_name(),
  184. "previous_state": asdict(previous_state),
  185. "next_state": asdict(next_state),
  186. "signal": signal,
  187. "current_position": asdict(current),
  188. "target_position": asdict(target),
  189. "execution_plan": {
  190. "current": asdict(plan.current),
  191. "target": asdict(plan.target),
  192. "actions": [asdict(action) for action in plan.actions],
  193. },
  194. "rendered_orders": [asdict(order) for order in orders],
  195. "account": account,
  196. "risk_limits": {
  197. "submit_enabled": submit_live,
  198. "margin_per_unit_usdt": margin_per_unit_usdt,
  199. "max_new_margin_usdt": max_new_margin_usdt,
  200. "max_total_margin_usdt": max_total_margin_usdt,
  201. },
  202. }
  203. if submit_live:
  204. client.ensure_hedge_mode()
  205. submitted = []
  206. try:
  207. for rendered in orders:
  208. body = rendered.body
  209. client.set_leverage(symbol=SYMBOL, leverage=LEVERAGE, pos_side=body["posSide"])
  210. submitted.append(asdict(client.submit_market_order_body(body)))
  211. except ValueError as exc:
  212. snapshot["orders_submitted"] = len(submitted)
  213. snapshot["submitted_orders"] = submitted
  214. snapshot["execution_error"] = str(exc)
  215. append_event(state_dir, snapshot)
  216. raise
  217. snapshot["orders_submitted"] = len(submitted)
  218. snapshot["submitted_orders"] = submitted
  219. save_state(state_path, next_state)
  220. append_event(state_dir, snapshot)
  221. return snapshot
  222. def main() -> int:
  223. parser = argparse.ArgumentParser(description="Run BB squeeze live executor.")
  224. parser.add_argument("--state-dir", type=Path, default=STATE_DIR)
  225. parser.add_argument("--margin-per-unit-usdt", type=float)
  226. parser.add_argument("--max-new-margin-usdt", type=float)
  227. parser.add_argument("--max-total-margin-usdt", type=float)
  228. parser.add_argument("--submit-live", action="store_true")
  229. parser.add_argument("--confirm-live", action="store_true")
  230. parser.add_argument("--loop", action="store_true")
  231. parser.add_argument("--poll-seconds", type=float, default=10.0)
  232. args = parser.parse_args()
  233. if args.submit_live != args.confirm_live:
  234. raise ValueError("--submit-live and --confirm-live must be used together")
  235. margin_per_unit_usdt = risk_arg(args.margin_per_unit_usdt, "ETH_NEXTGEN_MARGIN_PER_UNIT_USDT")
  236. max_new_margin_usdt = risk_arg(args.max_new_margin_usdt, "ETH_NEXTGEN_MAX_NEW_MARGIN_USDT")
  237. max_total_margin_usdt = risk_arg(args.max_total_margin_usdt, "ETH_NEXTGEN_MAX_TOTAL_MARGIN_USDT")
  238. if not args.loop:
  239. snapshot = run_once(
  240. state_dir=args.state_dir,
  241. margin_per_unit_usdt=margin_per_unit_usdt,
  242. max_new_margin_usdt=max_new_margin_usdt,
  243. max_total_margin_usdt=max_total_margin_usdt,
  244. submit_live=args.submit_live,
  245. )
  246. print(json.dumps(snapshot, indent=2, sort_keys=True), flush=True)
  247. return 0
  248. frame: pd.DataFrame | None = None
  249. while True:
  250. try:
  251. frame = refresh_live_frame(OkxClient(), frame)
  252. state = load_state(args.state_dir / STATE_FILE)
  253. _, loop_signal = signal_from_frame(frame, state)
  254. if loop_signal["signal"] != "state_replay":
  255. snapshot = run_once(
  256. state_dir=args.state_dir,
  257. margin_per_unit_usdt=margin_per_unit_usdt,
  258. max_new_margin_usdt=max_new_margin_usdt,
  259. max_total_margin_usdt=max_total_margin_usdt,
  260. submit_live=args.submit_live,
  261. frame=frame,
  262. )
  263. print(json.dumps(snapshot, indent=2, sort_keys=True), flush=True)
  264. except ValueError as exc:
  265. append_loop_error(args.state_dir, str(exc))
  266. print(json.dumps({"created_at": now_iso(), "execution_error": str(exc)}, sort_keys=True), flush=True)
  267. time.sleep(args.poll_seconds)
  268. return 0
  269. if __name__ == "__main__":
  270. raise SystemExit(main())