| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296 |
- from __future__ import annotations
- import argparse
- import sys
- from pathlib import Path
- import pandas as pd
- sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
- from scripts.search_live_bb_squeeze_exit_variants import (
- Variant as LiveVariant,
- _load_candles as load_live_candles,
- cost_equity_frame as live_cost_equity_frame,
- run_variant as run_live_variant,
- )
- OUTPUT_DIR = Path("reports/eth-exploration")
- OUT_PREFIX = "eth-bidir-fusion-candidates"
- LIVE_NAME = "live_bb_squeeze_mxbuf0.0005"
- LIVE_VARIANT = LiveVariant(0.0005, 1)
- HORIZONS = (
- ("full", None),
- ("3y", pd.DateOffset(years=3)),
- ("1y", pd.DateOffset(years=1)),
- ("6m", pd.DateOffset(months=6)),
- ("3m", pd.DateOffset(months=3)),
- )
- def metrics(series: pd.Series) -> dict[str, float]:
- years = (series.index[-1] - series.index[0]).total_seconds() / 31_536_000
- total = float(series.iloc[-1] / series.iloc[0] - 1.0)
- annualized = (1.0 + total) ** (1.0 / years) - 1.0 if total > -1.0 and years > 0.0 else 0.0
- drawdown = float(((series.cummax() - series) / series.cummax()).max())
- return {
- "total_return": total,
- "annualized_return": annualized,
- "max_drawdown": drawdown,
- "calmar": annualized / drawdown if drawdown else 0.0,
- }
- def horizon_metrics(series: pd.Series) -> dict[str, float]:
- out: dict[str, float] = {}
- end = series.index[-1]
- for label, offset in HORIZONS:
- part = series if offset is None else series[series.index >= end - offset]
- if len(part) < 2:
- part = series
- values = metrics(part)
- for key, value in values.items():
- out[f"{label}_{key}"] = value
- return out
- def recent_return(series: pd.Series, days: int) -> float:
- cutoff = series.index[-1] - pd.Timedelta(days=days)
- part = series[series.index >= cutoff]
- if len(part) < 2:
- return 0.0
- return float(part.iloc[-1] / part.iloc[0] - 1.0)
- def trade_count(trades: list[dict[str, object]], days: int, end: pd.Timestamp) -> int:
- cutoff = end - pd.Timedelta(days=days)
- return sum(pd.Timestamp(trade["entry_time"], tz="UTC") >= cutoff for trade in trades)
- def live_row() -> dict[str, object]:
- candles = load_live_candles("ETH-USDT-SWAP", "15m")
- result = run_live_variant(candles, LIVE_VARIANT)
- frame = live_cost_equity_frame(result, 0.0021).set_index("ts")["equity"].sort_index()
- end = pd.to_datetime(candles[-1].ts, unit="ms", utc=True)
- row: dict[str, object] = {
- "source": "current_live",
- "name": LIVE_NAME,
- "kind": "live_bb_squeeze",
- "direction": "bidir",
- "full_trades": len(result.trades),
- "trades_30d": trade_count(result.trades, 30, end),
- "trades_14d": trade_count(result.trades, 14, end),
- "return_30d": recent_return(frame, 30),
- "return_14d": recent_return(frame, 14),
- "last_time": end.strftime("%Y-%m-%d %H:%M"),
- }
- row.update(horizon_metrics(frame))
- return row
- def fusion_rows() -> list[dict[str, object]]:
- path = Path("reports/long-short-fusion/fusion-total.csv")
- if not path.exists():
- return []
- frame = pd.read_csv(path)
- frame = frame.sort_values(
- ["h1y_return", "h6m_return", "h3m_return", "max_drawdown"],
- ascending=[False, False, False, True],
- ).head(200)
- rows: list[dict[str, object]] = []
- for _, source in frame.iterrows():
- row = {
- "source": "long_short_fusion",
- "name": source["name"],
- "kind": "fusion",
- "direction": "long+short",
- "full_total_return": float(source["total_return"]),
- "full_annualized_return": float(source["annualized_return"]),
- "full_max_drawdown": float(source["max_drawdown"]),
- "full_calmar": float(source["calmar"]),
- "3y_total_return": float(source["h3y_return"]),
- "1y_total_return": float(source["h1y_return"]),
- "6m_total_return": float(source["h6m_return"]),
- "3m_total_return": float(source["h3m_return"]),
- "full_trades": int(source["trades"]),
- "trades_30d": 0,
- "trades_14d": 0,
- "return_30d": 0.0,
- "return_14d": 0.0,
- "last_time": "",
- "long_weight": float(source["long_weight"]),
- "short_exposure": float(source["short_exposure"]),
- "recent_trigger_source": "not_available_in_existing_fusion_outputs",
- }
- rows.append(row)
- return rows
- def report_candidate_rows(path: Path, source: str) -> list[dict[str, object]]:
- if not path.exists():
- return []
- frame = pd.read_csv(path)
- rows: list[dict[str, object]] = []
- for _, source_row in frame.iterrows():
- row = {
- "source": source,
- "name": source_row["name"],
- "kind": source_row["family"],
- "direction": "bidir" if "bidir" in str(source_row["family"]) else "short",
- "last_time": source_row["last_time"],
- "return_30d": float(source_row.get("1m_total_return", 0.0)),
- "return_14d": float(source_row.get("2w_total_return", 0.0)),
- "trades_30d": int(source_row.get("1m_trades", 0)),
- "trades_14d": int(source_row.get("2w_trades", 0)),
- }
- for label in ("full", "3y", "1y", "3m"):
- for key in ("total_return", "annualized_return", "max_drawdown", "calmar"):
- row[f"{label}_{key}"] = float(source_row.get(f"{label}_{key}", 0.0))
- row["6m_total_return"] = 0.0
- row["6m_annualized_return"] = 0.0
- row["6m_max_drawdown"] = 0.0
- row["6m_calmar"] = 0.0
- row["full_trades"] = int(source_row.get("full_trades", 0))
- row["recent_trigger_source"] = "1m_2w_report_fields"
- rows.append(row)
- return rows
- def normalize_rows(rows: list[dict[str, object]]) -> pd.DataFrame:
- frame = pd.DataFrame(rows)
- for column in (
- "full_total_return",
- "3y_total_return",
- "1y_total_return",
- "6m_total_return",
- "3m_total_return",
- "return_30d",
- "return_14d",
- "full_max_drawdown",
- ):
- if column not in frame:
- frame[column] = 0.0
- frame["all_horizons_nonnegative"] = (
- (frame["full_total_return"] >= 0.0)
- & (frame["3y_total_return"] >= 0.0)
- & (frame["1y_total_return"] >= 0.0)
- & (frame["6m_total_return"] >= 0.0)
- & (frame["3m_total_return"] >= 0.0)
- )
- frame["recent_active"] = (frame["trades_30d"] >= 4) & (frame["trades_14d"] >= 2)
- frame["passes_rule"] = frame["all_horizons_nonnegative"] & frame["recent_active"]
- frame["candidate"] = (frame["source"] != "current_live") & frame["passes_rule"]
- frame["score"] = (
- frame["1y_total_return"]
- + 0.5 * frame["6m_total_return"]
- + 0.25 * frame["3m_total_return"]
- + frame["trades_30d"].clip(upper=20) * 0.005
- - frame["full_max_drawdown"]
- )
- return frame.sort_values(["candidate", "score"], ascending=[False, False])
- def markdown_table(frame: pd.DataFrame) -> str:
- def cell(value: object) -> str:
- if isinstance(value, float):
- return f"{value:.4f}"
- return str(value).replace("|", "\\|")
- rows = [list(frame.columns), ["---" for _ in frame.columns]]
- rows.extend(frame.astype(object).where(pd.notna(frame), "").values.tolist())
- return "\n".join("| " + " | ".join(cell(value) for value in row) + " |" for row in rows)
- def report_text(command: str, paths: list[Path], frame: pd.DataFrame) -> str:
- cols = [
- "source",
- "name",
- "kind",
- "direction",
- "full_total_return",
- "3y_total_return",
- "1y_total_return",
- "6m_total_return",
- "3m_total_return",
- "return_30d",
- "return_14d",
- "trades_30d",
- "trades_14d",
- "full_max_drawdown",
- "passes_rule",
- ]
- live = frame[frame["source"] == "current_live"][cols]
- selected = frame[frame["candidate"]][cols].head(20)
- fusion_near = frame[(frame["source"] == "long_short_fusion") & frame["all_horizons_nonnegative"]][cols].head(12)
- recent_fail = frame[
- (frame["source"] != "current_live")
- & frame["recent_active"]
- & ~frame["all_horizons_nonnegative"]
- ][cols].sort_values(["trades_30d", "trades_14d"], ascending=[False, False]).head(12)
- return "\n".join(
- [
- "# ETH Bidirectional Fusion Candidate Exploration",
- "",
- f"Run command: `{command}`",
- "",
- "Scope: offline research only. The script reads local candle cache plus existing CSV reports and does not touch live executor, deployment, credentials, or order submission.",
- "",
- "Selection rule: full/3y/1y/6m/3m total returns all nonnegative, plus at least 4 trades in 30d and 2 trades in 14d.",
- "",
- "Output files:",
- *[f"- `{path}`" for path in paths],
- "",
- "## Current Live BB Squeeze",
- "",
- markdown_table(live),
- "",
- "## Replacement Candidates Passing Rule",
- "",
- markdown_table(selected) if len(selected) else "No candidate passed the full rule.",
- "",
- "## Long-Horizon Fusion Near Misses",
- "",
- markdown_table(fusion_near),
- "",
- "## Recent-Active Horizon Failures",
- "",
- markdown_table(recent_fail),
- "",
- "## Conclusion",
- "",
- "The long/short fusion family is structurally stronger than recent high-frequency short/bidirectional searches on the long horizons, but the existing fusion outputs do not include 30d/14d trigger evidence. The high-frequency candidates have enough recent triggers but fail the long-horizon quality requirement. Current live BB squeeze remains the only row that passes both horizon and recent-trigger filters in this offline comparison.",
- "",
- "Recommendation: keep any fusion candidate in read-only observation. Do not replace the live BB squeeze from this run alone.",
- "",
- ]
- )
- def main() -> int:
- parser = argparse.ArgumentParser()
- parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
- args = parser.parse_args()
- rows = [live_row()]
- rows.extend(fusion_rows())
- rows.extend(report_candidate_rows(Path("reports/eth-exploration/eth-filtered-recent-short-bidir-candidates.csv"), "filtered_recent_short_bidir"))
- rows.extend(report_candidate_rows(Path("reports/ultrashort/eth-highfreq-short-bidir-candidates.csv"), "highfreq_short_bidir"))
- frame = normalize_rows(rows)
- args.output_dir.mkdir(parents=True, exist_ok=True)
- all_path = args.output_dir / f"{OUT_PREFIX}.csv"
- selected_path = args.output_dir / f"{OUT_PREFIX}-selected.csv"
- report_path = args.output_dir / f"{OUT_PREFIX}-report.md"
- frame.to_csv(all_path, index=False)
- frame[frame["candidate"]].to_csv(selected_path, index=False)
- paths = [all_path, selected_path, report_path]
- command = "rtk .venv/bin/python scripts/explore_eth_bidir_fusion_candidates.py"
- report_path.write_text(report_text(command, paths, frame), encoding="utf-8")
- print(report_path)
- print(frame.head(20).to_string(index=False))
- return 0
- if __name__ == "__main__":
- raise SystemExit(main())
|