from __future__ import annotations import argparse import sys from dataclasses import dataclass from pathlib import Path import pandas as pd sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from okx_codex_trader.models import Candle from okx_codex_trader.sampled_report import SegmentResult, mark_to_market, trade_equity ETH_SYMBOL = "ETH-USDT-SWAP" BTC_SYMBOL = "BTC-USDT-SWAP" BAR = "15m" YEARS = 10.0 LEVERAGE = 3 INITIAL_EQUITY = 10_000.0 DATA_DIR = Path("data/okx-candles") OUTPUT_DIR = Path("reports/eth-exploration") PRIMARY_COST = "maker_taker" COSTS = ( ("maker_maker", 0.0012), ("maker_taker", 0.0021), ("taker_taker", 0.0030), ) HORIZONS = ( ("full", None), ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ("30d", pd.DateOffset(days=30)), ) @dataclass(frozen=True) class RegimeExit: low_stop_atr: float mid_stop_atr: float high_stop_atr: float low_take_atr: float mid_take_atr: float high_take_atr: float low_trail_atr: float mid_trail_atr: float high_trail_atr: float trail_activation_atr: float def params(self, regime: str) -> tuple[float, float, float]: if regime == "low": return self.low_stop_atr, self.low_take_atr, self.low_trail_atr if regime == "high": return self.high_stop_atr, self.high_take_atr, self.high_trail_atr return self.mid_stop_atr, self.mid_take_atr, self.mid_trail_atr @dataclass(frozen=True) class Variant: band_length: int bandwidth_lookback: int bandwidth_quantile: float side_mode: str btc_filter: str cooldown_bars: int atr_length: int rv_length: int rv_quantile_lookback: int low_vol_quantile: float high_vol_quantile: float middle_exit_buffer_pct: float middle_exit_confirm_bars: int exit: RegimeExit @property def name(self) -> str: return ( f"bb-squeeze-dyn-atr-l{self.band_length}-bw{self.bandwidth_lookback}" f"-q{self.bandwidth_quantile:g}-{self.side_mode}-{self.btc_filter}" f"-atr{self.atr_length}-rv{self.rv_length}x{self.rv_quantile_lookback}" f"-vq{self.low_vol_quantile:g}_{self.high_vol_quantile:g}" f"-sl{self.exit.low_stop_atr:g}_{self.exit.mid_stop_atr:g}_{self.exit.high_stop_atr:g}" f"-tp{self.exit.low_take_atr:g}_{self.exit.mid_take_atr:g}_{self.exit.high_take_atr:g}" f"-tr{self.exit.low_trail_atr:g}_{self.exit.mid_trail_atr:g}_{self.exit.high_trail_atr:g}" f"-act{self.exit.trail_activation_atr:g}" f"-cd{self.cooldown_bars}-mxbuf{self.middle_exit_buffer_pct:g}-mxc{self.middle_exit_confirm_bars}" ) def _format_ts(ts: int) -> str: return pd.to_datetime(ts, unit="ms", utc=True).strftime("%Y-%m-%d %H:%M") def _load_candles(symbol: str, bar: str) -> list[Candle]: frame = pd.read_csv(DATA_DIR / symbol / f"{bar}.csv") return [ Candle( symbol=symbol, ts=int(row.ts), open=float(row.open), high=float(row.high), low=float(row.low), close=float(row.close), volume=float(row.volume), ) for row in frame.itertuples(index=False) ] def _align_pair(left: list[Candle], right: list[Candle]) -> tuple[list[Candle], list[Candle]]: right_by_ts = {candle.ts: candle for candle in right} left_out: list[Candle] = [] right_out: list[Candle] = [] for candle in left: other = right_by_ts.get(candle.ts) if other is not None: left_out.append(candle) right_out.append(other) return left_out, right_out def close_position( *, trades: list[dict[str, object]], exits: list[dict[str, object]], position: dict[str, object], candle: Candle, exit_price: float, reason: str, ) -> tuple[float, bool]: margin_used = float(position["margin_used"]) exit_equity = trade_equity( side=str(position["side"]), margin_used=margin_used, entry_price=float(position["entry_price"]), exit_price=exit_price, leverage=LEVERAGE, ) pnl = exit_equity - margin_used trades.append( { "side": "Long" if position["side"] == "long" else "Short", "entry_time": _format_ts(int(position["entry_time"])), "exit_time": _format_ts(candle.ts), "entry_ts": int(position["entry_time"]), "exit_ts": candle.ts, "entry_price": round(float(position["entry_price"]), 4), "exit_price": round(exit_price, 4), "pnl": round(pnl, 4), "return_pct": round(pnl / margin_used * 100.0, 4), "cost_weight": 1.0, "exit_reason": reason, "entry_vol_regime": position["entry_vol_regime"], "stop_atr": position["stop_atr"], "take_atr": position["take_atr"], "trail_atr": position["trail_atr"], "mfe_pct": round(float(position["mfe_pct"]) * 100.0, 4), } ) exits.append({"ts": candle.ts, "price": exit_price, "side": position["side"]}) return exit_equity, pnl > 0.0 def true_range_series(eth: list[Candle]) -> pd.Series: high = pd.Series([candle.high for candle in eth], dtype=float) low = pd.Series([candle.low for candle in eth], dtype=float) close = pd.Series([candle.close for candle in eth], dtype=float) previous_close = close.shift(1) return pd.concat([(high - low), (high - previous_close).abs(), (low - previous_close).abs()], axis=1).max(axis=1) def vol_regime(realized_vol: float, low_threshold: float, high_threshold: float) -> str: if realized_vol <= low_threshold: return "low" if realized_vol >= high_threshold: return "high" return "mid" def favorable_move(side: str, entry_price: float, candle: Candle) -> float: if side == "long": return candle.high / entry_price - 1.0 return entry_price / candle.low - 1.0 def exit_position( position: dict[str, object], candle: Candle, atr_pct: float, variant: Variant, ) -> tuple[float, str] | None: side = str(position["side"]) entry_price = float(position["entry_price"]) stop_price = float(position["stop_price"]) take_price = position["take_price"] mfe_atr = float(position["mfe_pct"]) / atr_pct if atr_pct > 0.0 else 0.0 if mfe_atr >= variant.exit.trail_activation_atr: if side == "long": trail_stop = float(position["best_price"]) * (1.0 - float(position["trail_atr"]) * atr_pct) stop_price = max(stop_price, trail_stop) else: trail_stop = float(position["best_price"]) * (1.0 + float(position["trail_atr"]) * atr_pct) stop_price = min(stop_price, trail_stop) position["stop_price"] = stop_price if side == "long": if candle.open <= stop_price: return candle.open, "trail_gap" if stop_price != float(position["initial_stop_price"]) else "stop_gap" if take_price is not None and candle.open >= float(take_price): return candle.open, "take_gap" stop_hit = candle.low <= stop_price take_hit = take_price is not None and candle.high >= float(take_price) else: if candle.open >= stop_price: return candle.open, "trail_gap" if stop_price != float(position["initial_stop_price"]) else "stop_gap" if take_price is not None and candle.open <= float(take_price): return candle.open, "take_gap" stop_hit = candle.high >= stop_price take_hit = take_price is not None and candle.low <= float(take_price) if stop_hit: return stop_price, "trailing_stop" if stop_price != float(position["initial_stop_price"]) else "stop" if take_hit: return float(take_price), "take_profit" return None def run_variant(eth: list[Candle], btc: list[Candle], variant: Variant) -> tuple[SegmentResult, dict[str, int]]: eth_close = pd.Series([candle.close for candle in eth], dtype=float) btc_close = pd.Series([candle.close for candle in btc], dtype=float) middle_series = eth_close.rolling(variant.band_length).mean() stdev_series = eth_close.rolling(variant.band_length).std(ddof=0) upper_values = middle_series + 2.0 * stdev_series lower_values = middle_series - 2.0 * stdev_series bandwidth_series = (upper_values - lower_values) / middle_series threshold_series = bandwidth_series.rolling(variant.bandwidth_lookback).quantile(variant.bandwidth_quantile) btc_sma = btc_close.rolling(480).mean() btc_momentum = btc_close / btc_close.shift(96) - 1.0 realized_vol = eth_close.pct_change().rolling(variant.rv_length).std(ddof=0) low_vol = realized_vol.rolling(variant.rv_quantile_lookback).quantile(variant.low_vol_quantile) high_vol = realized_vol.rolling(variant.rv_quantile_lookback).quantile(variant.high_vol_quantile) atr_pct = true_range_series(eth).rolling(variant.atr_length).mean() / eth_close warmup_bars = max( variant.band_length, variant.bandwidth_lookback, variant.rv_length + variant.rv_quantile_lookback, variant.atr_length, 480, 96, ) equity = INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_entry_side: str | None = None pending_exit = False middle_exit_streak = 0 cooldown_until = -1 exit_counts = { "stop_exits": 0, "take_profit_exits": 0, "trailing_stop_exits": 0, "signal_exits": 0, "low_vol_entries": 0, "mid_vol_entries": 0, "high_vol_entries": 0, } middle = middle_series.tolist() upper = upper_values.tolist() lower = lower_values.tolist() bandwidth = bandwidth_series.tolist() threshold = threshold_series.tolist() btc_sma_values = btc_sma.tolist() btc_momentum_values = btc_momentum.tolist() realized_vol_values = realized_vol.tolist() low_vol_values = low_vol.tolist() high_vol_values = high_vol.tolist() atr_pct_values = atr_pct.tolist() for index in range(warmup_bars, len(eth)): candle = eth[index] if pending_exit and position is not None: equity, won = close_position( trades=trades, exits=exits, position=position, candle=candle, exit_price=candle.open, reason="signal_middle", ) wins += int(won) exit_counts["signal_exits"] += 1 position = None pending_exit = False middle_exit_streak = 0 cooldown_until = index + variant.cooldown_bars if pending_entry_side is not None and position is None and equity > 0.0: entry_price = candle.open regime = vol_regime(float(realized_vol_values[index - 1]), float(low_vol_values[index - 1]), float(high_vol_values[index - 1])) stop_atr, take_atr, trail_atr = variant.exit.params(regime) entry_atr_pct = float(atr_pct_values[index - 1]) stop_distance = stop_atr * entry_atr_pct take_distance = take_atr * entry_atr_pct stop_price = entry_price * (1.0 - stop_distance if pending_entry_side == "long" else 1.0 + stop_distance) take_price = None if take_atr > 0.0: take_price = entry_price * (1.0 + take_distance if pending_entry_side == "long" else 1.0 - take_distance) position = { "side": pending_entry_side, "entry_time": candle.ts, "entry_price": entry_price, "margin_used": equity, "initial_stop_price": stop_price, "stop_price": stop_price, "take_price": take_price, "best_price": candle.high if pending_entry_side == "long" else candle.low, "mfe_pct": 0.0, "entry_vol_regime": regime, "stop_atr": stop_atr, "take_atr": take_atr, "trail_atr": trail_atr, } exit_counts[f"{regime}_vol_entries"] += 1 entries.append({"ts": candle.ts, "price": entry_price, "side": pending_entry_side}) pending_entry_side = None current_equity = equity if position is not None: if position["side"] == "long": position["best_price"] = max(float(position["best_price"]), candle.high) else: position["best_price"] = min(float(position["best_price"]), candle.low) position["mfe_pct"] = max(float(position["mfe_pct"]), favorable_move(str(position["side"]), float(position["entry_price"]), candle)) risk_exit = exit_position(position, candle, float(atr_pct_values[index]), variant) if risk_exit is not None: exit_price, reason = risk_exit equity, won = close_position( trades=trades, exits=exits, position=position, candle=candle, exit_price=exit_price, reason=reason, ) wins += int(won) if reason.startswith("take"): exit_counts["take_profit_exits"] += 1 elif reason.startswith("trail"): exit_counts["trailing_stop_exits"] += 1 else: exit_counts["stop_exits"] += 1 current_equity = equity position = None middle_exit_streak = 0 cooldown_until = index + variant.cooldown_bars if position is not None: current_equity = mark_to_market( side=str(position["side"]), margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=LEVERAGE, ) peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(eth) - 1 or equity <= 0.0: continue values = ( middle[index], upper[index], lower[index], bandwidth[index], threshold[index], btc_sma_values[index], btc_momentum_values[index], realized_vol_values[index], low_vol_values[index], high_vol_values[index], atr_pct_values[index], ) if any(value != value for value in values): continue if position is not None: middle_exit = ( position["side"] == "long" and candle.close < float(middle[index]) * (1.0 - variant.middle_exit_buffer_pct) ) or ( position["side"] == "short" and candle.close > float(middle[index]) * (1.0 + variant.middle_exit_buffer_pct) ) middle_exit_streak = middle_exit_streak + 1 if middle_exit else 0 if middle_exit_streak >= variant.middle_exit_confirm_bars: pending_exit = True continue if index < cooldown_until: continue if variant.btc_filter == "btc-up" and not (btc_close.iloc[index] > float(btc_sma_values[index])): continue if variant.btc_filter == "btc-up-momo" and not ( btc_close.iloc[index] > float(btc_sma_values[index]) and float(btc_momentum_values[index]) > 0.0 ): continue if bandwidth[index] <= threshold[index]: if candle.close > float(upper[index]): pending_entry_side = "long" elif variant.side_mode == "both" and candle.close < float(lower[index]): pending_entry_side = "short" return ( SegmentResult( trade_count=len(trades), total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY, win_rate=wins / len(trades) if trades else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=eth[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ), exit_counts, ) def cost_equity_frame(result: SegmentResult, cost: float) -> pd.DataFrame: rows = [{"ts": pd.to_datetime(result.equity_curve[0]["ts"], unit="ms", utc=True), "equity": INITIAL_EQUITY}] equity = INITIAL_EQUITY for trade in result.trades: equity *= 1.0 + float(trade["return_pct"]) / 100.0 - cost * float(trade.get("cost_weight", 1.0)) rows.append({"ts": pd.to_datetime(int(trade["exit_ts"]), unit="ms", utc=True), "equity": equity}) return pd.DataFrame(rows) def max_drawdown(values: list[float]) -> float: peak = values[0] dd = 0.0 for value in values: peak = max(peak, value) dd = max(dd, (peak - value) / peak if peak else 0.0) return dd def equity_metrics(frame: pd.DataFrame, start_time: pd.Timestamp, end_time: pd.Timestamp) -> dict[str, float]: years = (end_time - start_time).total_seconds() / 86_400 / 365 total_return = float(frame["equity"].iloc[-1] / frame["equity"].iloc[0] - 1.0) annualized = (1.0 + total_return) ** (1.0 / years) - 1.0 if total_return > -1.0 and years > 0.0 else 0.0 dd = max_drawdown([float(value) for value in frame["equity"]]) return { "net_total_return": total_return, "net_annualized_return": annualized, "net_max_drawdown": dd, "net_calmar": annualized / dd if dd else 0.0, } def trade_stats(trades: list[dict[str, object]]) -> dict[str, float | int]: returns = [float(trade["return_pct"]) for trade in trades] wins = [value for value in returns if value > 0.0] losses = [-value for value in returns if value < 0.0] reasons = {str(trade["exit_reason"]) for trade in trades} return { "trades": len(trades), "win_rate": len(wins) / len(trades) if trades else 0.0, "avg_return_pct": sum(returns) / len(returns) if returns else 0.0, "avg_mfe_pct": sum(float(trade["mfe_pct"]) for trade in trades) / len(trades) if trades else 0.0, "payoff_ratio": (sum(wins) / len(wins)) / (sum(losses) / len(losses)) if wins and losses else 0.0, "profit_factor": sum(wins) / sum(losses) if losses else 0.0, **{f"exit_{reason}": sum(1 for trade in trades if trade["exit_reason"] == reason) for reason in sorted(reasons)}, } def horizon_rows(frame: pd.DataFrame, trades: list[dict[str, object]], first_ts: int, last_ts: int) -> list[dict[str, object]]: rows: list[dict[str, object]] = [] start = pd.to_datetime(first_ts, unit="ms", utc=True) end = pd.to_datetime(last_ts, unit="ms", utc=True) for label, offset in HORIZONS: cutoff = start if offset is None else end - offset before = frame[frame["ts"] <= cutoff] if len(before): start_equity = float(before["equity"].iloc[-1]) after = frame[frame["ts"] > cutoff] horizon_frame = pd.concat([pd.DataFrame([{"ts": cutoff, "equity": start_equity}]), after[["ts", "equity"]]], ignore_index=True) else: horizon_frame = frame[["ts", "equity"]].copy() cutoff = pd.Timestamp(horizon_frame["ts"].iloc[0]) cutoff_ms = int(cutoff.timestamp() * 1000) horizon_trades = [trade for trade in trades if int(trade["exit_ts"]) >= cutoff_ms] rows.append( { "horizon": label, "horizon_start": cutoff.strftime("%Y-%m-%d %H:%M"), "horizon_end": end.strftime("%Y-%m-%d %H:%M"), **equity_metrics(horizon_frame, cutoff, end), **trade_stats(horizon_trades), } ) return rows def worst_month(frame: pd.DataFrame) -> tuple[str, float]: monthly = frame.set_index("ts")["equity"].resample("ME").last().ffill().pct_change().dropna() if not len(monthly): return "", 0.0 idx = monthly.idxmin() return idx.strftime("%Y-%m"), float(monthly.loc[idx]) def build_variants() -> list[Variant]: bases = ( (48, 960, 0.25, "both", "none", 24, 0.0005, 1), (48, 960, 0.25, "both", "none", 24, 0.0010, 1), (96, 480, 0.15, "both", "none", 24, 0.0010, 1), (96, 960, 0.25, "both", "btc-up", 24, 0.0010, 1), (96, 960, 0.25, "both", "btc-up-momo", 24, 0.0010, 1), ) exits = ( RegimeExit(1.6, 1.9, 2.3, 3.0, 3.8, 5.0, 1.2, 1.5, 1.9, 1.8), RegimeExit(1.8, 2.2, 2.8, 3.2, 4.5, 6.0, 1.3, 1.8, 2.4, 2.0), RegimeExit(2.0, 2.6, 3.2, 4.0, 5.5, 7.0, 1.5, 2.1, 2.8, 2.2), RegimeExit(1.4, 1.8, 2.4, 2.8, 3.5, 4.8, 1.0, 1.4, 2.0, 1.6), RegimeExit(2.2, 2.8, 3.6, 4.2, 6.0, 8.0, 1.8, 2.4, 3.2, 2.4), RegimeExit(1.8, 2.4, 3.0, 0.0, 0.0, 0.0, 1.2, 1.8, 2.5, 1.8), ) variants: list[Variant] = [] for band_length, lookback, quantile, side_mode, btc_filter, cooldown, middle_buffer, middle_confirm in bases: for exit_spec in exits: variants.append( Variant( band_length=band_length, bandwidth_lookback=lookback, bandwidth_quantile=quantile, side_mode=side_mode, btc_filter=btc_filter, cooldown_bars=cooldown, atr_length=96, rv_length=96, rv_quantile_lookback=960, low_vol_quantile=0.35, high_vol_quantile=0.70, middle_exit_buffer_pct=middle_buffer, middle_exit_confirm_bars=middle_confirm, exit=exit_spec, ) ) return variants def format_cell(value: object) -> str: if isinstance(value, float): return f"{value:.6g}" return str(value).replace("|", "\\|") def markdown_table(frame: pd.DataFrame) -> str: columns = list(frame.columns) rows = [columns, ["---" for _ in columns]] for record in frame.to_dict("records"): rows.append([record[column] for column in columns]) return "\n".join("| " + " | ".join(format_cell(value) for value in row) + " |" for row in rows) def write_report(summary: pd.DataFrame, horizon: pd.DataFrame, first_ts: int, last_ts: int, command: str) -> str: primary = summary[summary["cost"] == PRIMARY_COST] top = primary.head(10) horizon_top = ( horizon[horizon["cost"] == PRIMARY_COST] .sort_values(["horizon", "net_calmar", "net_annualized_return"], ascending=[True, False, False]) .groupby("horizon", observed=True) .head(3) ) return "\n".join( [ "# ETH dynamic ATR exit exploration", "", f"Run command: `{command}`", f"Actual continuous local history: `{_format_ts(first_ts)}` to `{_format_ts(last_ts)}`.", "", "Scope: ETH/BTC local OKX 15m candle cache. Entry remains BB squeeze breakout. Exits use entry realized-vol regime to choose ATR stop, ATR take-profit, and ATR trailing distance.", "", "Top 10 by maker_taker Calmar:", markdown_table( top[ [ "name", "trades", "win_rate", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "profit_factor", "payoff_ratio", "stop_exits", "take_profit_exits", "trailing_stop_exits", "signal_exits", "low_vol_entries", "mid_vol_entries", "high_vol_entries", ] ] ), "", "Horizon leaders:", markdown_table( horizon_top[ [ "horizon", "name", "trades", "win_rate", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "profit_factor", "payoff_ratio", ] ] ), ] ) + "\n" def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--bar", default=BAR) parser.add_argument("--years", type=float, default=YEARS) parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR) args = parser.parse_args() eth = _load_candles(ETH_SYMBOL, args.bar) btc = _load_candles(BTC_SYMBOL, args.bar) eth, btc = _align_pair(eth, btc) requested_bars = int(args.years * 365 * 24 * 60 / 15) eth = eth[-requested_bars:] btc = btc[-requested_bars:] summary_rows: list[dict[str, object]] = [] horizon_rows_out: list[dict[str, object]] = [] variants = build_variants() for index, variant in enumerate(variants, start=1): result, exit_counts = run_variant(eth, btc, variant) if not result.equity_curve: continue for cost_name, cost in COSTS: frame = cost_equity_frame(result, cost) metrics = equity_metrics( frame, pd.to_datetime(eth[0].ts, unit="ms", utc=True), pd.to_datetime(eth[-1].ts, unit="ms", utc=True), ) month, month_return = worst_month(frame) stats = trade_stats(result.trades) row = { "family": "bb_squeeze_dynamic_atr_exits", "cost": cost_name, "symbol": ETH_SYMBOL, "signal_symbol": BTC_SYMBOL if variant.btc_filter != "none" else "", "bar": args.bar, "name": variant.name, "band_length": variant.band_length, "bandwidth_lookback": variant.bandwidth_lookback, "bandwidth_quantile": variant.bandwidth_quantile, "side_mode": variant.side_mode, "btc_filter": variant.btc_filter, "cooldown_bars": variant.cooldown_bars, "atr_length": variant.atr_length, "rv_length": variant.rv_length, "rv_quantile_lookback": variant.rv_quantile_lookback, "low_vol_quantile": variant.low_vol_quantile, "high_vol_quantile": variant.high_vol_quantile, "middle_exit_buffer_pct": variant.middle_exit_buffer_pct, "middle_exit_confirm_bars": variant.middle_exit_confirm_bars, "first_candle": _format_ts(eth[0].ts), "last_candle": _format_ts(eth[-1].ts), "years": (eth[-1].ts - eth[0].ts) / 86_400_000 / 365, "gross_total_return": result.total_return, "gross_max_drawdown_mark_to_market": result.max_drawdown, "worst_month": month, "worst_month_return": month_return, **exit_counts, **stats, **metrics, } summary_rows.append(row) for horizon_row in horizon_rows(frame, result.trades, eth[0].ts, eth[-1].ts): horizon_rows_out.append( { "family": "bb_squeeze_dynamic_atr_exits", "cost": cost_name, "symbol": ETH_SYMBOL, "bar": args.bar, "name": variant.name, **horizon_row, } ) print(f"done {index}/{len(variants)} {variant.name}", flush=True) summary = pd.DataFrame(summary_rows).sort_values( ["cost", "net_calmar", "net_annualized_return", "profit_factor"], ascending=[True, False, False, False], ) primary = summary[summary["cost"] == PRIMARY_COST] summary = pd.concat([primary, summary[summary["cost"] != PRIMARY_COST]], ignore_index=True) horizon = pd.DataFrame(horizon_rows_out) horizon["horizon"] = pd.Categorical(horizon["horizon"], categories=[label for label, _ in HORIZONS], ordered=True) horizon = horizon.sort_values(["cost", "horizon", "net_calmar", "net_annualized_return"], ascending=[True, True, False, False]) args.output_dir.mkdir(parents=True, exist_ok=True) summary_path = args.output_dir / "eth-dynamic-atr-exits-summary.csv" horizon_path = args.output_dir / "eth-dynamic-atr-exits-horizon.csv" report_path = args.output_dir / "eth-dynamic-atr-exits-report.md" summary.to_csv(summary_path, index=False) horizon.to_csv(horizon_path, index=False) command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --bar {args.bar} --years {args.years}" report_path.write_text(write_report(summary, horizon, eth[0].ts, eth[-1].ts, command), encoding="utf-8") print(primary.head(10).to_string(index=False)) return 0 if __name__ == "__main__": raise SystemExit(main())