from __future__ import annotations import argparse import csv import json import math import sys from dataclasses import dataclass from datetime import UTC, datetime, timedelta from decimal import Decimal, ROUND_DOWN from pathlib import Path ROOT = Path(__file__).resolve().parents[1] sys.path.insert(0, str(ROOT)) from okx_codex_trader.models import Candle from okx_codex_trader.okx_client import OkxClient SYMBOL = "ETH-USDT-SWAP" BAR = "15m" BAR_MS = 900_000 LEVERAGE = 3 TREND_SMA = 60 RSI_LENGTH = 2 RSI_THRESHOLD = 3.0 EXIT_RSI = 50.0 STOP_LOSS_PCT = 0.012 MAX_HOLD_BARS = 48 ENTRY_OFFSETS = (0.003, 0.006, 0.009) ENTRY_VALID_BARS = 4 REPORT_DIR = ROOT / "reports" / "eth-exploration" JSON_REPORT = REPORT_DIR / "eth-signal-intent-readonly.json" MARKDOWN_REPORT = REPORT_DIR / "eth-signal-intent-readonly.md" LOCAL_CANDLES = ROOT / "data" / "okx-candles" / SYMBOL / f"{BAR}.csv" @dataclass(frozen=True) class Instrument: ct_val: Decimal lot_sz: Decimal min_sz: Decimal tick_sz: Decimal def utc_text(ts: int) -> str: return datetime.fromtimestamp(ts / 1000, UTC).strftime("%Y-%m-%d %H:%M:%S UTC") def iso_text(ts: int) -> str: return datetime.fromtimestamp(ts / 1000, UTC).isoformat().replace("+00:00", "Z") def decimal_text(value: Decimal) -> str: return format(value.normalize(), "f") def floor_to_step(value: Decimal, step: Decimal) -> Decimal: return (value / step).to_integral_value(rounding=ROUND_DOWN) * step def sma(values: list[float], length: int, index: int) -> float: if index + 1 < length: return float("nan") return sum(values[index + 1 - length : index + 1]) / length def compute_rsi(values: list[float], length: int) -> list[float]: rsi = [float("nan")] * len(values) if len(values) <= length: return rsi gains: list[float] = [0.0] losses: list[float] = [0.0] for prev, current in zip(values, values[1:]): delta = current - prev gains.append(max(delta, 0.0)) losses.append(max(-delta, 0.0)) average_gain = sum(gains[1 : length + 1]) / length average_loss = sum(losses[1 : length + 1]) / length for index in range(length, len(values)): if index > length: average_gain = ((average_gain * (length - 1)) + gains[index]) / length average_loss = ((average_loss * (length - 1)) + losses[index]) / length if average_loss == 0.0: rsi[index] = 100.0 if average_gain > 0.0 else 50.0 else: relative_strength = average_gain / average_loss rsi[index] = 100.0 - (100.0 / (1.0 + relative_strength)) return rsi def load_local_candles(path: Path, symbol: str) -> list[Candle]: candles: list[Candle] = [] with path.open("r", encoding="utf-8", newline="") as handle: for row in csv.DictReader(handle): candles.append( Candle( symbol=symbol, ts=int(row["ts"]), open=float(row["open"]), high=float(row["high"]), low=float(row["low"]), close=float(row["close"]), volume=float(row["volume"]), ) ) return sorted(candles, key=lambda candle: candle.ts) def load_okx_candles(symbol: str, bar: str, limit: int) -> list[Candle]: return OkxClient().get_candles(symbol, bar, limit) def load_okx_instrument(symbol: str) -> Instrument: data = OkxClient()._request("GET", "/api/v5/public/instruments", params={"instType": "SWAP", "instId": symbol}) if not data or not isinstance(data[0], dict): raise ValueError("okx instrument payload is invalid") row = data[0] return Instrument( ct_val=Decimal(str(row["ctVal"])), lot_sz=Decimal(str(row["lotSz"])), min_sz=Decimal(str(row["minSz"])), tick_sz=Decimal(str(row["tickSz"])), ) def build_payload(args: argparse.Namespace) -> dict[str, object]: candles = load_okx_candles(args.symbol, args.bar, args.limit) if args.source == "okx" else load_local_candles(args.local_candles, args.symbol) candles = candles[-args.limit :] if len(candles) < TREND_SMA + 2: raise ValueError("not enough candles") latest_confirmed = candles[-1] decision_index = len(candles) - 2 decision = candles[decision_index] closes = [candle.close for candle in candles[: decision_index + 1]] rsi_values = compute_rsi(closes, RSI_LENGTH) trend = sma(closes, TREND_SMA, len(closes) - 1) rsi2 = rsi_values[-1] if math.isnan(trend) or math.isnan(rsi2): raise ValueError("not enough indicator history") signal = decision.close > trend and rsi2 <= RSI_THRESHOLD instrument = load_okx_instrument(args.symbol) if args.instrument == "okx" else Instrument( ct_val=Decimal(args.ct_val), lot_sz=Decimal(args.lot_sz), min_sz=Decimal(args.min_sz), tick_sz=Decimal(args.tick_sz), ) planned_margin = Decimal(args.planned_margin_usdt) slice_margin = planned_margin / Decimal(len(ENTRY_OFFSETS)) created_at = datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z") first_valid_ts = decision.ts + BAR_MS cancel_after_ts = decision.ts + (ENTRY_VALID_BARS + 1) * BAR_MS expires_at = datetime.fromtimestamp(cancel_after_ts / 1000, UTC).isoformat().replace("+00:00", "Z") orders: list[dict[str, object]] = [] if signal: for index, offset in enumerate(ENTRY_OFFSETS, start=1): raw_px = Decimal(str(decision.close)) * (Decimal("1") - Decimal(str(offset))) px = floor_to_step(raw_px, instrument.tick_sz) notional = slice_margin * Decimal(LEVERAGE) sz = floor_to_step(notional / (px * instrument.ct_val), instrument.lot_sz) if sz < instrument.min_sz: raise ValueError("planned margin is below instrument minimum size") orders.append( { "level": index, "offset": offset, "payload": { "instId": args.symbol, "tdMode": "isolated", "side": "buy", "posSide": "long", "ordType": "post_only", "px": decimal_text(px), "sz": decimal_text(sz), "clOrdId": f"ethrtwap15m-{decision.ts}-l{index}", }, "estimated_slice_margin_usdt": decimal_text(slice_margin), "estimated_notional_usdt": decimal_text(notional), "valid_from_candle_ts": first_valid_ts, "valid_from": iso_text(first_valid_ts), "expires_after_candle_ts": decision.ts + ENTRY_VALID_BARS * BAR_MS, "cancel_at_ts": cancel_after_ts, "cancel_at": expires_at, "state_fields": { "client_order_id": f"ethrtwap15m-{decision.ts}-l{index}", "okx_order_id": None, "level": index, "price": decimal_text(px), "requested_size": decimal_text(sz), "filled_size": "0", "state": "intent_only", "created_at": created_at, "expires_at": expires_at, }, } ) return { "mode": "readonly_order_intent", "submitted_orders": 0, "source": {"candles": args.source, "instrument": args.instrument}, "strategy": { "symbol": args.symbol, "bar": args.bar, "direction": "long", "leverage": LEVERAGE, "trend_sma": TREND_SMA, "rsi_length": RSI_LENGTH, "entry_signal": "decision_close > sma60 and rsi2 <= 3.0", "entry_offsets": list(ENTRY_OFFSETS), "entry_valid_bars": ENTRY_VALID_BARS, "exit_rsi": EXIT_RSI, "max_hold_bars": MAX_HOLD_BARS, "stop_loss_pct": STOP_LOSS_PCT, }, "clock": { "latest_confirmed_candle_ts": latest_confirmed.ts, "latest_confirmed_candle_time": utc_text(latest_confirmed.ts), "decision_candle_ts": decision.ts, "decision_candle_time": utc_text(decision.ts), "indicator_rule": "confirmed 15m candles strictly before the latest confirmed candle", }, "decision": { "close": decision.close, "sma60": trend, "rsi2": rsi2, "signal": signal, "reason": "triggered" if signal else "no signal", }, "instrument": { "ctVal": decimal_text(instrument.ct_val), "lotSz": decimal_text(instrument.lot_sz), "minSz": decimal_text(instrument.min_sz), "tickSz": decimal_text(instrument.tick_sz), }, "risk": { "planned_margin_usdt": decimal_text(planned_margin), "slice_margin_usdt": decimal_text(slice_margin), "stop_price_if_all_levels_fill": decimal_text( floor_to_step( sum(Decimal(order["payload"]["px"]) for order in orders) / Decimal(len(orders)) * (Decimal("1") - Decimal(str(STOP_LOSS_PCT))), instrument.tick_sz, ) ) if orders else None, }, "order_intents": orders, "state_preview": { "strategy_id": "eth_robust_twap_15m_readonly", "state": "entry_orders_open" if orders else "idle", "last_confirmed_candle_ts": latest_confirmed.ts, "last_signal_candle_ts": decision.ts if signal else None, "orders": [order["state_fields"] for order in orders], "position": { "filled_contracts": "0", "filled_base_qty": "0", "filled_notional_usdt": "0", "avg_entry_price": None, "margin_used": "0", "stop_price": None, "first_fill_ts": None, "entry_candle_ts": None, }, "updated_at": created_at, }, } def write_reports(payload: dict[str, object]) -> None: REPORT_DIR.mkdir(parents=True, exist_ok=True) JSON_REPORT.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8") decision = payload["decision"] clock = payload["clock"] orders = payload["order_intents"] lines = [ "# ETH signal intent readonly dry-run", "", "No orders were submitted. This report is an order-intent draft only.", "", "## Signal", "", f"- Latest confirmed candle: `{clock['latest_confirmed_candle_time']}` (`{clock['latest_confirmed_candle_ts']}`)", f"- Decision candle: `{clock['decision_candle_time']}` (`{clock['decision_candle_ts']}`)", f"- Decision close: `{decision['close']}`", f"- SMA60: `{decision['sma60']}`", f"- RSI2: `{decision['rsi2']}`", f"- Signal: `{decision['signal']}`", "", "## Order Intents", "", ] if orders: lines.extend( [ "| Level | px | sz | clOrdId | cancel_at |", "| --- | --- | --- | --- | --- |", ] ) for order in orders: body = order["payload"] lines.append(f"| {order['level']} | `{body['px']}` | `{body['sz']}` | `{body['clOrdId']}` | `{order['cancel_at']}` |") else: lines.append("No order intent was produced because the signal condition did not trigger.") lines.extend( [ "", "## State Preview", "", "```json", json.dumps(payload["state_preview"], indent=2, sort_keys=True), "```", "", "## Files", "", f"- JSON: `{JSON_REPORT.relative_to(ROOT)}`", f"- Markdown: `{MARKDOWN_REPORT.relative_to(ROOT)}`", ] ) MARKDOWN_REPORT.write_text("\n".join(lines) + "\n", encoding="utf-8") def main() -> int: parser = argparse.ArgumentParser(description="Build read-only ETH Robust TWAP signal/order-intent payloads.") parser.add_argument("--source", choices=("local", "okx"), default="local") parser.add_argument("--instrument", choices=("static", "okx"), default="static") parser.add_argument("--symbol", default=SYMBOL) parser.add_argument("--bar", default=BAR) parser.add_argument("--limit", type=int, default=500) parser.add_argument("--local-candles", type=Path, default=LOCAL_CANDLES) parser.add_argument("--planned-margin-usdt", default="30") parser.add_argument("--ct-val", default="0.1") parser.add_argument("--lot-sz", default="0.01") parser.add_argument("--min-sz", default="0.01") parser.add_argument("--tick-sz", default="0.01") parser.add_argument("--no-write", action="store_true") args = parser.parse_args() payload = build_payload(args) if not args.no_write: write_reports(payload) print(json.dumps(payload, indent=2, sort_keys=True)) return 0 if __name__ == "__main__": raise SystemExit(main())