| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- from __future__ import annotations
- import argparse
- import json
- import os
- import sys
- from dataclasses import asdict
- from datetime import UTC, datetime
- from pathlib import Path
- sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
- from okx_codex_trader import eth_nextgen_micro
- from okx_codex_trader.config import load_config
- from okx_codex_trader.live_execution import (
- TargetPosition,
- current_position_from_okx,
- load_runtime_state,
- plan_position_delta,
- render_market_order_bodies,
- target_from_signal,
- )
- from okx_codex_trader.okx_client import OkxClient
- ROOT = Path(__file__).resolve().parents[1]
- STATE_DIR = ROOT / "var" / "eth-nextgen-micro"
- SYMBOL = "ETH-USDT-SWAP"
- LEVERAGE = 3
- def now_iso() -> str:
- return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
- def unknown_current_position(reason: str) -> TargetPosition:
- return TargetPosition(side="flat", unit=0.0, known=False, reason=reason)
- def account_current_position(margin_per_unit_usdt: float) -> tuple[TargetPosition, dict[str, object]]:
- try:
- client = OkxClient(load_config())
- except ValueError as exc:
- return unknown_current_position(str(exc)), {"account_error": str(exc)}
- try:
- positions = client.get_positions(SYMBOL)
- metadata = client.get_instrument_meta(SYMBOL)
- mark_price = client.get_last_price(SYMBOL)
- current = current_position_from_okx(
- positions=positions,
- mark_price=mark_price,
- metadata=metadata,
- leverage=LEVERAGE,
- margin_per_unit_usdt=margin_per_unit_usdt,
- )
- return current, {
- "positions": [asdict(position) for position in positions],
- "instrument_meta": asdict(metadata),
- "mark_price": mark_price,
- }
- except ValueError as exc:
- return unknown_current_position(str(exc)), {"account_error": str(exc)}
- EXECUTOR_STATE_FILENAME = "executor-runtime-state.json"
- def build_snapshot(*, state_dir: Path, margin_per_unit_usdt: float, max_new_margin_usdt: float, max_total_margin_usdt: float) -> dict[str, object]:
- state_dir.mkdir(parents=True, exist_ok=True)
- payload = eth_nextgen_micro.build_payload()
- previous_state = load_runtime_state(state_dir / EXECUTOR_STATE_FILENAME)
- next_state, target = target_from_signal(payload, previous_state)
- current, account = account_current_position(margin_per_unit_usdt)
- plan = plan_position_delta(current, target)
- orders = ()
- if current.known and target.known:
- orders = render_market_order_bodies(
- plan=plan,
- symbol=SYMBOL,
- mark_price=float(account["mark_price"]),
- metadata=client_metadata(account),
- leverage=LEVERAGE,
- margin_per_unit_usdt=margin_per_unit_usdt,
- max_new_margin_usdt=max_new_margin_usdt,
- max_total_margin_usdt=max_total_margin_usdt,
- client_order_id_prefix=f"ethnm-{target_candle_ts(payload)}",
- )
- return {
- "created_at": now_iso(),
- "mode": "dry_run_executor",
- "orders_submitted": 0,
- "signal": payload["decision"],
- "execution_intent": payload["execution_intent"],
- "previous_state": asdict(previous_state),
- "next_state": asdict(next_state),
- "current_position": asdict(current),
- "target_position": asdict(target),
- "execution_plan": {
- "current": asdict(plan.current),
- "target": asdict(plan.target),
- "actions": [asdict(action) for action in plan.actions],
- },
- "rendered_orders": [asdict(order) for order in orders],
- "account": account,
- "risk_limits": {
- "submit_enabled": False,
- "max_new_margin_usdt": max_new_margin_usdt,
- "max_total_margin_usdt": max_total_margin_usdt,
- "margin_per_unit_usdt": margin_per_unit_usdt,
- "state_write_required_before_live": True,
- },
- }
- def risk_arg(value: float | None, env_name: str) -> float:
- if value is not None:
- return value
- raw = os.environ.get(env_name)
- if raw is None or raw == "":
- raise ValueError(f"{env_name} is required")
- return float(raw)
- def target_candle_ts(payload: dict[str, object]) -> int:
- if payload["decision"]["active_engine"] == "nextgen":
- return int(payload["nextgen"]["data"]["decision_candle_ts"])
- return int(payload["micro"]["decision_candle_ts"])
- def client_metadata(account: dict[str, object]):
- from okx_codex_trader.models import InstrumentMeta
- meta = account["instrument_meta"]
- return InstrumentMeta(ct_val=float(meta["ct_val"]), lot_sz=float(meta["lot_sz"]), min_sz=float(meta["min_sz"]))
- def main() -> int:
- parser = argparse.ArgumentParser(description="Build ETH nextgen+micro live execution dry-run snapshot.")
- parser.add_argument("--state-dir", type=Path, default=STATE_DIR)
- parser.add_argument("--margin-per-unit-usdt", type=float)
- parser.add_argument("--max-new-margin-usdt", type=float)
- parser.add_argument("--max-total-margin-usdt", type=float)
- args = parser.parse_args()
- snapshot = build_snapshot(
- state_dir=args.state_dir,
- margin_per_unit_usdt=risk_arg(args.margin_per_unit_usdt, "ETH_NEXTGEN_MARGIN_PER_UNIT_USDT"),
- max_new_margin_usdt=risk_arg(args.max_new_margin_usdt, "ETH_NEXTGEN_MAX_NEW_MARGIN_USDT"),
- max_total_margin_usdt=risk_arg(args.max_total_margin_usdt, "ETH_NEXTGEN_MAX_TOTAL_MARGIN_USDT"),
- )
- print(json.dumps(snapshot, indent=2, sort_keys=True))
- return 0
- if __name__ == "__main__":
- raise SystemExit(main())
|