from __future__ import annotations import argparse import json import sys import time from dataclasses import asdict, dataclass from datetime import UTC, datetime from pathlib import Path import pandas as pd sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from okx_codex_trader.models import Candle from okx_codex_trader.okx_client import OkxClient ROOT = Path(__file__).resolve().parents[1] DATA_DIR = ROOT / "data" / "okx-candles" STATE_DIR = ROOT / "var" / "bb-squeeze-t-gated-observer" STATE_FILE = "runtime-state.json" EVENTS_FILE = "observer-events.jsonl" ETH_SYMBOL = "ETH-USDT-SWAP" BTC_SYMBOL = "BTC-USDT-SWAP" BAR = "15m" LIVE_CANDLE_LIMIT = 1_200 RECENT_CANDLE_LIMIT = 300 BAND_LENGTH = 96 BANDWIDTH_LOOKBACK = 960 BANDWIDTH_QUANTILE = 0.25 STOP_LOSS_PCT = 0.01 EXTREME_TAKE_PROFIT_PCT = 0.035 ETH_VOL_CAP = 0.006 COOLDOWN_BARS = 24 REENTRY_BARS = 96 BTC_TREND = 480 BTC_MOMENTUM = 96 MIN_FRAME_ROWS = BAND_LENGTH + BANDWIDTH_LOOKBACK @dataclass(frozen=True) class ObserverState: last_candle_ts: int | None active_side: str | None entry_price: float | None entry_candle_ts: int | None cooldown_until_ts: int | None reentry_side: str | None reentry_anchor_price: float | None reentry_until_ts: int | None EMPTY_STATE = ObserverState(None, None, None, None, None, None, None, None) def now_iso() -> str: return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z") def load_state(path: Path) -> ObserverState: if not path.exists(): return EMPTY_STATE payload = json.loads(path.read_text(encoding="utf-8")) return ObserverState( last_candle_ts=payload["last_candle_ts"], active_side=payload["active_side"], entry_price=payload["entry_price"], entry_candle_ts=payload["entry_candle_ts"], cooldown_until_ts=payload["cooldown_until_ts"], reentry_side=payload["reentry_side"], reentry_anchor_price=payload["reentry_anchor_price"], reentry_until_ts=payload["reentry_until_ts"], ) def save_state(path: Path, state: ObserverState) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(asdict(state), indent=2, sort_keys=True) + "\n", encoding="utf-8") def append_jsonl(path: Path, payload: dict[str, object]) -> None: path.parent.mkdir(parents=True, exist_ok=True) with path.open("a", encoding="utf-8") as handle: handle.write(json.dumps(payload, sort_keys=True, separators=(",", ":")) + "\n") def frame_from_candles(candles: list[Candle]) -> pd.DataFrame: frame = pd.DataFrame([asdict(candle) for candle in candles]) frame["time"] = pd.to_datetime(frame["ts"], unit="ms", utc=True) return frame.sort_values("ts").drop_duplicates("ts", keep="last").reset_index(drop=True) def load_pair_frame(client: OkxClient) -> pd.DataFrame: eth_path = DATA_DIR / ETH_SYMBOL / f"{BAR}.csv" btc_path = DATA_DIR / BTC_SYMBOL / f"{BAR}.csv" if eth_path.exists() and btc_path.exists(): eth = pd.read_csv(eth_path).tail(LIVE_CANDLE_LIMIT) btc = pd.read_csv(btc_path).tail(LIVE_CANDLE_LIMIT) eth["time"] = pd.to_datetime(eth["ts"], unit="ms", utc=True) btc = btc[["ts", "close"]].rename(columns={"close": "btc_close"}) return eth.merge(btc, on="ts", how="inner").sort_values("ts").tail(LIVE_CANDLE_LIMIT).reset_index(drop=True) eth = frame_from_candles(client.get_candles(ETH_SYMBOL, BAR, LIVE_CANDLE_LIMIT)) btc = frame_from_candles(client.get_candles(BTC_SYMBOL, BAR, LIVE_CANDLE_LIMIT)) btc = btc[["ts", "close"]].rename(columns={"close": "btc_close"}) return eth.merge(btc, on="ts", how="inner").sort_values("ts").reset_index(drop=True) def refresh_pair_frame(client: OkxClient, frame: pd.DataFrame | None) -> pd.DataFrame: if frame is None or len(frame) < MIN_FRAME_ROWS: return load_pair_frame(client) eth = frame_from_candles(client.get_recent_candles(ETH_SYMBOL, BAR, RECENT_CANDLE_LIMIT)) btc = frame_from_candles(client.get_recent_candles(BTC_SYMBOL, BAR, RECENT_CANDLE_LIMIT)) btc = btc[["ts", "close"]].rename(columns={"close": "btc_close"}) recent = eth.merge(btc, on="ts", how="inner").sort_values("ts").reset_index(drop=True) merged = pd.concat([frame, recent], ignore_index=True) return merged.sort_values("ts").drop_duplicates("ts", keep="last").tail(LIVE_CANDLE_LIMIT).reset_index(drop=True) def signal_from_frame(frame: pd.DataFrame, state: ObserverState) -> tuple[ObserverState, dict[str, object]]: if len(frame) < MIN_FRAME_ROWS: raise ValueError("not enough candles") eth_close = frame["close"].astype(float) btc_close = frame["btc_close"].astype(float) middle = eth_close.rolling(BAND_LENGTH).mean() stdev = eth_close.rolling(BAND_LENGTH).std(ddof=0) upper = middle + 2.0 * stdev lower = middle - 2.0 * stdev bandwidth = (upper - lower) / middle threshold = bandwidth.rolling(BANDWIDTH_LOOKBACK).quantile(BANDWIDTH_QUANTILE) eth_vol = eth_close.pct_change().rolling(96).std(ddof=0) btc_sma = btc_close.rolling(BTC_TREND).mean() btc_momentum = btc_close / btc_close.shift(BTC_MOMENTUM) - 1.0 index = len(frame) - 1 row = frame.iloc[index] candle_ts = int(row["ts"]) candle_time = pd.Timestamp(row["time"]).isoformat().replace("+00:00", "Z") indicators = { "eth_close": float(row["close"]), "btc_close": float(row["btc_close"]), "middle": float(middle.iloc[index]), "upper": float(upper.iloc[index]), "lower": float(lower.iloc[index]), "bandwidth": float(bandwidth.iloc[index]), "bandwidth_threshold": float(threshold.iloc[index]), "eth_vol_96": float(eth_vol.iloc[index]), "btc_sma_480": float(btc_sma.iloc[index]), "btc_momentum_96": float(btc_momentum.iloc[index]), } if state.last_candle_ts is not None and candle_ts <= state.last_candle_ts: return state, { "decision_candle_ts": candle_ts, "decision_candle_time": candle_time, "signal": "state_replay", "target_side": state.active_side or "flat", "indicators": indicators, } next_state = ObserverState( candle_ts, state.active_side, state.entry_price, state.entry_candle_ts, state.cooldown_until_ts, state.reentry_side, state.reentry_anchor_price, state.reentry_until_ts, ) signal = "hold" target_side = state.active_side or "flat" reentry_gate = False if state.active_side is not None: entry_price = float(state.entry_price) stop = entry_price * (1.0 - STOP_LOSS_PCT if state.active_side == "long" else 1.0 + STOP_LOSS_PCT) extreme_take = entry_price * (1.0 + EXTREME_TAKE_PROFIT_PCT if state.active_side == "long" else 1.0 - EXTREME_TAKE_PROFIT_PCT) stop_hit = (state.active_side == "long" and float(row["low"]) <= stop) or (state.active_side == "short" and float(row["high"]) >= stop) extreme_take_hit = (state.active_side == "long" and float(row["high"]) >= extreme_take) or ( state.active_side == "short" and float(row["low"]) <= extreme_take ) middle_exit = (state.active_side == "long" and float(row["close"]) < indicators["middle"]) or ( state.active_side == "short" and float(row["close"]) > indicators["middle"] ) if stop_hit or extreme_take_hit or middle_exit: signal = "exit_stop" if stop_hit else "exit_extreme_take" if extreme_take_hit else "exit_middle" target_side = "flat" if extreme_take_hit and not stop_hit: next_state = ObserverState( candle_ts, None, None, None, state.cooldown_until_ts, state.active_side, extreme_take, candle_ts + REENTRY_BARS * 900_000, ) else: next_state = ObserverState(candle_ts, None, None, None, candle_ts + COOLDOWN_BARS * 900_000, None, None, None) else: if state.reentry_side is not None: if state.reentry_until_ts is None or candle_ts > state.reentry_until_ts: next_state = ObserverState(candle_ts, None, None, None, state.cooldown_until_ts, None, None, None) else: reentry_gate = (state.reentry_side == "long" and indicators["btc_momentum_96"] < 0.0) or ( state.reentry_side == "short" and indicators["btc_momentum_96"] > 0.0 ) if reentry_gate: signal = "reentry_" + state.reentry_side target_side = state.reentry_side next_state = ObserverState(candle_ts, state.reentry_side, float(row["close"]), candle_ts, state.cooldown_until_ts, None, None, None) else: cooldown_ok = state.cooldown_until_ts is None or candle_ts >= state.cooldown_until_ts compressed = indicators["bandwidth"] <= indicators["bandwidth_threshold"] vol_ok = indicators["eth_vol_96"] <= ETH_VOL_CAP btc_up = indicators["btc_close"] > indicators["btc_sma_480"] if cooldown_ok and compressed and vol_ok and btc_up and float(row["close"]) > indicators["upper"]: signal = "entry_long" target_side = "long" next_state = ObserverState(candle_ts, "long", float(row["close"]), candle_ts, state.cooldown_until_ts, None, None, None) elif cooldown_ok and compressed and vol_ok and btc_up and float(row["close"]) < indicators["lower"]: signal = "entry_short" target_side = "short" next_state = ObserverState(candle_ts, "short", float(row["close"]), candle_ts, state.cooldown_until_ts, None, None, None) return next_state, { "decision_candle_ts": candle_ts, "decision_candle_time": candle_time, "signal": signal, "target_side": target_side, "reentry_gate": reentry_gate, "indicators": indicators, "params": { "band_length": BAND_LENGTH, "bandwidth_lookback": BANDWIDTH_LOOKBACK, "bandwidth_quantile": BANDWIDTH_QUANTILE, "stop_loss_pct": STOP_LOSS_PCT, "extreme_take_profit_pct": EXTREME_TAKE_PROFIT_PCT, "eth_vol_cap": ETH_VOL_CAP, "cooldown_bars": COOLDOWN_BARS, "reentry_bars": REENTRY_BARS, "entry_btc_filter": "btc-up", "reentry_gate_mode": "btc_against", }, } def write_heartbeat(state_dir: Path, payload: dict[str, object]) -> None: state_dir.mkdir(parents=True, exist_ok=True) (state_dir / "heartbeat.json").write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8") def run_once(state_dir: Path, frame: pd.DataFrame | None = None) -> dict[str, object]: state_dir.mkdir(parents=True, exist_ok=True) state_path = state_dir / STATE_FILE previous_state = load_state(state_path) if frame is None: frame = load_pair_frame(OkxClient()) next_state, signal = signal_from_frame(frame, previous_state) save_state(state_path, next_state) payload = { "created_at": now_iso(), "mode": "bb_squeeze_t_gated_readonly_observer", "orders_submitted": 0, "strategy": "bb-squeeze-t-l96-bw960-q0.25-sl0.01-xtp0.035-both-btc-up-vc0.006-dd0.25-cd24-tre96-btc_against", "previous_state": asdict(previous_state), "next_state": asdict(next_state), "signal": signal, "candles": { "rows": len(frame), "first_time": pd.Timestamp(frame.iloc[0]["time"]).isoformat().replace("+00:00", "Z"), "last_time": pd.Timestamp(frame.iloc[-1]["time"]).isoformat().replace("+00:00", "Z"), }, "risk_limits": { "no_order_submission": True, "no_cancel_submission": True, "execution": "read_only_signal_stream", }, } write_heartbeat(state_dir, payload) append_jsonl(state_dir / EVENTS_FILE, payload) return payload def main() -> int: parser = argparse.ArgumentParser(description="Run BB squeeze T-gated read-only observer.") parser.add_argument("--state-dir", type=Path, default=STATE_DIR) parser.add_argument("--interval-seconds", type=int, default=300) parser.add_argument("--once", action="store_true") args = parser.parse_args() frame: pd.DataFrame | None = None while True: try: frame = refresh_pair_frame(OkxClient(), frame) payload = run_once(args.state_dir, frame) print(json.dumps(payload, indent=2, sort_keys=True)) except Exception as exc: error = {"created_at": now_iso(), "mode": "bb_squeeze_t_gated_readonly_observer", "orders_submitted": 0, "error": str(exc)} write_heartbeat(args.state_dir, error) append_jsonl(args.state_dir / EVENTS_FILE, error) print(json.dumps(error, indent=2, sort_keys=True), file=sys.stderr) if args.once: return 1 if args.once: return 0 time.sleep(args.interval_seconds) if __name__ == "__main__": raise SystemExit(main())