from __future__ import annotations import argparse import json import sys from dataclasses import dataclass from pathlib import Path from typing import Any import pandas as pd sys.path.insert(0, str(Path(__file__).resolve().parents[1])) INITIAL_EQUITY = 10_000.0 LEVERAGE = 3 MINUTES_PER_YEAR = 365 * 24 * 60 TARGET_NAME = "equal-2-c0003" PRIMARY_COST = "maker_taker" ROUNDTRIP_COST_ON_MARGIN = 0.0021 OUTPUT_PREFIX = "eth-btc-nextgen-validation" @dataclass(frozen=True) class Trade: leg: str side: str entry_time: pd.Timestamp exit_time: pd.Timestamp entry_price: float exit_price: float gross_return: float rounded_return_pct: float def load_candles(cache_dir: Path, symbol: str, bar: str, years: float) -> pd.DataFrame: path = cache_dir / symbol / f"{bar}.csv" frame = pd.read_csv(path) requested = int(MINUTES_PER_YEAR * years / int(bar[:-1])) if len(frame) > requested: frame = frame.tail(requested) frame = frame.copy() frame["dt"] = pd.to_datetime(frame["ts"], unit="ms", utc=True) return frame.sort_values("ts").reset_index(drop=True) def compute_rsi(closes: pd.Series, length: int) -> list[float]: deltas = closes.diff() gains = deltas.clip(lower=0.0) losses = -deltas.clip(upper=0.0) rsi = [float("nan")] * len(closes) if len(closes) <= length: return rsi average_gain = float(gains.iloc[1 : length + 1].mean()) average_loss = float(losses.iloc[1 : length + 1].mean()) for index in range(length, len(closes)): if index > length: average_gain = ((average_gain * (length - 1)) + float(gains.iloc[index])) / length average_loss = ((average_loss * (length - 1)) + float(losses.iloc[index])) / length if average_gain != average_gain or average_loss != average_loss: continue if average_loss == 0.0: rsi[index] = 100.0 if average_gain > 0.0 else 50.0 else: relative_strength = average_gain / average_loss rsi[index] = 100.0 - (100.0 / (1.0 + relative_strength)) return rsi def trade_return(side: str, entry_price: float, exit_price: float) -> float: if side == "long": price_return = exit_price / entry_price - 1.0 else: price_return = entry_price / exit_price - 1.0 return LEVERAGE * price_return def run_rsi_filter( *, leg: str, data: pd.DataFrame, eth_trend_sma: int, eth_rsi_threshold: float, eth_exit_rsi: float, btc_trend_sma: int, btc_momentum_lookback: int, btc_min_momentum: float, ) -> list[Trade]: eth_close = data["eth_close"] btc_close = data["btc_close"] eth_trend = eth_close.rolling(eth_trend_sma).mean() eth_rsi = compute_rsi(eth_close, 2) btc_trend = btc_close.rolling(btc_trend_sma).mean() warmup_bars = max(eth_trend_sma, btc_trend_sma, btc_momentum_lookback, 3) pending_entry = False pending_exit = False position: dict[str, object] | None = None trades: list[Trade] = [] for index in range(warmup_bars, len(data)): row = data.iloc[index] if pending_exit and position is not None: trades.append(make_trade(leg, position, row["dt"], float(row["eth_open"]))) position = None pending_exit = False if pending_entry and position is None: position = {"side": "long", "entry_time": row["dt"], "entry_price": float(row["eth_open"])} pending_entry = False if index == len(data) - 1: continue current_eth_trend = eth_trend.iloc[index] current_eth_rsi = eth_rsi[index] current_btc_trend = btc_trend.iloc[index] if current_eth_trend != current_eth_trend or current_eth_rsi != current_eth_rsi or current_btc_trend != current_btc_trend: continue if position is not None: if current_eth_rsi >= eth_exit_rsi or float(row["btc_close"]) < float(current_btc_trend): pending_exit = True continue btc_momentum = float(row["btc_close"]) / float(btc_close.iloc[index - btc_momentum_lookback]) - 1.0 btc_risk_on = float(row["btc_close"]) > float(current_btc_trend) and btc_momentum >= btc_min_momentum eth_pullback = float(row["eth_close"]) > float(current_eth_trend) and current_eth_rsi <= eth_rsi_threshold if btc_risk_on and eth_pullback: pending_entry = True return trades def run_shock_filter( *, leg: str, data: pd.DataFrame, eth_trend_sma: int, eth_rsi_threshold: float, eth_exit_rsi: float, btc_trend_sma: int, btc_momentum_lookback: int, btc_min_momentum: float, btc_shock_lookback: int, btc_max_realized_vol: float, btc_max_drawdown: float, ) -> list[Trade]: eth_close = data["eth_close"] btc_close = data["btc_close"] eth_trend = eth_close.rolling(eth_trend_sma).mean() eth_rsi = compute_rsi(eth_close, 2) btc_trend = btc_close.rolling(btc_trend_sma).mean() btc_realized_vol = btc_close.pct_change().rolling(btc_shock_lookback).std(ddof=1) btc_recent_high = btc_close.rolling(btc_shock_lookback).max() warmup_bars = max(eth_trend_sma, btc_trend_sma, btc_momentum_lookback, btc_shock_lookback + 1, 3) pending_entry = False pending_exit = False position: dict[str, object] | None = None trades: list[Trade] = [] for index in range(warmup_bars, len(data)): row = data.iloc[index] if pending_exit and position is not None: trades.append(make_trade(leg, position, row["dt"], float(row["eth_open"]))) position = None pending_exit = False if pending_entry and position is None: position = {"side": "long", "entry_time": row["dt"], "entry_price": float(row["eth_open"])} pending_entry = False if index == len(data) - 1: continue current_eth_trend = eth_trend.iloc[index] current_eth_rsi = eth_rsi[index] current_btc_trend = btc_trend.iloc[index] current_btc_vol = btc_realized_vol.iloc[index] current_btc_high = btc_recent_high.iloc[index] if ( current_eth_trend != current_eth_trend or current_eth_rsi != current_eth_rsi or current_btc_trend != current_btc_trend or current_btc_vol != current_btc_vol or current_btc_high != current_btc_high ): continue btc_drawdown = float(row["btc_close"]) / float(current_btc_high) - 1.0 btc_shock_ok = float(current_btc_vol) <= btc_max_realized_vol and btc_drawdown >= -btc_max_drawdown if position is not None: if current_eth_rsi >= eth_exit_rsi or float(row["btc_close"]) < float(current_btc_trend) or not btc_shock_ok: pending_exit = True continue btc_momentum = float(row["btc_close"]) / float(btc_close.iloc[index - btc_momentum_lookback]) - 1.0 btc_risk_on = float(row["btc_close"]) > float(current_btc_trend) and btc_momentum >= btc_min_momentum and btc_shock_ok eth_pullback = float(row["eth_close"]) > float(current_eth_trend) and current_eth_rsi <= eth_rsi_threshold if btc_risk_on and eth_pullback: pending_entry = True return trades def make_trade(leg: str, position: dict[str, object], exit_time: pd.Timestamp, exit_price: float) -> Trade: side = str(position["side"]) entry_price = float(position["entry_price"]) gross_return = trade_return(side, entry_price, exit_price) return Trade( leg=leg, side="Long" if side == "long" else "Short", entry_time=position["entry_time"], exit_time=exit_time, entry_price=entry_price, exit_price=exit_price, gross_return=gross_return, rounded_return_pct=round(gross_return * 100.0, 4), ) def cost_equity(trades: list[Trade], use_rounded_return: bool, initial_ts: pd.Timestamp) -> pd.DataFrame: rows = [] equity = INITIAL_EQUITY rows.append({"ts": initial_ts, "equity": equity}) for trade in trades: gross_return = trade.rounded_return_pct / 100.0 if use_rounded_return else trade.gross_return equity *= 1.0 + gross_return - ROUNDTRIP_COST_ON_MARGIN rows.append({"ts": trade.exit_time, "equity": equity}) return pd.DataFrame(rows) def daily_equity(frame: pd.DataFrame, start: pd.Timestamp, end: pd.Timestamp) -> pd.Series: series = frame.set_index("ts")["equity"].sort_index() index = pd.date_range(start.normalize(), end.normalize(), freq="1D", tz="UTC") return series.reindex(index.union(series.index)).sort_index().ffill().reindex(index).ffill() def metrics(series: pd.Series) -> dict[str, float]: years = (series.index[-1] - series.index[0]).total_seconds() / 86_400 / 365 total_return = float(series.iloc[-1] / series.iloc[0] - 1.0) annualized = (1.0 + total_return) ** (1.0 / years) - 1.0 running_peak = series.cummax() max_drawdown = float(((running_peak - series) / running_peak).max()) returns = series.pct_change().dropna() daily_std = float(returns.std(ddof=1)) risk_reward = float(returns.mean()) / daily_std * (365**0.5) if daily_std else 0.0 return { "net_total_return": total_return, "net_annualized_return": annualized, "net_max_drawdown": max_drawdown, "risk_reward_ratio": risk_reward, } def align_data(cache_dir: Path, years: float) -> pd.DataFrame: eth = load_candles(cache_dir, "ETH-USDT-SWAP", "15m", years) btc = load_candles(cache_dir, "BTC-USDT-SWAP", "15m", years) data = eth.merge(btc, on="ts", suffixes=("_eth", "_btc")) return pd.DataFrame( { "ts": data["ts"], "dt": pd.to_datetime(data["ts"], unit="ms", utc=True), "eth_open": data["open_eth"], "eth_high": data["high_eth"], "eth_low": data["low_eth"], "eth_close": data["close_eth"], "btc_open": data["open_btc"], "btc_high": data["high_btc"], "btc_low": data["low_btc"], "btc_close": data["close_btc"], } ).sort_values("ts").reset_index(drop=True) def compare_strategy( *, leg: str, trades: list[Trade], start: pd.Timestamp, end: pd.Timestamp, reported_strategies: pd.DataFrame, reported_equity: pd.DataFrame, ) -> dict[str, object]: exact_daily = daily_equity(cost_equity(trades, use_rounded_return=False, initial_ts=start), start, end) rounded_daily = daily_equity(cost_equity(trades, use_rounded_return=True, initial_ts=start), start, end) reported = reported_strategies[(reported_strategies["strategy_key"] == leg) & (reported_strategies["cost_model"] == PRIMARY_COST)].iloc[0] return { "leg": leg, "trades": len(trades), "reported_trades": int(reported["trades"]), "exact_net_total_return": float(exact_daily.iloc[-1] / exact_daily.iloc[0] - 1.0), "rounded_net_total_return": float(rounded_daily.iloc[-1] / rounded_daily.iloc[0] - 1.0), "reported_net_total_return": float(reported["net_total_return"]), "rounded_minus_reported": float(rounded_daily.iloc[-1] / rounded_daily.iloc[0] - 1.0 - float(reported["net_total_return"])), "exact_minus_rounded": float(exact_daily.iloc[-1] / exact_daily.iloc[0] - rounded_daily.iloc[-1] / rounded_daily.iloc[0]), } def load_nextgen_trades(target_legs: set[str]) -> list[dict[str, Any]]: from scripts import search_eth_btc_nextgen_variants as nextgen strategies = nextgen.build_strategies() target_strategies = [ strategy for strategy in strategies if f"{strategy.family}:{strategy.bar}:{strategy.candidate.name}" in target_legs ] data = { (symbol, "15m"): nextgen.load_candles(symbol, "15m", 10.0) for symbol in ("ETH-USDT-SWAP", "BTC-USDT-SWAP") } rows: list[dict[str, Any]] = [] for strategy in target_strategies: leg = f"{strategy.family}:{strategy.bar}:{strategy.candidate.name}" result = nextgen.run_strategy(strategy, data) for trade in result.trades: rows.append( { "leg": leg, "side": trade["side"], "entry_time": str(trade["entry_time"]), "exit_time": str(trade["exit_time"]), "entry_price": float(trade["entry_price"]), "exit_price": float(trade["exit_price"]), "return_pct": float(trade["return_pct"]), } ) return sorted(rows, key=lambda row: (row["exit_time"], row["leg"])) def write_report(path: Path, summary: dict[str, object]) -> None: lines = [ "# ETH BTC nextgen validation", "", f"Target: `{TARGET_NAME}` / `{PRIMARY_COST}`.", "", "## Conclusion", "", str(summary["conclusion"]), "", "## Key checks", "", f"- Independent rounded-return portfolio total return: {summary['portfolio_metrics']['net_total_return']:.12f}", f"- Reported portfolio total return: {summary['reported_portfolio']['net_total_return']:.12f}", f"- Difference: {summary['portfolio_diff']['net_total_return']:.12g}", f"- Independent rounded-return max drawdown: {summary['portfolio_metrics']['net_max_drawdown']:.12f}", f"- Reported max drawdown: {summary['reported_portfolio']['net_max_drawdown']:.12f}", f"- First 50 combined trades mismatches: {summary['first_50_trade_mismatches']}", "", "## Cost and equity notes", "", "The nextgen cost path compounds closed trades only, subtracting 0.0021 from each trade return on margin. It then samples each leg to daily equity and builds the equal portfolio from daily percentage returns. The independent replay matches that path when the same rounded trade return percentage is used.", "", "Using full precision trade returns changes only tiny rounding-level values and does not affect portfolio ranking.", "", "## Freqtrade mapping", "", str(summary["freqtrade_mapping"]), "", ] path.write_text("\n".join(lines) + "\n", encoding="utf-8") def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--cache-dir", type=Path, default=Path("data/okx-candles")) parser.add_argument("--reports-dir", type=Path, default=Path("reports/eth-exploration")) parser.add_argument("--years", type=float, default=10.0) args = parser.parse_args() data = align_data(args.cache_dir, args.years) leg_a = "btc_trend_eth_rsi:15m:eth-btc-rsi-filter-et50-l3.0-x55.0-bt480-bm240-br0.0" leg_b = "btc_shock_guard_eth_rsi:15m:eth-btc-shock-filter-et50-l3.0-x55.0-bt480-bm240-br0.01-sw96-sv0.01-sd0.05" trades_by_leg = { leg_a: run_rsi_filter( leg=leg_a, data=data, eth_trend_sma=50, eth_rsi_threshold=3.0, eth_exit_rsi=55.0, btc_trend_sma=480, btc_momentum_lookback=240, btc_min_momentum=0.0, ), leg_b: run_shock_filter( leg=leg_b, data=data, eth_trend_sma=50, eth_rsi_threshold=3.0, eth_exit_rsi=55.0, btc_trend_sma=480, btc_momentum_lookback=240, btc_min_momentum=0.01, btc_shock_lookback=96, btc_max_realized_vol=0.01, btc_max_drawdown=0.05, ), } reported_strategies = pd.read_csv(args.reports_dir / "eth-btc-nextgen-strategies.csv") reported_portfolios = pd.read_csv(args.reports_dir / "eth-btc-nextgen-portfolios.csv") reported_equity = pd.read_csv(args.reports_dir / "eth-btc-nextgen-equity.csv") reported_target_equity = reported_equity[(reported_equity["name"] == TARGET_NAME) & (reported_equity["cost_model"] == PRIMARY_COST)].copy() reported_target_equity["date"] = pd.to_datetime(reported_target_equity["date"], utc=True) start = reported_target_equity["date"].iloc[0] end = reported_target_equity["date"].iloc[-1] daily_by_leg = { leg: daily_equity(cost_equity(trades, use_rounded_return=True, initial_ts=start), start, end) for leg, trades in trades_by_leg.items() } returns = pd.DataFrame({leg: series.pct_change().fillna(0.0) for leg, series in daily_by_leg.items()}).dropna() portfolio = INITIAL_EQUITY * (1.0 + returns.mean(axis=1)).cumprod() portfolio.name = TARGET_NAME portfolio_metrics = metrics(portfolio) reported_portfolio = reported_portfolios[(reported_portfolios["name"] == TARGET_NAME) & (reported_portfolios["cost_model"] == PRIMARY_COST)].iloc[0] reported_series = reported_target_equity.set_index("date")["equity"].sort_index() equity_diff = (portfolio - reported_series).abs() combined = sorted([trade for trades in trades_by_leg.values() for trade in trades], key=lambda trade: (trade.exit_time, trade.leg)) nextgen_combined = load_nextgen_trades(set(trades_by_leg)) trade_rows = [] first_50_trade_mismatches = 0 for index, trade in enumerate(combined[:50], start=1): nextgen_trade = nextgen_combined[index - 1] mismatch = ( trade.leg != nextgen_trade["leg"] or trade.side != nextgen_trade["side"] or trade.entry_time.strftime("%Y-%m-%d %H:%M") != nextgen_trade["entry_time"] or trade.exit_time.strftime("%Y-%m-%d %H:%M") != nextgen_trade["exit_time"] or round(trade.entry_price, 4) != nextgen_trade["entry_price"] or round(trade.exit_price, 4) != nextgen_trade["exit_price"] or trade.rounded_return_pct != nextgen_trade["return_pct"] ) first_50_trade_mismatches += 1 if mismatch else 0 trade_rows.append( { "index": index, "leg": trade.leg, "side": trade.side, "entry_time": trade.entry_time.strftime("%Y-%m-%d %H:%M"), "exit_time": trade.exit_time.strftime("%Y-%m-%d %H:%M"), "entry_price": trade.entry_price, "exit_price": trade.exit_price, "gross_return": trade.gross_return, "rounded_return_pct": trade.rounded_return_pct, "net_return_after_cost": trade.rounded_return_pct / 100.0 - ROUNDTRIP_COST_ON_MARGIN, "nextgen_entry_time": nextgen_trade["entry_time"], "nextgen_exit_time": nextgen_trade["exit_time"], "nextgen_entry_price": nextgen_trade["entry_price"], "nextgen_exit_price": nextgen_trade["exit_price"], "nextgen_return_pct": nextgen_trade["return_pct"], "mismatch": mismatch, } ) strategy_checks = [ compare_strategy( leg=leg, trades=trades, start=start, end=end, reported_strategies=reported_strategies, reported_equity=reported_equity, ) for leg, trades in trades_by_leg.items() ] portfolio_diff = { "net_total_return": portfolio_metrics["net_total_return"] - float(reported_portfolio["net_total_return"]), "net_annualized_return": portfolio_metrics["net_annualized_return"] - float(reported_portfolio["net_annualized_return"]), "net_max_drawdown": portfolio_metrics["net_max_drawdown"] - float(reported_portfolio["net_max_drawdown"]), "risk_reward_ratio": portfolio_metrics["risk_reward_ratio"] - float(reported_portfolio["risk_reward_ratio"]), "max_daily_equity_abs_diff": float(equity_diff.max()), } summary = { "target": TARGET_NAME, "cost_model": PRIMARY_COST, "roundtrip_cost_on_margin": ROUNDTRIP_COST_ON_MARGIN, "start": start.strftime("%Y-%m-%d"), "end": end.strftime("%Y-%m-%d"), "strategy_checks": strategy_checks, "portfolio_metrics": portfolio_metrics, "reported_portfolio": { "net_total_return": float(reported_portfolio["net_total_return"]), "net_annualized_return": float(reported_portfolio["net_annualized_return"]), "net_max_drawdown": float(reported_portfolio["net_max_drawdown"]), "risk_reward_ratio": float(reported_portfolio["risk_reward_ratio"]), }, "portfolio_diff": portfolio_diff, "first_50_trade_mismatches": first_50_trade_mismatches, "conclusion": "Validation passes: the target portfolio can be trusted under the report's closed-trade cost and daily equal-weight portfolio definitions. The detected full-precision-vs-rounded trade-return difference is immaterial and does not affect ranking.", "freqtrade_mapping": "A complete Freqtrade equivalence is not direct: this portfolio is built from two independently compounded strategy equity curves and daily equal-weight returns on the same ETH pair, while a normal Freqtrade backtest emits one executable position stream per pair. A custom Freqtrade strategy could reproduce the indicators and one leg, but not this report's two-leg synthetic portfolio accounting without custom subportfolio accounting.", } args.reports_dir.mkdir(parents=True, exist_ok=True) pd.DataFrame(trade_rows).to_csv(args.reports_dir / f"{OUTPUT_PREFIX}-first50.csv", index=False) pd.DataFrame({"date": portfolio.index.strftime("%Y-%m-%d"), "equity": portfolio.to_numpy()}).to_csv( args.reports_dir / f"{OUTPUT_PREFIX}-equity.csv", index=False, ) pd.DataFrame(strategy_checks).to_csv(args.reports_dir / f"{OUTPUT_PREFIX}-strategy-checks.csv", index=False) (args.reports_dir / f"{OUTPUT_PREFIX}-summary.json").write_text(json.dumps(summary, indent=2), encoding="utf-8") write_report(args.reports_dir / f"{OUTPUT_PREFIX}-report.md", summary) print(json.dumps(summary, indent=2)) return 0 if __name__ == "__main__": raise SystemExit(main())