test_eth_nextgen_micro_executor.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. import importlib.util
  2. import sys
  3. from pathlib import Path
  4. from okx_codex_trader.live_execution import TargetPosition
  5. def load_executor():
  6. path = Path(__file__).resolve().parents[1] / "scripts" / "run_eth_nextgen_micro_executor.py"
  7. spec = importlib.util.spec_from_file_location("run_eth_nextgen_micro_executor", path)
  8. assert spec is not None
  9. module = importlib.util.module_from_spec(spec)
  10. assert spec.loader is not None
  11. sys.modules[spec.name] = module
  12. spec.loader.exec_module(module)
  13. return module
  14. executor = load_executor()
  15. def payload_with_nextgen_long():
  16. return {
  17. "decision": {"active_engine": "nextgen", "selected_signal": "long"},
  18. "execution_intent": {"entry_signal": "long", "entry_unit": 0.5, "target_position_known": False, "target_position": None},
  19. "nextgen": {
  20. "data": {"decision_candle_ts": 1000},
  21. "legs": [
  22. {"leg_id": "a", "suggested_weight": 0.5, "signal": True, "exit_signal": False},
  23. {"leg_id": "b", "suggested_weight": 0.5, "signal": False, "exit_signal": False},
  24. ],
  25. },
  26. }
  27. def test_executor_snapshot_does_not_render_orders_when_current_position_is_unknown(monkeypatch, tmp_path):
  28. monkeypatch.setattr(executor.eth_nextgen_micro, "build_payload", payload_with_nextgen_long)
  29. monkeypatch.setattr(
  30. executor,
  31. "account_current_position",
  32. lambda _: (TargetPosition(side="flat", unit=0.0, known=False, reason="no credentials"), {"account_error": "no credentials"}),
  33. )
  34. snapshot = executor.build_snapshot(state_dir=tmp_path, margin_per_unit_usdt=1000.0, max_new_margin_usdt=500.0)
  35. assert snapshot["orders_submitted"] == 0
  36. assert snapshot["target_position"]["known"] is True
  37. assert snapshot["target_position"]["unit"] == 0.5
  38. assert snapshot["current_position"]["known"] is False
  39. assert snapshot["rendered_orders"] == []
  40. def test_executor_snapshot_renders_order_body_when_positions_are_known(monkeypatch, tmp_path):
  41. monkeypatch.setattr(executor.eth_nextgen_micro, "build_payload", payload_with_nextgen_long)
  42. monkeypatch.setattr(
  43. executor,
  44. "account_current_position",
  45. lambda _: (
  46. TargetPosition(side="flat", unit=0.0, known=True, reason="flat"),
  47. {"mark_price": 3000.0, "instrument_meta": {"ct_val": 0.1, "lot_sz": 1.0, "min_sz": 1.0}},
  48. ),
  49. )
  50. snapshot = executor.build_snapshot(state_dir=tmp_path, margin_per_unit_usdt=1000.0, max_new_margin_usdt=500.0)
  51. assert snapshot["orders_submitted"] == 0
  52. assert snapshot["rendered_orders"] == [
  53. {
  54. "action": "open",
  55. "margin_usdt": 500.0,
  56. "body": {
  57. "instId": "ETH-USDT-SWAP",
  58. "tdMode": "isolated",
  59. "side": "buy",
  60. "posSide": "long",
  61. "ordType": "market",
  62. "sz": "5",
  63. "clOrdId": "ethnm-1000-1-open",
  64. },
  65. }
  66. ]