from __future__ import annotations import argparse import json import sys import time from dataclasses import asdict from datetime import UTC, datetime from pathlib import Path import pandas as pd sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from okx_codex_trader.models import Candle from okx_codex_trader.okx_client import OkxClient ROOT = Path(__file__).resolve().parents[1] DATA_DIR = ROOT / "data" / "okx-candles" STATE_DIR = ROOT / "var" / "crash-follow-short-observer" EVENTS_FILE = "observer-events.jsonl" ETH_SYMBOL = "ETH-USDT-SWAP" BTC_SYMBOL = "BTC-USDT-SWAP" SOURCE_BAR = "15m" LIVE_CANDLE_LIMIT = 1_200 RECENT_CANDLE_LIMIT = 300 FAST = 20 SLOW = 120 LOOKBACK = 8 THRESHOLD = 0.035 STOP_LOSS_PCT = 0.02 TAKE_PROFIT_PCT = 0.06 HOLD_BARS = 96 MIN_FRAME_ROWS = max(SLOW, 180, LOOKBACK * 2, 160, 120 + 24) + 2 def now_iso() -> str: return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z") def append_jsonl(path: Path, payload: dict[str, object]) -> None: path.parent.mkdir(parents=True, exist_ok=True) with path.open("a", encoding="utf-8") as handle: handle.write(json.dumps(payload, sort_keys=True, separators=(",", ":")) + "\n") def frame_from_candles(candles: list[Candle]) -> pd.DataFrame: frame = pd.DataFrame([asdict(candle) for candle in candles]) frame["time"] = pd.to_datetime(frame["ts"], unit="ms", utc=True) return frame.sort_values("ts").drop_duplicates("ts", keep="last").reset_index(drop=True) def load_pair_frame(client: OkxClient) -> pd.DataFrame: eth_path = DATA_DIR / ETH_SYMBOL / f"{SOURCE_BAR}.csv" btc_path = DATA_DIR / BTC_SYMBOL / f"{SOURCE_BAR}.csv" if eth_path.exists() and btc_path.exists(): eth = pd.read_csv(eth_path).tail(LIVE_CANDLE_LIMIT) btc = pd.read_csv(btc_path).tail(LIVE_CANDLE_LIMIT) eth["time"] = pd.to_datetime(eth["ts"], unit="ms", utc=True) btc = btc[["ts", "close"]].rename(columns={"close": "btc_close"}) merged = eth.merge(btc, on="ts", how="inner").sort_values("ts").reset_index(drop=True) return resample_hourly(merged) eth = frame_from_candles(client.get_candles(ETH_SYMBOL, SOURCE_BAR, LIVE_CANDLE_LIMIT)) btc = frame_from_candles(client.get_candles(BTC_SYMBOL, SOURCE_BAR, LIVE_CANDLE_LIMIT)) btc = btc[["ts", "close"]].rename(columns={"close": "btc_close"}) merged = eth.merge(btc, on="ts", how="inner").sort_values("ts").reset_index(drop=True) return resample_hourly(merged) def resample_hourly(frame: pd.DataFrame) -> pd.DataFrame: hourly = ( frame.set_index("time") .resample("1h", label="left", closed="left") .agg( ts=("ts", "first"), open=("open", "first"), high=("high", "max"), low=("low", "min"), close=("close", "last"), volume=("volume", "sum"), btc_close=("btc_close", "last"), ) .dropna() .reset_index() ) hourly["ts"] = hourly["ts"].astype(int) return hourly def refresh_pair_frame(client: OkxClient, frame: pd.DataFrame | None) -> pd.DataFrame: if frame is None or len(frame) < MIN_FRAME_ROWS: return load_pair_frame(client) eth = frame_from_candles(client.get_recent_candles(ETH_SYMBOL, SOURCE_BAR, RECENT_CANDLE_LIMIT)) btc = frame_from_candles(client.get_recent_candles(BTC_SYMBOL, SOURCE_BAR, RECENT_CANDLE_LIMIT)) btc = btc[["ts", "close"]].rename(columns={"close": "btc_close"}) recent = eth.merge(btc, on="ts", how="inner").sort_values("ts").reset_index(drop=True) recent = resample_hourly(recent) merged = pd.concat([frame, recent], ignore_index=True) return merged.sort_values("ts").drop_duplicates("ts", keep="last").tail(LIVE_CANDLE_LIMIT).reset_index(drop=True) def rsi(close: pd.Series, length: int) -> pd.Series: diff = close.diff() gain = diff.clip(lower=0).ewm(alpha=1 / length, adjust=False).mean() loss = (-diff.clip(upper=0)).ewm(alpha=1 / length, adjust=False).mean() return 100 - 100 / (1 + gain / loss) def latest_signal(frame: pd.DataFrame) -> dict[str, object]: if len(frame) < MIN_FRAME_ROWS: raise ValueError("not enough candles") close = frame["close"].astype(float) high = frame["high"].astype(float) low = frame["low"].astype(float) btc_close = frame["btc_close"].astype(float) fast = close.ewm(span=FAST, adjust=False).mean() slow = close.ewm(span=SLOW, adjust=False).mean() ret = close / close.shift(LOOKBACK) - 1.0 range_pct = (high - low) / close range_rank = range_pct.rolling(160).rank(pct=True) rsi14 = rsi(close, 14) btc_slow = btc_close.rolling(120).mean() btc_drop = btc_close / btc_close.shift(24) - 1.0 gate = (btc_close < btc_slow) & (btc_drop < -0.015) entry = gate & (close < slow) & (ret < -THRESHOLD) & (range_rank > 0.75) exit_ = (close > fast) | (rsi14 > 52) index = len(frame) - 1 row = frame.iloc[index] active = bool(entry.iloc[index]) return { "decision_candle_ts": int(row["ts"]), "decision_candle_time": pd.Timestamp(row["time"]).isoformat().replace("+00:00", "Z"), "signal": "entry_short" if active else "hold", "target_side": "short" if active else "flat", "entry_signal": active, "exit_signal": bool(exit_.iloc[index]), "indicators": { "eth_close": float(close.iloc[index]), "eth_fast_ema_20": float(fast.iloc[index]), "eth_slow_ema_120": float(slow.iloc[index]), "eth_return_8h": float(ret.iloc[index]), "eth_range_rank_160": float(range_rank.iloc[index]), "eth_rsi_14": float(rsi14.iloc[index]), "btc_close": float(btc_close.iloc[index]), "btc_sma_120": float(btc_slow.iloc[index]), "btc_return_24h": float(btc_drop.iloc[index]), "btc_riskoff_gate": bool(gate.iloc[index]), }, "params": { "bar": "1H", "fast": FAST, "slow": SLOW, "lookback": LOOKBACK, "threshold": THRESHOLD, "stop_loss_pct": STOP_LOSS_PCT, "take_profit_pct": TAKE_PROFIT_PCT, "hold_bars": HOLD_BARS, "gate": "btc_riskoff", }, } def write_heartbeat(state_dir: Path, payload: dict[str, object]) -> None: state_dir.mkdir(parents=True, exist_ok=True) (state_dir / "heartbeat.json").write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8") def run_once(state_dir: Path, frame: pd.DataFrame | None = None) -> dict[str, object]: state_dir.mkdir(parents=True, exist_ok=True) if frame is None: frame = load_pair_frame(OkxClient()) signal = latest_signal(frame) payload = { "created_at": now_iso(), "mode": "crash_follow_short_readonly_observer", "orders_submitted": 0, "strategy": "crash_follow-1H-f20-s120-lb8-th0.035-sl0.02-tp0.06-h96-btc_riskoff", "signal": signal, "candles": { "rows": len(frame), "first_time": pd.Timestamp(frame.iloc[0]["time"]).isoformat().replace("+00:00", "Z"), "last_time": pd.Timestamp(frame.iloc[-1]["time"]).isoformat().replace("+00:00", "Z"), }, "backtest_summary": { "available_full_return": 1.0114, "available_full_max_drawdown": 0.4490, "available_full_trades": 618, "return_3y": 0.4300, "return_1y": 0.4003, "return_6m": 0.2002, "return_3m": 0.0613, }, "risk_limits": { "no_order_submission": True, "no_cancel_submission": True, "execution": "read_only_signal_stream", }, } write_heartbeat(state_dir, payload) append_jsonl(state_dir / EVENTS_FILE, payload) return payload def main() -> int: parser = argparse.ArgumentParser(description="Run crash-follow short read-only observer.") parser.add_argument("--state-dir", type=Path, default=STATE_DIR) parser.add_argument("--interval-seconds", type=int, default=300) parser.add_argument("--once", action="store_true") args = parser.parse_args() frame: pd.DataFrame | None = None while True: try: frame = refresh_pair_frame(OkxClient(), frame) payload = run_once(args.state_dir, frame) print(json.dumps(payload, indent=2, sort_keys=True)) except Exception as exc: error = {"created_at": now_iso(), "mode": "crash_follow_short_readonly_observer", "orders_submitted": 0, "error": str(exc)} write_heartbeat(args.state_dir, error) append_jsonl(args.state_dir / EVENTS_FILE, error) print(json.dumps(error, indent=2, sort_keys=True), file=sys.stderr) if args.once: return 1 if args.once: return 0 time.sleep(args.interval_seconds) if __name__ == "__main__": raise SystemExit(main())