"""Offline exploration of layered position sizing for the live BB squeeze strategy.

The script replays locally stored ETH/BTC swap candles through a set of
``Variant`` configurations.  Each variant layers, on top of the same Bollinger
squeeze breakout signal, an optional long-term regime cap, an optional dynamic
exposure multiplier and an optional in-position "T" overlay (partial profit
taking with an optional re-add near the middle band).

Results are written as CSV files plus a Markdown report.  Nothing here places
orders; it only reads candle CSVs and writes report files.
"""

from __future__ import annotations

import argparse
import math
import sys
from dataclasses import dataclass
from pathlib import Path

import pandas as pd

# Make the repository root importable when the script is run directly.
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))

from okx_codex_trader.models import Candle
from okx_codex_trader.sampled_report import trade_equity
from scripts.search_live_bb_squeeze_exit_variants import DATA_DIR, INITIAL_EQUITY, LEVERAGE, OUTPUT_DIR, _format_ts

ETH_SYMBOL = "ETH-USDT-SWAP"
BTC_SYMBOL = "BTC-USDT-SWAP"
BAR = "15m"
# Fraction of the closed margin deducted from the proceeds of every close.
ROUNDTRIP_COST = 0.0021
# Bars skipped at the start of the simulation so rolling indicators can fill.
WARMUP_BARS = 960
# Number of candles per day for the default 15m bar.  All day-based indicator
# windows below are expressed through this constant.
# NOTE(review): running with a different --bar leaves these windows sized for
# 15m candles; confirm before using another bar size.
BARS_PER_DAY = 96
# (label, look-back offset) pairs; ``None`` means the whole equity curve.
HORIZONS = (
    ("full", None),
    ("3y", pd.DateOffset(years=3)),
    ("1y", pd.DateOffset(years=1)),
    ("6m", pd.DateOffset(months=6)),
    ("3m", pd.DateOffset(months=3)),
    ("30d", pd.DateOffset(days=30)),
    ("14d", pd.DateOffset(days=14)),
)


@dataclass(frozen=True)
class Variant:
    """One configuration of the layered position model."""

    # Label used in all output files.
    name: str
    # Regime cap mode: "none", "directional" or "vol_scaled" (see regime_cap).
    regime: str
    # Recorded in the summary only; the simulation itself reads dynamic_mode.
    dynamic_exposure: bool
    # Enables the partial profit-taking / re-add overlay.
    t_overlay: bool
    # Minimum position return required before a T reduce is scheduled.
    t_profit_pct: float
    # Fraction of every leg closed by a T reduce.
    t_fraction: float
    # Relative buffer around the middle band used by should_readd.
    readd_buffer_pct: float
    # Re-add mode: "never", "middle" or "middle_trend" (see should_readd).
    readd_mode: str
    # Multiplier mode: "none", "vol_only", "ratio_soft" or "closed_trade".
    dynamic_mode: str


def load_candles(symbol: str, bar: str) -> list[Candle]:
    """Read ``DATA_DIR/<symbol>/<bar>.csv`` and return its rows as candles."""
    frame = pd.read_csv(DATA_DIR / symbol / f"{bar}.csv")
    return [
        Candle(
            symbol=symbol,
            ts=int(row.ts),
            open=float(row.open),
            high=float(row.high),
            low=float(row.low),
            close=float(row.close),
            volume=float(row.volume),
        )
        for row in frame.itertuples(index=False)
    ]


def align_pair(eth: list[Candle], btc: list[Candle]) -> tuple[list[Candle], list[Candle]]:
    """Keep only candles whose timestamp exists in both series.

    The output lists have equal length, follow the order of ``eth`` and hold
    the matching ETH and BTC candle at the same index.
    """
    btc_by_ts = {candle.ts: candle for candle in btc}
    eth_out: list[Candle] = []
    btc_out: list[Candle] = []
    for candle in eth:
        other = btc_by_ts.get(candle.ts)
        if other is not None:
            eth_out.append(candle)
            btc_out.append(other)
    return eth_out, btc_out


def indicators(eth: list[Candle], btc: list[Candle]) -> pd.DataFrame:
    """Build the per-bar indicator frame used by ``run_variant``.

    ``eth`` and ``btc`` must already be aligned (same length, same timestamps
    per index).  Rows inside a rolling window's warm-up contain NaN.
    """
    frame = pd.DataFrame(
        {
            "ts": [candle.ts for candle in eth],
            "open": [candle.open for candle in eth],
            "high": [candle.high for candle in eth],
            "low": [candle.low for candle in eth],
            "close": [candle.close for candle in eth],
            "btc_close": [candle.close for candle in btc],
        }
    )
    close = frame["close"].astype(float)
    btc_close = frame["btc_close"].astype(float)

    # 48-bar Bollinger bands with a 2-sigma envelope.
    middle = close.rolling(48).mean()
    stdev = close.rolling(48).std(ddof=0)
    upper = middle + 2.0 * stdev
    lower = middle - 2.0 * stdev
    bandwidth = (upper - lower) / middle
    frame["middle"] = middle
    frame["upper"] = upper
    frame["lower"] = lower
    frame["bandwidth"] = bandwidth
    # A bar counts as "compressed" when bandwidth is in the lowest quartile of
    # the trailing 960 bars.
    frame["bandwidth_threshold"] = bandwidth.rolling(960).quantile(0.25)

    bar_returns = close.pct_change()
    ratio = close / btc_close
    frame["eth_vol_96"] = bar_returns.rolling(96).std(ddof=0)
    frame["eth_ret_90d"] = close / close.shift(BARS_PER_DAY * 90) - 1.0
    frame["btc_ret_90d"] = btc_close / btc_close.shift(BARS_PER_DAY * 90) - 1.0
    frame["ratio_ret_30d"] = ratio / ratio.shift(BARS_PER_DAY * 30) - 1.0
    frame["eth_vol_30d"] = bar_returns.rolling(BARS_PER_DAY * 30).std(ddof=0)
    # The window is given in bars, so a 365-day median needs 365 days of bars.
    # The previous ``rolling(365)`` covered only 365 bars (under four days).
    # Until the window is full the value is NaN, and regime_cap's comparison
    # against NaN is False, so no volatility haircut is applied there.
    frame["eth_vol_365d_median"] = frame["eth_vol_30d"].rolling(BARS_PER_DAY * 365).median()
    return frame


def position_equity(position: dict[str, object] | None, mark_price: float) -> float:
    """Sum ``trade_equity`` over all legs at ``mark_price`` (0.0 when flat)."""
    if position is None:
        return 0.0
    side = str(position["side"])
    total = 0.0
    for leg in position["legs"]:  # type: ignore[union-attr]
        total += trade_equity(
            side=side,
            margin_used=float(leg["margin"]),
            entry_price=float(leg["entry_price"]),
            exit_price=mark_price,
            leverage=LEVERAGE,
        )
    return total


def position_margin(position: dict[str, object] | None) -> float:
    """Return the total margin held across all legs (0.0 when flat)."""
    if position is None:
        return 0.0
    return sum(float(leg["margin"]) for leg in position["legs"])  # type: ignore[union-attr]


def position_return(position: dict[str, object], mark_price: float) -> float:
    """Return position equity relative to its margin, minus one."""
    margin = position_margin(position)
    if margin <= 0.0:
        return 0.0
    return position_equity(position, mark_price) / margin - 1.0


def account_equity(cash: float, position: dict[str, object] | None, mark_price: float) -> float:
    """Return free cash plus the marked value of the open position."""
    return cash + position_equity(position, mark_price)


def close_fraction(
    *,
    cash: float,
    position: dict[str, object],
    fraction: float,
    exit_price: float,
    ts: int,
    reason: str,
    trades: list[dict[str, object]],
) -> tuple[float, dict[str, object] | None, float]:
    """Close ``fraction`` of every leg at ``exit_price``.

    A trade record is appended to ``trades`` only when some margin was
    actually closed.  ``position`` itself is never mutated.

    Args:
        cash: Free cash before the close.
        position: Open position dict with "side", "entry_ts" and "legs".
        fraction: Share of each leg to close; clamped to ``[0, 1]``.
        exit_price: Fill price applied to every leg.
        ts: Timestamp of the close in milliseconds.
        reason: Stored as the trade's "exit_reason"; "t_reduce" also marks the
            remaining position as reduced.
        trades: Trade log that receives the new record.

    Returns:
        ``(cash, position, closed_margin)`` where ``position`` is ``None`` if
        nothing is left, the original dict if nothing was closed, or an
        updated copy otherwise.
    """
    # A fraction above 1 would close more margin than the legs hold and credit
    # proceeds for margin that never existed.
    fraction = min(1.0, max(0.0, fraction))
    side = str(position["side"])
    closed_margin = 0.0
    exit_value = 0.0
    remaining_legs: list[dict[str, float]] = []
    for leg in position["legs"]:  # type: ignore[union-attr]
        entry_price = float(leg["entry_price"])
        margin = float(leg["margin"])
        close_margin = margin * fraction
        keep_margin = margin - close_margin
        if close_margin > 0.0:
            closed_margin += close_margin
            exit_value += trade_equity(
                side=side,
                margin_used=close_margin,
                entry_price=entry_price,
                exit_price=exit_price,
                leverage=LEVERAGE,
            )
        # Drop float dust instead of carrying a near-empty leg.
        if keep_margin > 1e-9:
            remaining_legs.append({"margin": keep_margin, "entry_price": entry_price})
    if closed_margin <= 0.0:
        return cash, position, 0.0

    net_exit_value = exit_value - closed_margin * ROUNDTRIP_COST
    trades.append(
        {
            "side": "Long" if side == "long" else "Short",
            "entry_time": _format_ts(int(position["entry_ts"])),
            "exit_time": _format_ts(ts),
            "exit_ts": ts,
            "entry_price": weighted_entry_price(position),
            "exit_price": exit_price,
            "margin": closed_margin,
            "return_pct": (net_exit_value / closed_margin - 1.0) * 100.0,
            "exit_reason": reason,
        }
    )
    cash += net_exit_value
    if not remaining_legs:
        return cash, None, closed_margin
    updated = dict(position)
    updated["legs"] = remaining_legs
    updated["t_reduced"] = bool(updated.get("t_reduced")) or reason == "t_reduce"
    return cash, updated, closed_margin


def weighted_entry_price(position: dict[str, object]) -> float:
    """Return the margin-weighted average entry price (0.0 without margin)."""
    total_margin = position_margin(position)
    if total_margin <= 0.0:
        return 0.0
    weighted = sum(
        float(leg["margin"]) * float(leg["entry_price"])
        for leg in position["legs"]  # type: ignore[union-attr]
    )
    return weighted / total_margin


def add_margin(position: dict[str, object], margin: float, entry_price: float) -> dict[str, object]:
    """Return a copy of ``position`` with one extra leg and ``t_reduced`` cleared."""
    updated = dict(position)
    legs = list(updated["legs"])  # type: ignore[arg-type]
    legs.append({"margin": margin, "entry_price": entry_price})
    updated["legs"] = legs
    updated["t_reduced"] = False
    return updated


def regime_cap(row: pd.Series, side: str, mode: str) -> float:
    """Return the regime exposure cap for ``side`` on this bar.

    ``mode`` is "none" (always 1.0), "directional" or "vol_scaled".  If any of
    the 90d/30d return inputs is NaN the cap is 1.0.  Any ``side`` other than
    "long" is handled by the short branch.

    Raises:
        ValueError: If ``mode`` is not one of the known modes.
    """
    if mode == "none":
        return 1.0
    eth_ret = float(row["eth_ret_90d"])
    btc_ret = float(row["btc_ret_90d"])
    ratio_ret = float(row["ratio_ret_30d"])
    if math.isnan(eth_ret) or math.isnan(btc_ret) or math.isnan(ratio_ret):
        return 1.0
    if mode == "directional":
        if side == "long":
            if eth_ret > 0.0 and btc_ret > 0.0 and ratio_ret > -0.05:
                return 1.0
            if btc_ret > -0.05 and eth_ret > -0.12:
                return 0.65
            return 0.0
        if eth_ret < 0.0 or ratio_ret < -0.08:
            return 1.0
        if ratio_ret < 0.0 or btc_ret < -0.05:
            return 0.65
        return 0.0
    if mode == "vol_scaled":
        cap = 1.0
        # A NaN on either side makes the comparison False, so no haircut.
        if float(row["eth_vol_30d"]) > float(row["eth_vol_365d_median"]):
            cap *= 0.7
        if side == "long" and ratio_ret < -0.05:
            cap *= 0.5
        if side == "short" and ratio_ret > 0.05:
            cap *= 0.5
        return cap
    raise ValueError(f"unknown regime mode: {mode}")


def dynamic_multiplier(row: pd.Series, closed_returns: list[float], variant: Variant) -> float:
    """Return the exposure multiplier for ``variant.dynamic_mode``.

    "closed_trade" combines a volatility haircut with the average of the last
    20 closed-trade returns (at least 8 required) and clamps the result to
    ``[0.25, 1.0]``.  "ratio_soft" may return 1.05; callers must not allocate
    more than the available cash.

    Raises:
        ValueError: If the dynamic mode is unknown.
    """
    mode = variant.dynamic_mode
    if mode == "none":
        return 1.0
    if mode == "vol_only":
        vol = float(row["eth_vol_96"])
        if not math.isnan(vol) and vol > 0.005:
            return 0.8
        return 1.0
    if mode == "ratio_soft":
        ratio = float(row["ratio_ret_30d"])
        if math.isnan(ratio):
            return 1.0
        if ratio < -0.08:
            return 0.75
        if ratio > 0.08:
            return 1.05
        return 1.0
    if mode != "closed_trade":
        raise ValueError(f"unknown dynamic mode: {variant.dynamic_mode}")

    mult = 1.0
    vol = float(row["eth_vol_96"])
    if not math.isnan(vol) and vol > 0.005:
        mult *= 0.7
    recent = closed_returns[-20:]
    if len(recent) >= 8:
        avg = sum(recent) / len(recent)
        if avg < -0.01:
            mult *= 0.5
        elif avg > 0.015:
            mult *= 1.15
    return min(1.0, max(0.25, mult))


def should_readd(position: dict[str, object], row: pd.Series, variant: Variant) -> bool:
    """Decide whether a T-reduced position should be topped up again.

    Only applies when the overlay is enabled and the position is currently
    marked ``t_reduced``.  "middle" re-adds once price has come back to the
    middle band (within ``readd_buffer_pct``); "middle_trend" additionally
    requires the 30d ETH/BTC ratio return not to oppose the position.

    Raises:
        ValueError: If the re-add mode is unknown.
    """
    if not variant.t_overlay or not bool(position.get("t_reduced")):
        return False
    mode = variant.readd_mode
    if mode == "never":
        return False
    if mode not in ("middle", "middle_trend"):
        raise ValueError(f"unknown readd mode: {variant.readd_mode}")

    is_long = str(position["side"]) == "long"
    middle = float(row["middle"])
    close = float(row["close"])
    if is_long:
        near_middle = close <= middle * (1.0 + variant.readd_buffer_pct)
    else:
        near_middle = close >= middle * (1.0 - variant.readd_buffer_pct)
    if mode == "middle":
        return near_middle

    ratio_ret = float(row["ratio_ret_30d"])
    trend_ok = ratio_ret >= -0.03 if is_long else ratio_ret <= 0.03
    return near_middle and trend_ok


def target_unit(row: pd.Series, side: str, variant: Variant, closed_returns: list[float]) -> float:
    """Return the target exposure: regime cap times dynamic multiplier."""
    return regime_cap(row, side, variant.regime) * dynamic_multiplier(row, closed_returns, variant)


def run_variant(frame: pd.DataFrame, variant: Variant) -> dict[str, object]:
    """Simulate ``variant`` bar by bar over the indicator ``frame``.

    Decisions are taken on a bar's close and executed at the next bar's open.
    Within a bar the order is: pending full exit, pending T reduce, pending
    re-add, pending entry, intrabar stop check, equity snapshot, then signal
    evaluation for the next bar.

    Returns:
        Dict with the ``variant`` and three DataFrames: "equity" (one row per
        simulated bar), "trades" (one row per close) and "events".
    """
    cash = INITIAL_EQUITY
    position: dict[str, object] | None = None
    pending_entry_side: str | None = None
    pending_full_exit: str | None = None
    pending_t_reduce = False
    pending_readd = False
    cooldown_until = -1
    middle_exit_streak = 0
    trades: list[dict[str, object]] = []
    equity_rows: list[dict[str, object]] = []
    event_rows: list[dict[str, object]] = []
    closed_returns: list[float] = []
    last_index = len(frame) - 1
    needed = ("middle", "upper", "lower", "bandwidth", "bandwidth_threshold", "eth_vol_96")

    for index in range(WARMUP_BARS, len(frame)):
        row = frame.iloc[index]
        ts = int(row["ts"])
        open_price = float(row["open"])
        close_price = float(row["close"])

        # --- execute orders scheduled on the previous bar, at this open ---
        if pending_full_exit and position is not None:
            margin_before = position_margin(position)
            cash, position, closed_margin = close_fraction(
                cash=cash,
                position=position,
                fraction=1.0,
                exit_price=open_price,
                ts=ts,
                reason=pending_full_exit,
                trades=trades,
            )
            # Gate on the margin actually closed so a no-op close can never
            # re-record the return of an older trade.
            if closed_margin > 0.0:
                closed_returns.append(float(trades[-1]["return_pct"]) / 100.0)
            event_rows.append({"ts": ts, "event": pending_full_exit, "margin": margin_before})
            pending_full_exit = None
            pending_t_reduce = False
            pending_readd = False
            middle_exit_streak = 0
            cooldown_until = index + 24

        if pending_t_reduce and position is not None:
            cash, position, closed_margin = close_fraction(
                cash=cash,
                position=position,
                fraction=variant.t_fraction,
                exit_price=open_price,
                ts=ts,
                reason="t_reduce",
                trades=trades,
            )
            if closed_margin > 0.0:
                closed_returns.append(float(trades[-1]["return_pct"]) / 100.0)
                event_rows.append({"ts": ts, "event": "t_reduce", "margin": closed_margin})
            pending_t_reduce = False

        if pending_readd and position is not None:
            unit = target_unit(row, str(position["side"]), variant, closed_returns)
            equity = account_equity(cash, position, open_price)
            desired_margin = equity * unit
            missing = max(0.0, desired_margin - position_margin(position))
            add = min(cash, missing)
            if add > 1.0:
                cash -= add
                position = add_margin(position, add, open_price)
                event_rows.append({"ts": ts, "event": "t_readd", "margin": add})
            pending_readd = False

        if pending_entry_side is not None and position is None and cash > 1.0:
            unit = target_unit(row, pending_entry_side, variant, closed_returns)
            # The multiplier can exceed 1.0 ("ratio_soft" returns 1.05), so cap
            # the margin at the available cash as the re-add path already does.
            # Without the cap, cash would go negative.
            margin = min(cash, cash * unit)
            if margin > 1.0:
                cash -= margin
                position = {
                    "side": pending_entry_side,
                    "entry_ts": ts,
                    "entry_index": index,
                    "legs": [{"margin": margin, "entry_price": open_price}],
                    # Fixed 1% stop measured from the entry fill.
                    "stop_price": open_price * (0.99 if pending_entry_side == "long" else 1.01),
                    "t_reduced": False,
                }
                event_rows.append({"ts": ts, "event": f"entry_{pending_entry_side}", "margin": margin})
            pending_entry_side = None

        # --- intrabar stop check ---
        if position is not None:
            side = str(position["side"])
            stop_price = float(position["stop_price"])
            stop_hit = (side == "long" and float(row["low"]) <= stop_price) or (
                side == "short" and float(row["high"]) >= stop_price
            )
            if stop_hit:
                margin_before = position_margin(position)
                # NOTE(review): the fill is taken at the stop price even when
                # the bar opens beyond it; confirm that is the intended model.
                cash, position, closed_margin = close_fraction(
                    cash=cash,
                    position=position,
                    fraction=1.0,
                    exit_price=stop_price,
                    ts=ts,
                    reason="stop",
                    trades=trades,
                )
                if closed_margin > 0.0:
                    closed_returns.append(float(trades[-1]["return_pct"]) / 100.0)
                event_rows.append({"ts": ts, "event": "stop", "margin": margin_before})
                middle_exit_streak = 0
                cooldown_until = index + 24

        # --- mark to market at the close ---
        equity = account_equity(cash, position, close_price)
        equity_rows.append(
            {
                "ts": ts,
                "time": pd.to_datetime(ts, unit="ms", utc=True),
                "equity": equity,
                "cash": cash,
                "position_margin": position_margin(position),
                "position_side": "flat" if position is None else str(position["side"]),
            }
        )

        # --- decide what to do at the next open ---
        # There is no next bar after the last one, and a wiped-out account
        # stops trading.
        if index == last_index or equity <= 0.0:
            continue
        if any(math.isnan(float(row[key])) for key in needed):
            continue

        if position is not None:
            side = str(position["side"])
            middle = float(row["middle"])
            middle_exit = (side == "long" and close_price < middle * (1.0 - 0.0005)) or (
                side == "short" and close_price > middle * (1.0 + 0.0005)
            )
            middle_exit_streak = middle_exit_streak + 1 if middle_exit else 0
            if middle_exit_streak >= 1:
                pending_full_exit = "middle_exit"
                continue
            if variant.t_overlay and not bool(position.get("t_reduced")):
                band_half = float(row["upper"]) - middle
                far = (side == "long" and close_price >= float(row["upper"]) + 0.2 * band_half) or (
                    side == "short" and close_price <= float(row["lower"]) - 0.2 * band_half
                )
                if far and position_return(position, close_price) >= variant.t_profit_pct:
                    pending_t_reduce = True
                    continue
            if should_readd(position, row, variant):
                pending_readd = True
            continue

        # Flat: look for a squeeze breakout unless cooling down or too volatile.
        if index < cooldown_until or float(row["eth_vol_96"]) > 0.006:
            continue
        compressed = float(row["bandwidth"]) <= float(row["bandwidth_threshold"])
        if not compressed:
            continue
        if close_price > float(row["upper"]):
            if target_unit(row, "long", variant, closed_returns) > 0.0:
                pending_entry_side = "long"
        elif close_price < float(row["lower"]):
            if target_unit(row, "short", variant, closed_returns) > 0.0:
                pending_entry_side = "short"

    return {
        "variant": variant,
        "equity": pd.DataFrame(equity_rows),
        "trades": pd.DataFrame(trades),
        "events": pd.DataFrame(event_rows),
    }


def max_drawdown(values: pd.Series) -> float:
    """Return the largest peak-to-trough decline as a fraction of the peak."""
    peak = values.cummax()
    return float(((peak - values) / peak).max()) if len(values) else 0.0


def metrics(equity: pd.DataFrame, trades: pd.DataFrame, label: str, offset: pd.DateOffset | None) -> dict[str, object]:
    """Compute performance metrics for one horizon.

    With an ``offset`` the window starts at ``end - offset``; its starting
    equity is the last equity at or before that cutoff, or the first equity
    row when the curve starts later.  Trades are counted by exit time in
    ``(start, end]``.  ``profit_factor`` and ``calmar`` are reported as 0.0
    when their denominator is zero.

    Raises:
        ValueError: If ``equity`` has no rows.
    """
    if equity.empty:
        # Happens when the candle history is not longer than the warm-up.
        raise ValueError("equity curve is empty; cannot compute metrics")
    end = pd.Timestamp(equity["time"].iloc[-1])
    if offset is None:
        scoped = equity.copy()
    else:
        cutoff = end - offset
        before = equity[equity["time"] <= cutoff]
        start_equity = float(before["equity"].iloc[-1]) if len(before) else float(equity["equity"].iloc[0])
        after = equity[equity["time"] > cutoff]
        scoped = pd.concat(
            [pd.DataFrame([{"time": cutoff, "equity": start_equity}]), after[["time", "equity"]]],
            ignore_index=True,
        )
    start = pd.Timestamp(scoped["time"].iloc[0])
    # Floor the span so a single-row window cannot divide by zero.
    years = max((end - start).total_seconds() / 86_400 / 365, 1e-9)
    total_return = float(scoped["equity"].iloc[-1] / scoped["equity"].iloc[0] - 1.0)
    annualized = (1.0 + total_return) ** (1.0 / years) - 1.0 if total_return > -1.0 else -1.0

    if trades.empty:
        trade_count = 0
        win_rate = 0.0
        profit_factor = 0.0
    else:
        trade_times = pd.to_datetime(trades["exit_ts"], unit="ms", utc=True)
        scoped_trades = trades[(trade_times > start) & (trade_times <= end)]
        trade_count = len(scoped_trades)
        returns = scoped_trades["return_pct"].astype(float) / 100.0 if trade_count else pd.Series(dtype=float)
        win_rate = float((returns > 0.0).mean()) if trade_count else 0.0
        gains = float(returns[returns > 0.0].sum())
        losses = abs(float(returns[returns < 0.0].sum()))
        profit_factor = gains / losses if losses > 0.0 else 0.0

    dd = max_drawdown(scoped["equity"].astype(float))
    return {
        "horizon": label,
        "start": start.strftime("%Y-%m-%d %H:%M"),
        "end": end.strftime("%Y-%m-%d %H:%M"),
        "total_return": total_return,
        "annualized_return": annualized,
        "max_drawdown": dd,
        "calmar": annualized / dd if dd else 0.0,
        "trades": trade_count,
        "trades_per_30d": trade_count / years / 365 * 30,
        "win_rate": win_rate,
        "profit_factor": profit_factor,
    }


def build_variants() -> list[Variant]:
    """Return every configuration to simulate; "baseline" has all layers off.

    Positional fields: name, regime, dynamic_exposure, t_overlay,
    t_profit_pct, t_fraction, readd_buffer_pct, readd_mode, dynamic_mode.
    """
    return [
        Variant("baseline", "none", False, False, 0.0, 0.0, 0.0, "middle", "none"),
        Variant("dynamic_exposure", "none", True, False, 0.0, 0.0, 0.0, "middle", "closed_trade"),
        Variant("dynamic_vol_only", "none", True, False, 0.0, 0.0, 0.0, "middle", "vol_only"),
        Variant("dynamic_ratio_soft", "none", True, False, 0.0, 0.0, 0.0, "middle", "ratio_soft"),
        Variant("regime_directional", "directional", False, False, 0.0, 0.0, 0.0, "middle", "none"),
        Variant("regime_vol_scaled", "vol_scaled", False, False, 0.0, 0.0, 0.0, "middle", "none"),
        Variant("t_overlay_p012_f20_never", "none", False, True, 0.012, 0.20, 0.001, "never", "none"),
        Variant("t_overlay_p012_f30", "none", False, True, 0.012, 0.30, 0.001, "middle", "none"),
        Variant("t_overlay_p012_f30_never", "none", False, True, 0.012, 0.30, 0.001, "never", "none"),
        Variant("t_overlay_p018_f20_middle_trend", "none", False, True, 0.018, 0.20, 0.001, "middle_trend", "none"),
        Variant("t_overlay_p018_f30", "none", False, True, 0.018, 0.30, 0.001, "middle", "none"),
        Variant("t_overlay_p024_f20_never", "none", False, True, 0.024, 0.20, 0.001, "never", "none"),
        Variant("dynamic_t_p012_f30", "none", True, True, 0.012, 0.30, 0.001, "middle", "closed_trade"),
        Variant("dynamic_t_p018_f20_middle_trend", "none", True, True, 0.018, 0.20, 0.001, "middle_trend", "closed_trade"),
        Variant("vol_dynamic_t_p024_f20_never", "none", True, True, 0.024, 0.20, 0.001, "never", "vol_only"),
        Variant("ratio_dynamic_t_p024_f20_never", "none", True, True, 0.024, 0.20, 0.001, "never", "ratio_soft"),
        Variant("regime_directional_dynamic", "directional", True, False, 0.0, 0.0, 0.0, "middle", "closed_trade"),
        Variant("regime_directional_t_p012_f30", "directional", False, True, 0.012, 0.30, 0.001, "middle", "none"),
        Variant("layered_directional_dynamic_t_p012_f30", "directional", True, True, 0.012, 0.30, 0.001, "middle", "closed_trade"),
        Variant("layered_vol_dynamic_t_p012_f30", "vol_scaled", True, True, 0.012, 0.30, 0.001, "middle", "closed_trade"),
        Variant("layered_vol_dynamic_t_p018_f20_middle_trend", "vol_scaled", True, True, 0.018, 0.20, 0.001, "middle_trend", "closed_trade"),
        Variant("layered_vol_t_p024_f20_never", "vol_scaled", False, True, 0.024, 0.20, 0.001, "never", "none"),
        Variant("layered_vol_ratio_t_p024_f20_never", "vol_scaled", True, True, 0.024, 0.20, 0.001, "never", "ratio_soft"),
    ]


def markdown_table(frame: pd.DataFrame) -> str:
    """Render ``frame`` as a pipe-delimited Markdown table.

    Floats use six significant digits, missing values become empty cells and
    literal ``|`` characters in other values are escaped.
    """

    def cell(value: object) -> str:
        if isinstance(value, float):
            return f"{value:.6g}"
        return str(value).replace("|", "\\|")

    rows = [list(frame.columns), ["---" for _ in frame.columns]]
    rows.extend(frame.astype(object).where(pd.notna(frame), "").values.tolist())
    return "\n".join("| " + " | ".join(cell(value) for value in row) + " |" for row in rows)


def write_report(summary: pd.DataFrame, horizons: pd.DataFrame, events: pd.DataFrame, paths: list[Path], command: str) -> str:
    """Build the Markdown report text (nothing is written to disk here).

    Variants are ranked by worst recent return, then worst recent calmar, then
    lowest full-period drawdown.  The decision line becomes "Paper-observe
    only" only when the top variant is not the baseline, is not worse than the
    baseline on the 1y/6m/3m/30d/14d total returns and has no larger
    full-period drawdown; otherwise it stays "No".
    """
    horizon_columns = [
        "horizon",
        "total_return",
        "annualized_return",
        "max_drawdown",
        "calmar",
        "trades",
        "trades_per_30d",
        "win_rate",
        "profit_factor",
    ]
    ranking = summary.sort_values(
        ["recent_min_return", "recent_min_calmar", "full_max_drawdown"],
        ascending=[False, False, True],
    )
    best = ranking.iloc[0]
    baseline = horizons[horizons["variant"] == "baseline"]
    best_rows = horizons[horizons["variant"] == best["variant"]]
    baseline_h = baseline.set_index("horizon")
    best_h = best_rows.set_index("horizon")

    def not_worse(labels: tuple[str, ...]) -> bool:
        return all(
            float(best_h.loc[horizon, "total_return"]) >= float(baseline_h.loc[horizon, "total_return"])
            for horizon in labels
        )

    medium_not_worse = not_worse(("1y", "6m", "3m"))
    short_not_worse = not_worse(("30d", "14d"))
    baseline_drawdown = float(summary.loc[summary["variant"] == "baseline", "full_max_drawdown"].iloc[0])
    decision = "No"
    if (
        best["variant"] != "baseline"
        and medium_not_worse
        and short_not_worse
        and float(best["full_max_drawdown"]) <= baseline_drawdown
    ):
        decision = "Paper-observe only"

    return (
        "# Layered BB squeeze position exploration\n\n"
        "Scope: offline local-candle research only. No live executor, deployment, credentials, or order path was changed.\n\n"
        f"Run command: `{command}`\n\n"
        "Layer model: long-term regime cap + medium-term live BB squeeze signal + optional dynamic exposure + optional in-position T overlay.\n\n"
        "Output files:\n"
        + "\n".join(f"- `{path}`" for path in paths)
        + "\n\n"
        "## Summary Ranking\n\n"
        + markdown_table(
            ranking[
                [
                    "variant",
                    "full_total_return",
                    "full_max_drawdown",
                    "full_calmar",
                    "recent_min_return",
                    "recent_min_calmar",
                    "trades",
                    "t_reduce_events",
                    "t_readd_events",
                ]
            ]
        )
        + "\n\n"
        "## Baseline Horizons\n\n"
        + markdown_table(baseline[horizon_columns])
        + "\n\n"
        "## Best Variant Horizons\n\n"
        + markdown_table(best_rows[horizon_columns])
        + "\n\n"
        "## Event Counts\n\n"
        + markdown_table(events)
        + "\n\n"
        "## Decision\n\n"
        f"- Best ranked variant: `{best['variant']}`.\n"
        f"- Replace live strategy now: {decision}.\n"
    )


def main() -> int:
    """Run every variant, write the CSV/Markdown outputs and print the top 10."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--bar", default=BAR)
    parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
    args = parser.parse_args()

    eth, btc = align_pair(load_candles(ETH_SYMBOL, args.bar), load_candles(BTC_SYMBOL, args.bar))
    frame = indicators(eth, btc)

    recent_labels = ("1y", "6m", "3m", "30d", "14d")
    summary_rows: list[dict[str, object]] = []
    horizon_rows: list[dict[str, object]] = []
    event_rows: list[dict[str, object]] = []
    equity_frames: list[pd.DataFrame] = []
    for variant in build_variants():
        result = run_variant(frame, variant)
        equity = result["equity"].copy()
        trades = result["trades"]
        events = result["events"]
        equity["variant"] = variant.name
        equity_frames.append(equity)

        horizon_metrics = [metrics(equity, trades, label, offset) for label, offset in HORIZONS]
        horizon_rows.extend({"variant": variant.name, **row} for row in horizon_metrics)
        by_horizon = {row["horizon"]: row for row in horizon_metrics}
        full = by_horizon["full"]

        event_counts = events["event"].value_counts().to_dict() if not events.empty else {}
        t_reduce_events = int(event_counts.get("t_reduce", 0))
        t_readd_events = int(event_counts.get("t_readd", 0))
        event_rows.append(
            {
                "variant": variant.name,
                "entry_events": int(sum(value for key, value in event_counts.items() if str(key).startswith("entry_"))),
                "middle_exit_events": int(event_counts.get("middle_exit", 0)),
                "stop_events": int(event_counts.get("stop", 0)),
                "t_reduce_events": t_reduce_events,
                "t_readd_events": t_readd_events,
            }
        )

        # Ranking uses the worst value across the recent horizons.
        recent_returns = [float(by_horizon[label]["total_return"]) for label in recent_labels]
        recent_calmars = [float(by_horizon[label]["calmar"]) for label in recent_labels]
        summary_rows.append(
            {
                "variant": variant.name,
                "regime": variant.regime,
                "dynamic_exposure": variant.dynamic_exposure,
                "dynamic_mode": variant.dynamic_mode,
                "t_overlay": variant.t_overlay,
                "t_profit_pct": variant.t_profit_pct,
                "t_fraction": variant.t_fraction,
                "readd_mode": variant.readd_mode,
                "trades": int(full["trades"]),
                "full_total_return": full["total_return"],
                "full_annualized_return": full["annualized_return"],
                "full_max_drawdown": full["max_drawdown"],
                "full_calmar": full["calmar"],
                "recent_min_return": min(recent_returns),
                "recent_min_calmar": min(recent_calmars),
                "t_reduce_events": t_reduce_events,
                "t_readd_events": t_readd_events,
            }
        )
        print(f"done {variant.name}")

    summary = pd.DataFrame(summary_rows)
    horizons = pd.DataFrame(horizon_rows)
    events = pd.DataFrame(event_rows)
    equity_out = pd.concat(equity_frames, ignore_index=True)

    args.output_dir.mkdir(parents=True, exist_ok=True)
    prefix = "layered-bb-squeeze-position"
    summary_path = args.output_dir / f"{prefix}-summary.csv"
    horizons_path = args.output_dir / f"{prefix}-horizons.csv"
    events_path = args.output_dir / f"{prefix}-events.csv"
    equity_path = args.output_dir / f"{prefix}-equity.csv"
    report_path = args.output_dir / f"{prefix}-report.md"
    paths = [summary_path, horizons_path, events_path, equity_path, report_path]

    summary.to_csv(summary_path, index=False)
    horizons.to_csv(horizons_path, index=False)
    events.to_csv(events_path, index=False)
    equity_out.to_csv(equity_path, index=False)
    command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --bar {args.bar} --output-dir {args.output_dir.as_posix()}"
    report_path.write_text(write_report(summary, horizons, events, paths, command), encoding="utf-8")

    print(summary.sort_values(["recent_min_return", "recent_min_calmar"], ascending=[False, False]).head(10).to_string(index=False))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())