from __future__ import annotations import json from collections import Counter from datetime import UTC, datetime, timedelta from pathlib import Path ROOT = Path(__file__).resolve().parents[1] OUTPUT = ROOT / "reports" / "live-recent" / "readonly-observers-snapshot.md" WINDOWS = { "24h": timedelta(hours=24), "3d": timedelta(days=3), "7d": timedelta(days=7), } EVENT_FILES = { "live_bb_squeeze": ROOT / "var" / "bb-squeeze-executor" / "events.jsonl", "bb_squeeze_t_gated": ROOT / "var" / "bb-squeeze-t-gated-observer" / "observer-events.jsonl", "calendar_fusion": ROOT / "var" / "calendar-fusion" / "observer-events.jsonl", "crash_follow_short": ROOT / "var" / "crash-follow-short-observer" / "observer-events.jsonl", "eth_focused_portfolio": ROOT / "var" / "eth-focused-portfolio" / "observer-events.jsonl", "eth_nextgen_micro": ROOT / "var" / "eth-nextgen-micro" / "observer-events.jsonl", "high_frequency_portfolio": ROOT / "var" / "high-frequency-portfolio" / "observer-events.jsonl", "short_bias_readonly": ROOT / "var" / "short-bias-readonly" / "observer-events.jsonl", } def parse_time(payload: dict[str, object]) -> datetime | None: value = payload.get("created_at") if not isinstance(value, str): return None return datetime.fromisoformat(value.replace("Z", "+00:00")) def read_events(path: Path) -> list[tuple[datetime, dict[str, object]]]: if not path.exists(): return [] rows = [] for line in path.read_text(encoding="utf-8").splitlines(): if not line: continue payload = json.loads(line) created_at = parse_time(payload) if created_at is not None: rows.append((created_at, payload)) return rows def classify(name: str, payload: dict[str, object]) -> str: if "error" in payload: return "error" if name in {"live_bb_squeeze", "bb_squeeze_t_gated", "crash_follow_short"}: signal = payload.get("signal") or {} assert isinstance(signal, dict) action = str(signal.get("signal") or "hold") side = str(signal.get("target_side") or "flat") if action in {"entry", "entry_long", "long"} or (signal.get("entry_signal") and side == "long"): return "entry_long" if action in {"short", "entry_short"} or (signal.get("entry_signal") and side == "short"): return "entry_short" if action.startswith("exit") or signal.get("exit_signal"): return action if action.startswith("exit") else "exit" return action if name == "calendar_fusion": state = payload.get("calendar_state") or {} assert isinstance(state, dict) if state.get("entry_signal_on_decision_candle"): return "entry_long" if state.get("theoretical_calendar_position_active"): return "active_long" return "flat" if name == "eth_focused_portfolio": entries = [] exits = [] for leg in payload.get("legs") or []: assert isinstance(leg, dict) symbol = str(leg.get("symbol") or leg.get("leg_id") or "leg") if leg.get("signal") or leg.get("needs_order"): entries.append(symbol) if leg.get("exit_signal") or leg.get("needs_cancel"): exits.append(symbol) if entries: return "entry_" + "+".join(sorted(set(entries))) if exits: return "exit_" + "+".join(sorted(set(exits))) return "flat" if name == "eth_nextgen_micro": signal = payload.get("signal") or {} assert isinstance(signal, dict) intent = str(signal.get("intent") or "unknown") if "long" in intent: return "entry_long" if "short" in intent: return "entry_short" return intent if name == "high_frequency_portfolio": target = payload.get("target") or {} assert isinstance(target, dict) net_unit = float(target.get("net_unit") or 0.0) if net_unit > 0.0: return "target_long" if net_unit < 0.0: return "target_short" return "flat" if name == "short_bias_readonly": decision = payload.get("decision") or payload.get("signal") or {} assert isinstance(decision, dict) if decision.get("entry_signal"): return "entry_short" if decision.get("exit_signal"): return "exit_short" return "hold" return "unknown" def compact(counter: Counter[str]) -> str: return ", ".join(f"{key}:{value}" for key, value in counter.most_common(5)) or "none" def readout(name: str, counters: dict[str, Counter[str]]) -> str: seven = counters["7d"] entries = sum(value for key, value in seven.items() if key.startswith("entry")) errors = seven.get("error", 0) total = sum(seven.values()) if name == "live_bb_squeeze": return "live path active; recent 24h no entry" if total and errors / total > 0.5: return "recent errors dominated; fixed services need fresh observation" if entries: return f"has complementary entries: {entries} in 7d" return "healthy but no recent entries" def main() -> int: now = datetime.now(UTC) lines = [ "# Read-only observer snapshot", "", f"Generated at `{now.isoformat().replace('+00:00', 'Z')}`.", "", "| observer | latest_event_utc | 24h | 3d | 7d | readout |", "| --- | --- | --- | --- | --- | --- |", ] for name, path in EVENT_FILES.items(): events = read_events(path) latest = events[-1][0].isoformat().replace("+00:00", "Z") if events else "missing" counters = { label: Counter(classify(name, payload) for created_at, payload in events if created_at >= now - window) for label, window in WINDOWS.items() } lines.append( f"| {name} | {latest} | {compact(counters['24h'])} | {compact(counters['3d'])} | " f"{compact(counters['7d'])} | {readout(name, counters)} |" ) recent = ROOT / "reports" / "eth-exploration" / "current-strategy-recent-activity-report.md" if recent.exists(): lines.extend(["", "## Latest server backtest", "", recent.read_text(encoding="utf-8")]) OUTPUT.parent.mkdir(parents=True, exist_ok=True) OUTPUT.write_text("\n".join(lines) + "\n", encoding="utf-8") print(f"wrote {OUTPUT}") return 0 if __name__ == "__main__": raise SystemExit(main())