from __future__ import annotations import argparse from dataclasses import dataclass from pathlib import Path import pandas as pd DATA_DIR = Path("data/okx-candles") OUTPUT_DIR = Path("reports/recent-regime") PREFIX = "recent-regime-mean-reversion" SYMBOLS = ("ETH-USDT-SWAP", "BTC-USDT-SWAP") BARS = ("3m", "5m", "15m") INITIAL_EQUITY = 10_000.0 ROUNDTRIP_FEE = 0.0008 HORIZONS = ( ("7d", pd.DateOffset(days=7)), ("14d", pd.DateOffset(days=14)), ("30d", pd.DateOffset(days=30)), ("90d", pd.DateOffset(days=90)), ("6m", pd.DateOffset(months=6)), ("1y", pd.DateOffset(years=1)), ("3y", pd.DateOffset(years=3)), ) @dataclass(frozen=True) class Spec: symbol: str bar: str side_mode: str range_lookback: int compression_window: int compression_quantile: float sweep_pct: float stop_pct: float take_pct: float hold: int @property def name(self) -> str: base = self.symbol.split("-")[0].lower() return ( f"{base}-{self.bar}-fbmr-{self.side_mode}" f"-rl{self.range_lookback}-cw{self.compression_window}-cq{self.compression_quantile:g}" f"-sw{self.sweep_pct:g}-sl{self.stop_pct:g}-tp{self.take_pct:g}-h{self.hold}" ) def load_frame(symbol: str, bar: str, months: int) -> pd.DataFrame: frame = pd.read_csv(DATA_DIR / symbol / f"{bar}.csv") frame["ts"] = pd.to_datetime(frame["ts"], unit="ms", utc=True) frame = frame.sort_values("ts").drop_duplicates("ts", keep="last").set_index("ts") start = frame.index[-1] - pd.DateOffset(months=months) return frame[frame.index >= start].copy() def signal_frame(frame: pd.DataFrame, spec: Spec) -> pd.DataFrame: prior_high = frame["high"].shift(1).rolling(spec.range_lookback).max() prior_low = frame["low"].shift(1).rolling(spec.range_lookback).min() midpoint = (prior_high + prior_low) / 2.0 width = (prior_high - prior_low) / frame["close"] width_cap = width.rolling(spec.compression_window).quantile(spec.compression_quantile) compressed = width <= width_cap upper_fake = ( compressed & (frame["high"] >= prior_high * (1.0 + spec.sweep_pct)) & (frame["close"] < prior_high) & (frame["close"] < frame["open"]) ) lower_fake = ( compressed & (frame["low"] <= prior_low * (1.0 - spec.sweep_pct)) & (frame["close"] > prior_low) & (frame["close"] > frame["open"]) ) if spec.side_mode == "short": lower_fake = pd.Series(False, index=frame.index) elif spec.side_mode == "long": upper_fake = pd.Series(False, index=frame.index) return pd.DataFrame( { "long_entry": lower_fake.fillna(False), "short_entry": upper_fake.fillna(False), "long_midpoint": midpoint, "short_midpoint": midpoint, }, index=frame.index, ) def trade_return(side: str, entry_price: float, exit_price: float) -> float: gross = exit_price / entry_price - 1.0 if side == "long" else entry_price / exit_price - 1.0 return gross - ROUNDTRIP_FEE def exit_price(position: dict[str, object], row: object) -> float | None: side = str(position["side"]) stop = float(position["stop"]) take = float(position["take"]) midpoint = float(position["midpoint"]) if side == "long": if float(row.open) <= stop or float(row.open) >= take: return float(row.open) if float(row.low) <= stop: return stop if float(row.high) >= take: return take if float(row.high) >= midpoint: return midpoint else: if float(row.open) >= stop or float(row.open) <= take: return float(row.open) if float(row.high) >= stop: return stop if float(row.low) <= take: return take if float(row.low) <= midpoint: return midpoint return None def run_spec(frame: pd.DataFrame, spec: Spec) -> tuple[pd.Series, list[dict[str, object]]]: signals = signal_frame(frame, spec) warmup = spec.range_lookback + spec.compression_window + 2 trades: list[dict[str, object]] = [] rows = list(frame.itertuples()) short_indices = set(signals.index[signals["short_entry"]].to_series().map(frame.index.get_loc).astype(int)) long_indices = set(signals.index[signals["long_entry"]].to_series().map(frame.index.get_loc).astype(int)) index = warmup while index < len(rows) - 1: side = "short" if index in short_indices else "long" if index in long_indices else "" if not side: index += 1 continue entry_index = index + 1 entry_row = rows[entry_index] entry = float(entry_row.open) position = { "side": side, "entry_time": frame.index[entry_index], "entry_index": entry_index, "entry_price": entry, "stop": entry * (1.0 - spec.stop_pct if side == "long" else 1.0 + spec.stop_pct), "take": entry * (1.0 + spec.take_pct if side == "long" else 1.0 - spec.take_pct), "midpoint": float(signals[f"{side}_midpoint"].iloc[index]), } exit_index = min(entry_index + spec.hold, len(rows) - 1) price = float(rows[exit_index].close) for scan_index in range(entry_index, exit_index + 1): found = exit_price(position, rows[scan_index]) if found is not None: exit_index = scan_index price = found break trades.append( { "side": side, "entry_time": position["entry_time"], "exit_time": frame.index[exit_index], "return": trade_return(side, entry, price), } ) index = exit_index + 1 daily_index = pd.date_range(frame.index[0].normalize(), frame.index[-1].normalize(), freq="1D", tz="UTC") if not trades: return pd.Series(INITIAL_EQUITY, index=daily_index), trades returns = pd.DataFrame( { "date": [pd.Timestamp(trade["exit_time"]).normalize() for trade in trades], "return": [float(trade["return"]) for trade in trades], } ) daily_returns = returns.groupby("date")["return"].apply(lambda values: (1.0 + values).prod() - 1.0) daily_returns = daily_returns.reindex(daily_index, fill_value=0.0) daily = INITIAL_EQUITY * (1.0 + daily_returns).cumprod() daily.iloc[0] = INITIAL_EQUITY return daily, trades def max_drawdown(series: pd.Series) -> float: return float(((series.cummax() - series) / series.cummax()).max()) def metrics(series: pd.Series, trades: list[dict[str, object]], start: pd.Timestamp) -> dict[str, object]: scoped = series[series.index >= start] scoped_trades = [trade for trade in trades if pd.Timestamp(trade["entry_time"]) >= scoped.index[0]] years = max((scoped.index[-1] - scoped.index[0]).total_seconds() / 86_400.0 / 365.0, 1e-9) total_return = float(scoped.iloc[-1] / scoped.iloc[0] - 1.0) annualized = (1.0 + total_return) ** (1.0 / years) - 1.0 if total_return > -1.0 else -1.0 returns = [float(trade["return"]) for trade in scoped_trades] wins = [value for value in returns if value > 0.0] losses = [value for value in returns if value < 0.0] gross_profit = sum(wins) gross_loss = abs(sum(losses)) avg_win = gross_profit / len(wins) if wins else 0.0 avg_loss = gross_loss / len(losses) if losses else 0.0 drawdown = max_drawdown(scoped) return { "start": scoped.index[0].strftime("%Y-%m-%d"), "end": scoped.index[-1].strftime("%Y-%m-%d"), "total_return": total_return, "annualized": annualized, "max_drawdown": drawdown, "calmar": annualized / drawdown if drawdown else 0.0, "trades": len(returns), "win_rate": len(wins) / len(returns) if returns else 0.0, "profit_factor": gross_profit / gross_loss if gross_loss else 0.0, "payoff_ratio": avg_win / avg_loss if avg_loss else 0.0, } def total_row(spec: Spec, series: pd.Series, trades: list[dict[str, object]]) -> dict[str, object]: row = { "name": spec.name, "symbol": spec.symbol, "bar": spec.bar, "side_mode": spec.side_mode, "range_lookback": spec.range_lookback, "compression_window": spec.compression_window, "compression_quantile": spec.compression_quantile, "sweep_pct": spec.sweep_pct, "stop_pct": spec.stop_pct, "take_pct": spec.take_pct, "hold": spec.hold, } row.update(metrics(series, trades, series.index[0])) return row def horizon_rows(spec: Spec, series: pd.Series, trades: list[dict[str, object]]) -> list[dict[str, object]]: rows = [] for label, offset in HORIZONS: start = max(series.index[0], series.index[-1] - offset) row = { "name": spec.name, "symbol": spec.symbol, "bar": spec.bar, "side_mode": spec.side_mode, "horizon": label, } row.update(metrics(series, trades, start)) rows.append(row) return rows def monthly_rows(spec: Spec, series: pd.Series, trades: list[dict[str, object]]) -> pd.DataFrame: monthly = series.resample("ME").last() frame = pd.DataFrame( { "name": spec.name, "symbol": spec.symbol, "bar": spec.bar, "side_mode": spec.side_mode, "month": monthly.index.strftime("%Y-%m"), "start_equity": monthly.shift(1).fillna(series.iloc[0]).to_numpy(), "end_equity": monthly.to_numpy(), } ) frame["total_return"] = frame["end_equity"] / frame["start_equity"] - 1.0 trade_months = pd.Series([pd.Timestamp(trade["entry_time"]).strftime("%Y-%m") for trade in trades], dtype=object) counts = trade_months.value_counts() if len(trade_months) else pd.Series(dtype=int) frame["trades"] = frame["month"].map(counts).fillna(0).astype(int) return frame def build_specs() -> list[Spec]: specs: list[Spec] = [] bar_holds = {"3m": 30, "5m": 18, "15m": 10} for symbol in SYMBOLS: for bar in BARS: for side_mode in ("short", "long", "bidir"): for range_lookback in (24, 48): for compression_quantile in (0.20, 0.35): for sweep_pct in (0.0008, 0.0016): specs.append( Spec( symbol=symbol, bar=bar, side_mode=side_mode, range_lookback=range_lookback, compression_window=range_lookback * 6, compression_quantile=compression_quantile, sweep_pct=sweep_pct, stop_pct=0.006, take_pct=0.008, hold=bar_holds[bar], ) ) return specs def markdown_table(frame: pd.DataFrame) -> str: def cell(value: object) -> str: if isinstance(value, float): return f"{value:.4f}" return str(value).replace("|", "\\|") rows = [list(frame.columns), ["---" for _ in frame.columns]] rows.extend(frame.astype(object).where(pd.notna(frame), "").values.tolist()) return "\n".join("| " + " | ".join(cell(value) for value in row) + " |" for row in rows) def report_text(totals: pd.DataFrame, horizons: pd.DataFrame, monthly: pd.DataFrame, selected_name: str) -> str: selected_horizons = horizons[horizons["name"] == selected_name] selected_monthly = monthly[monthly["name"] == selected_name] top_cols = [ "name", "symbol", "bar", "side_mode", "total_return", "annualized", "max_drawdown", "calmar", "trades", "win_rate", "profit_factor", "payoff_ratio", ] active_months = selected_monthly[selected_monthly["trades"] > 0] return "\n".join( [ "# Recent Regime False Breakout Mean Reversion", "", "Scope: ETH/BTC perpetual swap local OKX candles, 3m/5m/15m, most recent 36 months only. No network and no live executor changes.", "", "Signal definition: compressed rolling range, sweep beyond the prior range, close back inside, enter the opposite side on next open, exit at range midpoint, stop, take-profit, or max hold.", "", f"Selected by Calmar then annualized return among candidates with at least 30 trades: `{selected_name}`.", "", "## Selected Horizons", "", markdown_table(selected_horizons[["horizon", "total_return", "annualized", "max_drawdown", "calmar", "trades", "win_rate", "profit_factor", "payoff_ratio"]]), "", "## Selected Monthly Summary", "", f"Months: {len(selected_monthly)}, active months: {len(active_months)}, positive active months: {int((active_months['total_return'] > 0.0).sum())}.", "", "Worst active months:", "", markdown_table(active_months.sort_values("total_return").head(10)[["month", "total_return", "trades"]]), "", "## Top Candidates", "", markdown_table(totals.head(15)[top_cols]), "", "## Output Files", "", f"- `{PREFIX}-total.csv`", f"- `{PREFIX}-horizons.csv`", f"- `{PREFIX}-monthly.csv`", f"- `{PREFIX}-report.md`", "", ] ) def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR) parser.add_argument("--months", type=int, default=36) args = parser.parse_args() frames = {(symbol, bar): load_frame(symbol, bar, args.months) for symbol in SYMBOLS for bar in BARS} total_data: list[dict[str, object]] = [] horizon_data: list[dict[str, object]] = [] monthly_parts: list[pd.DataFrame] = [] for index, spec in enumerate(build_specs(), start=1): series, trades = run_spec(frames[(spec.symbol, spec.bar)], spec) total_data.append(total_row(spec, series, trades)) horizon_data.extend(horizon_rows(spec, series, trades)) monthly_parts.append(monthly_rows(spec, series, trades)) if index % 100 == 0: print(f"done {index}", flush=True) totals = pd.DataFrame(total_data).sort_values(["calmar", "annualized", "trades"], ascending=[False, False, False]) horizons = pd.DataFrame(horizon_data) monthly = pd.concat(monthly_parts, ignore_index=True) trade_eligible = totals[totals["trades"] >= 30] selected = (trade_eligible if len(trade_eligible) else totals).iloc[0] args.output_dir.mkdir(parents=True, exist_ok=True) total_path = args.output_dir / f"{PREFIX}-total.csv" horizon_path = args.output_dir / f"{PREFIX}-horizons.csv" monthly_path = args.output_dir / f"{PREFIX}-monthly.csv" report_path = args.output_dir / f"{PREFIX}-report.md" totals.to_csv(total_path, index=False) horizons.to_csv(horizon_path, index=False) monthly.to_csv(monthly_path, index=False) report_path.write_text(report_text(totals, horizons, monthly, str(selected["name"])), encoding="utf-8") print(totals.head(10).to_string(index=False)) print(f"wrote {total_path}, {horizon_path}, {monthly_path}, {report_path}") return 0 if __name__ == "__main__": raise SystemExit(main())