| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377 |
- from __future__ import annotations
- import argparse
- import csv
- import json
- import math
- from dataclasses import dataclass
- from datetime import UTC, datetime
- from pathlib import Path
- ROOT = Path(__file__).resolve().parents[1]
- CANDLE_DIR = ROOT / "data" / "okx-candles"
- REPORT_DIR = ROOT / "reports" / "eth-exploration"
- JSON_REPORT = REPORT_DIR / "eth-btc-nextgen-signal-intent.json"
- MARKDOWN_REPORT = REPORT_DIR / "eth-btc-nextgen-signal-intent.md"
- ETH = "ETH-USDT-SWAP"
- BTC = "BTC-USDT-SWAP"
- BAR = "15m"
- LEVERAGE = 3
- @dataclass(frozen=True)
- class Candle:
- ts: int
- open: float
- high: float
- low: float
- close: float
- volume: float
- @dataclass(frozen=True)
- class LegSpec:
- leg_id: str
- family: str
- weight: float
- params: dict[str, float | int]
- LEGS = (
- LegSpec(
- leg_id="btc_trend_eth_rsi",
- family="btc_trend_eth_rsi",
- weight=0.5,
- params={
- "eth_trend_sma": 50,
- "eth_rsi_threshold": 3.0,
- "eth_exit_rsi": 55.0,
- "btc_trend_sma": 480,
- "btc_momentum_lookback": 240,
- "btc_min_momentum": 0.0,
- },
- ),
- LegSpec(
- leg_id="btc_shock_guard_eth_rsi",
- family="btc_shock_guard_eth_rsi",
- weight=0.5,
- params={
- "eth_trend_sma": 50,
- "eth_rsi_threshold": 3.0,
- "eth_exit_rsi": 55.0,
- "btc_trend_sma": 480,
- "btc_momentum_lookback": 240,
- "btc_min_momentum": 0.01,
- "btc_shock_lookback": 96,
- "btc_max_realized_vol": 0.01,
- "btc_max_drawdown": 0.05,
- },
- ),
- )
- def iso_text(ts: int) -> str:
- return datetime.fromtimestamp(ts / 1000, UTC).isoformat().replace("+00:00", "Z")
- def load_candles(symbol: str, bar: str) -> list[Candle]:
- path = CANDLE_DIR / symbol / f"{bar}.csv"
- candles: list[Candle] = []
- with path.open("r", encoding="utf-8", newline="") as handle:
- for row in csv.DictReader(handle):
- candles.append(
- Candle(
- ts=int(row["ts"]),
- open=float(row["open"]),
- high=float(row["high"]),
- low=float(row["low"]),
- close=float(row["close"]),
- volume=float(row["volume"]),
- )
- )
- return sorted(candles, key=lambda candle: candle.ts)
- def align_pair(eth: list[Candle], btc: list[Candle]) -> tuple[list[Candle], list[Candle]]:
- btc_by_ts = {candle.ts: candle for candle in btc}
- eth_aligned: list[Candle] = []
- btc_aligned: list[Candle] = []
- for candle in eth:
- btc_candle = btc_by_ts.get(candle.ts)
- if btc_candle is not None:
- eth_aligned.append(candle)
- btc_aligned.append(btc_candle)
- return eth_aligned, btc_aligned
- def sma(values: list[float], length: int, index: int) -> float:
- if index + 1 < length:
- return math.nan
- return sum(values[index + 1 - length : index + 1]) / length
- def rsi(values: list[float], length: int) -> list[float]:
- output = [math.nan] * len(values)
- if len(values) <= length:
- return output
- gains = [0.0]
- losses = [0.0]
- for previous, current in zip(values, values[1:]):
- delta = current - previous
- gains.append(max(delta, 0.0))
- losses.append(max(-delta, 0.0))
- average_gain = sum(gains[1 : length + 1]) / length
- average_loss = sum(losses[1 : length + 1]) / length
- for index in range(length, len(values)):
- if index > length:
- average_gain = ((average_gain * (length - 1)) + gains[index]) / length
- average_loss = ((average_loss * (length - 1)) + losses[index]) / length
- if average_loss == 0.0:
- output[index] = 100.0 if average_gain > 0.0 else 50.0
- else:
- relative_strength = average_gain / average_loss
- output[index] = 100.0 - (100.0 / (1.0 + relative_strength))
- return output
- def rolling_std(values: list[float], length: int, index: int) -> float:
- if index + 1 < length:
- return math.nan
- window = values[index + 1 - length : index + 1]
- mean = sum(window) / length
- variance = sum((value - mean) ** 2 for value in window) / (length - 1)
- return math.sqrt(variance)
- def rolling_max(values: list[float], length: int, index: int) -> float:
- if index + 1 < length:
- return math.nan
- return max(values[index + 1 - length : index + 1])
- def pct_changes(values: list[float]) -> list[float]:
- output = [math.nan]
- for previous, current in zip(values, values[1:]):
- output.append(current / previous - 1.0)
- return output
- def threshold_delta(value: float, threshold: float, comparator: str) -> dict[str, float | str | bool]:
- if comparator == ">":
- return {"value": value, "threshold": threshold, "passes": value > threshold, "distance_to_pass": max(threshold - value, 0.0)}
- if comparator == ">=":
- return {"value": value, "threshold": threshold, "passes": value >= threshold, "distance_to_pass": max(threshold - value, 0.0)}
- if comparator == "<=":
- return {"value": value, "threshold": threshold, "passes": value <= threshold, "distance_to_pass": max(value - threshold, 0.0)}
- raise ValueError(f"unsupported comparator: {comparator}")
- def evaluate_leg(spec: LegSpec, eth: list[Candle], btc: list[Candle], index: int) -> dict[str, object]:
- eth_closes = [candle.close for candle in eth]
- btc_closes = [candle.close for candle in btc]
- btc_returns = pct_changes(btc_closes)
- eth_trend_sma = int(spec.params["eth_trend_sma"])
- btc_trend_sma = int(spec.params["btc_trend_sma"])
- btc_momentum_lookback = int(spec.params["btc_momentum_lookback"])
- eth_rsi_threshold = float(spec.params["eth_rsi_threshold"])
- btc_min_momentum = float(spec.params["btc_min_momentum"])
- eth_trend = sma(eth_closes, eth_trend_sma, index)
- eth_rsi = rsi(eth_closes[: index + 1], 2)[-1]
- btc_trend = sma(btc_closes, btc_trend_sma, index)
- btc_momentum = btc[index].close / btc[index - btc_momentum_lookback].close - 1.0
- conditions: dict[str, dict[str, float | str | bool]] = {
- "eth_close_above_sma50": threshold_delta(eth[index].close, eth_trend, ">"),
- "eth_rsi2_at_or_below_3": threshold_delta(eth_rsi, eth_rsi_threshold, "<="),
- "btc_close_above_sma480": threshold_delta(btc[index].close, btc_trend, ">"),
- "btc_momentum_at_or_above_min": threshold_delta(btc_momentum, btc_min_momentum, ">="),
- }
- indicators: dict[str, float | bool] = {
- "eth_close": eth[index].close,
- "eth_sma50": eth_trend,
- "eth_rsi2": eth_rsi,
- "btc_close": btc[index].close,
- "btc_sma480": btc_trend,
- "btc_momentum_240": btc_momentum,
- }
- entry_rule = "eth_close > eth_sma50 and eth_rsi2 <= 3 and btc_close > btc_sma480 and btc_momentum_240 >= minimum"
- if spec.family == "btc_shock_guard_eth_rsi":
- shock_lookback = int(spec.params["btc_shock_lookback"])
- btc_max_realized_vol = float(spec.params["btc_max_realized_vol"])
- btc_max_drawdown = float(spec.params["btc_max_drawdown"])
- btc_realized_vol = rolling_std(btc_returns, shock_lookback, index)
- btc_recent_high = rolling_max(btc_closes, shock_lookback, index)
- btc_drawdown = btc[index].close / btc_recent_high - 1.0
- conditions["btc_realized_vol_at_or_below_max"] = threshold_delta(btc_realized_vol, btc_max_realized_vol, "<=")
- conditions["btc_drawdown_at_or_above_floor"] = threshold_delta(btc_drawdown, -btc_max_drawdown, ">=")
- indicators["btc_realized_vol_96"] = btc_realized_vol
- indicators["btc_recent_high_96"] = btc_recent_high
- indicators["btc_drawdown_96"] = btc_drawdown
- entry_rule += " and btc_realized_vol_96 <= 0.01 and btc_drawdown_96 >= -0.05"
- signal = all(bool(condition["passes"]) for condition in conditions.values())
- exit_signal = eth_rsi >= float(spec.params["eth_exit_rsi"]) or btc[index].close < btc_trend
- if spec.family == "btc_shock_guard_eth_rsi":
- exit_signal = exit_signal or not bool(conditions["btc_realized_vol_at_or_below_max"]["passes"]) or not bool(
- conditions["btc_drawdown_at_or_above_floor"]["passes"]
- )
- return {
- "leg_id": spec.leg_id,
- "family": spec.family,
- "symbol": ETH,
- "bar": BAR,
- "suggested_weight": spec.weight,
- "direction": "long",
- "signal": signal,
- "intent": "long" if signal else "no_signal",
- "dry_run_action": "observe_long_signal" if signal else "observe_no_signal",
- "entry_rule": entry_rule,
- "exit_rule": "eth_rsi2 >= exit_rsi or btc_close < btc_sma480; shock leg also exits when shock guard fails",
- "exit_signal": exit_signal,
- "params": spec.params,
- "indicators": indicators,
- "conditions": conditions,
- }
- def build_payload() -> dict[str, object]:
- eth, btc = align_pair(load_candles(ETH, BAR), load_candles(BTC, BAR))
- minimum_history = max(
- max(int(leg.params["eth_trend_sma"]), int(leg.params["btc_trend_sma"]), int(leg.params["btc_momentum_lookback"])) for leg in LEGS
- )
- if len(eth) < minimum_history + 2:
- raise ValueError("not enough aligned ETH/BTC candles")
- decision_index = len(eth) - 2
- latest = eth[-1]
- decision = eth[decision_index]
- legs = [evaluate_leg(leg, eth, btc, decision_index) for leg in LEGS]
- active_weight = sum(float(leg["suggested_weight"]) for leg in legs if leg["signal"])
- signal = "long" if active_weight > 0.0 else "no_signal"
- return {
- "mode": "readonly_signal_intent",
- "strategy": {
- "name": "eth-btc-nextgen equal-2-c0003",
- "symbol": ETH,
- "bar": BAR,
- "direction": "long_only",
- "short_supported": False,
- "leverage_observation_reference": LEVERAGE,
- "source_candidate": "reports/eth-exploration/eth-btc-nextgen-portfolios.csv:equal-2-c0003",
- },
- "created_at": datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z"),
- "submitted_orders": 0,
- "private_key_required": False,
- "order_client": None,
- "data": {
- "source": "local_csv",
- "eth_candles": str((CANDLE_DIR / ETH / f"{BAR}.csv").relative_to(ROOT)),
- "btc_candles": str((CANDLE_DIR / BTC / f"{BAR}.csv").relative_to(ROOT)),
- "aligned_candles": len(eth),
- "latest_aligned_candle_ts": latest.ts,
- "latest_aligned_candle_time": iso_text(latest.ts),
- "decision_candle_ts": decision.ts,
- "decision_candle_time": iso_text(decision.ts),
- "decision_rule": "use the aligned candle immediately before the latest aligned local candle",
- },
- "decision": {
- "signal": signal,
- "active_signal_count": sum(1 for leg in legs if leg["signal"]),
- "active_suggested_weight": active_weight,
- "needs_order": False,
- "needs_cancel": False,
- "intent": "observe_long_signal" if signal == "long" else "observe_no_signal",
- },
- "observation_parameters": {
- "execution": "no_order_submission",
- "purpose": "small-capital futures observation candidate only after separate order-path implementation",
- "candidate_cost_model": "maker_taker",
- "candidate_roundtrip_cost_on_margin": 0.0021,
- "portfolio_weighting": "equal 0.5 / 0.5",
- "position_direction_to_observe": signal,
- "bar_close_confirmation": "15m aligned ETH/BTC local candles",
- "state_assumption": "no live position state read or assumed",
- },
- "readiness_check": {
- "can_be_used_for_later_small_capital_futures_observation": True,
- "reason": "signal rules are closed over local public candles and produce no order or cancel payload",
- "blocked_for_live_trading": True,
- "blocker": "this script intentionally has no OKX private client, order sizing, or submit path",
- },
- "legs": legs,
- }
- def markdown_report(payload: dict[str, object]) -> str:
- lines = [
- "# ETH BTC nextgen signal intent",
- "",
- "Read-only signal intent. No order or cancel request was submitted.",
- "",
- "## Decision",
- "",
- f"- Created at: `{payload['created_at']}`",
- f"- Strategy: `{payload['strategy']['name']}`",
- f"- Signal: `{payload['decision']['signal']}`",
- f"- Active signal count: `{payload['decision']['active_signal_count']}`",
- f"- Active suggested weight: `{payload['decision']['active_suggested_weight']:.8f}`",
- f"- Decision candle: `{payload['data']['decision_candle_time']}` (`{payload['data']['decision_candle_ts']}`)",
- f"- Latest aligned candle: `{payload['data']['latest_aligned_candle_time']}` (`{payload['data']['latest_aligned_candle_ts']}`)",
- "",
- "## Legs",
- "",
- "| Leg | Signal | Weight | ETH close | ETH RSI2 | BTC momentum 240 | Action |",
- "| --- | --- | --- | --- | --- | --- | --- |",
- ]
- for leg in payload["legs"]:
- indicators = leg["indicators"]
- lines.append(
- f"| `{leg['leg_id']}` | `{leg['signal']}` | `{leg['suggested_weight']:.8f}` | `{indicators['eth_close']}` | `{indicators['eth_rsi2']}` | `{indicators['btc_momentum_240']}` | `{leg['dry_run_action']}` |"
- )
- lines.extend(["", "## Trigger Distance", ""])
- for leg in payload["legs"]:
- lines.extend([f"### {leg['leg_id']}", "", "| Condition | Value | Threshold | Passes | Distance to pass |", "| --- | --- | --- | --- | --- |"])
- for name, condition in leg["conditions"].items():
- lines.append(
- f"| `{name}` | `{condition['value']}` | `{condition['threshold']}` | `{condition['passes']}` | `{condition['distance_to_pass']}` |"
- )
- lines.append("")
- lines.extend(
- [
- "## Observation",
- "",
- f"- Can be used for later small-capital futures observation: `{payload['readiness_check']['can_be_used_for_later_small_capital_futures_observation']}`",
- f"- Live trading blocked: `{payload['readiness_check']['blocked_for_live_trading']}`",
- f"- Execution: `{payload['observation_parameters']['execution']}`",
- "",
- "## Intent JSON",
- "",
- "```json",
- json.dumps(payload, indent=2, sort_keys=True),
- "```",
- ]
- )
- return "\n".join(lines) + "\n"
- def main() -> int:
- parser = argparse.ArgumentParser(description="Build read-only ETH/BTC nextgen signal intent.")
- parser.add_argument("--no-write", action="store_true")
- args = parser.parse_args()
- payload = build_payload()
- if not args.no_write:
- REPORT_DIR.mkdir(parents=True, exist_ok=True)
- JSON_REPORT.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
- MARKDOWN_REPORT.write_text(markdown_report(payload), encoding="utf-8")
- print(json.dumps(payload, indent=2, sort_keys=True))
- return 0
- if __name__ == "__main__":
- raise SystemExit(main())
|