test_report_candle_cache_freshness.py 1.2 KB

1234567891011121314151617181920212223242526272829303132333435
  1. from okx_codex_trader.models import Candle
  2. from scripts import report_candle_cache_freshness as module
  3. def candle(ts: int) -> Candle:
  4. return Candle(symbol="ETH-USDT-SWAP", ts=ts, open=1.0, high=1.0, low=1.0, close=1.0, volume=1.0)
  5. def test_freshness_rows_reports_local_remote_gap(tmp_path):
  6. path = tmp_path / "ETH-USDT-SWAP"
  7. path.mkdir()
  8. (path / "15m.csv").write_text("ts,open,high,low,close,volume\n1000,1,1,1,1,1\n901000,1,1,1,1,1\n", encoding="utf-8")
  9. class Client:
  10. def get_recent_candles(self, symbol: str, bar: str, limit: int) -> list[Candle]:
  11. assert symbol == "ETH-USDT-SWAP"
  12. assert bar == "15m"
  13. assert limit == 2
  14. return [candle(1_801_000), candle(2_701_000)]
  15. rows = module.freshness_rows(client=Client(), cache_dir=tmp_path, symbols=("ETH-USDT-SWAP",), bars=("15m",))
  16. assert rows == [
  17. {
  18. "symbol": "ETH-USDT-SWAP",
  19. "bar": "15m",
  20. "local_rows": 2,
  21. "local_last_ts": 901000,
  22. "local_last_time": "1970-01-01T00:15:01Z",
  23. "remote_last_ts": 2701000,
  24. "remote_last_time": "1970-01-01T00:45:01Z",
  25. "lag_bars": 2,
  26. "lag_ms": 1800000,
  27. }
  28. ]