from __future__ import annotations import argparse import json import sys from pathlib import Path import pandas as pd sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from scripts import search_eth_nextgen_micro_portfolio as base OUTPUT_DIR = Path("reports/eth-exploration") PREFIX = "eth-nextgen-micro-direction-b" PRIMARY_COST = "maker_taker" COST_MODELS = { "maker_taker": 0.0021, "taker_taker": 0.0030, } MICRO_NAMES = ( "atr-compress-expand-r96-q0.15-sl0.008-tp0.016-mf0.25-us", "atr-compress-expand-r96-q0.15-sl0.008-tp0.016-mf0.4-us", "atr-compress-expand-r48-q0.15-sl0.008-tp0.016-mf0.25-us", "atr-compress-expand-r48-q0.15-sl0.008-tp0.016-mf0.4-us", ) LOOKBACKS = (30, 60, 90) HORIZONS = ( ("full", None), ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ("30d", pd.DateOffset(days=30)), ("21d", pd.DateOffset(days=21)), ("14d", pd.DateOffset(days=14)), ) def short_micro_name(name: str) -> str: return base.short_micro_name(name) def metrics_from_daily(series: pd.Series) -> dict[str, float]: return base.metrics_from_daily(series) def trade_return_stats(trades: list[base.LegReturn]) -> dict[str, float]: values = [trade.value for trade in trades] wins = [value for value in values if value > 0.0] losses = [value for value in values if value < 0.0] gross_profit = sum(wins) gross_loss_abs = abs(sum(losses)) avg_win = gross_profit / len(wins) if wins else 0.0 avg_loss_abs = gross_loss_abs / len(losses) if losses else 0.0 return { "trades": len(values), "win_rate": len(wins) / len(values) if values else 0.0, "avg_return": sum(values) / len(values) if values else 0.0, "payoff_ratio": avg_win / avg_loss_abs if avg_loss_abs else 0.0, "profit_factor": gross_profit / gross_loss_abs if gross_loss_abs else 0.0, } def horizon_rows(name: str, cost_model: str, series: pd.Series, trades: list[base.LegReturn]) -> list[dict[str, object]]: end = series.index[-1] rows: list[dict[str, object]] = [] for label, offset in HORIZONS: horizon = series if offset is None else series[series.index >= end - offset] if len(horizon) < 2: horizon = series start = horizon.index[0] selected = [trade for trade in trades if trade.exit_date >= start] years = max((horizon.index[-1] - start).total_seconds() / 86_400 / 365, 1 / 365) rows.append( { "name": name, "cost_model": cost_model, "horizon": label, "horizon_start": start.strftime("%Y-%m-%d"), "horizon_end": horizon.index[-1].strftime("%Y-%m-%d"), "trades_per_year": len(selected) / years, **trade_return_stats(selected), **metrics_from_daily(horizon), } ) return rows def evaluate( *, name: str, cost_model: str, kind: str, micro_name: str, lookback_days: int, micro_weight: float, series: pd.Series, trades: list[base.LegReturn], ) -> tuple[dict[str, object], list[dict[str, object]]]: horizons = horizon_rows(name, cost_model, series, trades) full = next(row for row in horizons if row["horizon"] == "full") recent = [row for row in horizons if row["horizon"] != "full"] return ( { "name": name, "cost_model": cost_model, "kind": kind, "micro_name": micro_name, "lookback_days": lookback_days, "micro_weight": micro_weight, "min_recent_total_return": min(float(row["net_total_return"]) for row in recent), **{key: full[key] for key in ( "trades", "trades_per_year", "win_rate", "avg_return", "payoff_ratio", "profit_factor", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", )}, }, horizons, ) def build_direction_b( *, cost_model: str, nextgen_series: pd.Series, nextgen_trades: list[base.LegReturn], micro_candidates: dict[str, tuple[pd.Series, list[base.LegReturn]]], ) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: nextgen_returns = nextgen_series.pct_change().fillna(0.0) rows: list[dict[str, object]] = [] horizon_rows_out: list[dict[str, object]] = [] equity_rows: list[pd.DataFrame] = [] baseline_row, baseline_horizons = evaluate( name=base.NEXTGEN_BASELINE, cost_model=cost_model, kind="baseline", micro_name="", lookback_days=0, micro_weight=0.0, series=nextgen_series, trades=nextgen_trades, ) rows.append(baseline_row) horizon_rows_out.extend(baseline_horizons) equity_rows.append(base.equity_frame(base.NEXTGEN_BASELINE, cost_model, nextgen_series)) for micro_name, (micro_series, micro_trades) in micro_candidates.items(): if micro_name not in MICRO_NAMES: continue micro_returns = micro_series.pct_change().fillna(0.0) for lookback in LOOKBACKS: nextgen_regime = nextgen_series / nextgen_series.shift(lookback) - 1.0 micro_regime = micro_series / micro_series.shift(lookback) - 1.0 active = ((nextgen_regime < 0.0) & (micro_regime > 0.0)).shift(1).fillna(False).astype(bool) switch_name = f"switch-l{lookback}-{short_micro_name(micro_name)}" switch_series = base.returns_to_equity(switch_name, nextgen_returns.where(~active, micro_returns)) switch_trades = base.combine_trade_returns( nextgen_returns=[row for row in nextgen_trades if not bool(active.reindex([row.exit_date]).fillna(False).iloc[0])], micro_returns=micro_trades, nextgen_weight=1.0, micro_weight=1.0, micro_mask=active, ) row, horizons = evaluate( name=switch_name, cost_model=cost_model, kind="recent_regime_switch", micro_name=micro_name, lookback_days=lookback, micro_weight=1.0, series=switch_series, trades=switch_trades, ) rows.append(row) horizon_rows_out.extend(horizons) equity_rows.append(base.equity_frame(switch_name, cost_model, switch_series)) for micro_weight in (0.25, 0.40): overlay_name = f"riskoff-overlay-l{lookback}-m{micro_weight:.2f}-{short_micro_name(micro_name)}" overlay_series = base.returns_to_equity(overlay_name, nextgen_returns + micro_returns.where(active, 0.0) * micro_weight) overlay_trades = base.combine_trade_returns( nextgen_returns=nextgen_trades, micro_returns=micro_trades, nextgen_weight=1.0, micro_weight=micro_weight, micro_mask=active, ) row, horizons = evaluate( name=overlay_name, cost_model=cost_model, kind="riskoff_overlay", micro_name=micro_name, lookback_days=lookback, micro_weight=micro_weight, series=overlay_series, trades=overlay_trades, ) rows.append(row) horizon_rows_out.extend(horizons) equity_rows.append(base.equity_frame(overlay_name, cost_model, overlay_series)) summary = pd.DataFrame(rows).sort_values( ["cost_model", "kind", "net_calmar", "min_recent_total_return", "net_total_return"], ascending=[True, True, False, False, False], ) horizons = pd.DataFrame(horizon_rows_out) horizons["horizon"] = pd.Categorical(horizons["horizon"], categories=[label for label, _ in HORIZONS], ordered=True) horizons = horizons.sort_values(["cost_model", "name", "horizon"]) equity = pd.concat(equity_rows, ignore_index=True) return summary, horizons, equity def markdown_table(frame: pd.DataFrame) -> str: return base.markdown_table(frame) def pct_columns(frame: pd.DataFrame, columns: tuple[str, ...]) -> pd.DataFrame: output = frame.copy() for column in columns: output[column] = output[column].map(lambda value: f"{float(value) * 100:.2f}%") return output def write_report(summary: pd.DataFrame, horizons: pd.DataFrame, output_files: list[Path], command: str) -> str: primary = summary[(summary["cost_model"] == PRIMARY_COST) & (summary["kind"] != "baseline")].copy() stress = summary[(summary["cost_model"] == "taker_taker") & (summary["kind"] != "baseline")].copy() top_primary = primary.sort_values(["kind", "net_calmar", "min_recent_total_return"], ascending=[True, False, False]).head(12) top_switch = primary[primary["kind"] == "recent_regime_switch"].sort_values(["net_calmar", "min_recent_total_return"], ascending=[False, False]).head(5) top_overlay = primary[primary["kind"] == "riskoff_overlay"].sort_values(["net_calmar", "min_recent_total_return"], ascending=[False, False]).head(8) taker_positive = stress[stress["net_total_return"] > 0.0].sort_values(["kind", "net_calmar"], ascending=[True, False]).head(10) best = top_switch.iloc[0] best_horizons = horizons[(horizons["cost_model"] == PRIMARY_COST) & (horizons["name"] == best["name"])].copy() frequency_ok = bool((best_horizons[best_horizons["horizon"].isin(["30d", "21d", "14d"])]["trades"] >= 2).all()) short_window_ok = bool((best_horizons[best_horizons["horizon"].isin(["30d", "21d", "14d"])]["net_total_return"] > 0.0).all()) stress_match = stress[stress["name"] == best["name"]] taker_ok = bool(len(stress_match) and float(stress_match.iloc[0]["net_total_return"]) > 0.0) recommendation = ( "建议作为当前 BB squeeze 的并行只读观察策略;不建议直接列为近期实盘候选。" if frequency_ok and taker_ok and not short_window_ok else "建议只读观察,不建议进入实盘候选。" ) lines = [ "# ETH nextgen + microstructure direction B", "", f"Run command: `{command}`", "", "Scope: lookback 30/60/90, selected ATR compression/expansion US-session micro legs, recent_regime_switch first and small riskoff_overlay checks second.", "Read-only local backtest only; no secrets, private exchange API, order path, executor, or remote service was touched.", "", "Output files:", *[f"- `{path}`" for path in output_files], "", "## Verdict", "", recommendation, f"Best switch candidate: `{best['name']}`. Recent 30d/21d/14d trade counts are {best_horizons[best_horizons['horizon'].isin(['30d', '21d', '14d'])]['trades'].astype(int).tolist()}; taker/taker full-sample positive: `{taker_ok}`; all short windows positive: `{short_window_ok}`.", "", "## Top maker_taker candidates", "", markdown_table( pct_columns( top_primary[ [ "name", "kind", "lookback_days", "micro_weight", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "trades", "trades_per_year", "win_rate", "avg_return", "payoff_ratio", "profit_factor", "min_recent_total_return", ] ], ("net_total_return", "net_annualized_return", "net_max_drawdown", "win_rate", "avg_return", "min_recent_total_return"), ) ), "", "## Riskoff overlay checks", "", markdown_table( pct_columns( top_overlay[ [ "name", "lookback_days", "micro_weight", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "trades", "trades_per_year", "win_rate", "avg_return", "payoff_ratio", "profit_factor", "min_recent_total_return", ] ], ("net_total_return", "net_annualized_return", "net_max_drawdown", "win_rate", "avg_return", "min_recent_total_return"), ) ), "", "## Taker/taker positive candidates", "", markdown_table( pct_columns( taker_positive[ [ "name", "kind", "lookback_days", "micro_weight", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "trades", "trades_per_year", ] ], ("net_total_return", "net_annualized_return", "net_max_drawdown"), ) ), "", "## Best switch windows", "", markdown_table( pct_columns( best_horizons[ [ "horizon", "horizon_start", "horizon_end", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "trades", "trades_per_year", "win_rate", "avg_return", "payoff_ratio", "profit_factor", ] ], ("net_total_return", "net_annualized_return", "net_max_drawdown", "win_rate", "avg_return"), ) ), ] return "\n".join(lines) + "\n" def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR) args = parser.parse_args() existing_equity = pd.read_csv(args.output_dir / "eth-btc-nextgen-equity.csv") base_equity = existing_equity[(existing_equity["cost_model"] == PRIMARY_COST) & (existing_equity["name"] == base.NEXTGEN_BASELINE)] if base_equity.empty: raise KeyError(f"missing existing nextgen equity for {base.NEXTGEN_BASELINE}") index = pd.DatetimeIndex(pd.to_datetime(base_equity["date"], utc=True)) summary_frames: list[pd.DataFrame] = [] horizon_frames: list[pd.DataFrame] = [] equity_frames: list[pd.DataFrame] = [] for cost_model, roundtrip_cost in COST_MODELS.items(): nextgen_series, nextgen_trades = base.load_nextgen(index, roundtrip_cost) micro_candidates = base.load_micro_candidates(index, roundtrip_cost) summary, horizons, equity = build_direction_b( cost_model=cost_model, nextgen_series=nextgen_series, nextgen_trades=nextgen_trades, micro_candidates=micro_candidates, ) summary_frames.append(summary) horizon_frames.append(horizons) equity_frames.append(equity) summary = pd.concat(summary_frames, ignore_index=True) horizons = pd.concat(horizon_frames, ignore_index=True) equity = pd.concat(equity_frames, ignore_index=True) args.output_dir.mkdir(parents=True, exist_ok=True) summary_path = args.output_dir / f"{PREFIX}-summary.csv" horizon_path = args.output_dir / f"{PREFIX}-horizons.csv" equity_path = args.output_dir / f"{PREFIX}-equity.csv" json_path = args.output_dir / f"{PREFIX}-top.json" report_path = args.output_dir / f"{PREFIX}-report.md" output_files = [summary_path, horizon_path, equity_path, json_path, report_path] summary.to_csv(summary_path, index=False) horizons.to_csv(horizon_path, index=False) equity.to_csv(equity_path, index=False) primary = summary[(summary["cost_model"] == PRIMARY_COST) & (summary["kind"] != "baseline")] json_path.write_text(json.dumps(primary.sort_values(["kind", "net_calmar"], ascending=[True, False]).head(20).to_dict("records"), indent=2), encoding="utf-8") command = f"rtk .venv/bin/python {Path(__file__).as_posix()}" report_path.write_text(write_report(summary, horizons, output_files, command), encoding="utf-8") print(primary.sort_values(["kind", "net_calmar"], ascending=[True, False]).head(12).to_string(index=False)) return 0 if __name__ == "__main__": raise SystemExit(main())