from __future__ import annotations import argparse import csv import json import math from dataclasses import dataclass from datetime import UTC, datetime from pathlib import Path ROOT = Path(__file__).resolve().parents[1] CANDLE_DIR = ROOT / "data" / "okx-candles" REPORT_DIR = ROOT / "reports" / "eth-exploration" JSON_REPORT = REPORT_DIR / "eth-btc-nextgen-signal-intent.json" MARKDOWN_REPORT = REPORT_DIR / "eth-btc-nextgen-signal-intent.md" ETH = "ETH-USDT-SWAP" BTC = "BTC-USDT-SWAP" BAR = "15m" LEVERAGE = 3 @dataclass(frozen=True) class Candle: ts: int open: float high: float low: float close: float volume: float @dataclass(frozen=True) class LegSpec: leg_id: str family: str weight: float params: dict[str, float | int] LEGS = ( LegSpec( leg_id="btc_trend_eth_rsi", family="btc_trend_eth_rsi", weight=0.5, params={ "eth_trend_sma": 50, "eth_rsi_threshold": 3.0, "eth_exit_rsi": 55.0, "btc_trend_sma": 480, "btc_momentum_lookback": 240, "btc_min_momentum": 0.0, }, ), LegSpec( leg_id="btc_shock_guard_eth_rsi", family="btc_shock_guard_eth_rsi", weight=0.5, params={ "eth_trend_sma": 50, "eth_rsi_threshold": 3.0, "eth_exit_rsi": 55.0, "btc_trend_sma": 480, "btc_momentum_lookback": 240, "btc_min_momentum": 0.01, "btc_shock_lookback": 96, "btc_max_realized_vol": 0.01, "btc_max_drawdown": 0.05, }, ), ) def iso_text(ts: int) -> str: return datetime.fromtimestamp(ts / 1000, UTC).isoformat().replace("+00:00", "Z") def load_candles(symbol: str, bar: str) -> list[Candle]: path = CANDLE_DIR / symbol / f"{bar}.csv" candles: list[Candle] = [] with path.open("r", encoding="utf-8", newline="") as handle: for row in csv.DictReader(handle): candles.append( Candle( ts=int(row["ts"]), open=float(row["open"]), high=float(row["high"]), low=float(row["low"]), close=float(row["close"]), volume=float(row["volume"]), ) ) return sorted(candles, key=lambda candle: candle.ts) def align_pair(eth: list[Candle], btc: list[Candle]) -> tuple[list[Candle], list[Candle]]: btc_by_ts = {candle.ts: candle for candle in btc} eth_aligned: list[Candle] = [] btc_aligned: list[Candle] = [] for candle in eth: btc_candle = btc_by_ts.get(candle.ts) if btc_candle is not None: eth_aligned.append(candle) btc_aligned.append(btc_candle) return eth_aligned, btc_aligned def sma(values: list[float], length: int, index: int) -> float: if index + 1 < length: return math.nan return sum(values[index + 1 - length : index + 1]) / length def rsi(values: list[float], length: int) -> list[float]: output = [math.nan] * len(values) if len(values) <= length: return output gains = [0.0] losses = [0.0] for previous, current in zip(values, values[1:]): delta = current - previous gains.append(max(delta, 0.0)) losses.append(max(-delta, 0.0)) average_gain = sum(gains[1 : length + 1]) / length average_loss = sum(losses[1 : length + 1]) / length for index in range(length, len(values)): if index > length: average_gain = ((average_gain * (length - 1)) + gains[index]) / length average_loss = ((average_loss * (length - 1)) + losses[index]) / length if average_loss == 0.0: output[index] = 100.0 if average_gain > 0.0 else 50.0 else: relative_strength = average_gain / average_loss output[index] = 100.0 - (100.0 / (1.0 + relative_strength)) return output def rolling_std(values: list[float], length: int, index: int) -> float: if index + 1 < length: return math.nan window = values[index + 1 - length : index + 1] mean = sum(window) / length variance = sum((value - mean) ** 2 for value in window) / (length - 1) return math.sqrt(variance) def rolling_max(values: list[float], length: int, index: int) -> float: if index + 1 < length: return math.nan return max(values[index + 1 - length : index + 1]) def pct_changes(values: list[float]) -> list[float]: output = [math.nan] for previous, current in zip(values, values[1:]): output.append(current / previous - 1.0) return output def threshold_delta(value: float, threshold: float, comparator: str) -> dict[str, float | str | bool]: if comparator == ">": return {"value": value, "threshold": threshold, "passes": value > threshold, "distance_to_pass": max(threshold - value, 0.0)} if comparator == ">=": return {"value": value, "threshold": threshold, "passes": value >= threshold, "distance_to_pass": max(threshold - value, 0.0)} if comparator == "<=": return {"value": value, "threshold": threshold, "passes": value <= threshold, "distance_to_pass": max(value - threshold, 0.0)} raise ValueError(f"unsupported comparator: {comparator}") def evaluate_leg(spec: LegSpec, eth: list[Candle], btc: list[Candle], index: int) -> dict[str, object]: eth_closes = [candle.close for candle in eth] btc_closes = [candle.close for candle in btc] btc_returns = pct_changes(btc_closes) eth_trend_sma = int(spec.params["eth_trend_sma"]) btc_trend_sma = int(spec.params["btc_trend_sma"]) btc_momentum_lookback = int(spec.params["btc_momentum_lookback"]) eth_rsi_threshold = float(spec.params["eth_rsi_threshold"]) btc_min_momentum = float(spec.params["btc_min_momentum"]) eth_trend = sma(eth_closes, eth_trend_sma, index) eth_rsi = rsi(eth_closes[: index + 1], 2)[-1] btc_trend = sma(btc_closes, btc_trend_sma, index) btc_momentum = btc[index].close / btc[index - btc_momentum_lookback].close - 1.0 conditions: dict[str, dict[str, float | str | bool]] = { "eth_close_above_sma50": threshold_delta(eth[index].close, eth_trend, ">"), "eth_rsi2_at_or_below_3": threshold_delta(eth_rsi, eth_rsi_threshold, "<="), "btc_close_above_sma480": threshold_delta(btc[index].close, btc_trend, ">"), "btc_momentum_at_or_above_min": threshold_delta(btc_momentum, btc_min_momentum, ">="), } indicators: dict[str, float | bool] = { "eth_close": eth[index].close, "eth_sma50": eth_trend, "eth_rsi2": eth_rsi, "btc_close": btc[index].close, "btc_sma480": btc_trend, "btc_momentum_240": btc_momentum, } entry_rule = "eth_close > eth_sma50 and eth_rsi2 <= 3 and btc_close > btc_sma480 and btc_momentum_240 >= minimum" if spec.family == "btc_shock_guard_eth_rsi": shock_lookback = int(spec.params["btc_shock_lookback"]) btc_max_realized_vol = float(spec.params["btc_max_realized_vol"]) btc_max_drawdown = float(spec.params["btc_max_drawdown"]) btc_realized_vol = rolling_std(btc_returns, shock_lookback, index) btc_recent_high = rolling_max(btc_closes, shock_lookback, index) btc_drawdown = btc[index].close / btc_recent_high - 1.0 conditions["btc_realized_vol_at_or_below_max"] = threshold_delta(btc_realized_vol, btc_max_realized_vol, "<=") conditions["btc_drawdown_at_or_above_floor"] = threshold_delta(btc_drawdown, -btc_max_drawdown, ">=") indicators["btc_realized_vol_96"] = btc_realized_vol indicators["btc_recent_high_96"] = btc_recent_high indicators["btc_drawdown_96"] = btc_drawdown entry_rule += " and btc_realized_vol_96 <= 0.01 and btc_drawdown_96 >= -0.05" signal = all(bool(condition["passes"]) for condition in conditions.values()) exit_signal = eth_rsi >= float(spec.params["eth_exit_rsi"]) or btc[index].close < btc_trend if spec.family == "btc_shock_guard_eth_rsi": exit_signal = exit_signal or not bool(conditions["btc_realized_vol_at_or_below_max"]["passes"]) or not bool( conditions["btc_drawdown_at_or_above_floor"]["passes"] ) return { "leg_id": spec.leg_id, "family": spec.family, "symbol": ETH, "bar": BAR, "suggested_weight": spec.weight, "direction": "long", "signal": signal, "intent": "long" if signal else "no_signal", "dry_run_action": "observe_long_signal" if signal else "observe_no_signal", "entry_rule": entry_rule, "exit_rule": "eth_rsi2 >= exit_rsi or btc_close < btc_sma480; shock leg also exits when shock guard fails", "exit_signal": exit_signal, "params": spec.params, "indicators": indicators, "conditions": conditions, } def build_payload() -> dict[str, object]: eth, btc = align_pair(load_candles(ETH, BAR), load_candles(BTC, BAR)) minimum_history = max( max(int(leg.params["eth_trend_sma"]), int(leg.params["btc_trend_sma"]), int(leg.params["btc_momentum_lookback"])) for leg in LEGS ) if len(eth) < minimum_history + 2: raise ValueError("not enough aligned ETH/BTC candles") decision_index = len(eth) - 2 latest = eth[-1] decision = eth[decision_index] legs = [evaluate_leg(leg, eth, btc, decision_index) for leg in LEGS] active_weight = sum(float(leg["suggested_weight"]) for leg in legs if leg["signal"]) signal = "long" if active_weight > 0.0 else "no_signal" return { "mode": "readonly_signal_intent", "strategy": { "name": "eth-btc-nextgen equal-2-c0003", "symbol": ETH, "bar": BAR, "direction": "long_only", "short_supported": False, "leverage_observation_reference": LEVERAGE, "source_candidate": "reports/eth-exploration/eth-btc-nextgen-portfolios.csv:equal-2-c0003", }, "created_at": datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z"), "submitted_orders": 0, "private_key_required": False, "order_client": None, "data": { "source": "local_csv", "eth_candles": str((CANDLE_DIR / ETH / f"{BAR}.csv").relative_to(ROOT)), "btc_candles": str((CANDLE_DIR / BTC / f"{BAR}.csv").relative_to(ROOT)), "aligned_candles": len(eth), "latest_aligned_candle_ts": latest.ts, "latest_aligned_candle_time": iso_text(latest.ts), "decision_candle_ts": decision.ts, "decision_candle_time": iso_text(decision.ts), "decision_rule": "use the aligned candle immediately before the latest aligned local candle", }, "decision": { "signal": signal, "active_signal_count": sum(1 for leg in legs if leg["signal"]), "active_suggested_weight": active_weight, "needs_order": False, "needs_cancel": False, "intent": "observe_long_signal" if signal == "long" else "observe_no_signal", }, "observation_parameters": { "execution": "no_order_submission", "purpose": "small-capital futures observation candidate only after separate order-path implementation", "candidate_cost_model": "maker_taker", "candidate_roundtrip_cost_on_margin": 0.0021, "portfolio_weighting": "equal 0.5 / 0.5", "position_direction_to_observe": signal, "bar_close_confirmation": "15m aligned ETH/BTC local candles", "state_assumption": "no live position state read or assumed", }, "readiness_check": { "can_be_used_for_later_small_capital_futures_observation": True, "reason": "signal rules are closed over local public candles and produce no order or cancel payload", "blocked_for_live_trading": True, "blocker": "this script intentionally has no OKX private client, order sizing, or submit path", }, "legs": legs, } def markdown_report(payload: dict[str, object]) -> str: lines = [ "# ETH BTC nextgen signal intent", "", "Read-only signal intent. No order or cancel request was submitted.", "", "## Decision", "", f"- Created at: `{payload['created_at']}`", f"- Strategy: `{payload['strategy']['name']}`", f"- Signal: `{payload['decision']['signal']}`", f"- Active signal count: `{payload['decision']['active_signal_count']}`", f"- Active suggested weight: `{payload['decision']['active_suggested_weight']:.8f}`", f"- Decision candle: `{payload['data']['decision_candle_time']}` (`{payload['data']['decision_candle_ts']}`)", f"- Latest aligned candle: `{payload['data']['latest_aligned_candle_time']}` (`{payload['data']['latest_aligned_candle_ts']}`)", "", "## Legs", "", "| Leg | Signal | Weight | ETH close | ETH RSI2 | BTC momentum 240 | Action |", "| --- | --- | --- | --- | --- | --- | --- |", ] for leg in payload["legs"]: indicators = leg["indicators"] lines.append( f"| `{leg['leg_id']}` | `{leg['signal']}` | `{leg['suggested_weight']:.8f}` | `{indicators['eth_close']}` | `{indicators['eth_rsi2']}` | `{indicators['btc_momentum_240']}` | `{leg['dry_run_action']}` |" ) lines.extend(["", "## Trigger Distance", ""]) for leg in payload["legs"]: lines.extend([f"### {leg['leg_id']}", "", "| Condition | Value | Threshold | Passes | Distance to pass |", "| --- | --- | --- | --- | --- |"]) for name, condition in leg["conditions"].items(): lines.append( f"| `{name}` | `{condition['value']}` | `{condition['threshold']}` | `{condition['passes']}` | `{condition['distance_to_pass']}` |" ) lines.append("") lines.extend( [ "## Observation", "", f"- Can be used for later small-capital futures observation: `{payload['readiness_check']['can_be_used_for_later_small_capital_futures_observation']}`", f"- Live trading blocked: `{payload['readiness_check']['blocked_for_live_trading']}`", f"- Execution: `{payload['observation_parameters']['execution']}`", "", "## Intent JSON", "", "```json", json.dumps(payload, indent=2, sort_keys=True), "```", ] ) return "\n".join(lines) + "\n" def main() -> int: parser = argparse.ArgumentParser(description="Build read-only ETH/BTC nextgen signal intent.") parser.add_argument("--no-write", action="store_true") args = parser.parse_args() payload = build_payload() if not args.no_write: REPORT_DIR.mkdir(parents=True, exist_ok=True) JSON_REPORT.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8") MARKDOWN_REPORT.write_text(markdown_report(payload), encoding="utf-8") print(json.dumps(payload, indent=2, sort_keys=True)) return 0 if __name__ == "__main__": raise SystemExit(main())