| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355 |
- from __future__ import annotations
- import argparse
- import json
- import os
- import sys
- import time
- from dataclasses import asdict, dataclass
- from datetime import UTC, datetime
- from pathlib import Path
- import pandas as pd
- sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
- from okx_codex_trader.config import load_config
- from okx_codex_trader.live_execution import (
- TargetPosition,
- current_position_from_okx,
- plan_position_delta,
- render_market_order_bodies,
- )
- from okx_codex_trader.models import Candle
- from okx_codex_trader.okx_client import OkxClient
- ROOT = Path(__file__).resolve().parents[1]
- STATE_DIR = ROOT / "var" / "bb-squeeze-executor"
- STATE_FILE = "runtime-state.json"
- EVENTS_FILE = "events.jsonl"
- SYMBOL = "ETH-USDT-SWAP"
- BAR = "15m"
- LEVERAGE = 3
- BAND_LENGTH = 48
- BANDWIDTH_LOOKBACK = 960
- BANDWIDTH_QUANTILE = 0.25
- STOP_LOSS_PCT = 0.01
- ETH_VOL_CAP = 0.006
- COOLDOWN_BARS = 24
- LIVE_CANDLE_LIMIT = 1_200
- RECENT_CANDLE_LIMIT = 20
- @dataclass(frozen=True)
- class StrategyState:
- last_candle_ts: int | None
- active_side: str | None
- entry_price: float | None
- entry_candle_ts: int | None
- cooldown_until_ts: int | None
- EMPTY_STATE = StrategyState(None, None, None, None, None)
- def now_iso() -> str:
- return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
- def load_state(path: Path) -> StrategyState:
- if not path.exists():
- return EMPTY_STATE
- payload = json.loads(path.read_text(encoding="utf-8"))
- return StrategyState(
- last_candle_ts=payload["last_candle_ts"],
- active_side=payload["active_side"],
- entry_price=payload["entry_price"],
- entry_candle_ts=payload["entry_candle_ts"],
- cooldown_until_ts=payload["cooldown_until_ts"],
- )
- def save_state(path: Path, state: StrategyState) -> None:
- path.parent.mkdir(parents=True, exist_ok=True)
- path.write_text(json.dumps(asdict(state), indent=2, sort_keys=True) + "\n", encoding="utf-8")
- def append_event(state_dir: Path, payload: dict[str, object]) -> None:
- state_dir.mkdir(parents=True, exist_ok=True)
- with (state_dir / EVENTS_FILE).open("a", encoding="utf-8") as handle:
- handle.write(json.dumps(payload, sort_keys=True) + "\n")
- def frame_from_candles(candles: list[Candle]) -> pd.DataFrame:
- frame = pd.DataFrame([asdict(candle) for candle in candles])
- frame["time"] = pd.to_datetime(frame["ts"], unit="ms", utc=True)
- return frame.sort_values("ts").drop_duplicates("ts", keep="last").reset_index(drop=True)
- def load_live_frame(client: OkxClient) -> pd.DataFrame:
- return frame_from_candles(client.get_candles(SYMBOL, BAR, LIVE_CANDLE_LIMIT))
- def refresh_live_frame(client: OkxClient, frame: pd.DataFrame | None) -> pd.DataFrame:
- if frame is None or len(frame) < BANDWIDTH_LOOKBACK + 2:
- return load_live_frame(client)
- recent = frame_from_candles(client.get_recent_candles(SYMBOL, BAR, RECENT_CANDLE_LIMIT))
- merged = pd.concat([frame, recent], ignore_index=True)
- merged = merged.sort_values("ts").drop_duplicates("ts", keep="last").tail(LIVE_CANDLE_LIMIT)
- return merged.reset_index(drop=True)
- def signal_from_frame(frame: pd.DataFrame, state: StrategyState) -> tuple[StrategyState, dict[str, object]]:
- if len(frame) < BANDWIDTH_LOOKBACK + 2:
- raise ValueError("not enough candles")
- close = frame["close"].astype(float)
- middle = close.rolling(BAND_LENGTH).mean()
- stdev = close.rolling(BAND_LENGTH).std(ddof=0)
- upper = middle + (2.0 * stdev)
- lower = middle - (2.0 * stdev)
- bandwidth = (upper - lower) / middle
- threshold = bandwidth.rolling(BANDWIDTH_LOOKBACK).quantile(BANDWIDTH_QUANTILE)
- eth_vol = close.pct_change().rolling(96).std(ddof=0)
- decision_index = len(frame) - 1
- row = frame.iloc[decision_index]
- candle_ts = int(row["ts"])
- candle_time = pd.Timestamp(row["time"]).isoformat().replace("+00:00", "Z")
- indicators = {
- "eth_close": float(row["close"]),
- "middle": float(middle.iloc[decision_index]),
- "upper": float(upper.iloc[decision_index]),
- "lower": float(lower.iloc[decision_index]),
- "bandwidth": float(bandwidth.iloc[decision_index]),
- "bandwidth_threshold": float(threshold.iloc[decision_index]),
- "eth_vol_96": float(eth_vol.iloc[decision_index]),
- }
- if state.last_candle_ts is not None and candle_ts <= state.last_candle_ts:
- return state, {
- "decision_candle_ts": candle_ts,
- "decision_candle_time": candle_time,
- "signal": "state_replay",
- "target_side": state.active_side or "flat",
- "indicators": indicators,
- }
- next_state = StrategyState(candle_ts, state.active_side, state.entry_price, state.entry_candle_ts, state.cooldown_until_ts)
- signal = "hold"
- target_side = state.active_side or "flat"
- if state.active_side is not None:
- entry_price = float(state.entry_price)
- stop = entry_price * (1.0 - STOP_LOSS_PCT if state.active_side == "long" else 1.0 + STOP_LOSS_PCT)
- stop_hit = (state.active_side == "long" and float(row["low"]) <= stop) or (state.active_side == "short" and float(row["high"]) >= stop)
- middle_exit = (state.active_side == "long" and float(row["close"]) < indicators["middle"]) or (
- state.active_side == "short" and float(row["close"]) > indicators["middle"]
- )
- if stop_hit or middle_exit:
- signal = "exit_stop" if stop_hit else "exit_middle"
- target_side = "flat"
- next_state = StrategyState(
- candle_ts,
- None,
- None,
- None,
- candle_ts + (COOLDOWN_BARS * 900_000),
- )
- else:
- cooldown_ok = state.cooldown_until_ts is None or candle_ts >= state.cooldown_until_ts
- compressed = indicators["bandwidth"] <= indicators["bandwidth_threshold"]
- vol_ok = indicators["eth_vol_96"] <= ETH_VOL_CAP
- if cooldown_ok and compressed and vol_ok and float(row["close"]) > indicators["upper"]:
- signal = "entry_long"
- target_side = "long"
- next_state = StrategyState(candle_ts, "long", float(row["close"]), candle_ts, state.cooldown_until_ts)
- elif cooldown_ok and compressed and vol_ok and float(row["close"]) < indicators["lower"]:
- signal = "entry_short"
- target_side = "short"
- next_state = StrategyState(candle_ts, "short", float(row["close"]), candle_ts, state.cooldown_until_ts)
- return next_state, {
- "decision_candle_ts": candle_ts,
- "decision_candle_time": candle_time,
- "signal": signal,
- "target_side": target_side,
- "indicators": indicators,
- "params": {
- "band_length": BAND_LENGTH,
- "bandwidth_lookback": BANDWIDTH_LOOKBACK,
- "bandwidth_quantile": BANDWIDTH_QUANTILE,
- "stop_loss_pct": STOP_LOSS_PCT,
- "eth_vol_cap": ETH_VOL_CAP,
- "cooldown_bars": COOLDOWN_BARS,
- "side_mode": "both",
- },
- }
- def account_current_position(client: OkxClient, margin_per_unit_usdt: float) -> tuple[TargetPosition, dict[str, object]]:
- positions = client.get_positions(SYMBOL)
- metadata = client.get_instrument_meta(SYMBOL)
- mark_price = client.get_last_price(SYMBOL)
- current = current_position_from_okx(
- positions=positions,
- mark_price=mark_price,
- metadata=metadata,
- leverage=LEVERAGE,
- margin_per_unit_usdt=margin_per_unit_usdt,
- )
- return current, {
- "positions": [asdict(position) for position in positions],
- "instrument_meta": asdict(metadata),
- "mark_price": mark_price,
- }
- def target_position(signal: dict[str, object], current: TargetPosition) -> TargetPosition:
- side = str(signal["target_side"])
- if side == "flat":
- return TargetPosition(side="flat", unit=0.0, known=True, reason=str(signal["signal"]))
- if current.known and current.side == side and current.unit > 0.0:
- return TargetPosition(side=side, unit=current.unit, known=True, reason="keep existing same-side position")
- return TargetPosition(side=side, unit=1.0, known=True, reason=str(signal["signal"]))
- def risk_arg(value: float | None, env_name: str) -> float:
- if value is not None:
- return value
- raw = os.environ.get(env_name)
- if raw is None or raw == "":
- raise ValueError(f"{env_name} is required")
- return float(raw)
- def run_once(
- *,
- state_dir: Path,
- margin_per_unit_usdt: float,
- max_new_margin_usdt: float,
- max_total_margin_usdt: float,
- submit_live: bool,
- frame: pd.DataFrame | None = None,
- ) -> dict[str, object]:
- state_dir.mkdir(parents=True, exist_ok=True)
- state_path = state_dir / STATE_FILE
- previous_state = load_state(state_path)
- client = OkxClient(load_config())
- if frame is None:
- frame = load_live_frame(client)
- next_state, signal = signal_from_frame(frame, previous_state)
- current, account = account_current_position(client, margin_per_unit_usdt)
- target = target_position(signal, current)
- plan = plan_position_delta(current, target)
- orders = ()
- if current.known and target.known:
- from okx_codex_trader.models import InstrumentMeta
- meta = account["instrument_meta"]
- orders = render_market_order_bodies(
- plan=plan,
- symbol=SYMBOL,
- mark_price=float(account["mark_price"]),
- metadata=InstrumentMeta(ct_val=float(meta["ct_val"]), lot_sz=float(meta["lot_sz"]), min_sz=float(meta["min_sz"])),
- leverage=LEVERAGE,
- margin_per_unit_usdt=margin_per_unit_usdt,
- max_new_margin_usdt=max_new_margin_usdt,
- max_total_margin_usdt=max_total_margin_usdt,
- client_order_id_prefix=f"bbsq-{signal['decision_candle_ts']}",
- stop_loss_pct=STOP_LOSS_PCT,
- )
- snapshot = {
- "created_at": now_iso(),
- "mode": "bb_squeeze_live_executor" if submit_live else "bb_squeeze_dry_run_executor",
- "orders_submitted": 0,
- "strategy": "bb-squeeze-l48-bw960-q0.25-sl0.01-tpnone-both-none-vc0.006-ddnone-cd24",
- "previous_state": asdict(previous_state),
- "next_state": asdict(next_state),
- "signal": signal,
- "current_position": asdict(current),
- "target_position": asdict(target),
- "execution_plan": {
- "current": asdict(plan.current),
- "target": asdict(plan.target),
- "actions": [asdict(action) for action in plan.actions],
- },
- "rendered_orders": [asdict(order) for order in orders],
- "account": account,
- "risk_limits": {
- "submit_enabled": submit_live,
- "margin_per_unit_usdt": margin_per_unit_usdt,
- "max_new_margin_usdt": max_new_margin_usdt,
- "max_total_margin_usdt": max_total_margin_usdt,
- },
- }
- if submit_live:
- client.ensure_hedge_mode()
- submitted = []
- try:
- for rendered in orders:
- body = rendered.body
- client.set_leverage(symbol=SYMBOL, leverage=LEVERAGE, pos_side=body["posSide"])
- submitted.append(asdict(client.submit_market_order_body(body)))
- except ValueError as exc:
- snapshot["orders_submitted"] = len(submitted)
- snapshot["submitted_orders"] = submitted
- snapshot["execution_error"] = str(exc)
- append_event(state_dir, snapshot)
- raise
- snapshot["orders_submitted"] = len(submitted)
- snapshot["submitted_orders"] = submitted
- save_state(state_path, next_state)
- append_event(state_dir, snapshot)
- return snapshot
- def main() -> int:
- parser = argparse.ArgumentParser(description="Run BB squeeze live executor.")
- parser.add_argument("--state-dir", type=Path, default=STATE_DIR)
- parser.add_argument("--margin-per-unit-usdt", type=float)
- parser.add_argument("--max-new-margin-usdt", type=float)
- parser.add_argument("--max-total-margin-usdt", type=float)
- parser.add_argument("--submit-live", action="store_true")
- parser.add_argument("--confirm-live", action="store_true")
- parser.add_argument("--loop", action="store_true")
- parser.add_argument("--poll-seconds", type=float, default=10.0)
- args = parser.parse_args()
- if args.submit_live != args.confirm_live:
- raise ValueError("--submit-live and --confirm-live must be used together")
- margin_per_unit_usdt = risk_arg(args.margin_per_unit_usdt, "ETH_NEXTGEN_MARGIN_PER_UNIT_USDT")
- max_new_margin_usdt = risk_arg(args.max_new_margin_usdt, "ETH_NEXTGEN_MAX_NEW_MARGIN_USDT")
- max_total_margin_usdt = risk_arg(args.max_total_margin_usdt, "ETH_NEXTGEN_MAX_TOTAL_MARGIN_USDT")
- if not args.loop:
- snapshot = run_once(
- state_dir=args.state_dir,
- margin_per_unit_usdt=margin_per_unit_usdt,
- max_new_margin_usdt=max_new_margin_usdt,
- max_total_margin_usdt=max_total_margin_usdt,
- submit_live=args.submit_live,
- )
- print(json.dumps(snapshot, indent=2, sort_keys=True), flush=True)
- return 0
- frame: pd.DataFrame | None = None
- while True:
- frame = refresh_live_frame(OkxClient(), frame)
- state = load_state(args.state_dir / STATE_FILE)
- _, loop_signal = signal_from_frame(frame, state)
- if loop_signal["signal"] != "state_replay":
- snapshot = run_once(
- state_dir=args.state_dir,
- margin_per_unit_usdt=margin_per_unit_usdt,
- max_new_margin_usdt=max_new_margin_usdt,
- max_total_margin_usdt=max_total_margin_usdt,
- submit_live=args.submit_live,
- frame=frame,
- )
- print(json.dumps(snapshot, indent=2, sort_keys=True), flush=True)
- time.sleep(args.poll_seconds)
- return 0
- if __name__ == "__main__":
- raise SystemExit(main())
|