| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370 |
- from __future__ import annotations
- import argparse
- import sys
- from pathlib import Path
- import pandas as pd
- sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
- from scripts.search_eth_bearish_price_proxy import Spec, joined_frames, load_frame, resample, run_spec
- from scripts import search_short_bias_swing as swing
- from scripts import search_short_overlay_mix as overlay
- from scripts.search_long_short_fusion import (
- INITIAL_EQUITY,
- component_returns,
- daily,
- gated_equity,
- horizon_rows,
- markdown_table,
- metrics,
- riskoff_long_equity,
- swing_short,
- )
# Default report directory, the default selected-fusion CSV inside it, and the
# file-name stem shared by every artefact this script writes.
OUTPUT_DIR = Path("reports/long-short-fusion")
SELECTED_FUSION = OUTPUT_DIR / "fusion-selected.csv"
PREFIX = "eth-crash-follow-fusion-overlay-search"

# Spec of the crash-follow overlay that is layered on top of each fusion
# candidate. NOTE(review): the positional arguments follow the field order of
# `Spec` in scripts.search_eth_bearish_price_proxy - confirm their meaning there.
CRASH_FOLLOW = Spec("crash_follow", "1H", 20, 120, 8, 0.035, 0.02, 0.06, 96, "btc_riskoff")

# Overlay weights evaluated for every candidate; 0.00 doubles as the baseline.
OVERLAY_WEIGHTS = (0.00, 0.02, 0.04, 0.06, 0.08, 0.10, 0.12)

# Component key -> name of the column in the selected-fusion table that holds
# that component's weight.
COMPONENT_WEIGHT_COLUMNS = {
    # Every long-rotation variant shares the single "long_weight" column.
    **dict.fromkeys(
        (
            "long_rotation",
            "long_rotation_riskoff70",
            "long_rotation_riskoff50",
            "long_rotation_riskoff25",
            "long_rotation_riskoff00",
        ),
        "long_weight",
    ),
    # Each short component has its own "<key>_weight" column.
    **{
        key: f"{key}_weight"
        for key in (
            "btc_risk_short",
            "eth_4h_vol_short",
            "btc_4h_vol_short",
            "eth_4h_vol_short_gated",
            "btc_4h_vol_short_gated",
        )
    },
}
def crash_follow_equity(spec: Spec) -> pd.Series:
    """Run ``spec`` on joined ETH/BTC swap bars and return its equity curve.

    Both instruments are loaded with ``load_frame``, resampled to
    ``spec.bar``, combined by ``joined_frames`` and handed to ``run_spec``.
    Only the first element of the ``run_spec`` result is kept; it is renamed
    to ``spec.name`` before being returned.
    """
    eth_raw = load_frame("ETH-USDT-SWAP")
    btc_raw = load_frame("BTC-USDT-SWAP")
    eth_bars = resample(eth_raw, spec.bar)
    btc_bars = resample(btc_raw, spec.bar)
    curve, _ = run_spec(spec, joined_frames(eth_bars, btc_bars))
    curve.name = spec.name
    return curve
def fast_btc_risk_short(years: float) -> tuple[pd.Series, pd.Series]:
    """Build the BTC-risk ETH short over the trailing ``years`` window.

    Returns a pair ``(equity, risk_state)``:

    * ``equity`` is ``overlay.equity_from_weights`` applied to the 1H closes
      and the weights from ``overlay.short_weights``, passed through ``daily``.
    * ``risk_state`` is a 0.0/1.0 series that is 1.0 wherever the ETH weight
      is non-zero, also passed through ``daily``.
    """
    btc_symbol = "BTC-USDT-SWAP"
    eth_symbol = "ETH-USDT-SWAP"

    config = overlay.BtcRiskShort(
        family="btc_risk_pair",
        bar="1h",
        btc_trend=1440,
        btc_lookback=336,
        symbol_trend=720,
        vol_lookback=336,
        btc_max_momentum=-0.005,
        btc_min_drop=0.025,
        min_btc_vol=0.012,
        symbol_max_momentum=-0.010,
        short_symbols=(eth_symbol,),
    )

    eth_raw = load_frame(eth_symbol)
    btc_raw = load_frame(btc_symbol)
    hourly_closes = {
        btc_symbol: resample(btc_raw, "1H")["close"],
        eth_symbol: resample(eth_raw, "1H")["close"],
    }
    # Keep only timestamps where both instruments have a close.
    aligned = pd.DataFrame(hourly_closes).dropna()

    # The window is anchored on the last common timestamp, not on "now".
    window_start = aligned.index[-1] - pd.DateOffset(years=years)
    window = aligned[aligned.index >= window_start]

    short_book = overlay.short_weights(window, config)
    equity = daily(overlay.equity_from_weights(window, short_book))
    eth_is_short = short_book[eth_symbol].abs() > 0.0
    risk_state = daily(eth_is_short.astype(float))
    return equity, risk_state
def required_component_keys(selected: pd.DataFrame) -> set[str]:
    """Return the component keys the selected fusion rows actually use.

    The result contains every value of the ``long_variant`` column (as
    strings) plus each non-long component from ``COMPONENT_WEIGHT_COLUMNS``
    whose weight column exists in ``selected`` and is positive in at least
    one row.
    """
    needed = set(selected["long_variant"].astype(str))
    needed.update(
        key
        for key, column in COMPONENT_WEIGHT_COLUMNS.items()
        # Long variants are taken from "long_variant" above, not from weights.
        if not key.startswith("long_rotation")
        and column in selected.columns
        and bool((selected[column].astype(float) > 0.0).any())
    )
    return needed
def _vol_expansion_short_equity(symbol: str, fast: int, slow: int, years: float) -> pd.Series:
    """Return the equity curve of the 4H vol-expansion short for ``symbol``."""
    strategy = swing.Strategy(
        family="vol_expansion_short",
        symbol=symbol,
        bar="4H",
        params={
            "fast": fast,
            "slow": slow,
            "entry": 20,
            "exit": 10,
            "atr": 14,
            "stop_atr": 3.0,
            "take_atr": 6.0,
            "max_hold": 120,
            "vol_window": 120,
            "vol_quantile": 0.8,
        },
    )
    # swing_short returns a 3-tuple; only the middle element (equity) is used.
    _, equity, _ = swing_short(strategy, years)
    return equity


def build_required_components(years: float, selected: pd.DataFrame) -> dict[str, pd.Series]:
    """Build the equity curve of every component referenced by ``selected``.

    Args:
        years: Look-back passed to ``overlay.rotation_base``,
            ``fast_btc_risk_short`` and ``swing_short``.
        selected: Selected-fusion table; ``required_component_keys`` decides
            which components are built from it.

    Returns:
        Mapping of component key to equity series, containing only required keys.

    Raises:
        ValueError: If a required key is not one this function knows how to build.
    """
    wanted = required_component_keys(selected)
    base_name, base_equity, _ = overlay.rotation_base(years)
    long_base = daily(base_equity)
    btc_risk_equity, risk_state = fast_btc_risk_short(years)

    built: dict[str, pd.Series] = {}

    # Long sleeve: the plain rotation, or the rotation scaled by a multiplier
    # through riskoff_long_equity using the BTC risk state.
    long_variants = (
        ("long_rotation", 1.0),
        ("long_rotation_riskoff70", 0.70),
        ("long_rotation_riskoff50", 0.50),
        ("long_rotation_riskoff25", 0.25),
        ("long_rotation_riskoff00", 0.0),
    )
    for key, multiplier in long_variants:
        if key not in wanted:
            continue
        if key == "long_rotation":
            built[key] = long_base
        else:
            built[key] = riskoff_long_equity(long_base, risk_state, multiplier)

    if "btc_risk_short" in wanted:
        built["btc_risk_short"] = btc_risk_equity

    # Vol-expansion shorts: one back-test per symbol serves both the plain and
    # the risk-gated variant, so it runs only when either one is required.
    vol_shorts = (
        ("eth_4h_vol_short", "eth_4h_vol_short_gated", "ETH-USDT-SWAP", 20, 80),
        ("btc_4h_vol_short", "btc_4h_vol_short_gated", "BTC-USDT-SWAP", 30, 120),
    )
    for plain_key, gated_key, symbol, fast, slow in vol_shorts:
        if plain_key not in wanted and gated_key not in wanted:
            continue
        equity = _vol_expansion_short_equity(symbol, fast, slow, years)
        if plain_key in wanted:
            built[plain_key] = equity
        if gated_key in wanted:
            built[gated_key] = gated_equity(equity, risk_state)

    missing = wanted - set(built)
    if missing:
        raise ValueError(f"missing required components: {sorted(missing)}; base={base_name}")
    return built
def weights_from_row(row: pd.Series) -> dict[str, float]:
    """Extract the component weights of one selected-fusion row.

    The long sleeve is keyed by the row's ``long_variant`` and always
    included with its ``long_weight``. Every other component listed in
    ``COMPONENT_WEIGHT_COLUMNS`` is included only when its weight is
    strictly positive.

    Weight columns that are absent from the row are treated as "component
    not used". This mirrors ``required_component_keys``, which also skips
    weight columns missing from the table, so a table that passes component
    discovery no longer fails here with ``KeyError``.

    Raises:
        KeyError: If ``long_variant`` or ``long_weight`` is missing.
    """
    weights: dict[str, float] = {str(row["long_variant"]): float(row["long_weight"])}
    for key, column in COMPONENT_WEIGHT_COLUMNS.items():
        if key.startswith("long_rotation"):
            # Already handled through long_variant / long_weight above.
            continue
        if column not in row.index:
            continue
        weight = float(row[column])
        # NaN compares False here, so blank cells are skipped as well.
        if weight > 0.0:
            weights[key] = weight
    return weights
def overlay_equity(component_series: dict[str, pd.Series], weights: dict[str, float], crash: pd.Series, overlay_weight: float) -> pd.Series:
    """Blend weighted component returns with the crash-follow overlay.

    Each component with a positive weight contributes
    ``component_returns(series) * weight``; the overlay contributes
    ``component_returns(crash) * overlay_weight`` under the column
    ``"eth_crash_follow"``. Rows where any contribution is missing are
    dropped, the rest are summed per row and compounded from
    ``INITIAL_EQUITY``. The returned series is named ``"equity"``.
    """
    contributions: dict[str, pd.Series] = {}
    for label, curve in component_series.items():
        allocation = weights.get(label, 0.0)
        if allocation > 0.0:
            contributions[label] = component_returns(curve) * allocation

    # The overlay column is added even at weight 0.0, so the baseline is
    # restricted to the same dates as the overlaid curves.
    contributions["eth_crash_follow"] = component_returns(crash) * overlay_weight

    aligned = pd.DataFrame(contributions).dropna()
    blended = aligned.sum(axis=1)
    result = INITIAL_EQUITY * (1.0 + blended).cumprod()
    result.name = "equity"
    return result
def horizon_metrics_by_label(series: pd.Series) -> dict[str, dict[str, object]]:
    """Index the ``horizon_rows`` output for ``series`` by its horizon label.

    ``horizon_rows`` is called with empty name and kind arguments; each
    returned row is stored under ``str(row["horizon"])``. If two rows share a
    label, the later one wins.
    """
    by_label: dict[str, dict[str, object]] = {}
    for entry in horizon_rows("", "", series):
        by_label[str(entry["horizon"])] = entry
    return by_label
def search_rows(selected: pd.DataFrame, component_series: dict[str, pd.Series], crash: pd.Series) -> pd.DataFrame:
    """Evaluate every overlay weight on every selected fusion candidate.

    For each row of ``selected`` the candidate is rebuilt with overlay weight
    0.0 (the baseline) and with each weight in ``OVERLAY_WEIGHTS``. One output
    row is produced per (candidate, overlay weight, horizon), holding the
    overlay metrics, the baseline metrics and their differences.

    Args:
        selected: Selected-fusion table; needs a ``name`` column plus the
            columns read by ``weights_from_row``.
        component_series: Component key -> equity curve.
        crash: Equity curve of the crash-follow overlay.

    Returns:
        A frame with a fixed column set. The columns are present even when
        ``selected`` has no rows, so callers can select columns from the
        result without special-casing an empty search.
    """
    columns = [
        "base_name",
        "overlay_weight",
        "horizon",
        "start",
        "end",
        "total_return",
        "annualized_return",
        "max_drawdown",
        "calmar",
        "baseline_total_return",
        "baseline_max_drawdown",
        "baseline_calmar",
        "delta_total_return",
        "delta_max_drawdown",
        "delta_calmar",
    ]
    rows: list[dict[str, object]] = []
    for _, selected_row in selected.iterrows():
        base_name = str(selected_row["name"])
        weights = weights_from_row(selected_row)
        base_equity = overlay_equity(component_series, weights, crash, 0.0)
        base_horizons = horizon_metrics_by_label(base_equity)
        for overlay_weight in OVERLAY_WEIGHTS:
            if overlay_weight == 0.0:
                # Same inputs as the baseline: reuse it instead of rebuilding.
                horizons = base_horizons
            else:
                equity = overlay_equity(component_series, weights, crash, overlay_weight)
                horizons = horizon_metrics_by_label(equity)
            for horizon, row in horizons.items():
                base = base_horizons[horizon]
                rows.append(
                    {
                        "base_name": base_name,
                        "overlay_weight": overlay_weight,
                        "horizon": horizon,
                        "start": row["start"],
                        "end": row["end"],
                        "total_return": row["total_return"],
                        "annualized_return": row["annualized_return"],
                        "max_drawdown": row["max_drawdown"],
                        "calmar": row["calmar"],
                        "baseline_total_return": base["total_return"],
                        "baseline_max_drawdown": base["max_drawdown"],
                        "baseline_calmar": base["calmar"],
                        "delta_total_return": float(row["total_return"]) - float(base["total_return"]),
                        "delta_max_drawdown": float(row["max_drawdown"]) - float(base["max_drawdown"]),
                        "delta_calmar": float(row["calmar"]) - float(base["calmar"]),
                    }
                )
    # Explicit columns keep the schema stable when no rows were produced.
    return pd.DataFrame(rows, columns=columns)
def summary_rows(results: pd.DataFrame) -> pd.DataFrame:
    """Summarise overlay results per (base_name, overlay_weight).

    Only rows with ``overlay_weight > 0`` are considered. For each group the
    summary records whether every horizon improved return
    (``delta_total_return > 0``), improved Calmar (``delta_calmar > 0``) and
    kept ``delta_max_drawdown <= 0``; ``stable_improvement`` requires all
    three. It also records the deltas of the ``"full"`` horizon and the worst
    deltas across horizons (minimum for return and Calmar, maximum for
    drawdown).

    Args:
        results: Output of ``search_rows``.

    Returns:
        One row per group, sorted by ``stable_improvement``,
        ``all_return_improved`` and ``full_delta_calmar`` descending, then
        ``worst_delta_dd`` ascending. When ``results`` is empty or contains no
        overlay rows, an empty frame with the same columns is returned instead
        of raising ``KeyError``.

    Raises:
        IndexError: If a group has no row for the ``"full"`` horizon.
    """
    flag_columns = [
        "all_return_improved",
        "all_calmar_improved",
        "all_dd_not_worse",
        "stable_improvement",
    ]
    columns = [
        "base_name",
        "overlay_weight",
        *flag_columns,
        "full_delta_return",
        "full_delta_calmar",
        "full_delta_dd",
        "worst_delta_return",
        "worst_delta_calmar",
        "worst_delta_dd",
    ]

    def empty_summary() -> pd.DataFrame:
        # Boolean dtype keeps the flag columns usable as masks by callers.
        return pd.DataFrame(columns=columns).astype({flag: bool for flag in flag_columns})

    if "overlay_weight" not in results.columns:
        return empty_summary()

    rows: list[dict[str, object]] = []
    grouped = results[results["overlay_weight"] > 0.0].groupby(["base_name", "overlay_weight"])
    for (base_name, overlay_weight), group in grouped:
        full = group[group["horizon"] == "full"].iloc[0]
        return_improved = bool((group["delta_total_return"] > 0.0).all())
        calmar_improved = bool((group["delta_calmar"] > 0.0).all())
        dd_not_worse = bool((group["delta_max_drawdown"] <= 0.0).all())
        rows.append(
            {
                "base_name": base_name,
                "overlay_weight": overlay_weight,
                "all_return_improved": return_improved,
                "all_calmar_improved": calmar_improved,
                "all_dd_not_worse": dd_not_worse,
                "stable_improvement": return_improved and calmar_improved and dd_not_worse,
                "full_delta_return": full["delta_total_return"],
                "full_delta_calmar": full["delta_calmar"],
                "full_delta_dd": full["delta_max_drawdown"],
                "worst_delta_return": group["delta_total_return"].min(),
                "worst_delta_calmar": group["delta_calmar"].min(),
                "worst_delta_dd": group["delta_max_drawdown"].max(),
            }
        )

    if not rows:
        return empty_summary()
    return pd.DataFrame(rows, columns=columns).sort_values(
        ["stable_improvement", "all_return_improved", "full_delta_calmar", "worst_delta_dd"],
        ascending=[False, False, False, True],
    )
def component_summary(crash: pd.Series) -> pd.DataFrame:
    """Tabulate the stand-alone horizon metrics of the crash-follow curve.

    Calls ``horizon_rows`` with ``CRASH_FOLLOW.name`` and the kind
    ``"component"`` and keeps, for each returned row, only the name, horizon,
    total return, max drawdown and Calmar fields.
    """
    kept_fields = ("name", "horizon", "total_return", "max_drawdown", "calmar")
    records = [
        {field: entry[field] for field in kept_fields}
        for entry in horizon_rows(CRASH_FOLLOW.name, "component", crash)
    ]
    return pd.DataFrame(records)
def report_text(command: str, paths: list[Path], crash_component: pd.DataFrame, summary: pd.DataFrame, results: pd.DataFrame) -> str:
    """Render the markdown report for one overlay search run.

    Args:
        command: Command line recorded under "Run command".
        paths: Output files listed in the report.
        crash_component: Stand-alone metrics of the crash-follow component.
        summary: Output of ``summary_rows``; only its first 30 rows are shown
            in the "Overlay Summary" section.
        results: Output of ``search_rows``; a fixed subset of its columns is shown.

    Returns:
        The report as one newline-joined string that ends with a newline.
        The verdict paragraph depends on whether any summary row has
        ``stable_improvement`` set.
    """
    stable = summary[summary["stable_improvement"]].copy()
    verdict = (
        "Worth entering the next fusion main search only as a capped small overlay dimension, because at least one representative fusion combination improved return, Calmar, and drawdown across every tested horizon."
        if len(stable) > 0
        else "Not worth entering the next fusion main search now. No tested representative fusion combination improved return, Calmar, and drawdown across full/3y/1y/6m/3m simultaneously."
    )
    shown_columns = [
        "base_name",
        "overlay_weight",
        "horizon",
        "total_return",
        "max_drawdown",
        "calmar",
        "delta_total_return",
        "delta_max_drawdown",
        "delta_calmar",
    ]

    lines: list[str] = [
        "# ETH Crash-Follow Long-Short Fusion Overlay Search",
        "",
        f"Run command: `{command}`",
        "",
        "Output files:",
    ]
    lines.extend(f"- `{path}`" for path in paths)
    lines += [
        "",
        f"Overlay proxy: `{CRASH_FOLLOW.name}`.",
        f"Representative fusion source: `{SELECTED_FUSION}`.",
        "Method: rebuild each selected fusion candidate from existing component curves, then add `overlay_weight * crash_follow_daily_return` for weights 0.00 through 0.12. This is research only; no live path was changed.",
        "",
        "Stable improvement filter: every tested horizon must have `delta_total_return > 0`, `delta_calmar > 0`, and `delta_max_drawdown <= 0` versus the same fusion candidate with overlay weight 0.00.",
        "",
    ]

    # Each section is a heading, a blank line, the table, and a blank line.
    sections = (
        ("## Crash-Follow Component", crash_component),
        ("## Stable Improvement Candidates", stable),
        ("## Overlay Summary", summary.head(30)),
        ("## Horizon Results", results[shown_columns]),
    )
    for heading, table in sections:
        lines += [heading, "", markdown_table(table), ""]

    lines += ["## Conclusion", "", verdict, ""]
    return "\n".join(lines)
def main() -> int:
    """Run the crash-follow overlay search and write its CSV and markdown outputs.

    Command-line options:
        --years: Look-back passed to the component builders (default 8.0).
        --selected-fusion: CSV of selected fusion candidates.
        --output-dir: Directory for the results CSV, summary CSV and report.

    The report records the command needed to reproduce the run. Path options
    are included in it whenever they differ from their defaults, so a run
    with overridden inputs or outputs is not reported as a default run.

    Returns:
        0 on success.
    """
    import shlex  # local import: only needed to quote the recorded command

    parser = argparse.ArgumentParser()
    parser.add_argument("--years", type=float, default=8.0)
    parser.add_argument("--selected-fusion", type=Path, default=SELECTED_FUSION)
    parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
    args = parser.parse_args()

    args.output_dir.mkdir(parents=True, exist_ok=True)
    selected = pd.read_csv(args.selected_fusion)
    component_series = build_required_components(args.years, selected)
    crash = crash_follow_equity(CRASH_FOLLOW)
    results = search_rows(selected, component_series, crash)
    summary = summary_rows(results)
    crash_component = component_summary(crash)

    results_path = args.output_dir / f"{PREFIX}.csv"
    summary_path = args.output_dir / f"{PREFIX}-summary.csv"
    report_path = args.output_dir / f"{PREFIX}.md"
    results.to_csv(results_path, index=False)
    summary.to_csv(summary_path, index=False)

    # With default paths the recorded command is unchanged from before;
    # overridden paths are appended so the command reproduces this exact run.
    command = f"rtk .venv/bin/python scripts/search_long_short_fusion_crash_follow_overlay.py --years {args.years}"
    if args.selected_fusion != SELECTED_FUSION:
        command += f" --selected-fusion {shlex.quote(str(args.selected_fusion))}"
    if args.output_dir != OUTPUT_DIR:
        command += f" --output-dir {shlex.quote(str(args.output_dir))}"

    report_path.write_text(
        report_text(
            command,
            [results_path, summary_path, report_path],
            crash_component,
            summary,
            results,
        ),
        encoding="utf-8",
    )
    print(report_path)
    print(summary.head(20).to_string(index=False))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
|