run_eth_nextgen_micro_observer.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. from __future__ import annotations
  2. import argparse
  3. import json
  4. import subprocess
  5. import sys
  6. import time
  7. from dataclasses import asdict
  8. from datetime import UTC, datetime
  9. from pathlib import Path
  10. sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
  11. from okx_codex_trader.config import load_config
  12. from okx_codex_trader.okx_client import OkxClient
  13. from scripts import build_eth_nextgen_micro_signal_intent as intent
  14. from scripts import explore_ultrashort as explore
  15. ROOT = Path(__file__).resolve().parents[1]
  16. STATE_DIR = ROOT / "var" / "eth-nextgen-micro"
  17. SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP")
  18. BAR = "15m"
  19. HISTORY_LIMIT = 350_400
  20. def now_iso() -> str:
  21. return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
  22. def append_jsonl(path: Path, payload: dict[str, object]) -> None:
  23. path.parent.mkdir(parents=True, exist_ok=True)
  24. with path.open("a", encoding="utf-8") as handle:
  25. handle.write(json.dumps(payload, sort_keys=True, separators=(",", ":")) + "\n")
  26. def run_command(command: list[str]) -> dict[str, object]:
  27. started_at = now_iso()
  28. completed = subprocess.run(command, cwd=ROOT, text=True, capture_output=True)
  29. return {
  30. "command": command,
  31. "started_at": started_at,
  32. "finished_at": now_iso(),
  33. "returncode": completed.returncode,
  34. "stdout_tail": completed.stdout[-4000:],
  35. "stderr_tail": completed.stderr[-4000:],
  36. }
  37. def refresh_candles(client: OkxClient) -> dict[str, object]:
  38. rows = {}
  39. for symbol in SYMBOLS:
  40. candles = explore.get_candles_cached(client, symbol, BAR, HISTORY_LIMIT, explore.CANDLE_CACHE_DIR)
  41. rows[symbol] = {
  42. "rows": len(candles),
  43. "first_ts": candles[0].ts if candles else None,
  44. "last_ts": candles[-1].ts if candles else None,
  45. "last_time": "" if not candles else datetime.fromtimestamp(candles[-1].ts / 1000, UTC).isoformat().replace("+00:00", "Z"),
  46. }
  47. return rows
  48. def private_snapshot() -> dict[str, object] | None:
  49. try:
  50. client = OkxClient(load_config())
  51. except ValueError:
  52. return None
  53. try:
  54. return {
  55. "balance": client.get_account_balance("USDT"),
  56. "positions": [asdict(position) for position in client.get_positions("ETH-USDT-SWAP")],
  57. }
  58. except ValueError as exc:
  59. return {"account_error": str(exc)}
  60. def run_once(state_dir: Path) -> dict[str, object]:
  61. state_dir.mkdir(parents=True, exist_ok=True)
  62. public_client = OkxClient()
  63. payload: dict[str, object] = {
  64. "created_at": now_iso(),
  65. "mode": "readonly_observer",
  66. "orders_submitted": 0,
  67. }
  68. payload["candles"] = refresh_candles(public_client)
  69. signal_payload = intent.build_payload()
  70. intent.JSON_REPORT.write_text(json.dumps(signal_payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
  71. intent.MARKDOWN_REPORT.write_text(intent.markdown_report(signal_payload), encoding="utf-8")
  72. payload["signal"] = signal_payload["decision"]
  73. payload["switch_state"] = signal_payload["switch_state"]
  74. payload["risk_limits"] = signal_payload["risk_limits"]
  75. payload["account"] = private_snapshot()
  76. payload["tests"] = run_command([sys.executable, "-m", "pytest", "-q", "tests/test_build_eth_nextgen_micro_signal_intent.py"])
  77. heartbeat_path = state_dir / "heartbeat.json"
  78. heartbeat_path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
  79. append_jsonl(state_dir / "observer-events.jsonl", payload)
  80. return payload
  81. def main() -> int:
  82. parser = argparse.ArgumentParser(description="Run readonly ETH nextgen+micro live observer.")
  83. parser.add_argument("--state-dir", type=Path, default=STATE_DIR)
  84. parser.add_argument("--interval-seconds", type=int, default=300)
  85. parser.add_argument("--once", action="store_true")
  86. args = parser.parse_args()
  87. while True:
  88. try:
  89. payload = run_once(args.state_dir)
  90. print(json.dumps(payload, indent=2, sort_keys=True))
  91. except Exception as exc:
  92. error = {"created_at": now_iso(), "mode": "readonly_observer", "orders_submitted": 0, "error": str(exc)}
  93. append_jsonl(args.state_dir / "observer-events.jsonl", error)
  94. print(json.dumps(error, indent=2, sort_keys=True), file=sys.stderr)
  95. if args.once:
  96. return 1
  97. if args.once:
  98. return 0
  99. time.sleep(args.interval_seconds)
  100. if __name__ == "__main__":
  101. raise SystemExit(main())