live_execution.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. from __future__ import annotations
  2. import json
  3. from dataclasses import asdict, dataclass
  4. from pathlib import Path
  5. import re
  6. from typing import Literal
  7. from okx_codex_trader.models import InstrumentMeta, Position
  8. from okx_codex_trader.okx_client import OkxClient, build_contract_size
  9. PositionSide = Literal["flat", "long", "short"]
  10. @dataclass(frozen=True)
  11. class TargetPosition:
  12. side: PositionSide
  13. unit: float
  14. known: bool
  15. reason: str
  16. @dataclass(frozen=True)
  17. class RuntimeState:
  18. last_candle_ts: int | None
  19. nextgen_active_legs: tuple[str, ...]
  20. micro_side: Literal["long", "short"] | None
  21. @dataclass(frozen=True)
  22. class PlannedAction:
  23. action: Literal["noop", "open", "increase", "reduce", "close", "reverse"]
  24. side: PositionSide
  25. unit: float
  26. reduce_only: bool
  27. @dataclass(frozen=True)
  28. class ExecutionPlan:
  29. target: TargetPosition
  30. current: TargetPosition
  31. actions: tuple[PlannedAction, ...]
  32. @dataclass(frozen=True)
  33. class RenderedOrder:
  34. action: str
  35. margin_usdt: float
  36. body: dict[str, str]
  37. EMPTY_STATE = RuntimeState(last_candle_ts=None, nextgen_active_legs=(), micro_side=None)
  38. CLIENT_ORDER_ID_MAX_LENGTH = 32
  39. def load_runtime_state(path: Path) -> RuntimeState:
  40. if not path.exists():
  41. return EMPTY_STATE
  42. payload = json.loads(path.read_text(encoding="utf-8"))
  43. return RuntimeState(
  44. last_candle_ts=payload["last_candle_ts"],
  45. nextgen_active_legs=tuple(payload["nextgen_active_legs"]),
  46. micro_side=payload["micro_side"],
  47. )
  48. def save_runtime_state(path: Path, state: RuntimeState) -> None:
  49. path.parent.mkdir(parents=True, exist_ok=True)
  50. path.write_text(json.dumps(asdict(state), indent=2, sort_keys=True) + "\n", encoding="utf-8")
  51. def _decision_candle_ts(payload: dict[str, object]) -> int:
  52. active_engine = str(payload["decision"]["active_engine"])
  53. if active_engine == "nextgen":
  54. nextgen = payload["nextgen"]
  55. if "data" in nextgen:
  56. return int(nextgen["data"]["decision_candle_ts"])
  57. return int(nextgen["decision"]["decision_candle_ts"])
  58. return int(payload["micro"]["decision_candle_ts"])
  59. def target_from_signal(payload: dict[str, object], state: RuntimeState) -> tuple[RuntimeState, TargetPosition]:
  60. candle_ts = _decision_candle_ts(payload)
  61. if state.last_candle_ts is not None and candle_ts <= state.last_candle_ts:
  62. return state, target_from_state(payload, state)
  63. active_engine = str(payload["decision"]["active_engine"])
  64. if active_engine == "nextgen":
  65. active = set(state.nextgen_active_legs)
  66. weights: dict[str, float] = {}
  67. for leg in payload["nextgen"]["legs"]:
  68. leg_id = str(leg["leg_id"])
  69. weights[leg_id] = float(leg["suggested_weight"])
  70. if bool(leg["signal"]):
  71. active.add(leg_id)
  72. elif leg_id in active and bool(leg["exit_signal"]):
  73. active.remove(leg_id)
  74. next_state = RuntimeState(last_candle_ts=candle_ts, nextgen_active_legs=tuple(sorted(active)), micro_side=None)
  75. return next_state, _nextgen_target(next_state, weights)
  76. return (
  77. RuntimeState(last_candle_ts=candle_ts, nextgen_active_legs=(), micro_side=state.micro_side),
  78. TargetPosition(
  79. side="flat",
  80. unit=0.0,
  81. known=False,
  82. reason="micro target position requires persistent micro exit state before live execution",
  83. ),
  84. )
  85. def target_from_state(payload: dict[str, object], state: RuntimeState) -> TargetPosition:
  86. active_engine = str(payload["decision"]["active_engine"])
  87. if active_engine != "nextgen":
  88. return TargetPosition(
  89. side="flat",
  90. unit=0.0,
  91. known=False,
  92. reason="micro target position requires persistent micro exit state before live execution",
  93. )
  94. weights = {str(leg["leg_id"]): float(leg["suggested_weight"]) for leg in payload["nextgen"]["legs"]}
  95. return _nextgen_target(state, weights)
  96. def _nextgen_target(state: RuntimeState, weights: dict[str, float]) -> TargetPosition:
  97. unit = sum(weights[leg_id] for leg_id in state.nextgen_active_legs)
  98. if unit <= 0.0:
  99. return TargetPosition(side="flat", unit=0.0, known=True, reason="no active nextgen virtual legs")
  100. return TargetPosition(side="long", unit=unit, known=True, reason="active nextgen virtual legs net to one long ETH target")
  101. def plan_position_delta(current: TargetPosition, target: TargetPosition) -> ExecutionPlan:
  102. if not current.known or not target.known:
  103. return ExecutionPlan(target=target, current=current, actions=())
  104. if current.side == target.side and current.unit == target.unit:
  105. return ExecutionPlan(target=target, current=current, actions=(PlannedAction("noop", target.side, 0.0, False),))
  106. if current.side == "flat":
  107. return ExecutionPlan(target=target, current=current, actions=(PlannedAction("open", target.side, target.unit, False),))
  108. if target.side == "flat":
  109. return ExecutionPlan(target=target, current=current, actions=(PlannedAction("close", current.side, current.unit, True),))
  110. if current.side == target.side:
  111. if target.unit > current.unit:
  112. return ExecutionPlan(target=target, current=current, actions=(PlannedAction("increase", target.side, target.unit - current.unit, False),))
  113. return ExecutionPlan(target=target, current=current, actions=(PlannedAction("reduce", current.side, current.unit - target.unit, True),))
  114. return ExecutionPlan(
  115. target=target,
  116. current=current,
  117. actions=(
  118. PlannedAction("close", current.side, current.unit, True),
  119. PlannedAction("reverse", target.side, target.unit, False),
  120. ),
  121. )
  122. def current_position_from_okx(
  123. *,
  124. positions: list[Position],
  125. mark_price: float,
  126. metadata: InstrumentMeta,
  127. leverage: int,
  128. margin_per_unit_usdt: float,
  129. ) -> TargetPosition:
  130. if leverage <= 0 or margin_per_unit_usdt <= 0.0 or mark_price <= 0.0 or metadata.ct_val <= 0.0:
  131. raise ValueError("position normalization inputs are invalid")
  132. active = [position for position in positions if position.size > 0.0]
  133. if not active:
  134. return TargetPosition(side="flat", unit=0.0, known=True, reason="no open OKX position")
  135. sides = {position.pos_side for position in active}
  136. if len(sides) != 1:
  137. return TargetPosition(side="flat", unit=0.0, known=False, reason="both OKX hedge sides are open")
  138. side = active[0].pos_side
  139. if side not in {"long", "short"}:
  140. return TargetPosition(side="flat", unit=0.0, known=False, reason="OKX position side is unsupported")
  141. notional = sum(position.size for position in active) * metadata.ct_val * mark_price
  142. margin = notional / leverage
  143. unit = margin / margin_per_unit_usdt
  144. return TargetPosition(side=side, unit=unit, known=True, reason="OKX position normalized by configured strategy unit margin")
  145. def render_market_order_bodies(
  146. *,
  147. plan: ExecutionPlan,
  148. symbol: str,
  149. mark_price: float,
  150. metadata: InstrumentMeta,
  151. leverage: int,
  152. margin_per_unit_usdt: float,
  153. max_new_margin_usdt: float,
  154. max_total_margin_usdt: float,
  155. client_order_id_prefix: str,
  156. ) -> tuple[RenderedOrder, ...]:
  157. if leverage <= 0 or margin_per_unit_usdt <= 0.0 or max_new_margin_usdt < 0.0 or max_total_margin_usdt < 0.0:
  158. raise ValueError("order rendering inputs are invalid")
  159. if plan.target.known and plan.target.unit * margin_per_unit_usdt > max_total_margin_usdt:
  160. raise ValueError("target margin exceeds max_total_margin_usdt")
  161. rendered: list[RenderedOrder] = []
  162. new_margin = 0.0
  163. index = 1
  164. for action in plan.actions:
  165. if action.action == "noop":
  166. continue
  167. margin = action.unit * margin_per_unit_usdt
  168. if margin <= 0.0 or action.side == "flat":
  169. raise ValueError("planned action is invalid")
  170. if not action.reduce_only:
  171. new_margin += margin
  172. if new_margin > max_new_margin_usdt:
  173. raise ValueError("new margin exceeds max_new_margin_usdt")
  174. side = _okx_side(action)
  175. size = build_contract_size(margin * leverage, mark_price, metadata)
  176. rendered.append(
  177. RenderedOrder(
  178. action=action.action,
  179. margin_usdt=margin,
  180. body=OkxClient.build_market_order_body(
  181. symbol=symbol,
  182. side=side,
  183. pos_side=action.side,
  184. size=size,
  185. client_order_id=market_client_order_id(client_order_id_prefix, index, action.action),
  186. reduce_only=action.reduce_only,
  187. ),
  188. )
  189. )
  190. index += 1
  191. return tuple(rendered)
  192. def market_client_order_id(prefix: str, index: int, action: str) -> str:
  193. compact = re.sub(r"[^A-Za-z0-9]", "", f"{prefix}{index}{action}")
  194. if not compact:
  195. raise ValueError("client order id prefix is invalid")
  196. return compact[:CLIENT_ORDER_ID_MAX_LENGTH]
  197. def _okx_side(action: PlannedAction) -> str:
  198. if action.reduce_only:
  199. return "sell" if action.side == "long" else "buy"
  200. return "buy" if action.side == "long" else "sell"