from __future__ import annotations import argparse import json import sys import time from dataclasses import asdict, dataclass from datetime import UTC, datetime from pathlib import Path import pandas as pd sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from okx_codex_trader.okx_client import OkxClient from scripts import explore_ultrashort as explore ROOT = Path(__file__).resolve().parents[1] STATE_DIR = ROOT / "var" / "short-bias-readonly" SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP") SOURCE_BAR = "15m" HISTORY_LIMIT = 350_400 @dataclass(frozen=True) class OverlayParams: allocation: float = 0.05 bar: str = "1h" btc_trend: int = 1440 btc_lookback: int = 336 vol_lookback: int = 336 btc_max_momentum: float = -0.005 btc_min_drop: float = 0.025 min_btc_vol: float = 0.012 short_symbol: str = "ETH-USDT-SWAP" PARAMS = OverlayParams() def now_iso() -> str: return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z") def append_jsonl(path: Path, payload: dict[str, object]) -> None: path.parent.mkdir(parents=True, exist_ok=True) with path.open("a", encoding="utf-8") as handle: handle.write(json.dumps(payload, sort_keys=True, separators=(",", ":")) + "\n") def load_closes(client: OkxClient) -> pd.DataFrame: series = {} for symbol in SYMBOLS: candles = explore.get_candles_cached(client, symbol, SOURCE_BAR, HISTORY_LIMIT, explore.CANDLE_CACHE_DIR) frame = pd.DataFrame([asdict(candle) for candle in candles]) frame["ts"] = pd.to_datetime(frame["ts"], unit="ms", utc=True) frame = frame.sort_values("ts").drop_duplicates("ts", keep="last").set_index("ts") series[symbol] = frame["close"].resample(PARAMS.bar, label="left", closed="left").last().dropna() return pd.DataFrame(series).dropna() def annualized_vol(returns: pd.Series) -> pd.Series: return returns.rolling(PARAMS.vol_lookback).std() * (365 * 24) ** 0.5 def latest_signal(closes: pd.DataFrame) -> dict[str, object]: btc = closes["BTC-USDT-SWAP"] eth = closes["ETH-USDT-SWAP"] btc_trend = btc.rolling(PARAMS.btc_trend).mean() btc_momentum = btc / btc.shift(PARAMS.btc_lookback) - 1.0 btc_vol = annualized_vol(btc.pct_change()) risk_state = (btc < btc_trend) & (btc_momentum <= PARAMS.btc_max_momentum) & (btc_momentum <= -PARAMS.btc_min_drop) & (btc_vol >= PARAMS.min_btc_vol) active = bool(risk_state.iloc[-1]) latest = closes.iloc[-1] return { "target_side": "short" if active else "flat", "target_weight": -PARAMS.allocation if active else 0.0, "entry_signal": active and not bool(risk_state.iloc[-2]), "exit_signal": (not active) and bool(risk_state.iloc[-2]), "risk_state": active, "latest_bar": { "ts": closes.index[-1].isoformat(), "btc_close": float(latest["BTC-USDT-SWAP"]), "eth_close": float(latest["ETH-USDT-SWAP"]), }, "indicators": { "btc_sma_1440": float(btc_trend.iloc[-1]), "btc_momentum_336": float(btc_momentum.iloc[-1]), "btc_vol_336": float(btc_vol.iloc[-1]), }, } def run_once(state_dir: Path) -> dict[str, object]: state_dir.mkdir(parents=True, exist_ok=True) closes = load_closes(OkxClient()) signal = latest_signal(closes) payload = { "created_at": now_iso(), "mode": "short_bias_readonly_observer", "orders_submitted": 0, "strategy": { "name": "rotation_plus_5pct_btc_risk_eth_short_overlay", "symbol": PARAMS.short_symbol, "bar": PARAMS.bar, "direction": "short_overlay_readonly", "params": asdict(PARAMS), "source_report": "reports/short-bias/overlay-mix-report.md", "backtest_summary": { "total_return": 1.512903, "annualized_return": 0.15593, "max_drawdown": 0.064999, "calmar": 2.398957, "profit_factor": 1.13144, "win_rate": 0.326781, "trades": 452, "return_3y": 0.549897, "return_1y": 0.180151, "return_6m": 0.019057, "return_3m": 0.003631, }, }, "candles": { "rows": len(closes), "first_ts": closes.index[0].isoformat(), "last_ts": closes.index[-1].isoformat(), }, "decision": { "target_side": signal["target_side"], "target_weight": signal["target_weight"], "entry_signal": signal["entry_signal"], "exit_signal": signal["exit_signal"], "intent": "observe_only_no_order_submission", }, "signal": signal, "risk_limits": { "no_order_submission": True, "no_cancel_submission": True, "execution": "read_only_signal_stream", }, } (state_dir / "heartbeat.json").write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8") append_jsonl(state_dir / "observer-events.jsonl", payload) return payload def main() -> int: parser = argparse.ArgumentParser(description="Run short-bias overlay read-only observer.") parser.add_argument("--state-dir", type=Path, default=STATE_DIR) parser.add_argument("--interval-seconds", type=int, default=300) parser.add_argument("--once", action="store_true") args = parser.parse_args() while True: payload = run_once(args.state_dir) print(json.dumps(payload, indent=2, sort_keys=True)) if args.once: return 0 time.sleep(args.interval_seconds) if __name__ == "__main__": raise SystemExit(main())