| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- from __future__ import annotations
- import argparse
- import csv
- import json
- import sys
- from datetime import UTC, datetime
- from pathlib import Path
- sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
- from okx_codex_trader.okx_client import OkxClient
- from scripts import explore_ultrashort as explore
- SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP")
- BARS = ("15m", "5m", "3m")
- def iso_text(ts: int | None) -> str | None:
- if ts is None:
- return None
- return datetime.fromtimestamp(ts / 1000, UTC).isoformat().replace("+00:00", "Z")
- def local_last_ts(cache_dir: Path, symbol: str, bar: str) -> tuple[int, int | None]:
- path = cache_dir / symbol / f"{bar}.csv"
- rows = 0
- last_ts: int | None = None
- if not path.exists():
- return rows, last_ts
- with path.open("r", encoding="utf-8", newline="") as handle:
- for row in csv.DictReader(handle):
- rows += 1
- last_ts = int(row["ts"])
- return rows, last_ts
- def freshness_rows(
- *,
- client: OkxClient,
- cache_dir: Path,
- symbols: tuple[str, ...] = SYMBOLS,
- bars: tuple[str, ...] = BARS,
- ) -> list[dict[str, object]]:
- rows = []
- for symbol in symbols:
- for bar in bars:
- local_rows, local_ts = local_last_ts(cache_dir, symbol, bar)
- recent = client.get_recent_candles(symbol, bar, 2)
- remote_ts = recent[-1].ts if recent else None
- interval = explore.CANDLE_BAR_MS[bar]
- lag_bars = None if local_ts is None or remote_ts is None else max(0, (remote_ts - local_ts) // interval)
- rows.append(
- {
- "symbol": symbol,
- "bar": bar,
- "local_rows": local_rows,
- "local_last_ts": local_ts,
- "local_last_time": iso_text(local_ts),
- "remote_last_ts": remote_ts,
- "remote_last_time": iso_text(remote_ts),
- "lag_bars": lag_bars,
- "lag_ms": None if local_ts is None or remote_ts is None else max(0, remote_ts - local_ts),
- }
- )
- return rows
- def build_report(cache_dir: Path, client: OkxClient) -> dict[str, object]:
- rows = freshness_rows(client=client, cache_dir=cache_dir)
- max_lag_bars = max((int(row["lag_bars"]) for row in rows if row["lag_bars"] is not None), default=None)
- return {
- "created_at": datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z"),
- "cache_dir": str(cache_dir),
- "max_lag_bars": max_lag_bars,
- "rows": rows,
- }
- def main() -> int:
- parser = argparse.ArgumentParser(description="Report local OKX candle cache freshness against recent OKX candles.")
- parser.add_argument("--cache-dir", type=Path, default=explore.CANDLE_CACHE_DIR)
- args = parser.parse_args()
- print(json.dumps(build_report(args.cache_dir, OkxClient()), indent=2, sort_keys=True))
- return 0
- if __name__ == "__main__":
- raise SystemExit(main())
|