| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- from __future__ import annotations
- import argparse
- import json
- import sys
- import time
- from dataclasses import asdict, dataclass
- from datetime import UTC, datetime
- from pathlib import Path
- import pandas as pd
- sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
- from okx_codex_trader.okx_client import OkxClient
- from scripts import explore_ultrashort as explore
- ROOT = Path(__file__).resolve().parents[1]
- STATE_DIR = ROOT / "var" / "short-bias-readonly"
- SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP")
- SOURCE_BAR = "15m"
- HISTORY_LIMIT = 350_400
- @dataclass(frozen=True)
- class OverlayParams:
- allocation: float = 0.05
- bar: str = "1h"
- btc_trend: int = 1440
- btc_lookback: int = 336
- vol_lookback: int = 336
- btc_max_momentum: float = -0.005
- btc_min_drop: float = 0.025
- min_btc_vol: float = 0.012
- short_symbol: str = "ETH-USDT-SWAP"
- PARAMS = OverlayParams()
- def now_iso() -> str:
- return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
- def append_jsonl(path: Path, payload: dict[str, object]) -> None:
- path.parent.mkdir(parents=True, exist_ok=True)
- with path.open("a", encoding="utf-8") as handle:
- handle.write(json.dumps(payload, sort_keys=True, separators=(",", ":")) + "\n")
- def load_closes(client: OkxClient) -> pd.DataFrame:
- series = {}
- for symbol in SYMBOLS:
- candles = explore.get_candles_cached(client, symbol, SOURCE_BAR, HISTORY_LIMIT, explore.CANDLE_CACHE_DIR)
- frame = pd.DataFrame([asdict(candle) for candle in candles])
- frame["ts"] = pd.to_datetime(frame["ts"], unit="ms", utc=True)
- frame = frame.sort_values("ts").drop_duplicates("ts", keep="last").set_index("ts")
- series[symbol] = frame["close"].resample(PARAMS.bar, label="left", closed="left").last().dropna()
- return pd.DataFrame(series).dropna()
- def annualized_vol(returns: pd.Series) -> pd.Series:
- return returns.rolling(PARAMS.vol_lookback).std() * (365 * 24) ** 0.5
- def latest_signal(closes: pd.DataFrame) -> dict[str, object]:
- btc = closes["BTC-USDT-SWAP"]
- eth = closes["ETH-USDT-SWAP"]
- btc_trend = btc.rolling(PARAMS.btc_trend).mean()
- btc_momentum = btc / btc.shift(PARAMS.btc_lookback) - 1.0
- btc_vol = annualized_vol(btc.pct_change())
- risk_state = (btc < btc_trend) & (btc_momentum <= PARAMS.btc_max_momentum) & (btc_momentum <= -PARAMS.btc_min_drop) & (btc_vol >= PARAMS.min_btc_vol)
- active = bool(risk_state.iloc[-1])
- latest = closes.iloc[-1]
- return {
- "target_side": "short" if active else "flat",
- "target_weight": -PARAMS.allocation if active else 0.0,
- "entry_signal": active and not bool(risk_state.iloc[-2]),
- "exit_signal": (not active) and bool(risk_state.iloc[-2]),
- "risk_state": active,
- "latest_bar": {
- "ts": closes.index[-1].isoformat(),
- "btc_close": float(latest["BTC-USDT-SWAP"]),
- "eth_close": float(latest["ETH-USDT-SWAP"]),
- },
- "indicators": {
- "btc_sma_1440": float(btc_trend.iloc[-1]),
- "btc_momentum_336": float(btc_momentum.iloc[-1]),
- "btc_vol_336": float(btc_vol.iloc[-1]),
- },
- }
- def run_once(state_dir: Path) -> dict[str, object]:
- state_dir.mkdir(parents=True, exist_ok=True)
- closes = load_closes(OkxClient())
- signal = latest_signal(closes)
- payload = {
- "created_at": now_iso(),
- "mode": "short_bias_readonly_observer",
- "orders_submitted": 0,
- "strategy": {
- "name": "rotation_plus_5pct_btc_risk_eth_short_overlay",
- "symbol": PARAMS.short_symbol,
- "bar": PARAMS.bar,
- "direction": "short_overlay_readonly",
- "params": asdict(PARAMS),
- "source_report": "reports/short-bias/overlay-mix-report.md",
- "backtest_summary": {
- "total_return": 1.512903,
- "annualized_return": 0.15593,
- "max_drawdown": 0.064999,
- "calmar": 2.398957,
- "profit_factor": 1.13144,
- "win_rate": 0.326781,
- "trades": 452,
- "return_3y": 0.549897,
- "return_1y": 0.180151,
- "return_6m": 0.019057,
- "return_3m": 0.003631,
- },
- },
- "candles": {
- "rows": len(closes),
- "first_ts": closes.index[0].isoformat(),
- "last_ts": closes.index[-1].isoformat(),
- },
- "decision": {
- "target_side": signal["target_side"],
- "target_weight": signal["target_weight"],
- "entry_signal": signal["entry_signal"],
- "exit_signal": signal["exit_signal"],
- "intent": "observe_only_no_order_submission",
- },
- "signal": signal,
- "risk_limits": {
- "no_order_submission": True,
- "no_cancel_submission": True,
- "execution": "read_only_signal_stream",
- },
- }
- (state_dir / "heartbeat.json").write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
- append_jsonl(state_dir / "observer-events.jsonl", payload)
- return payload
- def main() -> int:
- parser = argparse.ArgumentParser(description="Run short-bias overlay read-only observer.")
- parser.add_argument("--state-dir", type=Path, default=STATE_DIR)
- parser.add_argument("--interval-seconds", type=int, default=300)
- parser.add_argument("--once", action="store_true")
- args = parser.parse_args()
- while True:
- payload = run_once(args.state_dir)
- print(json.dumps(payload, indent=2, sort_keys=True))
- if args.once:
- return 0
- time.sleep(args.interval_seconds)
- if __name__ == "__main__":
- raise SystemExit(main())
|