# Diagnose how fresh the locally cached OKX candle CSVs are compared with the
# exchange. Directly cached bars (DIRECT_BARS) are compared against the newest
# remote candle; derived bars (DERIVED_BARS) are judged from the BASE_BAR data.
# The JSON report is written to --output-file and echoed to stdout.
from __future__ import annotations

import argparse
import csv
import json
import sys
from datetime import UTC, datetime
from pathlib import Path

# Put the repository root on sys.path so the project package below resolves
# when this file is executed directly as a script.
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

from okx_codex_trader.okx_client import OkxClient  # noqa: E402

SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP")
# Bars read from their own CSV files (cache_dir/<symbol>/<bar>.csv).
DIRECT_BARS = ("15m", "5m", "3m")
# Bars whose freshness is inferred from BASE_BAR timestamps (reported with
# "derived_from": BASE_BAR).
DERIVED_BARS = ("1H", "4H")
BASE_BAR = "15m"
# Bar length in milliseconds, keyed by OKX bar name.
BAR_MS = {
    "3m": 180_000,
    "5m": 300_000,
    "15m": 900_000,
    "1H": 3_600_000,
    "4H": 14_400_000,
}


def iso_text(ts: int | None) -> str | None:
    """Format a millisecond epoch timestamp as an ISO-8601 UTC string ending in "Z".

    Returns None when ``ts`` is None.
    """
    if ts is None:
        return None
    return datetime.fromtimestamp(ts / 1000, UTC).isoformat().replace("+00:00", "Z")


def scan_csv(path: Path, interval_ms: int) -> dict[str, object]:
    """Summarise the ``ts`` column of a cached candle CSV.

    Args:
        path: CSV file with a header row that includes a ``ts`` column of
            integer millisecond timestamps. A missing file is reported as empty
            (zero rows, all timestamps None, all counters zero).
        interval_ms: Expected gap between consecutive rows.

    Returns:
        Row count, first/last row timestamps (raw and ISO text), and counts of
        duplicate timestamps, non-increasing steps and gaps that differ from
        ``interval_ms``. "last" means the last row in file order.

    Raises:
        KeyError: if the CSV has no ``ts`` column.
        ValueError: if a ``ts`` value is not an integer.
    """
    rows = 0
    first_ts: int | None = None
    last_ts: int | None = None
    duplicate_timestamps = 0
    non_increasing_steps = 0
    unexpected_intervals = 0
    seen: set[int] = set()

    # A missing file simply skips the scan; the shared return below then
    # reports zero rows and None timestamps.
    if path.exists():
        with path.open("r", encoding="utf-8", newline="") as handle:
            for row in csv.DictReader(handle):
                ts = int(row["ts"])
                rows += 1
                if first_ts is None:
                    first_ts = ts
                if ts in seen:
                    duplicate_timestamps += 1
                seen.add(ts)
                if last_ts is not None:
                    diff = ts - last_ts
                    if diff <= 0:
                        non_increasing_steps += 1
                    # Non-increasing steps are also counted as unexpected gaps.
                    if diff != interval_ms:
                        unexpected_intervals += 1
                last_ts = ts

    return {
        "rows": rows,
        "first_ts": first_ts,
        "first_time": iso_text(first_ts),
        "last_ts": last_ts,
        "last_time": iso_text(last_ts),
        "duplicate_timestamps": duplicate_timestamps,
        "non_increasing_steps": non_increasing_steps,
        "unexpected_intervals": unexpected_intervals,
    }


def remote_last_ts(client: OkxClient, symbol: str, bar: str) -> int | None:
    """Return the newest candle timestamp the exchange reports for ``symbol``/``bar``.

    Two recent candles are requested and the maximum timestamp is taken, so the
    result does not depend on the order the client returns them in (the raw
    OKX endpoint lists newest first; whether ``get_recent_candles`` reorders is
    not visible here). Returns None when no candles come back.

    NOTE(review): OKX also returns the still-forming candle; if the client does
    not filter it out, this is one bar ahead of the last closed bar — confirm.
    """
    candles = client.get_recent_candles(symbol, bar, 2)
    if not candles:
        return None
    return max(candle.ts for candle in candles)


def last_complete_derived_ts(base_last_ts: int | None, derived_interval_ms: int) -> int | None:
    """Return the open time of the newest derived bar covered by base data.

    Derived bars are bucketed on multiples of ``derived_interval_ms`` since the
    epoch. The bucket containing ``base_last_ts`` counts as complete only if
    ``base_last_ts`` is its final BASE_BAR slot; otherwise the previous bucket
    is returned. Only the last base timestamp is inspected — gaps earlier in
    the base series are not detected here.

    Returns None when ``base_last_ts`` is None or no complete bucket exists.
    """
    if base_last_ts is None:
        return None
    label = (base_last_ts // derived_interval_ms) * derived_interval_ms
    required_last_base_ts = label + derived_interval_ms - BAR_MS[BASE_BAR]
    if base_last_ts >= required_last_base_ts:
        return label
    previous = label - derived_interval_ms
    return previous if previous >= 0 else None


def _lag_ms(local_ts: int | None, remote_ts: int | None) -> int | None:
    """Return how far (ms, floored at 0) local data trails remote, or None if either is unknown."""
    if local_ts is None or remote_ts is None:
        return None
    return max(0, remote_ts - local_ts)


def _needs_update(local_ts: int | None, lag_ms: int | None) -> bool:
    """Decide whether a cache entry is stale.

    Missing local data always needs an update. Otherwise any positive lag does.
    If the remote timestamp is unknown (``lag_ms`` is None) while local data
    exists, staleness cannot be established and False is returned.
    """
    if local_ts is None:
        return True
    return lag_ms is not None and lag_ms > 0


def _direct_rows(
    cache_dir: Path, client: OkxClient
) -> tuple[list[dict[str, object]], dict[str, dict[str, object]]]:
    """Build one report row per (symbol, direct bar) and index the BASE_BAR rows by symbol."""
    rows: list[dict[str, object]] = []
    base_by_symbol: dict[str, dict[str, object]] = {}
    for symbol in SYMBOLS:
        for bar in DIRECT_BARS:
            local = scan_csv(cache_dir / symbol / f"{bar}.csv", BAR_MS[bar])
            remote_ts = remote_last_ts(client, symbol, bar)
            local_ts = local["last_ts"]
            lag_ms = _lag_ms(local_ts, remote_ts)  # type: ignore[arg-type]
            row = {
                "symbol": symbol,
                "bar": bar,
                **local,
                "remote_last_ts": remote_ts,
                "remote_last_time": iso_text(remote_ts),
                "lag_ms": lag_ms,
                "lag_bars": None if lag_ms is None else lag_ms // BAR_MS[bar],
                "needs_update": _needs_update(local_ts, lag_ms),  # type: ignore[arg-type]
            }
            rows.append(row)
            if bar == BASE_BAR:
                base_by_symbol[symbol] = row
    return rows, base_by_symbol


def _derived_rows(base_by_symbol: dict[str, dict[str, object]]) -> list[dict[str, object]]:
    """Build one report row per (symbol, derived bar) from the BASE_BAR local/remote timestamps."""
    rows: list[dict[str, object]] = []
    for symbol in SYMBOLS:
        base = base_by_symbol[symbol]
        for bar in DERIVED_BARS:
            interval = BAR_MS[bar]
            local_ts = last_complete_derived_ts(base["last_ts"], interval)  # type: ignore[arg-type]
            remote_ts = last_complete_derived_ts(base["remote_last_ts"], interval)  # type: ignore[arg-type]
            lag_ms = _lag_ms(local_ts, remote_ts)
            rows.append(
                {
                    "symbol": symbol,
                    "bar": bar,
                    "derived_from": BASE_BAR,
                    "local_last_complete_ts": local_ts,
                    "local_last_complete_time": iso_text(local_ts),
                    "remote_last_complete_ts": remote_ts,
                    "remote_last_complete_time": iso_text(remote_ts),
                    "lag_ms": lag_ms,
                    "lag_bars": None if lag_ms is None else lag_ms // interval,
                    "needs_update": _needs_update(local_ts, lag_ms),
                }
            )
    return rows


def build_report(cache_dir: Path, client: OkxClient) -> dict[str, object]:
    """Compare the local candle cache in ``cache_dir`` with the exchange.

    Issues one ``get_recent_candles`` request per (symbol, direct bar). A bar
    "needs update" when its local data is missing or lags the remote data.

    Returns:
        A dict with the creation time, the cache dir, per-bar rows for direct
        and derived bars, and a ``conclusion`` summarising which bars need an
        update and which exploration steps are safe to run.
    """
    direct_rows, base_by_symbol = _direct_rows(cache_dir, client)
    derived_rows = _derived_rows(base_by_symbol)

    direct_update_bars = sorted({str(row["bar"]) for row in direct_rows if row["needs_update"]})
    derived_update_bars = sorted({str(row["bar"]) for row in derived_rows if row["needs_update"]})
    return {
        "created_at": datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z"),
        "cache_dir": str(cache_dir),
        "direct_bars": direct_rows,
        "derived_bars": derived_rows,
        "conclusion": {
            "direct_bars_needing_update": direct_update_bars,
            "derived_bars_needing_update": derived_update_bars,
            "can_continue_1h_4h_closed_bar_exploration": not derived_update_bars,
            "update_before_3m_5m_exploration": any(bar in direct_update_bars for bar in ("3m", "5m")),
            "update_before_latest_15m_signal_work": "15m" in direct_update_bars,
        },
    }


def main() -> int:
    """Parse CLI arguments, build the freshness report, write it to disk and print it."""
    parser = argparse.ArgumentParser(description="Diagnose local candle freshness for ETH exploration.")
    parser.add_argument("--cache-dir", type=Path, default=Path("data/okx-candles"))
    parser.add_argument(
        "--output-file",
        type=Path,
        default=Path("reports/eth-exploration/data-freshness-diagnostic.json"),
    )
    args = parser.parse_args()
    report = build_report(args.cache_dir, OkxClient())
    text = json.dumps(report, indent=2, sort_keys=True)
    args.output_file.parent.mkdir(parents=True, exist_ok=True)
    args.output_file.write_text(text + "\n", encoding="utf-8")
    print(text)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())