| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361 |
- from __future__ import annotations
- import argparse
- import sys
- from pathlib import Path
- import pandas as pd
- sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
- from scripts import refine_expansion_rotation_risk as rotation_risk
- from scripts import search_expansion_rotation as rotation
- from scripts import search_long_short_fusion as fusion
- from scripts import search_short_bias_swing as swing
- from scripts.search_short_bias_overlay import markdown_table
# Default directory for every artefact this script writes (main() exposes it as --output-dir).
OUTPUT_DIR = Path("reports/eth-exploration")
# Shared filename stem of the CSV and Markdown outputs.
PREFIX = "long-short-fusion-recent-trigger-evidence"
# Preferred candidate source: the near-miss table next to the outputs.
NEAR_MISS_PATH = OUTPUT_DIR.joinpath("eth-bidir-fusion-candidates.csv")
# Full fusion search table: fallback candidate source and the weight lookup used by main().
FUSION_TOTAL_PATH = Path("reports/long-short-fusion/fusion-total.csv")
# Trailing look-back windows as (label, span) pairs, longest first.
WINDOWS = tuple((label, pd.Timedelta(days=span_days)) for label, span_days in (("30d", 30), ("14d", 14)))
# Absolute contributions at or below this threshold count as zero.
EPS = 1.0e-12
def as_utc(value: object) -> pd.Timestamp:
    """Coerce *value* to a ``pd.Timestamp`` expressed in UTC.

    Naive values are taken to already be UTC and are localized; timezone-aware
    values are converted to UTC.
    """
    stamp = pd.Timestamp(value)
    if stamp.tzinfo is not None:
        return stamp.tz_convert("UTC")
    return stamp.tz_localize("UTC")
def event_rows_from_active(name: str, active: pd.Series) -> list[dict[str, object]]:
    """Convert a boolean activity series into entry/exit event rows.

    The walk starts from an inactive state, so a leading True produces an entry
    at the first timestamp. Each flip to True emits an "entry" and each flip back
    to False emits an "exit". A series that ends active gets no closing exit.
    Missing values count as inactive. Timestamps are normalized with ``as_utc``.
    """
    flags = active.fillna(False).astype(bool)
    events: list[dict[str, object]] = []
    state = False
    for stamp, raw_flag in flags.items():
        flag = bool(raw_flag)
        if flag == state:
            continue
        events.append({"component": name, "time": as_utc(stamp), "event": "entry" if flag else "exit"})
        state = flag
    return events
def event_rows_from_trades(name: str, trades: list[dict[str, object]]) -> list[dict[str, object]]:
    """Emit an "entry" row and then an "exit" row for every trade, in trade order.

    Each trade must provide ``entry_time`` and ``exit_time``; both are
    normalized to UTC with ``as_utc``.
    """
    return [
        {"component": name, "time": as_utc(trade[time_field]), "event": kind}
        for trade in trades
        for time_field, kind in (("entry_time", "entry"), ("exit_time", "exit"))
    ]
def active_days_from_trades(index: pd.DatetimeIndex, trades: list[dict[str, object]]) -> pd.Series:
    """Flag every entry of *index* that falls within a trade's day span.

    A trade covers the inclusive range from the midnight (UTC) of its entry
    day to the midnight of its exit day. The result is a boolean Series on
    *index* that is True wherever at least one trade covers the timestamp.
    """
    flags = pd.Series(False, index=index)
    for trade in trades:
        first_day = as_utc(trade["entry_time"]).normalize()
        last_day = as_utc(trade["exit_time"]).normalize()
        covered = (flags.index >= first_day) & (flags.index <= last_day)
        flags.loc[covered] = True
    return flags
def load_near_miss_candidates(limit: int) -> pd.DataFrame:
    """Return up to *limit* fusion candidates to re-examine.

    When ``NEAR_MISS_PATH`` exists and contains ``long_short_fusion`` rows whose
    ``all_horizons_nonnegative`` flag is set, those rows are returned, ranked by
    ``score`` (descending). Otherwise ``FUSION_TOTAL_PATH`` is read, restricted to
    rows with non-negative total/3y/1y/6m/3m returns, and ranked by 1y, then 6m,
    then 3m return (all descending).
    """
    if NEAR_MISS_PATH.exists():
        near_miss = pd.read_csv(NEAR_MISS_PATH)
        keep = (near_miss["source"] == "long_short_fusion") & near_miss["all_horizons_nonnegative"]
        near_miss = near_miss.loc[keep].copy()
        if not near_miss.empty:
            return near_miss.sort_values("score", ascending=False).head(limit)
    fallback = pd.read_csv(FUSION_TOTAL_PATH)
    horizon_columns = ["total_return", "h3y_return", "h1y_return", "h6m_return", "h3m_return"]
    all_nonnegative = (fallback[horizon_columns] >= 0.0).all(axis=1)
    fallback = fallback.loc[all_nonnegative].copy()
    return fallback.sort_values(["h1y_return", "h6m_return", "h3m_return"], ascending=False).head(limit)
def candidate_weight_map(row: pd.Series) -> dict[str, float]:
    """Map each fusion component name to the float weight recorded in *row*.

    The long leg is keyed by the row's ``long_variant`` value and weighted by
    ``long_weight``. Each short leg is read from its own ``*_weight`` column.
    Insertion order is the long leg first, then the short legs in a fixed order.
    """
    short_legs = (
        ("btc_risk_short", "btc_risk_short_weight"),
        ("eth_4h_vol_short", "eth_4h_vol_short_weight"),
        ("btc_4h_vol_short", "btc_4h_vol_short_weight"),
        ("eth_4h_vol_short_gated", "eth_4h_vol_short_gated_weight"),
        ("btc_4h_vol_short_gated", "btc_4h_vol_short_gated_weight"),
    )
    weights = {str(row["long_variant"]): float(row["long_weight"])}
    for component, column in short_legs:
        weights[component] = float(row[column])
    return weights
def recent_return(series: pd.Series, offset: pd.Timedelta) -> float:
    """Return the simple return of *series* over its trailing *offset* window.

    The window holds every point whose index is at or after ``last - offset``,
    where ``last`` is the final index value. The result is
    ``last_value / first_value_in_window - 1``.

    Returns 0.0 when the window holds fewer than two points. That includes an
    empty *series*, which has no last index to anchor the window on.
    """
    if series.empty:
        return 0.0
    window = series[series.index >= series.index[-1] - offset]
    if len(window) < 2:
        return 0.0
    return float(window.iloc[-1] / window.iloc[0] - 1.0)
def rotation_activity(
    years: float,
    daily_index: pd.DatetimeIndex,
    risk_state_daily: pd.Series,
) -> dict[str, tuple[pd.Series, list[dict[str, object]]]]:
    """Rebuild daily activity for the long-rotation component and its risk-off variants.

    The parameters in the first row of ``rotation-risk-top.csv`` are replayed
    on BTC/ETH swap bars. Side effect: this rebinds the module-global
    ``rotation.SYMBOLS``. A day counts as active when any one-bar-lagged
    position weight is non-zero during that day.

    The 70/50/25 risk-off variants reuse the base activity unchanged. Only
    ``long_rotation_riskoff00`` drops days flagged in *risk_state_daily*.
    NOTE(review): presumably the partial variants only scale exposure on risk
    days rather than going flat; confirm against the fusion component builder.

    Returns:
        Mapping of component name to ``(daily active flags, entry/exit event rows)``.
    """
    top = pd.read_csv(rotation_risk.OUTPUT_DIR / "rotation-risk-top.csv").iloc[0]
    base = rotation_risk.params_from_row(top)
    risk_params = rotation_risk.RiskParams(
        base,
        float(top["leverage"]),
        float(top["exposure"]),
        float(top["vol_target"]),
    )
    rotation.SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP")
    bar_frames = rotation.load_symbol_bar_frames(years)
    closes = rotation.aligned_closes(bar_frames, base)
    raw_targets = rotation.target_weights(closes, base)
    held = rotation_risk.apply_risk_controls(closes, raw_targets, risk_params).shift(1).fillna(0.0)
    any_position = held.abs().sum(axis=1) > 0.0
    base_active = any_position.resample("1D").max().reindex(daily_index).fillna(False).astype(bool)
    risk_days = risk_state_daily.reindex(daily_index).fillna(False).astype(bool)

    unchanged_variants = (
        "long_rotation",
        "long_rotation_riskoff70",
        "long_rotation_riskoff50",
        "long_rotation_riskoff25",
    )
    result = {label: (base_active, event_rows_from_active(label, base_active)) for label in unchanged_variants}
    flat_on_risk = base_active & ~risk_days
    result["long_rotation_riskoff00"] = (
        flat_on_risk,
        event_rows_from_active("long_rotation_riskoff00", flat_on_risk),
    )
    return result
def btc_risk_activity(
    years: float,
    daily_index: pd.DatetimeIndex,
) -> tuple[pd.Series, list[dict[str, object]], pd.Series]:
    """Rebuild the BTC-risk ETH short on 1h bars and report when it is held.

    Side effect: rebinds the module-global ``fusion.overlay.SYMBOLS``. The leg
    counts as held while its one-bar-lagged ETH weight is negative.

    Returns:
        ``(daily_active, events, daily_active)``. The daily flags are on
        *daily_index*. The event rows come from the hourly held series. The
        daily flags are returned a second time because callers use them as the
        risk state for the gated components.
    """
    pair = ("BTC-USDT-SWAP", "ETH-USDT-SWAP")
    params = fusion.overlay.BtcRiskShort(
        family="btc_risk_pair",
        bar="1h",
        btc_trend=1440,
        btc_lookback=336,
        symbol_trend=720,
        vol_lookback=336,
        btc_max_momentum=-0.005,
        btc_min_drop=0.025,
        min_btc_vol=0.012,
        symbol_max_momentum=-0.010,
        short_symbols=("ETH-USDT-SWAP",),
    )
    fusion.overlay.SYMBOLS = pair
    frames = fusion.overlay.load_frames(years)
    closes = pd.DataFrame({symbol: frames[(symbol, params.bar)]["close"] for symbol in pair}).dropna()
    held = fusion.overlay.short_weights(closes, params).shift(1).fillna(0.0)
    eth_short_on = held["ETH-USDT-SWAP"] < 0.0
    eth_short_days = eth_short_on.resample("1D").max().reindex(daily_index).fillna(False).astype(bool)
    return eth_short_days, event_rows_from_active("btc_risk_short", eth_short_on), eth_short_days
def swing_activity(
    strategy: swing.Strategy,
    years: float,
    daily_index: pd.DatetimeIndex,
) -> tuple[pd.Series, list[dict[str, object]]]:
    """Replay *strategy* on its symbol's 15m candles resampled to ``strategy.bar``.

    Returns:
        ``(daily_active, events)``: per-day flags on *daily_index* covering each
        trade's entry-to-exit days, and one entry plus one exit row per trade,
        labelled with ``strategy.name``.
    """
    bars = swing.resample_frame(swing.load_15m_frame(strategy.symbol, years), strategy.bar)
    trades = list(swing.run_strategy(strategy, bars, None)["trades"])
    return active_days_from_trades(daily_index, trades), event_rows_from_trades(strategy.name, trades)
def component_activity(
    years: float,
    daily_index: pd.DatetimeIndex,
) -> dict[str, tuple[pd.Series, list[dict[str, object]]]]:
    """Assemble daily activity flags and event rows for every fusion component.

    Produces the long-rotation variants from ``rotation_activity`` plus
    ``btc_risk_short``, ``eth_4h_vol_short``, ``btc_4h_vol_short``,
    ``eth_4h_vol_short_gated`` and ``btc_4h_vol_short_gated``. The gated shorts
    keep only the days on which the BTC-risk short is also active.

    Returns:
        Mapping of component name to ``(daily active flags, event rows)``.
    """
    btc_risk_days, btc_risk_rows, risk_state = btc_risk_activity(years, daily_index)
    activity = rotation_activity(years, daily_index, risk_state)
    activity["btc_risk_short"] = (btc_risk_days, btc_risk_rows)

    # (component key, gated component key, symbol, strategy parameters) per 4H vol-expansion short.
    vol_short_specs = (
        (
            "eth_4h_vol_short",
            "eth_4h_vol_short_gated",
            "ETH-USDT-SWAP",
            {"fast": 20, "slow": 80, "entry": 20, "exit": 10, "atr": 14, "stop_atr": 3.0, "take_atr": 6.0, "max_hold": 120, "vol_window": 120, "vol_quantile": 0.8},
        ),
        (
            "btc_4h_vol_short",
            "btc_4h_vol_short_gated",
            "BTC-USDT-SWAP",
            {"fast": 30, "slow": 120, "entry": 20, "exit": 10, "atr": 14, "stop_atr": 3.0, "take_atr": 6.0, "max_hold": 120, "vol_window": 120, "vol_quantile": 0.8},
        ),
    )
    replayed = [
        (key, gated_key, swing_activity(swing.Strategy("vol_expansion_short", symbol, "4H", params), years, daily_index))
        for key, gated_key, symbol, params in vol_short_specs
    ]
    for key, _gated_key, outcome in replayed:
        activity[key] = outcome
    for _key, gated_key, (vol_days, _events) in replayed:
        gated_days = vol_days & risk_state
        activity[gated_key] = (gated_days, event_rows_from_active(gated_key, gated_days))
    return activity
def _count_events(events: list[dict[str, object]], kind: str) -> int:
    """Return how many rows in *events* have ``event == kind``."""
    return sum(1 for event in events if event["event"] == kind)


def summarize_candidate(
    row: pd.Series,
    component_series: dict[str, pd.Series],
    activity: dict[str, tuple[pd.Series, list[dict[str, object]]]],
) -> tuple[dict[str, object], list[dict[str, object]], pd.DataFrame]:
    """Rebuild one fusion candidate and account for its recent trigger activity.

    Args:
        row: Candidate row. Must provide ``name`` plus the weight columns read
            by ``candidate_weight_map``. The optional ``trades_30d``,
            ``trades_14d``, ``return_30d`` and ``return_14d`` entries carry the
            prior report's figures (default 0).
        component_series: Per-component series passed to
            ``fusion.combine_components`` and ``fusion.component_returns``.
        activity: Per-component ``(daily active flags, event rows)`` as built by
            ``component_activity``.

    Returns:
        ``(summary, component_rows, daily)``:
        - *summary* holds one dict of candidate-level counters for every window
          in ``WINDOWS``.
        - *component_rows* holds one dict per (window, positively weighted
          component).
        - *daily* holds per-day evidence for dates within the last 30 days of
          the equity index.
    """
    name = str(row["name"])
    weights = candidate_weight_map(row)
    equity = fusion.combine_components(component_series, weights)
    index = equity.index
    last = index[-1]
    returns = (
        pd.DataFrame({key: fusion.component_returns(series) for key, series in component_series.items()})
        .reindex(index)
        .fillna(0.0)
    )
    active = (
        pd.DataFrame({key: value[0] for key, value in activity.items()})
        .reindex(index)
        .fillna(False)
        .astype(bool)
    )
    # Only components with a positive weight take part in the fusion.
    included = {key: weight for key, weight in weights.items() if weight > 0.0}
    # index= keeps these frames aligned to the equity curve even when `included`
    # is empty (a bare empty dict would give a frame with no index at all).
    weighted_active = pd.DataFrame(
        {key: active[key].astype(float) * weight for key, weight in included.items()},
        index=index,
    )
    # diff() leaves the first row NaN. Filling it with the level itself treats the
    # first day as a change from flat. Filling after a skipna row-sum would be a
    # no-op, because the sum already turns the all-NaN row into 0.0.
    target_changed = weighted_active.diff().fillna(weighted_active).abs().sum(axis=1) > 0.0
    component_contrib = pd.DataFrame(
        {key: returns[key] * weight for key, weight in included.items()},
        index=index,
    )
    fusion_return = component_contrib.sum(axis=1)

    summary: dict[str, object] = {
        "name": name,
        "last_date": last.strftime("%Y-%m-%d"),
        "included_components": ",".join(included),
        "prior_report_trades_30d": int(row.get("trades_30d", 0)),
        "prior_report_trades_14d": int(row.get("trades_14d", 0)),
        "prior_report_return_30d": float(row.get("return_30d", 0.0)),
        "prior_report_return_14d": float(row.get("return_14d", 0.0)),
    }
    component_rows: list[dict[str, object]] = []
    for label, offset in WINDOWS:
        start = last - offset
        mask = index >= start
        # Event rows of each included component that fall inside [start, last].
        window_events = {
            key: [event for event in activity[key][1] if start <= event["time"] <= last]
            for key in included
        }
        pooled = [event for key in included for event in window_events[key]]
        summary[f"recomputed_return_{label}"] = recent_return(equity, offset)
        summary[f"component_entry_events_{label}"] = _count_events(pooled, "entry")
        summary[f"component_exit_events_{label}"] = _count_events(pooled, "exit")
        summary[f"fusion_target_change_days_{label}"] = int(target_changed.loc[mask].sum())
        summary[f"component_contribution_days_{label}"] = int((component_contrib.loc[mask].abs().sum(axis=1) > EPS).sum())
        for key, weight in included.items():
            component_events = window_events[key]
            component_rows.append(
                {
                    "name": name,
                    "window": label,
                    "component": key,
                    "component_weight": weight,
                    "active_days": int(active.loc[mask, key].sum()),
                    "contribution_days": int((component_contrib.loc[mask, key].abs() > EPS).sum()),
                    "entry_events": _count_events(component_events, "entry"),
                    "exit_events": _count_events(component_events, "exit"),
                    "latest_event_time": max((event["time"] for event in component_events), default=pd.NaT),
                }
            )
    # The 30d window contains the 14d one, so checking 30d counters is sufficient.
    summary["recent_activity_found"] = (
        int(summary["component_entry_events_30d"]) > 0
        or int(summary["component_exit_events_30d"]) > 0
        or int(summary["fusion_target_change_days_30d"]) > 0
        or int(summary["component_contribution_days_30d"]) > 0
    )
    daily = pd.DataFrame(
        {
            "name": name,
            "date": index.strftime("%Y-%m-%d"),
            "fusion_daily_return": fusion_return.to_numpy(),
            "fusion_target_changed": target_changed.to_numpy(),
            "active_component_count": active[list(included)].sum(axis=1).to_numpy(),
            "contributing_component_count": (component_contrib.abs() > EPS).sum(axis=1).to_numpy(),
        }
    )
    for key in included:
        daily[f"{key}_active"] = active[key].to_numpy()
        daily[f"{key}_contribution"] = component_contrib[key].to_numpy()
    # ISO date strings compare lexicographically in chronological order.
    cutoff = (last - pd.Timedelta(days=30)).strftime("%Y-%m-%d")
    return summary, component_rows, daily[daily["date"] >= cutoff]
def report_text(command: str, paths: list[Path], summary: pd.DataFrame, components: pd.DataFrame, daily: pd.DataFrame) -> str:
    """Render the Markdown evidence report.

    Args:
        command: Command line echoed in the report header.
        paths: Output files listed in the report.
        summary: One row per candidate, as produced by ``summarize_candidate``.
            May be empty (no columns) when no candidates were checked.
        components: Per-component window rows; only the first 120 are rendered.
        daily: Concatenated daily evidence. May be an empty, column-less frame.
            Only days with a target change or a contributing component are
            rendered, capped at 80 rows.

    Returns:
        The report as a single newline-joined string.
    """
    display_cols = [
        "name",
        "recomputed_return_30d",
        "recomputed_return_14d",
        "component_entry_events_30d",
        "component_exit_events_30d",
        "component_entry_events_14d",
        "component_exit_events_14d",
        "fusion_target_change_days_30d",
        "fusion_target_change_days_14d",
        "component_contribution_days_30d",
        "component_contribution_days_14d",
        "recent_activity_found",
    ]
    has_rows = len(summary) > 0
    any_activity = bool(summary["recent_activity_found"].any()) if has_rows else False
    # An empty summary frame has no columns at all; render a header-only table
    # instead of failing with KeyError on the column selection.
    summary_view = summary[display_cols] if has_rows else pd.DataFrame(columns=display_cols)
    verdict = (
        "The previous near-miss fusion report did not prove no recent activity; it lacked the required 30d/14d trigger accounting."
        if any_activity
        else "The checked near-miss fusion rows had no 30d/14d component activity in the rebuilt evidence."
    )
    observer = (
        "Worth a read-only observer only if it logs component-level activity and target changes; the current search output alone is insufficient."
        if any_activity
        else "Not worth a read-only observer for this near-miss set because the rebuilt evidence is inactive."
    )
    if daily.empty:
        # Nothing to filter, and a column-less empty frame would raise KeyError below.
        recent_daily = daily
    else:
        recent_daily = daily[(daily["fusion_target_changed"]) | (daily["contributing_component_count"] > 0)].head(80)
    return "\n".join(
        [
            "# Long-Short Fusion Recent Trigger Evidence",
            "",
            f"Run command: `{command}`",
            "",
            "Scope: offline reconstruction from local cached candles and existing reports only. No live executor, deployment, credentials, or order submission path was touched.",
            "",
            "Objective: check whether the previous long-short fusion near-miss rows truly had no recent trades, or whether `scripts/explore_eth_bidir_fusion_candidates.py` only filled 30d/14d trigger fields with zero because fusion outputs lacked those fields.",
            "",
            "Output files:",
            *[f"- `{path}`" for path in paths],
            "",
            f"Conclusion: {verdict}",
            f"Observer decision: {observer}",
            "",
            "## Candidate Summary",
            "",
            markdown_table(summary_view),
            "",
            "## Component Evidence",
            "",
            markdown_table(components.head(120)),
            "",
            "## Recent Daily Evidence",
            "",
            markdown_table(recent_daily),
            "",
        ]
    )
def main() -> int:
    """Rebuild trigger evidence for near-miss fusion candidates and write the report.

    CLI options: ``--years`` (history length, default 8.0), ``--limit`` (max
    candidates, default 12) and ``--output-dir`` (default ``OUTPUT_DIR``).
    Writes the summary, component and daily CSVs plus the Markdown report into
    the output directory, then prints the report path and the summary table.

    Returns:
        Process exit code 0.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--years", type=float, default=8.0)
    parser.add_argument("--limit", type=int, default=12)
    parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
    args = parser.parse_args()
    fusion.overlay.SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP")
    candidates = load_near_miss_candidates(args.limit)
    components = fusion.build_components(args.years)
    # NOTE(review): assumes element [1] of each build_components value is the component series; confirm.
    component_series = {key: value[1] for key, value in components.items()}
    # The first component's index is the shared daily calendar for all activity series.
    daily_index = next(iter(component_series.values())).index
    activity = component_activity(args.years, daily_index)
    summary_rows: list[dict[str, object]] = []
    component_rows: list[dict[str, object]] = []
    daily_frames: list[pd.DataFrame] = []
    # Weights always come from the full fusion table, even when candidates came from the near-miss CSV.
    full_rows = pd.read_csv(FUSION_TOTAL_PATH).set_index("name")
    for _, source_row in candidates.iterrows():
        name = str(source_row["name"])
        row = full_rows.loc[name].copy()
        row["name"] = name
        # Carry the prior report's 30d/14d figures forward (0 when the source lacks the column).
        for key in ("trades_30d", "trades_14d", "return_30d", "return_14d"):
            row[key] = source_row.get(key, 0)
        summary, rows, daily = summarize_candidate(row, component_series, activity)
        summary_rows.append(summary)
        component_rows.extend(rows)
        daily_frames.append(daily)
    summary_frame = pd.DataFrame(summary_rows)
    component_frame = pd.DataFrame(component_rows)
    daily_frame = pd.concat(daily_frames, ignore_index=True) if daily_frames else pd.DataFrame()
    args.output_dir.mkdir(parents=True, exist_ok=True)
    summary_path = args.output_dir / f"{PREFIX}.csv"
    component_path = args.output_dir / f"{PREFIX}-components.csv"
    daily_path = args.output_dir / f"{PREFIX}-daily.csv"
    report_path = args.output_dir / f"{PREFIX}.md"
    paths = [summary_path, component_path, daily_path, report_path]
    summary_frame.to_csv(summary_path, index=False)
    component_frame.to_csv(component_path, index=False)
    daily_frame.to_csv(daily_path, index=False)
    command = f"rtk .venv/bin/python scripts/diagnose_long_short_fusion_recent_triggers.py --years {args.years} --limit {args.limit}"
    if args.output_dir != OUTPUT_DIR:
        # Record a non-default output directory so the logged command reproduces this run.
        command += f" --output-dir {args.output_dir}"
    report_path.write_text(report_text(command, paths, summary_frame, component_frame, daily_frame), encoding="utf-8")
    print(report_path)
    print(summary_frame.to_string(index=False))
    return 0
# Script entry point: run main() and exit with its return code.
if __name__ == "__main__":
    sys.exit(main())
|