from __future__ import annotations import argparse from dataclasses import dataclass from pathlib import Path import pandas as pd DATA_DIR = Path("data/okx-candles") OUTPUT_DIR = Path("reports/eth-exploration") SYMBOLS = ("ETH-USDT-SWAP", "BTC-USDT-SWAP") INITIAL_EQUITY = 10_000.0 FEE = 0.0004 ROUNDTRIP_FEE = FEE * 2 HORIZONS = ( ("full", None), ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ) @dataclass(frozen=True) class Spec: symbol: str bar: str side_mode: str range_lookback: int compression_window: int compression_quantile: float sweep_pct: float htf_slow: int htf_slope_lookback: int stop_pct: float take_pct: float hold: int @property def name(self) -> str: base = self.symbol.split("-")[0].lower() return ( f"false_breakout_reversal-{base}-{self.bar}-{self.side_mode}" f"-rl{self.range_lookback}-cw{self.compression_window}-cq{self.compression_quantile:g}" f"-sw{self.sweep_pct:g}-hs{self.htf_slow}-hl{self.htf_slope_lookback}" f"-sl{self.stop_pct:g}-tp{self.take_pct:g}-h{self.hold}" ) def load_frame(symbol: str) -> pd.DataFrame: frame = pd.read_csv(DATA_DIR / symbol / "15m.csv") frame["ts"] = pd.to_datetime(frame["ts"], unit="ms", utc=True) return frame.sort_values("ts").drop_duplicates("ts", keep="last").set_index("ts") def resample(frame: pd.DataFrame, bar: str) -> pd.DataFrame: rule = {"1H": "1h", "2H": "2h", "4H": "4h"}[bar] return ( frame.resample(rule, label="left", closed="left") .agg(open=("open", "first"), high=("high", "max"), low=("low", "min"), close=("close", "last"), volume=("volume", "sum")) .dropna() ) def signal_frame(spec: Spec, frame: pd.DataFrame) -> pd.DataFrame: close = frame["close"] open_ = frame["open"] prior_high = frame["high"].shift(1).rolling(spec.range_lookback).max() prior_low = frame["low"].shift(1).rolling(spec.range_lookback).min() mid = (prior_high + prior_low) / 2 width = (prior_high - prior_low) / close width_cap = width.rolling(spec.compression_window).quantile(spec.compression_quantile) compressed = width <= width_cap htf_rule = {"1H": "4h", "2H": "4h", "4H": "1D"}[spec.bar] htf_close = close.resample(htf_rule, label="left", closed="left").last().dropna() htf_ma = htf_close.ewm(span=spec.htf_slow, adjust=False).mean() htf_slope = htf_ma / htf_ma.shift(spec.htf_slope_lookback) - 1 htf = pd.DataFrame({"htf_ma": htf_ma, "htf_slope": htf_slope}).reindex(frame.index, method="ffill") short_entry = ( compressed & (frame["high"] > prior_high * (1 + spec.sweep_pct)) & (close < prior_high) & (close < open_) & (close <= htf["htf_ma"]) & (htf["htf_slope"] <= 0) ) long_entry = ( compressed & (frame["low"] < prior_low * (1 - spec.sweep_pct)) & (close > prior_low) & (close > open_) & (close >= htf["htf_ma"]) & (htf["htf_slope"] >= 0) ) if spec.side_mode == "short": long_entry = pd.Series(False, index=frame.index) return pd.DataFrame( { "short_entry": short_entry.fillna(False), "long_entry": long_entry.fillna(False), "short_exit": (close <= mid).fillna(False), "long_exit": (close >= mid).fillna(False), }, index=frame.index, ) def close_return(side: str, entry: float, exit_: float) -> float: gross = exit_ / entry - 1 if side == "long" else entry / exit_ - 1 return gross - ROUNDTRIP_FEE def run_spec(spec: Spec, frame: pd.DataFrame) -> tuple[pd.Series, list[dict[str, object]]]: signals = signal_frame(spec, frame) warmup = max(spec.range_lookback + spec.compression_window, spec.htf_slow * 4 + spec.htf_slope_lookback * 4) + 2 equity = INITIAL_EQUITY position: dict[str, object] | None = None pending_entry: str | None = None pending_exit = False trades: list[dict[str, object]] = [] curve: list[tuple[pd.Timestamp, float]] = [] rows = list(frame.itertuples()) for index in range(warmup, len(rows)): row = rows[index] ts = frame.index[index] if pending_exit and position is not None: net = close_return(str(position["side"]), float(position["entry_price"]), float(row.open)) equity *= max(0.0, 1 + net) trades.append({"side": position["side"], "entry_time": position["entry_time"], "exit_time": ts, "return": net}) position = None pending_exit = False if pending_entry is not None and position is None and equity > 0: side = pending_entry position = { "side": side, "entry_time": ts, "entry_index": index, "entry_price": float(row.open), "stop": float(row.open) * (1 - spec.stop_pct if side == "long" else 1 + spec.stop_pct), "take": float(row.open) * (1 + spec.take_pct if side == "long" else 1 - spec.take_pct), } pending_entry = None mark = equity if position is not None: side = str(position["side"]) stop_hit = row.low <= float(position["stop"]) if side == "long" else row.high >= float(position["stop"]) take_hit = row.high >= float(position["take"]) if side == "long" else row.low <= float(position["take"]) if stop_hit or take_hit: price = float(position["stop"] if stop_hit else position["take"]) net = close_return(side, float(position["entry_price"]), price) equity *= max(0.0, 1 + net) trades.append({"side": side, "entry_time": position["entry_time"], "exit_time": ts, "return": net}) position = None mark = equity else: gross = row.close / float(position["entry_price"]) - 1 if side == "long" else float(position["entry_price"]) / row.close - 1 mark = equity * (1 + gross - FEE) curve.append((ts, mark)) if index == len(rows) - 1 or equity <= 0: continue if position is not None: side = str(position["side"]) held = index - int(position["entry_index"]) if bool(signals[f"{side}_exit"].iloc[index]) or held >= spec.hold: pending_exit = True elif bool(signals["short_entry"].iloc[index]): pending_entry = "short" elif bool(signals["long_entry"].iloc[index]): pending_entry = "long" series = pd.Series({ts: value for ts, value in curve}).sort_index() daily = series.resample("1D").last().ffill() daily = pd.concat([pd.Series([INITIAL_EQUITY], index=[daily.index[0].normalize()]), daily]).sort_index() return daily.groupby(level=0).last(), trades def period_metrics(equity: pd.Series, trades: list[dict[str, object]], offset: pd.DateOffset | None) -> dict[str, object]: start = equity.index[0] if offset is None else equity.index[-1] - offset scoped = equity[equity.index >= start] scoped_trades = [trade for trade in trades if pd.Timestamp(trade["entry_time"]) >= scoped.index[0]] total = float(scoped.iloc[-1] / scoped.iloc[0] - 1) years = (scoped.index[-1] - scoped.index[0]).total_seconds() / 86_400 / 365 annual = (1 + total) ** (1 / years) - 1 if total > -1 and years > 0 else 0.0 drawdown = float(((scoped.cummax() - scoped) / scoped.cummax()).max()) returns = [float(trade["return"]) for trade in scoped_trades] wins = [value for value in returns if value > 0] losses = [value for value in returns if value < 0] profit_factor = sum(wins) / abs(sum(losses)) if losses else (999.0 if wins else 0.0) return { "total_return": total, "annualized_return": annual, "max_drawdown": drawdown, "win_rate": len(wins) / len(returns) if returns else 0.0, "profit_factor": profit_factor, "trades": len(returns), } def stability_rows(equity: pd.Series, trades: list[dict[str, object]]) -> pd.DataFrame: rows: list[dict[str, object]] = [] for freq, label_name in (("YE", "year"), ("ME", "month")): sampled = equity.resample(freq).last().dropna() starts = equity.resample(freq).first().reindex(sampled.index) returns = sampled / starts - 1 period_freq = "Y" if freq == "YE" else "M" periods = sampled.index.tz_localize(None).to_period(period_freq) for period, value in zip(periods.astype(str), returns): scoped = [ trade for trade in trades if pd.Timestamp(trade["entry_time"]).tz_localize(None).to_period(period_freq) == pd.Period(period) ] rows.append({"bucket": label_name, "period": period, "return": float(value), "trades": len(scoped)}) return pd.DataFrame(rows) def build_specs() -> list[Spec]: specs: list[Spec] = [] for symbol in SYMBOLS: for bar in ("1H", "2H", "4H"): for side_mode in ("short", "bidir"): for range_lookback in (12, 24, 36): for compression_quantile in (0.15, 0.25, 0.35): for sweep_pct in (0.001, 0.002, 0.0035): specs.append( Spec( symbol=symbol, bar=bar, side_mode=side_mode, range_lookback=range_lookback, compression_window=240, compression_quantile=compression_quantile, sweep_pct=sweep_pct, htf_slow=80, htf_slope_lookback=6, stop_pct=0.025, take_pct=0.035, hold=36, ) ) return specs def row_for_spec(spec: Spec, equity: pd.Series, trades: list[dict[str, object]]) -> dict[str, object]: row: dict[str, object] = {"name": spec.name, "symbol": spec.symbol, "bar": spec.bar, "side_mode": spec.side_mode} for label, offset in HORIZONS: metrics = period_metrics(equity, trades, offset) for key, value in metrics.items(): row[f"{label}_{key}"] = value return row def markdown_table(frame: pd.DataFrame) -> str: def cell(value: object) -> str: if isinstance(value, float): return f"{value:.4f}" return str(value).replace("|", "\\|") rows = [list(frame.columns), ["---" for _ in frame.columns]] rows.extend(frame.astype(object).where(pd.notna(frame), "").values.tolist()) return "\n".join("| " + " | ".join(cell(value) for value in row) + " |" for row in rows) def markdown_report(totals: pd.DataFrame, selected: pd.Series, stability: pd.DataFrame) -> str: metric_rows = [] for label, _ in HORIZONS: metric_rows.append( { "period": label, "total_return": selected[f"{label}_total_return"], "annualized_return": selected[f"{label}_annualized_return"], "max_drawdown": selected[f"{label}_max_drawdown"], "win_rate": selected[f"{label}_win_rate"], "profit_factor": selected[f"{label}_profit_factor"], "trades": selected[f"{label}_trades"], } ) years = stability[stability["bucket"] == "year"] months = stability[stability["bucket"] == "month"] active_months = months[months["trades"] > 0] losing_years = int((years["return"] < 0).sum()) losing_active_months = int((active_months["return"] < 0).sum()) active_month_ratio = len(active_months) / len(months) if len(months) else 0.0 verdict = "not worth continuing" if ( selected["full_profit_factor"] >= 1.15 and selected["full_max_drawdown"] <= 0.25 and selected["3y_total_return"] > 0 and selected["1y_total_return"] > 0 and selected["6m_total_return"] > 0 and selected["3m_total_return"] > 0 and selected["1y_trades"] >= 10 and active_month_ratio >= 0.4 and losing_years <= 2 ): verdict = "worth continuing with a narrower robustness pass" keep = [ "name", "symbol", "bar", "side_mode", "full_total_return", "full_annualized_return", "full_max_drawdown", "full_win_rate", "full_profit_factor", "full_trades", "3y_total_return", "1y_total_return", "6m_total_return", "3m_total_return", ] short_only = totals[totals["side_mode"] == "short"].head(5) return ( "# ETH/BTC False Breakout Reversal Search\n\n" "Scope: local OKX ETH/BTC candle CSV only; no live trading. " "Excluded: staged entry, ETH/BTC relative momentum, crash-follow, calendar/time buckets, trend_exhaustion.\n\n" "First-principles signal: a narrow rolling range means recent price agreement is compressed. " "If price sweeps the upper boundary but closes back inside while the higher-timeframe EMA slope is non-positive, " "the breakout has failed and a short targets reversion toward the range center. " "The bidirectional variant mirrors this for lower-boundary failures only for comparison.\n\n" f"Selected candidate: `{selected['name']}`.\n\n" f"Verdict: {verdict}.\n\n" "## Selected metrics\n\n" f"{markdown_table(pd.DataFrame(metric_rows))}\n\n" "## Stability\n\n" f"Years: {len(years)}, losing years: {losing_years}. " f"Months: {len(months)}, active months: {len(active_months)}, losing active months: {losing_active_months}, " f"active month ratio: {active_month_ratio:.4f}.\n\n" f"{markdown_table(years[['period', 'return', 'trades']])}\n\n" "Worst active months:\n\n" f"{markdown_table(active_months.sort_values('return').head(12)[['period', 'return', 'trades']])}\n\n" "## Best short-only variants\n\n" f"{markdown_table(short_only[keep])}\n\n" "## Top 10\n\n" f"{markdown_table(totals.head(10)[keep])}\n" ) def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR) args = parser.parse_args() raw_frames = {symbol: load_frame(symbol) for symbol in SYMBOLS} frames = {(symbol, bar): resample(raw_frames[symbol], bar) for symbol in SYMBOLS for bar in ("1H", "2H", "4H")} rows = [] curves: dict[str, pd.Series] = {} trade_sets: dict[str, list[dict[str, object]]] = {} for index, spec in enumerate(build_specs(), start=1): equity, trades = run_spec(spec, frames[(spec.symbol, spec.bar)]) rows.append(row_for_spec(spec, equity, trades)) curves[spec.name] = equity trade_sets[spec.name] = trades if index % 100 == 0: print(f"done {index}", flush=True) totals = pd.DataFrame(rows).sort_values( ["full_total_return", "3y_total_return", "1y_total_return", "full_profit_factor"], ascending=[False, False, False, False], ) viable = totals[ (totals["full_trades"] >= 30) & (totals["full_profit_factor"] > 1) & (totals["3y_total_return"] > 0) & (totals["1y_total_return"] > 0) & (totals["6m_total_return"] > 0) & (totals["3m_total_return"] > 0) ] selected = (viable if len(viable) else totals).iloc[0] stability = stability_rows(curves[str(selected["name"])], trade_sets[str(selected["name"])]) args.output_dir.mkdir(parents=True, exist_ok=True) totals_path = args.output_dir / "eth-btc-false-breakout-reversal-totals.csv" stability_path = args.output_dir / "eth-btc-false-breakout-reversal-stability.csv" report_path = args.output_dir / "eth-btc-false-breakout-reversal-report.md" totals.to_csv(totals_path, index=False) stability.to_csv(stability_path, index=False) report_path.write_text(markdown_report(totals, selected, stability), encoding="utf-8") print(f"selected {selected['name']}") print(f"wrote {totals_path}, {stability_path}, {report_path}") return 0 if __name__ == "__main__": raise SystemExit(main())