from __future__ import annotations

# Recent-regime analysis for the ETH and BTC USDT perpetual swaps.
#
# Reads locally stored OKX candle CSVs, derives descriptive statistics over a
# set of trailing windows (trend, volatility, false breakouts, BTC/ETH
# lead-lag, Bollinger squeeze quality), brute-forces a small grid of long-only
# ETH entry rules with a bar-by-bar backtest, and writes CSV tables plus a
# Markdown report.  Nothing here talks to the network or places orders.

import argparse
from dataclasses import dataclass
from pathlib import Path

import pandas as pd

DATA_DIR = Path("data/okx-candles")
OUTPUT_DIR = Path("reports/recent-regime")
SYMBOLS = ("ETH-USDT-SWAP", "BTC-USDT-SWAP")
WINDOW_DAYS = (90, 30, 14, 7)
ROUNDTRIP_COST = 0.0021

# NOTE: every bar-count constant below assumes 15-minute candles (the default
# of ``--bar``).  Running with another bar size leaves the code working but
# changes what "24h", "lag minutes" and the annualisation factor really mean.
BAR_MINUTES = 15
BARS_PER_DAY = 96
# Number of bars over which the Bollinger bandwidth quantile is ranked.
SQUEEZE_RANK_BARS = 960
# Windows that drive the "recent regime first" candidate ranking.
_RECENT_DAYS = (30, 14, 7)
# Per-window metric suffixes emitted by ``metrics_for`` (kept in sync by hand).
_METRIC_SUFFIXES = (
    "total_return",
    "annualized_return",
    "max_drawdown",
    "trades",
    "win_rate",
    "profit_factor",
)


@dataclass(frozen=True)
class Candidate:
    """One parameterised entry rule evaluated by ``run_candidate``."""

    family: str  # rule family; selects the branch in ``candidate_signal``
    name: str  # human-readable identifier used in the output tables
    side: str  # trade direction label; every generated candidate is "long"
    lookback: int  # bars used for the trigger return
    threshold: float  # minimum (or, for reversal, negative) trigger return
    stop: float  # stop-loss distance as a fraction of the entry price
    take: float  # take-profit distance as a fraction of the entry price
    hold: int  # maximum holding period in bars
    squeeze_quantile: float | None = None  # only for "squeeze_breakout"
    btc_lead: int | None = None  # only for "btc_lead_eth_follow"


def _recent(data, days: int):
    """Return the trailing ``days`` of a time-indexed Series/DataFrame.

    The cut-off is measured back from the last index value and is inclusive.
    """
    cutoff = data.index[-1] - pd.Timedelta(days=days)
    return data.loc[data.index >= cutoff]


def _bollinger_bandwidth(close: pd.Series) -> pd.Series:
    """Return (upper - lower) / mid for 2-sigma bands over ``BARS_PER_DAY`` bars."""
    mid = close.rolling(BARS_PER_DAY).mean()
    std = close.rolling(BARS_PER_DAY).std(ddof=0)
    return ((mid + 2 * std) - (mid - 2 * std)) / mid


def load_frame(symbol: str, bar: str) -> pd.DataFrame:
    """Load ``DATA_DIR/<symbol>/<bar>.csv`` indexed by UTC timestamp.

    The ``ts`` column is parsed as epoch milliseconds; rows are sorted and
    de-duplicated on the timestamp, keeping the last occurrence.
    """
    frame = pd.read_csv(DATA_DIR / symbol / f"{bar}.csv")
    frame["ts"] = pd.to_datetime(frame["ts"], unit="ms", utc=True)
    return frame.sort_values("ts").drop_duplicates("ts", keep="last").set_index("ts")


def aligned_frames(bar: str) -> pd.DataFrame:
    """Inner-join the ETH and BTC candles and add return/ratio columns.

    Columns are prefixed ``eth_`` / ``btc_``; ``eth_ret`` and ``btc_ret`` are
    bar-over-bar close returns and ``ratio`` is ETH close divided by BTC close.
    Rows containing any NaN are dropped.
    """
    eth_symbol, btc_symbol = SYMBOLS
    eth = load_frame(eth_symbol, bar)
    btc = load_frame(btc_symbol, bar)
    joined = eth.add_prefix("eth_").join(btc.add_prefix("btc_"), how="inner")
    joined["eth_ret"] = joined["eth_close"].pct_change()
    joined["btc_ret"] = joined["btc_close"].pct_change()
    joined["ratio"] = joined["eth_close"] / joined["btc_close"]
    return joined.dropna()


def max_drawdown(values: pd.Series) -> float:
    """Return the largest peak-to-trough decline as a fraction of the peak.

    An empty series yields ``0.0``.
    """
    if not len(values):
        return 0.0
    peak = values.cummax()
    return float(((peak - values) / peak).max())


def annualized_return(total: float, days: float) -> float:
    """Compound ``total`` earned over ``days`` to a 365-day rate.

    Returns ``-1.0`` when the input cannot be compounded (total loss or a
    non-positive day count).
    """
    if total > -1.0 and days > 0.0:
        return (1.0 + total) ** (365.0 / days) - 1.0
    return -1.0


def style_row(frame: pd.DataFrame, days: int) -> dict[str, object]:
    """Summarise trend, volatility, range and drawdown for the last ``days``."""
    scoped = _recent(frame, days)
    annualizer = (365 * BARS_PER_DAY) ** 0.5

    def total_return(column: str) -> float:
        series = scoped[column]
        return float(series.iloc[-1] / series.iloc[0] - 1.0)

    eth_total = total_return("eth_close")
    btc_total = total_return("btc_close")
    ratio_total = total_return("ratio")
    eth_vol = float(scoped["eth_ret"].std(ddof=0) * annualizer)
    btc_vol = float(scoped["btc_ret"].std(ddof=0) * annualizer)
    eth_range = float(scoped["eth_high"].max() / scoped["eth_low"].min() - 1.0)
    btc_range = float(scoped["btc_high"].max() / scoped["btc_low"].min() - 1.0)
    # Efficiency: share of the high-low range that ended up as net movement.
    eth_efficiency = abs(eth_total) / eth_range if eth_range else 0.0
    btc_efficiency = abs(btc_total) / btc_range if btc_range else 0.0
    return {
        "days": days,
        "start": scoped.index[0].strftime("%Y-%m-%d %H:%M"),
        "end": scoped.index[-1].strftime("%Y-%m-%d %H:%M"),
        "eth_total_return": eth_total,
        "btc_total_return": btc_total,
        "eth_btc_ratio_return": ratio_total,
        "eth_ann_vol": eth_vol,
        "btc_ann_vol": btc_vol,
        "eth_high_low_range": eth_range,
        "btc_high_low_range": btc_range,
        "eth_trend_efficiency": eth_efficiency,
        "btc_trend_efficiency": btc_efficiency,
        "eth_max_drawdown": max_drawdown(scoped["eth_close"]),
        "btc_max_drawdown": max_drawdown(scoped["btc_close"]),
    }


def _rate(hits: int, total: int) -> float:
    """Return ``hits / total`` or ``0.0`` when ``total`` is zero."""
    return float(hits / total) if total else 0.0


def false_breakout_rows(frame: pd.DataFrame) -> pd.DataFrame:
    """Count failed breakouts of the prior 24h range and their reversal rate.

    A bar is an upper sweep when its high exceeds the prior ``BARS_PER_DAY``-bar
    high by more than 0.15% but it closes back below that high (mirrored for
    lower sweeps).  A sweep "succeeds" when the close ``BARS_PER_DAY`` bars
    later is on the reversal side.

    The prior levels are computed on the full frame *before* scoping, so the
    first 24h of each window is scanned too.  Reversal rates are taken over
    sweeps whose follow-through is already observable; sweeps in the final
    ``BARS_PER_DAY`` bars still appear in the counts but not in the rate
    denominator.
    """
    flags_by_symbol: dict[str, pd.DataFrame] = {}
    for symbol in ("eth", "btc"):
        high = frame[f"{symbol}_high"]
        low = frame[f"{symbol}_low"]
        close = frame[f"{symbol}_close"]
        prior_high = high.shift(1).rolling(BARS_PER_DAY).max()
        prior_low = low.shift(1).rolling(BARS_PER_DAY).min()
        upper_sweep = (high > prior_high * 1.0015) & (close < prior_high)
        lower_sweep = (low < prior_low * 0.9985) & (close > prior_low)
        follow_24h = close.shift(-BARS_PER_DAY) / close - 1.0
        resolved = follow_24h.notna()
        flags_by_symbol[symbol] = pd.DataFrame(
            {
                "upper_sweep": upper_sweep,
                "upper_resolved": upper_sweep & resolved,
                "upper_success": upper_sweep & (follow_24h < 0.0),
                "lower_sweep": lower_sweep,
                "lower_resolved": lower_sweep & resolved,
                "lower_success": lower_sweep & (follow_24h > 0.0),
            }
        )

    rows: list[dict[str, object]] = []
    for days in WINDOW_DAYS:
        for symbol, flags in flags_by_symbol.items():
            counts = _recent(flags, days).sum()
            rows.append(
                {
                    "days": days,
                    "symbol": symbol.upper(),
                    "upper_failed_breakouts": int(counts["upper_sweep"]),
                    "upper_24h_reversal_rate": _rate(
                        int(counts["upper_success"]), int(counts["upper_resolved"])
                    ),
                    "lower_failed_breakouts": int(counts["lower_sweep"]),
                    "lower_24h_reversal_rate": _rate(
                        int(counts["lower_success"]), int(counts["lower_resolved"])
                    ),
                }
            )
    return pd.DataFrame(rows)


def lead_lag_rows(frame: pd.DataFrame) -> pd.DataFrame:
    """Correlate lagged BTC returns with ETH returns (and vice versa) per window."""
    rows: list[dict[str, object]] = []
    for days in WINDOW_DAYS:
        scoped = _recent(frame, days)
        btc_ret = scoped["btc_ret"]
        eth_ret = scoped["eth_ret"]
        # Independent of the lag, so computed once per window.
        same_bar_corr = float(btc_ret.corr(eth_ret))
        for lag in (1, 2, 4, 8, 16):
            rows.append(
                {
                    "days": days,
                    "lag_bars": lag,
                    "lag_minutes": lag * BAR_MINUTES,
                    "btc_leads_eth_corr": float(btc_ret.shift(lag).corr(eth_ret)),
                    "eth_leads_btc_corr": float(eth_ret.shift(lag).corr(btc_ret)),
                    "same_bar_corr": same_bar_corr,
                }
            )
    return pd.DataFrame(rows)


def squeeze_signal_rows(frame: pd.DataFrame) -> pd.DataFrame:
    """Compare the 24h absolute ETH move after squeeze bars with the baseline.

    A bar is in a squeeze when its Bollinger bandwidth is at or below the 20%
    quantile of the trailing ``SQUEEZE_RANK_BARS`` bandwidths.  The
    ``current_*`` columns describe the last bar of the full frame and are
    therefore identical in every row.
    """
    close = frame["eth_close"]
    width = _bollinger_bandwidth(close)
    threshold = width.rolling(SQUEEZE_RANK_BARS).quantile(0.20)
    # Comparisons against NaN are False, so warm-up bars are never squeezes.
    squeeze = width <= threshold
    future_move = (close.shift(-BARS_PER_DAY) / close - 1.0).abs()

    current_squeeze = bool(squeeze.iloc[-1])
    current_bandwidth = float(width.iloc[-1])
    current_threshold = float(threshold.iloc[-1])

    rows: list[dict[str, object]] = []
    for days in WINDOW_DAYS:
        scoped_squeeze = _recent(squeeze, days)
        scoped_move = _recent(future_move, days)
        event_returns = scoped_move[scoped_squeeze].dropna()
        base_returns = scoped_move.dropna()
        event_median = float(event_returns.median()) if len(event_returns) else 0.0
        base_median = float(base_returns.median()) if len(base_returns) else 0.0
        lift = event_median / base_median if len(event_returns) and base_median else 0.0
        rows.append(
            {
                "days": days,
                "squeeze_bars": int(scoped_squeeze.sum()),
                "squeeze_bar_rate": float(scoped_squeeze.mean()),
                "median_24h_abs_move_after_squeeze": event_median,
                "baseline_median_24h_abs_move": base_median,
                "signal_lift": float(lift),
                "current_squeeze": current_squeeze,
                "current_bandwidth": current_bandwidth,
                "current_threshold": current_threshold,
            }
        )
    return pd.DataFrame(rows)


def build_candidates() -> list[Candidate]:
    """Enumerate the parameter grid for all four candidate families."""
    candidates: list[Candidate] = []
    for lookback in (16, 32, 64):
        for threshold in (0.006, 0.010, 0.014):
            for stop, take in ((0.006, 0.010), (0.008, 0.014), (0.010, 0.018)):
                suffix = f"lb{lookback}-th{threshold:g}-sl{stop:g}-tp{take:g}"
                candidates.append(
                    Candidate("eth_momentum", f"eth-momo-{suffix}", "long", lookback, threshold, stop, take, 64)
                )
                candidates.append(
                    Candidate("eth_reversal", f"eth-revert-{suffix}", "long", lookback, threshold, stop, take, 64)
                )
                for btc_lead in (1, 4, 8):
                    candidates.append(
                        Candidate(
                            "btc_lead_eth_follow",
                            f"btc-lead{btc_lead}-{suffix}",
                            "long",
                            lookback,
                            threshold,
                            stop,
                            take,
                            64,
                            btc_lead=btc_lead,
                        )
                    )
    for quantile in (0.15, 0.20, 0.25):
        for threshold in (0.004, 0.007):
            for stop, take in ((0.006, 0.012), (0.008, 0.016)):
                candidates.append(
                    Candidate(
                        "squeeze_breakout",
                        f"squeeze-q{quantile:g}-th{threshold:g}-sl{stop:g}-tp{take:g}",
                        "long",
                        96,
                        threshold,
                        stop,
                        take,
                        96,
                        squeeze_quantile=quantile,
                    )
                )
    return candidates


def candidate_signal(candidate: Candidate, frame: pd.DataFrame, index: int, squeeze: pd.Series | None) -> str | None:
    """Return ``"long"`` when ``candidate`` fires on bar ``index``, else ``None``.

    Only data at or before ``index`` is read.  When a rule needs more history
    than is available the function returns ``None``; without that guard a
    negative ``iloc`` position would silently wrap around to the end of the
    series and leak future prices into the signal.
    """
    start = index - candidate.lookback
    if start < 0:
        return None
    eth_close = frame["eth_close"]
    eth_return = eth_close.iloc[index] / eth_close.iloc[start] - 1.0
    family = candidate.family

    if family == "eth_momentum":
        return "long" if eth_return >= candidate.threshold else None

    if family == "eth_reversal":
        return "long" if eth_return <= -candidate.threshold else None

    if family == "btc_lead_eth_follow":
        btc_index = index - int(candidate.btc_lead or 0)
        btc_start = btc_index - candidate.lookback
        if btc_start < 0:
            return None
        btc_close = frame["btc_close"]
        btc_return = btc_close.iloc[btc_index] / btc_close.iloc[btc_start] - 1.0
        if btc_return >= candidate.threshold and eth_return >= 0.0:
            return "long"
        return None

    if family == "squeeze_breakout" and squeeze is not None:
        if index < 4:
            return None
        # Breakout trigger: the last four bars (one hour at 15m) moved up enough.
        prior_return = eth_close.iloc[index] / eth_close.iloc[index - 4] - 1.0
        if bool(squeeze.iloc[index]) and prior_return >= candidate.threshold:
            return "long"

    return None


def run_candidate(candidate: Candidate, frame: pd.DataFrame) -> tuple[pd.Series, list[dict[str, object]]]:
    """Backtest ``candidate`` bar by bar on ETH and return (equity curve, trades).

    A signal on bar *i* opens a long at the open of bar *i + 1*.  The position
    is closed at the stop, the take-profit, or the bar close once the holding
    period is reached; when stop and take are both touched in one bar the stop
    is assumed to fill first.  ``ROUNDTRIP_COST`` is charged per trade.  The
    equity curve is marked to the bar close while a position is open and is
    empty when the frame is shorter than the warm-up period.
    """
    close = frame["eth_close"]
    is_squeeze = candidate.family == "squeeze_breakout"
    if is_squeeze:
        width = _bollinger_bandwidth(close)
        squeeze = width <= width.rolling(SQUEEZE_RANK_BARS).quantile(float(candidate.squeeze_quantile))
    else:
        squeeze = None

    # The BTC-lead rule looks back ``lookback + btc_lead`` bars, so the warm-up
    # must cover the lead as well or the first signals would read wrapped data.
    history = candidate.lookback + max(2, int(candidate.btc_lead or 0))
    warmup = max(history, SQUEEZE_RANK_BARS if is_squeeze else 0)

    equity = 10_000.0
    position: dict[str, object] | None = None
    pending_entry: str | None = None
    curve: list[tuple[pd.Timestamp, float]] = []
    trades: list[dict[str, object]] = []
    rows = list(frame.itertuples())
    last_index = len(rows) - 1

    for index in range(warmup, len(rows)):
        row = rows[index]
        ts = frame.index[index]

        if pending_entry is not None and position is None:
            entry_price = float(row.eth_open)
            position = {
                "entry_index": index,
                "entry_time": ts,
                "entry_price": entry_price,
                "stop": entry_price * (1.0 - candidate.stop),
                "take": entry_price * (1.0 + candidate.take),
            }
            pending_entry = None

        mark = equity
        if position is not None:
            stop_hit = float(row.eth_low) <= float(position["stop"])
            take_hit = float(row.eth_high) >= float(position["take"])
            hold_hit = index - int(position["entry_index"]) >= candidate.hold
            if stop_hit or take_hit or hold_hit:
                exit_price = float(
                    position["stop"] if stop_hit else position["take"] if take_hit else row.eth_close
                )
                gross = exit_price / float(position["entry_price"]) - 1.0
                net = gross - ROUNDTRIP_COST
                equity *= 1.0 + net
                trades.append({"entry_time": position["entry_time"], "exit_time": ts, "return": net})
                position = None
                mark = equity
            else:
                mark = equity * (float(row.eth_close) / float(position["entry_price"]))
        curve.append((ts, mark))

        # No bar follows the last one, so a signal there could never be filled.
        if index == last_index:
            continue
        if position is None and candidate_signal(candidate, frame, index, squeeze) == "long":
            pending_entry = "long"

    return pd.Series(dict(curve), dtype=float).sort_index(), trades


def metrics_for(equity: pd.Series, trades: list[dict[str, object]], days: int) -> dict[str, object]:
    """Compute return, drawdown and trade statistics for the last ``days``.

    Trades are attributed to the window by entry time.  The profit factor is
    capped at ``999.0`` when there are wins but no losses.
    """
    scoped = _recent(equity, days)
    start = float(scoped.iloc[0])
    end = float(scoped.iloc[-1])
    window_start = scoped.index[0]
    returns = [float(trade["return"]) for trade in trades if pd.Timestamp(trade["entry_time"]) >= window_start]
    wins = [value for value in returns if value > 0.0]
    losses = [value for value in returns if value < 0.0]
    total = end / start - 1.0
    if losses:
        profit_factor = sum(wins) / abs(sum(losses))
    else:
        profit_factor = 999.0 if wins else 0.0
    return {
        f"{days}d_total_return": total,
        f"{days}d_annualized_return": annualized_return(total, days),
        f"{days}d_max_drawdown": max_drawdown(scoped),
        f"{days}d_trades": len(returns),
        f"{days}d_win_rate": len(wins) / len(returns) if returns else 0.0,
        f"{days}d_profit_factor": profit_factor,
    }


def _candidate_columns() -> list[str]:
    """Return the column names ``search_candidates`` produces, in order."""
    columns = ["family", "name"]
    for days in WINDOW_DAYS:
        columns.extend(f"{days}d_{suffix}" for suffix in _METRIC_SUFFIXES)
    columns.extend(["min_recent_return", "max_recent_drawdown", "recent_trades"])
    return columns


def search_candidates(frame: pd.DataFrame) -> pd.DataFrame:
    """Backtest every candidate and rank them recent-regime first.

    Sort order: highest minimum of the 30/14/7-day returns, then highest
    90-day return, then lowest recent drawdown.  When no candidate produces an
    equity curve (frame shorter than every warm-up) an empty frame with the
    usual columns is returned instead of failing on the sort.
    """
    rows: list[dict[str, object]] = []
    for candidate in build_candidates():
        equity, trades = run_candidate(candidate, frame)
        if not len(equity):
            continue
        row: dict[str, object] = {"family": candidate.family, "name": candidate.name}
        for days in WINDOW_DAYS:
            row.update(metrics_for(equity, trades, days))
        row["min_recent_return"] = min(float(row[f"{days}d_total_return"]) for days in _RECENT_DAYS)
        row["max_recent_drawdown"] = max(float(row[f"{days}d_max_drawdown"]) for days in _RECENT_DAYS)
        row["recent_trades"] = sum(int(row[f"{days}d_trades"]) for days in _RECENT_DAYS)
        rows.append(row)
    if not rows:
        return pd.DataFrame(columns=_candidate_columns())
    return pd.DataFrame(rows).sort_values(
        ["min_recent_return", "90d_total_return", "max_recent_drawdown"],
        ascending=[False, False, True],
    )


def pct(value: float) -> str:
    """Format a fraction as a percentage with two decimals."""
    return f"{value:.2%}"


def _markdown_cell(value: object) -> str:
    """Render one table cell: blanks for missing, 4 decimals for floats."""
    if value is None or value is pd.NaT or value is pd.NA:
        return ""
    if isinstance(value, float):
        # NaN is the only float that differs from itself.
        return "" if value != value else f"{value:.4f}"
    return str(value).replace("|", "\\|")


def markdown_table(frame: pd.DataFrame) -> str:
    """Render ``frame`` as a GitHub-style Markdown table (index omitted).

    Cells are formatted row by row from ``itertuples`` so each column keeps
    its own type.  Going through ``astype(object).where(...)`` instead can,
    on some pandas versions, re-infer numeric dtypes and turn integer cells
    into floats.
    """
    lines = [
        "| " + " | ".join(_markdown_cell(column) for column in frame.columns) + " |",
        "| " + " | ".join("---" for _ in frame.columns) + " |",
    ]
    for record in frame.itertuples(index=False, name=None):
        lines.append("| " + " | ".join(_markdown_cell(value) for value in record) + " |")
    return "\n".join(lines)


def write_report(
    style: pd.DataFrame,
    false_breakouts: pd.DataFrame,
    lead_lag: pd.DataFrame,
    squeeze: pd.DataFrame,
    candidates: pd.DataFrame,
    paths: list[Path],
) -> str:
    """Build the Markdown report text from the analysis tables.

    The "Read" section quotes the last row of ``style`` and ``squeeze``, which
    is the 7-day window given the order of ``WINDOW_DAYS``.
    """
    latest = style.iloc[-1]
    top = candidates.head(10)
    # Per window, keep the lag at which lagged BTC returns correlate best with ETH.
    best_lag = (
        lead_lag.sort_values(["days", "btc_leads_eth_corr"], ascending=[True, False])
        .groupby("days", observed=True)
        .head(1)
    )
    current_squeeze = squeeze.iloc[-1]
    lines = [
        "# Recent ETH/BTC Regime Analysis",
        "",
        "Scope: local OKX 15m CSV only; no network, no live executor, no orders.",
        "",
        "Output files:",
        *[f"- `{path}`" for path in paths],
        "",
        "## Regime Windows",
        markdown_table(style),
        "",
        "## BTC Lead/Lag Best Rows",
        markdown_table(best_lag[["days", "lag_minutes", "btc_leads_eth_corr", "eth_leads_btc_corr", "same_bar_corr"]]),
        "",
        "## False Breakout Profile",
        markdown_table(false_breakouts),
        "",
        "## BB Squeeze Quality",
        markdown_table(squeeze),
        "",
        "## Candidate Search Top 10",
        markdown_table(
            top[
                [
                    "family",
                    "name",
                    "90d_total_return",
                    "30d_total_return",
                    "14d_total_return",
                    "7d_total_return",
                    "max_recent_drawdown",
                    "recent_trades",
                ]
            ]
        ),
        "",
        "## Read",
        f"- Latest window in this run ends at `{latest['end']}`.",
        f"- ETH 7d return is {pct(float(latest['eth_total_return']))}; BTC 7d return is {pct(float(latest['btc_total_return']))}; ETH/BTC ratio 7d return is {pct(float(latest['eth_btc_ratio_return']))}.",
        f"- Current ETH BB squeeze flag is `{bool(current_squeeze['current_squeeze'])}` with bandwidth {float(current_squeeze['current_bandwidth']):.4f} vs threshold {float(current_squeeze['current_threshold']):.4f}.",
        "- Ranking is deliberately recent-regime first: min(30d, 14d, 7d) return, then 90d return, then lower recent drawdown.",
    ]
    return "\n".join(lines) + "\n"


def main() -> int:
    """Run the full analysis, write the CSV tables and report, print the report path."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--bar", default="15m")
    parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
    args = parser.parse_args()

    frame = aligned_frames(args.bar)
    style = pd.DataFrame([style_row(frame, days) for days in WINDOW_DAYS])
    false_breakouts = false_breakout_rows(frame)
    lead_lag = lead_lag_rows(frame)
    squeeze = squeeze_signal_rows(frame)
    # 120 days leaves room for indicator warm-up ahead of the 90-day window.
    candidates = search_candidates(_recent(frame, 120))

    args.output_dir.mkdir(parents=True, exist_ok=True)
    stamp = frame.index[-1].strftime("%Y%m%d-%H%M")
    style_path = args.output_dir / f"analysis-{stamp}-regime.csv"
    false_path = args.output_dir / f"analysis-{stamp}-false-breakouts.csv"
    lead_path = args.output_dir / f"analysis-{stamp}-lead-lag.csv"
    squeeze_path = args.output_dir / f"analysis-{stamp}-squeeze.csv"
    candidates_path = args.output_dir / f"analysis-{stamp}-candidates.csv"
    report_path = args.output_dir / f"analysis-{stamp}-report.md"
    paths = [style_path, false_path, lead_path, squeeze_path, candidates_path, report_path]

    style.to_csv(style_path, index=False)
    false_breakouts.to_csv(false_path, index=False)
    lead_lag.to_csv(lead_path, index=False)
    squeeze.to_csv(squeeze_path, index=False)
    candidates.to_csv(candidates_path, index=False)
    report_path.write_text(
        write_report(style, false_breakouts, lead_lag, squeeze, candidates, paths),
        encoding="utf-8",
    )
    print(report_path)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())