| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309 |
- from __future__ import annotations
- import argparse
- import json
- import os
- import sys
- import time
- from dataclasses import asdict
- from datetime import UTC, datetime
- from pathlib import Path
- import pandas as pd
- sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
- from okx_codex_trader.config import load_config
- from okx_codex_trader.candles import align_candles_by_ts
- from okx_codex_trader.bb_squeeze_strategy import (
- BANDWIDTH_LOOKBACK,
- COOLDOWN_BARS,
- EMPTY_STATE,
- STOP_LOSS_PCT,
- TAKE_PROFIT_PCT,
- StrategyState,
- signal_from_frame,
- strategy_name,
- )
- from okx_codex_trader.live_execution import (
- TargetPosition,
- current_position_from_okx,
- plan_position_delta,
- render_market_order_bodies,
- )
- from okx_codex_trader.models import Candle
- from okx_codex_trader.okx_client import OkxClient
- ROOT = Path(__file__).resolve().parents[1]
- STATE_DIR = ROOT / "var" / "bb-squeeze-executor"
- STATE_FILE = "runtime-state.json"
- EVENTS_FILE = "events.jsonl"
- SYMBOL = "ETH-USDT-SWAP"
- BTC_SYMBOL = "BTC-USDT-SWAP"
- BAR = "15m"
- LEVERAGE = 3
- LIVE_CANDLE_LIMIT = 1_200
- RECENT_CANDLE_LIMIT = 20
- def now_iso() -> str:
- return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
- def load_state(path: Path) -> StrategyState:
- if not path.exists():
- return EMPTY_STATE
- payload = json.loads(path.read_text(encoding="utf-8"))
- return StrategyState(
- last_candle_ts=payload["last_candle_ts"],
- active_side=payload["active_side"],
- entry_price=payload["entry_price"],
- entry_candle_ts=payload["entry_candle_ts"],
- cooldown_until_ts=payload["cooldown_until_ts"],
- middle_exit_streak=int(payload.get("middle_exit_streak", 0)),
- max_favorable_move_pct=payload.get("max_favorable_move_pct"),
- )
- def save_state(path: Path, state: StrategyState) -> None:
- path.parent.mkdir(parents=True, exist_ok=True)
- path.write_text(json.dumps(asdict(state), indent=2, sort_keys=True) + "\n", encoding="utf-8")
- def append_event(state_dir: Path, payload: dict[str, object]) -> None:
- state_dir.mkdir(parents=True, exist_ok=True)
- with (state_dir / EVENTS_FILE).open("a", encoding="utf-8") as handle:
- handle.write(json.dumps(payload, sort_keys=True) + "\n")
- def append_loop_error(state_dir: Path, message: str) -> None:
- append_event(
- state_dir,
- {
- "created_at": now_iso(),
- "mode": "bb_squeeze_live_executor_error",
- "execution_error": message,
- },
- )
- def frame_from_candles(candles: list[Candle]) -> pd.DataFrame:
- frame = pd.DataFrame([asdict(candle) for candle in candles])
- frame["time"] = pd.to_datetime(frame["ts"], unit="ms", utc=True)
- return frame.sort_values("ts").drop_duplicates("ts", keep="last").reset_index(drop=True)
- def aligned_frame_from_candles(eth_candles: list[Candle], btc_candles: list[Candle]) -> pd.DataFrame:
- eth_candles, btc_candles = align_candles_by_ts(eth_candles, btc_candles)
- eth = frame_from_candles(eth_candles)
- btc = frame_from_candles(btc_candles)[["ts", "close"]].rename(columns={"close": "btc_close"})
- return eth.merge(btc, on="ts", how="inner").sort_values("ts").reset_index(drop=True)
- def load_live_frame(client: OkxClient) -> pd.DataFrame:
- return aligned_frame_from_candles(
- client.get_candles(SYMBOL, BAR, LIVE_CANDLE_LIMIT),
- client.get_candles(BTC_SYMBOL, BAR, LIVE_CANDLE_LIMIT),
- )
- def refresh_live_frame(client: OkxClient, frame: pd.DataFrame | None) -> pd.DataFrame:
- if frame is None or len(frame) < BANDWIDTH_LOOKBACK + 2:
- return load_live_frame(client)
- recent = aligned_frame_from_candles(
- client.get_recent_candles(SYMBOL, BAR, RECENT_CANDLE_LIMIT),
- client.get_recent_candles(BTC_SYMBOL, BAR, RECENT_CANDLE_LIMIT),
- )
- merged = pd.concat([frame, recent], ignore_index=True)
- merged = merged.sort_values("ts").drop_duplicates("ts", keep="last").tail(LIVE_CANDLE_LIMIT)
- return merged.reset_index(drop=True)
- def account_current_position(client: OkxClient, margin_per_unit_usdt: float) -> tuple[TargetPosition, dict[str, object]]:
- positions = client.get_positions(SYMBOL)
- metadata = client.get_instrument_meta(SYMBOL)
- mark_price = client.get_last_price(SYMBOL)
- current = current_position_from_okx(
- positions=positions,
- mark_price=mark_price,
- metadata=metadata,
- leverage=LEVERAGE,
- margin_per_unit_usdt=margin_per_unit_usdt,
- )
- return current, {
- "positions": [asdict(position) for position in positions],
- "instrument_meta": asdict(metadata),
- "mark_price": mark_price,
- }
- def target_position(signal: dict[str, object], current: TargetPosition) -> TargetPosition:
- side = str(signal["target_side"])
- if side == "flat":
- return TargetPosition(side="flat", unit=0.0, known=True, reason=str(signal["signal"]))
- if current.known and current.side == side and current.unit > 0.0:
- return TargetPosition(side=side, unit=current.unit, known=True, reason="keep existing same-side position")
- return TargetPosition(side=side, unit=1.0, known=True, reason=str(signal["signal"]))
- def risk_arg(value: float | None, env_name: str) -> float:
- if value is not None:
- return value
- raw = os.environ.get(env_name)
- if raw is None or raw == "":
- raise ValueError(f"{env_name} is required")
- return float(raw)
- def run_once(
- *,
- state_dir: Path,
- margin_per_unit_usdt: float,
- max_new_margin_usdt: float,
- max_total_margin_usdt: float,
- submit_live: bool,
- frame: pd.DataFrame | None = None,
- ) -> dict[str, object]:
- state_dir.mkdir(parents=True, exist_ok=True)
- state_path = state_dir / STATE_FILE
- previous_state = load_state(state_path)
- client = OkxClient(load_config())
- if frame is None:
- frame = load_live_frame(client)
- next_state, signal = signal_from_frame(frame, previous_state)
- current, account = account_current_position(client, margin_per_unit_usdt)
- if previous_state.active_side is not None and current.known and current.side == "flat":
- decision_candle_ts = int(signal["decision_candle_ts"])
- next_state = StrategyState(
- decision_candle_ts,
- None,
- None,
- None,
- decision_candle_ts + (COOLDOWN_BARS * 900_000),
- 0,
- None,
- )
- signal = {
- **signal,
- "signal": "external_flat_sync",
- "target_side": "flat",
- }
- target = target_position(signal, current)
- plan = plan_position_delta(current, target)
- orders = ()
- if current.known and target.known:
- from okx_codex_trader.models import InstrumentMeta
- meta = account["instrument_meta"]
- orders = render_market_order_bodies(
- plan=plan,
- symbol=SYMBOL,
- mark_price=float(account["mark_price"]),
- metadata=InstrumentMeta(ct_val=float(meta["ct_val"]), lot_sz=float(meta["lot_sz"]), min_sz=float(meta["min_sz"])),
- leverage=LEVERAGE,
- margin_per_unit_usdt=margin_per_unit_usdt,
- max_new_margin_usdt=max_new_margin_usdt,
- max_total_margin_usdt=max_total_margin_usdt,
- client_order_id_prefix=f"bbsq-{signal['decision_candle_ts']}",
- stop_loss_pct=STOP_LOSS_PCT,
- take_profit_pct=TAKE_PROFIT_PCT,
- )
- snapshot = {
- "created_at": now_iso(),
- "mode": "bb_squeeze_live_executor" if submit_live else "bb_squeeze_dry_run_executor",
- "orders_submitted": 0,
- "strategy": strategy_name(),
- "previous_state": asdict(previous_state),
- "next_state": asdict(next_state),
- "signal": signal,
- "current_position": asdict(current),
- "target_position": asdict(target),
- "execution_plan": {
- "current": asdict(plan.current),
- "target": asdict(plan.target),
- "actions": [asdict(action) for action in plan.actions],
- },
- "rendered_orders": [asdict(order) for order in orders],
- "account": account,
- "risk_limits": {
- "submit_enabled": submit_live,
- "margin_per_unit_usdt": margin_per_unit_usdt,
- "max_new_margin_usdt": max_new_margin_usdt,
- "max_total_margin_usdt": max_total_margin_usdt,
- },
- }
- if submit_live:
- client.ensure_hedge_mode()
- submitted = []
- try:
- for rendered in orders:
- body = rendered.body
- client.set_leverage(symbol=SYMBOL, leverage=LEVERAGE, pos_side=body["posSide"])
- submitted.append(asdict(client.submit_market_order_body(body)))
- except ValueError as exc:
- snapshot["orders_submitted"] = len(submitted)
- snapshot["submitted_orders"] = submitted
- snapshot["execution_error"] = str(exc)
- append_event(state_dir, snapshot)
- raise
- snapshot["orders_submitted"] = len(submitted)
- snapshot["submitted_orders"] = submitted
- save_state(state_path, next_state)
- append_event(state_dir, snapshot)
- return snapshot
- def main() -> int:
- parser = argparse.ArgumentParser(description="Run BB squeeze live executor.")
- parser.add_argument("--state-dir", type=Path, default=STATE_DIR)
- parser.add_argument("--margin-per-unit-usdt", type=float)
- parser.add_argument("--max-new-margin-usdt", type=float)
- parser.add_argument("--max-total-margin-usdt", type=float)
- parser.add_argument("--submit-live", action="store_true")
- parser.add_argument("--confirm-live", action="store_true")
- parser.add_argument("--loop", action="store_true")
- parser.add_argument("--poll-seconds", type=float, default=10.0)
- args = parser.parse_args()
- if args.submit_live != args.confirm_live:
- raise ValueError("--submit-live and --confirm-live must be used together")
- margin_per_unit_usdt = risk_arg(args.margin_per_unit_usdt, "ETH_NEXTGEN_MARGIN_PER_UNIT_USDT")
- max_new_margin_usdt = risk_arg(args.max_new_margin_usdt, "ETH_NEXTGEN_MAX_NEW_MARGIN_USDT")
- max_total_margin_usdt = risk_arg(args.max_total_margin_usdt, "ETH_NEXTGEN_MAX_TOTAL_MARGIN_USDT")
- if not args.loop:
- snapshot = run_once(
- state_dir=args.state_dir,
- margin_per_unit_usdt=margin_per_unit_usdt,
- max_new_margin_usdt=max_new_margin_usdt,
- max_total_margin_usdt=max_total_margin_usdt,
- submit_live=args.submit_live,
- )
- print(json.dumps(snapshot, indent=2, sort_keys=True), flush=True)
- return 0
- frame: pd.DataFrame | None = None
- while True:
- try:
- frame = refresh_live_frame(OkxClient(), frame)
- state = load_state(args.state_dir / STATE_FILE)
- _, loop_signal = signal_from_frame(frame, state)
- if loop_signal["signal"] != "state_replay":
- snapshot = run_once(
- state_dir=args.state_dir,
- margin_per_unit_usdt=margin_per_unit_usdt,
- max_new_margin_usdt=max_new_margin_usdt,
- max_total_margin_usdt=max_total_margin_usdt,
- submit_live=args.submit_live,
- frame=frame,
- )
- print(json.dumps(snapshot, indent=2, sort_keys=True), flush=True)
- except ValueError as exc:
- append_loop_error(args.state_dir, str(exc))
- print(json.dumps({"created_at": now_iso(), "execution_error": str(exc)}, sort_keys=True), flush=True)
- time.sleep(args.poll_seconds)
- return 0
- if __name__ == "__main__":
- raise SystemExit(main())
|