build_eth_focused_portfolio_signal_intent.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. from __future__ import annotations
  2. import argparse
  3. import csv
  4. import json
  5. import math
  6. from dataclasses import dataclass
  7. from datetime import UTC, datetime
  8. from pathlib import Path
  9. ROOT = Path(__file__).resolve().parents[1]
  10. CANDLE_DIR = ROOT / "data" / "okx-candles"
  11. REPORT_DIR = ROOT / "reports" / "eth-exploration"
  12. JSON_REPORT = REPORT_DIR / "eth-focused-portfolio-signal-intent.json"
  13. MARKDOWN_REPORT = REPORT_DIR / "eth-focused-portfolio-signal-intent.md"
  14. ETH = "ETH-USDT-SWAP"
  15. BTC = "BTC-USDT-SWAP"
  16. LEVERAGE = 3
  17. @dataclass(frozen=True)
  18. class Candle:
  19. ts: int
  20. open: float
  21. high: float
  22. low: float
  23. close: float
  24. volume: float
  25. @dataclass(frozen=True)
  26. class LegSpec:
  27. leg_id: str
  28. family: str
  29. bar: str
  30. weight: float
  31. params: dict[str, float | int]
  32. LEGS = (
  33. LegSpec(
  34. leg_id="eth_btc_rsi_filter_15m",
  35. family="eth_btc_rsi_filter",
  36. bar="15m",
  37. weight=0.80314757,
  38. params={
  39. "eth_trend_sma": 50,
  40. "eth_rsi_threshold": 3.0,
  41. "eth_exit_rsi": 55.0,
  42. "btc_trend_sma": 480,
  43. "btc_momentum_lookback": 240,
  44. "btc_min_momentum": 0.0,
  45. },
  46. ),
  47. LegSpec(
  48. leg_id="btc_lead_eth_lag_15m",
  49. family="btc_lead_eth_lag",
  50. bar="15m",
  51. weight=0.09459139,
  52. params={
  53. "lead_lookback": 8,
  54. "btc_return_threshold": 0.018,
  55. "lag_gap": 0.006,
  56. "max_hold_bars": 8,
  57. "stop_loss_pct": 0.006,
  58. "take_profit_pct": 0.018,
  59. },
  60. ),
  61. LegSpec(
  62. leg_id="btc_lead_eth_lag_5m",
  63. family="btc_lead_eth_lag",
  64. bar="5m",
  65. weight=0.10226104,
  66. params={
  67. "lead_lookback": 16,
  68. "btc_return_threshold": 0.012,
  69. "lag_gap": 0.006,
  70. "max_hold_bars": 8,
  71. "stop_loss_pct": 0.006,
  72. "take_profit_pct": 0.018,
  73. },
  74. ),
  75. )
  76. def iso_text(ts: int) -> str:
  77. return datetime.fromtimestamp(ts / 1000, UTC).isoformat().replace("+00:00", "Z")
  78. def load_candles(symbol: str, bar: str) -> list[Candle]:
  79. path = CANDLE_DIR / symbol / f"{bar}.csv"
  80. candles: list[Candle] = []
  81. with path.open("r", encoding="utf-8", newline="") as handle:
  82. for row in csv.DictReader(handle):
  83. candles.append(
  84. Candle(
  85. ts=int(row["ts"]),
  86. open=float(row["open"]),
  87. high=float(row["high"]),
  88. low=float(row["low"]),
  89. close=float(row["close"]),
  90. volume=float(row["volume"]),
  91. )
  92. )
  93. return sorted(candles, key=lambda candle: candle.ts)
  94. def align_pair(eth: list[Candle], btc: list[Candle]) -> tuple[list[Candle], list[Candle]]:
  95. btc_by_ts = {candle.ts: candle for candle in btc}
  96. eth_aligned: list[Candle] = []
  97. btc_aligned: list[Candle] = []
  98. for candle in eth:
  99. btc_candle = btc_by_ts.get(candle.ts)
  100. if btc_candle is not None:
  101. eth_aligned.append(candle)
  102. btc_aligned.append(btc_candle)
  103. return eth_aligned, btc_aligned
  104. def sma(values: list[float], length: int, index: int) -> float:
  105. if index + 1 < length:
  106. return math.nan
  107. return sum(values[index + 1 - length : index + 1]) / length
  108. def rsi(values: list[float], length: int) -> list[float]:
  109. output = [math.nan] * len(values)
  110. if len(values) <= length:
  111. return output
  112. gains = [0.0]
  113. losses = [0.0]
  114. for previous, current in zip(values, values[1:]):
  115. delta = current - previous
  116. gains.append(max(delta, 0.0))
  117. losses.append(max(-delta, 0.0))
  118. average_gain = sum(gains[1 : length + 1]) / length
  119. average_loss = sum(losses[1 : length + 1]) / length
  120. for index in range(length, len(values)):
  121. if index > length:
  122. average_gain = ((average_gain * (length - 1)) + gains[index]) / length
  123. average_loss = ((average_loss * (length - 1)) + losses[index]) / length
  124. if average_loss == 0.0:
  125. output[index] = 100.0 if average_gain > 0.0 else 50.0
  126. else:
  127. relative_strength = average_gain / average_loss
  128. output[index] = 100.0 - (100.0 / (1.0 + relative_strength))
  129. return output
  130. def eth_btc_rsi_filter_signal(spec: LegSpec, eth: list[Candle], btc: list[Candle], index: int) -> dict[str, object]:
  131. eth_closes = [candle.close for candle in eth]
  132. btc_closes = [candle.close for candle in btc]
  133. eth_trend_sma = int(spec.params["eth_trend_sma"])
  134. btc_trend_sma = int(spec.params["btc_trend_sma"])
  135. btc_momentum_lookback = int(spec.params["btc_momentum_lookback"])
  136. eth_trend = sma(eth_closes, eth_trend_sma, index)
  137. btc_trend = sma(btc_closes, btc_trend_sma, index)
  138. eth_rsi = rsi(eth_closes[: index + 1], 2)[-1]
  139. btc_momentum = btc[index].close / btc[index - btc_momentum_lookback].close - 1.0
  140. btc_risk_on = btc[index].close > btc_trend and btc_momentum >= float(spec.params["btc_min_momentum"])
  141. eth_pullback = eth[index].close > eth_trend and eth_rsi <= float(spec.params["eth_rsi_threshold"])
  142. signal = btc_risk_on and eth_pullback
  143. exit_signal = eth_rsi >= float(spec.params["eth_exit_rsi"]) or btc[index].close < btc_trend
  144. return {
  145. "signal": signal,
  146. "entry_rule": "btc_close > btc_sma and btc_momentum >= minimum and eth_close > eth_sma and eth_rsi2 <= threshold",
  147. "exit_signal": exit_signal,
  148. "indicators": {
  149. "eth_close": eth[index].close,
  150. "eth_sma": eth_trend,
  151. "eth_rsi2": eth_rsi,
  152. "btc_close": btc[index].close,
  153. "btc_sma": btc_trend,
  154. "btc_momentum": btc_momentum,
  155. },
  156. }
  157. def btc_lead_eth_lag_signal(spec: LegSpec, eth: list[Candle], btc: list[Candle], index: int) -> dict[str, object]:
  158. lead_lookback = int(spec.params["lead_lookback"])
  159. btc_return = btc[index].close / btc[index - lead_lookback].close - 1.0
  160. eth_return = eth[index].close / eth[index - lead_lookback].close - 1.0
  161. return_gap = btc_return - eth_return
  162. signal = btc_return >= float(spec.params["btc_return_threshold"]) and return_gap >= float(spec.params["lag_gap"])
  163. return {
  164. "signal": signal,
  165. "entry_rule": "btc_return >= threshold and btc_return - eth_return >= lag_gap",
  166. "exit_signal": False,
  167. "indicators": {
  168. "eth_close": eth[index].close,
  169. "btc_close": btc[index].close,
  170. "btc_return": btc_return,
  171. "eth_return": eth_return,
  172. "return_gap": return_gap,
  173. },
  174. }
  175. def evaluate_leg(spec: LegSpec, data: dict[tuple[str, str], list[Candle]]) -> dict[str, object]:
  176. eth, btc = align_pair(data[(ETH, spec.bar)], data[(BTC, spec.bar)])
  177. index = len(eth) - 2
  178. if spec.family == "eth_btc_rsi_filter":
  179. decision = eth_btc_rsi_filter_signal(spec, eth, btc, index)
  180. else:
  181. decision = btc_lead_eth_lag_signal(spec, eth, btc, index)
  182. stop_loss_pct = spec.params.get("stop_loss_pct")
  183. take_profit_pct = spec.params.get("take_profit_pct")
  184. return {
  185. "leg_id": spec.leg_id,
  186. "family": spec.family,
  187. "symbol": ETH,
  188. "bar": spec.bar,
  189. "decision_candle_ts": eth[index].ts,
  190. "decision_candle_time": iso_text(eth[index].ts),
  191. "latest_local_candle_ts": eth[-1].ts,
  192. "latest_local_candle_time": iso_text(eth[-1].ts),
  193. "suggested_weight": spec.weight,
  194. "signal": decision["signal"],
  195. "needs_order": bool(decision["signal"]),
  196. "needs_cancel": False,
  197. "dry_run_action": "would_open_long" if decision["signal"] else "hold",
  198. "risk_limits": {
  199. "leverage": LEVERAGE,
  200. "max_weight": spec.weight,
  201. "stop_loss_pct": stop_loss_pct,
  202. "take_profit_pct": take_profit_pct,
  203. "max_hold_bars": spec.params.get("max_hold_bars"),
  204. },
  205. "params": spec.params,
  206. "entry_rule": decision["entry_rule"],
  207. "exit_signal": decision["exit_signal"],
  208. "indicators": decision["indicators"],
  209. }
  210. def build_payload() -> dict[str, object]:
  211. bars = sorted({leg.bar for leg in LEGS})
  212. data = {(symbol, bar): load_candles(symbol, bar) for symbol in (ETH, BTC) for bar in bars}
  213. legs = [evaluate_leg(spec, data) for spec in LEGS]
  214. active_weight = sum(float(leg["suggested_weight"]) for leg in legs if leg["signal"])
  215. return {
  216. "mode": "dry_run_readonly_portfolio_signal_intent",
  217. "created_at": datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z"),
  218. "submitted_orders": 0,
  219. "order_client": None,
  220. "private_key_required": False,
  221. "portfolio": {
  222. "name": "eth_focused_conservative_signal_intent",
  223. "symbol": ETH,
  224. "direction": "long_only",
  225. "basis": "ETH/BTC RSI filter + BTC lead ETH lag 5m/15m",
  226. "leverage": LEVERAGE,
  227. "active_signal_count": sum(1 for leg in legs if leg["signal"]),
  228. "active_suggested_weight": active_weight,
  229. "needs_order": any(bool(leg["needs_order"]) for leg in legs),
  230. "needs_cancel": any(bool(leg["needs_cancel"]) for leg in legs),
  231. "dry_run_action": "would_open_or_rebalance_long" if active_weight > 0.0 else "hold",
  232. },
  233. "risk_limits": {
  234. "portfolio_max_gross_weight": 1.0,
  235. "leg_weights_sum": sum(spec.weight for spec in LEGS),
  236. "no_order_submission": True,
  237. "no_cancel_submission": True,
  238. "no_position_state_assumed": True,
  239. "execution": "intent_only",
  240. },
  241. "legs": legs,
  242. }
  243. def markdown_report(payload: dict[str, object]) -> str:
  244. lines = [
  245. "# ETH-focused portfolio signal intent",
  246. "",
  247. "Dry-run only. No order or cancel request was submitted.",
  248. "",
  249. "## Portfolio",
  250. "",
  251. f"- Created at: `{payload['created_at']}`",
  252. f"- Direction: `{payload['portfolio']['direction']}`",
  253. f"- Active signal count: `{payload['portfolio']['active_signal_count']}`",
  254. f"- Active suggested weight: `{payload['portfolio']['active_suggested_weight']:.8f}`",
  255. f"- Needs order: `{payload['portfolio']['needs_order']}`",
  256. f"- Needs cancel: `{payload['portfolio']['needs_cancel']}`",
  257. "",
  258. "## Legs",
  259. "",
  260. "| Leg | Bar | Signal | Weight | Action | Decision candle |",
  261. "| --- | --- | --- | --- | --- | --- |",
  262. ]
  263. for leg in payload["legs"]:
  264. lines.append(
  265. f"| `{leg['leg_id']}` | `{leg['bar']}` | `{leg['signal']}` | `{leg['suggested_weight']:.8f}` | `{leg['dry_run_action']}` | `{leg['decision_candle_time']}` |"
  266. )
  267. lines.extend(
  268. [
  269. "",
  270. "## Intent JSON",
  271. "",
  272. "```json",
  273. json.dumps(payload, indent=2, sort_keys=True),
  274. "```",
  275. ]
  276. )
  277. return "\n".join(lines) + "\n"
  278. def main() -> int:
  279. parser = argparse.ArgumentParser(description="Build a read-only ETH-focused portfolio signal/intent payload.")
  280. parser.add_argument("--no-write", action="store_true")
  281. args = parser.parse_args()
  282. payload = build_payload()
  283. if not args.no_write:
  284. REPORT_DIR.mkdir(parents=True, exist_ok=True)
  285. JSON_REPORT.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
  286. MARKDOWN_REPORT.write_text(markdown_report(payload), encoding="utf-8")
  287. print(json.dumps(payload, indent=2, sort_keys=True))
  288. return 0
  289. if __name__ == "__main__":
  290. raise SystemExit(main())