from __future__ import annotations import json import sys from datetime import UTC, datetime from pathlib import Path import pandas as pd ROOT = Path(__file__).resolve().parents[1] sys.path.insert(0, str(ROOT)) from scripts import build_eth_btc_nextgen_signal_intent as nextgen_intent from scripts import search_eth_microstructure_variants as micro from scripts import search_eth_nextgen_micro_portfolio as portfolio REPORT_DIR = ROOT / "reports" / "eth-exploration" JSON_REPORT = REPORT_DIR / "eth-nextgen-micro-signal-intent.json" MARKDOWN_REPORT = REPORT_DIR / "eth-nextgen-micro-signal-intent.md" ETH = "ETH-USDT-SWAP" BAR = "15m" TARGET_NAME = "switch-l30-r96_q0.15_mf0.25_us" MICRO_NAME = "atr-compress-expand-r96-q0.15-sl0.008-tp0.016-mf0.25-us" LOOKBACK_DAYS = 30 ROUNDTRIP_COST_ON_MARGIN = 0.0021 MICRO_PARAMS = { "range_window": 96, "atr_window": 48, "atr_quantile_window": 480, "atr_quantile": 0.15, "stop_loss_pct": 0.008, "take_profit_pct": 0.016, "max_hold_bars": 32, "margin_fraction": 0.25, "session": "us", } def iso_text(ts: int) -> str: return datetime.fromtimestamp(ts / 1000, UTC).isoformat().replace("+00:00", "Z") def latest_active_engine() -> dict[str, object]: existing_equity = pd.read_csv(REPORT_DIR / "eth-btc-nextgen-equity.csv") base = existing_equity[ (existing_equity["cost_model"] == portfolio.PRIMARY_COST) & (existing_equity["name"] == portfolio.NEXTGEN_BASELINE) ].copy() if base.empty: raise KeyError(f"missing existing nextgen equity for {portfolio.NEXTGEN_BASELINE}") index = pd.DatetimeIndex(pd.to_datetime(base["date"], utc=True)) nextgen_series, _ = portfolio.load_nextgen(index, ROUNDTRIP_COST_ON_MARGIN) micro_series = portfolio.load_micro_candidates(index, ROUNDTRIP_COST_ON_MARGIN)[MICRO_NAME][0] nextgen_regime = nextgen_series / nextgen_series.shift(LOOKBACK_DAYS) - 1.0 micro_regime = micro_series / micro_series.shift(LOOKBACK_DAYS) - 1.0 active = ((nextgen_regime < 0.0) & (micro_regime > 0.0)).shift(1).fillna(False).astype(bool) decision_date = active.index[-1] micro_active = bool(active.iloc[-1]) return { "active_engine": "micro" if micro_active else "nextgen", "decision_date": decision_date.strftime("%Y-%m-%d"), "switch_rule": "prior completed daily nextgen 30d return < 0 and micro 30d return > 0", "lookback_days": LOOKBACK_DAYS, "nextgen_30d_return": float(nextgen_regime.iloc[-2]) if len(nextgen_regime) >= 2 else None, "micro_30d_return": float(micro_regime.iloc[-2]) if len(micro_regime) >= 2 else None, "nextgen_equity": float(nextgen_series.iloc[-1]), "micro_equity": float(micro_series.iloc[-1]), } def micro_signal() -> dict[str, object]: candles = micro._load_candles(ETH, BAR) decision_index = len(candles) - 2 if decision_index < max(int(MICRO_PARAMS["range_window"]), int(MICRO_PARAMS["atr_window"]), int(MICRO_PARAMS["atr_quantile_window"])): raise ValueError("not enough ETH candles for micro signal") highs = pd.Series([c.high for c in candles], dtype=float) lows = pd.Series([c.low for c in candles], dtype=float) closes = pd.Series([c.close for c in candles], dtype=float) prev_close = closes.shift(1) true_range = pd.concat([(highs - lows), (highs - prev_close).abs(), (lows - prev_close).abs()], axis=1).max(axis=1) atr = true_range.rolling(int(MICRO_PARAMS["atr_window"])).mean() / closes atr_limit = atr.rolling(int(MICRO_PARAMS["atr_quantile_window"])).quantile(float(MICRO_PARAMS["atr_quantile"])) range_high = highs.shift(1).rolling(int(MICRO_PARAMS["range_window"])).max() range_low = lows.shift(1).rolling(int(MICRO_PARAMS["range_window"])).min() candle = candles[decision_index] compressed = bool(float(atr.iloc[decision_index - 1]) <= float(atr_limit.iloc[decision_index - 1])) long_signal = compressed and candle.close > float(range_high.iloc[decision_index]) short_signal = compressed and candle.close < float(range_low.iloc[decision_index]) session_ok = micro._session_ok(candle, str(MICRO_PARAMS["session"])) signal = "long" if session_ok and long_signal else "short" if session_ok and short_signal else "no_signal" return { "engine": "micro", "candidate": MICRO_NAME, "symbol": ETH, "bar": BAR, "decision_candle_ts": candle.ts, "decision_candle_time": iso_text(candle.ts), "latest_local_candle_ts": candles[-1].ts, "latest_local_candle_time": iso_text(candles[-1].ts), "signal": signal, "raw_long_signal": bool(long_signal), "raw_short_signal": bool(short_signal), "session_ok": session_ok, "indicators": { "eth_close": candle.close, "atr_previous": float(atr.iloc[decision_index - 1]), "atr_limit_previous": float(atr_limit.iloc[decision_index - 1]), "range_high": float(range_high.iloc[decision_index]), "range_low": float(range_low.iloc[decision_index]), }, "params": MICRO_PARAMS, } def execution_intent(active_engine: str, selected_signal: str, nextgen_payload: dict[str, object]) -> dict[str, object]: if selected_signal == "no_signal": entry_unit = 0.0 elif active_engine == "nextgen": entry_unit = float(nextgen_payload["decision"]["active_suggested_weight"]) else: entry_unit = 1.0 return { "entry_signal": selected_signal, "entry_unit": entry_unit, "target_position_known": False, "target_position": None, "blocker": "persistent strategy position state is required before entry signals can be reconciled to target position", } def build_payload() -> dict[str, object]: switch_state = latest_active_engine() nextgen_payload = nextgen_intent.build_payload() micro_payload = micro_signal() active_engine = str(switch_state["active_engine"]) selected_signal = str(nextgen_payload["decision"]["signal"]) if active_engine == "nextgen" else str(micro_payload["signal"]) return { "mode": "readonly_signal_intent", "strategy": { "name": TARGET_NAME, "symbol": ETH, "bar": BAR, "direction": "nextgen_long_only_or_micro_observation", "source_report": "reports/eth-exploration/eth-nextgen-micro-portfolio-report.md", "cost_model": "maker_taker", "roundtrip_cost_on_margin": ROUNDTRIP_COST_ON_MARGIN, }, "created_at": datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z"), "submitted_orders": 0, "private_key_required": False, "order_client": None, "switch_state": switch_state, "decision": { "active_engine": active_engine, "selected_signal": selected_signal, "needs_order": False, "needs_cancel": False, "intent": f"observe_{active_engine}_{selected_signal}", }, "execution_intent": execution_intent(active_engine, selected_signal, nextgen_payload), "risk_limits": { "no_order_submission": True, "no_cancel_submission": True, "no_position_state_assumed": True, "execution": "intent_only", "blocked_for_live_trading": True, "blocker": "persistent virtual position state is not maintained by this read-only signal builder", }, "nextgen": { "decision": nextgen_payload["decision"], "data": nextgen_payload["data"], "legs": nextgen_payload["legs"], }, "micro": micro_payload, } def markdown_report(payload: dict[str, object]) -> str: lines = [ "# ETH nextgen + micro signal intent", "", "Read-only signal intent. No order or cancel request was submitted.", "", "## Decision", "", f"- Created at: `{payload['created_at']}`", f"- Strategy: `{payload['strategy']['name']}`", f"- Active engine: `{payload['decision']['active_engine']}`", f"- Selected signal: `{payload['decision']['selected_signal']}`", f"- Entry unit: `{payload['execution_intent']['entry_unit']}`", f"- Target position known: `{payload['execution_intent']['target_position_known']}`", f"- Needs order: `{payload['decision']['needs_order']}`", f"- Blocked for live trading: `{payload['risk_limits']['blocked_for_live_trading']}`", f"- Blocker: `{payload['risk_limits']['blocker']}`", "", "## Intent JSON", "", "```json", json.dumps(payload, indent=2, sort_keys=True), "```", ] return "\n".join(lines) + "\n"