from __future__ import annotations import argparse import json import sys from dataclasses import dataclass from pathlib import Path import pandas as pd sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from okx_codex_trader.models import Candle from okx_codex_trader.sampled_report import SegmentResult, mark_to_market from scripts import explore_ultrashort as explore from scripts.search_eth_btc_nextgen_variants import ( HORIZONS, COSTS, format_cell, load_candles, markdown_table, metrics_from_daily_equity, monthly_rows, ) OUTPUT_DIR = Path("reports/eth-exploration") PREFIX = "eth-btc-low-turnover" YEARS = 10.0 PRIMARY_COST = "maker_taker" def daily_equity(frame: pd.DataFrame, start: pd.Timestamp, end: pd.Timestamp) -> pd.Series: series = frame.set_index("ts")["equity"].sort_index() start_day = start.normalize() end_day = end.normalize() series = pd.concat([pd.Series([explore.INITIAL_EQUITY], index=[start_day]), series]).sort_index() index = pd.date_range(start_day, end_day, freq="1D", tz="UTC") return series.reindex(index.union(series.index)).sort_index().ffill().reindex(index) @dataclass(frozen=True) class Params: eth_trend_sma: int eth_rsi_threshold: float eth_exit_rsi: float btc_trend_sma: int btc_momentum_lookback: int btc_min_momentum: float min_signal_edge: float cooldown_bars: int btc_vol_lookback: int btc_max_realized_vol: float session: str max_trades_per_month: int adverse_lookback: int adverse_btc_return: float max_hold_bars: int @property def name(self) -> str: return ( f"lt-et{self.eth_trend_sma}-l{self.eth_rsi_threshold}-x{self.eth_exit_rsi}" f"-bt{self.btc_trend_sma}-bm{self.btc_momentum_lookback}-br{self.btc_min_momentum}" f"-edge{self.min_signal_edge}-cd{self.cooldown_bars}-vw{self.btc_vol_lookback}" f"-vc{self.btc_max_realized_vol}-s{self.session}-mt{self.max_trades_per_month}" f"-aw{self.adverse_lookback}-ar{self.adverse_btc_return}-mh{self.max_hold_bars}" ) def session_allowed(ts: int, session: str) -> bool: hour = pd.to_datetime(ts, unit="ms", utc=True).hour if session == "all": return True if session == "asia": return 0 <= hour < 8 if session == "europe": return 7 <= hour < 16 if session == "us": return 13 <= hour < 22 raise ValueError(f"unknown session {session}") def close_position( *, trades: list[dict[str, object]], exits: list[dict[str, object]], position: dict[str, object], candle: Candle, exit_price: float, leverage: int, ) -> tuple[float, bool]: exit_equity = explore.trade_equity( side="long", margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), exit_price=exit_price, leverage=leverage, ) pnl = exit_equity - float(position["margin_used"]) trades.append( { "side": "Long", "entry_time": explore._format_ts(int(position["entry_time"])), "exit_time": explore._format_ts(candle.ts), "entry_price": round(float(position["entry_price"]), 4), "exit_price": round(exit_price, 4), "pnl": round(pnl, 4), "return_pct": round(pnl / float(position["margin_used"]) * 100, 4), } ) exits.append({"ts": candle.ts, "price": exit_price, "side": "long"}) return exit_equity, pnl > 0.0 def run_low_turnover_segment( *, eth_candles: list[Candle], btc_candles: list[Candle], leverage: int, params: Params, ) -> SegmentResult: eth_closes = pd.Series([candle.close for candle in eth_candles], dtype=float) btc_closes = pd.Series([candle.close for candle in btc_candles], dtype=float) eth_trend = eth_closes.rolling(params.eth_trend_sma).mean().tolist() eth_rsi = explore._compute_rsi(eth_closes, 2) btc_trend = btc_closes.rolling(params.btc_trend_sma).mean().tolist() btc_returns = btc_closes.pct_change() btc_realized_vol = btc_returns.rolling(params.btc_vol_lookback).std(ddof=1).tolist() btc_recent_min_return = btc_returns.rolling(params.adverse_lookback).min().tolist() warmup_bars = max( params.eth_trend_sma, params.btc_trend_sma, params.btc_momentum_lookback, params.btc_vol_lookback, params.adverse_lookback, 3, ) equity = explore.INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_entry = False pending_exit = False last_exit_index = -10**9 month_counts: dict[str, int] = {} for index in range(warmup_bars, len(eth_candles)): candle = eth_candles[index] if pending_exit and position is not None: equity, won = close_position( trades=trades, exits=exits, position=position, candle=candle, exit_price=candle.open, leverage=leverage, ) wins += 1 if won else 0 position = None pending_exit = False last_exit_index = index if pending_entry and position is None and equity > 0.0: entry_month = pd.to_datetime(candle.ts, unit="ms", utc=True).strftime("%Y-%m") month_counts[entry_month] = month_counts.get(entry_month, 0) + 1 position = { "side": "long", "entry_time": candle.ts, "entry_price": candle.open, "entry_index": index, "margin_used": equity, } entries.append({"ts": candle.ts, "price": candle.open, "side": "long"}) pending_entry = False current_equity = equity if position is not None: current_equity = mark_to_market( side="long", margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=leverage, ) peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(eth_candles) - 1 or equity <= 0.0: continue current_eth_trend = eth_trend[index] current_eth_rsi = eth_rsi[index] current_btc_trend = btc_trend[index] current_btc_vol = btc_realized_vol[index] current_btc_recent_min_return = btc_recent_min_return[index] if ( current_eth_trend != current_eth_trend or current_eth_rsi != current_eth_rsi or current_btc_trend != current_btc_trend or current_btc_vol != current_btc_vol or current_btc_recent_min_return != current_btc_recent_min_return ): continue btc_momentum = btc_candles[index].close / btc_candles[index - params.btc_momentum_lookback].close - 1.0 if position is not None: held_bars = index - int(position["entry_index"]) if current_eth_rsi >= params.eth_exit_rsi or btc_candles[index].close < float(current_btc_trend) or held_bars >= params.max_hold_bars: pending_exit = True continue entry_month = pd.to_datetime(candle.ts, unit="ms", utc=True).strftime("%Y-%m") signal_edge = (btc_momentum - params.btc_min_momentum) + ((params.eth_rsi_threshold - current_eth_rsi) / 100.0) btc_risk_on = btc_candles[index].close > float(current_btc_trend) and btc_momentum >= params.btc_min_momentum eth_pullback = candle.close > float(current_eth_trend) and current_eth_rsi <= params.eth_rsi_threshold if ( btc_risk_on and eth_pullback and signal_edge >= params.min_signal_edge and index - last_exit_index >= params.cooldown_bars and float(current_btc_vol) <= params.btc_max_realized_vol and session_allowed(candle.ts, params.session) and month_counts.get(entry_month, 0) < params.max_trades_per_month and float(current_btc_recent_min_return) > -params.adverse_btc_return ): pending_entry = True trade_count = len(trades) return SegmentResult( trade_count=trade_count, total_return=(ending_equity - explore.INITIAL_EQUITY) / explore.INITIAL_EQUITY, win_rate=wins / trade_count if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=eth_candles[warmup_bars:], equity_curve=equity_curve, entries=entries, exits=exits, ) def build_params() -> list[Params]: return [ Params( eth_trend_sma=eth_trend, eth_rsi_threshold=eth_rsi, eth_exit_rsi=exit_rsi, btc_trend_sma=btc_trend, btc_momentum_lookback=btc_momentum, btc_min_momentum=btc_min, min_signal_edge=edge, cooldown_bars=cooldown, btc_vol_lookback=vol_lookback, btc_max_realized_vol=vol_cap, session=session, max_trades_per_month=max_trades, adverse_lookback=adverse_lookback, adverse_btc_return=adverse_return, max_hold_bars=max_hold, ) for eth_trend in (50,) for eth_rsi in (2.0, 3.0) for exit_rsi in (55.0,) for btc_trend in (480, 720) for btc_momentum in (240, 480) for btc_min in (0.01, 0.02) for edge in (0.0, 0.01) for cooldown in (32,) for vol_lookback in (240,) for vol_cap in (0.006, 0.008) for session in ("all", "us") for max_trades in (4,) for adverse_lookback in (4,) for adverse_return in (0.015,) for max_hold in (96,) ] def trade_stats_for_window(result: SegmentResult, cost: float, start: pd.Timestamp, end: pd.Timestamp) -> dict[str, float | int]: returns: list[float] = [] for trade in result.trades: exit_time = pd.to_datetime(str(trade["exit_time"]), utc=True) if start <= exit_time <= end: returns.append(float(trade["return_pct"]) / 100.0 - cost * float(trade.get("cost_weight", 1.0))) wins = [value for value in returns if value > 0.0] losses = [value for value in returns if value < 0.0] avg_win = sum(wins) / len(wins) if wins else 0.0 avg_loss_abs = abs(sum(losses) / len(losses)) if losses else 0.0 gross_profit = sum(wins) gross_loss_abs = abs(sum(losses)) return { "trades": len(returns), "win_rate": len(wins) / len(returns) if returns else 0.0, "payoff_ratio": avg_win / avg_loss_abs if avg_loss_abs else 0.0, "profit_factor": gross_profit / gross_loss_abs if gross_loss_abs else 0.0, } def horizon_rows(name: str, result: SegmentResult, cost: float, series: pd.Series) -> list[dict[str, object]]: rows: list[dict[str, object]] = [] end_time = series.index[-1] for label, offset in HORIZONS: horizon = series if offset is None else series[series.index >= end_time - offset] if len(horizon) < 2: horizon = series stats = trade_stats_for_window(result, cost, horizon.index[0], horizon.index[-1]) rows.append( { "name": name, "horizon": label, "horizon_start": horizon.index[0].strftime("%Y-%m-%d"), "horizon_end": horizon.index[-1].strftime("%Y-%m-%d"), **metrics_from_daily_equity(horizon), **stats, } ) return rows def valid_all_horizons(horizon: pd.DataFrame, cost_model: str) -> set[str]: scoped = horizon[horizon["cost_model"] == cost_model] pivot = scoped.pivot_table(index="name", columns="horizon", values="net_total_return", aggfunc="min") required = pivot.reindex(columns=["full", "3y", "1y", "6m", "3m"]) return set(required[(required > 0.0).all(axis=1)].index) def markdown_report( *, command: str, paths: list[Path], totals: pd.DataFrame, horizon: pd.DataFrame, monthly_summary: pd.DataFrame, worst_months: pd.DataFrame, qualified_names: set[str], ) -> str: taker_positive = totals[(totals["cost_model"] == "taker_taker") & (totals["name"].isin(qualified_names))] top = taker_positive.head(10) if len(taker_positive) else totals[totals["cost_model"] == PRIMARY_COST].head(10) top_name = str(top.iloc[0]["name"]) if len(top) else "" top_horizon = horizon[(horizon["name"] == top_name) & (horizon["cost_model"].isin(("maker_taker", "taker_taker")))] lines = [ "# ETH BTC low-turnover nextgen exploration", "", f"Run command: `{command}`", "", "Output files:", *[f"- `{path}`" for path in paths], "", "Scope: ETH-only long entries from BTC trend/momentum regime plus extreme ETH RSI2 pullback.", "Entry constraints tested: longer BTC trend, lower ETH RSI threshold, signal edge floor, cooldown, BTC volatility cap, session filter, monthly trade cap, and rejection after adverse BTC candles.", "Costs: maker_taker=0.0021 and taker_taker=0.0030 roundtrip on margin at 3x.", "", f"Taker-taker candidates positive across full/3y/1y/6m/3m: {len(taker_positive)}.", "", "## Top candidates", "", markdown_table( top[ [ "name", "cost_model", "trades", "trades_per_month", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "win_rate", "payoff_ratio", "profit_factor", "risk_reward_ratio", "worst_month_return", ] ] ), "", "## Horizon metrics for selected candidate", "", markdown_table( top_horizon[ [ "cost_model", "horizon", "horizon_start", "horizon_end", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "win_rate", "payoff_ratio", "profit_factor", "risk_reward_ratio", "trades", ] ] ), "", "## Monthly summary", "", markdown_table(monthly_summary[monthly_summary["name"].isin(set(top.head(5)["name"]))].head(20)), "", "## Worst months", "", markdown_table(worst_months[worst_months["name"].isin(set(top.head(5)["name"]))].head(20)), ] return "\n".join(lines) + "\n" def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--years", type=float, default=YEARS) parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR) parser.add_argument("--max-candidates", type=int, default=0) args = parser.parse_args() eth, btc = explore.align_pair_candles( load_candles("ETH-USDT-SWAP", "15m", args.years), load_candles("BTC-USDT-SWAP", "15m", args.years), ) params_grid = build_params() if args.max_candidates: params_grid = params_grid[: args.max_candidates] start = pd.to_datetime(eth[max(param.btc_trend_sma for param in params_grid)].ts, unit="ms", utc=True) end = pd.to_datetime(eth[-1].ts, unit="ms", utc=True) total_rows: list[dict[str, object]] = [] horizon_output: list[dict[str, object]] = [] monthly_frames: list[pd.DataFrame] = [] for index, params in enumerate(params_grid, start=1): result = run_low_turnover_segment(eth_candles=eth, btc_candles=btc, leverage=explore.LEVERAGE, params=params) for cost_model, cost_value in COSTS.items(): frame = explore.cost_adjusted_trade_equity_frame(result, cost_value) daily = daily_equity(frame, start, end) metrics = metrics_from_daily_equity(daily) monthly = monthly_rows(params.name, daily) stats = trade_stats_for_window(result, cost_value, daily.index[0], daily.index[-1]) total_rows.append( { "name": params.name, "cost_model": cost_model, "roundtrip_cost_on_margin": cost_value, "first_candle": start.strftime("%Y-%m-%d %H:%M"), "last_candle": end.strftime("%Y-%m-%d %H:%M"), "years": (end - start).total_seconds() / 86_400 / 365, "trades_per_month": stats["trades"] / max((end - start).days / 30.4375, 1.0), "gross_total_return": result.total_return, "gross_max_drawdown_mark_to_market": result.max_drawdown, "worst_month_return": float(monthly["return"].min()), **params.__dict__, **stats, **metrics, } ) for row in horizon_rows(params.name, result, cost_value, daily): horizon_output.append({"cost_model": cost_model, **row}) monthly_frames.append(monthly.assign(cost_model=cost_model)) print(f"done {index}/{len(params_grid)} {params.name}", flush=True) totals = pd.DataFrame(total_rows).sort_values( ["cost_model", "net_calmar", "net_annualized_return", "trades"], ascending=[True, False, False, True], ) horizon = pd.DataFrame(horizon_output) horizon["horizon"] = pd.Categorical(horizon["horizon"], categories=["full", "3y", "1y", "6m", "3m"], ordered=True) horizon = horizon.sort_values(["cost_model", "name", "horizon"]) monthly = pd.concat(monthly_frames, ignore_index=True) monthly_summary = ( monthly.groupby(["cost_model", "name"], as_index=False) .agg( months=("return", "count"), positive_month_rate=("return", lambda values: float((values > 0.0).mean())), avg_month_return=("return", "mean"), median_month_return=("return", "median"), worst_month_return=("return", "min"), best_month_return=("return", "max"), ) .sort_values(["cost_model", "worst_month_return", "positive_month_rate"], ascending=[True, False, False]) ) worst_months = monthly.sort_values("return").head(250) taker_positive_names = valid_all_horizons(horizon, "taker_taker") qualified = totals[(totals["cost_model"] == "taker_taker") & (totals["name"].isin(taker_positive_names))].copy() qualified = qualified.sort_values(["net_calmar", "net_annualized_return", "trades"], ascending=[False, False, True]) args.output_dir.mkdir(parents=True, exist_ok=True) totals_path = args.output_dir / f"{PREFIX}-totals.csv" horizon_path = args.output_dir / f"{PREFIX}-horizons.csv" monthly_path = args.output_dir / f"{PREFIX}-monthly-summary.csv" worst_path = args.output_dir / f"{PREFIX}-worst-months.csv" qualified_path = args.output_dir / f"{PREFIX}-taker-positive.csv" report_path = args.output_dir / f"{PREFIX}-report.md" best_path = args.output_dir / f"{PREFIX}-best.json" totals.to_csv(totals_path, index=False) horizon.to_csv(horizon_path, index=False) monthly_summary.to_csv(monthly_path, index=False) worst_months.to_csv(worst_path, index=False) qualified.to_csv(qualified_path, index=False) best_payload = qualified.head(1).to_dict(orient="records")[0] if len(qualified) else totals.head(1).to_dict(orient="records")[0] best_path.write_text(json.dumps(best_payload, indent=2), encoding="utf-8") command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --years {args.years}" paths = [totals_path, horizon_path, monthly_path, worst_path, qualified_path, best_path, report_path] report_path.write_text( markdown_report( command=command, paths=paths, totals=totals, horizon=horizon, monthly_summary=monthly_summary, worst_months=worst_months, qualified_names=taker_positive_names, ), encoding="utf-8", ) print(qualified.head(10).to_string(index=False) if len(qualified) else totals.head(10).to_string(index=False)) return 0 if __name__ == "__main__": raise SystemExit(main())