run_calendar_fusion_observer.py 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. from __future__ import annotations
  2. import argparse
  3. import json
  4. import sys
  5. import time
  6. from datetime import UTC, datetime
  7. from pathlib import Path
  8. sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
  9. from scripts import build_calendar_fusion_observation_intent as intent
  10. from scripts import explore_ultrashort as explore
  11. ROOT = Path(__file__).resolve().parents[1]
  12. STATE_DIR = ROOT / "var" / "calendar-fusion"
  13. SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP")
  14. BAR = "15m"
  15. def now_iso() -> str:
  16. return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
  17. def append_jsonl(path: Path, payload: dict[str, object]) -> None:
  18. path.parent.mkdir(parents=True, exist_ok=True)
  19. with path.open("a", encoding="utf-8") as handle:
  20. handle.write(json.dumps(payload, sort_keys=True, separators=(",", ":")) + "\n")
  21. def run_once(state_dir: Path) -> dict[str, object]:
  22. state_dir.mkdir(parents=True, exist_ok=True)
  23. candles = cached_candle_status()
  24. payload = intent.build_payload()
  25. payload["created_at"] = now_iso()
  26. payload["orders_submitted"] = 0
  27. payload["candles"] = candles
  28. intent.REPORT_DIR.mkdir(parents=True, exist_ok=True)
  29. intent.OUTPUT_JSON.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
  30. intent.OUTPUT_MD.write_text(intent.markdown(payload), encoding="utf-8")
  31. (state_dir / "heartbeat.json").write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
  32. append_jsonl(state_dir / "observer-events.jsonl", payload)
  33. return payload
  34. def cached_candle_status() -> dict[str, object]:
  35. rows = {}
  36. for symbol in SYMBOLS:
  37. candles, _history_exhausted = explore.load_cached_candles(explore.CANDLE_CACHE_DIR, symbol, BAR)
  38. rows[symbol] = {
  39. "rows": len(candles),
  40. "first_ts": candles[0].ts if candles else None,
  41. "last_ts": candles[-1].ts if candles else None,
  42. "last_time": "" if not candles else datetime.fromtimestamp(candles[-1].ts / 1000, UTC).isoformat().replace("+00:00", "Z"),
  43. }
  44. return rows
  45. def main() -> int:
  46. parser = argparse.ArgumentParser(description="Run calendar-fusion read-only observer.")
  47. parser.add_argument("--state-dir", type=Path, default=STATE_DIR)
  48. parser.add_argument("--interval-seconds", type=int, default=300)
  49. parser.add_argument("--once", action="store_true")
  50. args = parser.parse_args()
  51. while True:
  52. try:
  53. payload = run_once(args.state_dir)
  54. print(json.dumps(payload, indent=2, sort_keys=True))
  55. except Exception as exc:
  56. error = {"created_at": now_iso(), "mode": "calendar_fusion_readonly_observer", "orders_submitted": 0, "error": str(exc)}
  57. append_jsonl(args.state_dir / "observer-events.jsonl", error)
  58. print(json.dumps(error, indent=2, sort_keys=True), file=sys.stderr)
  59. if args.once:
  60. return 1
  61. if args.once:
  62. return 0
  63. time.sleep(args.interval_seconds)
  64. if __name__ == "__main__":
  65. raise SystemExit(main())