| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312 |
- from __future__ import annotations
- import argparse
- import json
- import sys
- import time
- from dataclasses import asdict, dataclass
- from datetime import UTC, datetime
- from pathlib import Path
- import pandas as pd
- sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
- from okx_codex_trader.models import Candle
- from okx_codex_trader.okx_client import OkxClient
- ROOT = Path(__file__).resolve().parents[1]
- DATA_DIR = ROOT / "data" / "okx-candles"
- STATE_DIR = ROOT / "var" / "bb-squeeze-t-gated-observer"
- STATE_FILE = "runtime-state.json"
- EVENTS_FILE = "observer-events.jsonl"
- ETH_SYMBOL = "ETH-USDT-SWAP"
- BTC_SYMBOL = "BTC-USDT-SWAP"
- BAR = "15m"
- LIVE_CANDLE_LIMIT = 1_200
- RECENT_CANDLE_LIMIT = 300
- BAND_LENGTH = 96
- BANDWIDTH_LOOKBACK = 960
- BANDWIDTH_QUANTILE = 0.25
- STOP_LOSS_PCT = 0.01
- EXTREME_TAKE_PROFIT_PCT = 0.035
- ETH_VOL_CAP = 0.006
- COOLDOWN_BARS = 24
- REENTRY_BARS = 96
- BTC_TREND = 480
- BTC_MOMENTUM = 96
- MIN_FRAME_ROWS = BAND_LENGTH + BANDWIDTH_LOOKBACK
- @dataclass(frozen=True)
- class ObserverState:
- last_candle_ts: int | None
- active_side: str | None
- entry_price: float | None
- entry_candle_ts: int | None
- cooldown_until_ts: int | None
- reentry_side: str | None
- reentry_anchor_price: float | None
- reentry_until_ts: int | None
- EMPTY_STATE = ObserverState(None, None, None, None, None, None, None, None)
- def now_iso() -> str:
- return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
- def load_state(path: Path) -> ObserverState:
- if not path.exists():
- return EMPTY_STATE
- payload = json.loads(path.read_text(encoding="utf-8"))
- return ObserverState(
- last_candle_ts=payload["last_candle_ts"],
- active_side=payload["active_side"],
- entry_price=payload["entry_price"],
- entry_candle_ts=payload["entry_candle_ts"],
- cooldown_until_ts=payload["cooldown_until_ts"],
- reentry_side=payload["reentry_side"],
- reentry_anchor_price=payload["reentry_anchor_price"],
- reentry_until_ts=payload["reentry_until_ts"],
- )
- def save_state(path: Path, state: ObserverState) -> None:
- path.parent.mkdir(parents=True, exist_ok=True)
- path.write_text(json.dumps(asdict(state), indent=2, sort_keys=True) + "\n", encoding="utf-8")
- def append_jsonl(path: Path, payload: dict[str, object]) -> None:
- path.parent.mkdir(parents=True, exist_ok=True)
- with path.open("a", encoding="utf-8") as handle:
- handle.write(json.dumps(payload, sort_keys=True, separators=(",", ":")) + "\n")
- def frame_from_candles(candles: list[Candle]) -> pd.DataFrame:
- frame = pd.DataFrame([asdict(candle) for candle in candles])
- frame["time"] = pd.to_datetime(frame["ts"], unit="ms", utc=True)
- return frame.sort_values("ts").drop_duplicates("ts", keep="last").reset_index(drop=True)
- def load_pair_frame(client: OkxClient) -> pd.DataFrame:
- eth_path = DATA_DIR / ETH_SYMBOL / f"{BAR}.csv"
- btc_path = DATA_DIR / BTC_SYMBOL / f"{BAR}.csv"
- if eth_path.exists() and btc_path.exists():
- eth = pd.read_csv(eth_path).tail(LIVE_CANDLE_LIMIT)
- btc = pd.read_csv(btc_path).tail(LIVE_CANDLE_LIMIT)
- eth["time"] = pd.to_datetime(eth["ts"], unit="ms", utc=True)
- btc = btc[["ts", "close"]].rename(columns={"close": "btc_close"})
- return eth.merge(btc, on="ts", how="inner").sort_values("ts").tail(LIVE_CANDLE_LIMIT).reset_index(drop=True)
- eth = frame_from_candles(client.get_candles(ETH_SYMBOL, BAR, LIVE_CANDLE_LIMIT))
- btc = frame_from_candles(client.get_candles(BTC_SYMBOL, BAR, LIVE_CANDLE_LIMIT))
- btc = btc[["ts", "close"]].rename(columns={"close": "btc_close"})
- return eth.merge(btc, on="ts", how="inner").sort_values("ts").reset_index(drop=True)
- def refresh_pair_frame(client: OkxClient, frame: pd.DataFrame | None) -> pd.DataFrame:
- if frame is None or len(frame) < MIN_FRAME_ROWS:
- return load_pair_frame(client)
- eth = frame_from_candles(client.get_recent_candles(ETH_SYMBOL, BAR, RECENT_CANDLE_LIMIT))
- btc = frame_from_candles(client.get_recent_candles(BTC_SYMBOL, BAR, RECENT_CANDLE_LIMIT))
- btc = btc[["ts", "close"]].rename(columns={"close": "btc_close"})
- recent = eth.merge(btc, on="ts", how="inner").sort_values("ts").reset_index(drop=True)
- merged = pd.concat([frame, recent], ignore_index=True)
- return merged.sort_values("ts").drop_duplicates("ts", keep="last").tail(LIVE_CANDLE_LIMIT).reset_index(drop=True)
- def signal_from_frame(frame: pd.DataFrame, state: ObserverState) -> tuple[ObserverState, dict[str, object]]:
- if len(frame) < MIN_FRAME_ROWS:
- raise ValueError("not enough candles")
- eth_close = frame["close"].astype(float)
- btc_close = frame["btc_close"].astype(float)
- middle = eth_close.rolling(BAND_LENGTH).mean()
- stdev = eth_close.rolling(BAND_LENGTH).std(ddof=0)
- upper = middle + 2.0 * stdev
- lower = middle - 2.0 * stdev
- bandwidth = (upper - lower) / middle
- threshold = bandwidth.rolling(BANDWIDTH_LOOKBACK).quantile(BANDWIDTH_QUANTILE)
- eth_vol = eth_close.pct_change().rolling(96).std(ddof=0)
- btc_sma = btc_close.rolling(BTC_TREND).mean()
- btc_momentum = btc_close / btc_close.shift(BTC_MOMENTUM) - 1.0
- index = len(frame) - 1
- row = frame.iloc[index]
- candle_ts = int(row["ts"])
- candle_time = pd.Timestamp(row["time"]).isoformat().replace("+00:00", "Z")
- indicators = {
- "eth_close": float(row["close"]),
- "btc_close": float(row["btc_close"]),
- "middle": float(middle.iloc[index]),
- "upper": float(upper.iloc[index]),
- "lower": float(lower.iloc[index]),
- "bandwidth": float(bandwidth.iloc[index]),
- "bandwidth_threshold": float(threshold.iloc[index]),
- "eth_vol_96": float(eth_vol.iloc[index]),
- "btc_sma_480": float(btc_sma.iloc[index]),
- "btc_momentum_96": float(btc_momentum.iloc[index]),
- }
- if state.last_candle_ts is not None and candle_ts <= state.last_candle_ts:
- return state, {
- "decision_candle_ts": candle_ts,
- "decision_candle_time": candle_time,
- "signal": "state_replay",
- "target_side": state.active_side or "flat",
- "indicators": indicators,
- }
- next_state = ObserverState(
- candle_ts,
- state.active_side,
- state.entry_price,
- state.entry_candle_ts,
- state.cooldown_until_ts,
- state.reentry_side,
- state.reentry_anchor_price,
- state.reentry_until_ts,
- )
- signal = "hold"
- target_side = state.active_side or "flat"
- reentry_gate = False
- if state.active_side is not None:
- entry_price = float(state.entry_price)
- stop = entry_price * (1.0 - STOP_LOSS_PCT if state.active_side == "long" else 1.0 + STOP_LOSS_PCT)
- extreme_take = entry_price * (1.0 + EXTREME_TAKE_PROFIT_PCT if state.active_side == "long" else 1.0 - EXTREME_TAKE_PROFIT_PCT)
- stop_hit = (state.active_side == "long" and float(row["low"]) <= stop) or (state.active_side == "short" and float(row["high"]) >= stop)
- extreme_take_hit = (state.active_side == "long" and float(row["high"]) >= extreme_take) or (
- state.active_side == "short" and float(row["low"]) <= extreme_take
- )
- middle_exit = (state.active_side == "long" and float(row["close"]) < indicators["middle"]) or (
- state.active_side == "short" and float(row["close"]) > indicators["middle"]
- )
- if stop_hit or extreme_take_hit or middle_exit:
- signal = "exit_stop" if stop_hit else "exit_extreme_take" if extreme_take_hit else "exit_middle"
- target_side = "flat"
- if extreme_take_hit and not stop_hit:
- next_state = ObserverState(
- candle_ts,
- None,
- None,
- None,
- state.cooldown_until_ts,
- state.active_side,
- extreme_take,
- candle_ts + REENTRY_BARS * 900_000,
- )
- else:
- next_state = ObserverState(candle_ts, None, None, None, candle_ts + COOLDOWN_BARS * 900_000, None, None, None)
- else:
- if state.reentry_side is not None:
- if state.reentry_until_ts is None or candle_ts > state.reentry_until_ts:
- next_state = ObserverState(candle_ts, None, None, None, state.cooldown_until_ts, None, None, None)
- else:
- reentry_gate = (state.reentry_side == "long" and indicators["btc_momentum_96"] < 0.0) or (
- state.reentry_side == "short" and indicators["btc_momentum_96"] > 0.0
- )
- if reentry_gate:
- signal = "reentry_" + state.reentry_side
- target_side = state.reentry_side
- next_state = ObserverState(candle_ts, state.reentry_side, float(row["close"]), candle_ts, state.cooldown_until_ts, None, None, None)
- else:
- cooldown_ok = state.cooldown_until_ts is None or candle_ts >= state.cooldown_until_ts
- compressed = indicators["bandwidth"] <= indicators["bandwidth_threshold"]
- vol_ok = indicators["eth_vol_96"] <= ETH_VOL_CAP
- btc_up = indicators["btc_close"] > indicators["btc_sma_480"]
- if cooldown_ok and compressed and vol_ok and btc_up and float(row["close"]) > indicators["upper"]:
- signal = "entry_long"
- target_side = "long"
- next_state = ObserverState(candle_ts, "long", float(row["close"]), candle_ts, state.cooldown_until_ts, None, None, None)
- elif cooldown_ok and compressed and vol_ok and btc_up and float(row["close"]) < indicators["lower"]:
- signal = "entry_short"
- target_side = "short"
- next_state = ObserverState(candle_ts, "short", float(row["close"]), candle_ts, state.cooldown_until_ts, None, None, None)
- return next_state, {
- "decision_candle_ts": candle_ts,
- "decision_candle_time": candle_time,
- "signal": signal,
- "target_side": target_side,
- "reentry_gate": reentry_gate,
- "indicators": indicators,
- "params": {
- "band_length": BAND_LENGTH,
- "bandwidth_lookback": BANDWIDTH_LOOKBACK,
- "bandwidth_quantile": BANDWIDTH_QUANTILE,
- "stop_loss_pct": STOP_LOSS_PCT,
- "extreme_take_profit_pct": EXTREME_TAKE_PROFIT_PCT,
- "eth_vol_cap": ETH_VOL_CAP,
- "cooldown_bars": COOLDOWN_BARS,
- "reentry_bars": REENTRY_BARS,
- "entry_btc_filter": "btc-up",
- "reentry_gate_mode": "btc_against",
- },
- }
- def write_heartbeat(state_dir: Path, payload: dict[str, object]) -> None:
- state_dir.mkdir(parents=True, exist_ok=True)
- (state_dir / "heartbeat.json").write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
- def run_once(state_dir: Path, frame: pd.DataFrame | None = None) -> dict[str, object]:
- state_dir.mkdir(parents=True, exist_ok=True)
- state_path = state_dir / STATE_FILE
- previous_state = load_state(state_path)
- if frame is None:
- frame = load_pair_frame(OkxClient())
- next_state, signal = signal_from_frame(frame, previous_state)
- save_state(state_path, next_state)
- payload = {
- "created_at": now_iso(),
- "mode": "bb_squeeze_t_gated_readonly_observer",
- "orders_submitted": 0,
- "strategy": "bb-squeeze-t-l96-bw960-q0.25-sl0.01-xtp0.035-both-btc-up-vc0.006-dd0.25-cd24-tre96-btc_against",
- "previous_state": asdict(previous_state),
- "next_state": asdict(next_state),
- "signal": signal,
- "candles": {
- "rows": len(frame),
- "first_time": pd.Timestamp(frame.iloc[0]["time"]).isoformat().replace("+00:00", "Z"),
- "last_time": pd.Timestamp(frame.iloc[-1]["time"]).isoformat().replace("+00:00", "Z"),
- },
- "risk_limits": {
- "no_order_submission": True,
- "no_cancel_submission": True,
- "execution": "read_only_signal_stream",
- },
- }
- write_heartbeat(state_dir, payload)
- append_jsonl(state_dir / EVENTS_FILE, payload)
- return payload
- def main() -> int:
- parser = argparse.ArgumentParser(description="Run BB squeeze T-gated read-only observer.")
- parser.add_argument("--state-dir", type=Path, default=STATE_DIR)
- parser.add_argument("--interval-seconds", type=int, default=300)
- parser.add_argument("--once", action="store_true")
- args = parser.parse_args()
- frame: pd.DataFrame | None = None
- while True:
- try:
- frame = refresh_pair_frame(OkxClient(), frame)
- payload = run_once(args.state_dir, frame)
- print(json.dumps(payload, indent=2, sort_keys=True))
- except Exception as exc:
- error = {"created_at": now_iso(), "mode": "bb_squeeze_t_gated_readonly_observer", "orders_submitted": 0, "error": str(exc)}
- write_heartbeat(args.state_dir, error)
- append_jsonl(args.state_dir / EVENTS_FILE, error)
- print(json.dumps(error, indent=2, sort_keys=True), file=sys.stderr)
- if args.once:
- return 1
- if args.once:
- return 0
- time.sleep(args.interval_seconds)
- if __name__ == "__main__":
- raise SystemExit(main())
|