"""Readonly live observer for the ETH nextgen+micro strategy.

Each cycle refreshes cached candles, rebuilds the signal intent reports,
optionally snapshots the private account, runs the intent test module, and
records the combined payload as a heartbeat file plus a JSONL event log.
Every payload written by this script carries ``orders_submitted: 0``.
"""

from __future__ import annotations

import argparse
import json
import subprocess
import sys
import time
from dataclasses import asdict
from datetime import UTC, datetime
from pathlib import Path

ROOT = Path(__file__).resolve().parents[1]

# The project packages live under ROOT; put it on sys.path before importing
# them so the script also works when launched directly by file path.
sys.path.insert(0, str(ROOT))

from okx_codex_trader.config import load_config  # noqa: E402
from okx_codex_trader.okx_client import OkxClient  # noqa: E402
from scripts import build_eth_nextgen_micro_signal_intent as intent  # noqa: E402
from scripts import explore_ultrashort as explore  # noqa: E402

STATE_DIR = ROOT / "var" / "eth-nextgen-micro"
SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP")
BAR = "15m"
# 350_400 == 96 * 365 * 10; presumably ten years of 15-minute bars -- confirm.
HISTORY_LIMIT = 350_400


def now_iso() -> str:
    """Return the current UTC time as ISO-8601 with second precision and a ``Z`` suffix."""
    return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")


def append_jsonl(path: Path, payload: dict[str, object]) -> None:
    """Append *payload* to *path* as one compact, key-sorted JSON line.

    Missing parent directories are created first.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("a", encoding="utf-8") as handle:
        handle.write(json.dumps(payload, sort_keys=True, separators=(",", ":")) + "\n")


def run_command(command: list[str]) -> dict[str, object]:
    """Run *command* from ROOT and summarise the outcome.

    Returns a dict with the command, start/finish timestamps, the return
    code, and the last 4000 characters of stdout and of stderr.
    """
    started_at = now_iso()
    completed = subprocess.run(command, cwd=ROOT, text=True, capture_output=True)
    return {
        "command": command,
        "started_at": started_at,
        "finished_at": now_iso(),
        "returncode": completed.returncode,
        # Only the tails are kept so the event log stays bounded in size.
        "stdout_tail": completed.stdout[-4000:],
        "stderr_tail": completed.stderr[-4000:],
    }


def refresh_candles(client: OkxClient) -> dict[str, object]:
    """Refresh the candle cache for every symbol in SYMBOLS and summarise it.

    For each symbol the summary holds the row count, the first and last
    ``ts`` values (``None`` when there are no candles) and ``last_time``,
    the last ``ts`` rendered as a UTC ISO string (``""`` when empty).
    """
    rows = {}
    for symbol in SYMBOLS:
        candles = explore.get_candles_cached(
            client, symbol, BAR, HISTORY_LIMIT, explore.CANDLE_CACHE_DIR
        )
        if candles:
            # ``ts`` is treated as epoch milliseconds: divide by 1000 first.
            last_time = (
                datetime.fromtimestamp(candles[-1].ts / 1000, UTC)
                .isoformat()
                .replace("+00:00", "Z")
            )
        else:
            last_time = ""
        rows[symbol] = {
            "rows": len(candles),
            "first_ts": candles[0].ts if candles else None,
            "last_ts": candles[-1].ts if candles else None,
            "last_time": last_time,
        }
    return rows


def private_snapshot() -> dict[str, object] | None:
    """Return the USDT balance and ETH-USDT-SWAP positions, if available.

    Returns ``None`` when building the configured client raises
    ``ValueError`` (presumably missing or invalid credentials -- confirm
    against ``load_config``). If the account queries raise ``ValueError``,
    returns ``{"account_error": <message>}`` instead.
    """
    try:
        client = OkxClient(load_config())
    except ValueError:
        return None
    try:
        return {
            "balance": client.get_account_balance("USDT"),
            "positions": [
                asdict(position) for position in client.get_positions("ETH-USDT-SWAP")
            ],
        }
    except ValueError as exc:
        return {"account_error": str(exc)}


def run_once(state_dir: Path) -> dict[str, object]:
    """Run one observer cycle and return the recorded payload.

    Side effects: rewrites the intent JSON and Markdown reports, rewrites
    ``heartbeat.json`` in *state_dir*, and appends the payload to
    ``observer-events.jsonl`` in *state_dir*.
    """
    state_dir.mkdir(parents=True, exist_ok=True)
    public_client = OkxClient()
    payload: dict[str, object] = {
        "created_at": now_iso(),
        "mode": "readonly_observer",
        "orders_submitted": 0,
    }
    payload["candles"] = refresh_candles(public_client)

    # Candles are refreshed before the signal is built, as in the original
    # statement order (build_payload presumably reads the cache -- confirm).
    signal_payload = intent.build_payload()
    intent.JSON_REPORT.write_text(
        json.dumps(signal_payload, indent=2, sort_keys=True) + "\n", encoding="utf-8"
    )
    intent.MARKDOWN_REPORT.write_text(
        intent.markdown_report(signal_payload), encoding="utf-8"
    )
    payload["signal"] = signal_payload["decision"]
    payload["switch_state"] = signal_payload["switch_state"]
    payload["risk_limits"] = signal_payload["risk_limits"]

    payload["account"] = private_snapshot()
    payload["tests"] = run_command(
        [
            sys.executable,
            "-m",
            "pytest",
            "-q",
            "tests/test_build_eth_nextgen_micro_signal_intent.py",
        ]
    )

    heartbeat_path = state_dir / "heartbeat.json"
    heartbeat_path.write_text(
        json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8"
    )
    append_jsonl(state_dir / "observer-events.jsonl", payload)
    return payload


def _non_negative_int(value: str) -> int:
    """Parse *value* as an int >= 0 for argparse; reject anything else."""
    try:
        parsed = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
    if parsed < 0:
        raise argparse.ArgumentTypeError(f"must be non-negative, got {parsed}")
    return parsed


def main() -> int:
    """Run the observer once (``--once``) or forever at a fixed interval.

    Returns 0 after a successful ``--once`` cycle and 1 after a failed one.
    Without ``--once`` the loop never returns; a failing cycle is recorded
    and the loop carries on after the usual sleep.
    """
    parser = argparse.ArgumentParser(
        description="Run readonly ETH nextgen+micro live observer."
    )
    parser.add_argument("--state-dir", type=Path, default=STATE_DIR)
    # Reject negatives at parse time: time.sleep() raises ValueError for
    # them, which would otherwise kill the loop after the first cycle.
    parser.add_argument("--interval-seconds", type=_non_negative_int, default=300)
    parser.add_argument("--once", action="store_true")
    args = parser.parse_args()

    while True:
        try:
            payload = run_once(args.state_dir)
            print(json.dumps(payload, indent=2, sort_keys=True))
        except Exception as exc:
            # Top-level boundary: one bad cycle must not stop the observer.
            error = {
                "created_at": now_iso(),
                "mode": "readonly_observer",
                "orders_submitted": 0,
                "error": str(exc),
                # str(exc) alone is empty or ambiguous for many exceptions.
                "error_type": type(exc).__name__,
            }
            try:
                append_jsonl(args.state_dir / "observer-events.jsonl", error)
            except OSError as log_exc:
                # The state dir may be the very thing that failed; keep
                # the loop alive and still report on stderr below.
                print(
                    f"failed to record observer error event: {log_exc}",
                    file=sys.stderr,
                )
            print(json.dumps(error, indent=2, sort_keys=True), file=sys.stderr)
            if args.once:
                return 1
        if args.once:
            return 0
        time.sleep(args.interval_seconds)


if __name__ == "__main__":
    raise SystemExit(main())