"""Search whether an ETH crash-follow overlay improves selected fusion candidates.

The script reads a CSV of selected long/short fusion candidates, rebuilds each
candidate's daily equity from its component curves, adds the crash-follow
component at each weight in ``OVERLAY_WEIGHTS`` and writes a results CSV, a
summary CSV and a markdown report into the output directory.
"""
from __future__ import annotations

import argparse
import sys
from pathlib import Path

import pandas as pd

# Make the repository root importable so the ``scripts`` package resolves when
# this file is executed directly.
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))

from scripts.search_eth_bearish_price_proxy import Spec, joined_frames, load_frame, resample, run_spec
from scripts import search_short_bias_swing as swing
from scripts import search_short_overlay_mix as overlay
from scripts.search_long_short_fusion import (
    INITIAL_EQUITY,
    component_returns,
    daily,
    gated_equity,
    horizon_rows,
    markdown_table,
    metrics,
    riskoff_long_equity,
    swing_short,
)

OUTPUT_DIR = Path("reports/long-short-fusion")
SELECTED_FUSION = OUTPUT_DIR / "fusion-selected.csv"
PREFIX = "eth-crash-follow-fusion-overlay-search"
CRASH_FOLLOW = Spec("crash_follow", "1H", 20, 120, 8, 0.035, 0.02, 0.06, 96, "btc_riskoff")
OVERLAY_WEIGHTS = (0.00, 0.02, 0.04, 0.06, 0.08, 0.10, 0.12)

# Component key -> column of the selected-fusion CSV holding its weight.
COMPONENT_WEIGHT_COLUMNS = {
    "long_rotation": "long_weight",
    "long_rotation_riskoff70": "long_weight",
    "long_rotation_riskoff50": "long_weight",
    "long_rotation_riskoff25": "long_weight",
    "long_rotation_riskoff00": "long_weight",
    "btc_risk_short": "btc_risk_short_weight",
    "eth_4h_vol_short": "eth_4h_vol_short_weight",
    "btc_4h_vol_short": "btc_4h_vol_short_weight",
    "eth_4h_vol_short_gated": "eth_4h_vol_short_gated_weight",
    "btc_4h_vol_short_gated": "btc_4h_vol_short_gated_weight",
}

# Long-leg variant -> multiplier handed to ``riskoff_long_equity``.
_RISKOFF_MULTIPLIERS = {
    "long_rotation": 1.0,
    "long_rotation_riskoff70": 0.70,
    "long_rotation_riskoff50": 0.50,
    "long_rotation_riskoff25": 0.25,
    "long_rotation_riskoff00": 0.0,
}

# Explicit column orders so empty searches still produce well-formed frames.
RESULT_COLUMNS = [
    "base_name",
    "overlay_weight",
    "horizon",
    "start",
    "end",
    "total_return",
    "annualized_return",
    "max_drawdown",
    "calmar",
    "baseline_total_return",
    "baseline_max_drawdown",
    "baseline_calmar",
    "delta_total_return",
    "delta_max_drawdown",
    "delta_calmar",
]
SUMMARY_COLUMNS = [
    "base_name",
    "overlay_weight",
    "all_return_improved",
    "all_calmar_improved",
    "all_dd_not_worse",
    "stable_improvement",
    "full_delta_return",
    "full_delta_calmar",
    "full_delta_dd",
    "worst_delta_return",
    "worst_delta_calmar",
    "worst_delta_dd",
]


def crash_follow_equity(spec: Spec) -> pd.Series:
    """Run ``spec`` on joined ETH/BTC bars and return its equity curve.

    Both swap frames are resampled to ``spec.bar`` before being joined; the
    returned series is renamed to ``spec.name``.
    """
    eth = load_frame("ETH-USDT-SWAP")
    btc = load_frame("BTC-USDT-SWAP")
    frame = joined_frames(resample(eth, spec.bar), resample(btc, spec.bar))
    equity, _ = run_spec(spec, frame)
    equity.name = spec.name
    return equity


def fast_btc_risk_short(years: float) -> tuple[pd.Series, pd.Series]:
    """Build the BTC-risk short component over the trailing ``years`` window.

    Returns:
        ``(equity, risk_state)``: the component's equity passed through
        ``daily``, and a 0/1 series (also passed through ``daily``) that is 1
        where the ETH short weight is non-zero.
    """
    params = overlay.BtcRiskShort(
        family="btc_risk_pair",
        bar="1h",
        btc_trend=1440,
        btc_lookback=336,
        symbol_trend=720,
        vol_lookback=336,
        btc_max_momentum=-0.005,
        btc_min_drop=0.025,
        min_btc_vol=0.012,
        symbol_max_momentum=-0.010,
        short_symbols=("ETH-USDT-SWAP",),
    )
    eth = load_frame("ETH-USDT-SWAP")
    btc = load_frame("BTC-USDT-SWAP")
    full = pd.DataFrame(
        {
            "BTC-USDT-SWAP": resample(btc, "1H")["close"],
            "ETH-USDT-SWAP": resample(eth, "1H")["close"],
        }
    ).dropna()
    # NOTE(review): DateOffset may reject non-integer ``years`` -- confirm
    # before passing fractional values on the command line.
    cutoff = full.index[-1] - pd.DateOffset(years=years)
    closes = full[full.index >= cutoff]
    weights = overlay.short_weights(closes, params)
    equity = daily(overlay.equity_from_weights(closes, weights))
    risk_state = daily((weights["ETH-USDT-SWAP"].abs() > 0.0).astype(float))
    return equity, risk_state


def required_component_keys(selected: pd.DataFrame) -> set[str]:
    """Return the component keys needed to rebuild every row of ``selected``.

    Every value of the ``long_variant`` column is required; a short component
    is required when its weight column exists and is positive in any row.
    """
    keys = set(selected["long_variant"].astype(str))
    for key, column in COMPONENT_WEIGHT_COLUMNS.items():
        if key.startswith("long_rotation"):
            continue
        if column in selected.columns and bool((selected[column].astype(float) > 0.0).any()):
            keys.add(key)
    return keys


def _vol_expansion_short_equity(symbol: str, fast: int, slow: int, years: float) -> pd.Series:
    """Return the equity of the 4H ``vol_expansion_short`` strategy for ``symbol``."""
    strategy = swing.Strategy(
        family="vol_expansion_short",
        symbol=symbol,
        bar="4H",
        params={
            "fast": fast,
            "slow": slow,
            "entry": 20,
            "exit": 10,
            "atr": 14,
            "stop_atr": 3.0,
            "take_atr": 6.0,
            "max_hold": 120,
            "vol_window": 120,
            "vol_quantile": 0.8,
        },
    )
    _, equity, _ = swing_short(strategy, years)
    return equity


def build_required_components(years: float, selected: pd.DataFrame) -> dict[str, pd.Series]:
    """Build the equity curve of every component ``selected`` refers to.

    Only the components reported by ``required_component_keys`` are built,
    apart from the rotation base and the BTC-risk short, which are always
    computed because the risk-off and gated variants derive from them.

    Raises:
        ValueError: if a required key (for example an unknown ``long_variant``)
            has no builder here.
    """
    required = required_component_keys(selected)
    base_name, base_equity, _ = overlay.rotation_base(years)
    base = daily(base_equity)
    btc_risk_equity, risk_state = fast_btc_risk_short(years)

    components: dict[str, pd.Series] = {}
    for key, multiplier in _RISKOFF_MULTIPLIERS.items():
        if key not in required:
            continue
        if key == "long_rotation":
            components[key] = base
        else:
            components[key] = riskoff_long_equity(base, risk_state, multiplier)

    if "btc_risk_short" in required:
        components["btc_risk_short"] = btc_risk_equity

    # (plain key, symbol, fast, slow); the "_gated" key reuses the same curve.
    vol_shorts = (
        ("eth_4h_vol_short", "ETH-USDT-SWAP", 20, 80),
        ("btc_4h_vol_short", "BTC-USDT-SWAP", 30, 120),
    )
    for key, symbol, fast, slow in vol_shorts:
        gated_key = f"{key}_gated"
        if key not in required and gated_key not in required:
            continue
        equity = _vol_expansion_short_equity(symbol, fast, slow, years)
        if key in required:
            components[key] = equity
        if gated_key in required:
            components[gated_key] = gated_equity(equity, risk_state)

    missing = required - set(components)
    if missing:
        raise ValueError(f"missing required components: {sorted(missing)}; base={base_name}")
    return components


def weights_from_row(row: pd.Series) -> dict[str, float]:
    """Extract the component weights of one selected-fusion row.

    The long leg is keyed by the row's ``long_variant`` and weighted by
    ``long_weight``. Short components are included only when their weight is
    positive; an absent or NaN weight column counts as zero, matching how
    ``required_component_keys`` treats missing columns.
    """
    weights: dict[str, float] = {str(row["long_variant"]): float(row["long_weight"])}
    for key, column in COMPONENT_WEIGHT_COLUMNS.items():
        if key.startswith("long_rotation"):
            continue
        weight = float(row.get(column, 0.0))
        if weight > 0.0:  # False for NaN as well
            weights[key] = weight
    return weights


def overlay_equity(
    component_series: dict[str, pd.Series],
    weights: dict[str, float],
    crash: pd.Series,
    overlay_weight: float,
) -> pd.Series:
    """Combine weighted component returns plus the crash overlay into equity.

    The crash column is always added, even at weight 0.0, so the ``dropna``
    alignment covers the same dates for the baseline and every overlay weight.
    """
    weighted = {
        name: component_returns(series) * weights[name]
        for name, series in component_series.items()
        if weights.get(name, 0.0) > 0.0
    }
    weighted["eth_crash_follow"] = component_returns(crash) * overlay_weight
    returns = pd.DataFrame(weighted).dropna().sum(axis=1)
    equity = INITIAL_EQUITY * (1.0 + returns).cumprod()
    equity.name = "equity"
    return equity


def horizon_metrics_by_label(series: pd.Series) -> dict[str, dict[str, object]]:
    """Index the rows produced by ``horizon_rows`` by their ``horizon`` label."""
    return {str(row["horizon"]): row for row in horizon_rows("", "", series)}


def search_rows(
    selected: pd.DataFrame,
    component_series: dict[str, pd.Series],
    crash: pd.Series,
) -> pd.DataFrame:
    """Evaluate every overlay weight for every selected candidate.

    Returns one row per (candidate, overlay weight, horizon) with the metrics
    and their deltas against the same candidate at overlay weight 0.0. The
    frame always carries ``RESULT_COLUMNS``, even when ``selected`` is empty.
    """
    rows: list[dict[str, object]] = []
    for _, selected_row in selected.iterrows():
        base_name = str(selected_row["name"])
        weights = weights_from_row(selected_row)
        base_horizons = horizon_metrics_by_label(overlay_equity(component_series, weights, crash, 0.0))
        for overlay_weight in OVERLAY_WEIGHTS:
            if overlay_weight == 0.0:
                # Same curve as the baseline; no need to rebuild it.
                horizons = base_horizons
            else:
                horizons = horizon_metrics_by_label(
                    overlay_equity(component_series, weights, crash, overlay_weight)
                )
            for horizon, row in horizons.items():
                base = base_horizons[horizon]
                rows.append(
                    {
                        "base_name": base_name,
                        "overlay_weight": overlay_weight,
                        "horizon": horizon,
                        "start": row["start"],
                        "end": row["end"],
                        "total_return": row["total_return"],
                        "annualized_return": row["annualized_return"],
                        "max_drawdown": row["max_drawdown"],
                        "calmar": row["calmar"],
                        "baseline_total_return": base["total_return"],
                        "baseline_max_drawdown": base["max_drawdown"],
                        "baseline_calmar": base["calmar"],
                        "delta_total_return": float(row["total_return"]) - float(base["total_return"]),
                        "delta_max_drawdown": float(row["max_drawdown"]) - float(base["max_drawdown"]),
                        "delta_calmar": float(row["calmar"]) - float(base["calmar"]),
                    }
                )
    return pd.DataFrame(rows, columns=RESULT_COLUMNS)


def summary_rows(results: pd.DataFrame) -> pd.DataFrame:
    """Summarise ``results`` per (candidate, positive overlay weight).

    A group is a ``stable_improvement`` when, across all of its horizons,
    return and Calmar deltas are positive and the drawdown delta is not
    positive. Rows are sorted with stable improvements first. An empty frame
    with ``SUMMARY_COLUMNS`` is returned when there is nothing to summarise.
    """
    if results.empty:
        return pd.DataFrame(columns=SUMMARY_COLUMNS)

    rows: list[dict[str, object]] = []
    grouped = results[results["overlay_weight"] > 0.0].groupby(["base_name", "overlay_weight"])
    for (base_name, overlay_weight), group in grouped:
        # Expects a horizon labelled "full" in every group.
        full = group[group["horizon"] == "full"].iloc[0]
        return_improved = bool((group["delta_total_return"] > 0.0).all())
        calmar_improved = bool((group["delta_calmar"] > 0.0).all())
        dd_not_worse = bool((group["delta_max_drawdown"] <= 0.0).all())
        rows.append(
            {
                "base_name": base_name,
                "overlay_weight": overlay_weight,
                "all_return_improved": return_improved,
                "all_calmar_improved": calmar_improved,
                "all_dd_not_worse": dd_not_worse,
                "stable_improvement": return_improved and calmar_improved and dd_not_worse,
                "full_delta_return": full["delta_total_return"],
                "full_delta_calmar": full["delta_calmar"],
                "full_delta_dd": full["delta_max_drawdown"],
                "worst_delta_return": group["delta_total_return"].min(),
                "worst_delta_calmar": group["delta_calmar"].min(),
                "worst_delta_dd": group["delta_max_drawdown"].max(),
            }
        )
    return pd.DataFrame(rows, columns=SUMMARY_COLUMNS).sort_values(
        ["stable_improvement", "all_return_improved", "full_delta_calmar", "worst_delta_dd"],
        ascending=[False, False, False, True],
    )


def component_summary(crash: pd.Series) -> pd.DataFrame:
    """Return the per-horizon metrics of the crash-follow component alone."""
    columns = ["name", "horizon", "total_return", "max_drawdown", "calmar"]
    rows = [
        {column: row[column] for column in columns}
        for row in horizon_rows(CRASH_FOLLOW.name, "component", crash)
    ]
    return pd.DataFrame(rows)


def report_text(
    command: str,
    paths: list[Path],
    crash_component: pd.DataFrame,
    summary: pd.DataFrame,
    results: pd.DataFrame,
    selected_fusion: Path | None = None,
) -> str:
    """Render the markdown report.

    Args:
        command: Command line to record as the way the report was produced.
        paths: Output files to list in the report.
        crash_component: Frame from ``component_summary``.
        summary: Frame from ``summary_rows``.
        results: Frame from ``search_rows``.
        selected_fusion: CSV the candidates were read from; defaults to
            ``SELECTED_FUSION`` when omitted.
    """
    fusion_source = SELECTED_FUSION if selected_fusion is None else selected_fusion
    # Cast keeps the mask boolean even when ``summary`` has no rows.
    stable = summary[summary["stable_improvement"].astype(bool)].copy()
    if len(stable):
        verdict = "Worth entering the next fusion main search only as a capped small overlay dimension, because at least one representative fusion combination improved return, Calmar, and drawdown across every tested horizon."
    else:
        verdict = "Not worth entering the next fusion main search now. No tested representative fusion combination improved return, Calmar, and drawdown across full/3y/1y/6m/3m simultaneously."
    keep = [
        "base_name",
        "overlay_weight",
        "horizon",
        "total_return",
        "max_drawdown",
        "calmar",
        "delta_total_return",
        "delta_max_drawdown",
        "delta_calmar",
    ]
    return "\n".join(
        [
            "# ETH Crash-Follow Long-Short Fusion Overlay Search",
            "",
            f"Run command: `{command}`",
            "",
            "Output files:",
            *[f"- `{path}`" for path in paths],
            "",
            f"Overlay proxy: `{CRASH_FOLLOW.name}`.",
            f"Representative fusion source: `{fusion_source}`.",
            "Method: rebuild each selected fusion candidate from existing component curves, then add `overlay_weight * crash_follow_daily_return` for weights 0.00 through 0.12. This is research only; no live path was changed.",
            "",
            "Stable improvement filter: every tested horizon must have `delta_total_return > 0`, `delta_calmar > 0`, and `delta_max_drawdown <= 0` versus the same fusion candidate with overlay weight 0.00.",
            "",
            "## Crash-Follow Component",
            "",
            markdown_table(crash_component),
            "",
            "## Stable Improvement Candidates",
            "",
            markdown_table(stable),
            "",
            "## Overlay Summary",
            "",
            markdown_table(summary.head(30)),
            "",
            "## Horizon Results",
            "",
            markdown_table(results[keep]),
            "",
            "## Conclusion",
            "",
            verdict,
            "",
        ]
    )


def main() -> int:
    """Run the overlay search, write the CSVs and report, and return 0."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--years", type=float, default=8.0)
    parser.add_argument("--selected-fusion", type=Path, default=SELECTED_FUSION)
    parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
    args = parser.parse_args()

    args.output_dir.mkdir(parents=True, exist_ok=True)
    selected = pd.read_csv(args.selected_fusion)
    component_series = build_required_components(args.years, selected)
    crash = crash_follow_equity(CRASH_FOLLOW)
    results = search_rows(selected, component_series, crash)
    summary = summary_rows(results)
    crash_component = component_summary(crash)

    results_path = args.output_dir / f"{PREFIX}.csv"
    summary_path = args.output_dir / f"{PREFIX}-summary.csv"
    report_path = args.output_dir / f"{PREFIX}.md"
    results.to_csv(results_path, index=False)
    summary.to_csv(summary_path, index=False)

    # Record non-default options too, so the command reproduces this run.
    command = f"rtk .venv/bin/python scripts/search_long_short_fusion_crash_follow_overlay.py --years {args.years}"
    if args.selected_fusion != SELECTED_FUSION:
        command += f" --selected-fusion {args.selected_fusion}"
    if args.output_dir != OUTPUT_DIR:
        command += f" --output-dir {args.output_dir}"

    report_path.write_text(
        report_text(
            command,
            [results_path, summary_path, report_path],
            crash_component,
            summary,
            results,
            selected_fusion=args.selected_fusion,
        ),
        encoding="utf-8",
    )
    print(report_path)
    print(summary.head(20).to_string(index=False))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())