run_bb_squeeze_t_gated_observer.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. from __future__ import annotations
  2. import argparse
  3. import json
  4. import sys
  5. import time
  6. from dataclasses import asdict, dataclass
  7. from datetime import UTC, datetime
  8. from pathlib import Path
  9. import pandas as pd
  10. sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
  11. from okx_codex_trader.models import Candle
  12. from okx_codex_trader.okx_client import OkxClient
# Repository root: parents[1] of this file, i.e. the directory above the one holding the script.
ROOT = Path(__file__).resolve().parents[1]
# Optional local candle cache; load_pair_frame reads <DATA_DIR>/<symbol>/<BAR>.csv when both files exist.
DATA_DIR = ROOT / "data" / "okx-candles"
# Default for --state-dir; the runtime state, heartbeat and event log are written here.
STATE_DIR = ROOT / "var" / "bb-squeeze-t-gated-observer"
STATE_FILE = "runtime-state.json"  # persisted ObserverState (JSON)
EVENTS_FILE = "observer-events.jsonl"  # append-only log, one JSON object per line
ETH_SYMBOL = "ETH-USDT-SWAP"  # symbol whose candles drive bands, stops and entries
BTC_SYMBOL = "BTC-USDT-SWAP"  # symbol whose close feeds the BTC trend / momentum filters
# Candle interval; signal_from_frame converts bar counts to time using 900_000 ms (15 min) per bar.
BAR = "15m"
LIVE_CANDLE_LIMIT = 1_200  # rows requested on a full load and kept in the rolling frame
RECENT_CANDLE_LIMIT = 300  # rows requested on each incremental refresh
BAND_LENGTH = 96  # rolling window (bars) for the middle band and its standard deviation
BANDWIDTH_LOOKBACK = 960  # rolling window (bars) for the bandwidth quantile threshold
BANDWIDTH_QUANTILE = 0.25  # bandwidth at or below this rolling quantile counts as compressed
STOP_LOSS_PCT = 0.01  # adverse move from the entry price that triggers exit_stop
EXTREME_TAKE_PROFIT_PCT = 0.035  # favourable move from the entry price that triggers exit_extreme_take
ETH_VOL_CAP = 0.006  # entries need the 96-bar std of ETH close-to-close returns at or below this
COOLDOWN_BARS = 24  # bars to wait before a fresh entry after a stop or middle-band exit
REENTRY_BARS = 96  # bars during which a re-entry may fire after an extreme take-profit exit
BTC_TREND = 480  # rolling window (bars) for the BTC simple moving average
BTC_MOMENTUM = 96  # look-back (bars) for the BTC rate of change
# Smallest frame signal_from_frame accepts; shorter frames raise ValueError there.
MIN_FRAME_ROWS = BAND_LENGTH + BANDWIDTH_LOOKBACK
  34. @dataclass(frozen=True)
  35. class ObserverState:
  36. last_candle_ts: int | None
  37. active_side: str | None
  38. entry_price: float | None
  39. entry_candle_ts: int | None
  40. cooldown_until_ts: int | None
  41. reentry_side: str | None
  42. reentry_anchor_price: float | None
  43. reentry_until_ts: int | None
  44. EMPTY_STATE = ObserverState(None, None, None, None, None, None, None, None)
  45. def now_iso() -> str:
  46. return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
  47. def load_state(path: Path) -> ObserverState:
  48. if not path.exists():
  49. return EMPTY_STATE
  50. payload = json.loads(path.read_text(encoding="utf-8"))
  51. return ObserverState(
  52. last_candle_ts=payload["last_candle_ts"],
  53. active_side=payload["active_side"],
  54. entry_price=payload["entry_price"],
  55. entry_candle_ts=payload["entry_candle_ts"],
  56. cooldown_until_ts=payload["cooldown_until_ts"],
  57. reentry_side=payload["reentry_side"],
  58. reentry_anchor_price=payload["reentry_anchor_price"],
  59. reentry_until_ts=payload["reentry_until_ts"],
  60. )
  61. def save_state(path: Path, state: ObserverState) -> None:
  62. path.parent.mkdir(parents=True, exist_ok=True)
  63. path.write_text(json.dumps(asdict(state), indent=2, sort_keys=True) + "\n", encoding="utf-8")
  64. def append_jsonl(path: Path, payload: dict[str, object]) -> None:
  65. path.parent.mkdir(parents=True, exist_ok=True)
  66. with path.open("a", encoding="utf-8") as handle:
  67. handle.write(json.dumps(payload, sort_keys=True, separators=(",", ":")) + "\n")
  68. def frame_from_candles(candles: list[Candle]) -> pd.DataFrame:
  69. frame = pd.DataFrame([asdict(candle) for candle in candles])
  70. frame["time"] = pd.to_datetime(frame["ts"], unit="ms", utc=True)
  71. return frame.sort_values("ts").drop_duplicates("ts", keep="last").reset_index(drop=True)
  72. def load_pair_frame(client: OkxClient) -> pd.DataFrame:
  73. eth_path = DATA_DIR / ETH_SYMBOL / f"{BAR}.csv"
  74. btc_path = DATA_DIR / BTC_SYMBOL / f"{BAR}.csv"
  75. if eth_path.exists() and btc_path.exists():
  76. eth = pd.read_csv(eth_path).tail(LIVE_CANDLE_LIMIT)
  77. btc = pd.read_csv(btc_path).tail(LIVE_CANDLE_LIMIT)
  78. eth["time"] = pd.to_datetime(eth["ts"], unit="ms", utc=True)
  79. btc = btc[["ts", "close"]].rename(columns={"close": "btc_close"})
  80. return eth.merge(btc, on="ts", how="inner").sort_values("ts").tail(LIVE_CANDLE_LIMIT).reset_index(drop=True)
  81. eth = frame_from_candles(client.get_candles(ETH_SYMBOL, BAR, LIVE_CANDLE_LIMIT))
  82. btc = frame_from_candles(client.get_candles(BTC_SYMBOL, BAR, LIVE_CANDLE_LIMIT))
  83. btc = btc[["ts", "close"]].rename(columns={"close": "btc_close"})
  84. return eth.merge(btc, on="ts", how="inner").sort_values("ts").reset_index(drop=True)
  85. def refresh_pair_frame(client: OkxClient, frame: pd.DataFrame | None) -> pd.DataFrame:
  86. if frame is None or len(frame) < MIN_FRAME_ROWS:
  87. return load_pair_frame(client)
  88. eth = frame_from_candles(client.get_recent_candles(ETH_SYMBOL, BAR, RECENT_CANDLE_LIMIT))
  89. btc = frame_from_candles(client.get_recent_candles(BTC_SYMBOL, BAR, RECENT_CANDLE_LIMIT))
  90. btc = btc[["ts", "close"]].rename(columns={"close": "btc_close"})
  91. recent = eth.merge(btc, on="ts", how="inner").sort_values("ts").reset_index(drop=True)
  92. merged = pd.concat([frame, recent], ignore_index=True)
  93. return merged.sort_values("ts").drop_duplicates("ts", keep="last").tail(LIVE_CANDLE_LIMIT).reset_index(drop=True)
def signal_from_frame(frame: pd.DataFrame, state: ObserverState) -> tuple[ObserverState, dict[str, object]]:
    """Evaluate the newest row of *frame* and advance the observer state.

    Indicators are computed over the whole frame, but only the values on the
    last row are used for the decision.

    Decision order:
      1. If the last candle is not newer than ``state.last_candle_ts`` the
         state is returned unchanged with signal ``"state_replay"``.
      2. With a tracked position: exit on stop (checked against the candle's
         low/high), on the extreme take-profit (high/low), or when the close
         crosses the middle band. The reported reason prefers stop, then
         extreme take, then middle. An extreme take without a stop opens a
         re-entry window for the same side; any other exit starts a cooldown.
      3. Flat with a pending re-entry: an expired window is cleared; otherwise
         the position is re-opened at the close when BTC momentum points
         against the re-entry side.
      4. Flat without a pending re-entry: enter long above the upper band or
         short below the lower band, provided the cooldown has elapsed, the
         bandwidth is compressed, ETH volatility is under the cap and BTC
         trades above its moving average.

    Args:
        frame: rows with at least ``ts``, ``time``, ``close``, ``high``,
            ``low`` and ``btc_close`` columns; the last row is the candle
            being decided on.
        state: state carried over from the previous evaluation.

    Returns:
        ``(next_state, signal_payload)``. The replay payload omits the
        ``reentry_gate`` and ``params`` keys that the normal payload carries.

    Raises:
        ValueError: if *frame* has fewer than ``MIN_FRAME_ROWS`` rows.
    """
    if len(frame) < MIN_FRAME_ROWS:
        raise ValueError("not enough candles")
    eth_close = frame["close"].astype(float)
    btc_close = frame["btc_close"].astype(float)
    # Bollinger-style bands: rolling mean +/- 2 population standard deviations.
    middle = eth_close.rolling(BAND_LENGTH).mean()
    stdev = eth_close.rolling(BAND_LENGTH).std(ddof=0)
    upper = middle + 2.0 * stdev
    lower = middle - 2.0 * stdev
    # Band width relative to the middle band, and its rolling quantile threshold.
    bandwidth = (upper - lower) / middle
    threshold = bandwidth.rolling(BANDWIDTH_LOOKBACK).quantile(BANDWIDTH_QUANTILE)
    # Population std of ETH close-to-close returns over a fixed 96-bar window.
    eth_vol = eth_close.pct_change().rolling(96).std(ddof=0)
    btc_sma = btc_close.rolling(BTC_TREND).mean()
    btc_momentum = btc_close / btc_close.shift(BTC_MOMENTUM) - 1.0
    # Only the newest row is decided on.
    index = len(frame) - 1
    row = frame.iloc[index]
    candle_ts = int(row["ts"])
    candle_time = pd.Timestamp(row["time"]).isoformat().replace("+00:00", "Z")
    indicators = {
        "eth_close": float(row["close"]),
        "btc_close": float(row["btc_close"]),
        "middle": float(middle.iloc[index]),
        "upper": float(upper.iloc[index]),
        "lower": float(lower.iloc[index]),
        "bandwidth": float(bandwidth.iloc[index]),
        "bandwidth_threshold": float(threshold.iloc[index]),
        "eth_vol_96": float(eth_vol.iloc[index]),
        "btc_sma_480": float(btc_sma.iloc[index]),
        "btc_momentum_96": float(btc_momentum.iloc[index]),
    }
    # A candle that was already evaluated is never decided on twice.
    # NOTE(review): if the data source can return a still-forming last candle,
    # its first snapshot is the one that gets decided on - confirm upstream.
    if state.last_candle_ts is not None and candle_ts <= state.last_candle_ts:
        return state, {
            "decision_candle_ts": candle_ts,
            "decision_candle_time": candle_time,
            "signal": "state_replay",
            "target_side": state.active_side or "flat",
            "indicators": indicators,
        }
    # Default outcome: same state, only the evaluated-candle marker advances.
    next_state = ObserverState(
        candle_ts,
        state.active_side,
        state.entry_price,
        state.entry_candle_ts,
        state.cooldown_until_ts,
        state.reentry_side,
        state.reentry_anchor_price,
        state.reentry_until_ts,
    )
    signal = "hold"
    target_side = state.active_side or "flat"
    reentry_gate = False
    if state.active_side is not None:
        # --- In a position: look for an exit on this candle. ---
        entry_price = float(state.entry_price)
        stop = entry_price * (1.0 - STOP_LOSS_PCT if state.active_side == "long" else 1.0 + STOP_LOSS_PCT)
        extreme_take = entry_price * (1.0 + EXTREME_TAKE_PROFIT_PCT if state.active_side == "long" else 1.0 - EXTREME_TAKE_PROFIT_PCT)
        # Stop and extreme take are tested against the candle's range, not its close.
        stop_hit = (state.active_side == "long" and float(row["low"]) <= stop) or (state.active_side == "short" and float(row["high"]) >= stop)
        extreme_take_hit = (state.active_side == "long" and float(row["high"]) >= extreme_take) or (
            state.active_side == "short" and float(row["low"]) <= extreme_take
        )
        # Middle-band exit: the close has crossed back through the middle band.
        middle_exit = (state.active_side == "long" and float(row["close"]) < indicators["middle"]) or (
            state.active_side == "short" and float(row["close"]) > indicators["middle"]
        )
        if stop_hit or extreme_take_hit or middle_exit:
            # When several conditions hold on the same candle the stop takes precedence.
            signal = "exit_stop" if stop_hit else "exit_extreme_take" if extreme_take_hit else "exit_middle"
            target_side = "flat"
            if extreme_take_hit and not stop_hit:
                # Go flat, keep the existing cooldown and open a re-entry window
                # for the same side. 900_000 ms is one 15-minute bar.
                next_state = ObserverState(
                    candle_ts,
                    None,
                    None,
                    None,
                    state.cooldown_until_ts,
                    state.active_side,
                    extreme_take,
                    candle_ts + REENTRY_BARS * 900_000,
                )
            else:
                # Stop or middle-band exit: go flat and start a cooldown.
                next_state = ObserverState(candle_ts, None, None, None, candle_ts + COOLDOWN_BARS * 900_000, None, None, None)
    else:
        if state.reentry_side is not None:
            # --- Flat with a pending re-entry window. ---
            if state.reentry_until_ts is None or candle_ts > state.reentry_until_ts:
                # Window expired (or malformed): drop it; no entry is evaluated on this candle.
                next_state = ObserverState(candle_ts, None, None, None, state.cooldown_until_ts, None, None, None)
            else:
                # Gate opens when BTC momentum points against the re-entry side.
                reentry_gate = (state.reentry_side == "long" and indicators["btc_momentum_96"] < 0.0) or (
                    state.reentry_side == "short" and indicators["btc_momentum_96"] > 0.0
                )
                if reentry_gate:
                    signal = "reentry_" + state.reentry_side
                    target_side = state.reentry_side
                    next_state = ObserverState(candle_ts, state.reentry_side, float(row["close"]), candle_ts, state.cooldown_until_ts, None, None, None)
        else:
            # --- Flat, no re-entry pending: evaluate a fresh breakout entry. ---
            cooldown_ok = state.cooldown_until_ts is None or candle_ts >= state.cooldown_until_ts
            compressed = indicators["bandwidth"] <= indicators["bandwidth_threshold"]
            vol_ok = indicators["eth_vol_96"] <= ETH_VOL_CAP
            btc_up = indicators["btc_close"] > indicators["btc_sma_480"]
            # Both directions use the same BTC-above-SMA filter (reported as "btc-up" in params).
            if cooldown_ok and compressed and vol_ok and btc_up and float(row["close"]) > indicators["upper"]:
                signal = "entry_long"
                target_side = "long"
                next_state = ObserverState(candle_ts, "long", float(row["close"]), candle_ts, state.cooldown_until_ts, None, None, None)
            elif cooldown_ok and compressed and vol_ok and btc_up and float(row["close"]) < indicators["lower"]:
                signal = "entry_short"
                target_side = "short"
                next_state = ObserverState(candle_ts, "short", float(row["close"]), candle_ts, state.cooldown_until_ts, None, None, None)
    return next_state, {
        "decision_candle_ts": candle_ts,
        "decision_candle_time": candle_time,
        "signal": signal,
        "target_side": target_side,
        "reentry_gate": reentry_gate,
        "indicators": indicators,
        "params": {
            "band_length": BAND_LENGTH,
            "bandwidth_lookback": BANDWIDTH_LOOKBACK,
            "bandwidth_quantile": BANDWIDTH_QUANTILE,
            "stop_loss_pct": STOP_LOSS_PCT,
            "extreme_take_profit_pct": EXTREME_TAKE_PROFIT_PCT,
            "eth_vol_cap": ETH_VOL_CAP,
            "cooldown_bars": COOLDOWN_BARS,
            "reentry_bars": REENTRY_BARS,
            "entry_btc_filter": "btc-up",
            "reentry_gate_mode": "btc_against",
        },
    }
  217. def write_heartbeat(state_dir: Path, payload: dict[str, object]) -> None:
  218. state_dir.mkdir(parents=True, exist_ok=True)
  219. (state_dir / "heartbeat.json").write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
  220. def run_once(state_dir: Path, frame: pd.DataFrame | None = None) -> dict[str, object]:
  221. state_dir.mkdir(parents=True, exist_ok=True)
  222. state_path = state_dir / STATE_FILE
  223. previous_state = load_state(state_path)
  224. if frame is None:
  225. frame = load_pair_frame(OkxClient())
  226. next_state, signal = signal_from_frame(frame, previous_state)
  227. save_state(state_path, next_state)
  228. payload = {
  229. "created_at": now_iso(),
  230. "mode": "bb_squeeze_t_gated_readonly_observer",
  231. "orders_submitted": 0,
  232. "strategy": "bb-squeeze-t-l96-bw960-q0.25-sl0.01-xtp0.035-both-btc-up-vc0.006-dd0.25-cd24-tre96-btc_against",
  233. "previous_state": asdict(previous_state),
  234. "next_state": asdict(next_state),
  235. "signal": signal,
  236. "candles": {
  237. "rows": len(frame),
  238. "first_time": pd.Timestamp(frame.iloc[0]["time"]).isoformat().replace("+00:00", "Z"),
  239. "last_time": pd.Timestamp(frame.iloc[-1]["time"]).isoformat().replace("+00:00", "Z"),
  240. },
  241. "risk_limits": {
  242. "no_order_submission": True,
  243. "no_cancel_submission": True,
  244. "execution": "read_only_signal_stream",
  245. },
  246. }
  247. write_heartbeat(state_dir, payload)
  248. append_jsonl(state_dir / EVENTS_FILE, payload)
  249. return payload
  250. def main() -> int:
  251. parser = argparse.ArgumentParser(description="Run BB squeeze T-gated read-only observer.")
  252. parser.add_argument("--state-dir", type=Path, default=STATE_DIR)
  253. parser.add_argument("--interval-seconds", type=int, default=300)
  254. parser.add_argument("--once", action="store_true")
  255. args = parser.parse_args()
  256. frame: pd.DataFrame | None = None
  257. while True:
  258. try:
  259. frame = refresh_pair_frame(OkxClient(), frame)
  260. payload = run_once(args.state_dir, frame)
  261. print(json.dumps(payload, indent=2, sort_keys=True))
  262. except Exception as exc:
  263. error = {"created_at": now_iso(), "mode": "bb_squeeze_t_gated_readonly_observer", "orders_submitted": 0, "error": str(exc)}
  264. write_heartbeat(args.state_dir, error)
  265. append_jsonl(args.state_dir / EVENTS_FILE, error)
  266. print(json.dumps(error, indent=2, sort_keys=True), file=sys.stderr)
  267. if args.once:
  268. return 1
  269. if args.once:
  270. return 0
  271. time.sleep(args.interval_seconds)
  272. if __name__ == "__main__":
  273. raise SystemExit(main())