"""Build a dry-run execution snapshot for the ETH nextgen+micro strategy.

The script combines the current signal payload, the stored executor runtime
state and the live OKX account position into a single JSON document that is
printed to stdout. The snapshot always reports ``orders_submitted`` as 0 and
``submit_enabled`` as False.
"""

from __future__ import annotations

import argparse
import json
import math
import os
import sys
from dataclasses import asdict
from datetime import UTC, datetime
from pathlib import Path

# Put the directory two levels above this file on the import path so that the
# ``okx_codex_trader`` package resolves when the file is executed as a script.
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))

from okx_codex_trader import eth_nextgen_micro
from okx_codex_trader.config import load_config
from okx_codex_trader.live_execution import (
    TargetPosition,
    current_position_from_okx,
    load_runtime_state,
    plan_position_delta,
    render_market_order_bodies,
    target_from_signal,
)
from okx_codex_trader.okx_client import OkxClient

ROOT = Path(__file__).resolve().parents[1]
STATE_DIR = ROOT / "var" / "eth-nextgen-micro"
EXECUTOR_STATE_FILENAME = "executor-runtime-state.json"
SYMBOL = "ETH-USDT-SWAP"
LEVERAGE = 3


def now_iso() -> str:
    """Return the current UTC time as ISO-8601 with second precision and a ``Z`` suffix."""
    stamp = datetime.now(UTC).isoformat(timespec="seconds")
    return stamp.replace("+00:00", "Z")


def unknown_current_position(reason: str) -> TargetPosition:
    """Return a flat, zero-unit position flagged as not known, carrying *reason*."""
    return TargetPosition(side="flat", unit=0.0, known=False, reason=reason)


def account_current_position(margin_per_unit_usdt: float) -> tuple[TargetPosition, dict[str, object]]:
    """Read the current position for ``SYMBOL`` from the OKX account.

    Args:
        margin_per_unit_usdt: Margin per strategy unit, forwarded to
            ``current_position_from_okx``.

    Returns:
        A ``(position, account)`` pair. On success ``account`` holds the raw
        positions, the instrument metadata and the price used as mark price.
        If building the client or any account call raises ``ValueError``, the
        position is the "unknown" placeholder and ``account`` contains only an
        ``"account_error"`` message.
    """
    try:
        client = OkxClient(load_config())
    except ValueError as exc:
        message = str(exc)
        return unknown_current_position(message), {"account_error": message}

    try:
        positions = client.get_positions(SYMBOL)
        metadata = client.get_instrument_meta(SYMBOL)
        # The last traded price is what gets reported and used as "mark_price".
        mark_price = client.get_last_price(SYMBOL)
        current = current_position_from_okx(
            positions=positions,
            mark_price=mark_price,
            metadata=metadata,
            leverage=LEVERAGE,
            margin_per_unit_usdt=margin_per_unit_usdt,
        )
        account: dict[str, object] = {
            "positions": [asdict(position) for position in positions],
            "instrument_meta": asdict(metadata),
            "mark_price": mark_price,
        }
        return current, account
    except ValueError as exc:
        message = str(exc)
        return unknown_current_position(message), {"account_error": message}


def build_snapshot(
    *,
    state_dir: Path,
    margin_per_unit_usdt: float,
    max_new_margin_usdt: float,
    max_total_margin_usdt: float,
) -> dict[str, object]:
    """Assemble the dry-run executor snapshot.

    Creates *state_dir* if needed, builds the signal payload, loads the
    previous executor state, derives the target position, reads the current
    account position and plans the delta between the two. Order bodies are
    rendered only when both the current and the target position are known.

    The computed ``next_state`` is included in the result but is not written
    back to disk by this function.

    Args:
        state_dir: Directory holding ``EXECUTOR_STATE_FILENAME``.
        margin_per_unit_usdt: Margin per strategy unit.
        max_new_margin_usdt: Limit passed to order rendering and echoed in
            ``risk_limits``.
        max_total_margin_usdt: Limit passed to order rendering and echoed in
            ``risk_limits``.

    Returns:
        A dictionary describing the signal, states, positions, plan, rendered
        orders, account data and risk limits.
    """
    state_dir.mkdir(parents=True, exist_ok=True)

    payload = eth_nextgen_micro.build_payload()
    previous_state = load_runtime_state(state_dir / EXECUTOR_STATE_FILENAME)
    next_state, target = target_from_signal(payload, previous_state)
    current, account = account_current_position(margin_per_unit_usdt)
    plan = plan_position_delta(current, target)

    orders = ()
    if current.known and target.known:
        # A known current position implies the account lookup succeeded, so
        # "mark_price" and "instrument_meta" are present in ``account``.
        orders = render_market_order_bodies(
            plan=plan,
            symbol=SYMBOL,
            mark_price=float(account["mark_price"]),
            metadata=client_metadata(account),
            leverage=LEVERAGE,
            margin_per_unit_usdt=margin_per_unit_usdt,
            max_new_margin_usdt=max_new_margin_usdt,
            max_total_margin_usdt=max_total_margin_usdt,
            client_order_id_prefix=f"ethnm-{target_candle_ts(payload)}",
        )

    return {
        "created_at": now_iso(),
        "mode": "dry_run_executor",
        "orders_submitted": 0,
        "signal": payload["decision"],
        "execution_intent": payload["execution_intent"],
        "previous_state": asdict(previous_state),
        "next_state": asdict(next_state),
        "current_position": asdict(current),
        "target_position": asdict(target),
        "execution_plan": {
            "current": asdict(plan.current),
            "target": asdict(plan.target),
            "actions": [asdict(action) for action in plan.actions],
        },
        "rendered_orders": [asdict(order) for order in orders],
        "account": account,
        "risk_limits": {
            "submit_enabled": False,
            "max_new_margin_usdt": max_new_margin_usdt,
            "max_total_margin_usdt": max_total_margin_usdt,
            "margin_per_unit_usdt": margin_per_unit_usdt,
            "state_write_required_before_live": True,
        },
    }


def risk_arg(value: float | None, env_name: str) -> float:
    """Resolve a risk limit from an explicit value or an environment variable.

    Args:
        value: Explicit value (e.g. from the command line); when ``None`` the
            environment variable *env_name* is consulted instead.
        env_name: Name of the fallback environment variable.

    Returns:
        The resolved limit.

    Raises:
        ValueError: If neither source provides a value, the environment value
            is not numeric, or the resolved value is NaN, infinite or
            negative.
    """
    if value is None:
        raw = os.environ.get(env_name)
        if raw is None or raw == "":
            raise ValueError(f"{env_name} is required")
        try:
            value = float(raw)
        except ValueError:
            # Name the offending variable; float()'s own message does not.
            raise ValueError(f"{env_name} must be a number, got {raw!r}") from None

    # NaN/inf would be serialised as invalid JSON in the snapshot and make
    # limit comparisons meaningless; a negative margin is never valid.
    if not math.isfinite(value) or value < 0:
        raise ValueError(
            f"{env_name} (or its command-line override) must be a finite, "
            f"non-negative number, got {value!r}"
        )
    return value


def target_candle_ts(payload: dict[str, object]) -> int:
    """Return the decision candle timestamp of the engine named by the decision.

    Reads ``payload["nextgen"]["data"]["decision_candle_ts"]`` when the active
    engine is ``"nextgen"`` and ``payload["micro"]["decision_candle_ts"]``
    otherwise.
    """
    if payload["decision"]["active_engine"] == "nextgen":
        raw_ts = payload["nextgen"]["data"]["decision_candle_ts"]
    else:
        raw_ts = payload["micro"]["decision_candle_ts"]
    return int(raw_ts)


def client_metadata(account: dict[str, object]):
    """Rebuild an ``InstrumentMeta`` from the ``"instrument_meta"`` dict in *account*.

    Only ``ct_val``, ``lot_sz`` and ``min_sz`` are carried over, each coerced
    to ``float``.
    """
    from okx_codex_trader.models import InstrumentMeta

    meta = account["instrument_meta"]
    return InstrumentMeta(
        ct_val=float(meta["ct_val"]),
        lot_sz=float(meta["lot_sz"]),
        min_sz=float(meta["min_sz"]),
    )


def main() -> int:
    """Parse CLI arguments, build the snapshot and print it as sorted, indented JSON.

    Each risk limit comes from its command-line option or, when that is
    omitted, from the matching ``ETH_NEXTGEN_*`` environment variable.

    Returns:
        Process exit status (always 0 when no exception is raised).
    """
    parser = argparse.ArgumentParser(description="Build ETH nextgen+micro live execution dry-run snapshot.")
    parser.add_argument("--state-dir", type=Path, default=STATE_DIR)
    parser.add_argument("--margin-per-unit-usdt", type=float)
    parser.add_argument("--max-new-margin-usdt", type=float)
    parser.add_argument("--max-total-margin-usdt", type=float)
    args = parser.parse_args()

    margin_per_unit = risk_arg(args.margin_per_unit_usdt, "ETH_NEXTGEN_MARGIN_PER_UNIT_USDT")
    max_new_margin = risk_arg(args.max_new_margin_usdt, "ETH_NEXTGEN_MAX_NEW_MARGIN_USDT")
    max_total_margin = risk_arg(args.max_total_margin_usdt, "ETH_NEXTGEN_MAX_TOTAL_MARGIN_USDT")

    snapshot = build_snapshot(
        state_dir=args.state_dir,
        margin_per_unit_usdt=margin_per_unit,
        max_new_margin_usdt=max_new_margin,
        max_total_margin_usdt=max_total_margin,
    )
    print(json.dumps(snapshot, indent=2, sort_keys=True))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())