| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351 |
- from __future__ import annotations
- import argparse
- import csv
- import json
- import math
- import sys
- from dataclasses import dataclass
- from datetime import UTC, datetime, timedelta
- from decimal import Decimal, ROUND_DOWN
- from pathlib import Path
- ROOT = Path(__file__).resolve().parents[1]
- sys.path.insert(0, str(ROOT))
- from okx_codex_trader.models import Candle
- from okx_codex_trader.okx_client import OkxClient
- SYMBOL = "ETH-USDT-SWAP"
- BAR = "15m"
- BAR_MS = 900_000
- LEVERAGE = 3
- TREND_SMA = 60
- RSI_LENGTH = 2
- RSI_THRESHOLD = 3.0
- EXIT_RSI = 50.0
- STOP_LOSS_PCT = 0.012
- MAX_HOLD_BARS = 48
- ENTRY_OFFSETS = (0.003, 0.006, 0.009)
- ENTRY_VALID_BARS = 4
- REPORT_DIR = ROOT / "reports" / "eth-exploration"
- JSON_REPORT = REPORT_DIR / "eth-signal-intent-readonly.json"
- MARKDOWN_REPORT = REPORT_DIR / "eth-signal-intent-readonly.md"
- LOCAL_CANDLES = ROOT / "data" / "okx-candles" / SYMBOL / f"{BAR}.csv"
- @dataclass(frozen=True)
- class Instrument:
- ct_val: Decimal
- lot_sz: Decimal
- min_sz: Decimal
- tick_sz: Decimal
- def utc_text(ts: int) -> str:
- return datetime.fromtimestamp(ts / 1000, UTC).strftime("%Y-%m-%d %H:%M:%S UTC")
- def iso_text(ts: int) -> str:
- return datetime.fromtimestamp(ts / 1000, UTC).isoformat().replace("+00:00", "Z")
- def decimal_text(value: Decimal) -> str:
- return format(value.normalize(), "f")
- def floor_to_step(value: Decimal, step: Decimal) -> Decimal:
- return (value / step).to_integral_value(rounding=ROUND_DOWN) * step
- def sma(values: list[float], length: int, index: int) -> float:
- if index + 1 < length:
- return float("nan")
- return sum(values[index + 1 - length : index + 1]) / length
- def compute_rsi(values: list[float], length: int) -> list[float]:
- rsi = [float("nan")] * len(values)
- if len(values) <= length:
- return rsi
- gains: list[float] = [0.0]
- losses: list[float] = [0.0]
- for prev, current in zip(values, values[1:]):
- delta = current - prev
- gains.append(max(delta, 0.0))
- losses.append(max(-delta, 0.0))
- average_gain = sum(gains[1 : length + 1]) / length
- average_loss = sum(losses[1 : length + 1]) / length
- for index in range(length, len(values)):
- if index > length:
- average_gain = ((average_gain * (length - 1)) + gains[index]) / length
- average_loss = ((average_loss * (length - 1)) + losses[index]) / length
- if average_loss == 0.0:
- rsi[index] = 100.0 if average_gain > 0.0 else 50.0
- else:
- relative_strength = average_gain / average_loss
- rsi[index] = 100.0 - (100.0 / (1.0 + relative_strength))
- return rsi
- def load_local_candles(path: Path, symbol: str) -> list[Candle]:
- candles: list[Candle] = []
- with path.open("r", encoding="utf-8", newline="") as handle:
- for row in csv.DictReader(handle):
- candles.append(
- Candle(
- symbol=symbol,
- ts=int(row["ts"]),
- open=float(row["open"]),
- high=float(row["high"]),
- low=float(row["low"]),
- close=float(row["close"]),
- volume=float(row["volume"]),
- )
- )
- return sorted(candles, key=lambda candle: candle.ts)
- def load_okx_candles(symbol: str, bar: str, limit: int) -> list[Candle]:
- return OkxClient().get_candles(symbol, bar, limit)
- def load_okx_instrument(symbol: str) -> Instrument:
- data = OkxClient()._request("GET", "/api/v5/public/instruments", params={"instType": "SWAP", "instId": symbol})
- if not data or not isinstance(data[0], dict):
- raise ValueError("okx instrument payload is invalid")
- row = data[0]
- return Instrument(
- ct_val=Decimal(str(row["ctVal"])),
- lot_sz=Decimal(str(row["lotSz"])),
- min_sz=Decimal(str(row["minSz"])),
- tick_sz=Decimal(str(row["tickSz"])),
- )
- def build_payload(args: argparse.Namespace) -> dict[str, object]:
- candles = load_okx_candles(args.symbol, args.bar, args.limit) if args.source == "okx" else load_local_candles(args.local_candles, args.symbol)
- candles = candles[-args.limit :]
- if len(candles) < TREND_SMA + 2:
- raise ValueError("not enough candles")
- latest_confirmed = candles[-1]
- decision_index = len(candles) - 2
- decision = candles[decision_index]
- closes = [candle.close for candle in candles[: decision_index + 1]]
- rsi_values = compute_rsi(closes, RSI_LENGTH)
- trend = sma(closes, TREND_SMA, len(closes) - 1)
- rsi2 = rsi_values[-1]
- if math.isnan(trend) or math.isnan(rsi2):
- raise ValueError("not enough indicator history")
- signal = decision.close > trend and rsi2 <= RSI_THRESHOLD
- instrument = load_okx_instrument(args.symbol) if args.instrument == "okx" else Instrument(
- ct_val=Decimal(args.ct_val),
- lot_sz=Decimal(args.lot_sz),
- min_sz=Decimal(args.min_sz),
- tick_sz=Decimal(args.tick_sz),
- )
- planned_margin = Decimal(args.planned_margin_usdt)
- slice_margin = planned_margin / Decimal(len(ENTRY_OFFSETS))
- created_at = datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
- first_valid_ts = decision.ts + BAR_MS
- cancel_after_ts = decision.ts + (ENTRY_VALID_BARS + 1) * BAR_MS
- expires_at = datetime.fromtimestamp(cancel_after_ts / 1000, UTC).isoformat().replace("+00:00", "Z")
- orders: list[dict[str, object]] = []
- if signal:
- for index, offset in enumerate(ENTRY_OFFSETS, start=1):
- raw_px = Decimal(str(decision.close)) * (Decimal("1") - Decimal(str(offset)))
- px = floor_to_step(raw_px, instrument.tick_sz)
- notional = slice_margin * Decimal(LEVERAGE)
- sz = floor_to_step(notional / (px * instrument.ct_val), instrument.lot_sz)
- if sz < instrument.min_sz:
- raise ValueError("planned margin is below instrument minimum size")
- orders.append(
- {
- "level": index,
- "offset": offset,
- "payload": {
- "instId": args.symbol,
- "tdMode": "isolated",
- "side": "buy",
- "posSide": "long",
- "ordType": "post_only",
- "px": decimal_text(px),
- "sz": decimal_text(sz),
- "clOrdId": f"ethrtwap15m-{decision.ts}-l{index}",
- },
- "estimated_slice_margin_usdt": decimal_text(slice_margin),
- "estimated_notional_usdt": decimal_text(notional),
- "valid_from_candle_ts": first_valid_ts,
- "valid_from": iso_text(first_valid_ts),
- "expires_after_candle_ts": decision.ts + ENTRY_VALID_BARS * BAR_MS,
- "cancel_at_ts": cancel_after_ts,
- "cancel_at": expires_at,
- "state_fields": {
- "client_order_id": f"ethrtwap15m-{decision.ts}-l{index}",
- "okx_order_id": None,
- "level": index,
- "price": decimal_text(px),
- "requested_size": decimal_text(sz),
- "filled_size": "0",
- "state": "intent_only",
- "created_at": created_at,
- "expires_at": expires_at,
- },
- }
- )
- return {
- "mode": "readonly_order_intent",
- "submitted_orders": 0,
- "source": {"candles": args.source, "instrument": args.instrument},
- "strategy": {
- "symbol": args.symbol,
- "bar": args.bar,
- "direction": "long",
- "leverage": LEVERAGE,
- "trend_sma": TREND_SMA,
- "rsi_length": RSI_LENGTH,
- "entry_signal": "decision_close > sma60 and rsi2 <= 3.0",
- "entry_offsets": list(ENTRY_OFFSETS),
- "entry_valid_bars": ENTRY_VALID_BARS,
- "exit_rsi": EXIT_RSI,
- "max_hold_bars": MAX_HOLD_BARS,
- "stop_loss_pct": STOP_LOSS_PCT,
- },
- "clock": {
- "latest_confirmed_candle_ts": latest_confirmed.ts,
- "latest_confirmed_candle_time": utc_text(latest_confirmed.ts),
- "decision_candle_ts": decision.ts,
- "decision_candle_time": utc_text(decision.ts),
- "indicator_rule": "confirmed 15m candles strictly before the latest confirmed candle",
- },
- "decision": {
- "close": decision.close,
- "sma60": trend,
- "rsi2": rsi2,
- "signal": signal,
- "reason": "triggered" if signal else "no signal",
- },
- "instrument": {
- "ctVal": decimal_text(instrument.ct_val),
- "lotSz": decimal_text(instrument.lot_sz),
- "minSz": decimal_text(instrument.min_sz),
- "tickSz": decimal_text(instrument.tick_sz),
- },
- "risk": {
- "planned_margin_usdt": decimal_text(planned_margin),
- "slice_margin_usdt": decimal_text(slice_margin),
- "stop_price_if_all_levels_fill": decimal_text(
- floor_to_step(
- sum(Decimal(order["payload"]["px"]) for order in orders) / Decimal(len(orders)) * (Decimal("1") - Decimal(str(STOP_LOSS_PCT))),
- instrument.tick_sz,
- )
- )
- if orders
- else None,
- },
- "order_intents": orders,
- "state_preview": {
- "strategy_id": "eth_robust_twap_15m_readonly",
- "state": "entry_orders_open" if orders else "idle",
- "last_confirmed_candle_ts": latest_confirmed.ts,
- "last_signal_candle_ts": decision.ts if signal else None,
- "orders": [order["state_fields"] for order in orders],
- "position": {
- "filled_contracts": "0",
- "filled_base_qty": "0",
- "filled_notional_usdt": "0",
- "avg_entry_price": None,
- "margin_used": "0",
- "stop_price": None,
- "first_fill_ts": None,
- "entry_candle_ts": None,
- },
- "updated_at": created_at,
- },
- }
- def write_reports(payload: dict[str, object]) -> None:
- REPORT_DIR.mkdir(parents=True, exist_ok=True)
- JSON_REPORT.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
- decision = payload["decision"]
- clock = payload["clock"]
- orders = payload["order_intents"]
- lines = [
- "# ETH signal intent readonly dry-run",
- "",
- "No orders were submitted. This report is an order-intent draft only.",
- "",
- "## Signal",
- "",
- f"- Latest confirmed candle: `{clock['latest_confirmed_candle_time']}` (`{clock['latest_confirmed_candle_ts']}`)",
- f"- Decision candle: `{clock['decision_candle_time']}` (`{clock['decision_candle_ts']}`)",
- f"- Decision close: `{decision['close']}`",
- f"- SMA60: `{decision['sma60']}`",
- f"- RSI2: `{decision['rsi2']}`",
- f"- Signal: `{decision['signal']}`",
- "",
- "## Order Intents",
- "",
- ]
- if orders:
- lines.extend(
- [
- "| Level | px | sz | clOrdId | cancel_at |",
- "| --- | --- | --- | --- | --- |",
- ]
- )
- for order in orders:
- body = order["payload"]
- lines.append(f"| {order['level']} | `{body['px']}` | `{body['sz']}` | `{body['clOrdId']}` | `{order['cancel_at']}` |")
- else:
- lines.append("No order intent was produced because the signal condition did not trigger.")
- lines.extend(
- [
- "",
- "## State Preview",
- "",
- "```json",
- json.dumps(payload["state_preview"], indent=2, sort_keys=True),
- "```",
- "",
- "## Files",
- "",
- f"- JSON: `{JSON_REPORT.relative_to(ROOT)}`",
- f"- Markdown: `{MARKDOWN_REPORT.relative_to(ROOT)}`",
- ]
- )
- MARKDOWN_REPORT.write_text("\n".join(lines) + "\n", encoding="utf-8")
- def main() -> int:
- parser = argparse.ArgumentParser(description="Build read-only ETH Robust TWAP signal/order-intent payloads.")
- parser.add_argument("--source", choices=("local", "okx"), default="local")
- parser.add_argument("--instrument", choices=("static", "okx"), default="static")
- parser.add_argument("--symbol", default=SYMBOL)
- parser.add_argument("--bar", default=BAR)
- parser.add_argument("--limit", type=int, default=500)
- parser.add_argument("--local-candles", type=Path, default=LOCAL_CANDLES)
- parser.add_argument("--planned-margin-usdt", default="30")
- parser.add_argument("--ct-val", default="0.1")
- parser.add_argument("--lot-sz", default="0.01")
- parser.add_argument("--min-sz", default="0.01")
- parser.add_argument("--tick-sz", default="0.01")
- parser.add_argument("--no-write", action="store_true")
- args = parser.parse_args()
- payload = build_payload(args)
- if not args.no_write:
- write_reports(payload)
- print(json.dumps(payload, indent=2, sort_keys=True))
- return 0
- if __name__ == "__main__":
- raise SystemExit(main())
|