| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
- from __future__ import annotations
- import argparse
- import json
- import sys
- import time
- from dataclasses import asdict
- from datetime import UTC, datetime
- from pathlib import Path
- import pandas as pd
- sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
- from okx_codex_trader.models import Candle
- from okx_codex_trader.okx_client import OkxClient
# --- Filesystem layout -------------------------------------------------------
# Two levels up from this file (the directory containing this script's folder).
ROOT = Path(__file__).resolve().parents[1]
# Per-symbol candle CSVs are looked up under DATA_DIR/<symbol>/<bar>.csv.
DATA_DIR = ROOT.joinpath("data", "okx-candles")
# Default location for heartbeat.json and the JSONL event log.
STATE_DIR = ROOT.joinpath("var", "crash-follow-short-observer")
EVENTS_FILE = "observer-events.jsonl"

# --- Market data -------------------------------------------------------------
ETH_SYMBOL = "ETH-USDT-SWAP"
BTC_SYMBOL = "BTC-USDT-SWAP"
# Bar size of the raw candles; they are aggregated to hourly bars before use.
SOURCE_BAR = "15m"
# Number of raw candles read on a full load; also the row cap applied on refresh.
LIVE_CANDLE_LIMIT = 1200
# Number of raw candles requested on each incremental refresh.
RECENT_CANDLE_LIMIT = 300

# --- Strategy parameters -----------------------------------------------------
FAST = 20  # span of the fast EMA on the ETH close
SLOW = 120  # span of the slow EMA on the ETH close
LOOKBACK = 8  # bars used for the ETH return
THRESHOLD = 0.035  # entry requires the ETH return to be below -THRESHOLD
# The next three values are only echoed in the signal's "params" section here.
STOP_LOSS_PCT = 0.02
TAKE_PROFIT_PCT = 0.06
HOLD_BARS = 96

# Minimum number of hourly rows required before a signal is computed.
MIN_FRAME_ROWS = 2 + max(SLOW, 180, LOOKBACK * 2, 160, 120 + 24)
- def now_iso() -> str:
- return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
def append_jsonl(path: Path, payload: dict[str, object]) -> None:
    """Append ``payload`` to ``path`` as one compact JSON line.

    Missing parent directories are created. Keys are written in sorted order
    with no whitespace between separators, followed by a newline.
    """
    folder = path.parent
    folder.mkdir(parents=True, exist_ok=True)
    with path.open("a", encoding="utf-8") as sink:
        record = json.dumps(payload, sort_keys=True, separators=(",", ":"))
        sink.write(f"{record}\n")
def frame_from_candles(candles: list[Candle]) -> pd.DataFrame:
    """Build a DataFrame from candle dataclass instances.

    Each candle is converted with ``dataclasses.asdict``. A timezone-aware
    ``time`` column is derived from ``ts`` (interpreted as epoch milliseconds).
    The result is sorted by ``ts``, keeps the last row for any repeated ``ts``,
    and has a fresh 0..n-1 index.
    """
    records = [asdict(item) for item in candles]
    table = pd.DataFrame(records)
    table["time"] = pd.to_datetime(table["ts"], unit="ms", utc=True)
    ordered = table.sort_values("ts")
    unique = ordered.drop_duplicates("ts", keep="last")
    return unique.reset_index(drop=True)
def load_pair_frame(client: OkxClient) -> pd.DataFrame:
    """Load ETH candles paired with the BTC close and aggregate them to hourly bars.

    If both per-symbol CSV files exist under ``DATA_DIR``, the last
    ``LIVE_CANDLE_LIMIT`` rows of each are used. Otherwise the same number of
    candles is requested from ``client`` for each symbol. In both cases the BTC
    close is joined onto the ETH rows by ``ts`` (inner join) and the result is
    passed to ``resample_hourly``.
    """
    csv_paths = {
        symbol: DATA_DIR / symbol / f"{SOURCE_BAR}.csv"
        for symbol in (ETH_SYMBOL, BTC_SYMBOL)
    }
    if all(csv_path.exists() for csv_path in csv_paths.values()):
        eth = pd.read_csv(csv_paths[ETH_SYMBOL]).tail(LIVE_CANDLE_LIMIT)
        btc = pd.read_csv(csv_paths[BTC_SYMBOL]).tail(LIVE_CANDLE_LIMIT)
        # The CSV rows carry only the raw timestamp; derive the datetime column here.
        eth["time"] = pd.to_datetime(eth["ts"], unit="ms", utc=True)
    else:
        eth = frame_from_candles(client.get_candles(ETH_SYMBOL, SOURCE_BAR, LIVE_CANDLE_LIMIT))
        btc = frame_from_candles(client.get_candles(BTC_SYMBOL, SOURCE_BAR, LIVE_CANDLE_LIMIT))

    # Only the BTC close is needed; rename it so it does not clash with ETH's close.
    btc_close = btc[["ts", "close"]].rename(columns={"close": "btc_close"})
    paired = eth.merge(btc_close, on="ts", how="inner")
    paired = paired.sort_values("ts").reset_index(drop=True)
    return resample_hourly(paired)
def resample_hourly(frame: pd.DataFrame) -> pd.DataFrame:
    """Aggregate paired candles into left-labelled, left-closed one-hour bars.

    Args:
        frame: Rows with a datetime ``time`` column plus ``ts``, ``open``,
            ``high``, ``low``, ``close``, ``volume`` and ``btc_close``.

    Returns:
        One row per hour with ``time`` (bucket start), ``ts`` (the ``ts`` of the
        first source row in the bucket), first ``open``, max ``high``, min
        ``low``, last ``close``, summed ``volume`` and last ``btc_close``.
        Buckets with any missing value (for example hours with no source rows)
        are dropped.
    """
    hourly = (
        frame.set_index("time")
        .resample("1h", label="left", closed="left")
        .agg(
            ts=("ts", "first"),
            open=("open", "first"),
            high=("high", "max"),
            low=("low", "min"),
            close=("close", "last"),
            volume=("volume", "sum"),
            btc_close=("btc_close", "last"),
        )
        .dropna()
        .reset_index()
    )
    # Use an explicit 64-bit dtype: plain ``int`` maps to the platform default
    # integer, which can be 32-bit (Windows with NumPy < 2) and is too narrow
    # for epoch-millisecond timestamps.
    hourly["ts"] = hourly["ts"].astype("int64")
    return hourly
def refresh_pair_frame(client: OkxClient, frame: pd.DataFrame | None) -> pd.DataFrame:
    """Return ``frame`` updated with the most recent candles from ``client``.

    When ``frame`` is missing or has fewer than ``MIN_FRAME_ROWS`` rows, a full
    reload is done via ``load_pair_frame``. Otherwise ``RECENT_CANDLE_LIMIT``
    recent candles are fetched per symbol, paired on ``ts``, aggregated to
    hourly bars and merged into ``frame``. For a repeated ``ts`` the newest row
    wins, and the result is capped to the last ``LIVE_CANDLE_LIMIT`` rows.

    Args:
        client: Source of recent candles.
        frame: Previously built hourly frame, or ``None``.

    Returns:
        The merged hourly frame sorted by ``ts`` with a fresh index, or
        ``frame`` unchanged when the recent window contributes no rows.
    """
    if frame is None or len(frame) < MIN_FRAME_ROWS:
        return load_pair_frame(client)

    eth = frame_from_candles(client.get_recent_candles(ETH_SYMBOL, SOURCE_BAR, RECENT_CANDLE_LIMIT))
    btc = frame_from_candles(client.get_recent_candles(BTC_SYMBOL, SOURCE_BAR, RECENT_CANDLE_LIMIT))
    btc = btc[["ts", "close"]].rename(columns={"close": "btc_close"})
    recent = eth.merge(btc, on="ts", how="inner").sort_values("ts").reset_index(drop=True)

    if not recent.empty:
        # The recent window generally starts mid-hour. Resampling it as-is
        # yields a partial first bucket whose ``ts`` is a non-boundary candle
        # timestamp, so it would never de-duplicate against the existing bar
        # for that hour and would pile up as a spurious extra row. Discard the
        # candles that precede the first full-hour boundary (``ts`` is epoch
        # milliseconds).
        hour_ms = 3_600_000
        first_ts = int(recent["ts"].iloc[0])
        first_full_hour = ((first_ts + hour_ms - 1) // hour_ms) * hour_ms
        recent = recent.loc[recent["ts"] >= first_full_hour]
    if recent.empty:
        # Nothing usable was fetched; keep the existing history as it is.
        return frame

    recent = resample_hourly(recent)
    merged = pd.concat([frame, recent], ignore_index=True)
    return (
        merged.sort_values("ts")
        .drop_duplicates("ts", keep="last")
        .tail(LIVE_CANDLE_LIMIT)
        .reset_index(drop=True)
    )
def rsi(close: pd.Series, length: int) -> pd.Series:
    """Compute a relative-strength index of ``close``.

    Upward and downward bar-to-bar changes are each smoothed with an
    exponentially weighted mean using ``alpha = 1 / length`` (``adjust=False``);
    the result is ``100 - 100 / (1 + avg_gain / avg_loss)``.
    """
    delta = close.diff()
    smoothing = {"alpha": 1 / length, "adjust": False}
    ups = delta.clip(lower=0)
    downs = -delta.clip(upper=0)
    avg_gain = ups.ewm(**smoothing).mean()
    avg_loss = downs.ewm(**smoothing).mean()
    relative_strength = avg_gain / avg_loss
    return 100 - 100 / (1 + relative_strength)
def latest_signal(frame: pd.DataFrame) -> dict[str, object]:
    """Evaluate the strategy on the last row of ``frame`` and describe the result.

    Args:
        frame: Hourly rows with ``ts``, ``time``, ``high``, ``low``, ``close``
            and ``btc_close`` columns.

    Returns:
        A dict holding the decision candle's timestamp, the signal
        (``"entry_short"`` or ``"hold"``), the target side, the entry/exit
        flags, the indicator values at the last row and the strategy params.

    Raises:
        ValueError: If ``frame`` has fewer than ``MIN_FRAME_ROWS`` rows.
    """
    if len(frame) < MIN_FRAME_ROWS:
        raise ValueError("not enough candles")

    eth_close = frame["close"].astype(float)
    eth_high = frame["high"].astype(float)
    eth_low = frame["low"].astype(float)
    btc = frame["btc_close"].astype(float)

    # ETH trend, momentum and range indicators.
    ema_fast = eth_close.ewm(span=FAST, adjust=False).mean()
    ema_slow = eth_close.ewm(span=SLOW, adjust=False).mean()
    eth_return = eth_close / eth_close.shift(LOOKBACK) - 1.0
    bar_range = (eth_high - eth_low) / eth_close
    bar_range_rank = bar_range.rolling(160).rank(pct=True)
    eth_rsi = rsi(eth_close, 14)

    # BTC gate: below its 120-bar mean and down more than 1.5% over 24 bars.
    btc_sma = btc.rolling(120).mean()
    btc_return = btc / btc.shift(24) - 1.0
    risk_off = (btc < btc_sma) & (btc_return < -0.015)

    entry_mask = (
        risk_off
        & (eth_close < ema_slow)
        & (eth_return < -THRESHOLD)
        & (bar_range_rank > 0.75)
    )
    exit_mask = (eth_close > ema_fast) | (eth_rsi > 52)

    # Everything below is read at the final row only.
    last_row = frame.iloc[-1]
    enter_short = bool(entry_mask.iloc[-1])

    indicators = {
        "eth_close": float(eth_close.iloc[-1]),
        "eth_fast_ema_20": float(ema_fast.iloc[-1]),
        "eth_slow_ema_120": float(ema_slow.iloc[-1]),
        "eth_return_8h": float(eth_return.iloc[-1]),
        "eth_range_rank_160": float(bar_range_rank.iloc[-1]),
        "eth_rsi_14": float(eth_rsi.iloc[-1]),
        "btc_close": float(btc.iloc[-1]),
        "btc_sma_120": float(btc_sma.iloc[-1]),
        "btc_return_24h": float(btc_return.iloc[-1]),
        "btc_riskoff_gate": bool(risk_off.iloc[-1]),
    }
    params = {
        "bar": "1H",
        "fast": FAST,
        "slow": SLOW,
        "lookback": LOOKBACK,
        "threshold": THRESHOLD,
        "stop_loss_pct": STOP_LOSS_PCT,
        "take_profit_pct": TAKE_PROFIT_PCT,
        "hold_bars": HOLD_BARS,
        "gate": "btc_riskoff",
    }

    if enter_short:
        signal_name, side = "entry_short", "short"
    else:
        signal_name, side = "hold", "flat"

    return {
        "decision_candle_ts": int(last_row["ts"]),
        "decision_candle_time": pd.Timestamp(last_row["time"]).isoformat().replace("+00:00", "Z"),
        "signal": signal_name,
        "target_side": side,
        "entry_signal": enter_short,
        "exit_signal": bool(exit_mask.iloc[-1]),
        "indicators": indicators,
        "params": params,
    }
def write_heartbeat(state_dir: Path, payload: dict[str, object]) -> None:
    """Overwrite ``heartbeat.json`` in ``state_dir`` with ``payload``.

    The directory is created if needed. The JSON is written with sorted keys,
    two-space indentation and a trailing newline.
    """
    state_dir.mkdir(parents=True, exist_ok=True)
    target = state_dir / "heartbeat.json"
    rendered = json.dumps(payload, indent=2, sort_keys=True)
    target.write_text(f"{rendered}\n", encoding="utf-8")
def run_once(state_dir: Path, frame: pd.DataFrame | None = None) -> dict[str, object]:
    """Compute the latest signal, persist it, and return the written payload.

    Args:
        state_dir: Directory receiving ``heartbeat.json`` and the JSONL event log.
        frame: Hourly pair frame to evaluate. When ``None``, one is loaded with
            ``load_pair_frame`` using a new ``OkxClient``.

    Returns:
        The payload written to the heartbeat file and appended to the event log.
    """
    state_dir.mkdir(parents=True, exist_ok=True)
    if frame is None:
        frame = load_pair_frame(OkxClient())

    signal = latest_signal(frame)

    oldest = pd.Timestamp(frame.iloc[0]["time"])
    newest = pd.Timestamp(frame.iloc[-1]["time"])
    candle_window = {
        "rows": len(frame),
        "first_time": oldest.isoformat().replace("+00:00", "Z"),
        "last_time": newest.isoformat().replace("+00:00", "Z"),
    }
    # Fixed reference figures reported with every event; nothing here is recomputed.
    backtest_summary = {
        "available_full_return": 1.0114,
        "available_full_max_drawdown": 0.4490,
        "available_full_trades": 618,
        "return_3y": 0.4300,
        "return_1y": 0.4003,
        "return_6m": 0.2002,
        "return_3m": 0.0613,
    }
    risk_limits = {
        "no_order_submission": True,
        "no_cancel_submission": True,
        "execution": "read_only_signal_stream",
    }

    payload = {
        "created_at": now_iso(),
        "mode": "crash_follow_short_readonly_observer",
        "orders_submitted": 0,
        "strategy": "crash_follow-1H-f20-s120-lb8-th0.035-sl0.02-tp0.06-h96-btc_riskoff",
        "signal": signal,
        "candles": candle_window,
        "backtest_summary": backtest_summary,
        "risk_limits": risk_limits,
    }

    write_heartbeat(state_dir, payload)
    append_jsonl(state_dir / EVENTS_FILE, payload)
    return payload
def main() -> int:
    """Run the observer from the command line.

    Each cycle refreshes the pair frame, runs ``run_once`` and prints the
    payload to stdout. If a cycle raises, an error record is written to the
    heartbeat file and event log and printed to stderr.

    Returns:
        With ``--once``: 0 after a successful cycle, 1 after a failed one.
        Without ``--once`` the loop sleeps ``--interval-seconds`` between
        cycles and does not return.
    """
    cli = argparse.ArgumentParser(description="Run crash-follow short read-only observer.")
    cli.add_argument("--state-dir", type=Path, default=STATE_DIR)
    cli.add_argument("--interval-seconds", type=int, default=300)
    cli.add_argument("--once", action="store_true")
    options = cli.parse_args()

    # Carried across cycles so later iterations only fetch recent candles.
    cached: pd.DataFrame | None = None
    while True:
        try:
            cached = refresh_pair_frame(OkxClient(), cached)
            report = run_once(options.state_dir, cached)
            print(json.dumps(report, indent=2, sort_keys=True))
        except Exception as exc:
            failure = {
                "created_at": now_iso(),
                "mode": "crash_follow_short_readonly_observer",
                "orders_submitted": 0,
                "error": str(exc),
            }
            write_heartbeat(options.state_dir, failure)
            append_jsonl(options.state_dir / EVENTS_FILE, failure)
            print(json.dumps(failure, indent=2, sort_keys=True), file=sys.stderr)
            if options.once:
                return 1
        else:
            if options.once:
                return 0
        time.sleep(options.interval_seconds)
# Script entry point: exit the process with main()'s return value as the status.
if __name__ == "__main__":
    sys.exit(main())
|