| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516 |
- from __future__ import annotations
- import argparse
- import json
- from dataclasses import dataclass
- from pathlib import Path
- import pandas as pd
- CACHE_DIR = Path("data/okx-candles")
- OUTPUT_DIR = Path("reports/short-bias")
- PREFIX = "swing"
- SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP")
- BARS = ("1H", "4H", "1D")
- INITIAL_EQUITY = 10_000.0
- TAKER_FEE = 0.0004
- ROUNDTRIP_FEE = TAKER_FEE * 2
- YEARS = 10.0
- HORIZONS = (
- ("3y", pd.DateOffset(years=3)),
- ("1y", pd.DateOffset(years=1)),
- ("6m", pd.DateOffset(months=6)),
- ("3m", pd.DateOffset(months=3)),
- )
- @dataclass(frozen=True)
- class Strategy:
- family: str
- symbol: str
- bar: str
- params: dict[str, float | int]
- @property
- def name(self) -> str:
- params = "-".join(f"{key}{value}" for key, value in self.params.items())
- return f"{self.family}-{self.symbol.split('-')[0].lower()}-{self.bar}-{params}"
- def load_15m_frame(symbol: str, years: float) -> pd.DataFrame:
- path = CACHE_DIR / symbol / "15m.csv"
- if not path.exists():
- raise FileNotFoundError(f"missing local cache: {path}")
- frame = pd.read_csv(path)
- frame["ts"] = pd.to_datetime(frame["ts"], unit="ms", utc=True)
- frame = frame.sort_values("ts").drop_duplicates("ts", keep="last").set_index("ts")
- start = frame.index[-1] - pd.DateOffset(years=years)
- return frame[frame.index >= start]
- def resample_frame(frame: pd.DataFrame, bar: str) -> pd.DataFrame:
- rule = {"1H": "1h", "4H": "4h", "1D": "1D"}[bar]
- out = frame.resample(rule, label="left", closed="left").agg(
- open=("open", "first"),
- high=("high", "max"),
- low=("low", "min"),
- close=("close", "last"),
- volume=("volume", "sum"),
- )
- return out.dropna()
- def true_range(frame: pd.DataFrame) -> pd.Series:
- previous = frame["close"].shift(1)
- return pd.concat(
- [
- frame["high"] - frame["low"],
- (frame["high"] - previous).abs(),
- (frame["low"] - previous).abs(),
- ],
- axis=1,
- ).max(axis=1)
- def signal_frame(strategy: Strategy, frame: pd.DataFrame, btc_frame: pd.DataFrame | None) -> pd.DataFrame:
- close = frame["close"]
- high = frame["high"]
- low = frame["low"]
- open_ = frame["open"]
- p = strategy.params
- fast = close.ewm(span=int(p["fast"]), adjust=False).mean()
- slow = close.ewm(span=int(p["slow"]), adjust=False).mean()
- atr = true_range(frame).rolling(int(p["atr"])).mean()
- prior_low = low.shift(1).rolling(int(p.get("entry", 20))).min()
- prior_high = high.shift(1).rolling(int(p.get("exit", 10))).max()
- if strategy.family == "donchian_breakdown":
- entry = (close < prior_low) & (close < slow)
- exit_ = (close > fast) | (close > prior_high)
- elif strategy.family == "ema_death_cross":
- previous_fast = fast.shift(1)
- previous_slow = slow.shift(1)
- entry = (previous_fast >= previous_slow) & (fast < slow)
- exit_ = (fast > slow) | (close > slow)
- elif strategy.family == "ma_rebound_short":
- entry = (close < slow) & (high >= fast) & (close < fast) & (close < open_)
- exit_ = (close > fast) | (close > slow)
- elif strategy.family == "btc_riskoff_eth":
- if btc_frame is None:
- raise ValueError("btc_riskoff_eth requires BTC signal frame")
- aligned = pd.DataFrame({"eth_close": close}).join(btc_frame[["close"]].rename(columns={"close": "btc_close"}), how="inner")
- btc_close = aligned["btc_close"]
- btc_fast = btc_close.ewm(span=int(p["btc_fast"]), adjust=False).mean()
- btc_slow = btc_close.ewm(span=int(p["btc_slow"]), adjust=False).mean()
- btc_ret = btc_close / btc_close.shift(int(p["btc_lookback"])) - 1.0
- eth_ret = aligned["eth_close"] / aligned["eth_close"].shift(int(p["eth_lookback"])) - 1.0
- entry = ((btc_close < btc_slow) & (btc_ret <= -float(p["btc_drop"])) & (eth_ret <= float(p["eth_max_return"]))).reindex(frame.index, fill_value=False)
- exit_ = ((btc_close > btc_fast) | (aligned["eth_close"] > fast.reindex(aligned.index))).reindex(frame.index, fill_value=False)
- elif strategy.family == "vol_expansion_short":
- atr_pct = atr / close
- vol_gate = atr_pct > atr_pct.rolling(int(p["vol_window"])).quantile(float(p["vol_quantile"]))
- entry = (close < prior_low) & (close < slow) & vol_gate
- exit_ = (close > fast) | (atr_pct < atr_pct.rolling(int(p["vol_window"])).median())
- else:
- raise ValueError(f"unknown family: {strategy.family}")
- out = pd.DataFrame({"entry": entry.fillna(False), "exit": exit_.fillna(False), "atr": atr}, index=frame.index)
- return out
- def close_short(entry_price: float, exit_price: float, equity: float) -> tuple[float, float]:
- gross_return = entry_price / exit_price - 1.0
- net_return = gross_return - ROUNDTRIP_FEE
- return max(0.0, equity * (1.0 + net_return)), net_return
- def run_strategy(strategy: Strategy, frame: pd.DataFrame, btc_frame: pd.DataFrame | None) -> dict[str, object]:
- signals = signal_frame(strategy, frame, btc_frame)
- warmup = max(int(strategy.params.get(key, 0)) for key in ("slow", "entry", "exit", "atr", "vol_window", "btc_slow")) + 2
- equity = INITIAL_EQUITY
- position: dict[str, float | int | pd.Timestamp] | None = None
- pending_entry = False
- pending_exit = False
- trades: list[dict[str, object]] = []
- equity_curve: list[dict[str, object]] = []
- rows = list(frame.itertuples())
- for index in range(warmup, len(rows)):
- row = rows[index]
- ts = frame.index[index]
- if pending_exit and position is not None:
- equity, net_return = close_short(float(position["entry_price"]), float(row.open), equity)
- trades.append(
- {
- "side": "Short",
- "entry_time": pd.Timestamp(position["entry_time"]).strftime("%Y-%m-%d %H:%M"),
- "exit_time": ts.strftime("%Y-%m-%d %H:%M"),
- "entry_price": float(position["entry_price"]),
- "exit_price": float(row.open),
- "return": net_return,
- "hold_bars": index - int(position["entry_index"]),
- }
- )
- position = None
- pending_exit = False
- if pending_entry and position is None and equity > 0.0:
- atr = float(signals["atr"].iloc[index - 1])
- position = {
- "entry_time": ts,
- "entry_index": index,
- "entry_price": float(row.open),
- "stop_price": float(row.open) + atr * float(strategy.params["stop_atr"]),
- "take_price": max(0.01, float(row.open) - atr * float(strategy.params["take_atr"])),
- }
- pending_entry = False
- mark_equity = equity
- if position is not None:
- stop_hit = float(row.high) >= float(position["stop_price"])
- take_hit = float(row.low) <= float(position["take_price"])
- if stop_hit or take_hit:
- exit_price = float(position["stop_price"] if stop_hit else position["take_price"])
- equity, net_return = close_short(float(position["entry_price"]), exit_price, equity)
- trades.append(
- {
- "side": "Short",
- "entry_time": pd.Timestamp(position["entry_time"]).strftime("%Y-%m-%d %H:%M"),
- "exit_time": ts.strftime("%Y-%m-%d %H:%M"),
- "entry_price": float(position["entry_price"]),
- "exit_price": exit_price,
- "return": net_return,
- "hold_bars": index - int(position["entry_index"]),
- }
- )
- position = None
- mark_equity = equity
- if position is not None:
- gross_return = float(position["entry_price"]) / float(row.close) - 1.0
- mark_equity = max(0.0, equity * (1.0 + gross_return - TAKER_FEE))
- equity_curve.append({"ts": ts, "equity": mark_equity})
- if index == len(rows) - 1 or equity <= 0.0:
- continue
- if position is not None:
- held = index - int(position["entry_index"])
- if bool(signals["exit"].iloc[index]) or held >= int(strategy.params["max_hold"]):
- pending_exit = True
- elif bool(signals["entry"].iloc[index]):
- pending_entry = True
- if position is not None:
- last = rows[-1]
- ts = frame.index[-1]
- equity, net_return = close_short(float(position["entry_price"]), float(last.close), equity)
- trades.append(
- {
- "side": "Short",
- "entry_time": pd.Timestamp(position["entry_time"]).strftime("%Y-%m-%d %H:%M"),
- "exit_time": ts.strftime("%Y-%m-%d %H:%M"),
- "entry_price": float(position["entry_price"]),
- "exit_price": float(last.close),
- "return": net_return,
- "hold_bars": len(rows) - 1 - int(position["entry_index"]),
- }
- )
- equity_curve.append({"ts": ts, "equity": equity})
- return {"trades": trades, "equity_curve": equity_curve}
- def daily_equity(result: dict[str, object]) -> pd.Series:
- curve = pd.DataFrame(result["equity_curve"])
- curve["ts"] = pd.to_datetime(curve["ts"], utc=True)
- series = curve.set_index("ts")["equity"].sort_index()
- series = pd.concat([pd.Series([INITIAL_EQUITY], index=[series.index[0].normalize()]), series]).sort_index()
- series = series.groupby(level=0).last()
- index = pd.date_range(series.index[0].normalize(), series.index[-1].normalize(), freq="1D", tz="UTC")
- return series.reindex(index.union(series.index)).sort_index().ffill().reindex(index)
- def equity_metrics(series: pd.Series) -> dict[str, float]:
- total = float(series.iloc[-1] / series.iloc[0] - 1.0)
- years = (series.index[-1] - series.index[0]).total_seconds() / 86_400 / 365
- annual = (1.0 + total) ** (1.0 / years) - 1.0 if total > -1.0 and years > 0.0 else 0.0
- drawdown = ((series.cummax() - series) / series.cummax()).max()
- return {
- "total_return": total,
- "annualized_return": annual,
- "max_drawdown": float(drawdown),
- "calmar": annual / float(drawdown) if drawdown else 0.0,
- }
- def trade_metrics(trades: list[dict[str, object]]) -> dict[str, float | int]:
- returns = [float(trade["return"]) for trade in trades]
- wins = [value for value in returns if value > 0.0]
- losses = [value for value in returns if value < 0.0]
- avg_win = sum(wins) / len(wins) if wins else 0.0
- avg_loss = abs(sum(losses) / len(losses)) if losses else 0.0
- gross_profit = sum(wins)
- gross_loss = abs(sum(losses))
- return {
- "trades": len(returns),
- "win_rate": len(wins) / len(returns) if returns else 0.0,
- "payoff_ratio": avg_win / avg_loss if avg_loss else 0.0,
- "profit_factor": gross_profit / gross_loss if gross_loss else 0.0,
- "short_trade_ratio": 1.0 if returns else 0.0,
- }
- def horizon_returns(series: pd.Series) -> dict[str, float]:
- out: dict[str, float] = {}
- end = series.index[-1]
- for label, offset in HORIZONS:
- scoped = series[series.index >= end - offset]
- out[f"return_{label}"] = float(scoped.iloc[-1] / scoped.iloc[0] - 1.0) if len(scoped) >= 2 else 0.0
- return out
- def monthly_rows(strategy: Strategy, series: pd.Series) -> pd.DataFrame:
- monthly = series.resample("ME").last()
- out = pd.DataFrame(
- {
- "name": strategy.name,
- "symbol": strategy.symbol,
- "bar": strategy.bar,
- "family": strategy.family,
- "month": monthly.index.strftime("%Y-%m"),
- "start_equity": monthly.shift(1).fillna(series.iloc[0]).to_numpy(),
- "end_equity": monthly.to_numpy(),
- }
- )
- out["return"] = out["end_equity"] / out["start_equity"] - 1.0
- return out
- def yearly_rows(strategy: Strategy, series: pd.Series) -> pd.DataFrame:
- yearly = series.resample("YE").last()
- out = pd.DataFrame(
- {
- "name": strategy.name,
- "symbol": strategy.symbol,
- "bar": strategy.bar,
- "family": strategy.family,
- "year": yearly.index.strftime("%Y"),
- "start_equity": yearly.shift(1).fillna(series.iloc[0]).to_numpy(),
- "end_equity": yearly.to_numpy(),
- }
- )
- out["return"] = out["end_equity"] / out["start_equity"] - 1.0
- return out
- def build_strategies() -> list[Strategy]:
- strategies: list[Strategy] = []
- holds = {"1H": 240, "4H": 120, "1D": 80}
- for symbol in SYMBOLS:
- for bar in BARS:
- for fast, slow in ((20, 80), (30, 120), (50, 200)):
- for entry, exit_ in ((20, 10), (55, 20)):
- for stop_atr, take_atr in ((2.0, 3.0), (3.0, 5.0)):
- base = {"fast": fast, "slow": slow, "entry": entry, "exit": exit_, "atr": 14, "stop_atr": stop_atr, "take_atr": take_atr, "max_hold": holds[bar]}
- strategies.append(Strategy("donchian_breakdown", symbol, bar, base))
- for stop_atr, take_atr in ((2.0, 4.0), (3.0, 6.0)):
- base = {"fast": fast, "slow": slow, "entry": 20, "exit": 10, "atr": 14, "stop_atr": stop_atr, "take_atr": take_atr, "max_hold": holds[bar]}
- strategies.append(Strategy("ema_death_cross", symbol, bar, base))
- strategies.append(Strategy("ma_rebound_short", symbol, bar, base))
- strategies.append(Strategy("vol_expansion_short", symbol, bar, {**base, "vol_window": 120, "vol_quantile": 0.8}))
- for bar in BARS:
- for btc_lookback, btc_drop in ((3, 0.015), (6, 0.025), (12, 0.04)):
- strategies.append(
- Strategy(
- "btc_riskoff_eth",
- "ETH-USDT-SWAP",
- bar,
- {
- "fast": 20,
- "slow": 80,
- "entry": 20,
- "exit": 10,
- "atr": 14,
- "stop_atr": 2.0,
- "take_atr": 4.0,
- "max_hold": holds[bar],
- "btc_fast": 20,
- "btc_slow": 80,
- "btc_lookback": btc_lookback,
- "eth_lookback": 3,
- "btc_drop": btc_drop,
- "eth_max_return": 0.01,
- },
- )
- )
- return strategies
- def format_cell(value: object) -> str:
- if isinstance(value, float):
- return f"{value:.6g}"
- return str(value).replace("|", "\\|")
- def markdown_table(frame: pd.DataFrame) -> str:
- rows = [list(frame.columns), ["---" for _ in frame.columns]]
- rows.extend(frame.astype(object).where(pd.notna(frame), "").values.tolist())
- return "\n".join("| " + " | ".join(format_cell(value) for value in row) + " |" for row in rows)
- def markdown_report(command: str, paths: list[Path], totals: pd.DataFrame, monthly: pd.DataFrame, yearly: pd.DataFrame, qualified: pd.DataFrame) -> str:
- top = qualified.head(10) if len(qualified) else totals.head(10)
- top_names = set(top.head(3)["name"])
- conclusion = (
- f"Usable short-biased candidates found: {len(qualified)} meet positive total return, positive 3y/1y/6m/3m returns, Calmar >= 0.5, profit_factor >= 1.1, and at least 20 trades."
- if len(qualified)
- else "No usable short-biased candidate passed the acceptance filter."
- )
- lines = [
- "# Short-Bias Swing Search",
- "",
- f"Run command: `{command}`",
- "",
- "Output files:",
- *[f"- `{path}`" for path in paths],
- "",
- "Scope: BTC-USDT-SWAP and ETH-USDT-SWAP, resampled from local 15m cache to 1H/4H/1D. SOL was not included because no local SOL cache exists.",
- f"Cost: 0.04% single-side taker fee; closed trades subtract {ROUNDTRIP_FEE:.2%} roundtrip.",
- "Candidate families: Donchian breakdown, EMA death cross, moving-average rebound short, BTC risk-off short ETH, and volatility-expansion breakdown.",
- "",
- f"Conclusion: {conclusion}",
- "",
- "## Top candidates",
- "",
- markdown_table(
- top[
- [
- "name",
- "symbol",
- "bar",
- "family",
- "total_return",
- "annualized_return",
- "max_drawdown",
- "calmar",
- "trades",
- "win_rate",
- "payoff_ratio",
- "profit_factor",
- "short_trade_ratio",
- "return_3y",
- "return_1y",
- "return_6m",
- "return_3m",
- ]
- ]
- ),
- "",
- "## Monthly returns for top 3",
- "",
- markdown_table(monthly[monthly["name"].isin(top_names)].tail(120)),
- "",
- "## Year distribution for top 3",
- "",
- markdown_table(yearly[yearly["name"].isin(top_names)]),
- ]
- return "\n".join(lines) + "\n"
- def main() -> int:
- parser = argparse.ArgumentParser()
- parser.add_argument("--years", type=float, default=YEARS)
- parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
- parser.add_argument("--max-candidates", type=int, default=0)
- args = parser.parse_args()
- raw = {symbol: load_15m_frame(symbol, args.years) for symbol in SYMBOLS}
- data = {(symbol, bar): resample_frame(raw[symbol], bar) for symbol in SYMBOLS for bar in BARS}
- strategies = build_strategies()
- if args.max_candidates:
- strategies = strategies[: args.max_candidates]
- total_rows: list[dict[str, object]] = []
- monthly_frames: list[pd.DataFrame] = []
- yearly_frames: list[pd.DataFrame] = []
- trade_rows: list[dict[str, object]] = []
- for index, strategy in enumerate(strategies, start=1):
- btc_frame = data[("BTC-USDT-SWAP", strategy.bar)] if strategy.family == "btc_riskoff_eth" else None
- result = run_strategy(strategy, data[(strategy.symbol, strategy.bar)], btc_frame)
- series = daily_equity(result)
- monthly = monthly_rows(strategy, series)
- yearly = yearly_rows(strategy, series)
- trades = list(result["trades"])
- total_rows.append(
- {
- "name": strategy.name,
- "symbol": strategy.symbol,
- "bar": strategy.bar,
- "family": strategy.family,
- "first_day": series.index[0].strftime("%Y-%m-%d"),
- "last_day": series.index[-1].strftime("%Y-%m-%d"),
- "fee_single_side": TAKER_FEE,
- "roundtrip_fee": ROUNDTRIP_FEE,
- "worst_month_return": float(monthly["return"].min()),
- **equity_metrics(series),
- **trade_metrics(trades),
- **horizon_returns(series),
- **strategy.params,
- }
- )
- monthly_frames.append(monthly)
- yearly_frames.append(yearly)
- trade_rows.extend({"name": strategy.name, "symbol": strategy.symbol, "bar": strategy.bar, "family": strategy.family, **trade} for trade in trades)
- print(f"done {index}/{len(strategies)} {strategy.name}", flush=True)
- totals = pd.DataFrame(total_rows).sort_values(["calmar", "annualized_return", "profit_factor"], ascending=[False, False, False])
- monthly = pd.concat(monthly_frames, ignore_index=True)
- yearly = pd.concat(yearly_frames, ignore_index=True)
- trades = pd.DataFrame(trade_rows)
- qualified = totals[
- (totals["total_return"] > 0.0)
- & (totals["return_3y"] > 0.0)
- & (totals["return_1y"] > 0.0)
- & (totals["return_6m"] > 0.0)
- & (totals["return_3m"] > 0.0)
- & (totals["calmar"] >= 0.5)
- & (totals["profit_factor"] >= 1.1)
- & (totals["trades"] >= 20)
- ].sort_values(["calmar", "annualized_return", "profit_factor"], ascending=[False, False, False])
- args.output_dir.mkdir(parents=True, exist_ok=True)
- totals_path = args.output_dir / f"{PREFIX}-totals.csv"
- monthly_path = args.output_dir / f"{PREFIX}-monthly-returns.csv"
- yearly_path = args.output_dir / f"{PREFIX}-yearly-distribution.csv"
- trades_path = args.output_dir / f"{PREFIX}-trades.csv"
- qualified_path = args.output_dir / f"{PREFIX}-qualified.csv"
- summary_path = args.output_dir / f"{PREFIX}-summary.json"
- report_path = args.output_dir / f"{PREFIX}-report.md"
- totals.to_csv(totals_path, index=False)
- monthly.to_csv(monthly_path, index=False)
- yearly.to_csv(yearly_path, index=False)
- trades.to_csv(trades_path, index=False)
- qualified.to_csv(qualified_path, index=False)
- summary = {
- "years_requested": args.years,
- "symbols": list(SYMBOLS),
- "bars": list(BARS),
- "sol_included": False,
- "strategy_count": len(strategies),
- "qualified_count": int(len(qualified)),
- "usable_short_biased_candidate_exists": bool(len(qualified)),
- "top_name": str((qualified if len(qualified) else totals).iloc[0]["name"]),
- "output_files": [str(path) for path in (totals_path, monthly_path, yearly_path, trades_path, qualified_path, summary_path, report_path)],
- }
- summary_path.write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n", encoding="utf-8")
- command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --years {args.years}"
- report_path.write_text(markdown_report(command, [totals_path, monthly_path, yearly_path, trades_path, qualified_path, summary_path, report_path], totals, monthly, yearly, qualified), encoding="utf-8")
- print(totals.head(10).to_string(index=False, formatters={col: format_cell for col in totals.columns}))
- print(f"qualified={len(qualified)} wrote={report_path}")
- return 0
- if __name__ == "__main__":
- raise SystemExit(main())
|