run_eth_nextgen_micro_observer.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. from __future__ import annotations
  2. import argparse
  3. import json
  4. import subprocess
  5. import sys
  6. import time
  7. from dataclasses import asdict
  8. from datetime import UTC, datetime
  9. from pathlib import Path
  10. sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
  11. from okx_codex_trader.config import load_config
  12. from okx_codex_trader import eth_nextgen_micro as intent
  13. from okx_codex_trader.live_execution import TargetPosition, load_runtime_state, plan_position_delta, save_runtime_state, target_from_signal
  14. from okx_codex_trader.okx_client import OkxClient
  15. from scripts import explore_ultrashort as explore
  16. ROOT = Path(__file__).resolve().parents[1]
  17. STATE_DIR = ROOT / "var" / "eth-nextgen-micro"
  18. SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP")
  19. BAR = "15m"
  20. HISTORY_LIMIT = 350_400
  21. def now_iso() -> str:
  22. return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
  23. def append_jsonl(path: Path, payload: dict[str, object]) -> None:
  24. path.parent.mkdir(parents=True, exist_ok=True)
  25. with path.open("a", encoding="utf-8") as handle:
  26. handle.write(json.dumps(payload, sort_keys=True, separators=(",", ":")) + "\n")
  27. def run_command(command: list[str]) -> dict[str, object]:
  28. started_at = now_iso()
  29. completed = subprocess.run(command, cwd=ROOT, text=True, capture_output=True)
  30. return {
  31. "command": command,
  32. "started_at": started_at,
  33. "finished_at": now_iso(),
  34. "returncode": completed.returncode,
  35. "stdout_tail": completed.stdout[-4000:],
  36. "stderr_tail": completed.stderr[-4000:],
  37. }
  38. def refresh_candles(client: OkxClient) -> dict[str, object]:
  39. rows = {}
  40. for symbol in SYMBOLS:
  41. candles = explore.get_candles_cached(client, symbol, BAR, HISTORY_LIMIT, explore.CANDLE_CACHE_DIR)
  42. rows[symbol] = {
  43. "rows": len(candles),
  44. "first_ts": candles[0].ts if candles else None,
  45. "last_ts": candles[-1].ts if candles else None,
  46. "last_time": "" if not candles else datetime.fromtimestamp(candles[-1].ts / 1000, UTC).isoformat().replace("+00:00", "Z"),
  47. }
  48. return rows
  49. def private_snapshot() -> dict[str, object] | None:
  50. try:
  51. client = OkxClient(load_config())
  52. except ValueError:
  53. return None
  54. try:
  55. return {
  56. "balance": client.get_account_balance("USDT"),
  57. "positions": [asdict(position) for position in client.get_positions("ETH-USDT-SWAP")],
  58. }
  59. except ValueError as exc:
  60. return {"account_error": str(exc)}
  61. def run_once(state_dir: Path) -> dict[str, object]:
  62. state_dir.mkdir(parents=True, exist_ok=True)
  63. public_client = OkxClient()
  64. payload: dict[str, object] = {
  65. "created_at": now_iso(),
  66. "mode": "readonly_observer",
  67. "orders_submitted": 0,
  68. }
  69. payload["candles"] = refresh_candles(public_client)
  70. signal_payload = intent.build_payload()
  71. intent.JSON_REPORT.write_text(json.dumps(signal_payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
  72. intent.MARKDOWN_REPORT.write_text(intent.markdown_report(signal_payload), encoding="utf-8")
  73. runtime_state_path = state_dir / "observer-runtime-state.json"
  74. previous_state = load_runtime_state(runtime_state_path)
  75. next_state, target_position = target_from_signal(signal_payload, previous_state)
  76. save_runtime_state(runtime_state_path, next_state)
  77. current_position = TargetPosition(side="flat", unit=0.0, known=False, reason="OKX current position is not normalized to strategy units")
  78. execution_plan = plan_position_delta(current_position, target_position)
  79. payload["signal"] = signal_payload["decision"]
  80. payload["execution_intent"] = signal_payload["execution_intent"]
  81. payload["runtime_state"] = asdict(next_state)
  82. payload["target_position"] = asdict(target_position)
  83. payload["execution_plan"] = {
  84. "current": asdict(execution_plan.current),
  85. "target": asdict(execution_plan.target),
  86. "actions": [asdict(action) for action in execution_plan.actions],
  87. }
  88. payload["switch_state"] = signal_payload["switch_state"]
  89. payload["risk_limits"] = signal_payload["risk_limits"]
  90. payload["account"] = private_snapshot()
  91. payload["tests"] = run_command([sys.executable, "-m", "pytest", "-q", "tests/test_build_eth_nextgen_micro_signal_intent.py"])
  92. heartbeat_path = state_dir / "heartbeat.json"
  93. heartbeat_path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
  94. append_jsonl(state_dir / "observer-events.jsonl", payload)
  95. return payload
  96. def main() -> int:
  97. parser = argparse.ArgumentParser(description="Run readonly ETH nextgen+micro live observer.")
  98. parser.add_argument("--state-dir", type=Path, default=STATE_DIR)
  99. parser.add_argument("--interval-seconds", type=int, default=300)
  100. parser.add_argument("--once", action="store_true")
  101. args = parser.parse_args()
  102. while True:
  103. try:
  104. payload = run_once(args.state_dir)
  105. print(json.dumps(payload, indent=2, sort_keys=True))
  106. except Exception as exc:
  107. error = {"created_at": now_iso(), "mode": "readonly_observer", "orders_submitted": 0, "error": str(exc)}
  108. append_jsonl(args.state_dir / "observer-events.jsonl", error)
  109. print(json.dumps(error, indent=2, sort_keys=True), file=sys.stderr)
  110. if args.once:
  111. return 1
  112. if args.once:
  113. return 0
  114. time.sleep(args.interval_seconds)
  115. if __name__ == "__main__":
  116. raise SystemExit(main())