# run_eth_focused_portfolio_observer.py
  1. from __future__ import annotations
  2. import argparse
  3. import json
  4. import sys
  5. import time
  6. from datetime import UTC, datetime
  7. from pathlib import Path
  8. sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
  9. from okx_codex_trader.okx_client import OkxClient
  10. from scripts import build_eth_focused_portfolio_signal_intent as intent
  11. from scripts import explore_ultrashort as explore
  12. ROOT = Path(__file__).resolve().parents[1]
  13. STATE_DIR = ROOT / "var" / "eth-focused-portfolio"
  14. SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP")
  15. LATEST_REFRESH_LIMIT = 100
  16. def now_iso() -> str:
  17. return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
  18. def append_jsonl(path: Path, payload: dict[str, object]) -> None:
  19. path.parent.mkdir(parents=True, exist_ok=True)
  20. with path.open("a", encoding="utf-8") as handle:
  21. handle.write(json.dumps(payload, sort_keys=True, separators=(",", ":")) + "\n")
  22. def merge_latest_candles(client: OkxClient, symbol: str, bar: str) -> list[object]:
  23. cached, history_exhausted = explore.load_cached_candles(explore.CANDLE_CACHE_DIR, symbol, bar)
  24. merged = {candle.ts: candle for candle in cached}
  25. for candle in client.get_candles(symbol, bar, LATEST_REFRESH_LIMIT):
  26. merged[candle.ts] = candle
  27. candles = sorted(merged.values(), key=lambda candle: candle.ts)
  28. explore.save_cached_candles(explore.CANDLE_CACHE_DIR, symbol, bar, candles, history_exhausted)
  29. return candles
  30. def cached_candles(symbol: str, bar: str) -> list[object]:
  31. candles, _history_exhausted = explore.load_cached_candles(explore.CANDLE_CACHE_DIR, symbol, bar)
  32. return candles
  33. def candle_row(candles: list[object]) -> dict[str, object]:
  34. return {
  35. "rows": len(candles),
  36. "first_ts": candles[0].ts if candles else None,
  37. "last_ts": candles[-1].ts if candles else None,
  38. "last_time": "" if not candles else datetime.fromtimestamp(candles[-1].ts / 1000, UTC).isoformat().replace("+00:00", "Z"),
  39. }
  40. def refresh_candles(client: OkxClient) -> dict[str, object]:
  41. rows = {}
  42. for symbol in SYMBOLS:
  43. rows[symbol] = {
  44. "15m": candle_row(cached_candles(symbol, "15m")),
  45. "5m": candle_row(merge_latest_candles(client, symbol, "5m")),
  46. }
  47. return rows
  48. def run_once(state_dir: Path) -> dict[str, object]:
  49. state_dir.mkdir(parents=True, exist_ok=True)
  50. candles = refresh_candles(OkxClient())
  51. payload = intent.build_payload()
  52. payload["created_at"] = now_iso()
  53. payload["orders_submitted"] = 0
  54. payload["candles"] = candles
  55. intent.REPORT_DIR.mkdir(parents=True, exist_ok=True)
  56. intent.JSON_REPORT.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
  57. intent.MARKDOWN_REPORT.write_text(intent.markdown_report(payload), encoding="utf-8")
  58. (state_dir / "heartbeat.json").write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
  59. append_jsonl(state_dir / "observer-events.jsonl", payload)
  60. return payload
  61. def main() -> int:
  62. parser = argparse.ArgumentParser(description="Run ETH-focused portfolio read-only observer.")
  63. parser.add_argument("--state-dir", type=Path, default=STATE_DIR)
  64. parser.add_argument("--interval-seconds", type=int, default=300)
  65. parser.add_argument("--once", action="store_true")
  66. args = parser.parse_args()
  67. while True:
  68. try:
  69. payload = run_once(args.state_dir)
  70. print(json.dumps(payload, indent=2, sort_keys=True))
  71. except Exception as exc:
  72. error = {"created_at": now_iso(), "mode": "eth_focused_portfolio_readonly_observer", "orders_submitted": 0, "error": str(exc)}
  73. append_jsonl(args.state_dir / "observer-events.jsonl", error)
  74. print(json.dumps(error, indent=2, sort_keys=True), file=sys.stderr)
  75. if args.once:
  76. return 1
  77. if args.once:
  78. return 0
  79. time.sleep(args.interval_seconds)
  80. if __name__ == "__main__":
  81. raise SystemExit(main())