| 1234567891011121314151617181920212223242526272829303132333435 |
- from __future__ import annotations
- from pathlib import Path
- import pandas as pd
- from okx_codex_trader.models import Candle
- def load_candles_csv(data_dir: Path, symbol: str, bar: str) -> list[Candle]:
- frame = pd.read_csv(data_dir / symbol / f"{bar}.csv")
- return [
- Candle(
- symbol=symbol,
- ts=int(row.ts),
- open=float(row.open),
- high=float(row.high),
- low=float(row.low),
- close=float(row.close),
- volume=float(row.volume),
- )
- for row in frame.itertuples(index=False)
- ]
- def align_candles_by_ts(left: list[Candle], right: list[Candle]) -> tuple[list[Candle], list[Candle]]:
- right_by_ts = {candle.ts: candle for candle in right}
- left_out: list[Candle] = []
- right_out: list[Candle] = []
- for candle in left:
- other = right_by_ts.get(candle.ts)
- if other is not None:
- left_out.append(candle)
- right_out.append(other)
- return left_out, right_out
|