| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371 |
- from __future__ import annotations
- from dataclasses import dataclass
- from pathlib import Path
- import pandas as pd
- DATA_DIR = Path("data/okx-candles")
- OUTPUT_DIR = Path("reports/recent-regime")
- PREFIX = "recent-squeeze-refine"
- INITIAL_EQUITY = 10_000.0
- ROUNDTRIP_COST = 0.0021
- HORIZONS = (
- ("3y", pd.DateOffset(years=3)),
- ("1y", pd.DateOffset(years=1)),
- ("6m", pd.DateOffset(months=6)),
- ("90d", pd.DateOffset(days=90)),
- ("30d", pd.DateOffset(days=30)),
- ("14d", pd.DateOffset(days=14)),
- ("7d", pd.DateOffset(days=7)),
- )
- @dataclass(frozen=True)
- class Candidate:
- quantile: float
- trigger_bars: int
- trigger_pct: float
- stop_pct: float
- take_pct: float
- hold_bars: int
- side_mode: str
- ratio_filter: str
- cooldown_bars: int
- @property
- def name(self) -> str:
- return (
- f"sqref-q{self.quantile:g}-tb{self.trigger_bars}-tr{self.trigger_pct:g}"
- f"-sl{self.stop_pct:g}-tp{self.take_pct:g}-h{self.hold_bars}"
- f"-{self.side_mode}-{self.ratio_filter}-cd{self.cooldown_bars}"
- )
- def load_frame(symbol: str) -> pd.DataFrame:
- frame = pd.read_csv(DATA_DIR / symbol / "15m.csv")
- frame["dt"] = pd.to_datetime(frame["ts"], unit="ms", utc=True)
- return frame.sort_values("ts").drop_duplicates("ts", keep="last").set_index("dt")
- def aligned_frame() -> pd.DataFrame:
- eth = load_frame("ETH-USDT-SWAP").add_prefix("eth_")
- btc = load_frame("BTC-USDT-SWAP").add_prefix("btc_")
- frame = eth.join(btc, how="inner")
- frame["ratio"] = frame["eth_close"] / frame["btc_close"]
- frame = frame.dropna()
- return frame[frame.index >= frame.index[-1] - pd.DateOffset(years=3)]
- def max_drawdown(equity: pd.Series) -> float:
- peak = equity.cummax()
- return float(((peak - equity) / peak).max()) if len(equity) else 0.0
- def annualized(total: float, start: pd.Timestamp, end: pd.Timestamp) -> float:
- years = (end - start).total_seconds() / 31_536_000
- if years <= 0.0 or total <= -1.0:
- return -1.0 if total <= -1.0 else 0.0
- return (1.0 + total) ** (1.0 / years) - 1.0
- def squeeze_series(frame: pd.DataFrame, quantile: float) -> pd.Series:
- close = frame["eth_close"]
- middle = close.rolling(96).mean()
- width = 4.0 * close.rolling(96).std(ddof=0) / middle
- return width <= width.rolling(960).quantile(quantile)
- def ratio_allows(frame: pd.DataFrame, index: int, side: str, ratio_filter: str) -> bool:
- if ratio_filter == "none":
- return True
- ratio_return = frame["ratio"].iloc[index] / frame["ratio"].iloc[index - 96] - 1.0
- if ratio_filter == "weak-short":
- return side == "short" and ratio_return < -0.01
- if ratio_filter == "strong-long":
- return side == "long" and ratio_return > 0.01
- if ratio_filter == "directional":
- return bool((side == "short" and ratio_return < -0.005) or (side == "long" and ratio_return > 0.005))
- raise ValueError(f"unknown ratio_filter {ratio_filter}")
- def signal_side(candidate: Candidate, frame: pd.DataFrame, squeeze: pd.Series, index: int) -> str | None:
- if not bool(squeeze.iloc[index]):
- return None
- move = frame["eth_close"].iloc[index] / frame["eth_close"].iloc[index - candidate.trigger_bars] - 1.0
- if candidate.side_mode in ("long", "both") and move >= candidate.trigger_pct and ratio_allows(frame, index, "long", candidate.ratio_filter):
- return "long"
- if candidate.side_mode in ("short", "both") and move <= -candidate.trigger_pct and ratio_allows(frame, index, "short", candidate.ratio_filter):
- return "short"
- return None
- def exit_return(side: str, entry: float, exit_price: float) -> float:
- gross = exit_price / entry - 1.0 if side == "long" else entry / exit_price - 1.0
- return gross - ROUNDTRIP_COST
- def mark_equity(equity: float, side: str, entry: float, mark: float) -> float:
- gross = mark / entry - 1.0 if side == "long" else entry / mark - 1.0
- return equity * (1.0 + gross)
- def run_candidate(frame: pd.DataFrame, candidate: Candidate) -> tuple[pd.Series, pd.DataFrame]:
- squeeze = squeeze_series(frame, candidate.quantile)
- equity = INITIAL_EQUITY
- position: dict[str, object] | None = None
- pending_side: str | None = None
- cooldown_until = -1
- curve: list[dict[str, object]] = []
- trades: list[dict[str, object]] = []
- rows = list(frame.itertuples())
- warmup = max(960, 96, candidate.trigger_bars) + 1
- for index in range(warmup, len(rows)):
- row = rows[index]
- ts = frame.index[index]
- if pending_side is not None and position is None:
- entry = float(row.eth_open)
- position = {"side": pending_side, "entry": entry, "entry_index": index, "entry_time": ts}
- pending_side = None
- current = equity
- if position is not None:
- side = str(position["side"])
- entry = float(position["entry"])
- if side == "long":
- stop_price = entry * (1.0 - candidate.stop_pct)
- take_price = entry * (1.0 + candidate.take_pct)
- stop_hit = float(row.eth_low) <= stop_price
- take_hit = float(row.eth_high) >= take_price
- else:
- stop_price = entry * (1.0 + candidate.stop_pct)
- take_price = entry * (1.0 - candidate.take_pct)
- stop_hit = float(row.eth_high) >= stop_price
- take_hit = float(row.eth_low) <= take_price
- hold_hit = index - int(position["entry_index"]) >= candidate.hold_bars
- if stop_hit or take_hit or hold_hit:
- exit_price = stop_price if stop_hit else take_price if take_hit else float(row.eth_close)
- ret = exit_return(side, entry, exit_price)
- equity *= 1.0 + ret
- trades.append(
- {
- "name": candidate.name,
- "entry_time": position["entry_time"],
- "exit_time": ts,
- "side": side,
- "entry": entry,
- "exit": exit_price,
- "return": ret,
- "bars": index - int(position["entry_index"]),
- }
- )
- current = equity
- position = None
- cooldown_until = index + candidate.cooldown_bars
- else:
- current = mark_equity(equity, side, entry, float(row.eth_close))
- curve.append({"dt": ts, "equity": current})
- if index == len(rows) - 1 or position is not None or index < cooldown_until:
- continue
- side = signal_side(candidate, frame, squeeze, index)
- if side is not None:
- pending_side = side
- equity_series = pd.DataFrame(curve).set_index("dt")["equity"]
- return equity_series, pd.DataFrame(trades)
- def metrics(equity: pd.Series, trades: pd.DataFrame, start: pd.Timestamp | None = None) -> dict[str, object]:
- scoped = equity if start is None else equity[equity.index >= start]
- if len(scoped) < 2:
- scoped = equity
- scoped_trades = trades if start is None or trades.empty else trades[pd.to_datetime(trades["entry_time"], utc=True) >= start]
- trade_returns = scoped_trades["return"] if len(scoped_trades) else pd.Series(dtype=float)
- wins = trade_returns[trade_returns > 0.0]
- losses = trade_returns[trade_returns < 0.0]
- total = float(scoped.iloc[-1] / scoped.iloc[0] - 1.0)
- gross_profit = float(wins.sum()) if len(wins) else 0.0
- gross_loss = abs(float(losses.sum())) if len(losses) else 0.0
- avg_win = float(wins.mean()) if len(wins) else 0.0
- avg_loss = abs(float(losses.mean())) if len(losses) else 0.0
- return {
- "start": scoped.index[0].strftime("%Y-%m-%d %H:%M"),
- "end": scoped.index[-1].strftime("%Y-%m-%d %H:%M"),
- "total_return": total,
- "annualized_return": annualized(total, scoped.index[0], scoped.index[-1]),
- "max_drawdown": max_drawdown(scoped),
- "calmar": annualized(total, scoped.index[0], scoped.index[-1]) / max_drawdown(scoped) if max_drawdown(scoped) else 0.0,
- "trades": int(len(trade_returns)),
- "win_rate": float(len(wins) / len(trade_returns)) if len(trade_returns) else 0.0,
- "profit_factor": gross_profit / gross_loss if gross_loss else (999.0 if gross_profit else 0.0),
- "payoff_ratio": avg_win / avg_loss if avg_loss else 0.0,
- }
- def candidates() -> list[Candidate]:
- output: list[Candidate] = []
- for quantile in (0.15, 0.20):
- for trigger_bars in (4,):
- for trigger_pct in (0.004,):
- for stop_pct, take_pct in ((0.006, 0.012), (0.008, 0.016)):
- for hold_bars in (48,):
- for side_mode in ("long", "short", "both"):
- for ratio_filter in ("none", "directional"):
- output.append(
- Candidate(
- quantile,
- trigger_bars,
- trigger_pct,
- stop_pct,
- take_pct,
- hold_bars,
- side_mode,
- ratio_filter,
- 8,
- )
- )
- return output
- def monthly_rows(name: str, equity: pd.Series) -> pd.DataFrame:
- monthly = equity.resample("ME").last()
- frame = pd.DataFrame(
- {
- "name": name,
- "month": monthly.index.strftime("%Y-%m"),
- "start_equity": monthly.shift(1).fillna(equity.iloc[0]).to_numpy(),
- "end_equity": monthly.to_numpy(),
- }
- )
- frame["return"] = frame["end_equity"] / frame["start_equity"] - 1.0
- return frame
- def format_cell(value: object) -> str:
- if isinstance(value, float):
- return f"{value:.6g}"
- return str(value).replace("|", "\\|")
- def markdown_table(frame: pd.DataFrame) -> str:
- rows = [list(frame.columns), ["---" for _ in frame.columns]]
- rows.extend(frame.astype(object).where(pd.notna(frame), "").values.tolist())
- return "\n".join("| " + " | ".join(format_cell(value) for value in row) + " |" for row in rows)
- def main() -> int:
- OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
- frame = aligned_frame()
- total_rows: list[dict[str, object]] = []
- horizon_rows: list[dict[str, object]] = []
- monthly_output: list[pd.DataFrame] = []
- equity_by_name: dict[str, pd.Series] = {}
- trades_by_name: dict[str, pd.DataFrame] = {}
- for candidate in candidates():
- equity, trades = run_candidate(frame, candidate)
- full = metrics(equity, trades)
- horizons = {label: metrics(equity, trades, equity.index[-1] - offset) for label, offset in HORIZONS}
- recent_returns = [horizons[label]["total_return"] for label in ("90d", "30d", "14d", "7d")]
- row = {
- "name": candidate.name,
- "side_mode": candidate.side_mode,
- "ratio_filter": candidate.ratio_filter,
- "quantile": candidate.quantile,
- "trigger_bars": candidate.trigger_bars,
- "trigger_pct": candidate.trigger_pct,
- "stop_pct": candidate.stop_pct,
- "take_pct": candidate.take_pct,
- "hold_bars": candidate.hold_bars,
- **full,
- "return_3y": horizons["3y"]["total_return"],
- "return_1y": horizons["1y"]["total_return"],
- "return_6m": horizons["6m"]["total_return"],
- "return_90d": horizons["90d"]["total_return"],
- "return_30d": horizons["30d"]["total_return"],
- "return_14d": horizons["14d"]["total_return"],
- "return_7d": horizons["7d"]["total_return"],
- "min_recent_return": min(float(value) for value in recent_returns),
- "recent_trades": sum(int(horizons[label]["trades"]) for label in ("30d", "14d", "7d")),
- }
- row["score"] = (
- 3.0 * float(row["return_30d"])
- + 2.0 * float(row["return_14d"])
- + float(row["return_7d"])
- + float(row["return_90d"])
- - 0.5 * float(row["max_drawdown"])
- )
- total_rows.append(row)
- for label, values in horizons.items():
- horizon_rows.append({"name": candidate.name, "horizon": label, **values})
- equity_by_name[candidate.name] = equity
- trades_by_name[candidate.name] = trades
- total = pd.DataFrame(total_rows).sort_values(
- ["min_recent_return", "return_30d", "return_14d", "score"],
- ascending=[False, False, False, False],
- )
- qualified = total[
- (total["return_30d"] > 0.0)
- & (total["return_14d"] > 0.0)
- & (total["return_7d"] > 0.0)
- & (total["trades"] >= 30)
- & (total["profit_factor"] >= 1.0)
- ].copy()
- top = qualified.head(12) if len(qualified) else total.head(12)
- for name in top["name"]:
- monthly_output.append(monthly_rows(str(name), equity_by_name[str(name)]))
- horizons = pd.DataFrame(horizon_rows)
- monthly = pd.concat(monthly_output, ignore_index=True) if monthly_output else pd.DataFrame(columns=["name", "month", "start_equity", "end_equity", "return"])
- total_path = OUTPUT_DIR / f"{PREFIX}-total.csv"
- qualified_path = OUTPUT_DIR / f"{PREFIX}-qualified.csv"
- horizon_path = OUTPUT_DIR / f"{PREFIX}-horizons.csv"
- monthly_path = OUTPUT_DIR / f"{PREFIX}-monthly.csv"
- report_path = OUTPUT_DIR / f"{PREFIX}-report.md"
- total.to_csv(total_path, index=False)
- qualified.to_csv(qualified_path, index=False)
- horizons.to_csv(horizon_path, index=False)
- monthly.to_csv(monthly_path, index=False)
- report_path.write_text(
- "\n".join(
- [
- "# Recent Squeeze Breakout Refine",
- "",
- "Scope: ETH 15m only, data through the local cache end, roundtrip cost 0.21% on margin.",
- "",
- "## Top Qualified",
- "",
- markdown_table(
- top[
- [
- "name",
- "total_return",
- "max_drawdown",
- "profit_factor",
- "trades",
- "return_90d",
- "return_30d",
- "return_14d",
- "return_7d",
- "min_recent_return",
- ]
- ]
- ),
- "",
- "## Interpretation",
- "",
- "Qualified rows require positive 30d/14d/7d returns, at least 30 full-period trades, and profit factor >= 1.0.",
- "If this table still has negative 90d rows, the pattern is recent-regime specific rather than robust across the full recent quarter.",
- "",
- ]
- ),
- encoding="utf-8",
- )
- print(report_path)
- print(top.head(8).to_string(index=False))
- return 0
- if __name__ == "__main__":
- raise SystemExit(main())
|