report_candle_cache_freshness.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. from __future__ import annotations
  2. import argparse
  3. import csv
  4. import json
  5. import sys
  6. from datetime import UTC, datetime
  7. from pathlib import Path
  8. sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
  9. from okx_codex_trader.okx_client import OkxClient
  10. from scripts import explore_ultrashort as explore
  11. SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP")
  12. BARS = ("15m", "5m", "3m")
  13. def iso_text(ts: int | None) -> str | None:
  14. if ts is None:
  15. return None
  16. return datetime.fromtimestamp(ts / 1000, UTC).isoformat().replace("+00:00", "Z")
  17. def local_last_ts(cache_dir: Path, symbol: str, bar: str) -> tuple[int, int | None]:
  18. path = cache_dir / symbol / f"{bar}.csv"
  19. rows = 0
  20. last_ts: int | None = None
  21. if not path.exists():
  22. return rows, last_ts
  23. with path.open("r", encoding="utf-8", newline="") as handle:
  24. for row in csv.DictReader(handle):
  25. rows += 1
  26. last_ts = int(row["ts"])
  27. return rows, last_ts
  28. def freshness_rows(
  29. *,
  30. client: OkxClient,
  31. cache_dir: Path,
  32. symbols: tuple[str, ...] = SYMBOLS,
  33. bars: tuple[str, ...] = BARS,
  34. ) -> list[dict[str, object]]:
  35. rows = []
  36. for symbol in symbols:
  37. for bar in bars:
  38. local_rows, local_ts = local_last_ts(cache_dir, symbol, bar)
  39. recent = client.get_recent_candles(symbol, bar, 2)
  40. remote_ts = recent[-1].ts if recent else None
  41. interval = explore.CANDLE_BAR_MS[bar]
  42. lag_bars = None if local_ts is None or remote_ts is None else max(0, (remote_ts - local_ts) // interval)
  43. rows.append(
  44. {
  45. "symbol": symbol,
  46. "bar": bar,
  47. "local_rows": local_rows,
  48. "local_last_ts": local_ts,
  49. "local_last_time": iso_text(local_ts),
  50. "remote_last_ts": remote_ts,
  51. "remote_last_time": iso_text(remote_ts),
  52. "lag_bars": lag_bars,
  53. "lag_ms": None if local_ts is None or remote_ts is None else max(0, remote_ts - local_ts),
  54. }
  55. )
  56. return rows
  57. def build_report(cache_dir: Path, client: OkxClient) -> dict[str, object]:
  58. rows = freshness_rows(client=client, cache_dir=cache_dir)
  59. max_lag_bars = max((int(row["lag_bars"]) for row in rows if row["lag_bars"] is not None), default=None)
  60. return {
  61. "created_at": datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z"),
  62. "cache_dir": str(cache_dir),
  63. "max_lag_bars": max_lag_bars,
  64. "rows": rows,
  65. }
  66. def main() -> int:
  67. parser = argparse.ArgumentParser(description="Report local OKX candle cache freshness against recent OKX candles.")
  68. parser.add_argument("--cache-dir", type=Path, default=explore.CANDLE_CACHE_DIR)
  69. args = parser.parse_args()
  70. print(json.dumps(build_report(args.cache_dir, OkxClient()), indent=2, sort_keys=True))
  71. return 0
  72. if __name__ == "__main__":
  73. raise SystemExit(main())