| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- from __future__ import annotations
- import argparse
- import json
- import subprocess
- import sys
- import time
- from dataclasses import asdict
- from datetime import UTC, datetime
- from pathlib import Path
- sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
- from okx_codex_trader.config import load_config
- from okx_codex_trader import eth_nextgen_micro as intent
- from okx_codex_trader.live_execution import TargetPosition, load_runtime_state, plan_position_delta, save_runtime_state, target_from_signal
- from okx_codex_trader.okx_client import OkxClient
- from scripts import explore_ultrashort as explore
# Repository root: the directory one level above the folder holding this script.
ROOT = Path(__file__).resolve().parents[1]

# Default directory for the observer's heartbeat, event log and runtime state.
STATE_DIR = ROOT.joinpath("var", "eth-nextgen-micro")

# Instruments whose candle history is refreshed on every observation cycle.
SYMBOLS = (
    "BTC-USDT-SWAP",
    "ETH-USDT-SWAP",
)

# Candle bar size requested from the candle cache.
BAR = "15m"

# Number of candles requested per symbol.
# NOTE(review): 350400 == 10 * 365 * 96, presumably ~10 years of 15m bars - confirm.
HISTORY_LIMIT = 350400
- def now_iso() -> str:
- return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
def append_jsonl(path: Path, payload: dict[str, object]) -> None:
    """Append ``payload`` to ``path`` as a single compact JSON line.

    Missing parent directories are created first. The record is serialised
    with sorted keys and no extra whitespace, followed by a newline.
    """
    target_dir = path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    with path.open("a", encoding="utf-8") as sink:
        encoded = json.dumps(payload, sort_keys=True, separators=(",", ":"))
        sink.write(f"{encoded}\n")
def run_command(command: list[str]) -> dict[str, object]:
    """Run ``command`` with ``ROOT`` as working directory and summarise it.

    Output is captured in text mode. A non-zero exit status does not raise;
    it is reported through the ``returncode`` entry.

    Returns:
        A dict with the command, start/finish timestamps, the return code and
        the last 4000 characters of stdout and stderr.
    """
    began = now_iso()
    result = subprocess.run(command, cwd=ROOT, text=True, capture_output=True)
    ended = now_iso()
    # Only the tail of each stream is kept so the stored record stays bounded.
    return dict(
        command=command,
        started_at=began,
        finished_at=ended,
        returncode=result.returncode,
        stdout_tail=result.stdout[-4000:],
        stderr_tail=result.stderr[-4000:],
    )
def refresh_candles(client: OkxClient) -> dict[str, object]:
    """Fetch candles for every symbol in ``SYMBOLS`` and summarise each series.

    Candles are obtained through ``explore.get_candles_cached`` using ``BAR``,
    ``HISTORY_LIMIT`` and ``explore.CANDLE_CACHE_DIR``.

    Returns:
        A mapping from symbol to a dict holding the row count, the first and
        last ``ts`` values (``None`` when the series is empty) and the last
        timestamp rendered as a UTC ISO string (``""`` when empty).
    """
    summary = {}
    for symbol in SYMBOLS:
        candles = explore.get_candles_cached(client, symbol, BAR, HISTORY_LIMIT, explore.CANDLE_CACHE_DIR)
        if candles:
            oldest, newest = candles[0], candles[-1]
            first_ts = oldest.ts
            last_ts = newest.ts
            # ``ts`` is divided by 1000 before conversion, i.e. it is treated
            # as epoch milliseconds.
            last_moment = datetime.fromtimestamp(newest.ts / 1000, UTC)
            last_time = last_moment.isoformat().replace("+00:00", "Z")
        else:
            first_ts = None
            last_ts = None
            last_time = ""
        summary[symbol] = {
            "rows": len(candles),
            "first_ts": first_ts,
            "last_ts": last_ts,
            "last_time": last_time,
        }
    return summary
def private_snapshot() -> dict[str, object] | None:
    """Collect the USDT balance and ETH-USDT-SWAP positions, if possible.

    Returns:
        ``None`` when building the client from ``load_config()`` raises
        ``ValueError``; a dict with ``balance`` and ``positions`` on success;
        or ``{"account_error": <message>}`` when fetching the account data
        raises ``ValueError``.
    """
    try:
        account_client = OkxClient(load_config())
    except ValueError:
        # NOTE(review): presumably signals missing/invalid credentials - confirm.
        return None

    try:
        balance = account_client.get_account_balance("USDT")
        open_positions = account_client.get_positions("ETH-USDT-SWAP")
        snapshot: dict[str, object] = {
            "balance": balance,
            "positions": [asdict(entry) for entry in open_positions],
        }
    except ValueError as exc:
        snapshot = {"account_error": str(exc)}
    return snapshot
def run_once(state_dir: Path) -> dict[str, object]:
    """Run one observation cycle and persist its record under ``state_dir``.

    Steps, in order: refresh candles, rebuild the signal payload and rewrite
    the JSON/Markdown reports, advance and save the observer runtime state,
    compute an execution plan, snapshot the account, run the signal-intent
    test module, then write ``heartbeat.json`` and append the same record to
    ``observer-events.jsonl``.

    Returns:
        The record that was written. It always reports ``mode`` as
        ``"readonly_observer"`` and ``orders_submitted`` as ``0``.
    """
    state_dir.mkdir(parents=True, exist_ok=True)
    market_client = OkxClient()
    payload: dict[str, object] = dict(
        created_at=now_iso(),
        mode="readonly_observer",
        orders_submitted=0,
    )
    payload["candles"] = refresh_candles(market_client)

    # Rebuild the signal and refresh both on-disk reports.
    signal_payload = intent.build_payload()
    intent.JSON_REPORT.write_text(
        json.dumps(signal_payload, indent=2, sort_keys=True) + "\n",
        encoding="utf-8",
    )
    intent.MARKDOWN_REPORT.write_text(
        intent.markdown_report(signal_payload),
        encoding="utf-8",
    )

    # Advance the persisted runtime state from the fresh signal.
    state_file = state_dir / "observer-runtime-state.json"
    next_state, target_position = target_from_signal(
        signal_payload,
        load_runtime_state(state_file),
    )
    save_runtime_state(state_file, next_state)

    # The plan is computed against a flat placeholder flagged ``known=False``
    # rather than against the positions reported by the exchange.
    placeholder = TargetPosition(
        side="flat",
        unit=0.0,
        known=False,
        reason="OKX current position is not normalized to strategy units",
    )
    plan = plan_position_delta(placeholder, target_position)

    payload.update(
        {
            "signal": signal_payload["decision"],
            "execution_intent": signal_payload["execution_intent"],
            "runtime_state": asdict(next_state),
            "target_position": asdict(target_position),
            "execution_plan": {
                "current": asdict(plan.current),
                "target": asdict(plan.target),
                "actions": [asdict(step) for step in plan.actions],
            },
            "switch_state": signal_payload["switch_state"],
            "risk_limits": signal_payload["risk_limits"],
            "account": private_snapshot(),
            "tests": run_command(
                [sys.executable, "-m", "pytest", "-q", "tests/test_build_eth_nextgen_micro_signal_intent.py"]
            ),
        }
    )

    heartbeat_text = json.dumps(payload, indent=2, sort_keys=True) + "\n"
    (state_dir / "heartbeat.json").write_text(heartbeat_text, encoding="utf-8")
    append_jsonl(state_dir / "observer-events.jsonl", payload)
    return payload
def main() -> int:
    """Command-line entry point for the read-only observer.

    Options:
        --state-dir: directory for observer output (default ``STATE_DIR``).
        --interval-seconds: pause between cycles (default 300).
        --once: run a single cycle and return.

    Each cycle's record is printed to stdout as JSON. If a cycle raises, an
    error record is appended to ``observer-events.jsonl`` and printed to
    stderr instead.

    Returns:
        With ``--once``: ``0`` if the cycle succeeded, ``1`` if it raised.
        Without ``--once`` the loop never returns normally.
    """
    parser = argparse.ArgumentParser(description="Run readonly ETH nextgen+micro live observer.")
    parser.add_argument("--state-dir", type=Path, default=STATE_DIR)
    parser.add_argument("--interval-seconds", type=int, default=300)
    parser.add_argument("--once", action="store_true")
    options = parser.parse_args()

    events_log = options.state_dir / "observer-events.jsonl"
    while True:
        failed = False
        try:
            record = run_once(options.state_dir)
            print(json.dumps(record, indent=2, sort_keys=True))
        except Exception as exc:
            # Top-level boundary: record the failure and keep the loop alive.
            failed = True
            failure = {
                "created_at": now_iso(),
                "mode": "readonly_observer",
                "orders_submitted": 0,
                "error": str(exc),
            }
            append_jsonl(events_log, failure)
            print(json.dumps(failure, indent=2, sort_keys=True), file=sys.stderr)
        if options.once:
            return 1 if failed else 0
        time.sleep(options.interval_seconds)
# Script entry point: the process exit status is whatever main() returns.
if __name__ == "__main__":
    sys.exit(main())
|