| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467 |
- from __future__ import annotations
- import argparse
- import json
- import sys
- from dataclasses import dataclass
- from pathlib import Path
- import pandas as pd
- sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
- from okx_codex_trader.models import Candle
- from okx_codex_trader.sampled_report import SegmentResult, mark_to_market, trade_equity
- from scripts import explore_ultrashort as explore
- from scripts.search_eth_btc_nextgen_variants import format_cell, markdown_table
- OUTPUT_DIR = Path("reports/strategy-expansion")
- PREFIX = "trend-swing"
- SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP")
- BARS = ("1H", "4H", "1D")
- YEARS = 10.0
- LEVERAGE = 3
- ROUNDTRIP_COST_ON_MARGIN = 0.0004 * 2 * LEVERAGE
- HORIZONS = (
- ("full", None),
- ("3y", pd.DateOffset(years=3)),
- ("1y", pd.DateOffset(years=1)),
- ("6m", pd.DateOffset(months=6)),
- ("3m", pd.DateOffset(months=3)),
- )
- @dataclass(frozen=True)
- class Params:
- symbol: str
- bar: str
- family: str
- fast: int
- slow: int
- entry: int
- exit: int
- atr: int
- stop_atr: float
- take_atr: float
- max_hold: int
- @property
- def name(self) -> str:
- return (
- f"{self.symbol}-{self.bar}-{self.family}"
- f"-f{self.fast}-s{self.slow}-e{self.entry}-x{self.exit}"
- f"-a{self.atr}-sl{self.stop_atr}-tp{self.take_atr}-mh{self.max_hold}"
- )
- def load_15m_frame(symbol: str, years: float) -> pd.DataFrame:
- path = explore.CANDLE_CACHE_DIR / symbol / "15m.csv"
- if not path.exists():
- raise FileNotFoundError(f"missing local cache: {path}")
- frame = pd.read_csv(path)
- frame["ts"] = pd.to_datetime(frame["ts"], unit="ms", utc=True)
- frame = frame.sort_values("ts").drop_duplicates("ts", keep="last").set_index("ts")
- start = frame.index[-1] - pd.DateOffset(years=years)
- return frame[frame.index >= start]
- def resample_frame(frame: pd.DataFrame, bar: str) -> pd.DataFrame:
- rule = {"1H": "1h", "4H": "4h", "1D": "1D"}[bar]
- out = frame.resample(rule, label="left", closed="left").agg(
- open=("open", "first"),
- high=("high", "max"),
- low=("low", "min"),
- close=("close", "last"),
- volume=("volume", "sum"),
- )
- return out.dropna()
- def frame_to_candles(symbol: str, frame: pd.DataFrame) -> list[Candle]:
- return [
- Candle(
- symbol=symbol,
- ts=int(ts.timestamp() * 1000),
- open=float(row.open),
- high=float(row.high),
- low=float(row.low),
- close=float(row.close),
- volume=float(row.volume),
- )
- for ts, row in frame.iterrows()
- ]
- def true_range(highs: pd.Series, lows: pd.Series, closes: pd.Series) -> pd.Series:
- previous = closes.shift(1)
- return pd.concat([(highs - lows), (highs - previous).abs(), (lows - previous).abs()], axis=1).max(axis=1)
- def close_trade(
- *,
- trades: list[dict[str, object]],
- exits: list[dict[str, object]],
- position: dict[str, object],
- candle: Candle,
- exit_price: float,
- ) -> tuple[float, bool]:
- exit_equity = trade_equity(
- side=str(position["side"]),
- margin_used=float(position["margin_used"]),
- entry_price=float(position["entry_price"]),
- exit_price=exit_price,
- leverage=LEVERAGE,
- )
- pnl = exit_equity - float(position["margin_used"])
- trades.append(
- {
- "side": "Long" if position["side"] == "long" else "Short",
- "entry_time": explore._format_ts(int(position["entry_time"])),
- "exit_time": explore._format_ts(candle.ts),
- "entry_price": round(float(position["entry_price"]), 4),
- "exit_price": round(exit_price, 4),
- "pnl": round(pnl, 4),
- "return_pct": round(pnl / float(position["margin_used"]) * 100, 4),
- }
- )
- exits.append({"ts": candle.ts, "price": exit_price, "side": position["side"]})
- return exit_equity, pnl > 0.0
- def run_segment(candles: list[Candle], params: Params) -> SegmentResult:
- highs = pd.Series([c.high for c in candles], dtype=float)
- lows = pd.Series([c.low for c in candles], dtype=float)
- closes = pd.Series([c.close for c in candles], dtype=float)
- fast = closes.ewm(span=params.fast, adjust=False).mean()
- slow = closes.ewm(span=params.slow, adjust=False).mean()
- atr = true_range(highs, lows, closes).rolling(params.atr).mean()
- entry_high = highs.shift(1).rolling(params.entry).max()
- entry_low = lows.shift(1).rolling(params.entry).min()
- exit_high = highs.shift(1).rolling(params.exit).max()
- exit_low = lows.shift(1).rolling(params.exit).min()
- rsi = explore._compute_rsi(closes, 5)
- warmup = max(params.slow, params.entry, params.exit, params.atr, 8)
- equity = explore.INITIAL_EQUITY
- ending_equity = equity
- peak_equity = equity
- max_drawdown = 0.0
- wins = 0
- trades: list[dict[str, object]] = []
- entries: list[dict[str, object]] = []
- exits: list[dict[str, object]] = []
- equity_curve: list[dict[str, float | int]] = []
- position: dict[str, object] | None = None
- pending_side: str | None = None
- pending_exit = False
- for index in range(warmup, len(candles)):
- candle = candles[index]
- if pending_exit and position is not None:
- equity, won = close_trade(trades=trades, exits=exits, position=position, candle=candle, exit_price=candle.open)
- wins += 1 if won else 0
- position = None
- pending_exit = False
- if pending_side is not None and position is None and equity > 0.0:
- side = pending_side
- current_atr = float(atr.iloc[index - 1])
- position = {
- "side": side,
- "entry_time": candle.ts,
- "entry_price": candle.open,
- "entry_index": index,
- "margin_used": equity,
- "stop_price": candle.open - params.stop_atr * current_atr if side == "long" else candle.open + params.stop_atr * current_atr,
- "take_price": candle.open + params.take_atr * current_atr if side == "long" else candle.open - params.take_atr * current_atr,
- }
- entries.append({"ts": candle.ts, "price": candle.open, "side": side})
- pending_side = None
- current_equity = equity
- if position is not None:
- side = str(position["side"])
- stop_hit = (side == "long" and candle.low <= float(position["stop_price"])) or (
- side == "short" and candle.high >= float(position["stop_price"])
- )
- take_hit = (side == "long" and candle.high >= float(position["take_price"])) or (
- side == "short" and candle.low <= float(position["take_price"])
- )
- if stop_hit or take_hit:
- exit_price = float(position["stop_price"] if stop_hit else position["take_price"])
- equity, won = close_trade(trades=trades, exits=exits, position=position, candle=candle, exit_price=exit_price)
- wins += 1 if won else 0
- current_equity = equity
- position = None
- if position is not None:
- current_equity = mark_to_market(
- side=str(position["side"]),
- margin_used=float(position["margin_used"]),
- entry_price=float(position["entry_price"]),
- mark_price=candle.close,
- leverage=LEVERAGE,
- )
- peak_equity = max(peak_equity, current_equity)
- max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity)
- equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close})
- ending_equity = current_equity
- if index == len(candles) - 1 or equity <= 0.0:
- continue
- if position is not None:
- held = index - int(position["entry_index"])
- side = str(position["side"])
- if side == "long":
- pending_exit = candle.close < float(exit_low.iloc[index]) or fast.iloc[index] < slow.iloc[index] or held >= params.max_hold
- else:
- pending_exit = candle.close > float(exit_high.iloc[index]) or fast.iloc[index] > slow.iloc[index] or held >= params.max_hold
- continue
- if params.family == "donchian":
- if candle.close > float(entry_high.iloc[index]) and fast.iloc[index] > slow.iloc[index]:
- pending_side = "long"
- elif candle.close < float(entry_low.iloc[index]) and fast.iloc[index] < slow.iloc[index]:
- pending_side = "short"
- elif params.family == "ema_cross":
- prev_fast = fast.iloc[index - 1]
- prev_slow = slow.iloc[index - 1]
- if prev_fast <= prev_slow and fast.iloc[index] > slow.iloc[index]:
- pending_side = "long"
- elif prev_fast >= prev_slow and fast.iloc[index] < slow.iloc[index]:
- pending_side = "short"
- elif params.family == "trend_pullback":
- if fast.iloc[index] > slow.iloc[index] and candle.close <= fast.iloc[index] and rsi[index] <= 45:
- pending_side = "long"
- elif fast.iloc[index] < slow.iloc[index] and candle.close >= fast.iloc[index] and rsi[index] >= 55:
- pending_side = "short"
- trade_count = len(trades)
- return SegmentResult(
- trade_count=trade_count,
- total_return=(ending_equity - explore.INITIAL_EQUITY) / explore.INITIAL_EQUITY,
- win_rate=wins / trade_count if trade_count else 0.0,
- max_drawdown=max_drawdown,
- trades=trades,
- open_position=position,
- candles=candles[warmup:],
- equity_curve=equity_curve,
- entries=entries,
- exits=exits,
- )
- def cost_adjusted_frame(result: SegmentResult) -> pd.DataFrame:
- rows = [{"ts": pd.to_datetime(result.equity_curve[0]["ts"], unit="ms", utc=True), "equity": explore.INITIAL_EQUITY}]
- equity = explore.INITIAL_EQUITY
- for trade in result.trades:
- equity *= 1.0 + float(trade["return_pct"]) / 100.0 - ROUNDTRIP_COST_ON_MARGIN
- rows.append({"ts": pd.to_datetime(str(trade["exit_time"]), utc=True), "equity": equity})
- return pd.DataFrame(rows)
- def daily_equity(frame: pd.DataFrame, start: pd.Timestamp, end: pd.Timestamp) -> pd.Series:
- series = frame.set_index("ts")["equity"].sort_index()
- index = pd.date_range(start.normalize(), end.normalize(), freq="1D", tz="UTC")
- return series.reindex(index.union(series.index)).sort_index().ffill().reindex(index).fillna(explore.INITIAL_EQUITY)
- def metrics_from_daily(series: pd.Series) -> dict[str, float]:
- years = (series.index[-1] - series.index[0]).total_seconds() / 86_400 / 365
- total = float(series.iloc[-1] / series.iloc[0] - 1.0)
- annual = (1.0 + total) ** (1.0 / years) - 1.0 if total > -1.0 and years > 0.0 else 0.0
- drawdown = explore.max_drawdown_from_equity([float(v) for v in series])
- return {
- "total_return": total,
- "annualized_return": annual,
- "max_drawdown": drawdown,
- "calmar": annual / drawdown if drawdown else 0.0,
- }
- def trade_stats(result: SegmentResult) -> dict[str, float | int]:
- returns = [float(trade["return_pct"]) / 100.0 - ROUNDTRIP_COST_ON_MARGIN for trade in result.trades]
- wins = [value for value in returns if value > 0.0]
- losses = [value for value in returns if value < 0.0]
- avg_win = sum(wins) / len(wins) if wins else 0.0
- avg_loss = abs(sum(losses) / len(losses)) if losses else 0.0
- return {
- "trades": len(returns),
- "win_rate": len(wins) / len(returns) if returns else 0.0,
- "payoff_ratio": avg_win / avg_loss if avg_loss else 0.0,
- }
- def horizon_metrics(series: pd.Series) -> dict[str, float]:
- out: dict[str, float] = {}
- end = series.index[-1]
- for label, offset in HORIZONS[1:]:
- scoped = series[series.index >= end - offset]
- if len(scoped) < 2:
- scoped = series
- out[f"return_{label}"] = float(scoped.iloc[-1] / scoped.iloc[0] - 1.0)
- return out
- def monthly_rows(name: str, params: Params, series: pd.Series) -> pd.DataFrame:
- monthly = series.resample("ME").last()
- frame = pd.DataFrame(
- {
- "name": name,
- "symbol": params.symbol,
- "bar": params.bar,
- "family": params.family,
- "month": monthly.index.strftime("%Y-%m"),
- "start_equity": monthly.shift(1).fillna(series.iloc[0]).to_numpy(),
- "end_equity": monthly.to_numpy(),
- }
- )
- frame["return"] = frame["end_equity"] / frame["start_equity"] - 1.0
- return frame
- def build_params() -> list[Params]:
- rows: list[Params] = []
- for symbol in SYMBOLS:
- for bar in BARS:
- scale = {"1H": 1, "4H": 1, "1D": 1}[bar]
- for family in ("donchian", "ema_cross", "trend_pullback"):
- for fast, slow in ((20 * scale, 80 * scale), (30 * scale, 120 * scale), (50 * scale, 200 * scale)):
- for entry, exit_ in ((20, 10), (55, 20)):
- for stop_atr, take_atr in ((2.0, 4.0), (3.0, 6.0)):
- max_hold = {"1H": 240, "4H": 120, "1D": 60}[bar]
- rows.append(
- Params(
- symbol=symbol,
- bar=bar,
- family=family,
- fast=fast,
- slow=slow,
- entry=entry,
- exit=exit_,
- atr=14,
- stop_atr=stop_atr,
- take_atr=take_atr,
- max_hold=max_hold,
- )
- )
- return rows
- def markdown_report(command: str, paths: list[Path], totals: pd.DataFrame, monthly: pd.DataFrame) -> str:
- top = totals.head(10)
- best_names = set(top.head(3)["name"])
- lines = [
- "# Trend swing expansion",
- "",
- f"Run command: `{command}`",
- "",
- "Output files:",
- *[f"- `{path}`" for path in paths],
- "",
- "Scope: BTC-USDT-SWAP and ETH-USDT-SWAP perpetuals, resampled from local 15m cache to 1H/4H/1D.",
- f"Cost: 0.04% single-side taker fee, roundtrip cost on margin = {ROUNDTRIP_COST_ON_MARGIN:.4%} at {LEVERAGE}x.",
- "",
- "## Top candidates",
- "",
- markdown_table(
- top[
- [
- "name",
- "symbol",
- "bar",
- "family",
- "trades",
- "total_return",
- "annualized_return",
- "max_drawdown",
- "calmar",
- "win_rate",
- "payoff_ratio",
- "return_3y",
- "return_1y",
- "return_6m",
- "return_3m",
- ]
- ]
- ),
- "",
- "## Monthly returns for top 3",
- "",
- markdown_table(monthly[monthly["name"].isin(best_names)].tail(120)),
- ]
- return "\n".join(lines) + "\n"
- def main() -> int:
- parser = argparse.ArgumentParser()
- parser.add_argument("--years", type=float, default=YEARS)
- parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
- parser.add_argument("--max-candidates", type=int, default=0)
- args = parser.parse_args()
- raw = {symbol: load_15m_frame(symbol, args.years) for symbol in SYMBOLS}
- candles = {
- (symbol, bar): frame_to_candles(symbol, resample_frame(raw[symbol], bar))
- for symbol in SYMBOLS
- for bar in BARS
- }
- params_grid = build_params()
- if args.max_candidates:
- params_grid = params_grid[: args.max_candidates]
- total_rows: list[dict[str, object]] = []
- monthly_frames: list[pd.DataFrame] = []
- for index, params in enumerate(params_grid, start=1):
- result = run_segment(candles[(params.symbol, params.bar)], params)
- frame = cost_adjusted_frame(result)
- start = pd.to_datetime(result.equity_curve[0]["ts"], unit="ms", utc=True)
- end = pd.to_datetime(result.equity_curve[-1]["ts"], unit="ms", utc=True)
- daily = daily_equity(frame, start, end)
- monthly = monthly_rows(params.name, params, daily)
- row = {
- "name": params.name,
- **params.__dict__,
- "first_candle": start.strftime("%Y-%m-%d %H:%M"),
- "last_candle": end.strftime("%Y-%m-%d %H:%M"),
- "fee_single_side": 0.0004,
- "roundtrip_cost_on_margin": ROUNDTRIP_COST_ON_MARGIN,
- "worst_month_return": float(monthly["return"].min()),
- **metrics_from_daily(daily),
- **trade_stats(result),
- **horizon_metrics(daily),
- }
- total_rows.append(row)
- monthly_frames.append(monthly)
- print(f"done {index}/{len(params_grid)} {params.name}", flush=True)
- totals = pd.DataFrame(total_rows).sort_values(
- ["calmar", "annualized_return", "max_drawdown", "trades"],
- ascending=[False, False, True, True],
- )
- monthly_all = pd.concat(monthly_frames, ignore_index=True)
- top3 = totals.head(3)
- args.output_dir.mkdir(parents=True, exist_ok=True)
- totals_path = args.output_dir / f"{PREFIX}-totals.csv"
- monthly_path = args.output_dir / f"{PREFIX}-monthly-returns.csv"
- top_path = args.output_dir / f"{PREFIX}-top3.csv"
- best_path = args.output_dir / f"{PREFIX}-best.json"
- report_path = args.output_dir / f"{PREFIX}-report.md"
- totals.to_csv(totals_path, index=False)
- monthly_all.to_csv(monthly_path, index=False)
- top3.to_csv(top_path, index=False)
- best_path.write_text(json.dumps(top3.to_dict(orient="records"), indent=2), encoding="utf-8")
- command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --years {args.years}"
- report_path.write_text(
- markdown_report(command, [totals_path, monthly_path, top_path, best_path, report_path], totals, monthly_all),
- encoding="utf-8",
- )
- print(top3.to_string(index=False, formatters={col: format_cell for col in top3.columns}))
- return 0
- if __name__ == "__main__":
- raise SystemExit(main())
|