| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- from __future__ import annotations
- import json
- from collections import Counter
- from datetime import UTC, datetime, timedelta
- from pathlib import Path
- ROOT = Path(__file__).resolve().parents[1]
- OUTPUT = ROOT / "reports" / "live-recent" / "readonly-observers-snapshot.md"
- WINDOWS = {
- "24h": timedelta(hours=24),
- "3d": timedelta(days=3),
- "7d": timedelta(days=7),
- }
- EVENT_FILES = {
- "live_bb_squeeze": ROOT / "var" / "bb-squeeze-executor" / "events.jsonl",
- "bb_squeeze_t_gated": ROOT / "var" / "bb-squeeze-t-gated-observer" / "observer-events.jsonl",
- "calendar_fusion": ROOT / "var" / "calendar-fusion" / "observer-events.jsonl",
- "crash_follow_short": ROOT / "var" / "crash-follow-short-observer" / "observer-events.jsonl",
- "eth_focused_portfolio": ROOT / "var" / "eth-focused-portfolio" / "observer-events.jsonl",
- "eth_nextgen_micro": ROOT / "var" / "eth-nextgen-micro" / "observer-events.jsonl",
- "high_frequency_portfolio": ROOT / "var" / "high-frequency-portfolio" / "observer-events.jsonl",
- "short_bias_readonly": ROOT / "var" / "short-bias-readonly" / "observer-events.jsonl",
- }
- def parse_time(payload: dict[str, object]) -> datetime | None:
- value = payload.get("created_at")
- if not isinstance(value, str):
- return None
- return datetime.fromisoformat(value.replace("Z", "+00:00"))
- def read_events(path: Path) -> list[tuple[datetime, dict[str, object]]]:
- if not path.exists():
- return []
- rows = []
- for line in path.read_text(encoding="utf-8").splitlines():
- if not line:
- continue
- payload = json.loads(line)
- created_at = parse_time(payload)
- if created_at is not None:
- rows.append((created_at, payload))
- return rows
- def classify(name: str, payload: dict[str, object]) -> str:
- if "error" in payload:
- return "error"
- if name in {"live_bb_squeeze", "bb_squeeze_t_gated", "crash_follow_short"}:
- signal = payload.get("signal") or {}
- assert isinstance(signal, dict)
- action = str(signal.get("signal") or "hold")
- side = str(signal.get("target_side") or "flat")
- if action in {"entry", "entry_long", "long"} or (signal.get("entry_signal") and side == "long"):
- return "entry_long"
- if action in {"short", "entry_short"} or (signal.get("entry_signal") and side == "short"):
- return "entry_short"
- if action.startswith("exit") or signal.get("exit_signal"):
- return action if action.startswith("exit") else "exit"
- return action
- if name == "calendar_fusion":
- state = payload.get("calendar_state") or {}
- assert isinstance(state, dict)
- if state.get("entry_signal_on_decision_candle"):
- return "entry_long"
- if state.get("theoretical_calendar_position_active"):
- return "active_long"
- return "flat"
- if name == "eth_focused_portfolio":
- entries = []
- exits = []
- for leg in payload.get("legs") or []:
- assert isinstance(leg, dict)
- symbol = str(leg.get("symbol") or leg.get("leg_id") or "leg")
- if leg.get("signal") or leg.get("needs_order"):
- entries.append(symbol)
- if leg.get("exit_signal") or leg.get("needs_cancel"):
- exits.append(symbol)
- if entries:
- return "entry_" + "+".join(sorted(set(entries)))
- if exits:
- return "exit_" + "+".join(sorted(set(exits)))
- return "flat"
- if name == "eth_nextgen_micro":
- signal = payload.get("signal") or {}
- assert isinstance(signal, dict)
- intent = str(signal.get("intent") or "unknown")
- if "long" in intent:
- return "entry_long"
- if "short" in intent:
- return "entry_short"
- return intent
- if name == "high_frequency_portfolio":
- target = payload.get("target") or {}
- assert isinstance(target, dict)
- net_unit = float(target.get("net_unit") or 0.0)
- if net_unit > 0.0:
- return "target_long"
- if net_unit < 0.0:
- return "target_short"
- return "flat"
- if name == "short_bias_readonly":
- decision = payload.get("decision") or payload.get("signal") or {}
- assert isinstance(decision, dict)
- if decision.get("entry_signal"):
- return "entry_short"
- if decision.get("exit_signal"):
- return "exit_short"
- return "hold"
- return "unknown"
- def compact(counter: Counter[str]) -> str:
- return ", ".join(f"{key}:{value}" for key, value in counter.most_common(5)) or "none"
- def readout(name: str, counters: dict[str, Counter[str]]) -> str:
- seven = counters["7d"]
- entries = sum(value for key, value in seven.items() if key.startswith("entry"))
- errors = seven.get("error", 0)
- total = sum(seven.values())
- if name == "live_bb_squeeze":
- return "live path active; recent 24h no entry"
- if total and errors / total > 0.5:
- return "recent errors dominated; fixed services need fresh observation"
- if entries:
- return f"has complementary entries: {entries} in 7d"
- return "healthy but no recent entries"
- def main() -> int:
- now = datetime.now(UTC)
- lines = [
- "# Read-only observer snapshot",
- "",
- f"Generated at `{now.isoformat().replace('+00:00', 'Z')}`.",
- "",
- "| observer | latest_event_utc | 24h | 3d | 7d | readout |",
- "| --- | --- | --- | --- | --- | --- |",
- ]
- for name, path in EVENT_FILES.items():
- events = read_events(path)
- latest = events[-1][0].isoformat().replace("+00:00", "Z") if events else "missing"
- counters = {
- label: Counter(classify(name, payload) for created_at, payload in events if created_at >= now - window)
- for label, window in WINDOWS.items()
- }
- lines.append(
- f"| {name} | {latest} | {compact(counters['24h'])} | {compact(counters['3d'])} | "
- f"{compact(counters['7d'])} | {readout(name, counters)} |"
- )
- recent = ROOT / "reports" / "eth-exploration" / "current-strategy-recent-activity-report.md"
- if recent.exists():
- lines.extend(["", "## Latest server backtest", "", recent.read_text(encoding="utf-8")])
- OUTPUT.parent.mkdir(parents=True, exist_ok=True)
- OUTPUT.write_text("\n".join(lines) + "\n", encoding="utf-8")
- print(f"wrote {OUTPUT}")
- return 0
- if __name__ == "__main__":
- raise SystemExit(main())
|