# diagnose_eth_exploration_data_freshness.py (6.4 KB)

# (Line-number gutter 1-180 left over from the file viewer export; not part of the script.)
  1. from __future__ import annotations
  2. import argparse
  3. import csv
  4. import json
  5. import sys
  6. from datetime import UTC, datetime
  7. from pathlib import Path
  8. ROOT = Path(__file__).resolve().parents[1]
  9. if str(ROOT) not in sys.path:
  10. sys.path.insert(0, str(ROOT))
  11. from okx_codex_trader.okx_client import OkxClient
  12. SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP")
  13. DIRECT_BARS = ("15m", "5m", "3m")
  14. DERIVED_BARS = ("1H", "4H")
  15. BASE_BAR = "15m"
  16. BAR_MS = {
  17. "3m": 180_000,
  18. "5m": 300_000,
  19. "15m": 900_000,
  20. "1H": 3_600_000,
  21. "4H": 14_400_000,
  22. }
  23. def iso_text(ts: int | None) -> str | None:
  24. if ts is None:
  25. return None
  26. return datetime.fromtimestamp(ts / 1000, UTC).isoformat().replace("+00:00", "Z")
  27. def scan_csv(path: Path, interval_ms: int) -> dict[str, object]:
  28. rows = 0
  29. first_ts: int | None = None
  30. last_ts: int | None = None
  31. duplicate_timestamps = 0
  32. non_increasing_steps = 0
  33. unexpected_intervals = 0
  34. seen: set[int] = set()
  35. if not path.exists():
  36. return {
  37. "rows": 0,
  38. "first_ts": None,
  39. "first_time": None,
  40. "last_ts": None,
  41. "last_time": None,
  42. "duplicate_timestamps": 0,
  43. "non_increasing_steps": 0,
  44. "unexpected_intervals": 0,
  45. }
  46. with path.open("r", encoding="utf-8", newline="") as handle:
  47. for row in csv.DictReader(handle):
  48. ts = int(row["ts"])
  49. rows += 1
  50. if first_ts is None:
  51. first_ts = ts
  52. if ts in seen:
  53. duplicate_timestamps += 1
  54. seen.add(ts)
  55. if last_ts is not None:
  56. diff = ts - last_ts
  57. if diff <= 0:
  58. non_increasing_steps += 1
  59. if diff != interval_ms:
  60. unexpected_intervals += 1
  61. last_ts = ts
  62. return {
  63. "rows": rows,
  64. "first_ts": first_ts,
  65. "first_time": iso_text(first_ts),
  66. "last_ts": last_ts,
  67. "last_time": iso_text(last_ts),
  68. "duplicate_timestamps": duplicate_timestamps,
  69. "non_increasing_steps": non_increasing_steps,
  70. "unexpected_intervals": unexpected_intervals,
  71. }
  72. def remote_last_ts(client: OkxClient, symbol: str, bar: str) -> int | None:
  73. candles = client.get_recent_candles(symbol, bar, 2)
  74. return candles[-1].ts if candles else None
  75. def last_complete_derived_ts(base_last_ts: int | None, derived_interval_ms: int) -> int | None:
  76. if base_last_ts is None:
  77. return None
  78. label = (base_last_ts // derived_interval_ms) * derived_interval_ms
  79. required_last_base_ts = label + derived_interval_ms - BAR_MS[BASE_BAR]
  80. if base_last_ts >= required_last_base_ts:
  81. return label
  82. previous = label - derived_interval_ms
  83. return previous if previous >= 0 else None
  84. def build_report(cache_dir: Path, client: OkxClient) -> dict[str, object]:
  85. direct_rows = []
  86. base_by_symbol: dict[str, dict[str, object]] = {}
  87. for symbol in SYMBOLS:
  88. for bar in DIRECT_BARS:
  89. local = scan_csv(cache_dir / symbol / f"{bar}.csv", BAR_MS[bar])
  90. remote_ts = remote_last_ts(client, symbol, bar)
  91. local_ts = local["last_ts"]
  92. lag_ms = None if local_ts is None or remote_ts is None else max(0, remote_ts - int(local_ts))
  93. row = {
  94. "symbol": symbol,
  95. "bar": bar,
  96. **local,
  97. "remote_last_ts": remote_ts,
  98. "remote_last_time": iso_text(remote_ts),
  99. "lag_ms": lag_ms,
  100. "lag_bars": None if lag_ms is None else lag_ms // BAR_MS[bar],
  101. "needs_update": bool(lag_ms and lag_ms > 0),
  102. }
  103. direct_rows.append(row)
  104. if bar == BASE_BAR:
  105. base_by_symbol[symbol] = row
  106. derived_rows = []
  107. for symbol in SYMBOLS:
  108. base = base_by_symbol[symbol]
  109. for bar in DERIVED_BARS:
  110. interval = BAR_MS[bar]
  111. local_ts = last_complete_derived_ts(base["last_ts"], interval) # type: ignore[arg-type]
  112. remote_ts = last_complete_derived_ts(base["remote_last_ts"], interval) # type: ignore[arg-type]
  113. lag_ms = None if local_ts is None or remote_ts is None else max(0, remote_ts - local_ts)
  114. derived_rows.append(
  115. {
  116. "symbol": symbol,
  117. "bar": bar,
  118. "derived_from": BASE_BAR,
  119. "local_last_complete_ts": local_ts,
  120. "local_last_complete_time": iso_text(local_ts),
  121. "remote_last_complete_ts": remote_ts,
  122. "remote_last_complete_time": iso_text(remote_ts),
  123. "lag_ms": lag_ms,
  124. "lag_bars": None if lag_ms is None else lag_ms // interval,
  125. "needs_update": bool(lag_ms and lag_ms > 0),
  126. }
  127. )
  128. direct_update_bars = sorted({str(row["bar"]) for row in direct_rows if row["needs_update"]})
  129. derived_update_bars = sorted({str(row["bar"]) for row in derived_rows if row["needs_update"]})
  130. return {
  131. "created_at": datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z"),
  132. "cache_dir": str(cache_dir),
  133. "direct_bars": direct_rows,
  134. "derived_bars": derived_rows,
  135. "conclusion": {
  136. "direct_bars_needing_update": direct_update_bars,
  137. "derived_bars_needing_update": derived_update_bars,
  138. "can_continue_1h_4h_closed_bar_exploration": not derived_update_bars,
  139. "update_before_3m_5m_exploration": any(bar in direct_update_bars for bar in ("3m", "5m")),
  140. "update_before_latest_15m_signal_work": "15m" in direct_update_bars,
  141. },
  142. }
  143. def main() -> int:
  144. parser = argparse.ArgumentParser(description="Diagnose local candle freshness for ETH exploration.")
  145. parser.add_argument("--cache-dir", type=Path, default=Path("data/okx-candles"))
  146. parser.add_argument("--output-file", type=Path, default=Path("reports/eth-exploration/data-freshness-diagnostic.json"))
  147. args = parser.parse_args()
  148. report = build_report(args.cache_dir, OkxClient())
  149. args.output_file.parent.mkdir(parents=True, exist_ok=True)
  150. args.output_file.write_text(json.dumps(report, indent=2, sort_keys=True) + "\n", encoding="utf-8")
  151. print(json.dumps(report, indent=2, sort_keys=True))
  152. return 0
  153. if __name__ == "__main__":
  154. raise SystemExit(main())