live_execution.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. from __future__ import annotations
  2. import json
  3. from dataclasses import asdict, dataclass
  4. from pathlib import Path
  5. import re
  6. from typing import Literal
  7. from okx_codex_trader.models import InstrumentMeta, Position
  8. from okx_codex_trader.okx_client import OkxClient, build_contract_size
  9. PositionSide = Literal["flat", "long", "short"]
  10. @dataclass(frozen=True)
  11. class TargetPosition:
  12. side: PositionSide
  13. unit: float
  14. known: bool
  15. reason: str
  16. contracts: float | None = None
  17. @dataclass(frozen=True)
  18. class RuntimeState:
  19. last_candle_ts: int | None
  20. nextgen_active_legs: tuple[str, ...]
  21. micro_side: Literal["long", "short"] | None
  22. @dataclass(frozen=True)
  23. class PlannedAction:
  24. action: Literal["noop", "open", "increase", "reduce", "close", "reverse"]
  25. side: PositionSide
  26. unit: float
  27. reduce_only: bool
  28. @dataclass(frozen=True)
  29. class ExecutionPlan:
  30. target: TargetPosition
  31. current: TargetPosition
  32. actions: tuple[PlannedAction, ...]
  33. @dataclass(frozen=True)
  34. class RenderedOrder:
  35. action: str
  36. margin_usdt: float
  37. body: dict[str, str]
  38. EMPTY_STATE = RuntimeState(last_candle_ts=None, nextgen_active_legs=(), micro_side=None)
  39. CLIENT_ORDER_ID_MAX_LENGTH = 32
  40. def load_runtime_state(path: Path) -> RuntimeState:
  41. if not path.exists():
  42. return EMPTY_STATE
  43. payload = json.loads(path.read_text(encoding="utf-8"))
  44. return RuntimeState(
  45. last_candle_ts=payload["last_candle_ts"],
  46. nextgen_active_legs=tuple(payload["nextgen_active_legs"]),
  47. micro_side=payload["micro_side"],
  48. )
  49. def save_runtime_state(path: Path, state: RuntimeState) -> None:
  50. path.parent.mkdir(parents=True, exist_ok=True)
  51. path.write_text(json.dumps(asdict(state), indent=2, sort_keys=True) + "\n", encoding="utf-8")
  52. def _decision_candle_ts(payload: dict[str, object]) -> int:
  53. active_engine = str(payload["decision"]["active_engine"])
  54. if active_engine == "nextgen":
  55. nextgen = payload["nextgen"]
  56. if "data" in nextgen:
  57. return int(nextgen["data"]["decision_candle_ts"])
  58. return int(nextgen["decision"]["decision_candle_ts"])
  59. return int(payload["micro"]["decision_candle_ts"])
  60. def target_from_signal(payload: dict[str, object], state: RuntimeState) -> tuple[RuntimeState, TargetPosition]:
  61. candle_ts = _decision_candle_ts(payload)
  62. if state.last_candle_ts is not None and candle_ts <= state.last_candle_ts:
  63. return state, target_from_state(payload, state)
  64. active_engine = str(payload["decision"]["active_engine"])
  65. if active_engine == "nextgen":
  66. active = set(state.nextgen_active_legs)
  67. weights: dict[str, float] = {}
  68. for leg in payload["nextgen"]["legs"]:
  69. leg_id = str(leg["leg_id"])
  70. weights[leg_id] = float(leg["suggested_weight"])
  71. if bool(leg["signal"]):
  72. active.add(leg_id)
  73. elif leg_id in active and bool(leg["exit_signal"]):
  74. active.remove(leg_id)
  75. next_state = RuntimeState(last_candle_ts=candle_ts, nextgen_active_legs=tuple(sorted(active)), micro_side=None)
  76. return next_state, _nextgen_target(next_state, weights)
  77. return (
  78. RuntimeState(last_candle_ts=candle_ts, nextgen_active_legs=(), micro_side=state.micro_side),
  79. TargetPosition(
  80. side="flat",
  81. unit=0.0,
  82. known=False,
  83. reason="micro target position requires persistent micro exit state before live execution",
  84. ),
  85. )
  86. def target_from_state(payload: dict[str, object], state: RuntimeState) -> TargetPosition:
  87. active_engine = str(payload["decision"]["active_engine"])
  88. if active_engine != "nextgen":
  89. return TargetPosition(
  90. side="flat",
  91. unit=0.0,
  92. known=False,
  93. reason="micro target position requires persistent micro exit state before live execution",
  94. )
  95. weights = {str(leg["leg_id"]): float(leg["suggested_weight"]) for leg in payload["nextgen"]["legs"]}
  96. return _nextgen_target(state, weights)
  97. def _nextgen_target(state: RuntimeState, weights: dict[str, float]) -> TargetPosition:
  98. unit = sum(weights[leg_id] for leg_id in state.nextgen_active_legs)
  99. if unit <= 0.0:
  100. return TargetPosition(side="flat", unit=0.0, known=True, reason="no active nextgen virtual legs")
  101. return TargetPosition(side="long", unit=unit, known=True, reason="active nextgen virtual legs net to one long ETH target")
  102. def plan_position_delta(current: TargetPosition, target: TargetPosition) -> ExecutionPlan:
  103. if not current.known or not target.known:
  104. return ExecutionPlan(target=target, current=current, actions=())
  105. if current.side == target.side and current.unit == target.unit:
  106. return ExecutionPlan(target=target, current=current, actions=(PlannedAction("noop", target.side, 0.0, False),))
  107. if current.side == "flat":
  108. return ExecutionPlan(target=target, current=current, actions=(PlannedAction("open", target.side, target.unit, False),))
  109. if target.side == "flat":
  110. return ExecutionPlan(target=target, current=current, actions=(PlannedAction("close", current.side, current.unit, True),))
  111. if current.side == target.side:
  112. if target.unit > current.unit:
  113. return ExecutionPlan(target=target, current=current, actions=(PlannedAction("increase", target.side, target.unit - current.unit, False),))
  114. return ExecutionPlan(target=target, current=current, actions=(PlannedAction("reduce", current.side, current.unit - target.unit, True),))
  115. return ExecutionPlan(
  116. target=target,
  117. current=current,
  118. actions=(
  119. PlannedAction("close", current.side, current.unit, True),
  120. PlannedAction("reverse", target.side, target.unit, False),
  121. ),
  122. )
  123. def current_position_from_okx(
  124. *,
  125. positions: list[Position],
  126. mark_price: float,
  127. metadata: InstrumentMeta,
  128. leverage: int,
  129. margin_per_unit_usdt: float,
  130. ) -> TargetPosition:
  131. if leverage <= 0 or margin_per_unit_usdt <= 0.0 or mark_price <= 0.0 or metadata.ct_val <= 0.0:
  132. raise ValueError("position normalization inputs are invalid")
  133. active = [position for position in positions if position.size > 0.0]
  134. if not active:
  135. return TargetPosition(side="flat", unit=0.0, known=True, reason="no open OKX position", contracts=0.0)
  136. sides = {position.pos_side for position in active}
  137. if len(sides) != 1:
  138. return TargetPosition(side="flat", unit=0.0, known=False, reason="both OKX hedge sides are open")
  139. side = active[0].pos_side
  140. if side not in {"long", "short"}:
  141. return TargetPosition(side="flat", unit=0.0, known=False, reason="OKX position side is unsupported")
  142. contracts = sum(position.size for position in active)
  143. notional = contracts * metadata.ct_val * mark_price
  144. margin = notional / leverage
  145. unit = margin / margin_per_unit_usdt
  146. return TargetPosition(side=side, unit=unit, known=True, reason="OKX position normalized by configured strategy unit margin", contracts=contracts)
  147. def render_market_order_bodies(
  148. *,
  149. plan: ExecutionPlan,
  150. symbol: str,
  151. mark_price: float,
  152. metadata: InstrumentMeta,
  153. leverage: int,
  154. margin_per_unit_usdt: float,
  155. max_new_margin_usdt: float,
  156. max_total_margin_usdt: float,
  157. client_order_id_prefix: str,
  158. stop_loss_pct: float | None = None,
  159. ) -> tuple[RenderedOrder, ...]:
  160. if leverage <= 0 or margin_per_unit_usdt <= 0.0 or max_new_margin_usdt < 0.0 or max_total_margin_usdt < 0.0:
  161. raise ValueError("order rendering inputs are invalid")
  162. if stop_loss_pct is not None and stop_loss_pct <= 0.0:
  163. raise ValueError("stop_loss_pct is invalid")
  164. if plan.target.known and plan.target.unit * margin_per_unit_usdt > max_total_margin_usdt:
  165. raise ValueError("target margin exceeds max_total_margin_usdt")
  166. rendered: list[RenderedOrder] = []
  167. new_margin = 0.0
  168. index = 1
  169. for action in plan.actions:
  170. if action.action == "noop":
  171. continue
  172. margin = action.unit * margin_per_unit_usdt
  173. if margin <= 0.0 or action.side == "flat":
  174. raise ValueError("planned action is invalid")
  175. if not action.reduce_only:
  176. new_margin += margin
  177. if new_margin > max_new_margin_usdt:
  178. raise ValueError("new margin exceeds max_new_margin_usdt")
  179. side = _okx_side(action)
  180. if action.action == "close":
  181. if plan.current.contracts is None or plan.current.contracts <= 0.0:
  182. raise ValueError("current contracts are required for close orders")
  183. size = plan.current.contracts
  184. else:
  185. size = build_contract_size(margin * leverage, mark_price, metadata)
  186. stop_loss_trigger_price = None
  187. if not action.reduce_only and stop_loss_pct is not None:
  188. stop_loss_trigger_price = mark_price * (1.0 - stop_loss_pct if action.side == "long" else 1.0 + stop_loss_pct)
  189. rendered.append(
  190. RenderedOrder(
  191. action=action.action,
  192. margin_usdt=margin,
  193. body=OkxClient.build_market_order_body(
  194. symbol=symbol,
  195. side=side,
  196. pos_side=action.side,
  197. size=size,
  198. client_order_id=market_client_order_id(client_order_id_prefix, index, action.action),
  199. reduce_only=action.reduce_only,
  200. stop_loss_trigger_price=stop_loss_trigger_price,
  201. ),
  202. )
  203. )
  204. index += 1
  205. return tuple(rendered)
  206. def market_client_order_id(prefix: str, index: int, action: str) -> str:
  207. compact = re.sub(r"[^A-Za-z0-9]", "", f"{prefix}{index}{action}")
  208. if not compact:
  209. raise ValueError("client order id prefix is invalid")
  210. return compact[:CLIENT_ORDER_ID_MAX_LENGTH]
  211. def _okx_side(action: PlannedAction) -> str:
  212. if action.reduce_only:
  213. return "sell" if action.side == "long" else "buy"
  214. return "buy" if action.side == "long" else "sell"