from __future__ import annotations import argparse import json import os import sys import time from dataclasses import asdict from datetime import UTC, datetime from pathlib import Path import pandas as pd sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from okx_codex_trader.config import load_config from okx_codex_trader.candles import align_candles_by_ts from okx_codex_trader.bb_squeeze_strategy import ( BANDWIDTH_LOOKBACK, COOLDOWN_BARS, EMPTY_STATE, STOP_LOSS_PCT, TAKE_PROFIT_PCT, StrategyState, signal_from_frame, strategy_name, ) from okx_codex_trader.live_execution import ( TargetPosition, current_position_from_okx, plan_position_delta, render_market_order_bodies, ) from okx_codex_trader.models import Candle from okx_codex_trader.okx_client import OkxClient ROOT = Path(__file__).resolve().parents[1] STATE_DIR = ROOT / "var" / "bb-squeeze-executor" STATE_FILE = "runtime-state.json" EVENTS_FILE = "events.jsonl" SYMBOL = "ETH-USDT-SWAP" BTC_SYMBOL = "BTC-USDT-SWAP" BAR = "15m" LEVERAGE = 3 LIVE_CANDLE_LIMIT = 1_200 RECENT_CANDLE_LIMIT = 20 def now_iso() -> str: return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z") def load_state(path: Path) -> StrategyState: if not path.exists(): return EMPTY_STATE payload = json.loads(path.read_text(encoding="utf-8")) return StrategyState( last_candle_ts=payload["last_candle_ts"], active_side=payload["active_side"], entry_price=payload["entry_price"], entry_candle_ts=payload["entry_candle_ts"], cooldown_until_ts=payload["cooldown_until_ts"], middle_exit_streak=int(payload.get("middle_exit_streak", 0)), max_favorable_move_pct=payload.get("max_favorable_move_pct"), ) def save_state(path: Path, state: StrategyState) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(asdict(state), indent=2, sort_keys=True) + "\n", encoding="utf-8") def append_event(state_dir: Path, payload: dict[str, object]) -> None: state_dir.mkdir(parents=True, exist_ok=True) with (state_dir / EVENTS_FILE).open("a", encoding="utf-8") as handle: handle.write(json.dumps(payload, sort_keys=True) + "\n") def append_loop_error(state_dir: Path, message: str) -> None: append_event( state_dir, { "created_at": now_iso(), "mode": "bb_squeeze_live_executor_error", "execution_error": message, }, ) def frame_from_candles(candles: list[Candle]) -> pd.DataFrame: frame = pd.DataFrame([asdict(candle) for candle in candles]) frame["time"] = pd.to_datetime(frame["ts"], unit="ms", utc=True) return frame.sort_values("ts").drop_duplicates("ts", keep="last").reset_index(drop=True) def aligned_frame_from_candles(eth_candles: list[Candle], btc_candles: list[Candle]) -> pd.DataFrame: eth_candles, btc_candles = align_candles_by_ts(eth_candles, btc_candles) eth = frame_from_candles(eth_candles) btc = frame_from_candles(btc_candles)[["ts", "close"]].rename(columns={"close": "btc_close"}) return eth.merge(btc, on="ts", how="inner").sort_values("ts").reset_index(drop=True) def load_live_frame(client: OkxClient) -> pd.DataFrame: return aligned_frame_from_candles( client.get_candles(SYMBOL, BAR, LIVE_CANDLE_LIMIT), client.get_candles(BTC_SYMBOL, BAR, LIVE_CANDLE_LIMIT), ) def refresh_live_frame(client: OkxClient, frame: pd.DataFrame | None) -> pd.DataFrame: if frame is None or len(frame) < BANDWIDTH_LOOKBACK + 2: return load_live_frame(client) recent = aligned_frame_from_candles( client.get_recent_candles(SYMBOL, BAR, RECENT_CANDLE_LIMIT), client.get_recent_candles(BTC_SYMBOL, BAR, RECENT_CANDLE_LIMIT), ) merged = pd.concat([frame, recent], ignore_index=True) merged = merged.sort_values("ts").drop_duplicates("ts", keep="last").tail(LIVE_CANDLE_LIMIT) return merged.reset_index(drop=True) def account_current_position(client: OkxClient, margin_per_unit_usdt: float) -> tuple[TargetPosition, dict[str, object]]: positions = client.get_positions(SYMBOL) metadata = client.get_instrument_meta(SYMBOL) mark_price = client.get_last_price(SYMBOL) current = current_position_from_okx( positions=positions, mark_price=mark_price, metadata=metadata, leverage=LEVERAGE, margin_per_unit_usdt=margin_per_unit_usdt, ) return current, { "positions": [asdict(position) for position in positions], "instrument_meta": asdict(metadata), "mark_price": mark_price, } def target_position(signal: dict[str, object], current: TargetPosition) -> TargetPosition: side = str(signal["target_side"]) if side == "flat": return TargetPosition(side="flat", unit=0.0, known=True, reason=str(signal["signal"])) if current.known and current.side == side and current.unit > 0.0: return TargetPosition(side=side, unit=current.unit, known=True, reason="keep existing same-side position") return TargetPosition(side=side, unit=1.0, known=True, reason=str(signal["signal"])) def risk_arg(value: float | None, env_name: str) -> float: if value is not None: return value raw = os.environ.get(env_name) if raw is None or raw == "": raise ValueError(f"{env_name} is required") return float(raw) def run_once( *, state_dir: Path, margin_per_unit_usdt: float, max_new_margin_usdt: float, max_total_margin_usdt: float, submit_live: bool, frame: pd.DataFrame | None = None, ) -> dict[str, object]: state_dir.mkdir(parents=True, exist_ok=True) state_path = state_dir / STATE_FILE previous_state = load_state(state_path) client = OkxClient(load_config()) if frame is None: frame = load_live_frame(client) next_state, signal = signal_from_frame(frame, previous_state) current, account = account_current_position(client, margin_per_unit_usdt) if previous_state.active_side is not None and current.known and current.side == "flat": decision_candle_ts = int(signal["decision_candle_ts"]) next_state = StrategyState( decision_candle_ts, None, None, None, decision_candle_ts + (COOLDOWN_BARS * 900_000), 0, None, ) signal = { **signal, "signal": "external_flat_sync", "target_side": "flat", } target = target_position(signal, current) plan = plan_position_delta(current, target) orders = () if current.known and target.known: from okx_codex_trader.models import InstrumentMeta meta = account["instrument_meta"] orders = render_market_order_bodies( plan=plan, symbol=SYMBOL, mark_price=float(account["mark_price"]), metadata=InstrumentMeta(ct_val=float(meta["ct_val"]), lot_sz=float(meta["lot_sz"]), min_sz=float(meta["min_sz"])), leverage=LEVERAGE, margin_per_unit_usdt=margin_per_unit_usdt, max_new_margin_usdt=max_new_margin_usdt, max_total_margin_usdt=max_total_margin_usdt, client_order_id_prefix=f"bbsq-{signal['decision_candle_ts']}", stop_loss_pct=STOP_LOSS_PCT, take_profit_pct=TAKE_PROFIT_PCT, ) snapshot = { "created_at": now_iso(), "mode": "bb_squeeze_live_executor" if submit_live else "bb_squeeze_dry_run_executor", "orders_submitted": 0, "strategy": strategy_name(), "previous_state": asdict(previous_state), "next_state": asdict(next_state), "signal": signal, "current_position": asdict(current), "target_position": asdict(target), "execution_plan": { "current": asdict(plan.current), "target": asdict(plan.target), "actions": [asdict(action) for action in plan.actions], }, "rendered_orders": [asdict(order) for order in orders], "account": account, "risk_limits": { "submit_enabled": submit_live, "margin_per_unit_usdt": margin_per_unit_usdt, "max_new_margin_usdt": max_new_margin_usdt, "max_total_margin_usdt": max_total_margin_usdt, }, } if submit_live: client.ensure_hedge_mode() submitted = [] try: for rendered in orders: body = rendered.body client.set_leverage(symbol=SYMBOL, leverage=LEVERAGE, pos_side=body["posSide"]) submitted.append(asdict(client.submit_market_order_body(body))) except ValueError as exc: snapshot["orders_submitted"] = len(submitted) snapshot["submitted_orders"] = submitted snapshot["execution_error"] = str(exc) append_event(state_dir, snapshot) raise snapshot["orders_submitted"] = len(submitted) snapshot["submitted_orders"] = submitted save_state(state_path, next_state) append_event(state_dir, snapshot) return snapshot def main() -> int: parser = argparse.ArgumentParser(description="Run BB squeeze live executor.") parser.add_argument("--state-dir", type=Path, default=STATE_DIR) parser.add_argument("--margin-per-unit-usdt", type=float) parser.add_argument("--max-new-margin-usdt", type=float) parser.add_argument("--max-total-margin-usdt", type=float) parser.add_argument("--submit-live", action="store_true") parser.add_argument("--confirm-live", action="store_true") parser.add_argument("--loop", action="store_true") parser.add_argument("--poll-seconds", type=float, default=10.0) args = parser.parse_args() if args.submit_live != args.confirm_live: raise ValueError("--submit-live and --confirm-live must be used together") margin_per_unit_usdt = risk_arg(args.margin_per_unit_usdt, "ETH_NEXTGEN_MARGIN_PER_UNIT_USDT") max_new_margin_usdt = risk_arg(args.max_new_margin_usdt, "ETH_NEXTGEN_MAX_NEW_MARGIN_USDT") max_total_margin_usdt = risk_arg(args.max_total_margin_usdt, "ETH_NEXTGEN_MAX_TOTAL_MARGIN_USDT") if not args.loop: snapshot = run_once( state_dir=args.state_dir, margin_per_unit_usdt=margin_per_unit_usdt, max_new_margin_usdt=max_new_margin_usdt, max_total_margin_usdt=max_total_margin_usdt, submit_live=args.submit_live, ) print(json.dumps(snapshot, indent=2, sort_keys=True), flush=True) return 0 frame: pd.DataFrame | None = None while True: try: frame = refresh_live_frame(OkxClient(), frame) state = load_state(args.state_dir / STATE_FILE) _, loop_signal = signal_from_frame(frame, state) if loop_signal["signal"] != "state_replay": snapshot = run_once( state_dir=args.state_dir, margin_per_unit_usdt=margin_per_unit_usdt, max_new_margin_usdt=max_new_margin_usdt, max_total_margin_usdt=max_total_margin_usdt, submit_live=args.submit_live, frame=frame, ) print(json.dumps(snapshot, indent=2, sort_keys=True), flush=True) except ValueError as exc: append_loop_error(args.state_dir, str(exc)) print(json.dumps({"created_at": now_iso(), "execution_error": str(exc)}, sort_keys=True), flush=True) time.sleep(args.poll_seconds) return 0 if __name__ == "__main__": raise SystemExit(main())