| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423 |
- from __future__ import annotations
- import argparse
- import json
- import sys
- from pathlib import Path
- import pandas as pd
- sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
- from scripts import search_eth_nextgen_micro_portfolio as base
# Directory that holds both the input equity CSV and every artefact written by this script.
OUTPUT_DIR = Path("reports/eth-exploration")
# Common filename stem for the generated summary / horizons / equity / json / report files.
PREFIX = "eth-nextgen-micro-direction-b"
# Cost model used for the headline ranking; must be one of the COST_MODELS keys.
PRIMARY_COST = "maker_taker"
# Cost model name -> round-trip cost handed to the base loaders.
COST_MODELS: dict[str, float] = {
    "maker_taker": 0.0021,
    "taker_taker": 0.0030,
}
# Micro-leg candidates considered here; any other candidate returned by the loader is ignored.
MICRO_NAMES: tuple[str, ...] = (
    "atr-compress-expand-r96-q0.15-sl0.008-tp0.016-mf0.25-us",
    "atr-compress-expand-r96-q0.15-sl0.008-tp0.016-mf0.4-us",
    "atr-compress-expand-r48-q0.15-sl0.008-tp0.016-mf0.25-us",
    "atr-compress-expand-r48-q0.15-sl0.008-tp0.016-mf0.4-us",
)
# Trailing windows (in rows of the equity series) used for the regime signal.
LOOKBACKS: tuple[int, ...] = (30, 60, 90)
# (label, trailing offset) pairs for the reporting windows; None means the whole sample.
# The label order here is also the sort order used for the horizons table.
HORIZONS = (
    ("full", None),
    ("3y", pd.DateOffset(years=3)),
    ("1y", pd.DateOffset(years=1)),
    ("6m", pd.DateOffset(months=6)),
    ("3m", pd.DateOffset(months=3)),
    ("30d", pd.DateOffset(days=30)),
    ("21d", pd.DateOffset(days=21)),
    ("14d", pd.DateOffset(days=14)),
)
def short_micro_name(name: str) -> str:
    """Return the abbreviated label for a micro-leg name.

    Pure pass-through to ``base.short_micro_name``; the abbreviation rules
    live in that module, not here.
    """
    abbreviated = base.short_micro_name(name)
    return abbreviated
def metrics_from_daily(series: pd.Series) -> dict[str, float]:
    """Compute performance metrics for an equity series.

    Pure pass-through to ``base.metrics_from_daily``. Callers in this file
    read the ``net_total_return``, ``net_annualized_return``,
    ``net_max_drawdown`` and ``net_calmar`` keys from the result.
    """
    metrics = base.metrics_from_daily(series)
    return metrics
def trade_return_stats(trades: list[base.LegReturn]) -> dict[str, float]:
    """Summarise per-trade returns.

    Reads the ``value`` attribute of every trade and returns a dict with:

    * ``trades``        - number of trades
    * ``win_rate``      - share of trades with a strictly positive return
    * ``avg_return``    - arithmetic mean of all trade returns
    * ``payoff_ratio``  - mean winning return / mean absolute losing return
    * ``profit_factor`` - sum of winning returns / absolute sum of losing returns

    Every ratio whose denominator would be zero is reported as ``0.0``
    (this includes the two loss-based ratios when there are no losing trades).
    Zero-return trades count towards ``trades`` but are neither wins nor losses.
    """
    returns = [trade.value for trade in trades]
    count = len(returns)
    if count == 0:
        # Nothing traded: every statistic collapses to zero.
        return {
            "trades": 0,
            "win_rate": 0.0,
            "avg_return": 0.0,
            "payoff_ratio": 0.0,
            "profit_factor": 0.0,
        }

    winners = [ret for ret in returns if ret > 0.0]
    losers = [ret for ret in returns if ret < 0.0]
    total_won = sum(winners)
    total_lost = abs(sum(losers))
    mean_win = total_won / len(winners) if winners else 0.0
    mean_loss = total_lost / len(losers) if losers else 0.0

    if mean_loss:
        payoff_ratio = mean_win / mean_loss
    else:
        payoff_ratio = 0.0
    if total_lost:
        profit_factor = total_won / total_lost
    else:
        profit_factor = 0.0

    return {
        "trades": count,
        "win_rate": len(winners) / count,
        "avg_return": sum(returns) / count,
        "payoff_ratio": payoff_ratio,
        "profit_factor": profit_factor,
    }
def horizon_rows(name: str, cost_model: str, series: pd.Series, trades: list[base.LegReturn]) -> list[dict[str, object]]:
    """Build one metrics row per reporting horizon for a single strategy.

    For every ``(label, offset)`` in ``HORIZONS`` the equity ``series`` is cut
    to the trailing window ending at its last timestamp (``offset is None``
    keeps the whole series). A window with fewer than two points falls back to
    the full series while keeping its label. Trades are attributed to a window
    when their ``exit_date`` is on or after the window start.

    Args:
        name: Strategy name copied into every row.
        cost_model: Cost-model label copied into every row.
        series: Equity series with a datetime index; must be non-empty.
        trades: Trade records exposing ``exit_date`` and ``value``.

    Returns:
        One dict per horizon, in ``HORIZONS`` order, holding identification
        fields, ``trades_per_year``, the trade statistics and the series metrics.
    """
    last_stamp = series.index[-1]
    output: list[dict[str, object]] = []
    for label, offset in HORIZONS:
        if offset is None:
            window = series
        else:
            window = series[series.index >= last_stamp - offset]
        if len(window) < 2:
            # Too short to compute anything meaningful; report on the full sample instead.
            window = series

        window_start = window.index[0]
        window_end = window.index[-1]
        in_window = [trade for trade in trades if trade.exit_date >= window_start]

        # Span in 365-day years, floored at one day so the division below is always defined.
        span_years = (window_end - window_start).total_seconds() / 86_400 / 365
        years = max(span_years, 1 / 365)

        record: dict[str, object] = {
            "name": name,
            "cost_model": cost_model,
            "horizon": label,
            "horizon_start": window_start.strftime("%Y-%m-%d"),
            "horizon_end": window_end.strftime("%Y-%m-%d"),
            "trades_per_year": len(in_window) / years,
        }
        record.update(trade_return_stats(in_window))
        record.update(metrics_from_daily(window))
        output.append(record)
    return output
def evaluate(
    *,
    name: str,
    cost_model: str,
    kind: str,
    micro_name: str,
    lookback_days: int,
    micro_weight: float,
    series: pd.Series,
    trades: list[base.LegReturn],
) -> tuple[dict[str, object], list[dict[str, object]]]:
    """Evaluate one strategy variant over all reporting horizons.

    Args:
        name: Strategy name.
        cost_model: Cost-model label.
        kind: Variant family, stored verbatim in the summary row.
        micro_name: Name of the micro leg used (empty for the baseline).
        lookback_days: Regime lookback stored in the summary row.
        micro_weight: Weight applied to the micro leg, stored in the summary row.
        series: Equity series of the variant.
        trades: Trade records of the variant.

    Returns:
        ``(summary_row, horizon_rows)``. The summary row carries the
        identification fields, the full-sample statistics and
        ``min_recent_total_return`` - the lowest ``net_total_return`` among all
        non-"full" horizons.
    """
    per_horizon = horizon_rows(name, cost_model, series, trades)
    full_sample = next(row for row in per_horizon if row["horizon"] == "full")
    worst_recent = min(
        float(row["net_total_return"]) for row in per_horizon if row["horizon"] != "full"
    )

    summary_row: dict[str, object] = {
        "name": name,
        "cost_model": cost_model,
        "kind": kind,
        "micro_name": micro_name,
        "lookback_days": lookback_days,
        "micro_weight": micro_weight,
        "min_recent_total_return": worst_recent,
    }
    # Headline figures are taken from the full-sample horizon.
    full_sample_keys = (
        "trades",
        "trades_per_year",
        "win_rate",
        "avg_return",
        "payoff_ratio",
        "profit_factor",
        "net_total_return",
        "net_annualized_return",
        "net_max_drawdown",
        "net_calmar",
    )
    for key in full_sample_keys:
        summary_row[key] = full_sample[key]
    return summary_row, per_horizon
def build_direction_b(
    *,
    cost_model: str,
    nextgen_series: pd.Series,
    nextgen_trades: list[base.LegReturn],
    micro_candidates: dict[str, tuple[pd.Series, list[base.LegReturn]]],
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Evaluate the baseline plus every regime-switch / risk-off overlay variant.

    For each micro candidate listed in ``MICRO_NAMES`` and each lookback in
    ``LOOKBACKS`` a boolean "active" mask is derived: the nextgen trailing
    return over ``lookback`` rows is negative while the micro trailing return
    is positive. The mask is lagged by one row so that a signal observed on a
    row only takes effect on the following row.

    Two variant families are built from that mask:

    * ``recent_regime_switch`` - micro returns on active rows, nextgen returns
      otherwise; nextgen trades exiting on an active row are dropped.
    * ``riskoff_overlay`` - nextgen returns plus ``micro_weight`` times the
      micro returns on active rows, for weights 0.25 and 0.40.

    Args:
        cost_model: Label stored in every output row.
        nextgen_series: Baseline equity series.
        nextgen_trades: Baseline trade records.
        micro_candidates: Micro-leg name -> ``(equity series, trades)``.

    Returns:
        ``(summary, horizons, equity)`` data frames: one summary row per
        variant (sorted by cost model, kind, then descending Calmar / worst
        recent return / total return), the per-horizon rows (horizon ordered as
        in ``HORIZONS``), and the concatenated equity curves.
    """
    nextgen_returns = nextgen_series.pct_change().fillna(0.0)
    rows: list[dict[str, object]] = []
    horizon_rows_out: list[dict[str, object]] = []
    equity_rows: list[pd.DataFrame] = []

    def record(
        *,
        name: str,
        kind: str,
        micro_name: str,
        lookback_days: int,
        micro_weight: float,
        series: pd.Series,
        trades: list[base.LegReturn],
    ) -> None:
        # Evaluate one variant and append its summary row, horizon rows and equity frame.
        row, horizons_for_variant = evaluate(
            name=name,
            cost_model=cost_model,
            kind=kind,
            micro_name=micro_name,
            lookback_days=lookback_days,
            micro_weight=micro_weight,
            series=series,
            trades=trades,
        )
        rows.append(row)
        horizon_rows_out.extend(horizons_for_variant)
        equity_rows.append(base.equity_frame(name, cost_model, series))

    record(
        name=base.NEXTGEN_BASELINE,
        kind="baseline",
        micro_name="",
        lookback_days=0,
        micro_weight=0.0,
        series=nextgen_series,
        trades=nextgen_trades,
    )

    # The nextgen trailing return depends only on the lookback, so compute it once
    # instead of once per micro candidate.
    nextgen_regimes = {
        lookback: nextgen_series / nextgen_series.shift(lookback) - 1.0
        for lookback in LOOKBACKS
    }

    for micro_name, (micro_series, micro_trades) in micro_candidates.items():
        if micro_name not in MICRO_NAMES:
            continue
        micro_returns = micro_series.pct_change().fillna(0.0)
        micro_label = short_micro_name(micro_name)
        for lookback in LOOKBACKS:
            micro_regime = micro_series / micro_series.shift(lookback) - 1.0
            risk_off = (nextgen_regimes[lookback] < 0.0) & (micro_regime > 0.0)
            # Lag by one row to avoid look-ahead. ``fill_value=False`` keeps the
            # series boolean, avoiding the object-dtype ``fillna`` round trip
            # (deprecated downcasting behaviour in recent pandas).
            active = risk_off.shift(1, fill_value=False).astype(bool)
            # Timestamps on which the switch is active, for O(1) per-trade lookups.
            # Dates absent from the index are simply not in the set, i.e. inactive.
            active_dates = set(active[active].index)

            switch_name = f"switch-l{lookback}-{micro_label}"
            switch_series = base.returns_to_equity(switch_name, nextgen_returns.where(~active, micro_returns))
            switch_trades = base.combine_trade_returns(
                nextgen_returns=[trade for trade in nextgen_trades if trade.exit_date not in active_dates],
                micro_returns=micro_trades,
                nextgen_weight=1.0,
                micro_weight=1.0,
                micro_mask=active,
            )
            record(
                name=switch_name,
                kind="recent_regime_switch",
                micro_name=micro_name,
                lookback_days=lookback,
                micro_weight=1.0,
                series=switch_series,
                trades=switch_trades,
            )

            for micro_weight in (0.25, 0.40):
                overlay_name = f"riskoff-overlay-l{lookback}-m{micro_weight:.2f}-{micro_label}"
                overlay_series = base.returns_to_equity(
                    overlay_name,
                    nextgen_returns + micro_returns.where(active, 0.0) * micro_weight,
                )
                overlay_trades = base.combine_trade_returns(
                    nextgen_returns=nextgen_trades,
                    micro_returns=micro_trades,
                    nextgen_weight=1.0,
                    micro_weight=micro_weight,
                    micro_mask=active,
                )
                record(
                    name=overlay_name,
                    kind="riskoff_overlay",
                    micro_name=micro_name,
                    lookback_days=lookback,
                    micro_weight=micro_weight,
                    series=overlay_series,
                    trades=overlay_trades,
                )

    summary = pd.DataFrame(rows).sort_values(
        ["cost_model", "kind", "net_calmar", "min_recent_total_return", "net_total_return"],
        ascending=[True, True, False, False, False],
    )
    horizons = pd.DataFrame(horizon_rows_out)
    # Ordered categorical so horizons sort in HORIZONS order rather than alphabetically.
    horizons["horizon"] = pd.Categorical(horizons["horizon"], categories=[label for label, _ in HORIZONS], ordered=True)
    horizons = horizons.sort_values(["cost_model", "name", "horizon"])
    equity = pd.concat(equity_rows, ignore_index=True)
    return summary, horizons, equity
def markdown_table(frame: pd.DataFrame) -> str:
    """Render ``frame`` as a markdown table.

    Pure pass-through to ``base.markdown_table``, which owns the formatting.
    """
    rendered = base.markdown_table(frame)
    return rendered
def pct_columns(frame: pd.DataFrame, columns: tuple[str, ...]) -> pd.DataFrame:
    """Return a copy of ``frame`` with the given columns formatted as percentages.

    Each value in the listed columns is converted with ``float``, multiplied
    by 100 and rendered with two decimals and a trailing ``%``. The input
    frame is left untouched.
    """

    def as_percent(value: object) -> str:
        return f"{float(value) * 100:.2f}%"

    formatted = frame.copy()
    for column_name in columns:
        formatted[column_name] = formatted[column_name].map(as_percent)
    return formatted
def write_report(summary: pd.DataFrame, horizons: pd.DataFrame, output_files: list[Path], command: str) -> str:
    """Render the markdown report text.

    The report ranks non-baseline variants under the primary cost model,
    picks the best ``recent_regime_switch`` variant (highest Calmar, ties
    broken by worst recent return) and derives a verdict from three checks on
    that variant: at least two trades in each of the 30d/21d/14d windows,
    positive return in each of those windows, and positive full-sample return
    under the ``taker_taker`` cost model.

    Args:
        summary: Summary rows for all cost models.
        horizons: Per-horizon rows for all cost models.
        output_files: Paths listed under "Output files".
        command: Command line echoed in the report header.

    Returns:
        The full markdown document, terminated by a newline.

    Raises:
        IndexError: If no ``recent_regime_switch`` row exists under the primary cost model.
    """
    performance = [
        "net_total_return",
        "net_annualized_return",
        "net_max_drawdown",
        "net_calmar",
        "trades",
        "trades_per_year",
    ]
    trade_quality = ["win_rate", "avg_return", "payoff_ratio", "profit_factor"]
    return_pct = ("net_total_return", "net_annualized_return", "net_max_drawdown")
    return_and_trade_pct = (*return_pct, "win_rate", "avg_return")
    ranked_pct = (*return_and_trade_pct, "min_recent_total_return")
    calmar_then_recent = ["net_calmar", "min_recent_total_return"]

    def table(frame: pd.DataFrame, columns: list[str], percent: tuple[str, ...]) -> str:
        # Select columns, format the percentage ones, then render as markdown.
        return markdown_table(pct_columns(frame[columns], percent))

    not_baseline = summary["kind"] != "baseline"
    primary = summary[(summary["cost_model"] == PRIMARY_COST) & not_baseline].copy()
    stress = summary[(summary["cost_model"] == "taker_taker") & not_baseline].copy()

    top_primary = primary.sort_values(["kind", *calmar_then_recent], ascending=[True, False, False]).head(12)
    switches = primary[primary["kind"] == "recent_regime_switch"]
    top_switch = switches.sort_values(calmar_then_recent, ascending=[False, False]).head(5)
    overlays = primary[primary["kind"] == "riskoff_overlay"]
    top_overlay = overlays.sort_values(calmar_then_recent, ascending=[False, False]).head(8)
    stress_winners = stress[stress["net_total_return"] > 0.0]
    taker_positive = stress_winners.sort_values(["kind", "net_calmar"], ascending=[True, False]).head(10)

    best = top_switch.iloc[0]
    best_horizons = horizons[(horizons["cost_model"] == PRIMARY_COST) & (horizons["name"] == best["name"])].copy()
    short_windows = best_horizons[best_horizons["horizon"].isin(["30d", "21d", "14d"])]
    frequency_ok = bool((short_windows["trades"] >= 2).all())
    short_window_ok = bool((short_windows["net_total_return"] > 0.0).all())
    stress_match = stress[stress["name"] == best["name"]]
    taker_ok = bool(len(stress_match) and float(stress_match.iloc[0]["net_total_return"]) > 0.0)

    if frequency_ok and taker_ok and not short_window_ok:
        # "Recommended as a parallel read-only observation strategy next to the current
        # BB squeeze; not recommended as a direct near-term live-trading candidate."
        recommendation = "建议作为当前 BB squeeze 的并行只读观察策略;不建议直接列为近期实盘候选。"
    else:
        # "Recommended for read-only observation only; not recommended as a live-trading candidate."
        recommendation = "建议只读观察,不建议进入实盘候选。"

    short_trade_counts = short_windows["trades"].astype(int).tolist()

    lines = [
        "# ETH nextgen + microstructure direction B",
        "",
        f"Run command: `{command}`",
        "",
        "Scope: lookback 30/60/90, selected ATR compression/expansion US-session micro legs, recent_regime_switch first and small riskoff_overlay checks second.",
        "Read-only local backtest only; no secrets, private exchange API, order path, executor, or remote service was touched.",
        "",
        "Output files:",
    ]
    lines.extend(f"- `{path}`" for path in output_files)
    lines += [
        "",
        "## Verdict",
        "",
        recommendation,
        f"Best switch candidate: `{best['name']}`. Recent 30d/21d/14d trade counts are {short_trade_counts}; taker/taker full-sample positive: `{taker_ok}`; all short windows positive: `{short_window_ok}`.",
        "",
        "## Top maker_taker candidates",
        "",
        table(
            top_primary,
            ["name", "kind", "lookback_days", "micro_weight", *performance, *trade_quality, "min_recent_total_return"],
            ranked_pct,
        ),
        "",
        "## Riskoff overlay checks",
        "",
        table(
            top_overlay,
            ["name", "lookback_days", "micro_weight", *performance, *trade_quality, "min_recent_total_return"],
            ranked_pct,
        ),
        "",
        "## Taker/taker positive candidates",
        "",
        table(
            taker_positive,
            ["name", "kind", "lookback_days", "micro_weight", *performance],
            return_pct,
        ),
        "",
        "## Best switch windows",
        "",
        table(
            best_horizons,
            ["horizon", "horizon_start", "horizon_end", *performance, *trade_quality],
            return_and_trade_pct,
        ),
    ]
    return "\n".join(lines) + "\n"
def main() -> int:
    """Run the direction-B backtest for every cost model and write all artefacts.

    Reads ``eth-btc-nextgen-equity.csv`` from the output directory to recover
    the date index of the baseline under the primary cost model, evaluates all
    variants per cost model, then writes the summary / horizons / equity CSVs,
    a JSON file with the top 20 primary-cost rows and the markdown report. The
    top 12 primary-cost rows are also printed to stdout.

    Returns:
        ``0`` on success.

    Raises:
        KeyError: If the existing equity file has no baseline rows for the primary cost model.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
    args = parser.parse_args()
    out_dir = args.output_dir

    existing_equity = pd.read_csv(out_dir / "eth-btc-nextgen-equity.csv")
    is_primary_baseline = (existing_equity["cost_model"] == PRIMARY_COST) & (
        existing_equity["name"] == base.NEXTGEN_BASELINE
    )
    base_equity = existing_equity[is_primary_baseline]
    if base_equity.empty:
        raise KeyError(f"missing existing nextgen equity for {base.NEXTGEN_BASELINE}")
    index = pd.DatetimeIndex(pd.to_datetime(base_equity["date"], utc=True))

    summary_parts: list[pd.DataFrame] = []
    horizon_parts: list[pd.DataFrame] = []
    equity_parts: list[pd.DataFrame] = []
    for cost_model, roundtrip_cost in COST_MODELS.items():
        nextgen_series, nextgen_trades = base.load_nextgen(index, roundtrip_cost)
        micro_candidates = base.load_micro_candidates(index, roundtrip_cost)
        part_summary, part_horizons, part_equity = build_direction_b(
            cost_model=cost_model,
            nextgen_series=nextgen_series,
            nextgen_trades=nextgen_trades,
            micro_candidates=micro_candidates,
        )
        summary_parts.append(part_summary)
        horizon_parts.append(part_horizons)
        equity_parts.append(part_equity)

    summary = pd.concat(summary_parts, ignore_index=True)
    horizons = pd.concat(horizon_parts, ignore_index=True)
    equity = pd.concat(equity_parts, ignore_index=True)

    out_dir.mkdir(parents=True, exist_ok=True)
    summary_path = out_dir / f"{PREFIX}-summary.csv"
    horizon_path = out_dir / f"{PREFIX}-horizons.csv"
    equity_path = out_dir / f"{PREFIX}-equity.csv"
    json_path = out_dir / f"{PREFIX}-top.json"
    report_path = out_dir / f"{PREFIX}-report.md"
    output_files = [summary_path, horizon_path, equity_path, json_path, report_path]

    summary.to_csv(summary_path, index=False)
    horizons.to_csv(horizon_path, index=False)
    equity.to_csv(equity_path, index=False)

    primary = summary[(summary["cost_model"] == PRIMARY_COST) & (summary["kind"] != "baseline")]
    # Same ranking feeds both the JSON export (top 20) and the console preview (top 12).
    ranked = primary.sort_values(["kind", "net_calmar"], ascending=[True, False])
    top_records = ranked.head(20).to_dict("records")
    json_path.write_text(json.dumps(top_records, indent=2), encoding="utf-8")

    command = f"rtk .venv/bin/python {Path(__file__).as_posix()}"
    report_text = write_report(summary, horizons, output_files, command)
    report_path.write_text(report_text, encoding="utf-8")

    print(ranked.head(12).to_string(index=False))
    return 0
# Script entry point: the value returned by main() becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|