| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335 |
- from __future__ import annotations
- from dataclasses import dataclass
- from math import sqrt
- from pathlib import Path
- import pandas as pd
# Instruments scanned by main(); load_frame reads DATA_DIR/<symbol>/<bar>.csv.
SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP")
# Candle intervals scanned by main(); each doubles as the CSV file stem.
BARS = ("3m", "5m", "15m")
# Root directory of the candle CSV files.
DATA_DIR = Path("data/okx-candles")
# Directory that receives the CSV outputs and the markdown report.
OUT_DIR = Path("reports/short-bias")
# Equity every backtest starts with.
INITIAL_EQUITY = 10_000.0
# Position notional is equity * LEVERAGE.
LEVERAGE = 3.0
# Fee charged per side, as a fraction of the traded notional.
TAKER_FEE = 0.0004
# Minimum number of closed trades for a candidate to be ranked.
MIN_TRADES = 30
# Only the trailing BACKTEST_MONTHS months before the last candle are kept.
BACKTEST_MONTHS = 36
@dataclass(frozen=True)
class Candidate:
    """One parameterised strategy variant to backtest."""

    # Strategy family; signal_columns picks the signal logic from it.
    family: str
    # Identifier built from the family and its parameter values.
    name: str
    # Index of the first bar the backtest loop processes.
    warmup: int
    # Family-specific parameters (window lengths, thresholds, "max_hold", ...).
    params: dict[str, float | int]
@dataclass(frozen=True)
class BacktestResult:
    """Output of one backtest run."""

    # One row per processed bar: "ts", "dt", "equity", "drawdown".
    equity: pd.DataFrame
    # One row per closed trade: entry/exit time and price, pnl, return, bars held.
    trades: pd.DataFrame
def load_frame(symbol: str, bar: str) -> pd.DataFrame:
    """Load the candles for one symbol/bar, trimmed to the backtest window.

    Reads ``DATA_DIR/<symbol>/<bar>.csv``, adds a UTC ``dt`` column derived
    from the millisecond ``ts`` column, sorts by ``ts`` and drops duplicate
    timestamps, then keeps only the last ``BACKTEST_MONTHS`` months counted
    back from the final candle.

    Args:
        symbol: Instrument name; used as the sub-directory of ``DATA_DIR``.
        bar: Candle interval; used as the CSV file stem.

    Returns:
        The trimmed frame with a fresh 0..n-1 index. A file without data rows
        yields an empty frame instead of raising.
    """
    frame = pd.read_csv(DATA_DIR / symbol / f"{bar}.csv")
    frame["dt"] = pd.to_datetime(frame["ts"], unit="ms", utc=True)
    # A stable sort keeps rows that share a timestamp in file order, so
    # keep="last" reliably retains the most recently written candle. The
    # default sort algorithm gives no such guarantee.
    frame = (
        frame.sort_values("ts", kind="mergesort")
        .drop_duplicates("ts", keep="last")
        .reset_index(drop=True)
    )
    if frame.empty:
        # Nothing to trim; `.iloc[-1]` below would raise IndexError.
        return frame
    cutoff = frame["dt"].iloc[-1] - pd.DateOffset(months=BACKTEST_MONTHS)
    return frame[frame["dt"] >= cutoff].reset_index(drop=True)
def rsi(series: pd.Series, length: int) -> pd.Series:
    """Return the relative strength index of *series* over *length* bars.

    Average gains and losses are plain rolling means of the bar-to-bar
    changes. The first *length* values are NaN, and a window with no change
    at all also yields NaN.
    """
    change = series.diff()
    ups = change.clip(lower=0.0)
    downs = -change.clip(upper=0.0)
    average_up = ups.rolling(length).mean()
    average_down = downs.rolling(length).mean()
    strength = average_up / average_down
    return 100.0 - (100.0 / (1.0 + strength))
def build_candidates() -> list[Candidate]:
    """Enumerate the parameter grid of every short-only strategy family.

    Candidates are emitted family by family (rsi2_short, breakdown_short,
    vwap_revert_short, bb_short), each in nested-loop order of its grid.
    The warmup of each candidate is derived from its longest window.
    """
    grid: list[Candidate] = []

    # RSI(2) fade below a trend average.
    grid.extend(
        Candidate(
            "rsi2_short",
            f"rsi2-short-t{trend}-e{entry}-x{exit_rsi}-h{max_hold}",
            trend + 3,
            {"trend": trend, "entry": entry, "exit": exit_rsi, "max_hold": max_hold},
        )
        for trend in (80, 160)
        for entry in (90, 94)
        for exit_rsi in (35,)
        for max_hold in (12, 32)
    )

    # Breakdown below the prior rolling low, with fixed stop and take levels.
    grid.extend(
        Candidate(
            "breakdown_short",
            f"breakdown-short-l{lookback}-sl{stop}-tp{take}-h{max_hold}",
            lookback + 1,
            {"lookback": lookback, "stop": stop, "take": take, "max_hold": max_hold},
        )
        for lookback in (16, 32, 64)
        for stop, take in ((0.005, 0.010),)
        for max_hold in (12, 32)
    )

    # Reversion from a stretched deviation above rolling VWAP.
    grid.extend(
        Candidate(
            "vwap_revert_short",
            f"vwap-short-w{window}-e{entry_z}-x{exit_z}-h{max_hold}",
            window * 2,
            {"window": window, "entry_z": entry_z, "exit_z": exit_z, "max_hold": max_hold},
        )
        for window in (48, 96)
        for entry_z in (1.8, 2.2)
        for exit_z in (0.0,)
        for max_hold in (12, 32)
    )

    # Upper Bollinger band pierce while below the trend average.
    grid.extend(
        Candidate(
            "bb_short",
            f"bb-short-l{length}-s{std}-t{trend}-h{max_hold}",
            max(length, trend) + 1,
            {"length": length, "std": std, "trend": trend, "max_hold": max_hold},
        )
        for length in (40, 80)
        for std in (2.0,)
        for trend in (160,)
        for max_hold in (12, 32)
    )
    return grid
def exit_equity(start_equity: float, entry: float, exit_price: float) -> tuple[float, float]:
    """Close a leveraged short and return ``(equity after close, net pnl)``.

    The position notional is ``start_equity * LEVERAGE``. ``TAKER_FEE`` is
    charged on both the entry notional and the exit notional, and the net
    pnl is the price pnl minus both fees.
    """
    notional_in = start_equity * LEVERAGE
    notional_out = notional_in * (exit_price / entry)
    gross = notional_in * ((entry - exit_price) / entry)
    cost = TAKER_FEE * (notional_in + notional_out)
    closing_equity = start_equity + gross - cost
    return closing_equity, gross - cost
def mark_equity(start_equity: float, entry: float, mark: float) -> float:
    """Value an open leveraged short at price *mark*.

    Returns the starting equity plus the unrealised pnl, minus the entry-side
    fee only; no exit fee is deducted while the position is still open.
    """
    unrealised = start_equity * LEVERAGE * ((entry - mark) / entry)
    paid_on_entry = TAKER_FEE * start_equity * LEVERAGE
    return start_equity + unrealised - paid_on_entry
def signal_columns(frame: pd.DataFrame, candidate: Candidate) -> tuple[pd.Series, pd.Series, pd.Series]:
    """Compute the per-bar entry and exit signals for *candidate*.

    Args:
        frame: Candles with a ``close`` column; ``low`` is also read for
            ``breakdown_short`` and ``volume`` for ``vwap_revert_short``.
        candidate: Supplies ``family`` and the family-specific ``params``.

    Returns:
        ``(entry, exit, extra)`` series aligned to ``frame.index``. ``extra``
        is always all-False, and ``breakdown_short`` has no signal exit, so
        its ``exit`` is all-False too.

    Raises:
        ValueError: If ``candidate.family`` is not a known family. It used to
            fall through to the Bollinger logic silently.
    """
    close = frame["close"]
    never = pd.Series(False, index=frame.index)
    family = candidate.family
    params = candidate.params

    if family == "rsi2_short":
        # Short an overbought RSI(2) while price sits below its trend average.
        trend = close.rolling(int(params["trend"])).mean()
        value = rsi(close, 2)
        entry = (close < trend) & (value >= float(params["entry"]))
        exit_signal = value <= float(params["exit"])
        return entry, exit_signal, never

    if family == "breakdown_short":
        # shift(1) excludes the current bar from the rolling low it must break.
        lookback = int(params["lookback"])
        prior_low = frame["low"].shift(1).rolling(lookback).min()
        entry = close < prior_low
        return entry, never, never

    if family == "vwap_revert_short":
        window = int(params["window"])
        volume = frame["volume"]
        vwap = (close * volume).rolling(window).sum() / volume.rolling(window).sum()
        deviation = (close - vwap) / vwap
        # The deviation is scaled by its rolling std but not mean-centred.
        zscore = deviation / deviation.rolling(window).std(ddof=0)
        entry = zscore >= float(params["entry_z"])
        exit_signal = zscore <= float(params["exit_z"])
        return entry, exit_signal, never

    if family == "bb_short":
        # Short a close above the upper band while still below the trend average.
        length = int(params["length"])
        middle = close.rolling(length).mean()
        upper = middle + float(params["std"]) * close.rolling(length).std(ddof=0)
        trend = close.rolling(int(params["trend"])).mean()
        entry = (close > upper) & (close < trend)
        exit_signal = close <= middle
        return entry, exit_signal, never

    raise ValueError(f"unknown candidate family: {family!r}")
def _short_trade_row(
    entry_time: object,
    exit_time: object,
    entry: float,
    exit_price: float,
    pnl: float,
    trade_return: float,
    bars: int,
) -> dict[str, object]:
    """Build one closed-trade record in the column layout of the trades frame."""
    return {
        "entry_time": entry_time,
        "exit_time": exit_time,
        "side": "short",
        "entry": entry,
        "exit": exit_price,
        "pnl": pnl,
        "return": trade_return,
        "bars": bars,
    }


def backtest(frame: pd.DataFrame, candidate: Candidate) -> BacktestResult:
    """Simulate *candidate* as a short-only strategy over *frame*.

    Signals observed on one bar are executed at the next bar's open. While a
    position is open, optional ``stop`` / ``take`` fractions in the candidate
    params are checked against the bar's high / low, with the stop taking
    priority when both are touched in the same bar. A position is also
    queued for exit on an exit signal or after ``max_hold`` bars. Equity is
    compounded: every trade uses the full current equity times ``LEVERAGE``.

    A position still open on the last bar is marked to market and never
    recorded as a trade.

    Args:
        frame: Candles with ``ts``, ``dt``, ``open``, ``high``, ``low`` and
            ``close`` columns, plus whatever ``signal_columns`` needs.
        candidate: Strategy variant; ``warmup`` is the first bar processed.

    Returns:
        The per-bar equity curve and the closed trades.
    """
    entry_signal, exit_signal, _ = signal_columns(frame, candidate)
    ts_values = frame["ts"].to_numpy()
    dt_values = frame["dt"].to_numpy()
    open_values = frame["open"].to_numpy(dtype=float)
    high_values = frame["high"].to_numpy(dtype=float)
    low_values = frame["low"].to_numpy(dtype=float)
    close_values = frame["close"].to_numpy(dtype=float)
    # NaN signals (inside indicator warm-up windows) count as "no signal".
    entry_values = entry_signal.fillna(False).to_numpy(dtype=bool)
    exit_values = exit_signal.fillna(False).to_numpy(dtype=bool)

    # Loop-invariant risk parameters; 0.0 disables the respective level.
    stop = float(candidate.params.get("stop", 0.0))
    take = float(candidate.params.get("take", 0.0))

    equity = INITIAL_EQUITY
    peak = equity
    equity_rows: list[dict[str, object]] = []
    trade_rows: list[dict[str, object]] = []
    entry_price = 0.0
    entry_index: int | None = None  # None means flat
    pending_entry = False
    pending_exit = False
    last_index = len(frame) - 1

    for index in range(candidate.warmup, len(frame)):
        bar_open = float(open_values[index])

        # Execute an exit queued on the previous bar at this bar's open.
        if pending_exit and entry_index is not None:
            new_equity, pnl = exit_equity(equity, entry_price, bar_open)
            trade_rows.append(
                _short_trade_row(
                    dt_values[entry_index],
                    dt_values[index],
                    entry_price,
                    bar_open,
                    pnl,
                    new_equity / equity - 1.0,
                    index - entry_index,
                )
            )
            equity = new_equity
            entry_index = None
            pending_exit = False

        # Execute an entry queued on the previous bar at this bar's open.
        if pending_entry and entry_index is None and equity > 0.0:
            entry_price = bar_open
            entry_index = index
        pending_entry = False

        current_equity = equity
        if entry_index is not None:
            exit_price: float | None = None
            stop_level = entry_price * (1.0 + stop)
            take_level = entry_price * (1.0 - take)
            if stop and float(high_values[index]) >= stop_level:
                # When the bar opens above the stop, the stop level itself
                # never traded, so fill at the open. On the entry bar the
                # open is the entry price and this is the plain stop level.
                exit_price = max(stop_level, bar_open)
            elif take and float(low_values[index]) <= take_level:
                # Conservative: no price improvement on a gap through the target.
                exit_price = take_level

            if exit_price is not None:
                new_equity, pnl = exit_equity(equity, entry_price, exit_price)
                trade_rows.append(
                    _short_trade_row(
                        dt_values[entry_index],
                        dt_values[index],
                        entry_price,
                        exit_price,
                        pnl,
                        new_equity / equity - 1.0,
                        index - entry_index,
                    )
                )
                equity = new_equity
                current_equity = equity
                entry_index = None
            else:
                current_equity = mark_equity(equity, entry_price, float(close_values[index]))

        peak = max(peak, current_equity)
        equity_rows.append(
            {
                "ts": ts_values[index],
                "dt": dt_values[index],
                "equity": current_equity,
                "drawdown": (peak - current_equity) / peak,
            }
        )

        # No orders are queued on the final bar or once the account is wiped out.
        if index == last_index or equity <= 0.0:
            continue

        if entry_index is not None:
            held = index - entry_index
            if bool(exit_values[index]) or held >= int(candidate.params["max_hold"]):
                pending_exit = True
            # NOTE(review): assumes entry signals are ignored on every bar with
            # an open position (the `continue` applies to the whole position
            # branch, not only to bars that queue an exit); confirm.
            continue

        if bool(entry_values[index]):
            pending_entry = True

    return BacktestResult(pd.DataFrame(equity_rows), pd.DataFrame(trade_rows))
def annualized(total_return: float, start: pd.Timestamp, end: pd.Timestamp) -> float:
    """Convert *total_return* earned between *start* and *end* to a yearly rate.

    A year is 365 days. Returns 0.0 when the span is not positive, and -1.0
    when the total return is a complete loss or worse.
    """
    seconds_per_year = 31_536_000
    elapsed_years = (end - start).total_seconds() / seconds_per_year
    if elapsed_years <= 0.0:
        return 0.0
    if total_return <= -1.0:
        return -1.0
    growth = 1.0 + total_return
    return growth ** (1.0 / elapsed_years) - 1.0
def horizon_return(equity: pd.DataFrame, months: int) -> float:
    """Return the equity change over the trailing *months* calendar months.

    The baseline is the last equity value at or before the cutoff (final
    timestamp minus *months*). When the curve starts after the cutoff, the
    first equity value is the baseline instead.
    """
    stamps = equity["dt"]
    values = equity["equity"]
    cutoff = stamps.iloc[-1] - pd.DateOffset(months=months)
    history = values[stamps <= cutoff]
    if len(history):
        baseline = float(history.iloc[-1])
    else:
        baseline = float(values.iloc[0])
    return float(values.iloc[-1]) / baseline - 1.0
- def monthly_returns(equity: pd.DataFrame, key: dict[str, object]) -> pd.DataFrame:
- monthly = equity.set_index("dt")["equity"].resample("ME").last().ffill().pct_change().dropna()
- rows = [{**key, "month": index.strftime("%Y-%m"), "return": value} for index, value in monthly.items()]
- return pd.DataFrame(rows)
def summarize(symbol: str, bar: str, candidate: Candidate, result: BacktestResult) -> dict[str, object]:
    """Condense one backtest into a single row of performance statistics.

    Win and loss statistics are computed on per-trade ``return`` values.
    Ratios whose denominator is zero (no drawdown, no losing trades) are
    reported as 0.0. The equity curve must contain at least one row.
    """
    curve = result.equity
    closed = result.trades
    trade_count = len(closed)

    equity_path = curve["equity"]
    first_dt = curve["dt"].iloc[0]
    last_dt = curve["dt"].iloc[-1]
    total_return = float(equity_path.iloc[-1] / equity_path.iloc[0] - 1.0)
    ann = annualized(total_return, first_dt, last_dt)
    max_dd = float(curve["drawdown"].max())

    # A trade-less frame has no "return" column, so it must not be filtered.
    if trade_count:
        winners = closed[closed["return"] > 0.0]
        losers = closed[closed["return"] < 0.0]
    else:
        winners = losers = closed

    if len(winners):
        gross_profit = float(winners["return"].sum())
        avg_win = float(winners["return"].mean())
    else:
        gross_profit = avg_win = 0.0

    if len(losers):
        gross_loss = abs(float(losers["return"].sum()))
        avg_loss = abs(float(losers["return"].mean()))
    else:
        gross_loss = avg_loss = 0.0

    horizons = {
        label: horizon_return(curve, months)
        for label, months in (
            ("return_3y", 36),
            ("return_1y", 12),
            ("return_6m", 6),
            ("return_3m", 3),
        )
    }

    return {
        "symbol": symbol,
        "bar": bar,
        "family": candidate.family,
        "candidate": candidate.name,
        "total_return": total_return,
        "annualized_return": ann,
        "max_drawdown": max_dd,
        "calmar": ann / max_dd if max_dd else 0.0,
        "trades": int(trade_count),
        "win_rate": float(len(winners) / trade_count) if trade_count else 0.0,
        "payoff_ratio": avg_win / avg_loss if avg_loss else 0.0,
        "profit_factor": gross_profit / gross_loss if gross_loss else 0.0,
        "short_trade_ratio": 1.0 if trade_count else 0.0,
        **horizons,
        "first_time": first_dt.strftime("%Y-%m-%d %H:%M"),
        "last_time": last_dt.strftime("%Y-%m-%d %H:%M"),
    }
def worth_label(row: pd.Series) -> str:
    """Label a summary row ``"continue"`` or ``"stop"``.

    ``"continue"`` requires at least ``MIN_TRADES`` trades, a positive total
    return, a Calmar ratio above 0.25 and a positive trailing 1-year return.
    """
    # Each guard negates the original comparison, so NaN values still fail
    # and later fields are not read once an earlier check has failed.
    if not row["trades"] >= MIN_TRADES:
        return "stop"
    if not row["total_return"] > 0.0:
        return "stop"
    if not row["calmar"] > 0.25:
        return "stop"
    if not row["return_1y"] > 0.0:
        return "stop"
    return "continue"
def main() -> int:
    """Backtest every candidate on every symbol/bar and write the ranking outputs.

    Candidates with fewer than ``MIN_TRADES`` trades are discarded. The rest
    are ranked by Calmar ratio, then annualized return, then profit factor.
    Four CSV files and a markdown report are written to ``OUT_DIR``, and the
    top three rows are printed.

    When no candidate qualifies, the outputs are still written (empty) and
    the report says so, instead of failing while sorting an empty table.

    Returns:
        Process exit status; always 0.
    """
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    key_columns = ["symbol", "bar", "family", "candidate"]
    candidates = build_candidates()
    total_rows: list[dict[str, object]] = []
    monthly_frames: list[pd.DataFrame] = []
    for symbol in SYMBOLS:
        for bar in BARS:
            frame = load_frame(symbol, bar)
            for candidate in candidates:
                result = backtest(frame, candidate)
                if len(result.equity) == 0:
                    continue
                row = summarize(symbol, bar, candidate, result)
                if row["trades"] < MIN_TRADES:
                    continue
                total_rows.append(row)
                monthly_frames.append(monthly_returns(result.equity, {key: row[key] for key in key_columns}))

    totals = pd.DataFrame(total_rows)
    if not totals.empty:
        # An empty table has no columns to sort by or to label.
        totals = totals.sort_values(["calmar", "annualized_return", "profit_factor"], ascending=False).reset_index(drop=True)
        totals["worth_continuing"] = totals.apply(worth_label, axis=1)
    top3 = totals.head(3).copy()

    monthly = pd.concat(monthly_frames, ignore_index=True) if monthly_frames else pd.DataFrame()
    if top3.empty or monthly.empty:
        # Merging needs the key columns on both sides; an empty frame may lack them.
        top_monthly = monthly.iloc[0:0].copy()
    else:
        top_monthly = monthly.merge(top3[key_columns], on=key_columns, how="inner")

    totals.to_csv(OUT_DIR / "intraday-totals.csv", index=False)
    top3.to_csv(OUT_DIR / "intraday-top3.csv", index=False)
    monthly.to_csv(OUT_DIR / "intraday-monthly.csv", index=False)
    top_monthly.to_csv(OUT_DIR / "intraday-top3-monthly.csv", index=False)

    lines = [
        "# Short-Bias Intraday Search",
        "",
        f"Universe: {', '.join(SYMBOLS)} on {', '.join(BARS)}. All candidates are short-only; short trade ratio is therefore 1.0 for traded rows.",
        f"Cost model: {TAKER_FEE:.2%} single-side taker fee on leveraged notional, charged on entry and exit; leverage {LEVERAGE:g}x.",
        "",
        "## Top 3",
        "",
    ]
    if top3.empty:
        lines.append(f"No candidate reached the minimum of {MIN_TRADES} trades.")
    for rank, (_, row) in enumerate(top3.iterrows(), start=1):
        lines.append(
            f"{rank}. {row['symbol']} {row['bar']} {row['candidate']}: total {row['total_return']:.2%}, annualized {row['annualized_return']:.2%}, "
            f"max DD {row['max_drawdown']:.2%}, Calmar {row['calmar']:.2f}, trades {int(row['trades'])}, win {row['win_rate']:.2%}, "
            f"PF {row['profit_factor']:.2f}, 3y {row['return_3y']:.2%}, 1y {row['return_1y']:.2%}, 6m {row['return_6m']:.2%}, 3m {row['return_3m']:.2%}, "
            f"decision {row['worth_continuing']}."
        )
    lines.extend(
        [
            "",
            "## Continue",
            "",
            "Continue only candidates marked `continue`; the rest fail the direct requirement of positive full-period and recent 1y net return with enough trades.",
        ]
    )
    (OUT_DIR / "intraday-report.md").write_text("\n".join(lines) + "\n", encoding="utf-8")

    if top3.empty:
        print(f"No candidate reached the minimum of {MIN_TRADES} trades.")
    else:
        print(top3.to_string(index=False))
    return 0
# Run the search only when executed as a script; main()'s return value
# becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|