from __future__ import annotations import argparse import csv import json import sys from datetime import UTC, datetime from pathlib import Path sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from okx_codex_trader.okx_client import OkxClient from scripts import explore_ultrashort as explore SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP") BARS = ("15m", "5m", "3m") def iso_text(ts: int | None) -> str | None: if ts is None: return None return datetime.fromtimestamp(ts / 1000, UTC).isoformat().replace("+00:00", "Z") def local_last_ts(cache_dir: Path, symbol: str, bar: str) -> tuple[int, int | None]: path = cache_dir / symbol / f"{bar}.csv" rows = 0 last_ts: int | None = None if not path.exists(): return rows, last_ts with path.open("r", encoding="utf-8", newline="") as handle: for row in csv.DictReader(handle): rows += 1 last_ts = int(row["ts"]) return rows, last_ts def freshness_rows( *, client: OkxClient, cache_dir: Path, symbols: tuple[str, ...] = SYMBOLS, bars: tuple[str, ...] = BARS, ) -> list[dict[str, object]]: rows = [] for symbol in symbols: for bar in bars: local_rows, local_ts = local_last_ts(cache_dir, symbol, bar) recent = client.get_recent_candles(symbol, bar, 2) remote_ts = recent[-1].ts if recent else None interval = explore.CANDLE_BAR_MS[bar] lag_bars = None if local_ts is None or remote_ts is None else max(0, (remote_ts - local_ts) // interval) rows.append( { "symbol": symbol, "bar": bar, "local_rows": local_rows, "local_last_ts": local_ts, "local_last_time": iso_text(local_ts), "remote_last_ts": remote_ts, "remote_last_time": iso_text(remote_ts), "lag_bars": lag_bars, "lag_ms": None if local_ts is None or remote_ts is None else max(0, remote_ts - local_ts), } ) return rows def build_report(cache_dir: Path, client: OkxClient) -> dict[str, object]: rows = freshness_rows(client=client, cache_dir=cache_dir) max_lag_bars = max((int(row["lag_bars"]) for row in rows if row["lag_bars"] is not None), default=None) return { "created_at": datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z"), "cache_dir": str(cache_dir), "max_lag_bars": max_lag_bars, "rows": rows, } def main() -> int: parser = argparse.ArgumentParser(description="Report local OKX candle cache freshness against recent OKX candles.") parser.add_argument("--cache-dir", type=Path, default=explore.CANDLE_CACHE_DIR) args = parser.parse_args() print(json.dumps(build_report(args.cache_dir, OkxClient()), indent=2, sort_keys=True)) return 0 if __name__ == "__main__": raise SystemExit(main())