build_eth_btc_nextgen_signal_intent.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. from __future__ import annotations
  2. import argparse
  3. import csv
  4. import json
  5. import math
  6. from dataclasses import dataclass
  7. from datetime import UTC, datetime
  8. from pathlib import Path
  9. ROOT = Path(__file__).resolve().parents[1]
  10. CANDLE_DIR = ROOT / "data" / "okx-candles"
  11. REPORT_DIR = ROOT / "reports" / "eth-exploration"
  12. JSON_REPORT = REPORT_DIR / "eth-btc-nextgen-signal-intent.json"
  13. MARKDOWN_REPORT = REPORT_DIR / "eth-btc-nextgen-signal-intent.md"
  14. ETH = "ETH-USDT-SWAP"
  15. BTC = "BTC-USDT-SWAP"
  16. BAR = "15m"
  17. LEVERAGE = 3
  18. @dataclass(frozen=True)
  19. class Candle:
  20. ts: int
  21. open: float
  22. high: float
  23. low: float
  24. close: float
  25. volume: float
@dataclass(frozen=True)
class LegSpec:
    """Static definition of one strategy leg evaluated by this script."""

    leg_id: str  # identifier echoed as "leg_id" in the leg result
    family: str  # rule family; "btc_shock_guard_eth_rsi" enables the extra shock-guard conditions
    weight: float  # echoed as "suggested_weight" and summed over legs whose signal is active
    params: dict[str, float | int]  # thresholds and lookbacks read by evaluate_leg
  32. LEGS = (
  33. LegSpec(
  34. leg_id="btc_trend_eth_rsi",
  35. family="btc_trend_eth_rsi",
  36. weight=0.5,
  37. params={
  38. "eth_trend_sma": 50,
  39. "eth_rsi_threshold": 3.0,
  40. "eth_exit_rsi": 55.0,
  41. "btc_trend_sma": 480,
  42. "btc_momentum_lookback": 240,
  43. "btc_min_momentum": 0.0,
  44. },
  45. ),
  46. LegSpec(
  47. leg_id="btc_shock_guard_eth_rsi",
  48. family="btc_shock_guard_eth_rsi",
  49. weight=0.5,
  50. params={
  51. "eth_trend_sma": 50,
  52. "eth_rsi_threshold": 3.0,
  53. "eth_exit_rsi": 55.0,
  54. "btc_trend_sma": 480,
  55. "btc_momentum_lookback": 240,
  56. "btc_min_momentum": 0.01,
  57. "btc_shock_lookback": 96,
  58. "btc_max_realized_vol": 0.01,
  59. "btc_max_drawdown": 0.05,
  60. },
  61. ),
  62. )
  63. def iso_text(ts: int) -> str:
  64. return datetime.fromtimestamp(ts / 1000, UTC).isoformat().replace("+00:00", "Z")
  65. def load_candles(symbol: str, bar: str) -> list[Candle]:
  66. path = CANDLE_DIR / symbol / f"{bar}.csv"
  67. candles: list[Candle] = []
  68. with path.open("r", encoding="utf-8", newline="") as handle:
  69. for row in csv.DictReader(handle):
  70. candles.append(
  71. Candle(
  72. ts=int(row["ts"]),
  73. open=float(row["open"]),
  74. high=float(row["high"]),
  75. low=float(row["low"]),
  76. close=float(row["close"]),
  77. volume=float(row["volume"]),
  78. )
  79. )
  80. return sorted(candles, key=lambda candle: candle.ts)
  81. def align_pair(eth: list[Candle], btc: list[Candle]) -> tuple[list[Candle], list[Candle]]:
  82. btc_by_ts = {candle.ts: candle for candle in btc}
  83. eth_aligned: list[Candle] = []
  84. btc_aligned: list[Candle] = []
  85. for candle in eth:
  86. btc_candle = btc_by_ts.get(candle.ts)
  87. if btc_candle is not None:
  88. eth_aligned.append(candle)
  89. btc_aligned.append(btc_candle)
  90. return eth_aligned, btc_aligned
  91. def sma(values: list[float], length: int, index: int) -> float:
  92. if index + 1 < length:
  93. return math.nan
  94. return sum(values[index + 1 - length : index + 1]) / length
  95. def rsi(values: list[float], length: int) -> list[float]:
  96. output = [math.nan] * len(values)
  97. if len(values) <= length:
  98. return output
  99. gains = [0.0]
  100. losses = [0.0]
  101. for previous, current in zip(values, values[1:]):
  102. delta = current - previous
  103. gains.append(max(delta, 0.0))
  104. losses.append(max(-delta, 0.0))
  105. average_gain = sum(gains[1 : length + 1]) / length
  106. average_loss = sum(losses[1 : length + 1]) / length
  107. for index in range(length, len(values)):
  108. if index > length:
  109. average_gain = ((average_gain * (length - 1)) + gains[index]) / length
  110. average_loss = ((average_loss * (length - 1)) + losses[index]) / length
  111. if average_loss == 0.0:
  112. output[index] = 100.0 if average_gain > 0.0 else 50.0
  113. else:
  114. relative_strength = average_gain / average_loss
  115. output[index] = 100.0 - (100.0 / (1.0 + relative_strength))
  116. return output
  117. def rolling_std(values: list[float], length: int, index: int) -> float:
  118. if index + 1 < length:
  119. return math.nan
  120. window = values[index + 1 - length : index + 1]
  121. mean = sum(window) / length
  122. variance = sum((value - mean) ** 2 for value in window) / (length - 1)
  123. return math.sqrt(variance)
  124. def rolling_max(values: list[float], length: int, index: int) -> float:
  125. if index + 1 < length:
  126. return math.nan
  127. return max(values[index + 1 - length : index + 1])
  128. def pct_changes(values: list[float]) -> list[float]:
  129. output = [math.nan]
  130. for previous, current in zip(values, values[1:]):
  131. output.append(current / previous - 1.0)
  132. return output
  133. def threshold_delta(value: float, threshold: float, comparator: str) -> dict[str, float | str | bool]:
  134. if comparator == ">":
  135. return {"value": value, "threshold": threshold, "passes": value > threshold, "distance_to_pass": max(threshold - value, 0.0)}
  136. if comparator == ">=":
  137. return {"value": value, "threshold": threshold, "passes": value >= threshold, "distance_to_pass": max(threshold - value, 0.0)}
  138. if comparator == "<=":
  139. return {"value": value, "threshold": threshold, "passes": value <= threshold, "distance_to_pass": max(value - threshold, 0.0)}
  140. raise ValueError(f"unsupported comparator: {comparator}")
  141. def evaluate_leg(spec: LegSpec, eth: list[Candle], btc: list[Candle], index: int) -> dict[str, object]:
  142. eth_closes = [candle.close for candle in eth]
  143. btc_closes = [candle.close for candle in btc]
  144. btc_returns = pct_changes(btc_closes)
  145. eth_trend_sma = int(spec.params["eth_trend_sma"])
  146. btc_trend_sma = int(spec.params["btc_trend_sma"])
  147. btc_momentum_lookback = int(spec.params["btc_momentum_lookback"])
  148. eth_rsi_threshold = float(spec.params["eth_rsi_threshold"])
  149. btc_min_momentum = float(spec.params["btc_min_momentum"])
  150. eth_trend = sma(eth_closes, eth_trend_sma, index)
  151. eth_rsi = rsi(eth_closes[: index + 1], 2)[-1]
  152. btc_trend = sma(btc_closes, btc_trend_sma, index)
  153. btc_momentum = btc[index].close / btc[index - btc_momentum_lookback].close - 1.0
  154. conditions: dict[str, dict[str, float | str | bool]] = {
  155. "eth_close_above_sma50": threshold_delta(eth[index].close, eth_trend, ">"),
  156. "eth_rsi2_at_or_below_3": threshold_delta(eth_rsi, eth_rsi_threshold, "<="),
  157. "btc_close_above_sma480": threshold_delta(btc[index].close, btc_trend, ">"),
  158. "btc_momentum_at_or_above_min": threshold_delta(btc_momentum, btc_min_momentum, ">="),
  159. }
  160. indicators: dict[str, float | bool] = {
  161. "eth_close": eth[index].close,
  162. "eth_sma50": eth_trend,
  163. "eth_rsi2": eth_rsi,
  164. "btc_close": btc[index].close,
  165. "btc_sma480": btc_trend,
  166. "btc_momentum_240": btc_momentum,
  167. }
  168. entry_rule = "eth_close > eth_sma50 and eth_rsi2 <= 3 and btc_close > btc_sma480 and btc_momentum_240 >= minimum"
  169. if spec.family == "btc_shock_guard_eth_rsi":
  170. shock_lookback = int(spec.params["btc_shock_lookback"])
  171. btc_max_realized_vol = float(spec.params["btc_max_realized_vol"])
  172. btc_max_drawdown = float(spec.params["btc_max_drawdown"])
  173. btc_realized_vol = rolling_std(btc_returns, shock_lookback, index)
  174. btc_recent_high = rolling_max(btc_closes, shock_lookback, index)
  175. btc_drawdown = btc[index].close / btc_recent_high - 1.0
  176. conditions["btc_realized_vol_at_or_below_max"] = threshold_delta(btc_realized_vol, btc_max_realized_vol, "<=")
  177. conditions["btc_drawdown_at_or_above_floor"] = threshold_delta(btc_drawdown, -btc_max_drawdown, ">=")
  178. indicators["btc_realized_vol_96"] = btc_realized_vol
  179. indicators["btc_recent_high_96"] = btc_recent_high
  180. indicators["btc_drawdown_96"] = btc_drawdown
  181. entry_rule += " and btc_realized_vol_96 <= 0.01 and btc_drawdown_96 >= -0.05"
  182. signal = all(bool(condition["passes"]) for condition in conditions.values())
  183. exit_signal = eth_rsi >= float(spec.params["eth_exit_rsi"]) or btc[index].close < btc_trend
  184. if spec.family == "btc_shock_guard_eth_rsi":
  185. exit_signal = exit_signal or not bool(conditions["btc_realized_vol_at_or_below_max"]["passes"]) or not bool(
  186. conditions["btc_drawdown_at_or_above_floor"]["passes"]
  187. )
  188. return {
  189. "leg_id": spec.leg_id,
  190. "family": spec.family,
  191. "symbol": ETH,
  192. "bar": BAR,
  193. "suggested_weight": spec.weight,
  194. "direction": "long",
  195. "signal": signal,
  196. "intent": "long" if signal else "no_signal",
  197. "dry_run_action": "observe_long_signal" if signal else "observe_no_signal",
  198. "entry_rule": entry_rule,
  199. "exit_rule": "eth_rsi2 >= exit_rsi or btc_close < btc_sma480; shock leg also exits when shock guard fails",
  200. "exit_signal": exit_signal,
  201. "params": spec.params,
  202. "indicators": indicators,
  203. "conditions": conditions,
  204. }
  205. def build_payload() -> dict[str, object]:
  206. eth, btc = align_pair(load_candles(ETH, BAR), load_candles(BTC, BAR))
  207. minimum_history = max(
  208. max(int(leg.params["eth_trend_sma"]), int(leg.params["btc_trend_sma"]), int(leg.params["btc_momentum_lookback"])) for leg in LEGS
  209. )
  210. if len(eth) < minimum_history + 2:
  211. raise ValueError("not enough aligned ETH/BTC candles")
  212. decision_index = len(eth) - 2
  213. latest = eth[-1]
  214. decision = eth[decision_index]
  215. legs = [evaluate_leg(leg, eth, btc, decision_index) for leg in LEGS]
  216. active_weight = sum(float(leg["suggested_weight"]) for leg in legs if leg["signal"])
  217. signal = "long" if active_weight > 0.0 else "no_signal"
  218. return {
  219. "mode": "readonly_signal_intent",
  220. "strategy": {
  221. "name": "eth-btc-nextgen equal-2-c0003",
  222. "symbol": ETH,
  223. "bar": BAR,
  224. "direction": "long_only",
  225. "short_supported": False,
  226. "leverage_observation_reference": LEVERAGE,
  227. "source_candidate": "reports/eth-exploration/eth-btc-nextgen-portfolios.csv:equal-2-c0003",
  228. },
  229. "created_at": datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z"),
  230. "submitted_orders": 0,
  231. "private_key_required": False,
  232. "order_client": None,
  233. "data": {
  234. "source": "local_csv",
  235. "eth_candles": str((CANDLE_DIR / ETH / f"{BAR}.csv").relative_to(ROOT)),
  236. "btc_candles": str((CANDLE_DIR / BTC / f"{BAR}.csv").relative_to(ROOT)),
  237. "aligned_candles": len(eth),
  238. "latest_aligned_candle_ts": latest.ts,
  239. "latest_aligned_candle_time": iso_text(latest.ts),
  240. "decision_candle_ts": decision.ts,
  241. "decision_candle_time": iso_text(decision.ts),
  242. "decision_rule": "use the aligned candle immediately before the latest aligned local candle",
  243. },
  244. "decision": {
  245. "signal": signal,
  246. "active_signal_count": sum(1 for leg in legs if leg["signal"]),
  247. "active_suggested_weight": active_weight,
  248. "needs_order": False,
  249. "needs_cancel": False,
  250. "intent": "observe_long_signal" if signal == "long" else "observe_no_signal",
  251. },
  252. "observation_parameters": {
  253. "execution": "no_order_submission",
  254. "purpose": "small-capital futures observation candidate only after separate order-path implementation",
  255. "candidate_cost_model": "maker_taker",
  256. "candidate_roundtrip_cost_on_margin": 0.0021,
  257. "portfolio_weighting": "equal 0.5 / 0.5",
  258. "position_direction_to_observe": signal,
  259. "bar_close_confirmation": "15m aligned ETH/BTC local candles",
  260. "state_assumption": "no live position state read or assumed",
  261. },
  262. "readiness_check": {
  263. "can_be_used_for_later_small_capital_futures_observation": True,
  264. "reason": "signal rules are closed over local public candles and produce no order or cancel payload",
  265. "blocked_for_live_trading": True,
  266. "blocker": "this script intentionally has no OKX private client, order sizing, or submit path",
  267. },
  268. "legs": legs,
  269. }
  270. def markdown_report(payload: dict[str, object]) -> str:
  271. lines = [
  272. "# ETH BTC nextgen signal intent",
  273. "",
  274. "Read-only signal intent. No order or cancel request was submitted.",
  275. "",
  276. "## Decision",
  277. "",
  278. f"- Created at: `{payload['created_at']}`",
  279. f"- Strategy: `{payload['strategy']['name']}`",
  280. f"- Signal: `{payload['decision']['signal']}`",
  281. f"- Active signal count: `{payload['decision']['active_signal_count']}`",
  282. f"- Active suggested weight: `{payload['decision']['active_suggested_weight']:.8f}`",
  283. f"- Decision candle: `{payload['data']['decision_candle_time']}` (`{payload['data']['decision_candle_ts']}`)",
  284. f"- Latest aligned candle: `{payload['data']['latest_aligned_candle_time']}` (`{payload['data']['latest_aligned_candle_ts']}`)",
  285. "",
  286. "## Legs",
  287. "",
  288. "| Leg | Signal | Weight | ETH close | ETH RSI2 | BTC momentum 240 | Action |",
  289. "| --- | --- | --- | --- | --- | --- | --- |",
  290. ]
  291. for leg in payload["legs"]:
  292. indicators = leg["indicators"]
  293. lines.append(
  294. f"| `{leg['leg_id']}` | `{leg['signal']}` | `{leg['suggested_weight']:.8f}` | `{indicators['eth_close']}` | `{indicators['eth_rsi2']}` | `{indicators['btc_momentum_240']}` | `{leg['dry_run_action']}` |"
  295. )
  296. lines.extend(["", "## Trigger Distance", ""])
  297. for leg in payload["legs"]:
  298. lines.extend([f"### {leg['leg_id']}", "", "| Condition | Value | Threshold | Passes | Distance to pass |", "| --- | --- | --- | --- | --- |"])
  299. for name, condition in leg["conditions"].items():
  300. lines.append(
  301. f"| `{name}` | `{condition['value']}` | `{condition['threshold']}` | `{condition['passes']}` | `{condition['distance_to_pass']}` |"
  302. )
  303. lines.append("")
  304. lines.extend(
  305. [
  306. "## Observation",
  307. "",
  308. f"- Can be used for later small-capital futures observation: `{payload['readiness_check']['can_be_used_for_later_small_capital_futures_observation']}`",
  309. f"- Live trading blocked: `{payload['readiness_check']['blocked_for_live_trading']}`",
  310. f"- Execution: `{payload['observation_parameters']['execution']}`",
  311. "",
  312. "## Intent JSON",
  313. "",
  314. "```json",
  315. json.dumps(payload, indent=2, sort_keys=True),
  316. "```",
  317. ]
  318. )
  319. return "\n".join(lines) + "\n"
  320. def main() -> int:
  321. parser = argparse.ArgumentParser(description="Build read-only ETH/BTC nextgen signal intent.")
  322. parser.add_argument("--no-write", action="store_true")
  323. args = parser.parse_args()
  324. payload = build_payload()
  325. if not args.no_write:
  326. REPORT_DIR.mkdir(parents=True, exist_ok=True)
  327. JSON_REPORT.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
  328. MARKDOWN_REPORT.write_text(markdown_report(payload), encoding="utf-8")
  329. print(json.dumps(payload, indent=2, sort_keys=True))
  330. return 0
  331. if __name__ == "__main__":
  332. raise SystemExit(main())