from __future__ import annotations import argparse import sys from pathlib import Path import pandas as pd sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from scripts import refine_expansion_rotation_risk as rotation_risk from scripts import search_expansion_rotation as rotation from scripts import search_long_short_fusion as fusion from scripts import search_short_bias_swing as swing from scripts.search_short_bias_overlay import markdown_table OUTPUT_DIR = Path("reports/eth-exploration") PREFIX = "long-short-fusion-recent-trigger-evidence" NEAR_MISS_PATH = OUTPUT_DIR / "eth-bidir-fusion-candidates.csv" FUSION_TOTAL_PATH = Path("reports/long-short-fusion/fusion-total.csv") WINDOWS = (("30d", pd.Timedelta(days=30)), ("14d", pd.Timedelta(days=14))) EPS = 1e-12 def as_utc(value: object) -> pd.Timestamp: ts = pd.Timestamp(value) return ts.tz_localize("UTC") if ts.tzinfo is None else ts.tz_convert("UTC") def event_rows_from_active(name: str, active: pd.Series) -> list[dict[str, object]]: active = active.fillna(False).astype(bool) rows: list[dict[str, object]] = [] previous = False for ts, value in active.items(): current = bool(value) if current != previous: rows.append({"component": name, "time": as_utc(ts), "event": "entry" if current else "exit"}) previous = current return rows def event_rows_from_trades(name: str, trades: list[dict[str, object]]) -> list[dict[str, object]]: rows: list[dict[str, object]] = [] for trade in trades: rows.append({"component": name, "time": as_utc(trade["entry_time"]), "event": "entry"}) rows.append({"component": name, "time": as_utc(trade["exit_time"]), "event": "exit"}) return rows def active_days_from_trades(index: pd.DatetimeIndex, trades: list[dict[str, object]]) -> pd.Series: active = pd.Series(False, index=index) for trade in trades: entry = as_utc(trade["entry_time"]).normalize() exit_ = as_utc(trade["exit_time"]).normalize() active.loc[(active.index >= entry) & (active.index <= exit_)] = True return active def load_near_miss_candidates(limit: int) -> pd.DataFrame: if NEAR_MISS_PATH.exists(): frame = pd.read_csv(NEAR_MISS_PATH) frame = frame[(frame["source"] == "long_short_fusion") & frame["all_horizons_nonnegative"]].copy() if not frame.empty: return frame.sort_values("score", ascending=False).head(limit) total = pd.read_csv(FUSION_TOTAL_PATH) total = total[ (total["total_return"] >= 0.0) & (total["h3y_return"] >= 0.0) & (total["h1y_return"] >= 0.0) & (total["h6m_return"] >= 0.0) & (total["h3m_return"] >= 0.0) ].copy() return total.sort_values(["h1y_return", "h6m_return", "h3m_return"], ascending=False).head(limit) def candidate_weight_map(row: pd.Series) -> dict[str, float]: return { str(row["long_variant"]): float(row["long_weight"]), "btc_risk_short": float(row["btc_risk_short_weight"]), "eth_4h_vol_short": float(row["eth_4h_vol_short_weight"]), "btc_4h_vol_short": float(row["btc_4h_vol_short_weight"]), "eth_4h_vol_short_gated": float(row["eth_4h_vol_short_gated_weight"]), "btc_4h_vol_short_gated": float(row["btc_4h_vol_short_gated_weight"]), } def recent_return(series: pd.Series, offset: pd.Timedelta) -> float: scoped = series[series.index >= series.index[-1] - offset] if len(scoped) < 2: return 0.0 return float(scoped.iloc[-1] / scoped.iloc[0] - 1.0) def rotation_activity(years: float, daily_index: pd.DatetimeIndex, risk_state_daily: pd.Series) -> dict[str, tuple[pd.Series, list[dict[str, object]]]]: row = pd.read_csv(rotation_risk.OUTPUT_DIR / "rotation-risk-top.csv").iloc[0] base = rotation_risk.params_from_row(row) params = rotation_risk.RiskParams(base, float(row["leverage"]), float(row["exposure"]), float(row["vol_target"])) rotation.SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP") frames = rotation.load_symbol_bar_frames(years) closes = rotation.aligned_closes(frames, base) weights = rotation_risk.apply_risk_controls(closes, rotation.target_weights(closes, base), params) executed = weights.shift(1).fillna(0.0) hourly_active = executed.abs().sum(axis=1) > 0.0 base_active = hourly_active.resample("1D").max().reindex(daily_index).fillna(False).astype(bool) risk = risk_state_daily.reindex(daily_index).fillna(False).astype(bool) return { "long_rotation": (base_active, event_rows_from_active("long_rotation", base_active)), "long_rotation_riskoff70": (base_active, event_rows_from_active("long_rotation_riskoff70", base_active)), "long_rotation_riskoff50": (base_active, event_rows_from_active("long_rotation_riskoff50", base_active)), "long_rotation_riskoff25": (base_active, event_rows_from_active("long_rotation_riskoff25", base_active)), "long_rotation_riskoff00": (base_active & ~risk, event_rows_from_active("long_rotation_riskoff00", base_active & ~risk)), } def btc_risk_activity(years: float, daily_index: pd.DatetimeIndex) -> tuple[pd.Series, list[dict[str, object]], pd.Series]: params = fusion.overlay.BtcRiskShort( family="btc_risk_pair", bar="1h", btc_trend=1440, btc_lookback=336, symbol_trend=720, vol_lookback=336, btc_max_momentum=-0.005, btc_min_drop=0.025, min_btc_vol=0.012, symbol_max_momentum=-0.010, short_symbols=("ETH-USDT-SWAP",), ) fusion.overlay.SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP") frames = fusion.overlay.load_frames(years) closes = pd.DataFrame({symbol: frames[(symbol, params.bar)]["close"] for symbol in ("BTC-USDT-SWAP", "ETH-USDT-SWAP")}).dropna() weights = fusion.overlay.short_weights(closes, params) active = weights.shift(1).fillna(0.0)["ETH-USDT-SWAP"] < 0.0 daily = active.resample("1D").max().reindex(daily_index).fillna(False).astype(bool) return daily, event_rows_from_active("btc_risk_short", active), daily def swing_activity(strategy: swing.Strategy, years: float, daily_index: pd.DatetimeIndex) -> tuple[pd.Series, list[dict[str, object]]]: frame = swing.resample_frame(swing.load_15m_frame(strategy.symbol, years), strategy.bar) result = swing.run_strategy(strategy, frame, None) trades = list(result["trades"]) active = active_days_from_trades(daily_index, trades) return active, event_rows_from_trades(strategy.name, trades) def component_activity(years: float, daily_index: pd.DatetimeIndex) -> dict[str, tuple[pd.Series, list[dict[str, object]]]]: btc_risk_active, btc_risk_events, risk_state = btc_risk_activity(years, daily_index) activity = rotation_activity(years, daily_index, risk_state) activity["btc_risk_short"] = (btc_risk_active, btc_risk_events) eth_active, eth_events = swing_activity( swing.Strategy( "vol_expansion_short", "ETH-USDT-SWAP", "4H", {"fast": 20, "slow": 80, "entry": 20, "exit": 10, "atr": 14, "stop_atr": 3.0, "take_atr": 6.0, "max_hold": 120, "vol_window": 120, "vol_quantile": 0.8}, ), years, daily_index, ) btc_active, btc_events = swing_activity( swing.Strategy( "vol_expansion_short", "BTC-USDT-SWAP", "4H", {"fast": 30, "slow": 120, "entry": 20, "exit": 10, "atr": 14, "stop_atr": 3.0, "take_atr": 6.0, "max_hold": 120, "vol_window": 120, "vol_quantile": 0.8}, ), years, daily_index, ) activity["eth_4h_vol_short"] = (eth_active, eth_events) activity["btc_4h_vol_short"] = (btc_active, btc_events) eth_gated = eth_active & risk_state btc_gated = btc_active & risk_state activity["eth_4h_vol_short_gated"] = (eth_gated, event_rows_from_active("eth_4h_vol_short_gated", eth_gated)) activity["btc_4h_vol_short_gated"] = (btc_gated, event_rows_from_active("btc_4h_vol_short_gated", btc_gated)) return activity def summarize_candidate( row: pd.Series, component_series: dict[str, pd.Series], activity: dict[str, tuple[pd.Series, list[dict[str, object]]]], ) -> tuple[dict[str, object], list[dict[str, object]], pd.DataFrame]: name = str(row["name"]) weights = candidate_weight_map(row) equity = fusion.combine_components(component_series, weights) returns = pd.DataFrame({key: fusion.component_returns(series) for key, series in component_series.items()}).reindex(equity.index).fillna(0.0) active = pd.DataFrame({key: value[0] for key, value in activity.items()}).reindex(equity.index).fillna(False).astype(bool) included = {key: weight for key, weight in weights.items() if weight > 0.0} weighted_active = pd.DataFrame({key: active[key].astype(float) * weight for key, weight in included.items()}) target_changed = weighted_active.diff().abs().sum(axis=1).fillna(weighted_active.abs().sum(axis=1)) > 0.0 component_contrib = pd.DataFrame({key: returns[key] * weight for key, weight in included.items()}) fusion_return = component_contrib.sum(axis=1) component_rows: list[dict[str, object]] = [] summary: dict[str, object] = { "name": name, "last_date": equity.index[-1].strftime("%Y-%m-%d"), "included_components": ",".join(included), "prior_report_trades_30d": int(row.get("trades_30d", 0)), "prior_report_trades_14d": int(row.get("trades_14d", 0)), "prior_report_return_30d": float(row.get("return_30d", 0.0)), "prior_report_return_14d": float(row.get("return_14d", 0.0)), } for label, offset in WINDOWS: start = equity.index[-1] - offset mask = equity.index >= start events = [] for key in included: events.extend(event for event in activity[key][1] if start <= event["time"] <= equity.index[-1]) summary[f"recomputed_return_{label}"] = recent_return(equity, offset) summary[f"component_entry_events_{label}"] = sum(1 for event in events if event["event"] == "entry") summary[f"component_exit_events_{label}"] = sum(1 for event in events if event["event"] == "exit") summary[f"fusion_target_change_days_{label}"] = int(target_changed.loc[mask].sum()) summary[f"component_contribution_days_{label}"] = int((component_contrib.loc[mask].abs().sum(axis=1) > EPS).sum()) for key, weight in included.items(): component_events = [event for event in activity[key][1] if start <= event["time"] <= equity.index[-1]] component_rows.append( { "name": name, "window": label, "component": key, "component_weight": weight, "active_days": int(active.loc[mask, key].sum()), "contribution_days": int((component_contrib.loc[mask, key].abs() > EPS).sum()), "entry_events": sum(1 for event in component_events if event["event"] == "entry"), "exit_events": sum(1 for event in component_events if event["event"] == "exit"), "latest_event_time": max((event["time"] for event in component_events), default=pd.NaT), } ) summary["recent_activity_found"] = ( int(summary["component_entry_events_30d"]) > 0 or int(summary["component_exit_events_30d"]) > 0 or int(summary["fusion_target_change_days_30d"]) > 0 or int(summary["component_contribution_days_30d"]) > 0 ) daily = pd.DataFrame( { "name": name, "date": equity.index.strftime("%Y-%m-%d"), "fusion_daily_return": fusion_return.to_numpy(), "fusion_target_changed": target_changed.to_numpy(), "active_component_count": active[list(included)].sum(axis=1).to_numpy(), "contributing_component_count": (component_contrib.abs() > EPS).sum(axis=1).to_numpy(), } ) for key in included: daily[f"{key}_active"] = active[key].to_numpy() daily[f"{key}_contribution"] = component_contrib[key].to_numpy() return summary, component_rows, daily[daily["date"] >= (equity.index[-1] - pd.Timedelta(days=30)).strftime("%Y-%m-%d")] def report_text(command: str, paths: list[Path], summary: pd.DataFrame, components: pd.DataFrame, daily: pd.DataFrame) -> str: display_cols = [ "name", "recomputed_return_30d", "recomputed_return_14d", "component_entry_events_30d", "component_exit_events_30d", "component_entry_events_14d", "component_exit_events_14d", "fusion_target_change_days_30d", "fusion_target_change_days_14d", "component_contribution_days_30d", "component_contribution_days_14d", "recent_activity_found", ] any_activity = bool(summary["recent_activity_found"].any()) if len(summary) else False verdict = ( "The previous near-miss fusion report did not prove no recent activity; it lacked the required 30d/14d trigger accounting." if any_activity else "The checked near-miss fusion rows had no 30d/14d component activity in the rebuilt evidence." ) observer = ( "Worth a read-only observer only if it logs component-level activity and target changes; the current search output alone is insufficient." if any_activity else "Not worth a read-only observer for this near-miss set because the rebuilt evidence is inactive." ) recent_daily = daily[(daily["fusion_target_changed"]) | (daily["contributing_component_count"] > 0)].head(80) return "\n".join( [ "# Long-Short Fusion Recent Trigger Evidence", "", f"Run command: `{command}`", "", "Scope: offline reconstruction from local cached candles and existing reports only. No live executor, deployment, credentials, or order submission path was touched.", "", "Objective: check whether the previous long-short fusion near-miss rows truly had no recent trades, or whether `scripts/explore_eth_bidir_fusion_candidates.py` only filled 30d/14d trigger fields with zero because fusion outputs lacked those fields.", "", "Output files:", *[f"- `{path}`" for path in paths], "", f"Conclusion: {verdict}", f"Observer decision: {observer}", "", "## Candidate Summary", "", markdown_table(summary[display_cols]), "", "## Component Evidence", "", markdown_table(components.head(120)), "", "## Recent Daily Evidence", "", markdown_table(recent_daily), "", ] ) def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--years", type=float, default=8.0) parser.add_argument("--limit", type=int, default=12) parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR) args = parser.parse_args() fusion.overlay.SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP") candidates = load_near_miss_candidates(args.limit) components = fusion.build_components(args.years) component_series = {key: value[1] for key, value in components.items()} daily_index = next(iter(component_series.values())).index activity = component_activity(args.years, daily_index) summary_rows: list[dict[str, object]] = [] component_rows: list[dict[str, object]] = [] daily_frames: list[pd.DataFrame] = [] full_rows = pd.read_csv(FUSION_TOTAL_PATH).set_index("name") for _, source_row in candidates.iterrows(): row = full_rows.loc[str(source_row["name"])].copy() row["name"] = str(source_row["name"]) for key in ("trades_30d", "trades_14d", "return_30d", "return_14d"): row[key] = source_row.get(key, 0) summary, rows, daily = summarize_candidate(row, component_series, activity) summary_rows.append(summary) component_rows.extend(rows) daily_frames.append(daily) summary_frame = pd.DataFrame(summary_rows) component_frame = pd.DataFrame(component_rows) daily_frame = pd.concat(daily_frames, ignore_index=True) if daily_frames else pd.DataFrame() args.output_dir.mkdir(parents=True, exist_ok=True) summary_path = args.output_dir / f"{PREFIX}.csv" component_path = args.output_dir / f"{PREFIX}-components.csv" daily_path = args.output_dir / f"{PREFIX}-daily.csv" report_path = args.output_dir / f"{PREFIX}.md" paths = [summary_path, component_path, daily_path, report_path] summary_frame.to_csv(summary_path, index=False) component_frame.to_csv(component_path, index=False) daily_frame.to_csv(daily_path, index=False) command = f"rtk .venv/bin/python scripts/diagnose_long_short_fusion_recent_triggers.py --years {args.years} --limit {args.limit}" report_path.write_text(report_text(command, paths, summary_frame, component_frame, daily_frame), encoding="utf-8") print(report_path) print(summary_frame.to_string(index=False)) return 0 if __name__ == "__main__": raise SystemExit(main())