# run_crash_follow_short_observer.py
  1. from __future__ import annotations
  2. import argparse
  3. import json
  4. import sys
  5. import time
  6. from dataclasses import asdict
  7. from datetime import UTC, datetime
  8. from pathlib import Path
  9. import pandas as pd
  10. sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
  11. from okx_codex_trader.models import Candle
  12. from okx_codex_trader.okx_client import OkxClient
  13. ROOT = Path(__file__).resolve().parents[1]
  14. DATA_DIR = ROOT / "data" / "okx-candles"
  15. STATE_DIR = ROOT / "var" / "crash-follow-short-observer"
  16. EVENTS_FILE = "observer-events.jsonl"
  17. ETH_SYMBOL = "ETH-USDT-SWAP"
  18. BTC_SYMBOL = "BTC-USDT-SWAP"
  19. SOURCE_BAR = "15m"
  20. LIVE_CANDLE_LIMIT = 1_200
  21. RECENT_CANDLE_LIMIT = 300
  22. FAST = 20
  23. SLOW = 120
  24. LOOKBACK = 8
  25. THRESHOLD = 0.035
  26. STOP_LOSS_PCT = 0.02
  27. TAKE_PROFIT_PCT = 0.06
  28. HOLD_BARS = 96
  29. MIN_FRAME_ROWS = max(SLOW, 180, LOOKBACK * 2, 160, 120 + 24) + 2
  30. def now_iso() -> str:
  31. return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
  32. def append_jsonl(path: Path, payload: dict[str, object]) -> None:
  33. path.parent.mkdir(parents=True, exist_ok=True)
  34. with path.open("a", encoding="utf-8") as handle:
  35. handle.write(json.dumps(payload, sort_keys=True, separators=(",", ":")) + "\n")
  36. def frame_from_candles(candles: list[Candle]) -> pd.DataFrame:
  37. frame = pd.DataFrame([asdict(candle) for candle in candles])
  38. frame["time"] = pd.to_datetime(frame["ts"], unit="ms", utc=True)
  39. return frame.sort_values("ts").drop_duplicates("ts", keep="last").reset_index(drop=True)
  40. def load_pair_frame(client: OkxClient) -> pd.DataFrame:
  41. eth_path = DATA_DIR / ETH_SYMBOL / f"{SOURCE_BAR}.csv"
  42. btc_path = DATA_DIR / BTC_SYMBOL / f"{SOURCE_BAR}.csv"
  43. if eth_path.exists() and btc_path.exists():
  44. eth = pd.read_csv(eth_path).tail(LIVE_CANDLE_LIMIT)
  45. btc = pd.read_csv(btc_path).tail(LIVE_CANDLE_LIMIT)
  46. eth["time"] = pd.to_datetime(eth["ts"], unit="ms", utc=True)
  47. btc = btc[["ts", "close"]].rename(columns={"close": "btc_close"})
  48. merged = eth.merge(btc, on="ts", how="inner").sort_values("ts").reset_index(drop=True)
  49. return resample_hourly(merged)
  50. eth = frame_from_candles(client.get_candles(ETH_SYMBOL, SOURCE_BAR, LIVE_CANDLE_LIMIT))
  51. btc = frame_from_candles(client.get_candles(BTC_SYMBOL, SOURCE_BAR, LIVE_CANDLE_LIMIT))
  52. btc = btc[["ts", "close"]].rename(columns={"close": "btc_close"})
  53. merged = eth.merge(btc, on="ts", how="inner").sort_values("ts").reset_index(drop=True)
  54. return resample_hourly(merged)
  55. def resample_hourly(frame: pd.DataFrame) -> pd.DataFrame:
  56. hourly = (
  57. frame.set_index("time")
  58. .resample("1h", label="left", closed="left")
  59. .agg(
  60. ts=("ts", "first"),
  61. open=("open", "first"),
  62. high=("high", "max"),
  63. low=("low", "min"),
  64. close=("close", "last"),
  65. volume=("volume", "sum"),
  66. btc_close=("btc_close", "last"),
  67. )
  68. .dropna()
  69. .reset_index()
  70. )
  71. hourly["ts"] = hourly["ts"].astype(int)
  72. return hourly
  73. def refresh_pair_frame(client: OkxClient, frame: pd.DataFrame | None) -> pd.DataFrame:
  74. if frame is None or len(frame) < MIN_FRAME_ROWS:
  75. return load_pair_frame(client)
  76. eth = frame_from_candles(client.get_recent_candles(ETH_SYMBOL, SOURCE_BAR, RECENT_CANDLE_LIMIT))
  77. btc = frame_from_candles(client.get_recent_candles(BTC_SYMBOL, SOURCE_BAR, RECENT_CANDLE_LIMIT))
  78. btc = btc[["ts", "close"]].rename(columns={"close": "btc_close"})
  79. recent = eth.merge(btc, on="ts", how="inner").sort_values("ts").reset_index(drop=True)
  80. recent = resample_hourly(recent)
  81. merged = pd.concat([frame, recent], ignore_index=True)
  82. return merged.sort_values("ts").drop_duplicates("ts", keep="last").tail(LIVE_CANDLE_LIMIT).reset_index(drop=True)
  83. def rsi(close: pd.Series, length: int) -> pd.Series:
  84. diff = close.diff()
  85. gain = diff.clip(lower=0).ewm(alpha=1 / length, adjust=False).mean()
  86. loss = (-diff.clip(upper=0)).ewm(alpha=1 / length, adjust=False).mean()
  87. return 100 - 100 / (1 + gain / loss)
  88. def latest_signal(frame: pd.DataFrame) -> dict[str, object]:
  89. if len(frame) < MIN_FRAME_ROWS:
  90. raise ValueError("not enough candles")
  91. close = frame["close"].astype(float)
  92. high = frame["high"].astype(float)
  93. low = frame["low"].astype(float)
  94. btc_close = frame["btc_close"].astype(float)
  95. fast = close.ewm(span=FAST, adjust=False).mean()
  96. slow = close.ewm(span=SLOW, adjust=False).mean()
  97. ret = close / close.shift(LOOKBACK) - 1.0
  98. range_pct = (high - low) / close
  99. range_rank = range_pct.rolling(160).rank(pct=True)
  100. rsi14 = rsi(close, 14)
  101. btc_slow = btc_close.rolling(120).mean()
  102. btc_drop = btc_close / btc_close.shift(24) - 1.0
  103. gate = (btc_close < btc_slow) & (btc_drop < -0.015)
  104. entry = gate & (close < slow) & (ret < -THRESHOLD) & (range_rank > 0.75)
  105. exit_ = (close > fast) | (rsi14 > 52)
  106. index = len(frame) - 1
  107. row = frame.iloc[index]
  108. active = bool(entry.iloc[index])
  109. return {
  110. "decision_candle_ts": int(row["ts"]),
  111. "decision_candle_time": pd.Timestamp(row["time"]).isoformat().replace("+00:00", "Z"),
  112. "signal": "entry_short" if active else "hold",
  113. "target_side": "short" if active else "flat",
  114. "entry_signal": active,
  115. "exit_signal": bool(exit_.iloc[index]),
  116. "indicators": {
  117. "eth_close": float(close.iloc[index]),
  118. "eth_fast_ema_20": float(fast.iloc[index]),
  119. "eth_slow_ema_120": float(slow.iloc[index]),
  120. "eth_return_8h": float(ret.iloc[index]),
  121. "eth_range_rank_160": float(range_rank.iloc[index]),
  122. "eth_rsi_14": float(rsi14.iloc[index]),
  123. "btc_close": float(btc_close.iloc[index]),
  124. "btc_sma_120": float(btc_slow.iloc[index]),
  125. "btc_return_24h": float(btc_drop.iloc[index]),
  126. "btc_riskoff_gate": bool(gate.iloc[index]),
  127. },
  128. "params": {
  129. "bar": "1H",
  130. "fast": FAST,
  131. "slow": SLOW,
  132. "lookback": LOOKBACK,
  133. "threshold": THRESHOLD,
  134. "stop_loss_pct": STOP_LOSS_PCT,
  135. "take_profit_pct": TAKE_PROFIT_PCT,
  136. "hold_bars": HOLD_BARS,
  137. "gate": "btc_riskoff",
  138. },
  139. }
  140. def write_heartbeat(state_dir: Path, payload: dict[str, object]) -> None:
  141. state_dir.mkdir(parents=True, exist_ok=True)
  142. (state_dir / "heartbeat.json").write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
  143. def run_once(state_dir: Path, frame: pd.DataFrame | None = None) -> dict[str, object]:
  144. state_dir.mkdir(parents=True, exist_ok=True)
  145. if frame is None:
  146. frame = load_pair_frame(OkxClient())
  147. signal = latest_signal(frame)
  148. payload = {
  149. "created_at": now_iso(),
  150. "mode": "crash_follow_short_readonly_observer",
  151. "orders_submitted": 0,
  152. "strategy": "crash_follow-1H-f20-s120-lb8-th0.035-sl0.02-tp0.06-h96-btc_riskoff",
  153. "signal": signal,
  154. "candles": {
  155. "rows": len(frame),
  156. "first_time": pd.Timestamp(frame.iloc[0]["time"]).isoformat().replace("+00:00", "Z"),
  157. "last_time": pd.Timestamp(frame.iloc[-1]["time"]).isoformat().replace("+00:00", "Z"),
  158. },
  159. "backtest_summary": {
  160. "available_full_return": 1.0114,
  161. "available_full_max_drawdown": 0.4490,
  162. "available_full_trades": 618,
  163. "return_3y": 0.4300,
  164. "return_1y": 0.4003,
  165. "return_6m": 0.2002,
  166. "return_3m": 0.0613,
  167. },
  168. "risk_limits": {
  169. "no_order_submission": True,
  170. "no_cancel_submission": True,
  171. "execution": "read_only_signal_stream",
  172. },
  173. }
  174. write_heartbeat(state_dir, payload)
  175. append_jsonl(state_dir / EVENTS_FILE, payload)
  176. return payload
  177. def main() -> int:
  178. parser = argparse.ArgumentParser(description="Run crash-follow short read-only observer.")
  179. parser.add_argument("--state-dir", type=Path, default=STATE_DIR)
  180. parser.add_argument("--interval-seconds", type=int, default=300)
  181. parser.add_argument("--once", action="store_true")
  182. args = parser.parse_args()
  183. frame: pd.DataFrame | None = None
  184. while True:
  185. try:
  186. frame = refresh_pair_frame(OkxClient(), frame)
  187. payload = run_once(args.state_dir, frame)
  188. print(json.dumps(payload, indent=2, sort_keys=True))
  189. except Exception as exc:
  190. error = {"created_at": now_iso(), "mode": "crash_follow_short_readonly_observer", "orders_submitted": 0, "error": str(exc)}
  191. write_heartbeat(args.state_dir, error)
  192. append_jsonl(args.state_dir / EVENTS_FILE, error)
  193. print(json.dumps(error, indent=2, sort_keys=True), file=sys.stderr)
  194. if args.once:
  195. return 1
  196. if args.once:
  197. return 0
  198. time.sleep(args.interval_seconds)
  199. if __name__ == "__main__":
  200. raise SystemExit(main())