| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- from __future__ import annotations
- import argparse
- import csv
- import json
- import sys
- from datetime import UTC, datetime
- from pathlib import Path
- ROOT = Path(__file__).resolve().parents[1]
- if str(ROOT) not in sys.path:
- sys.path.insert(0, str(ROOT))
- from okx_codex_trader.okx_client import OkxClient
# Perpetual-swap instruments the diagnostic inspects.
SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP")

# Bar sizes that have their own CSV cache and are compared directly with the exchange.
DIRECT_BARS = ("15m", "5m", "3m")

# Bar sizes with no cache of their own; their freshness is inferred from BASE_BAR.
DERIVED_BARS = ("1H", "4H")

# Direct bar from which the derived bars are aggregated.
BASE_BAR = "15m"

# Bar duration in milliseconds, written as multiples of one minute (60_000 ms).
BAR_MS = {
    "3m": 3 * 60_000,
    "5m": 5 * 60_000,
    "15m": 15 * 60_000,
    "1H": 60 * 60_000,
    "4H": 240 * 60_000,
}
- def iso_text(ts: int | None) -> str | None:
- if ts is None:
- return None
- return datetime.fromtimestamp(ts / 1000, UTC).isoformat().replace("+00:00", "Z")
def scan_csv(path: Path, interval_ms: int) -> dict[str, object]:
    """Summarise the ``ts`` column (epoch milliseconds) of a cached candle CSV.

    Returns a dict with:

    * ``rows`` - number of rows with a valid integer ``ts``;
    * ``first_ts`` / ``first_time`` and ``last_ts`` / ``last_time`` - the first
      and last valid timestamps in file order (``None`` when there are none);
    * ``duplicate_timestamps`` - rows whose ``ts`` already appeared earlier;
    * ``non_increasing_steps`` - consecutive valid rows where ``ts`` did not increase;
    * ``unexpected_intervals`` - consecutive valid rows whose gap differs from
      *interval_ms* (every non-increasing step is also counted here);
    * ``invalid_rows`` - rows whose ``ts`` is missing or not an integer. These
      are skipped and counted instead of aborting the whole scan.

    A missing file yields zero counts and ``None`` timestamps.
    """
    rows = 0
    invalid_rows = 0
    first_ts: int | None = None
    last_ts: int | None = None
    duplicate_timestamps = 0
    non_increasing_steps = 0
    unexpected_intervals = 0
    seen: set[int] = set()

    if path.exists():
        with path.open("r", encoding="utf-8", newline="") as handle:
            for row in csv.DictReader(handle):
                try:
                    ts = int(row["ts"])
                except (KeyError, TypeError, ValueError):
                    # KeyError: no "ts" column; TypeError: short row (DictReader fills
                    # missing fields with None); ValueError: empty/non-integer text,
                    # e.g. a truncated last line left by an interrupted write.
                    invalid_rows += 1
                    continue
                rows += 1
                if first_ts is None:
                    first_ts = ts
                if ts in seen:
                    duplicate_timestamps += 1
                seen.add(ts)
                if last_ts is not None:
                    diff = ts - last_ts
                    if diff <= 0:
                        non_increasing_steps += 1
                    if diff != interval_ms:
                        unexpected_intervals += 1
                last_ts = ts

    return {
        "rows": rows,
        "first_ts": first_ts,
        "first_time": iso_text(first_ts),
        "last_ts": last_ts,
        "last_time": iso_text(last_ts),
        "duplicate_timestamps": duplicate_timestamps,
        "non_increasing_steps": non_increasing_steps,
        "unexpected_intervals": unexpected_intervals,
        "invalid_rows": invalid_rows,
    }
def remote_last_ts(client: OkxClient, symbol: str, bar: str) -> int | None:
    """Return the ``ts`` of the last candle the exchange reports for *symbol*/*bar*.

    Calls ``client.get_recent_candles(symbol, bar, 2)`` and returns the ``ts``
    attribute of the final element, or ``None`` if nothing came back.

    NOTE(review): this assumes the client returns candles oldest-first, so the
    last element is the newest. It may also be the still-forming (unconfirmed)
    candle, depending on OkxClient. TODO confirm both against OkxClient.
    """
    recent = client.get_recent_candles(symbol, bar, 2)
    if not recent:
        return None
    newest = recent[-1]
    return newest.ts
def last_complete_derived_ts(
    base_last_ts: int | None,
    derived_interval_ms: int,
    *,
    base_interval_ms: int | None = None,
) -> int | None:
    """Return the open time of the newest derived bar fully covered by base candles.

    Derived bars are aligned to multiples of *derived_interval_ms* counted from
    the Unix epoch. A derived bar opening at ``L`` covers
    ``[L, L + derived_interval_ms)``. It counts as complete once *base_last_ts*
    has reached the last base candle inside it, i.e. the one opening at
    ``L + derived_interval_ms - base_interval_ms``. Otherwise the preceding
    derived bar is returned, or ``None`` if that would start before the epoch.

    Only the last base timestamp is inspected; gaps earlier in the base series
    are not detected here.

    NOTE(review): this assumes *base_last_ts* is the open time of a *closed*
    base candle. If it is an in-progress candle, the derived bar is reported
    complete one base interval early. TODO confirm what callers pass.

    Args:
        base_last_ts: Open time (epoch ms) of the newest base candle, or ``None``.
        derived_interval_ms: Length of the derived bar in milliseconds.
        base_interval_ms: Length of the base bar in milliseconds. Defaults to
            ``BAR_MS[BASE_BAR]``.

    Returns:
        The derived bar's open time in epoch ms, or ``None``.

    Raises:
        ValueError: If either interval is not positive.
    """
    if base_last_ts is None:
        return None
    if base_interval_ms is None:
        base_interval_ms = BAR_MS[BASE_BAR]
    if derived_interval_ms <= 0 or base_interval_ms <= 0:
        raise ValueError(
            f"intervals must be positive (derived={derived_interval_ms}, base={base_interval_ms})"
        )
    label = (base_last_ts // derived_interval_ms) * derived_interval_ms
    required_last_base_ts = label + derived_interval_ms - base_interval_ms
    if base_last_ts >= required_last_base_ts:
        return label
    previous = label - derived_interval_ms
    return previous if previous >= 0 else None
def _freshness(
    local_ts: int | None, remote_ts: int | None, interval_ms: int
) -> tuple[int | None, int | None, bool]:
    """Return ``(lag_ms, lag_bars, needs_update)`` for one local/remote timestamp pair.

    The lag values are ``None`` when either side is unknown. A missing local
    timestamp combined with a known remote one still needs an update: the cache
    is absent or empty while the exchange has data.
    """
    if remote_ts is None:
        # No exchange timestamp to compare against, so staleness cannot be judged.
        return None, None, False
    if local_ts is None:
        return None, None, True
    lag_ms = max(0, remote_ts - local_ts)
    return lag_ms, lag_ms // interval_ms, lag_ms > 0


def _direct_row(cache_dir: Path, client: OkxClient, symbol: str, bar: str) -> dict[str, object]:
    """Build the report row for a bar cached at ``<cache_dir>/<symbol>/<bar>.csv``."""
    interval = BAR_MS[bar]
    local = scan_csv(cache_dir / symbol / f"{bar}.csv", interval)
    remote_ts = remote_last_ts(client, symbol, bar)
    local_ts = local["last_ts"]
    lag_ms, lag_bars, needs_update = _freshness(
        None if local_ts is None else int(local_ts), remote_ts, interval  # type: ignore[call-overload]
    )
    return {
        "symbol": symbol,
        "bar": bar,
        **local,
        "remote_last_ts": remote_ts,
        "remote_last_time": iso_text(remote_ts),
        "lag_ms": lag_ms,
        "lag_bars": lag_bars,
        "needs_update": needs_update,
    }


def _derived_row(symbol: str, bar: str, base: dict[str, object]) -> dict[str, object]:
    """Build the report row for a bar inferred from the symbol's BASE_BAR row."""
    interval = BAR_MS[bar]
    local_ts = last_complete_derived_ts(base["last_ts"], interval)  # type: ignore[arg-type]
    remote_ts = last_complete_derived_ts(base["remote_last_ts"], interval)  # type: ignore[arg-type]
    lag_ms, lag_bars, needs_update = _freshness(local_ts, remote_ts, interval)
    return {
        "symbol": symbol,
        "bar": bar,
        "derived_from": BASE_BAR,
        "local_last_complete_ts": local_ts,
        "local_last_complete_time": iso_text(local_ts),
        "remote_last_complete_ts": remote_ts,
        "remote_last_complete_time": iso_text(remote_ts),
        "lag_ms": lag_ms,
        "lag_bars": lag_bars,
        "needs_update": needs_update,
    }


def build_report(cache_dir: Path, client: OkxClient) -> dict[str, object]:
    """Compare local candle caches with the exchange and summarise what needs refreshing.

    For every symbol in SYMBOLS:

    * each bar in DIRECT_BARS is scanned from ``<cache_dir>/<symbol>/<bar>.csv``
      and compared with the last candle timestamp returned by *client*;
    * each bar in DERIVED_BARS is not read from disk. Its freshness is inferred
      from that symbol's BASE_BAR row via ``last_complete_derived_ts``, so
      BASE_BAR must be one of DIRECT_BARS.

    A bar needs an update when the remote timestamp is ahead of the local one,
    or when the local cache is missing or empty while the exchange returned data.

    Returns a JSON-serialisable dict with keys ``created_at``, ``cache_dir``,
    ``direct_bars``, ``derived_bars`` and ``conclusion``.
    """
    direct_rows: list[dict[str, object]] = []
    base_by_symbol: dict[str, dict[str, object]] = {}
    for symbol in SYMBOLS:
        for bar in DIRECT_BARS:
            row = _direct_row(cache_dir, client, symbol, bar)
            direct_rows.append(row)
            if bar == BASE_BAR:
                base_by_symbol[symbol] = row

    derived_rows = [
        _derived_row(symbol, bar, base_by_symbol[symbol])
        for symbol in SYMBOLS
        for bar in DERIVED_BARS
    ]

    direct_update_bars = sorted({str(row["bar"]) for row in direct_rows if row["needs_update"]})
    derived_update_bars = sorted({str(row["bar"]) for row in derived_rows if row["needs_update"]})
    return {
        "created_at": datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z"),
        "cache_dir": str(cache_dir),
        "direct_bars": direct_rows,
        "derived_bars": derived_rows,
        "conclusion": {
            "direct_bars_needing_update": direct_update_bars,
            "derived_bars_needing_update": derived_update_bars,
            "can_continue_1h_4h_closed_bar_exploration": not derived_update_bars,
            "update_before_3m_5m_exploration": any(bar in direct_update_bars for bar in ("3m", "5m")),
            "update_before_latest_15m_signal_work": "15m" in direct_update_bars,
        },
    }
def main() -> int:
    """CLI entry point: build the freshness report, save it as JSON, and echo it.

    The report is written (pretty-printed, keys sorted, trailing newline) to
    ``--output-file``, creating parent directories as needed, and the same JSON
    is printed to stdout. Always returns exit status 0.
    """
    cli = argparse.ArgumentParser(description="Diagnose local candle freshness for ETH exploration.")
    cli.add_argument("--cache-dir", type=Path, default=Path("data/okx-candles"))
    cli.add_argument(
        "--output-file",
        type=Path,
        default=Path("reports/eth-exploration/data-freshness-diagnostic.json"),
    )
    options = cli.parse_args()

    report = build_report(options.cache_dir, OkxClient())
    # Serialise once; the file copy and the stdout copy are the same text.
    rendered = json.dumps(report, indent=2, sort_keys=True)

    destination: Path = options.output_file
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(f"{rendered}\n", encoding="utf-8")
    print(rendered)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
|