from __future__ import annotations import argparse import sys from pathlib import Path import pandas as pd sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from scripts.search_live_bb_squeeze_exit_variants import ( Variant as LiveVariant, _load_candles as load_live_candles, cost_equity_frame as live_cost_equity_frame, run_variant as run_live_variant, ) OUTPUT_DIR = Path("reports/eth-exploration") OUT_PREFIX = "eth-bidir-fusion-candidates" LIVE_NAME = "live_bb_squeeze_mxbuf0.0005" LIVE_VARIANT = LiveVariant(0.0005, 1) HORIZONS = ( ("full", None), ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ) def metrics(series: pd.Series) -> dict[str, float]: years = (series.index[-1] - series.index[0]).total_seconds() / 31_536_000 total = float(series.iloc[-1] / series.iloc[0] - 1.0) annualized = (1.0 + total) ** (1.0 / years) - 1.0 if total > -1.0 and years > 0.0 else 0.0 drawdown = float(((series.cummax() - series) / series.cummax()).max()) return { "total_return": total, "annualized_return": annualized, "max_drawdown": drawdown, "calmar": annualized / drawdown if drawdown else 0.0, } def horizon_metrics(series: pd.Series) -> dict[str, float]: out: dict[str, float] = {} end = series.index[-1] for label, offset in HORIZONS: part = series if offset is None else series[series.index >= end - offset] if len(part) < 2: part = series values = metrics(part) for key, value in values.items(): out[f"{label}_{key}"] = value return out def recent_return(series: pd.Series, days: int) -> float: cutoff = series.index[-1] - pd.Timedelta(days=days) part = series[series.index >= cutoff] if len(part) < 2: return 0.0 return float(part.iloc[-1] / part.iloc[0] - 1.0) def trade_count(trades: list[dict[str, object]], days: int, end: pd.Timestamp) -> int: cutoff = end - pd.Timedelta(days=days) return sum(pd.Timestamp(trade["entry_time"], tz="UTC") >= cutoff for trade in trades) def live_row() -> dict[str, object]: candles = load_live_candles("ETH-USDT-SWAP", "15m") result = run_live_variant(candles, LIVE_VARIANT) frame = live_cost_equity_frame(result, 0.0021).set_index("ts")["equity"].sort_index() end = pd.to_datetime(candles[-1].ts, unit="ms", utc=True) row: dict[str, object] = { "source": "current_live", "name": LIVE_NAME, "kind": "live_bb_squeeze", "direction": "bidir", "full_trades": len(result.trades), "trades_30d": trade_count(result.trades, 30, end), "trades_14d": trade_count(result.trades, 14, end), "return_30d": recent_return(frame, 30), "return_14d": recent_return(frame, 14), "last_time": end.strftime("%Y-%m-%d %H:%M"), } row.update(horizon_metrics(frame)) return row def fusion_rows() -> list[dict[str, object]]: path = Path("reports/long-short-fusion/fusion-total.csv") if not path.exists(): return [] frame = pd.read_csv(path) frame = frame.sort_values( ["h1y_return", "h6m_return", "h3m_return", "max_drawdown"], ascending=[False, False, False, True], ).head(200) rows: list[dict[str, object]] = [] for _, source in frame.iterrows(): row = { "source": "long_short_fusion", "name": source["name"], "kind": "fusion", "direction": "long+short", "full_total_return": float(source["total_return"]), "full_annualized_return": float(source["annualized_return"]), "full_max_drawdown": float(source["max_drawdown"]), "full_calmar": float(source["calmar"]), "3y_total_return": float(source["h3y_return"]), "1y_total_return": float(source["h1y_return"]), "6m_total_return": float(source["h6m_return"]), "3m_total_return": float(source["h3m_return"]), "full_trades": int(source["trades"]), "trades_30d": 0, "trades_14d": 0, "return_30d": 0.0, "return_14d": 0.0, "last_time": "", "long_weight": float(source["long_weight"]), "short_exposure": float(source["short_exposure"]), "recent_trigger_source": "not_available_in_existing_fusion_outputs", } rows.append(row) return rows def report_candidate_rows(path: Path, source: str) -> list[dict[str, object]]: if not path.exists(): return [] frame = pd.read_csv(path) rows: list[dict[str, object]] = [] for _, source_row in frame.iterrows(): row = { "source": source, "name": source_row["name"], "kind": source_row["family"], "direction": "bidir" if "bidir" in str(source_row["family"]) else "short", "last_time": source_row["last_time"], "return_30d": float(source_row.get("1m_total_return", 0.0)), "return_14d": float(source_row.get("2w_total_return", 0.0)), "trades_30d": int(source_row.get("1m_trades", 0)), "trades_14d": int(source_row.get("2w_trades", 0)), } for label in ("full", "3y", "1y", "3m"): for key in ("total_return", "annualized_return", "max_drawdown", "calmar"): row[f"{label}_{key}"] = float(source_row.get(f"{label}_{key}", 0.0)) row["6m_total_return"] = 0.0 row["6m_annualized_return"] = 0.0 row["6m_max_drawdown"] = 0.0 row["6m_calmar"] = 0.0 row["full_trades"] = int(source_row.get("full_trades", 0)) row["recent_trigger_source"] = "1m_2w_report_fields" rows.append(row) return rows def normalize_rows(rows: list[dict[str, object]]) -> pd.DataFrame: frame = pd.DataFrame(rows) for column in ( "full_total_return", "3y_total_return", "1y_total_return", "6m_total_return", "3m_total_return", "return_30d", "return_14d", "full_max_drawdown", ): if column not in frame: frame[column] = 0.0 frame["all_horizons_nonnegative"] = ( (frame["full_total_return"] >= 0.0) & (frame["3y_total_return"] >= 0.0) & (frame["1y_total_return"] >= 0.0) & (frame["6m_total_return"] >= 0.0) & (frame["3m_total_return"] >= 0.0) ) frame["recent_active"] = (frame["trades_30d"] >= 4) & (frame["trades_14d"] >= 2) frame["passes_rule"] = frame["all_horizons_nonnegative"] & frame["recent_active"] frame["candidate"] = (frame["source"] != "current_live") & frame["passes_rule"] frame["score"] = ( frame["1y_total_return"] + 0.5 * frame["6m_total_return"] + 0.25 * frame["3m_total_return"] + frame["trades_30d"].clip(upper=20) * 0.005 - frame["full_max_drawdown"] ) return frame.sort_values(["candidate", "score"], ascending=[False, False]) def markdown_table(frame: pd.DataFrame) -> str: def cell(value: object) -> str: if isinstance(value, float): return f"{value:.4f}" return str(value).replace("|", "\\|") rows = [list(frame.columns), ["---" for _ in frame.columns]] rows.extend(frame.astype(object).where(pd.notna(frame), "").values.tolist()) return "\n".join("| " + " | ".join(cell(value) for value in row) + " |" for row in rows) def report_text(command: str, paths: list[Path], frame: pd.DataFrame) -> str: cols = [ "source", "name", "kind", "direction", "full_total_return", "3y_total_return", "1y_total_return", "6m_total_return", "3m_total_return", "return_30d", "return_14d", "trades_30d", "trades_14d", "full_max_drawdown", "passes_rule", ] live = frame[frame["source"] == "current_live"][cols] selected = frame[frame["candidate"]][cols].head(20) fusion_near = frame[(frame["source"] == "long_short_fusion") & frame["all_horizons_nonnegative"]][cols].head(12) recent_fail = frame[ (frame["source"] != "current_live") & frame["recent_active"] & ~frame["all_horizons_nonnegative"] ][cols].sort_values(["trades_30d", "trades_14d"], ascending=[False, False]).head(12) return "\n".join( [ "# ETH Bidirectional Fusion Candidate Exploration", "", f"Run command: `{command}`", "", "Scope: offline research only. The script reads local candle cache plus existing CSV reports and does not touch live executor, deployment, credentials, or order submission.", "", "Selection rule: full/3y/1y/6m/3m total returns all nonnegative, plus at least 4 trades in 30d and 2 trades in 14d.", "", "Output files:", *[f"- `{path}`" for path in paths], "", "## Current Live BB Squeeze", "", markdown_table(live), "", "## Replacement Candidates Passing Rule", "", markdown_table(selected) if len(selected) else "No candidate passed the full rule.", "", "## Long-Horizon Fusion Near Misses", "", markdown_table(fusion_near), "", "## Recent-Active Horizon Failures", "", markdown_table(recent_fail), "", "## Conclusion", "", "The long/short fusion family is structurally stronger than recent high-frequency short/bidirectional searches on the long horizons, but the existing fusion outputs do not include 30d/14d trigger evidence. The high-frequency candidates have enough recent triggers but fail the long-horizon quality requirement. Current live BB squeeze remains the only row that passes both horizon and recent-trigger filters in this offline comparison.", "", "Recommendation: keep any fusion candidate in read-only observation. Do not replace the live BB squeeze from this run alone.", "", ] ) def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR) args = parser.parse_args() rows = [live_row()] rows.extend(fusion_rows()) rows.extend(report_candidate_rows(Path("reports/eth-exploration/eth-filtered-recent-short-bidir-candidates.csv"), "filtered_recent_short_bidir")) rows.extend(report_candidate_rows(Path("reports/ultrashort/eth-highfreq-short-bidir-candidates.csv"), "highfreq_short_bidir")) frame = normalize_rows(rows) args.output_dir.mkdir(parents=True, exist_ok=True) all_path = args.output_dir / f"{OUT_PREFIX}.csv" selected_path = args.output_dir / f"{OUT_PREFIX}-selected.csv" report_path = args.output_dir / f"{OUT_PREFIX}-report.md" frame.to_csv(all_path, index=False) frame[frame["candidate"]].to_csv(selected_path, index=False) paths = [all_path, selected_path, report_path] command = "rtk .venv/bin/python scripts/explore_eth_bidir_fusion_candidates.py" report_path.write_text(report_text(command, paths, frame), encoding="utf-8") print(report_path) print(frame.head(20).to_string(index=False)) return 0 if __name__ == "__main__": raise SystemExit(main())