from __future__ import annotations import argparse import sys from dataclasses import dataclass from pathlib import Path import pandas as pd sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from okx_codex_trader.donchian_report import DonchianConfig, run_donchian_segment from okx_codex_trader.models import Candle from okx_codex_trader.sampled_report import SegmentResult, mark_to_market, trade_equity from scripts import explore_ultrashort as explore ETH_SYMBOL = "ETH-USDT-SWAP" BTC_SYMBOL = "BTC-USDT-SWAP" BAR = "15m" YEARS = 10.0 LEVERAGE = 3 INITIAL_EQUITY = 10_000.0 DATA_DIR = Path("data/okx-candles") OUTPUT_DIR = Path("reports/eth-exploration") PRIMARY_COST = "maker_taker" COSTS = ( ("maker_maker", 0.0012), ("maker_taker", 0.0021), ("taker_taker", 0.0030), ) HORIZONS = ( ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ) @dataclass(frozen=True) class Strategy: family: str name: str warmup_bars: int pair: bool run: object def _format_ts(ts: int) -> str: return pd.to_datetime(ts, unit="ms", utc=True).strftime("%Y-%m-%d %H:%M") def _load_candles(symbol: str, bar: str) -> list[Candle]: frame = pd.read_csv(DATA_DIR / symbol / f"{bar}.csv") return [ Candle( symbol=symbol, ts=int(row.ts), open=float(row.open), high=float(row.high), low=float(row.low), close=float(row.close), volume=float(row.volume), ) for row in frame.itertuples(index=False) ] def _align_pair(left: list[Candle], right: list[Candle]) -> tuple[list[Candle], list[Candle]]: right_by_ts = {candle.ts: candle for candle in right} left_out: list[Candle] = [] right_out: list[Candle] = [] for candle in left: other = right_by_ts.get(candle.ts) if other is not None: left_out.append(candle) right_out.append(other) return left_out, right_out def _close_position( *, trades: list[dict[str, object]], exits: list[dict[str, object]], position: dict[str, object], candle: Candle, exit_price: float, ) -> tuple[float, bool]: margin_used = float(position["margin_used"]) exit_equity = trade_equity( side=str(position["side"]), margin_used=margin_used, entry_price=float(position["entry_price"]), exit_price=exit_price, leverage=LEVERAGE, ) pnl = exit_equity - margin_used trades.append( { "side": "Long" if position["side"] == "long" else "Short", "entry_time": _format_ts(int(position["entry_time"])), "exit_time": _format_ts(candle.ts), "entry_price": round(float(position["entry_price"]), 4), "exit_price": round(exit_price, 4), "pnl": round(pnl, 4), "return_pct": round(pnl / margin_used * 100.0, 4), "cost_weight": 1.0, } ) exits.append({"ts": candle.ts, "price": exit_price, "side": position["side"]}) return exit_equity, pnl > 0.0 def _empty_result(candles: list[Candle], warmup_bars: int, equity_curve: list[dict[str, float | int]]) -> SegmentResult: return SegmentResult( trade_count=0, total_return=0.0, win_rate=0.0, max_drawdown=0.0, trades=[], open_position=None, candles=candles[warmup_bars:], equity_curve=equity_curve, entries=[], exits=[], ) def run_bb_pullback_segment( *, candles: list[Candle], trend_sma: int, band_length: int, std_multiplier: float, stop_loss_pct: float, ) -> SegmentResult: closes = pd.Series([candle.close for candle in candles], dtype=float) trend = closes.rolling(trend_sma).mean().tolist() middle = closes.rolling(band_length).mean().tolist() stdev = closes.rolling(band_length).std(ddof=0).tolist() upper = [m + std_multiplier * s if m == m and s == s else float("nan") for m, s in zip(middle, stdev)] lower = [m - std_multiplier * s if m == m and s == s else float("nan") for m, s in zip(middle, stdev)] warmup_bars = max(trend_sma, band_length) return _run_indicator_segment(candles, warmup_bars, trend, middle, upper, lower, stop_loss_pct, "pullback") def run_bb_squeeze_breakout_segment( *, candles: list[Candle], band_length: int, std_multiplier: float, bandwidth_lookback: int, bandwidth_quantile: float, stop_loss_pct: float, ) -> SegmentResult: closes = pd.Series([candle.close for candle in candles], dtype=float) middle_series = closes.rolling(band_length).mean() stdev_series = closes.rolling(band_length).std(ddof=0) upper_series = middle_series + std_multiplier * stdev_series lower_series = middle_series - std_multiplier * stdev_series bandwidth = ((upper_series - lower_series) / middle_series).tolist() threshold = pd.Series(bandwidth, dtype=float).rolling(bandwidth_lookback).quantile(bandwidth_quantile).tolist() warmup_bars = max(band_length, bandwidth_lookback) return _run_squeeze_segment( candles, warmup_bars, middle_series.tolist(), upper_series.tolist(), lower_series.tolist(), bandwidth, threshold, stop_loss_pct, ) def _run_indicator_segment( candles: list[Candle], warmup_bars: int, trend: list[float], middle: list[float], upper: list[float], lower: list[float], stop_loss_pct: float, mode: str, ) -> SegmentResult: if len(candles) <= warmup_bars: return _empty_result(candles, warmup_bars, []) equity = INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_entry_side: str | None = None pending_exit = False for index in range(warmup_bars, len(candles)): candle = candles[index] if pending_exit and position is not None: equity, won = _close_position(trades=trades, exits=exits, position=position, candle=candle, exit_price=candle.open) wins += int(won) position = None pending_exit = False if pending_entry_side is not None and position is None and equity > 0.0: position = { "side": pending_entry_side, "entry_time": candle.ts, "entry_price": candle.open, "margin_used": equity, "stop_price": candle.open * (1.0 - stop_loss_pct if pending_entry_side == "long" else 1.0 + stop_loss_pct), } entries.append({"ts": candle.ts, "price": candle.open, "side": pending_entry_side}) pending_entry_side = None current_equity = equity if position is not None: stop_hit = (position["side"] == "long" and candle.low <= float(position["stop_price"])) or ( position["side"] == "short" and candle.high >= float(position["stop_price"]) ) if stop_hit: equity, won = _close_position( trades=trades, exits=exits, position=position, candle=candle, exit_price=float(position["stop_price"]), ) wins += int(won) current_equity = equity position = None if position is not None: current_equity = mark_to_market( side=str(position["side"]), margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=LEVERAGE, ) peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(candles) - 1 or equity <= 0.0: continue values = (trend[index], middle[index], upper[index], lower[index]) if any(value != value for value in values): continue if position is not None: if (position["side"] == "long" and candle.close >= float(middle[index])) or ( position["side"] == "short" and candle.close <= float(middle[index]) ): pending_exit = True continue if mode == "pullback": if candle.close > float(trend[index]) and candle.close <= float(lower[index]): pending_entry_side = "long" elif candle.close < float(trend[index]) and candle.close >= float(upper[index]): pending_entry_side = "short" trade_count = len(trades) return SegmentResult( trade_count=trade_count, total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY, win_rate=wins / trade_count if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=candles[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ) def _run_squeeze_segment( candles: list[Candle], warmup_bars: int, middle: list[float], upper: list[float], lower: list[float], bandwidth: list[float], threshold: list[float], stop_loss_pct: float, ) -> SegmentResult: if len(candles) <= warmup_bars: return _empty_result(candles, warmup_bars, []) equity = INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_entry_side: str | None = None pending_exit = False for index in range(warmup_bars, len(candles)): candle = candles[index] if pending_exit and position is not None: equity, won = _close_position(trades=trades, exits=exits, position=position, candle=candle, exit_price=candle.open) wins += int(won) position = None pending_exit = False if pending_entry_side is not None and position is None and equity > 0.0: position = { "side": pending_entry_side, "entry_time": candle.ts, "entry_price": candle.open, "margin_used": equity, "stop_price": candle.open * (1.0 - stop_loss_pct if pending_entry_side == "long" else 1.0 + stop_loss_pct), } entries.append({"ts": candle.ts, "price": candle.open, "side": pending_entry_side}) pending_entry_side = None current_equity = equity if position is not None: stop_hit = (position["side"] == "long" and candle.low <= float(position["stop_price"])) or ( position["side"] == "short" and candle.high >= float(position["stop_price"]) ) if stop_hit: equity, won = _close_position( trades=trades, exits=exits, position=position, candle=candle, exit_price=float(position["stop_price"]), ) wins += int(won) current_equity = equity position = None if position is not None: current_equity = mark_to_market( side=str(position["side"]), margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=LEVERAGE, ) peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(candles) - 1 or equity <= 0.0: continue values = (middle[index], upper[index], lower[index], bandwidth[index], threshold[index]) if any(value != value for value in values): continue if position is not None: if (position["side"] == "long" and candle.close < float(middle[index])) or ( position["side"] == "short" and candle.close > float(middle[index]) ): pending_exit = True continue if bandwidth[index] <= threshold[index]: if candle.close > float(upper[index]): pending_entry_side = "long" elif candle.close < float(lower[index]): pending_entry_side = "short" trade_count = len(trades) return SegmentResult( trade_count=trade_count, total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY, win_rate=wins / trade_count if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=candles[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ) def run_ethbtc_ratio_segment( *, eth_candles: list[Candle], btc_candles: list[Candle], mode: str, ratio_length: int, eth_trend_sma: int, exit_length: int, std_multiplier: float, stop_loss_pct: float, ) -> SegmentResult: eth_close = pd.Series([candle.close for candle in eth_candles], dtype=float) btc_close = pd.Series([candle.close for candle in btc_candles], dtype=float) ratio = eth_close / btc_close ratio_high = ratio.shift(1).rolling(ratio_length).max().tolist() ratio_low = ratio.shift(1).rolling(ratio_length).min().tolist() ratio_mid = ratio.rolling(exit_length).mean().tolist() ratio_std = ratio.rolling(exit_length).std(ddof=0).tolist() trend = eth_close.rolling(eth_trend_sma).mean().tolist() upper = [m + std_multiplier * s if m == m and s == s else float("nan") for m, s in zip(ratio_mid, ratio_std)] lower = [m - std_multiplier * s if m == m and s == s else float("nan") for m, s in zip(ratio_mid, ratio_std)] warmup_bars = max(ratio_length, eth_trend_sma, exit_length) if len(eth_candles) <= warmup_bars: return _empty_result(eth_candles, warmup_bars, []) equity = INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_entry_side: str | None = None pending_exit = False for index in range(warmup_bars, len(eth_candles)): candle = eth_candles[index] if pending_exit and position is not None: equity, won = _close_position(trades=trades, exits=exits, position=position, candle=candle, exit_price=candle.open) wins += int(won) position = None pending_exit = False if pending_entry_side is not None and position is None and equity > 0.0: position = { "side": pending_entry_side, "entry_time": candle.ts, "entry_price": candle.open, "margin_used": equity, "stop_price": candle.open * (1.0 - stop_loss_pct if pending_entry_side == "long" else 1.0 + stop_loss_pct), } entries.append({"ts": candle.ts, "price": candle.open, "side": pending_entry_side}) pending_entry_side = None current_equity = equity if position is not None: stop_hit = (position["side"] == "long" and candle.low <= float(position["stop_price"])) or ( position["side"] == "short" and candle.high >= float(position["stop_price"]) ) if stop_hit: equity, won = _close_position( trades=trades, exits=exits, position=position, candle=candle, exit_price=float(position["stop_price"]), ) wins += int(won) current_equity = equity position = None if position is not None: current_equity = mark_to_market( side=str(position["side"]), margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=LEVERAGE, ) peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(eth_candles) - 1 or equity <= 0.0: continue current_ratio = float(ratio.iloc[index]) values = (ratio_high[index], ratio_low[index], ratio_mid[index], trend[index]) if any(value != value for value in values): continue if position is not None: if (position["side"] == "long" and current_ratio < float(ratio_mid[index])) or ( position["side"] == "short" and current_ratio > float(ratio_mid[index]) ): pending_exit = True continue if mode == "breakout": if candle.close > float(trend[index]) and current_ratio > float(ratio_high[index]): pending_entry_side = "long" elif candle.close < float(trend[index]) and current_ratio < float(ratio_low[index]): pending_entry_side = "short" elif upper[index] == upper[index] and lower[index] == lower[index]: if current_ratio <= float(lower[index]): pending_entry_side = "long" elif current_ratio >= float(upper[index]): pending_entry_side = "short" trade_count = len(trades) return SegmentResult( trade_count=trade_count, total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY, win_rate=wins / trade_count if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=eth_candles[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ) def build_strategies() -> list[Strategy]: strategies: list[Strategy] = [] for entry in (20, 48, 96): for exit_window in (10, 24, 48): for stop in (0.008, 0.012): name = f"donchian-e{entry}-x{exit_window}-sl{stop}" config = DonchianConfig(entry_window=entry, exit_window=exit_window, stop_loss_pct=stop) strategies.append( Strategy( "donchian", name, max(entry, exit_window), False, lambda candles, config=config: run_donchian_segment( candles=candles, leverage=LEVERAGE, warmup_bars=max(config.entry_window, config.exit_window), config=config, ), ) ) for lookback in (16, 32, 64): for take, stop in ((0.008, 0.006), (0.012, 0.008), (0.018, 0.012)): strategies.append( Strategy( "momentum_breakout", f"range-momo-l{lookback}-tp{take}-sl{stop}", lookback, False, lambda candles, lookback=lookback, take=take, stop=stop: explore.run_range_momentum_segment( candles=candles, leverage=LEVERAGE, warmup_bars=lookback, lookback=lookback, take_profit_pct=take, stop_loss_pct=stop, ), ) ) for window in (48, 96, 192): for entry_z in (1.5, 2.0, 2.5): for stop in (0.006, 0.01): strategies.append( Strategy( "vwap_reversion", f"vwap-revert-w{window}-z{entry_z}-sl{stop}", window * 2, False, lambda candles, window=window, entry_z=entry_z, stop=stop: explore.run_vwap_reversion_segment( candles=candles, leverage=LEVERAGE, warmup_bars=window * 2, window=window, entry_z=entry_z, exit_z=0.2, stop_loss_pct=stop, ), ) ) for trend in (240, 480): for length in (48, 96): for stop in (0.008, 0.012): strategies.append( Strategy( "bb_band_pullback", f"bb-pullback-t{trend}-l{length}-sl{stop}", max(trend, length), False, lambda candles, trend=trend, length=length, stop=stop: run_bb_pullback_segment( candles=candles, trend_sma=trend, band_length=length, std_multiplier=2.0, stop_loss_pct=stop, ), ) ) for length in (48, 96): for bandwidth_lookback in (480, 960): for quantile in (0.15, 0.25): for stop in (0.008, 0.012): strategies.append( Strategy( "bb_squeeze_breakout", f"bb-squeeze-l{length}-bw{bandwidth_lookback}-q{quantile}-sl{stop}", max(length, bandwidth_lookback), False, lambda candles, length=length, bandwidth_lookback=bandwidth_lookback, quantile=quantile, stop=stop: run_bb_squeeze_breakout_segment( candles=candles, band_length=length, std_multiplier=2.0, bandwidth_lookback=bandwidth_lookback, bandwidth_quantile=quantile, stop_loss_pct=stop, ), ) ) for mode in ("breakout", "mean_reversion"): for ratio_length in (96, 240): for trend in (240, 480): for exit_length in (48, 96): for stop in (0.008, 0.012): strategies.append( Strategy( f"ethbtc_relative_strength_{mode}", f"ethbtc-ratio-{mode}-r{ratio_length}-t{trend}-x{exit_length}-sl{stop}", max(ratio_length, trend, exit_length), True, lambda eth, btc, mode=mode, ratio_length=ratio_length, trend=trend, exit_length=exit_length, stop=stop: run_ethbtc_ratio_segment( eth_candles=eth, btc_candles=btc, mode=mode, ratio_length=ratio_length, eth_trend_sma=trend, exit_length=exit_length, std_multiplier=2.0, stop_loss_pct=stop, ), ) ) return strategies def cost_equity_frame(result: SegmentResult, cost: float) -> pd.DataFrame: rows = [{"ts": pd.to_datetime(result.equity_curve[0]["ts"], unit="ms", utc=True), "equity": INITIAL_EQUITY}] equity = INITIAL_EQUITY for trade in result.trades: equity *= 1.0 + float(trade["return_pct"]) / 100.0 - cost * float(trade.get("cost_weight", 1.0)) rows.append({"ts": pd.to_datetime(str(trade["exit_time"]), utc=True), "equity": equity}) return pd.DataFrame(rows) def max_drawdown(values: list[float]) -> float: peak = values[0] dd = 0.0 for value in values: peak = max(peak, value) dd = max(dd, (peak - value) / peak if peak else 0.0) return dd def equity_metrics(frame: pd.DataFrame, first_ts: int, last_ts: int) -> dict[str, float]: years = (last_ts - first_ts) / 86_400_000 / 365 total_return = float(frame["equity"].iloc[-1] / frame["equity"].iloc[0] - 1.0) annualized = (1.0 + total_return) ** (1.0 / years) - 1.0 if total_return > -1.0 and years > 0.0 else 0.0 dd = max_drawdown([float(value) for value in frame["equity"]]) return { "net_total_return": total_return, "net_annualized_return": annualized, "net_max_drawdown": dd, "net_calmar": annualized / dd if dd else 0.0, } def horizon_rows(frame: pd.DataFrame, last_ts: int, horizons: tuple[tuple[str, pd.DateOffset], ...]) -> list[dict[str, object]]: rows: list[dict[str, object]] = [] end_time = pd.to_datetime(last_ts, unit="ms", utc=True) for label, offset in horizons: cutoff = end_time - offset before = frame[frame["ts"] <= cutoff] if len(before): start_equity = float(before["equity"].iloc[-1]) horizon_frame = pd.concat( [pd.DataFrame([{"ts": cutoff, "equity": start_equity}]), frame[frame["ts"] > cutoff][["ts", "equity"]]], ignore_index=True, ) start_time = cutoff else: horizon_frame = frame[["ts", "equity"]].copy() start_time = pd.Timestamp(horizon_frame["ts"].iloc[0]) rows.append( { "horizon": label, "horizon_start": start_time.strftime("%Y-%m-%d %H:%M"), "horizon_end": end_time.strftime("%Y-%m-%d %H:%M"), **equity_metrics(horizon_frame, int(start_time.timestamp() * 1000), last_ts), } ) return rows def worst_month(frame: pd.DataFrame) -> tuple[str, float]: monthly = frame.set_index("ts")["equity"].resample("ME").last().ffill().pct_change().dropna() if not len(monthly): return "", 0.0 idx = monthly.idxmin() return idx.strftime("%Y-%m"), float(monthly.loc[idx]) def markdown_table(frame: pd.DataFrame) -> str: columns = list(frame.columns) rows = [columns, ["---" for _ in columns]] for record in frame.to_dict("records"): rows.append([record[column] for column in columns]) return "\n".join("| " + " | ".join(format_cell(value) for value in row) + " |" for row in rows) def format_cell(value: object) -> str: if isinstance(value, float): return f"{value:.6g}" return str(value).replace("|", "\\|") def write_report( *, summary: pd.DataFrame, horizon: pd.DataFrame, output_files: list[Path], command: str, first_ts: int, last_ts: int, requested_years: float, ) -> str: primary = summary[summary["cost"] == PRIMARY_COST].head(10) family = ( summary[summary["cost"] == PRIMARY_COST] .groupby("family", as_index=False) .agg( best_calmar=("net_calmar", "max"), best_annualized=("net_annualized_return", "max"), best_total=("net_total_return", "max"), candidates=("name", "count"), ) .sort_values(["best_calmar", "best_annualized"], ascending=False) ) horizon_top = ( horizon[horizon["cost"] == PRIMARY_COST] .sort_values(["horizon", "net_calmar", "net_annualized_return"], ascending=[True, False, False]) .groupby("horizon", observed=True) .head(3) ) primary_frame = summary[summary["cost"] == PRIMARY_COST] family_best = primary_frame.groupby("family", as_index=False).head(1).set_index("family") next_round_lines: list[str] = [] if "bb_squeeze_breakout" in family_best.index: row = family_best.loc["bb_squeeze_breakout"] next_round_lines.append( f"- Primary: BB squeeze breakout. Best row `{row['name']}` has Calmar {format_cell(row['net_calmar'])}, annualized {format_cell(row['net_annualized_return'])}, trades {row['trades']}." ) if "ethbtc_relative_strength_breakout" in family_best.index: row = family_best.loc["ethbtc_relative_strength_breakout"] next_round_lines.append( f"- Secondary: ETH/BTC relative-strength breakout. Best row `{row['name']}` is positive but DD-heavy: Calmar {format_cell(row['net_calmar'])}, DD {format_cell(row['net_max_drawdown'])}." ) if "bb_band_pullback" in family_best.index: row = family_best.loc["bb_band_pullback"] next_round_lines.append( f"- Secondary: BB band pullback. Lower return than squeeze, but smoother DD profile in the best row `{row['name']}`." ) weak = [ family for family in ("vwap_reversion", "momentum_breakout", "ethbtc_relative_strength_mean_reversion") if family in family_best.index and float(family_best.loc[family]["net_total_return"]) <= -0.99 ] if weak: next_round_lines.append(f"- Drop for now: {', '.join(weak)} ended near full loss after maker_taker costs.") if "donchian" in family_best.index: row = family_best.loc["donchian"] next_round_lines.append( f"- Low priority: Donchian best Calmar is only {format_cell(row['net_calmar'])}; keep only as a benchmark." ) lines = [ "# ETH non-RSI 10y exploration", "", f"Run command: `{command}`", f"Requested years: {requested_years:g}", f"Actual continuous local history: `{_format_ts(first_ts)}` to `{_format_ts(last_ts)}`.", "", "Output files:", *[f"- `{path}`" for path in output_files], "", "Primary sort: maker_taker by net_calmar, then net_annualized_return, then net_total_return.", "", "Top 10:", markdown_table( primary[ [ "family", "name", "trades", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "worst_month", "worst_month_return", ] ] ), "", "Family leaders:", markdown_table(family), "", "Recent horizon leaders:", markdown_table( horizon_top[ [ "horizon", "family", "name", "trades", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", ] ] ), "", "Next-round directions:", *next_round_lines, ] return "\n".join(lines) + "\n" def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--bar", default=BAR) parser.add_argument("--years", type=float, default=YEARS) parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR) args = parser.parse_args() eth = _load_candles(ETH_SYMBOL, args.bar) btc = _load_candles(BTC_SYMBOL, args.bar) eth, btc = _align_pair(eth, btc) requested_bars = int(args.years * 365 * 24 * 60 / 15) eth = eth[-requested_bars:] btc = btc[-requested_bars:] strategies = build_strategies() summary_rows: list[dict[str, object]] = [] horizon_output_rows: list[dict[str, object]] = [] for index, strategy in enumerate(strategies, start=1): result = strategy.run(eth, btc) if strategy.pair else strategy.run(eth) if not result.equity_curve: print(f"skip {index}/{len(strategies)} {strategy.family} {strategy.name}") continue for cost_name, cost in COSTS: frame = cost_equity_frame(result, cost) metrics = equity_metrics(frame, eth[0].ts, eth[-1].ts) month, month_return = worst_month(frame) summary_rows.append( { "family": strategy.family, "cost": cost_name, "symbol": ETH_SYMBOL, "signal_symbol": BTC_SYMBOL if strategy.pair else "", "bar": args.bar, "name": strategy.name, "first_candle": _format_ts(eth[0].ts), "last_candle": _format_ts(eth[-1].ts), "years": (eth[-1].ts - eth[0].ts) / 86_400_000 / 365, "trades": result.trade_count, "gross_total_return": result.total_return, "gross_max_drawdown_mark_to_market": result.max_drawdown, "worst_month": month, "worst_month_return": month_return, **metrics, } ) for row in horizon_rows(frame, eth[-1].ts, HORIZONS): horizon_output_rows.append( { "family": strategy.family, "cost": cost_name, "symbol": ETH_SYMBOL, "signal_symbol": BTC_SYMBOL if strategy.pair else "", "bar": args.bar, "name": strategy.name, "trades": result.trade_count, **row, } ) print(f"done {index}/{len(strategies)} {strategy.family} {strategy.name}") summary = pd.DataFrame(summary_rows).sort_values( ["cost", "net_calmar", "net_annualized_return", "net_total_return"], ascending=[True, False, False, False], ) primary = summary[summary["cost"] == PRIMARY_COST] summary = pd.concat([primary, summary[summary["cost"] != PRIMARY_COST]], ignore_index=True) horizon = pd.DataFrame(horizon_output_rows) horizon["horizon"] = pd.Categorical(horizon["horizon"], categories=[label for label, _ in HORIZONS], ordered=True) horizon = horizon.sort_values(["cost", "horizon", "net_calmar", "net_annualized_return"], ascending=[True, True, False, False]) args.output_dir.mkdir(parents=True, exist_ok=True) summary_path = args.output_dir / "eth-non-rsi-10y-summary.csv" horizon_path = args.output_dir / "eth-non-rsi-10y-horizon.csv" top10_path = args.output_dir / "eth-non-rsi-10y-top10.csv" report_path = args.output_dir / "eth-non-rsi-10y-report.md" output_files = [summary_path, horizon_path, top10_path, report_path] summary.to_csv(summary_path, index=False) horizon.to_csv(horizon_path, index=False) summary[summary["cost"] == PRIMARY_COST].head(10).to_csv(top10_path, index=False) command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --bar {args.bar} --years {args.years}" report_path.write_text( write_report( summary=summary, horizon=horizon, output_files=output_files, command=command, first_ts=eth[0].ts, last_ts=eth[-1].ts, requested_years=args.years, ), encoding="utf-8", ) print(summary[summary["cost"] == PRIMARY_COST].head(10).to_string(index=False)) return 0 if __name__ == "__main__": raise SystemExit(main())