"""Refine recent-regime squeeze-breakout candidates on ETH 15m candles.

The script reads locally cached 15m candle CSVs for ETH-USDT-SWAP and
BTC-USDT-SWAP, simulates every parameter combination returned by
``candidates()`` one position at a time, and writes CSV summaries plus a
markdown report into ``OUTPUT_DIR``.
"""

from __future__ import annotations

import sys
from dataclasses import dataclass
from itertools import product
from pathlib import Path

import pandas as pd

DATA_DIR = Path("data/okx-candles")
OUTPUT_DIR = Path("reports/recent-regime")
PREFIX = "recent-squeeze-refine"
INITIAL_EQUITY = 10_000.0
# Cost subtracted from every closed trade's gross return (fraction, not %).
ROUNDTRIP_COST = 0.0021

# Window lengths, in bars, shared by the indicator code and the warm-up
# calculation so the two cannot drift apart.
SQUEEZE_BAND_BARS = 96  # rolling mean/std window for the band width
SQUEEZE_RANK_BARS = 960  # rolling window for the band-width quantile
RATIO_LOOKBACK_BARS = 96  # lookback for the ETH/BTC ratio return

# Column order of the trades frame returned by ``run_candidate``.
TRADE_COLUMNS = (
    "name",
    "entry_time",
    "exit_time",
    "side",
    "entry",
    "exit",
    "return",
    "bars",
)

# Trailing windows (label, offset back from the last equity timestamp).
HORIZONS = (
    ("3y", pd.DateOffset(years=3)),
    ("1y", pd.DateOffset(years=1)),
    ("6m", pd.DateOffset(months=6)),
    ("90d", pd.DateOffset(days=90)),
    ("30d", pd.DateOffset(days=30)),
    ("14d", pd.DateOffset(days=14)),
    ("7d", pd.DateOffset(days=7)),
)


@dataclass(frozen=True)
class Candidate:
    """One parameter combination of the squeeze-breakout strategy."""

    # Rolling quantile the band width must be at or below to count as a squeeze.
    quantile: float
    # Lookback, in bars, for the breakout move.
    trigger_bars: int
    # Minimum absolute move (fraction) over ``trigger_bars`` to trigger.
    trigger_pct: float
    # Stop-loss distance from entry (fraction of entry price).
    stop_pct: float
    # Take-profit distance from entry (fraction of entry price).
    take_pct: float
    # Maximum number of bars a position is held before a time exit.
    hold_bars: int
    # "long", "short" or "both".
    side_mode: str
    # "none", "weak-short", "strong-long" or "directional".
    ratio_filter: str
    # Bars after an exit during which no new signal is taken.
    cooldown_bars: int

    @property
    def name(self) -> str:
        """Return a stable identifier that encodes every parameter."""
        return (
            f"sqref-q{self.quantile:g}-tb{self.trigger_bars}-tr{self.trigger_pct:g}"
            f"-sl{self.stop_pct:g}-tp{self.take_pct:g}-h{self.hold_bars}"
            f"-{self.side_mode}-{self.ratio_filter}-cd{self.cooldown_bars}"
        )


def load_frame(symbol: str) -> pd.DataFrame:
    """Load the 15m candle CSV for *symbol*, indexed by UTC timestamp.

    The ``ts`` column is parsed as epoch milliseconds. Rows are sorted by
    ``ts`` and, for duplicated timestamps, only the last row is kept.
    """
    frame = pd.read_csv(DATA_DIR / symbol / "15m.csv")
    frame["dt"] = pd.to_datetime(frame["ts"], unit="ms", utc=True)
    return frame.sort_values("ts").drop_duplicates("ts", keep="last").set_index("dt")


def aligned_frame() -> pd.DataFrame:
    """Return ETH and BTC candles joined on timestamp, limited to 3 years.

    Columns are prefixed ``eth_`` / ``btc_`` and a ``ratio`` column
    (ETH close / BTC close) is added. Rows containing any NaN are dropped.
    An empty join is returned as-is instead of raising ``IndexError``.
    """
    eth = load_frame("ETH-USDT-SWAP").add_prefix("eth_")
    btc = load_frame("BTC-USDT-SWAP").add_prefix("btc_")
    frame = eth.join(btc, how="inner")
    frame["ratio"] = frame["eth_close"] / frame["btc_close"]
    frame = frame.dropna()
    if frame.empty:
        # Nothing to anchor the 3-year cut-off on; let the caller decide.
        return frame
    cutoff = frame.index[-1] - pd.DateOffset(years=3)
    return frame[frame.index >= cutoff]


def max_drawdown(equity: pd.Series) -> float:
    """Return the largest peak-to-trough decline as a fraction of the peak."""
    peak = equity.cummax()
    return float(((peak - equity) / peak).max()) if len(equity) else 0.0


def annualized(total: float, start: pd.Timestamp, end: pd.Timestamp) -> float:
    """Convert a total return over ``[start, end]`` to a yearly rate.

    Returns ``-1.0`` when the total return is a complete loss (or worse)
    and ``0.0`` when the period has no positive length.
    """
    if total <= -1.0:
        return -1.0
    years = (end - start).total_seconds() / 31_536_000  # 365-day year
    if years <= 0.0:
        return 0.0
    return (1.0 + total) ** (1.0 / years) - 1.0


def squeeze_series(frame: pd.DataFrame, quantile: float) -> pd.Series:
    """Return a boolean series marking bars where the band width is squeezed.

    Width is ``4 * std / mean`` of ETH close over ``SQUEEZE_BAND_BARS``; a
    bar is squeezed when its width is at or below the rolling *quantile* of
    width over ``SQUEEZE_RANK_BARS``. Bars without enough history compare
    against NaN and are therefore False.
    """
    close = frame["eth_close"]
    middle = close.rolling(SQUEEZE_BAND_BARS).mean()
    width = 4.0 * close.rolling(SQUEEZE_BAND_BARS).std(ddof=0) / middle
    return width <= width.rolling(SQUEEZE_RANK_BARS).quantile(quantile)


def ratio_allows(frame: pd.DataFrame, index: int, side: str, ratio_filter: str) -> bool:
    """Return whether the ETH/BTC ratio trend permits a trade on *side*.

    The ratio return is measured over ``RATIO_LOOKBACK_BARS`` ending at
    *index*, so *index* must be at least that large.

    Raises:
        ValueError: if *ratio_filter* is not a known filter name.
    """
    if ratio_filter == "none":
        return True
    ratio = frame["ratio"]
    ratio_return = ratio.iloc[index] / ratio.iloc[index - RATIO_LOOKBACK_BARS] - 1.0
    if ratio_filter == "weak-short":
        return side == "short" and ratio_return < -0.01
    if ratio_filter == "strong-long":
        return side == "long" and ratio_return > 0.01
    if ratio_filter == "directional":
        return bool(
            (side == "short" and ratio_return < -0.005)
            or (side == "long" and ratio_return > 0.005)
        )
    raise ValueError(f"unknown ratio_filter {ratio_filter}")


def signal_side(
    candidate: Candidate, frame: pd.DataFrame, squeeze: pd.Series, index: int
) -> str | None:
    """Return ``"long"``, ``"short"`` or ``None`` for the bar at *index*.

    A signal needs a squeezed bar, a close-to-close move over
    ``candidate.trigger_bars`` of at least ``candidate.trigger_pct`` in the
    trade direction, an allowed side, and a passing ratio filter. *index*
    must be at least ``candidate.trigger_bars``.
    """
    if not bool(squeeze.iloc[index]):
        return None
    close = frame["eth_close"]
    move = close.iloc[index] / close.iloc[index - candidate.trigger_bars] - 1.0
    if (
        candidate.side_mode in ("long", "both")
        and move >= candidate.trigger_pct
        and ratio_allows(frame, index, "long", candidate.ratio_filter)
    ):
        return "long"
    if (
        candidate.side_mode in ("short", "both")
        and move <= -candidate.trigger_pct
        and ratio_allows(frame, index, "short", candidate.ratio_filter)
    ):
        return "short"
    return None


def exit_return(side: str, entry: float, exit_price: float) -> float:
    """Return the net trade return after subtracting ``ROUNDTRIP_COST``."""
    gross = exit_price / entry - 1.0 if side == "long" else entry / exit_price - 1.0
    return gross - ROUNDTRIP_COST


def mark_equity(equity: float, side: str, entry: float, mark: float) -> float:
    """Return equity marked to *mark* for an open position (no costs)."""
    gross = mark / entry - 1.0 if side == "long" else entry / mark - 1.0
    return equity * (1.0 + gross)


def run_candidate(frame: pd.DataFrame, candidate: Candidate) -> tuple[pd.Series, pd.DataFrame]:
    """Simulate *candidate* bar by bar and return ``(equity, trades)``.

    A signal seen on one bar opens a position at the next bar's ETH open.
    While a position is open, each bar checks stop, take-profit and the
    holding limit; when stop and take-profit are both touched in the same
    bar the stop is assumed to fill. Equity compounds from
    ``INITIAL_EQUITY``.

    Returns:
        The equity curve indexed by bar timestamp (index named ``dt``) and
        a trades frame with ``TRADE_COLUMNS``. Both are empty when *frame*
        has no bars past the warm-up period.
    """
    squeeze = squeeze_series(frame, candidate.quantile)
    equity = INITIAL_EQUITY
    position: dict[str, object] | None = None
    pending_side: str | None = None
    cooldown_until = -1
    curve: list[dict[str, object]] = []
    trades: list[dict[str, object]] = []
    rows = list(frame.itertuples())
    # Skip enough bars that every lookback used below has a valid index.
    warmup = (
        max(
            SQUEEZE_RANK_BARS,
            SQUEEZE_BAND_BARS,
            RATIO_LOOKBACK_BARS,
            candidate.trigger_bars,
        )
        + 1
    )
    last_index = len(rows) - 1

    for index in range(warmup, len(rows)):
        row = rows[index]
        ts = frame.index[index]

        # A signal from the previous bar is filled at this bar's open.
        if pending_side is not None and position is None:
            entry = float(row.eth_open)
            position = {
                "side": pending_side,
                "entry": entry,
                "entry_index": index,
                "entry_time": ts,
            }
            pending_side = None

        current = equity
        if position is not None:
            side = str(position["side"])
            entry = float(position["entry"])
            if side == "long":
                stop_price = entry * (1.0 - candidate.stop_pct)
                take_price = entry * (1.0 + candidate.take_pct)
                stop_hit = float(row.eth_low) <= stop_price
                take_hit = float(row.eth_high) >= take_price
            else:
                stop_price = entry * (1.0 + candidate.stop_pct)
                take_price = entry * (1.0 - candidate.take_pct)
                stop_hit = float(row.eth_high) >= stop_price
                take_hit = float(row.eth_low) <= take_price
            bars_held = index - int(position["entry_index"])
            hold_hit = bars_held >= candidate.hold_bars

            if stop_hit or take_hit or hold_hit:
                # Stop wins over take-profit; a pure time exit uses the close.
                if stop_hit:
                    exit_price = stop_price
                elif take_hit:
                    exit_price = take_price
                else:
                    exit_price = float(row.eth_close)
                ret = exit_return(side, entry, exit_price)
                equity *= 1.0 + ret
                trades.append(
                    {
                        "name": candidate.name,
                        "entry_time": position["entry_time"],
                        "exit_time": ts,
                        "side": side,
                        "entry": entry,
                        "exit": exit_price,
                        "return": ret,
                        "bars": bars_held,
                    }
                )
                current = equity
                position = None
                cooldown_until = index + candidate.cooldown_bars
            else:
                current = mark_equity(equity, side, entry, float(row.eth_close))

        curve.append({"dt": ts, "equity": current})

        # No new signal on the last bar (no next open to fill at), while a
        # position is open, or during the post-exit cooldown.
        if index == last_index or position is not None or index < cooldown_until:
            continue
        side = signal_side(candidate, frame, squeeze, index)
        if side is not None:
            pending_side = side

    if curve:
        equity_series = pd.DataFrame(curve).set_index("dt")["equity"]
    else:
        # Too few bars to get past warm-up: an empty list has no "dt"
        # column, so set_index would raise KeyError.
        equity_series = pd.Series(
            dtype=float, index=frame.index[:0].rename("dt"), name="equity"
        )
    return equity_series, pd.DataFrame(trades, columns=list(TRADE_COLUMNS))


def metrics(
    equity: pd.Series, trades: pd.DataFrame, start: pd.Timestamp | None = None
) -> dict[str, object]:
    """Summarise an equity curve and its trades, optionally from *start*.

    When *start* is given, equity is restricted to timestamps at or after
    it and trades to those entered at or after it. If fewer than two
    equity points remain, the full equity curve is used instead. An empty
    equity curve yields zeroed metrics with blank start/end.
    """
    if equity.empty:
        return {
            "start": "",
            "end": "",
            "total_return": 0.0,
            "annualized_return": 0.0,
            "max_drawdown": 0.0,
            "calmar": 0.0,
            "trades": 0,
            "win_rate": 0.0,
            "profit_factor": 0.0,
            "payoff_ratio": 0.0,
        }

    scoped = equity if start is None else equity[equity.index >= start]
    if len(scoped) < 2:
        scoped = equity

    if start is None or trades.empty:
        scoped_trades = trades
    else:
        entered = pd.to_datetime(trades["entry_time"], utc=True)
        scoped_trades = trades[entered >= start]
    trade_returns = scoped_trades["return"] if len(scoped_trades) else pd.Series(dtype=float)
    wins = trade_returns[trade_returns > 0.0]
    losses = trade_returns[trade_returns < 0.0]

    first_ts, last_ts = scoped.index[0], scoped.index[-1]
    total = float(scoped.iloc[-1] / scoped.iloc[0] - 1.0)
    # Computed once; both values are reused for the calmar ratio.
    annual = annualized(total, first_ts, last_ts)
    drawdown = max_drawdown(scoped)

    gross_profit = float(wins.sum()) if len(wins) else 0.0
    gross_loss = abs(float(losses.sum())) if len(losses) else 0.0
    avg_win = float(wins.mean()) if len(wins) else 0.0
    avg_loss = abs(float(losses.mean())) if len(losses) else 0.0

    if gross_loss:
        profit_factor = gross_profit / gross_loss
    else:
        # No losing trades: 999.0 stands in for "unbounded".
        profit_factor = 999.0 if gross_profit else 0.0

    return {
        "start": first_ts.strftime("%Y-%m-%d %H:%M"),
        "end": last_ts.strftime("%Y-%m-%d %H:%M"),
        "total_return": total,
        "annualized_return": annual,
        "max_drawdown": drawdown,
        "calmar": annual / drawdown if drawdown else 0.0,
        "trades": int(len(trade_returns)),
        "win_rate": float(len(wins) / len(trade_returns)) if len(trade_returns) else 0.0,
        "profit_factor": profit_factor,
        "payoff_ratio": avg_win / avg_loss if avg_loss else 0.0,
    }


def candidates() -> list[Candidate]:
    """Return the parameter grid to evaluate, rightmost axis varying fastest."""
    grid = product(
        (0.15, 0.20),  # quantile
        (4,),  # trigger_bars
        (0.004,),  # trigger_pct
        ((0.006, 0.012), (0.008, 0.016)),  # (stop_pct, take_pct)
        (48,),  # hold_bars
        ("long", "short", "both"),  # side_mode
        ("none", "directional"),  # ratio_filter
    )
    return [
        Candidate(
            quantile,
            trigger_bars,
            trigger_pct,
            stop_pct,
            take_pct,
            hold_bars,
            side_mode,
            ratio_filter,
            8,
        )
        for (
            quantile,
            trigger_bars,
            trigger_pct,
            (stop_pct, take_pct),
            hold_bars,
            side_mode,
            ratio_filter,
        ) in grid
    ]


def monthly_rows(name: str, equity: pd.Series) -> pd.DataFrame:
    """Return per-month start/end equity and return for one equity curve.

    The first month starts from the first equity value; later months start
    from the previous month-end value.
    """
    monthly = equity.resample("ME").last()
    frame = pd.DataFrame(
        {
            "name": name,
            "month": monthly.index.strftime("%Y-%m"),
            "start_equity": monthly.shift(1).fillna(equity.iloc[0]).to_numpy(),
            "end_equity": monthly.to_numpy(),
        }
    )
    frame["return"] = frame["end_equity"] / frame["start_equity"] - 1.0
    return frame


def format_cell(value: object) -> str:
    """Format one table cell: floats to 6 significant digits, pipes escaped."""
    if isinstance(value, float):
        return f"{value:.6g}"
    return str(value).replace("|", "\\|")


def markdown_table(frame: pd.DataFrame) -> str:
    """Render *frame* as a pipe-delimited markdown table (missing -> blank)."""
    rows = [list(frame.columns), ["---" for _ in frame.columns]]
    rows.extend(frame.astype(object).where(pd.notna(frame), "").values.tolist())
    return "\n".join(
        "| " + " | ".join(format_cell(value) for value in row) + " |" for row in rows
    )


def main() -> int:
    """Run every candidate, write the CSV/markdown outputs, and print a summary.

    Returns:
        0 on success, 1 when no candidate produced an equity curve (for
        example because the cached data is missing or too short).
    """
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    frame = aligned_frame()
    total_rows: list[dict[str, object]] = []
    horizon_rows: list[dict[str, object]] = []
    equity_by_name: dict[str, pd.Series] = {}

    for candidate in candidates():
        equity, trades = run_candidate(frame, candidate)
        if equity.empty:
            # Not enough bars past warm-up; horizons would have no anchor.
            continue
        full = metrics(equity, trades)
        last_ts = equity.index[-1]
        horizons = {
            label: metrics(equity, trades, last_ts - offset) for label, offset in HORIZONS
        }
        recent_returns = [
            horizons[label]["total_return"] for label in ("90d", "30d", "14d", "7d")
        ]
        row: dict[str, object] = {
            "name": candidate.name,
            "side_mode": candidate.side_mode,
            "ratio_filter": candidate.ratio_filter,
            "quantile": candidate.quantile,
            "trigger_bars": candidate.trigger_bars,
            "trigger_pct": candidate.trigger_pct,
            "stop_pct": candidate.stop_pct,
            "take_pct": candidate.take_pct,
            "hold_bars": candidate.hold_bars,
            **full,
            # One return_<label> column per horizon, in HORIZONS order.
            **{f"return_{label}": horizons[label]["total_return"] for label, _ in HORIZONS},
            "min_recent_return": min(float(value) for value in recent_returns),
            "recent_trades": sum(
                int(horizons[label]["trades"]) for label in ("30d", "14d", "7d")
            ),
        }
        # Weighted toward the most recent windows, penalised by drawdown.
        row["score"] = (
            3.0 * float(row["return_30d"])
            + 2.0 * float(row["return_14d"])
            + float(row["return_7d"])
            + float(row["return_90d"])
            - 0.5 * float(row["max_drawdown"])
        )
        total_rows.append(row)
        horizon_rows.extend(
            {"name": candidate.name, "horizon": label, **values}
            for label, values in horizons.items()
        )
        equity_by_name[candidate.name] = equity

    if not total_rows:
        print(
            f"no candidate produced an equity curve from {len(frame)} aligned bars",
            file=sys.stderr,
        )
        return 1

    total = pd.DataFrame(total_rows).sort_values(
        ["min_recent_return", "return_30d", "return_14d", "score"],
        ascending=[False, False, False, False],
    )
    qualified = total[
        (total["return_30d"] > 0.0)
        & (total["return_14d"] > 0.0)
        & (total["return_7d"] > 0.0)
        & (total["trades"] >= 30)
        & (total["profit_factor"] >= 1.0)
    ].copy()
    has_qualified = len(qualified) > 0
    top = qualified.head(12) if has_qualified else total.head(12)

    monthly_output = [
        monthly_rows(str(name), equity_by_name[str(name)]) for name in top["name"]
    ]
    horizons_frame = pd.DataFrame(horizon_rows)
    if monthly_output:
        monthly = pd.concat(monthly_output, ignore_index=True)
    else:
        monthly = pd.DataFrame(
            columns=["name", "month", "start_equity", "end_equity", "return"]
        )

    total_path = OUTPUT_DIR / f"{PREFIX}-total.csv"
    qualified_path = OUTPUT_DIR / f"{PREFIX}-qualified.csv"
    horizon_path = OUTPUT_DIR / f"{PREFIX}-horizons.csv"
    monthly_path = OUTPUT_DIR / f"{PREFIX}-monthly.csv"
    report_path = OUTPUT_DIR / f"{PREFIX}-report.md"
    total.to_csv(total_path, index=False)
    qualified.to_csv(qualified_path, index=False)
    horizons_frame.to_csv(horizon_path, index=False)
    monthly.to_csv(monthly_path, index=False)

    # Do not label fallback rows as qualified when the filter matched nothing.
    heading = "## Top Qualified" if has_qualified else "## Top Candidates (none qualified)"
    report_path.write_text(
        "\n".join(
            [
                "# Recent Squeeze Breakout Refine",
                "",
                "Scope: ETH 15m only, data through the local cache end, roundtrip cost 0.21% on margin.",
                "",
                heading,
                "",
                markdown_table(
                    top[
                        [
                            "name",
                            "total_return",
                            "max_drawdown",
                            "profit_factor",
                            "trades",
                            "return_90d",
                            "return_30d",
                            "return_14d",
                            "return_7d",
                            "min_recent_return",
                        ]
                    ]
                ),
                "",
                "## Interpretation",
                "",
                "Qualified rows require positive 30d/14d/7d returns, at least 30 full-period trades, and profit factor >= 1.0.",
                "If this table still has negative 90d rows, the pattern is recent-regime specific rather than robust across the full recent quarter.",
                "",
            ]
        ),
        encoding="utf-8",
    )
    print(report_path)
    print(top.head(8).to_string(index=False))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())