from __future__ import annotations import argparse import json import sys from dataclasses import dataclass from pathlib import Path import pandas as pd sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from okx_codex_trader.models import Candle from okx_codex_trader.sampled_report import SegmentResult, mark_to_market, trade_equity from scripts import explore_ultrashort as explore from scripts.search_eth_btc_nextgen_variants import format_cell, markdown_table OUTPUT_DIR = Path("reports/strategy-expansion") PREFIX = "trend-swing" SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP") BARS = ("1H", "4H", "1D") YEARS = 10.0 LEVERAGE = 3 ROUNDTRIP_COST_ON_MARGIN = 0.0004 * 2 * LEVERAGE HORIZONS = ( ("full", None), ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ) @dataclass(frozen=True) class Params: symbol: str bar: str family: str fast: int slow: int entry: int exit: int atr: int stop_atr: float take_atr: float max_hold: int @property def name(self) -> str: return ( f"{self.symbol}-{self.bar}-{self.family}" f"-f{self.fast}-s{self.slow}-e{self.entry}-x{self.exit}" f"-a{self.atr}-sl{self.stop_atr}-tp{self.take_atr}-mh{self.max_hold}" ) def load_15m_frame(symbol: str, years: float) -> pd.DataFrame: path = explore.CANDLE_CACHE_DIR / symbol / "15m.csv" if not path.exists(): raise FileNotFoundError(f"missing local cache: {path}") frame = pd.read_csv(path) frame["ts"] = pd.to_datetime(frame["ts"], unit="ms", utc=True) frame = frame.sort_values("ts").drop_duplicates("ts", keep="last").set_index("ts") start = frame.index[-1] - pd.DateOffset(years=years) return frame[frame.index >= start] def resample_frame(frame: pd.DataFrame, bar: str) -> pd.DataFrame: rule = {"1H": "1h", "4H": "4h", "1D": "1D"}[bar] out = frame.resample(rule, label="left", closed="left").agg( open=("open", "first"), high=("high", "max"), low=("low", "min"), close=("close", "last"), volume=("volume", "sum"), ) return out.dropna() def frame_to_candles(symbol: str, frame: pd.DataFrame) -> list[Candle]: return [ Candle( symbol=symbol, ts=int(ts.timestamp() * 1000), open=float(row.open), high=float(row.high), low=float(row.low), close=float(row.close), volume=float(row.volume), ) for ts, row in frame.iterrows() ] def true_range(highs: pd.Series, lows: pd.Series, closes: pd.Series) -> pd.Series: previous = closes.shift(1) return pd.concat([(highs - lows), (highs - previous).abs(), (lows - previous).abs()], axis=1).max(axis=1) def close_trade( *, trades: list[dict[str, object]], exits: list[dict[str, object]], position: dict[str, object], candle: Candle, exit_price: float, ) -> tuple[float, bool]: exit_equity = trade_equity( side=str(position["side"]), margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), exit_price=exit_price, leverage=LEVERAGE, ) pnl = exit_equity - float(position["margin_used"]) trades.append( { "side": "Long" if position["side"] == "long" else "Short", "entry_time": explore._format_ts(int(position["entry_time"])), "exit_time": explore._format_ts(candle.ts), "entry_price": round(float(position["entry_price"]), 4), "exit_price": round(exit_price, 4), "pnl": round(pnl, 4), "return_pct": round(pnl / float(position["margin_used"]) * 100, 4), } ) exits.append({"ts": candle.ts, "price": exit_price, "side": position["side"]}) return exit_equity, pnl > 0.0 def run_segment(candles: list[Candle], params: Params) -> SegmentResult: highs = pd.Series([c.high for c in candles], dtype=float) lows = pd.Series([c.low for c in candles], dtype=float) closes = pd.Series([c.close for c in candles], dtype=float) fast = closes.ewm(span=params.fast, adjust=False).mean() slow = closes.ewm(span=params.slow, adjust=False).mean() atr = true_range(highs, lows, closes).rolling(params.atr).mean() entry_high = highs.shift(1).rolling(params.entry).max() entry_low = lows.shift(1).rolling(params.entry).min() exit_high = highs.shift(1).rolling(params.exit).max() exit_low = lows.shift(1).rolling(params.exit).min() rsi = explore._compute_rsi(closes, 5) warmup = max(params.slow, params.entry, params.exit, params.atr, 8) equity = explore.INITIAL_EQUITY ending_equity = equity peak_equity = equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_side: str | None = None pending_exit = False for index in range(warmup, len(candles)): candle = candles[index] if pending_exit and position is not None: equity, won = close_trade(trades=trades, exits=exits, position=position, candle=candle, exit_price=candle.open) wins += 1 if won else 0 position = None pending_exit = False if pending_side is not None and position is None and equity > 0.0: side = pending_side current_atr = float(atr.iloc[index - 1]) position = { "side": side, "entry_time": candle.ts, "entry_price": candle.open, "entry_index": index, "margin_used": equity, "stop_price": candle.open - params.stop_atr * current_atr if side == "long" else candle.open + params.stop_atr * current_atr, "take_price": candle.open + params.take_atr * current_atr if side == "long" else candle.open - params.take_atr * current_atr, } entries.append({"ts": candle.ts, "price": candle.open, "side": side}) pending_side = None current_equity = equity if position is not None: side = str(position["side"]) stop_hit = (side == "long" and candle.low <= float(position["stop_price"])) or ( side == "short" and candle.high >= float(position["stop_price"]) ) take_hit = (side == "long" and candle.high >= float(position["take_price"])) or ( side == "short" and candle.low <= float(position["take_price"]) ) if stop_hit or take_hit: exit_price = float(position["stop_price"] if stop_hit else position["take_price"]) equity, won = close_trade(trades=trades, exits=exits, position=position, candle=candle, exit_price=exit_price) wins += 1 if won else 0 current_equity = equity position = None if position is not None: current_equity = mark_to_market( side=str(position["side"]), margin_used=float(position["margin_used"]), entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=LEVERAGE, ) peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(candles) - 1 or equity <= 0.0: continue if position is not None: held = index - int(position["entry_index"]) side = str(position["side"]) if side == "long": pending_exit = candle.close < float(exit_low.iloc[index]) or fast.iloc[index] < slow.iloc[index] or held >= params.max_hold else: pending_exit = candle.close > float(exit_high.iloc[index]) or fast.iloc[index] > slow.iloc[index] or held >= params.max_hold continue if params.family == "donchian": if candle.close > float(entry_high.iloc[index]) and fast.iloc[index] > slow.iloc[index]: pending_side = "long" elif candle.close < float(entry_low.iloc[index]) and fast.iloc[index] < slow.iloc[index]: pending_side = "short" elif params.family == "ema_cross": prev_fast = fast.iloc[index - 1] prev_slow = slow.iloc[index - 1] if prev_fast <= prev_slow and fast.iloc[index] > slow.iloc[index]: pending_side = "long" elif prev_fast >= prev_slow and fast.iloc[index] < slow.iloc[index]: pending_side = "short" elif params.family == "trend_pullback": if fast.iloc[index] > slow.iloc[index] and candle.close <= fast.iloc[index] and rsi[index] <= 45: pending_side = "long" elif fast.iloc[index] < slow.iloc[index] and candle.close >= fast.iloc[index] and rsi[index] >= 55: pending_side = "short" trade_count = len(trades) return SegmentResult( trade_count=trade_count, total_return=(ending_equity - explore.INITIAL_EQUITY) / explore.INITIAL_EQUITY, win_rate=wins / trade_count if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=candles[warmup:], equity_curve=equity_curve, entries=entries, exits=exits, ) def cost_adjusted_frame(result: SegmentResult) -> pd.DataFrame: rows = [{"ts": pd.to_datetime(result.equity_curve[0]["ts"], unit="ms", utc=True), "equity": explore.INITIAL_EQUITY}] equity = explore.INITIAL_EQUITY for trade in result.trades: equity *= 1.0 + float(trade["return_pct"]) / 100.0 - ROUNDTRIP_COST_ON_MARGIN rows.append({"ts": pd.to_datetime(str(trade["exit_time"]), utc=True), "equity": equity}) return pd.DataFrame(rows) def daily_equity(frame: pd.DataFrame, start: pd.Timestamp, end: pd.Timestamp) -> pd.Series: series = frame.set_index("ts")["equity"].sort_index() index = pd.date_range(start.normalize(), end.normalize(), freq="1D", tz="UTC") return series.reindex(index.union(series.index)).sort_index().ffill().reindex(index).fillna(explore.INITIAL_EQUITY) def metrics_from_daily(series: pd.Series) -> dict[str, float]: years = (series.index[-1] - series.index[0]).total_seconds() / 86_400 / 365 total = float(series.iloc[-1] / series.iloc[0] - 1.0) annual = (1.0 + total) ** (1.0 / years) - 1.0 if total > -1.0 and years > 0.0 else 0.0 drawdown = explore.max_drawdown_from_equity([float(v) for v in series]) return { "total_return": total, "annualized_return": annual, "max_drawdown": drawdown, "calmar": annual / drawdown if drawdown else 0.0, } def trade_stats(result: SegmentResult) -> dict[str, float | int]: returns = [float(trade["return_pct"]) / 100.0 - ROUNDTRIP_COST_ON_MARGIN for trade in result.trades] wins = [value for value in returns if value > 0.0] losses = [value for value in returns if value < 0.0] avg_win = sum(wins) / len(wins) if wins else 0.0 avg_loss = abs(sum(losses) / len(losses)) if losses else 0.0 return { "trades": len(returns), "win_rate": len(wins) / len(returns) if returns else 0.0, "payoff_ratio": avg_win / avg_loss if avg_loss else 0.0, } def horizon_metrics(series: pd.Series) -> dict[str, float]: out: dict[str, float] = {} end = series.index[-1] for label, offset in HORIZONS[1:]: scoped = series[series.index >= end - offset] if len(scoped) < 2: scoped = series out[f"return_{label}"] = float(scoped.iloc[-1] / scoped.iloc[0] - 1.0) return out def monthly_rows(name: str, params: Params, series: pd.Series) -> pd.DataFrame: monthly = series.resample("ME").last() frame = pd.DataFrame( { "name": name, "symbol": params.symbol, "bar": params.bar, "family": params.family, "month": monthly.index.strftime("%Y-%m"), "start_equity": monthly.shift(1).fillna(series.iloc[0]).to_numpy(), "end_equity": monthly.to_numpy(), } ) frame["return"] = frame["end_equity"] / frame["start_equity"] - 1.0 return frame def build_params() -> list[Params]: rows: list[Params] = [] for symbol in SYMBOLS: for bar in BARS: scale = {"1H": 1, "4H": 1, "1D": 1}[bar] for family in ("donchian", "ema_cross", "trend_pullback"): for fast, slow in ((20 * scale, 80 * scale), (30 * scale, 120 * scale), (50 * scale, 200 * scale)): for entry, exit_ in ((20, 10), (55, 20)): for stop_atr, take_atr in ((2.0, 4.0), (3.0, 6.0)): max_hold = {"1H": 240, "4H": 120, "1D": 60}[bar] rows.append( Params( symbol=symbol, bar=bar, family=family, fast=fast, slow=slow, entry=entry, exit=exit_, atr=14, stop_atr=stop_atr, take_atr=take_atr, max_hold=max_hold, ) ) return rows def markdown_report(command: str, paths: list[Path], totals: pd.DataFrame, monthly: pd.DataFrame) -> str: top = totals.head(10) best_names = set(top.head(3)["name"]) lines = [ "# Trend swing expansion", "", f"Run command: `{command}`", "", "Output files:", *[f"- `{path}`" for path in paths], "", "Scope: BTC-USDT-SWAP and ETH-USDT-SWAP perpetuals, resampled from local 15m cache to 1H/4H/1D.", f"Cost: 0.04% single-side taker fee, roundtrip cost on margin = {ROUNDTRIP_COST_ON_MARGIN:.4%} at {LEVERAGE}x.", "", "## Top candidates", "", markdown_table( top[ [ "name", "symbol", "bar", "family", "trades", "total_return", "annualized_return", "max_drawdown", "calmar", "win_rate", "payoff_ratio", "return_3y", "return_1y", "return_6m", "return_3m", ] ] ), "", "## Monthly returns for top 3", "", markdown_table(monthly[monthly["name"].isin(best_names)].tail(120)), ] return "\n".join(lines) + "\n" def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--years", type=float, default=YEARS) parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR) parser.add_argument("--max-candidates", type=int, default=0) args = parser.parse_args() raw = {symbol: load_15m_frame(symbol, args.years) for symbol in SYMBOLS} candles = { (symbol, bar): frame_to_candles(symbol, resample_frame(raw[symbol], bar)) for symbol in SYMBOLS for bar in BARS } params_grid = build_params() if args.max_candidates: params_grid = params_grid[: args.max_candidates] total_rows: list[dict[str, object]] = [] monthly_frames: list[pd.DataFrame] = [] for index, params in enumerate(params_grid, start=1): result = run_segment(candles[(params.symbol, params.bar)], params) frame = cost_adjusted_frame(result) start = pd.to_datetime(result.equity_curve[0]["ts"], unit="ms", utc=True) end = pd.to_datetime(result.equity_curve[-1]["ts"], unit="ms", utc=True) daily = daily_equity(frame, start, end) monthly = monthly_rows(params.name, params, daily) row = { "name": params.name, **params.__dict__, "first_candle": start.strftime("%Y-%m-%d %H:%M"), "last_candle": end.strftime("%Y-%m-%d %H:%M"), "fee_single_side": 0.0004, "roundtrip_cost_on_margin": ROUNDTRIP_COST_ON_MARGIN, "worst_month_return": float(monthly["return"].min()), **metrics_from_daily(daily), **trade_stats(result), **horizon_metrics(daily), } total_rows.append(row) monthly_frames.append(monthly) print(f"done {index}/{len(params_grid)} {params.name}", flush=True) totals = pd.DataFrame(total_rows).sort_values( ["calmar", "annualized_return", "max_drawdown", "trades"], ascending=[False, False, True, True], ) monthly_all = pd.concat(monthly_frames, ignore_index=True) top3 = totals.head(3) args.output_dir.mkdir(parents=True, exist_ok=True) totals_path = args.output_dir / f"{PREFIX}-totals.csv" monthly_path = args.output_dir / f"{PREFIX}-monthly-returns.csv" top_path = args.output_dir / f"{PREFIX}-top3.csv" best_path = args.output_dir / f"{PREFIX}-best.json" report_path = args.output_dir / f"{PREFIX}-report.md" totals.to_csv(totals_path, index=False) monthly_all.to_csv(monthly_path, index=False) top3.to_csv(top_path, index=False) best_path.write_text(json.dumps(top3.to_dict(orient="records"), indent=2), encoding="utf-8") command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --years {args.years}" report_path.write_text( markdown_report(command, [totals_path, monthly_path, top_path, best_path, report_path], totals, monthly_all), encoding="utf-8", ) print(top3.to_string(index=False, formatters={col: format_cell for col in top3.columns})) return 0 if __name__ == "__main__": raise SystemExit(main())