from __future__ import annotations import argparse import sys from dataclasses import dataclass from itertools import product from pathlib import Path from typing import Iterable import pandas as pd sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from scripts import explore_ultrashort as explore ETH_SYMBOL = "ETH-USDT-SWAP" BTC_SYMBOL = "BTC-USDT-SWAP" PRIMARY_COST = "maker_taker" BTC_TREND_SMA = 480 BTC_MOMENTUM_LOOKBACK = 240 BTC_MIN_MOMENTUM = 0.0 MAX_HOLD_BARS = 48 COST_SCENARIOS = ( ("maker_maker", 0.0012), ("maker_taker", 0.0021), ("taker_taker", 0.0030), ) HORIZONS = ( ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ) ENTRY_OFFSET_SETS = ( (0.002, 0.005, 0.008), (0.003, 0.006, 0.009), (0.004, 0.007, 0.010), ) @dataclass(frozen=True) class Strategy: family: str candidate: explore.Candidate | explore.PairCandidate pair: bool spec: dict[str, object] def close_partial_trade( *, trades: list[dict[str, object]], exits: list[dict[str, object]], position: dict[str, object], account_equity: float, candle: explore.Candle, exit_price: float, leverage: int, ) -> tuple[float, bool]: margin_used = float(position["margin_used"]) exit_equity = explore.trade_equity( side="long", margin_used=margin_used, entry_price=float(position["entry_price"]), exit_price=exit_price, leverage=leverage, ) pnl = exit_equity - margin_used trades.append( { "side": "Long", "entry_time": explore._format_ts(int(position["entry_time"])), "exit_time": explore._format_ts(candle.ts), "entry_price": round(float(position["entry_price"]), 4), "exit_price": round(exit_price, 4), "pnl": round(pnl, 4), "return_pct": round(pnl / account_equity * 100, 4), "cost_weight": round(margin_used / account_equity, 8), } ) exits.append({"ts": candle.ts, "price": exit_price, "side": "long"}) return account_equity + pnl, pnl > 0.0 def run_eth_btc_price_twap_filter_segment( *, eth_candles: list[explore.Candle], btc_candles: list[explore.Candle], leverage: int, warmup_bars: int, eth_trend_sma: int, eth_rsi_threshold: float, eth_exit_rsi: float, stop_loss_pct: float, max_hold_bars: int, entry_offsets: tuple[float, ...], entry_valid_bars: int, fill_buffer: float, btc_trend_sma: int, btc_momentum_lookback: int, btc_min_momentum: float, ) -> explore.SegmentResult: eth_closes = pd.Series([candle.close for candle in eth_candles], dtype=float) btc_closes = pd.Series([candle.close for candle in btc_candles], dtype=float) eth_trend = eth_closes.rolling(eth_trend_sma).mean().tolist() eth_rsi = explore._compute_rsi(eth_closes, 2) btc_trend = btc_closes.rolling(btc_trend_sma).mean().tolist() equity = explore.INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_limits: list[dict[str, float | int]] = [] pending_exit = False for index in range(warmup_bars, len(eth_candles)): candle = eth_candles[index] if pending_exit and position is not None: equity, won = close_partial_trade( trades=trades, exits=exits, position=position, account_equity=equity, candle=candle, exit_price=candle.open, leverage=leverage, ) wins += 1 if won else 0 position = None pending_exit = False pending_limits = [] active_limits: list[dict[str, float | int]] = [] for limit in pending_limits: if index > int(limit["expires_index"]): continue limit_price = float(limit["price"]) if candle.low <= limit_price * (1.0 - fill_buffer) and equity > 0.0: slice_margin = equity / len(entry_offsets) if position is None: position = { "side": "long", "entry_time": candle.ts, "entry_price": limit_price, "entry_index": index, "margin_used": slice_margin, "stop_price": limit_price * (1.0 - stop_loss_pct), } else: old_margin = float(position["margin_used"]) new_margin = old_margin + slice_margin entry_price = (float(position["entry_price"]) * old_margin + limit_price * slice_margin) / new_margin position["entry_price"] = entry_price position["margin_used"] = new_margin position["stop_price"] = entry_price * (1.0 - stop_loss_pct) entries.append({"ts": candle.ts, "price": limit_price, "side": "long"}) else: active_limits.append(limit) pending_limits = active_limits current_equity = equity if position is not None and candle.low <= float(position["stop_price"]): equity, won = close_partial_trade( trades=trades, exits=exits, position=position, account_equity=equity, candle=candle, exit_price=float(position["stop_price"]), leverage=leverage, ) wins += 1 if won else 0 current_equity = equity position = None pending_limits = [] if position is not None: position_equity = explore.mark_to_market( side="long", margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=leverage, ) current_equity = equity - float(position["margin_used"]) + position_equity peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(eth_candles) - 1 or equity <= 0.0: continue current_eth_trend = eth_trend[index] current_eth_rsi = eth_rsi[index] current_btc_trend = btc_trend[index] if current_eth_trend != current_eth_trend or current_eth_rsi != current_eth_rsi or current_btc_trend != current_btc_trend: continue if position is not None: held_bars = index - int(position["entry_index"]) if current_eth_rsi >= eth_exit_rsi or held_bars >= max_hold_bars: pending_exit = True pending_limits = [] continue btc_momentum = btc_candles[index].close / btc_candles[index - btc_momentum_lookback].close - 1.0 btc_risk_on = btc_candles[index].close > float(current_btc_trend) and btc_momentum >= btc_min_momentum eth_pullback = candle.close > float(current_eth_trend) and current_eth_rsi <= eth_rsi_threshold if not pending_limits and btc_risk_on and eth_pullback: pending_limits = [ { "price": candle.close * (1.0 - offset), "expires_index": index + entry_valid_bars, } for offset in entry_offsets ] trade_count = len(trades) return explore.SegmentResult( trade_count=trade_count, total_return=(ending_equity - explore.INITIAL_EQUITY) / explore.INITIAL_EQUITY, win_rate=wins / trade_count if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=eth_candles[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ) def build_eth_btc_price_twap_filter_candidate( *, eth_trend_sma: int, eth_rsi_threshold: float, eth_exit_rsi: float, stop_loss_pct: float, entry_offsets: tuple[float, ...], entry_valid_bars: int, fill_buffer: float, ) -> explore.PairCandidate: offset_label = "-".join(f"{offset:.4f}" for offset in entry_offsets) buffer_label = f"-fb{fill_buffer:.4f}" if fill_buffer else "" return explore.PairCandidate( f"eth-btc-price-twap-o{offset_label}-v{entry_valid_bars}{buffer_label}-et{eth_trend_sma}-l{eth_rsi_threshold}-x{eth_exit_rsi}-sl{stop_loss_pct}-mh{MAX_HOLD_BARS}-bt{BTC_TREND_SMA}-bm{BTC_MOMENTUM_LOOKBACK}-br{BTC_MIN_MOMENTUM}", max(eth_trend_sma, BTC_TREND_SMA, BTC_MOMENTUM_LOOKBACK, 3), lambda eth_candles, btc_candles, leverage, warmup_bars, eth_trend_sma=eth_trend_sma, eth_rsi_threshold=eth_rsi_threshold, eth_exit_rsi=eth_exit_rsi, stop_loss_pct=stop_loss_pct, entry_offsets=entry_offsets, entry_valid_bars=entry_valid_bars, fill_buffer=fill_buffer: run_eth_btc_price_twap_filter_segment( eth_candles=eth_candles, btc_candles=btc_candles, leverage=leverage, warmup_bars=warmup_bars, eth_trend_sma=eth_trend_sma, eth_rsi_threshold=eth_rsi_threshold, eth_exit_rsi=eth_exit_rsi, stop_loss_pct=stop_loss_pct, max_hold_bars=MAX_HOLD_BARS, entry_offsets=entry_offsets, entry_valid_bars=entry_valid_bars, fill_buffer=fill_buffer, btc_trend_sma=BTC_TREND_SMA, btc_momentum_lookback=BTC_MOMENTUM_LOOKBACK, btc_min_momentum=BTC_MIN_MOMENTUM, ), ) def candidate_specs() -> Iterable[dict[str, object]]: for trend, rsi, exit_rsi, stop, offsets, valid, fill in product( (50, 80, 160), (3.0, 5.0), (50.0, 55.0), (0.008, 0.012), ENTRY_OFFSET_SETS, (1, 2, 3), (0.0, 0.0002), ): yield { "eth_trend_sma": trend, "eth_rsi_threshold": rsi, "eth_exit_rsi": exit_rsi, "stop_loss_pct": stop, "entry_offsets": offsets, "entry_valid_bars": valid, "fill_buffer": fill, "btc_trend_sma": BTC_TREND_SMA, "btc_momentum_lookback": BTC_MOMENTUM_LOOKBACK, "btc_min_momentum": BTC_MIN_MOMENTUM, "max_hold_bars": MAX_HOLD_BARS, } def build_strategies(max_candidates: int | None) -> list[Strategy]: specs = list(candidate_specs()) if max_candidates is not None: specs = specs[:max_candidates] strategies = [ Strategy( "regime_price_twap", build_eth_btc_price_twap_filter_candidate( eth_trend_sma=int(spec["eth_trend_sma"]), eth_rsi_threshold=float(spec["eth_rsi_threshold"]), eth_exit_rsi=float(spec["eth_exit_rsi"]), stop_loss_pct=float(spec["stop_loss_pct"]), entry_offsets=tuple(float(value) for value in spec["entry_offsets"]), entry_valid_bars=int(spec["entry_valid_bars"]), fill_buffer=float(spec["fill_buffer"]), ), True, spec, ) for spec in specs ] baselines = [ Strategy( "baseline_price_twap_mid", explore.build_rsi2_long_guarded_price_twap_candidate(50, 3.0, 55.0, 0.008, MAX_HOLD_BARS, (0.001, 0.003, 0.005), 2), False, { "eth_trend_sma": 50, "eth_rsi_threshold": 3.0, "eth_exit_rsi": 55.0, "stop_loss_pct": 0.008, "entry_offsets": "0.0010-0.0030-0.0050", "entry_valid_bars": 2, "fill_buffer": 0.0, "btc_trend_sma": "", "btc_momentum_lookback": "", "btc_min_momentum": "", "max_hold_bars": MAX_HOLD_BARS, }, ), Strategy( "baseline_price_twap_deep", explore.build_rsi2_long_guarded_price_twap_candidate(50, 3.0, 55.0, 0.008, MAX_HOLD_BARS, (0.002, 0.005, 0.008), 3), False, { "eth_trend_sma": 50, "eth_rsi_threshold": 3.0, "eth_exit_rsi": 55.0, "stop_loss_pct": 0.008, "entry_offsets": "0.0020-0.0050-0.0080", "entry_valid_bars": 3, "fill_buffer": 0.0, "btc_trend_sma": "", "btc_momentum_lookback": "", "btc_min_momentum": "", "max_hold_bars": MAX_HOLD_BARS, }, ), Strategy( "baseline_eth_btc_rsi_filter", explore.build_eth_btc_rsi_filter_candidate(50, 3.0, 55.0, BTC_TREND_SMA, BTC_MOMENTUM_LOOKBACK, BTC_MIN_MOMENTUM), True, { "eth_trend_sma": 50, "eth_rsi_threshold": 3.0, "eth_exit_rsi": 55.0, "stop_loss_pct": "", "entry_offsets": "", "entry_valid_bars": "", "fill_buffer": "", "btc_trend_sma": BTC_TREND_SMA, "btc_momentum_lookback": BTC_MOMENTUM_LOOKBACK, "btc_min_momentum": BTC_MIN_MOMENTUM, "max_hold_bars": "", }, ), ] return strategies + baselines def window_rows(strategy: Strategy, eth: list[explore.Candle], btc: list[explore.Candle], window_size: int) -> list[dict[str, object]]: if strategy.pair: return explore.evaluate_pair_candidate_window_rows( candidate=strategy.candidate, eth_candles=eth, btc_candles=btc, window_size=window_size, leverage=explore.LEVERAGE, ) return explore.evaluate_candidate_window_rows( candidate=strategy.candidate, candles=eth, window_size=window_size, leverage=explore.LEVERAGE, ) def full_result(strategy: Strategy, eth: list[explore.Candle], btc: list[explore.Candle]) -> explore.SegmentResult: if strategy.pair: return strategy.candidate.run( eth_candles=eth, btc_candles=btc, leverage=explore.LEVERAGE, warmup_bars=strategy.candidate.warmup_bars, ) return strategy.candidate.run( candles=eth, leverage=explore.LEVERAGE, warmup_bars=strategy.candidate.warmup_bars, ) def append_cost_rows( *, strategy: Strategy, bar: str, eth: list[explore.Candle], rows: list[dict[str, object]], result: explore.SegmentResult, summary_rows: list[dict[str, object]], total_rows: list[dict[str, object]], horizon_rows: list[dict[str, object]], ) -> None: spec = strategy.spec.copy() if isinstance(spec.get("entry_offsets"), tuple): spec["entry_offsets"] = "-".join(f"{value:.4f}" for value in tuple(spec["entry_offsets"])) for cost_name, cost_value in COST_SCENARIOS: summary = explore.add_cost_metrics( pd.DataFrame([explore.summarize_window_rows(rows, strategy.candidate.name)]), cost_value, ).iloc[0].to_dict() summary_rows.append( { "family": strategy.family, "cost": cost_name, "symbol": ETH_SYMBOL, "signal_symbol": BTC_SYMBOL if strategy.pair else "", "bar": bar, "actual_bars": len(eth), "first_candle": explore._format_ts(eth[0].ts), "last_candle": explore._format_ts(eth[-1].ts), **spec, **summary, } ) net_equity = explore.cost_adjusted_trade_equity_frame(result, cost_value) metrics = explore.annualized_metrics_from_equity(net_equity, eth[0].ts, eth[-1].ts) years_actual = (eth[-1].ts - eth[0].ts) / 86_400_000 / 365 gross_annualized = (1.0 + result.total_return) ** (1.0 / years_actual) - 1.0 if result.total_return > -1.0 else 0.0 total_rows.append( { "family": strategy.family, "cost": cost_name, "symbol": ETH_SYMBOL, "signal_symbol": BTC_SYMBOL if strategy.pair else "", "bar": bar, "name": strategy.candidate.name, "first_candle": explore._format_ts(eth[0].ts), "last_candle": explore._format_ts(eth[-1].ts), "years": years_actual, "trades": result.trade_count, "gross_total_return": result.total_return, "gross_annualized_return": gross_annualized, "gross_max_drawdown_mark_to_market": result.max_drawdown, **spec, **metrics, } ) horizon = explore.recent_horizon_metrics_from_equity(net_equity, eth[-1].ts, HORIZONS) for horizon_row in horizon.to_dict("records"): horizon_rows.append( { "family": strategy.family, "cost": cost_name, "symbol": ETH_SYMBOL, "signal_symbol": BTC_SYMBOL if strategy.pair else "", "bar": bar, "name": strategy.candidate.name, "trades": result.trade_count, **spec, **horizon_row, } ) def markdown_table(frame: pd.DataFrame) -> str: columns = list(frame.columns) rows = [columns, ["---" for _ in columns]] for record in frame.to_dict("records"): rows.append([record[column] for column in columns]) return "\n".join("| " + " | ".join(format_markdown_cell(value) for value in row) + " |" for row in rows) def format_markdown_cell(value: object) -> str: if isinstance(value, float): return f"{value:.6g}" return str(value).replace("|", "\\|") def markdown_report( *, summary: pd.DataFrame, total: pd.DataFrame, horizon: pd.DataFrame, output_files: list[Path], command: str, ) -> str: primary_summary = summary[summary["cost"] == PRIMARY_COST].copy() primary_total = total[total["cost"] == PRIMARY_COST].copy() top = primary_summary[primary_summary["family"] == "regime_price_twap"].head(10) baseline_summary = primary_summary[primary_summary["family"] != "regime_price_twap"].sort_values( ["net_ci95_low", "net_avg_return"], ascending=False ) baseline_total = primary_total[primary_total["family"] != "regime_price_twap"].sort_values( ["net_calmar", "net_annualized_return"], ascending=False ) best = top.iloc[0] if len(top) else pd.Series(dtype=object) baseline_best_summary = baseline_summary.iloc[0] if len(baseline_summary) else pd.Series(dtype=object) baseline_best_total = baseline_total.iloc[0] if len(baseline_total) else pd.Series(dtype=object) total_top = primary_total[primary_total["family"] == "regime_price_twap"].sort_values( ["net_calmar", "net_annualized_return"], ascending=False ).head(10) top_names = set(top["name"]) horizon_top = horizon[ (horizon["cost"] == PRIMARY_COST) & (horizon["name"].isin(top_names)) ].sort_values(["name", "horizon"]) horizon_leaders = ( horizon[horizon["cost"] == PRIMARY_COST] .sort_values(["horizon", "net_annualized_return"], ascending=[True, False]) .groupby("horizon", observed=True) .head(3) ) better_window = bool(len(top) and len(baseline_summary) and float(best["net_ci95_low"]) > float(baseline_best_summary["net_ci95_low"])) better_total = bool(len(total_top) and len(baseline_total) and float(total_top.iloc[0]["net_calmar"]) > float(baseline_best_total["net_calmar"])) lines = [ "# ETH regime price-TWAP variants", "", f"Run command: `{command}`", "", "Output files:", *[f"- `{path}`" for path in output_files], "", "Primary sort: maker_taker cost, by net_ci95_low then net_avg_return.", "", "Tested entry rule: ETH price-TWAP entry requires BTC close above SMA480 and BTC momentum240 >= 0. Exit remains ETH RSI/hold/stop based.", "", "Top 10 maker_taker candidates:", markdown_table(top[ [ "family", "name", "net_avg_return", "net_ci95_low", "positive_window_rate", "trades", "avg_trades_per_window", "max_drawdown", ] ]), "", "Top 10 by full-period maker_taker net Calmar:", markdown_table(total_top[ [ "family", "name", "trades", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", ] ]), "", "Baselines:", markdown_table(baseline_summary[ [ "family", "name", "net_avg_return", "net_ci95_low", "positive_window_rate", "trades", "avg_trades_per_window", "max_drawdown", ] ]), "", "Recent horizons for top 10:", markdown_table(horizon_top[ [ "name", "horizon", "horizon_start", "horizon_end", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", ] ]), "", "Recent horizon leaders:", markdown_table(horizon_leaders[ [ "horizon", "family", "name", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", ] ]), "", "Comparison:", f"- Window robustness better than the best standalone baseline: {better_window}.", f"- Full-period Calmar better than the best standalone baseline: {better_total}.", ] if len(top) and len(baseline_summary): lines.append( f"- Best regime price-TWAP net_ci95_low={float(best['net_ci95_low']):.6g}; best baseline net_ci95_low={float(baseline_best_summary['net_ci95_low']):.6g} ({baseline_best_summary['family']})." ) if len(total_top) and len(baseline_total): lines.append( f"- Best regime price-TWAP net_calmar={float(total_top.iloc[0]['net_calmar']):.6g}; best baseline net_calmar={float(baseline_best_total['net_calmar']):.6g} ({baseline_best_total['family']})." ) return "\n".join(lines) + "\n" def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--bar", default="15m") parser.add_argument("--years", type=float, default=3.25) parser.add_argument("--window-size", type=int, default=explore.WINDOW_SIZE) parser.add_argument("--max-candidates", type=int, default=None) parser.add_argument("--output-dir", type=Path, default=Path("reports/eth-exploration")) args = parser.parse_args() requested_bars = explore.history_bars_for_years(args.bar, args.years) client = explore.OkxClient() eth = explore.get_candles_cached(client, ETH_SYMBOL, args.bar, requested_bars) btc = explore.get_candles_cached(client, BTC_SYMBOL, args.bar, requested_bars) eth, btc = explore.align_pair_candles(eth, btc) strategies = build_strategies(args.max_candidates) summary_rows: list[dict[str, object]] = [] total_rows: list[dict[str, object]] = [] horizon_rows: list[dict[str, object]] = [] for index, strategy in enumerate(strategies, start=1): rows = window_rows(strategy, eth, btc, args.window_size) result = full_result(strategy, eth, btc) append_cost_rows( strategy=strategy, bar=args.bar, eth=eth, rows=rows, result=result, summary_rows=summary_rows, total_rows=total_rows, horizon_rows=horizon_rows, ) print(f"done {index}/{len(strategies)} {strategy.family} {strategy.candidate.name}") summary = pd.DataFrame(summary_rows).sort_values( ["cost", "net_ci95_low", "net_avg_return"], ascending=[True, False, False], ) primary = summary[summary["cost"] == PRIMARY_COST] others = summary[summary["cost"] != PRIMARY_COST] summary = pd.concat([primary, others], ignore_index=True) total = pd.DataFrame(total_rows).sort_values( ["cost", "net_calmar", "net_annualized_return"], ascending=[True, False, False], ) horizon = pd.DataFrame(horizon_rows) horizon["horizon"] = pd.Categorical(horizon["horizon"], categories=["3y", "1y", "6m", "3m"], ordered=True) horizon = horizon.sort_values(["cost", "horizon", "net_annualized_return"], ascending=[True, True, False]) args.output_dir.mkdir(parents=True, exist_ok=True) summary_path = args.output_dir / "eth-regime-price-twap-summary.csv" total_path = args.output_dir / "eth-regime-price-twap-total.csv" horizon_path = args.output_dir / "eth-regime-price-twap-horizon.csv" top10_path = args.output_dir / "eth-regime-price-twap-top10.csv" report_path = args.output_dir / "eth-regime-price-twap-report.md" output_files = [summary_path, total_path, horizon_path, top10_path, report_path] summary.to_csv(summary_path, index=False) total.to_csv(total_path, index=False) horizon.to_csv(horizon_path, index=False) summary[(summary["cost"] == PRIMARY_COST) & (summary["family"] == "regime_price_twap")].head(10).to_csv(top10_path, index=False) command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --bar {args.bar} --years {args.years} --window-size {args.window_size}" report_path.write_text( markdown_report( summary=summary, total=total, horizon=horizon, output_files=output_files, command=command, ), encoding="utf-8", ) print(summary[(summary["cost"] == PRIMARY_COST) & (summary["family"] == "regime_price_twap")].head(10).to_string(index=False)) return 0 if __name__ == "__main__": raise SystemExit(main())