from __future__ import annotations import argparse import json import os import sys import time from dataclasses import asdict, dataclass from datetime import UTC, datetime from pathlib import Path import pandas as pd sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from okx_codex_trader.config import load_config from okx_codex_trader.live_execution import ( TargetPosition, current_position_from_okx, plan_position_delta, render_market_order_bodies, ) from okx_codex_trader.models import Candle from okx_codex_trader.okx_client import OkxClient ROOT = Path(__file__).resolve().parents[1] STATE_DIR = ROOT / "var" / "bb-squeeze-executor" STATE_FILE = "runtime-state.json" EVENTS_FILE = "events.jsonl" SYMBOL = "ETH-USDT-SWAP" BAR = "15m" LEVERAGE = 3 BAND_LENGTH = 48 BANDWIDTH_LOOKBACK = 960 BANDWIDTH_QUANTILE = 0.25 STOP_LOSS_PCT = 0.01 ETH_VOL_CAP = 0.006 COOLDOWN_BARS = 24 LIVE_CANDLE_LIMIT = 1_200 RECENT_CANDLE_LIMIT = 20 @dataclass(frozen=True) class StrategyState: last_candle_ts: int | None active_side: str | None entry_price: float | None entry_candle_ts: int | None cooldown_until_ts: int | None EMPTY_STATE = StrategyState(None, None, None, None, None) def now_iso() -> str: return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z") def load_state(path: Path) -> StrategyState: if not path.exists(): return EMPTY_STATE payload = json.loads(path.read_text(encoding="utf-8")) return StrategyState( last_candle_ts=payload["last_candle_ts"], active_side=payload["active_side"], entry_price=payload["entry_price"], entry_candle_ts=payload["entry_candle_ts"], cooldown_until_ts=payload["cooldown_until_ts"], ) def save_state(path: Path, state: StrategyState) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(asdict(state), indent=2, sort_keys=True) + "\n", encoding="utf-8") def append_event(state_dir: Path, payload: dict[str, object]) -> None: state_dir.mkdir(parents=True, exist_ok=True) with (state_dir / EVENTS_FILE).open("a", encoding="utf-8") as handle: handle.write(json.dumps(payload, sort_keys=True) + "\n") def frame_from_candles(candles: list[Candle]) -> pd.DataFrame: frame = pd.DataFrame([asdict(candle) for candle in candles]) frame["time"] = pd.to_datetime(frame["ts"], unit="ms", utc=True) return frame.sort_values("ts").drop_duplicates("ts", keep="last").reset_index(drop=True) def load_live_frame(client: OkxClient) -> pd.DataFrame: return frame_from_candles(client.get_candles(SYMBOL, BAR, LIVE_CANDLE_LIMIT)) def refresh_live_frame(client: OkxClient, frame: pd.DataFrame | None) -> pd.DataFrame: if frame is None or len(frame) < BANDWIDTH_LOOKBACK + 2: return load_live_frame(client) recent = frame_from_candles(client.get_recent_candles(SYMBOL, BAR, RECENT_CANDLE_LIMIT)) merged = pd.concat([frame, recent], ignore_index=True) merged = merged.sort_values("ts").drop_duplicates("ts", keep="last").tail(LIVE_CANDLE_LIMIT) return merged.reset_index(drop=True) def signal_from_frame(frame: pd.DataFrame, state: StrategyState) -> tuple[StrategyState, dict[str, object]]: if len(frame) < BANDWIDTH_LOOKBACK + 2: raise ValueError("not enough candles") close = frame["close"].astype(float) middle = close.rolling(BAND_LENGTH).mean() stdev = close.rolling(BAND_LENGTH).std(ddof=0) upper = middle + (2.0 * stdev) lower = middle - (2.0 * stdev) bandwidth = (upper - lower) / middle threshold = bandwidth.rolling(BANDWIDTH_LOOKBACK).quantile(BANDWIDTH_QUANTILE) eth_vol = close.pct_change().rolling(96).std(ddof=0) decision_index = len(frame) - 1 row = frame.iloc[decision_index] candle_ts = int(row["ts"]) candle_time = pd.Timestamp(row["time"]).isoformat().replace("+00:00", "Z") indicators = { "eth_close": float(row["close"]), "middle": float(middle.iloc[decision_index]), "upper": float(upper.iloc[decision_index]), "lower": float(lower.iloc[decision_index]), "bandwidth": float(bandwidth.iloc[decision_index]), "bandwidth_threshold": float(threshold.iloc[decision_index]), "eth_vol_96": float(eth_vol.iloc[decision_index]), } if state.last_candle_ts is not None and candle_ts <= state.last_candle_ts: return state, { "decision_candle_ts": candle_ts, "decision_candle_time": candle_time, "signal": "state_replay", "target_side": state.active_side or "flat", "indicators": indicators, } next_state = StrategyState(candle_ts, state.active_side, state.entry_price, state.entry_candle_ts, state.cooldown_until_ts) signal = "hold" target_side = state.active_side or "flat" if state.active_side is not None: entry_price = float(state.entry_price) stop = entry_price * (1.0 - STOP_LOSS_PCT if state.active_side == "long" else 1.0 + STOP_LOSS_PCT) stop_hit = (state.active_side == "long" and float(row["low"]) <= stop) or (state.active_side == "short" and float(row["high"]) >= stop) middle_exit = (state.active_side == "long" and float(row["close"]) < indicators["middle"]) or ( state.active_side == "short" and float(row["close"]) > indicators["middle"] ) if stop_hit or middle_exit: signal = "exit_stop" if stop_hit else "exit_middle" target_side = "flat" next_state = StrategyState( candle_ts, None, None, None, candle_ts + (COOLDOWN_BARS * 900_000), ) else: cooldown_ok = state.cooldown_until_ts is None or candle_ts >= state.cooldown_until_ts compressed = indicators["bandwidth"] <= indicators["bandwidth_threshold"] vol_ok = indicators["eth_vol_96"] <= ETH_VOL_CAP if cooldown_ok and compressed and vol_ok and float(row["close"]) > indicators["upper"]: signal = "entry_long" target_side = "long" next_state = StrategyState(candle_ts, "long", float(row["close"]), candle_ts, state.cooldown_until_ts) elif cooldown_ok and compressed and vol_ok and float(row["close"]) < indicators["lower"]: signal = "entry_short" target_side = "short" next_state = StrategyState(candle_ts, "short", float(row["close"]), candle_ts, state.cooldown_until_ts) return next_state, { "decision_candle_ts": candle_ts, "decision_candle_time": candle_time, "signal": signal, "target_side": target_side, "indicators": indicators, "params": { "band_length": BAND_LENGTH, "bandwidth_lookback": BANDWIDTH_LOOKBACK, "bandwidth_quantile": BANDWIDTH_QUANTILE, "stop_loss_pct": STOP_LOSS_PCT, "eth_vol_cap": ETH_VOL_CAP, "cooldown_bars": COOLDOWN_BARS, "side_mode": "both", }, } def account_current_position(client: OkxClient, margin_per_unit_usdt: float) -> tuple[TargetPosition, dict[str, object]]: positions = client.get_positions(SYMBOL) metadata = client.get_instrument_meta(SYMBOL) mark_price = client.get_last_price(SYMBOL) current = current_position_from_okx( positions=positions, mark_price=mark_price, metadata=metadata, leverage=LEVERAGE, margin_per_unit_usdt=margin_per_unit_usdt, ) return current, { "positions": [asdict(position) for position in positions], "instrument_meta": asdict(metadata), "mark_price": mark_price, } def target_position(signal: dict[str, object], current: TargetPosition) -> TargetPosition: side = str(signal["target_side"]) if side == "flat": return TargetPosition(side="flat", unit=0.0, known=True, reason=str(signal["signal"])) if current.known and current.side == side and current.unit > 0.0: return TargetPosition(side=side, unit=current.unit, known=True, reason="keep existing same-side position") return TargetPosition(side=side, unit=1.0, known=True, reason=str(signal["signal"])) def risk_arg(value: float | None, env_name: str) -> float: if value is not None: return value raw = os.environ.get(env_name) if raw is None or raw == "": raise ValueError(f"{env_name} is required") return float(raw) def run_once( *, state_dir: Path, margin_per_unit_usdt: float, max_new_margin_usdt: float, max_total_margin_usdt: float, submit_live: bool, frame: pd.DataFrame | None = None, ) -> dict[str, object]: state_dir.mkdir(parents=True, exist_ok=True) state_path = state_dir / STATE_FILE previous_state = load_state(state_path) client = OkxClient(load_config()) if frame is None: frame = load_live_frame(client) next_state, signal = signal_from_frame(frame, previous_state) current, account = account_current_position(client, margin_per_unit_usdt) target = target_position(signal, current) plan = plan_position_delta(current, target) orders = () if current.known and target.known: from okx_codex_trader.models import InstrumentMeta meta = account["instrument_meta"] orders = render_market_order_bodies( plan=plan, symbol=SYMBOL, mark_price=float(account["mark_price"]), metadata=InstrumentMeta(ct_val=float(meta["ct_val"]), lot_sz=float(meta["lot_sz"]), min_sz=float(meta["min_sz"])), leverage=LEVERAGE, margin_per_unit_usdt=margin_per_unit_usdt, max_new_margin_usdt=max_new_margin_usdt, max_total_margin_usdt=max_total_margin_usdt, client_order_id_prefix=f"bbsq-{signal['decision_candle_ts']}", stop_loss_pct=STOP_LOSS_PCT, ) snapshot = { "created_at": now_iso(), "mode": "bb_squeeze_live_executor" if submit_live else "bb_squeeze_dry_run_executor", "orders_submitted": 0, "strategy": "bb-squeeze-l48-bw960-q0.25-sl0.01-tpnone-both-none-vc0.006-ddnone-cd24", "previous_state": asdict(previous_state), "next_state": asdict(next_state), "signal": signal, "current_position": asdict(current), "target_position": asdict(target), "execution_plan": { "current": asdict(plan.current), "target": asdict(plan.target), "actions": [asdict(action) for action in plan.actions], }, "rendered_orders": [asdict(order) for order in orders], "account": account, "risk_limits": { "submit_enabled": submit_live, "margin_per_unit_usdt": margin_per_unit_usdt, "max_new_margin_usdt": max_new_margin_usdt, "max_total_margin_usdt": max_total_margin_usdt, }, } if submit_live: client.ensure_hedge_mode() submitted = [] try: for rendered in orders: body = rendered.body client.set_leverage(symbol=SYMBOL, leverage=LEVERAGE, pos_side=body["posSide"]) submitted.append(asdict(client.submit_market_order_body(body))) except ValueError as exc: snapshot["orders_submitted"] = len(submitted) snapshot["submitted_orders"] = submitted snapshot["execution_error"] = str(exc) append_event(state_dir, snapshot) raise snapshot["orders_submitted"] = len(submitted) snapshot["submitted_orders"] = submitted save_state(state_path, next_state) append_event(state_dir, snapshot) return snapshot def main() -> int: parser = argparse.ArgumentParser(description="Run BB squeeze live executor.") parser.add_argument("--state-dir", type=Path, default=STATE_DIR) parser.add_argument("--margin-per-unit-usdt", type=float) parser.add_argument("--max-new-margin-usdt", type=float) parser.add_argument("--max-total-margin-usdt", type=float) parser.add_argument("--submit-live", action="store_true") parser.add_argument("--confirm-live", action="store_true") parser.add_argument("--loop", action="store_true") parser.add_argument("--poll-seconds", type=float, default=10.0) args = parser.parse_args() if args.submit_live != args.confirm_live: raise ValueError("--submit-live and --confirm-live must be used together") margin_per_unit_usdt = risk_arg(args.margin_per_unit_usdt, "ETH_NEXTGEN_MARGIN_PER_UNIT_USDT") max_new_margin_usdt = risk_arg(args.max_new_margin_usdt, "ETH_NEXTGEN_MAX_NEW_MARGIN_USDT") max_total_margin_usdt = risk_arg(args.max_total_margin_usdt, "ETH_NEXTGEN_MAX_TOTAL_MARGIN_USDT") if not args.loop: snapshot = run_once( state_dir=args.state_dir, margin_per_unit_usdt=margin_per_unit_usdt, max_new_margin_usdt=max_new_margin_usdt, max_total_margin_usdt=max_total_margin_usdt, submit_live=args.submit_live, ) print(json.dumps(snapshot, indent=2, sort_keys=True), flush=True) return 0 frame: pd.DataFrame | None = None while True: frame = refresh_live_frame(OkxClient(), frame) state = load_state(args.state_dir / STATE_FILE) _, loop_signal = signal_from_frame(frame, state) if loop_signal["signal"] != "state_replay": snapshot = run_once( state_dir=args.state_dir, margin_per_unit_usdt=margin_per_unit_usdt, max_new_margin_usdt=max_new_margin_usdt, max_total_margin_usdt=max_total_margin_usdt, submit_live=args.submit_live, frame=frame, ) print(json.dumps(snapshot, indent=2, sort_keys=True), flush=True) time.sleep(args.poll_seconds) return 0 if __name__ == "__main__": raise SystemExit(main())