# diagnose_long_short_fusion_recent_triggers.py (17 KB)
  1. from __future__ import annotations
  2. import argparse
  3. import sys
  4. from pathlib import Path
  5. import pandas as pd
  6. sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
  7. from scripts import refine_expansion_rotation_risk as rotation_risk
  8. from scripts import search_expansion_rotation as rotation
  9. from scripts import search_long_short_fusion as fusion
  10. from scripts import search_short_bias_swing as swing
  11. from scripts.search_short_bias_overlay import markdown_table
# Default directory for the generated evidence files (overridable via --output-dir in main).
OUTPUT_DIR = Path("reports/eth-exploration")
# File-name stem shared by the summary/components/daily CSVs and the markdown report.
PREFIX = "long-short-fusion-recent-trigger-evidence"
# Preferred candidate source; load_near_miss_candidates falls back to FUSION_TOTAL_PATH.
NEAR_MISS_PATH = OUTPUT_DIR / "eth-bidir-fusion-candidates.csv"
# Fusion search results; main indexes them by "name" to look up each candidate's full row.
FUSION_TOTAL_PATH = Path("reports/long-short-fusion/fusion-total.csv")
# (label, lookback) pairs for the trailing windows summarized per candidate.
WINDOWS = (("30d", pd.Timedelta(days=30)), ("14d", pd.Timedelta(days=14)))
# Absolute daily contributions at or below this threshold are counted as zero.
EPS = 1e-12
  18. def as_utc(value: object) -> pd.Timestamp:
  19. ts = pd.Timestamp(value)
  20. return ts.tz_localize("UTC") if ts.tzinfo is None else ts.tz_convert("UTC")
  21. def event_rows_from_active(name: str, active: pd.Series) -> list[dict[str, object]]:
  22. active = active.fillna(False).astype(bool)
  23. rows: list[dict[str, object]] = []
  24. previous = False
  25. for ts, value in active.items():
  26. current = bool(value)
  27. if current != previous:
  28. rows.append({"component": name, "time": as_utc(ts), "event": "entry" if current else "exit"})
  29. previous = current
  30. return rows
  31. def event_rows_from_trades(name: str, trades: list[dict[str, object]]) -> list[dict[str, object]]:
  32. rows: list[dict[str, object]] = []
  33. for trade in trades:
  34. rows.append({"component": name, "time": as_utc(trade["entry_time"]), "event": "entry"})
  35. rows.append({"component": name, "time": as_utc(trade["exit_time"]), "event": "exit"})
  36. return rows
  37. def active_days_from_trades(index: pd.DatetimeIndex, trades: list[dict[str, object]]) -> pd.Series:
  38. active = pd.Series(False, index=index)
  39. for trade in trades:
  40. entry = as_utc(trade["entry_time"]).normalize()
  41. exit_ = as_utc(trade["exit_time"]).normalize()
  42. active.loc[(active.index >= entry) & (active.index <= exit_)] = True
  43. return active
  44. def load_near_miss_candidates(limit: int) -> pd.DataFrame:
  45. if NEAR_MISS_PATH.exists():
  46. frame = pd.read_csv(NEAR_MISS_PATH)
  47. frame = frame[(frame["source"] == "long_short_fusion") & frame["all_horizons_nonnegative"]].copy()
  48. if not frame.empty:
  49. return frame.sort_values("score", ascending=False).head(limit)
  50. total = pd.read_csv(FUSION_TOTAL_PATH)
  51. total = total[
  52. (total["total_return"] >= 0.0)
  53. & (total["h3y_return"] >= 0.0)
  54. & (total["h1y_return"] >= 0.0)
  55. & (total["h6m_return"] >= 0.0)
  56. & (total["h3m_return"] >= 0.0)
  57. ].copy()
  58. return total.sort_values(["h1y_return", "h6m_return", "h3m_return"], ascending=False).head(limit)
  59. def candidate_weight_map(row: pd.Series) -> dict[str, float]:
  60. return {
  61. str(row["long_variant"]): float(row["long_weight"]),
  62. "btc_risk_short": float(row["btc_risk_short_weight"]),
  63. "eth_4h_vol_short": float(row["eth_4h_vol_short_weight"]),
  64. "btc_4h_vol_short": float(row["btc_4h_vol_short_weight"]),
  65. "eth_4h_vol_short_gated": float(row["eth_4h_vol_short_gated_weight"]),
  66. "btc_4h_vol_short_gated": float(row["btc_4h_vol_short_gated_weight"]),
  67. }
  68. def recent_return(series: pd.Series, offset: pd.Timedelta) -> float:
  69. scoped = series[series.index >= series.index[-1] - offset]
  70. if len(scoped) < 2:
  71. return 0.0
  72. return float(scoped.iloc[-1] / scoped.iloc[0] - 1.0)
  73. def rotation_activity(years: float, daily_index: pd.DatetimeIndex, risk_state_daily: pd.Series) -> dict[str, tuple[pd.Series, list[dict[str, object]]]]:
  74. row = pd.read_csv(rotation_risk.OUTPUT_DIR / "rotation-risk-top.csv").iloc[0]
  75. base = rotation_risk.params_from_row(row)
  76. params = rotation_risk.RiskParams(base, float(row["leverage"]), float(row["exposure"]), float(row["vol_target"]))
  77. rotation.SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP")
  78. frames = rotation.load_symbol_bar_frames(years)
  79. closes = rotation.aligned_closes(frames, base)
  80. weights = rotation_risk.apply_risk_controls(closes, rotation.target_weights(closes, base), params)
  81. executed = weights.shift(1).fillna(0.0)
  82. hourly_active = executed.abs().sum(axis=1) > 0.0
  83. base_active = hourly_active.resample("1D").max().reindex(daily_index).fillna(False).astype(bool)
  84. risk = risk_state_daily.reindex(daily_index).fillna(False).astype(bool)
  85. return {
  86. "long_rotation": (base_active, event_rows_from_active("long_rotation", base_active)),
  87. "long_rotation_riskoff70": (base_active, event_rows_from_active("long_rotation_riskoff70", base_active)),
  88. "long_rotation_riskoff50": (base_active, event_rows_from_active("long_rotation_riskoff50", base_active)),
  89. "long_rotation_riskoff25": (base_active, event_rows_from_active("long_rotation_riskoff25", base_active)),
  90. "long_rotation_riskoff00": (base_active & ~risk, event_rows_from_active("long_rotation_riskoff00", base_active & ~risk)),
  91. }
  92. def btc_risk_activity(years: float, daily_index: pd.DatetimeIndex) -> tuple[pd.Series, list[dict[str, object]], pd.Series]:
  93. params = fusion.overlay.BtcRiskShort(
  94. family="btc_risk_pair",
  95. bar="1h",
  96. btc_trend=1440,
  97. btc_lookback=336,
  98. symbol_trend=720,
  99. vol_lookback=336,
  100. btc_max_momentum=-0.005,
  101. btc_min_drop=0.025,
  102. min_btc_vol=0.012,
  103. symbol_max_momentum=-0.010,
  104. short_symbols=("ETH-USDT-SWAP",),
  105. )
  106. fusion.overlay.SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP")
  107. frames = fusion.overlay.load_frames(years)
  108. closes = pd.DataFrame({symbol: frames[(symbol, params.bar)]["close"] for symbol in ("BTC-USDT-SWAP", "ETH-USDT-SWAP")}).dropna()
  109. weights = fusion.overlay.short_weights(closes, params)
  110. active = weights.shift(1).fillna(0.0)["ETH-USDT-SWAP"] < 0.0
  111. daily = active.resample("1D").max().reindex(daily_index).fillna(False).astype(bool)
  112. return daily, event_rows_from_active("btc_risk_short", active), daily
  113. def swing_activity(strategy: swing.Strategy, years: float, daily_index: pd.DatetimeIndex) -> tuple[pd.Series, list[dict[str, object]]]:
  114. frame = swing.resample_frame(swing.load_15m_frame(strategy.symbol, years), strategy.bar)
  115. result = swing.run_strategy(strategy, frame, None)
  116. trades = list(result["trades"])
  117. active = active_days_from_trades(daily_index, trades)
  118. return active, event_rows_from_trades(strategy.name, trades)
  119. def component_activity(years: float, daily_index: pd.DatetimeIndex) -> dict[str, tuple[pd.Series, list[dict[str, object]]]]:
  120. btc_risk_active, btc_risk_events, risk_state = btc_risk_activity(years, daily_index)
  121. activity = rotation_activity(years, daily_index, risk_state)
  122. activity["btc_risk_short"] = (btc_risk_active, btc_risk_events)
  123. eth_active, eth_events = swing_activity(
  124. swing.Strategy(
  125. "vol_expansion_short",
  126. "ETH-USDT-SWAP",
  127. "4H",
  128. {"fast": 20, "slow": 80, "entry": 20, "exit": 10, "atr": 14, "stop_atr": 3.0, "take_atr": 6.0, "max_hold": 120, "vol_window": 120, "vol_quantile": 0.8},
  129. ),
  130. years,
  131. daily_index,
  132. )
  133. btc_active, btc_events = swing_activity(
  134. swing.Strategy(
  135. "vol_expansion_short",
  136. "BTC-USDT-SWAP",
  137. "4H",
  138. {"fast": 30, "slow": 120, "entry": 20, "exit": 10, "atr": 14, "stop_atr": 3.0, "take_atr": 6.0, "max_hold": 120, "vol_window": 120, "vol_quantile": 0.8},
  139. ),
  140. years,
  141. daily_index,
  142. )
  143. activity["eth_4h_vol_short"] = (eth_active, eth_events)
  144. activity["btc_4h_vol_short"] = (btc_active, btc_events)
  145. eth_gated = eth_active & risk_state
  146. btc_gated = btc_active & risk_state
  147. activity["eth_4h_vol_short_gated"] = (eth_gated, event_rows_from_active("eth_4h_vol_short_gated", eth_gated))
  148. activity["btc_4h_vol_short_gated"] = (btc_gated, event_rows_from_active("btc_4h_vol_short_gated", btc_gated))
  149. return activity
  150. def summarize_candidate(
  151. row: pd.Series,
  152. component_series: dict[str, pd.Series],
  153. activity: dict[str, tuple[pd.Series, list[dict[str, object]]]],
  154. ) -> tuple[dict[str, object], list[dict[str, object]], pd.DataFrame]:
  155. name = str(row["name"])
  156. weights = candidate_weight_map(row)
  157. equity = fusion.combine_components(component_series, weights)
  158. returns = pd.DataFrame({key: fusion.component_returns(series) for key, series in component_series.items()}).reindex(equity.index).fillna(0.0)
  159. active = pd.DataFrame({key: value[0] for key, value in activity.items()}).reindex(equity.index).fillna(False).astype(bool)
  160. included = {key: weight for key, weight in weights.items() if weight > 0.0}
  161. weighted_active = pd.DataFrame({key: active[key].astype(float) * weight for key, weight in included.items()})
  162. target_changed = weighted_active.diff().abs().sum(axis=1).fillna(weighted_active.abs().sum(axis=1)) > 0.0
  163. component_contrib = pd.DataFrame({key: returns[key] * weight for key, weight in included.items()})
  164. fusion_return = component_contrib.sum(axis=1)
  165. component_rows: list[dict[str, object]] = []
  166. summary: dict[str, object] = {
  167. "name": name,
  168. "last_date": equity.index[-1].strftime("%Y-%m-%d"),
  169. "included_components": ",".join(included),
  170. "prior_report_trades_30d": int(row.get("trades_30d", 0)),
  171. "prior_report_trades_14d": int(row.get("trades_14d", 0)),
  172. "prior_report_return_30d": float(row.get("return_30d", 0.0)),
  173. "prior_report_return_14d": float(row.get("return_14d", 0.0)),
  174. }
  175. for label, offset in WINDOWS:
  176. start = equity.index[-1] - offset
  177. mask = equity.index >= start
  178. events = []
  179. for key in included:
  180. events.extend(event for event in activity[key][1] if start <= event["time"] <= equity.index[-1])
  181. summary[f"recomputed_return_{label}"] = recent_return(equity, offset)
  182. summary[f"component_entry_events_{label}"] = sum(1 for event in events if event["event"] == "entry")
  183. summary[f"component_exit_events_{label}"] = sum(1 for event in events if event["event"] == "exit")
  184. summary[f"fusion_target_change_days_{label}"] = int(target_changed.loc[mask].sum())
  185. summary[f"component_contribution_days_{label}"] = int((component_contrib.loc[mask].abs().sum(axis=1) > EPS).sum())
  186. for key, weight in included.items():
  187. component_events = [event for event in activity[key][1] if start <= event["time"] <= equity.index[-1]]
  188. component_rows.append(
  189. {
  190. "name": name,
  191. "window": label,
  192. "component": key,
  193. "component_weight": weight,
  194. "active_days": int(active.loc[mask, key].sum()),
  195. "contribution_days": int((component_contrib.loc[mask, key].abs() > EPS).sum()),
  196. "entry_events": sum(1 for event in component_events if event["event"] == "entry"),
  197. "exit_events": sum(1 for event in component_events if event["event"] == "exit"),
  198. "latest_event_time": max((event["time"] for event in component_events), default=pd.NaT),
  199. }
  200. )
  201. summary["recent_activity_found"] = (
  202. int(summary["component_entry_events_30d"]) > 0
  203. or int(summary["component_exit_events_30d"]) > 0
  204. or int(summary["fusion_target_change_days_30d"]) > 0
  205. or int(summary["component_contribution_days_30d"]) > 0
  206. )
  207. daily = pd.DataFrame(
  208. {
  209. "name": name,
  210. "date": equity.index.strftime("%Y-%m-%d"),
  211. "fusion_daily_return": fusion_return.to_numpy(),
  212. "fusion_target_changed": target_changed.to_numpy(),
  213. "active_component_count": active[list(included)].sum(axis=1).to_numpy(),
  214. "contributing_component_count": (component_contrib.abs() > EPS).sum(axis=1).to_numpy(),
  215. }
  216. )
  217. for key in included:
  218. daily[f"{key}_active"] = active[key].to_numpy()
  219. daily[f"{key}_contribution"] = component_contrib[key].to_numpy()
  220. return summary, component_rows, daily[daily["date"] >= (equity.index[-1] - pd.Timedelta(days=30)).strftime("%Y-%m-%d")]
  221. def report_text(command: str, paths: list[Path], summary: pd.DataFrame, components: pd.DataFrame, daily: pd.DataFrame) -> str:
  222. display_cols = [
  223. "name",
  224. "recomputed_return_30d",
  225. "recomputed_return_14d",
  226. "component_entry_events_30d",
  227. "component_exit_events_30d",
  228. "component_entry_events_14d",
  229. "component_exit_events_14d",
  230. "fusion_target_change_days_30d",
  231. "fusion_target_change_days_14d",
  232. "component_contribution_days_30d",
  233. "component_contribution_days_14d",
  234. "recent_activity_found",
  235. ]
  236. any_activity = bool(summary["recent_activity_found"].any()) if len(summary) else False
  237. verdict = (
  238. "The previous near-miss fusion report did not prove no recent activity; it lacked the required 30d/14d trigger accounting."
  239. if any_activity
  240. else "The checked near-miss fusion rows had no 30d/14d component activity in the rebuilt evidence."
  241. )
  242. observer = (
  243. "Worth a read-only observer only if it logs component-level activity and target changes; the current search output alone is insufficient."
  244. if any_activity
  245. else "Not worth a read-only observer for this near-miss set because the rebuilt evidence is inactive."
  246. )
  247. recent_daily = daily[(daily["fusion_target_changed"]) | (daily["contributing_component_count"] > 0)].head(80)
  248. return "\n".join(
  249. [
  250. "# Long-Short Fusion Recent Trigger Evidence",
  251. "",
  252. f"Run command: `{command}`",
  253. "",
  254. "Scope: offline reconstruction from local cached candles and existing reports only. No live executor, deployment, credentials, or order submission path was touched.",
  255. "",
  256. "Objective: check whether the previous long-short fusion near-miss rows truly had no recent trades, or whether `scripts/explore_eth_bidir_fusion_candidates.py` only filled 30d/14d trigger fields with zero because fusion outputs lacked those fields.",
  257. "",
  258. "Output files:",
  259. *[f"- `{path}`" for path in paths],
  260. "",
  261. f"Conclusion: {verdict}",
  262. f"Observer decision: {observer}",
  263. "",
  264. "## Candidate Summary",
  265. "",
  266. markdown_table(summary[display_cols]),
  267. "",
  268. "## Component Evidence",
  269. "",
  270. markdown_table(components.head(120)),
  271. "",
  272. "## Recent Daily Evidence",
  273. "",
  274. markdown_table(recent_daily),
  275. "",
  276. ]
  277. )
  278. def main() -> int:
  279. parser = argparse.ArgumentParser()
  280. parser.add_argument("--years", type=float, default=8.0)
  281. parser.add_argument("--limit", type=int, default=12)
  282. parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
  283. args = parser.parse_args()
  284. fusion.overlay.SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP")
  285. candidates = load_near_miss_candidates(args.limit)
  286. components = fusion.build_components(args.years)
  287. component_series = {key: value[1] for key, value in components.items()}
  288. daily_index = next(iter(component_series.values())).index
  289. activity = component_activity(args.years, daily_index)
  290. summary_rows: list[dict[str, object]] = []
  291. component_rows: list[dict[str, object]] = []
  292. daily_frames: list[pd.DataFrame] = []
  293. full_rows = pd.read_csv(FUSION_TOTAL_PATH).set_index("name")
  294. for _, source_row in candidates.iterrows():
  295. row = full_rows.loc[str(source_row["name"])].copy()
  296. row["name"] = str(source_row["name"])
  297. for key in ("trades_30d", "trades_14d", "return_30d", "return_14d"):
  298. row[key] = source_row.get(key, 0)
  299. summary, rows, daily = summarize_candidate(row, component_series, activity)
  300. summary_rows.append(summary)
  301. component_rows.extend(rows)
  302. daily_frames.append(daily)
  303. summary_frame = pd.DataFrame(summary_rows)
  304. component_frame = pd.DataFrame(component_rows)
  305. daily_frame = pd.concat(daily_frames, ignore_index=True) if daily_frames else pd.DataFrame()
  306. args.output_dir.mkdir(parents=True, exist_ok=True)
  307. summary_path = args.output_dir / f"{PREFIX}.csv"
  308. component_path = args.output_dir / f"{PREFIX}-components.csv"
  309. daily_path = args.output_dir / f"{PREFIX}-daily.csv"
  310. report_path = args.output_dir / f"{PREFIX}.md"
  311. paths = [summary_path, component_path, daily_path, report_path]
  312. summary_frame.to_csv(summary_path, index=False)
  313. component_frame.to_csv(component_path, index=False)
  314. daily_frame.to_csv(daily_path, index=False)
  315. command = f"rtk .venv/bin/python scripts/diagnose_long_short_fusion_recent_triggers.py --years {args.years} --limit {args.limit}"
  316. report_path.write_text(report_text(command, paths, summary_frame, component_frame, daily_frame), encoding="utf-8")
  317. print(report_path)
  318. print(summary_frame.to_string(index=False))
  319. return 0
# Run the diagnostic when executed as a script; main()'s return value becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())