# summarize_readonly_observers.py
  1. from __future__ import annotations
  2. import json
  3. from collections import Counter
  4. from datetime import UTC, datetime, timedelta
  5. from pathlib import Path
  6. ROOT = Path(__file__).resolve().parents[1]
  7. OUTPUT = ROOT / "reports" / "live-recent" / "readonly-observers-snapshot.md"
  8. WINDOWS = {
  9. "24h": timedelta(hours=24),
  10. "3d": timedelta(days=3),
  11. "7d": timedelta(days=7),
  12. }
  13. EVENT_FILES = {
  14. "live_bb_squeeze": ROOT / "var" / "bb-squeeze-executor" / "events.jsonl",
  15. "bb_squeeze_t_gated": ROOT / "var" / "bb-squeeze-t-gated-observer" / "observer-events.jsonl",
  16. "calendar_fusion": ROOT / "var" / "calendar-fusion" / "observer-events.jsonl",
  17. "crash_follow_short": ROOT / "var" / "crash-follow-short-observer" / "observer-events.jsonl",
  18. "eth_focused_portfolio": ROOT / "var" / "eth-focused-portfolio" / "observer-events.jsonl",
  19. "eth_nextgen_micro": ROOT / "var" / "eth-nextgen-micro" / "observer-events.jsonl",
  20. "high_frequency_portfolio": ROOT / "var" / "high-frequency-portfolio" / "observer-events.jsonl",
  21. "short_bias_readonly": ROOT / "var" / "short-bias-readonly" / "observer-events.jsonl",
  22. }
  23. def parse_time(payload: dict[str, object]) -> datetime | None:
  24. value = payload.get("created_at")
  25. if not isinstance(value, str):
  26. return None
  27. return datetime.fromisoformat(value.replace("Z", "+00:00"))
  28. def read_events(path: Path) -> list[tuple[datetime, dict[str, object]]]:
  29. if not path.exists():
  30. return []
  31. rows = []
  32. for line in path.read_text(encoding="utf-8").splitlines():
  33. if not line:
  34. continue
  35. payload = json.loads(line)
  36. created_at = parse_time(payload)
  37. if created_at is not None:
  38. rows.append((created_at, payload))
  39. return rows
  40. def classify(name: str, payload: dict[str, object]) -> str:
  41. if "error" in payload:
  42. return "error"
  43. if name in {"live_bb_squeeze", "bb_squeeze_t_gated", "crash_follow_short"}:
  44. signal = payload.get("signal") or {}
  45. assert isinstance(signal, dict)
  46. action = str(signal.get("signal") or "hold")
  47. side = str(signal.get("target_side") or "flat")
  48. if action in {"entry", "entry_long", "long"} or (signal.get("entry_signal") and side == "long"):
  49. return "entry_long"
  50. if action in {"short", "entry_short"} or (signal.get("entry_signal") and side == "short"):
  51. return "entry_short"
  52. if action.startswith("exit") or signal.get("exit_signal"):
  53. return action if action.startswith("exit") else "exit"
  54. return action
  55. if name == "calendar_fusion":
  56. state = payload.get("calendar_state") or {}
  57. assert isinstance(state, dict)
  58. if state.get("entry_signal_on_decision_candle"):
  59. return "entry_long"
  60. if state.get("theoretical_calendar_position_active"):
  61. return "active_long"
  62. return "flat"
  63. if name == "eth_focused_portfolio":
  64. entries = []
  65. exits = []
  66. for leg in payload.get("legs") or []:
  67. assert isinstance(leg, dict)
  68. symbol = str(leg.get("symbol") or leg.get("leg_id") or "leg")
  69. if leg.get("signal") or leg.get("needs_order"):
  70. entries.append(symbol)
  71. if leg.get("exit_signal") or leg.get("needs_cancel"):
  72. exits.append(symbol)
  73. if entries:
  74. return "entry_" + "+".join(sorted(set(entries)))
  75. if exits:
  76. return "exit_" + "+".join(sorted(set(exits)))
  77. return "flat"
  78. if name == "eth_nextgen_micro":
  79. signal = payload.get("signal") or {}
  80. assert isinstance(signal, dict)
  81. intent = str(signal.get("intent") or "unknown")
  82. if "long" in intent:
  83. return "entry_long"
  84. if "short" in intent:
  85. return "entry_short"
  86. return intent
  87. if name == "high_frequency_portfolio":
  88. target = payload.get("target") or {}
  89. assert isinstance(target, dict)
  90. net_unit = float(target.get("net_unit") or 0.0)
  91. if net_unit > 0.0:
  92. return "target_long"
  93. if net_unit < 0.0:
  94. return "target_short"
  95. return "flat"
  96. if name == "short_bias_readonly":
  97. decision = payload.get("decision") or payload.get("signal") or {}
  98. assert isinstance(decision, dict)
  99. if decision.get("entry_signal"):
  100. return "entry_short"
  101. if decision.get("exit_signal"):
  102. return "exit_short"
  103. return "hold"
  104. return "unknown"
  105. def compact(counter: Counter[str]) -> str:
  106. return ", ".join(f"{key}:{value}" for key, value in counter.most_common(5)) or "none"
  107. def readout(name: str, counters: dict[str, Counter[str]]) -> str:
  108. seven = counters["7d"]
  109. entries = sum(value for key, value in seven.items() if key.startswith("entry"))
  110. errors = seven.get("error", 0)
  111. total = sum(seven.values())
  112. if name == "live_bb_squeeze":
  113. return "live path active; recent 24h no entry"
  114. if total and errors / total > 0.5:
  115. return "recent errors dominated; fixed services need fresh observation"
  116. if entries:
  117. return f"has complementary entries: {entries} in 7d"
  118. return "healthy but no recent entries"
  119. def main() -> int:
  120. now = datetime.now(UTC)
  121. lines = [
  122. "# Read-only observer snapshot",
  123. "",
  124. f"Generated at `{now.isoformat().replace('+00:00', 'Z')}`.",
  125. "",
  126. "| observer | latest_event_utc | 24h | 3d | 7d | readout |",
  127. "| --- | --- | --- | --- | --- | --- |",
  128. ]
  129. for name, path in EVENT_FILES.items():
  130. events = read_events(path)
  131. latest = events[-1][0].isoformat().replace("+00:00", "Z") if events else "missing"
  132. counters = {
  133. label: Counter(classify(name, payload) for created_at, payload in events if created_at >= now - window)
  134. for label, window in WINDOWS.items()
  135. }
  136. lines.append(
  137. f"| {name} | {latest} | {compact(counters['24h'])} | {compact(counters['3d'])} | "
  138. f"{compact(counters['7d'])} | {readout(name, counters)} |"
  139. )
  140. recent = ROOT / "reports" / "eth-exploration" / "current-strategy-recent-activity-report.md"
  141. if recent.exists():
  142. lines.extend(["", "## Latest server backtest", "", recent.read_text(encoding="utf-8")])
  143. OUTPUT.parent.mkdir(parents=True, exist_ok=True)
  144. OUTPUT.write_text("\n".join(lines) + "\n", encoding="utf-8")
  145. print(f"wrote {OUTPUT}")
  146. return 0
  147. if __name__ == "__main__":
  148. raise SystemExit(main())