| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362 |
- from __future__ import annotations
- import argparse
- from dataclasses import dataclass
- from pathlib import Path
- import pandas as pd
# Root folder of the local candle CSVs (one sub-folder per instrument).
DATA_DIR = Path("data") / "okx-candles"
# Default destination for the generated CSV tables and markdown report.
OUTPUT_DIR = Path("reports") / "recent-regime"
# Instrument identifiers (ETH and BTC USDT swaps).
SYMBOLS = ("ETH-USDT-SWAP", "BTC-USDT-SWAP")
# Trailing analysis windows in days, longest first.
WINDOW_DAYS = (90, 30, 14, 7)
# Fractional cost subtracted from the gross return of every closed trade.
ROUNDTRIP_COST = 0.0021
@dataclass(frozen=True)
class Candidate:
    """Immutable parameter set describing one strategy variant.

    The field order is part of the interface because instances are
    constructed positionally.
    """

    # Strategy family identifier; selects which entry rule is evaluated.
    family: str
    # Label of the variant, carried into result tables.
    name: str
    # Trade direction label.
    side: str
    # Number of bars between the two closes used for the return comparison.
    lookback: int
    # Fractional return level that triggers an entry.
    threshold: float
    # Stop-loss distance as a fraction of the entry price.
    stop: float
    # Take-profit distance as a fraction of the entry price.
    take: float
    # Maximum number of bars a position is kept open.
    hold: int
    # Rolling quantile used for the squeeze flag (squeeze family only).
    squeeze_quantile: float | None = None
    # Bar offset applied to the BTC return (lead/follow family only).
    btc_lead: int | None = None
def load_frame(symbol: str, bar: str) -> pd.DataFrame:
    """Read the candle CSV for one instrument and bar size.

    The file is expected at ``DATA_DIR / symbol / f"{bar}.csv"`` with a
    ``ts`` column holding epoch milliseconds.

    Returns:
        The candles sorted by time and indexed by a UTC ``ts`` timestamp;
        when a timestamp occurs more than once, the last row is kept.
    """
    csv_path = DATA_DIR / symbol / f"{bar}.csv"
    candles = pd.read_csv(csv_path)
    candles["ts"] = pd.to_datetime(candles["ts"], unit="ms", utc=True)
    ordered = candles.sort_values("ts")
    unique = ordered.drop_duplicates("ts", keep="last")
    return unique.set_index("ts")
def aligned_frames(bar: str) -> pd.DataFrame:
    """Build one frame holding ETH and BTC candles on shared timestamps.

    Columns of each instrument are prefixed with ``eth_`` / ``btc_``; only
    timestamps present in both files are kept (inner join). Three derived
    columns are appended: ``eth_ret`` and ``btc_ret`` (bar-over-bar close
    returns) and ``ratio`` (ETH close divided by BTC close). Rows with any
    missing value are dropped.
    """
    eth_side = load_frame("ETH-USDT-SWAP", bar).add_prefix("eth_")
    btc_side = load_frame("BTC-USDT-SWAP", bar).add_prefix("btc_")
    merged = eth_side.join(btc_side, how="inner")
    for asset in ("eth", "btc"):
        merged[f"{asset}_ret"] = merged[f"{asset}_close"].pct_change()
    merged["ratio"] = merged["eth_close"] / merged["btc_close"]
    return merged.dropna()
def max_drawdown(values: pd.Series) -> float:
    """Return the largest fractional decline from a running peak.

    An empty series yields ``0.0``.
    """
    if not len(values):
        return 0.0
    running_peak = values.cummax()
    decline = (running_peak - values) / running_peak
    return float(decline.max())
def annualized_return(total: float, days: float) -> float:
    """Compound a total return earned over ``days`` to a 365-day rate.

    Returns ``-1.0`` when the total return is not above -100% or when
    ``days`` is not positive.
    """
    compoundable = total > -1.0 and days > 0.0
    if not compoundable:
        return -1.0
    growth = 1.0 + total
    return growth ** (365.0 / days) - 1.0
def _bars_per_year(index: pd.Index) -> float:
    """Estimate how many bars make up 365 days from the median index spacing."""
    if len(index) >= 2:
        spacing = index.to_series().diff().median()
        if pd.notna(spacing) and spacing > pd.Timedelta(0):
            return pd.Timedelta(days=365) / spacing
    # Spacing cannot be inferred: fall back to the 15-minute cadence
    # (365 * 24 * 4 bars per year).
    return 365.0 * 24.0 * 4.0


def style_row(frame: pd.DataFrame, days: int) -> dict[str, object]:
    """Summarise the trailing ``days``-day window of the aligned frame.

    The window ends at the last timestamp of ``frame``. Volatility is
    annualised with a factor derived from the actual bar spacing of the
    window, so the figure stays meaningful for bar sizes other than 15
    minutes (for 15-minute data the factor equals ``365 * 24 * 4``).

    Args:
        frame: Frame with ``eth_``/``btc_`` prefixed ``close``, ``high`` and
            ``low`` columns plus ``eth_ret``, ``btc_ret`` and ``ratio``,
            indexed by timestamp.
        days: Length of the trailing window in days.

    Returns:
        A dict with the window bounds, total returns, annualised volatility,
        high/low range, trend efficiency (absolute total return divided by
        the high/low range) and maximum drawdown for ETH and BTC, plus the
        ETH/BTC ratio return.
    """
    cutoff = frame.index[-1] - pd.Timedelta(days=days)
    scoped = frame.loc[frame.index >= cutoff]
    annualizer = _bars_per_year(scoped.index) ** 0.5

    eth_total = float(scoped["eth_close"].iloc[-1] / scoped["eth_close"].iloc[0] - 1.0)
    btc_total = float(scoped["btc_close"].iloc[-1] / scoped["btc_close"].iloc[0] - 1.0)
    ratio_total = float(scoped["ratio"].iloc[-1] / scoped["ratio"].iloc[0] - 1.0)
    eth_vol = float(scoped["eth_ret"].std(ddof=0) * annualizer)
    btc_vol = float(scoped["btc_ret"].std(ddof=0) * annualizer)
    eth_range = float(scoped["eth_high"].max() / scoped["eth_low"].min() - 1.0)
    btc_range = float(scoped["btc_high"].max() / scoped["btc_low"].min() - 1.0)
    # A zero range would divide by zero; report zero efficiency instead.
    eth_efficiency = abs(eth_total) / eth_range if eth_range else 0.0
    btc_efficiency = abs(btc_total) / btc_range if btc_range else 0.0
    return {
        "days": days,
        "start": scoped.index[0].strftime("%Y-%m-%d %H:%M"),
        "end": scoped.index[-1].strftime("%Y-%m-%d %H:%M"),
        "eth_total_return": eth_total,
        "btc_total_return": btc_total,
        "eth_btc_ratio_return": ratio_total,
        "eth_ann_vol": eth_vol,
        "btc_ann_vol": btc_vol,
        "eth_high_low_range": eth_range,
        "btc_high_low_range": btc_range,
        "eth_trend_efficiency": eth_efficiency,
        "btc_trend_efficiency": btc_efficiency,
        "eth_max_drawdown": max_drawdown(scoped["eth_close"]),
        "btc_max_drawdown": max_drawdown(scoped["btc_close"]),
    }
def _reversal_rate(sweeps: pd.Series, reversed_after: pd.Series, resolved: pd.Series) -> float:
    """Share of sweeps with a known outcome that were followed by a reversal."""
    decided = int((sweeps & resolved).sum())
    if not decided:
        return 0.0
    return float((sweeps & reversed_after).sum() / decided)


def false_breakout_rows(frame: pd.DataFrame) -> pd.DataFrame:
    """Count failed breakouts ("sweeps") and how often price reversed after.

    A bar is an upper sweep when its high exceeds the highest high of the
    previous 96 bars by more than 0.15% but it closes back below that prior
    high; lower sweeps mirror this on the lows. A sweep counts as a
    successful reversal when the close 96 bars later (24 hours on 15-minute
    data) is lower (upper sweep) or higher (lower sweep) than the sweep
    bar's close.

    The rolling extremes are computed on the full ``frame`` and only then
    restricted to each window, so the first 96 bars of a window are not
    blind. Reversal rates are taken over sweeps whose 96-bar outcome is
    known; sweeps in the last 96 bars still count toward the sweep totals
    but are excluded from the rate denominator.

    Returns:
        One row per (window in ``WINDOW_DAYS``, symbol) with sweep counts
        and reversal rates; a rate is ``0.0`` when no sweep has a known
        outcome.
    """
    signals: dict[str, tuple[pd.Series, pd.Series, pd.Series]] = {}
    for symbol in ("eth", "btc"):
        high = frame[f"{symbol}_high"]
        low = frame[f"{symbol}_low"]
        close = frame[f"{symbol}_close"]
        # shift(1) keeps the current bar out of its own reference range.
        prior_high = high.shift(1).rolling(96).max()
        prior_low = low.shift(1).rolling(96).min()
        upper_sweep = (high > prior_high * 1.0015) & (close < prior_high)
        lower_sweep = (low < prior_low * 0.9985) & (close > prior_low)
        follow_24h = close.shift(-96) / close - 1.0
        signals[symbol] = (upper_sweep, lower_sweep, follow_24h)

    rows: list[dict[str, object]] = []
    window_end = frame.index[-1]
    for days in WINDOW_DAYS:
        in_window = frame.index >= window_end - pd.Timedelta(days=days)
        for symbol in ("eth", "btc"):
            upper_sweep, lower_sweep, follow_24h = signals[symbol]
            upper = upper_sweep[in_window]
            lower = lower_sweep[in_window]
            follow = follow_24h[in_window]
            resolved = follow.notna()
            rows.append(
                {
                    "days": days,
                    "symbol": symbol.upper(),
                    "upper_failed_breakouts": int(upper.sum()),
                    "upper_24h_reversal_rate": _reversal_rate(upper, follow < 0.0, resolved),
                    "lower_failed_breakouts": int(lower.sum()),
                    "lower_24h_reversal_rate": _reversal_rate(lower, follow > 0.0, resolved),
                }
            )
    return pd.DataFrame(rows)
def lead_lag_rows(frame: pd.DataFrame) -> pd.DataFrame:
    """Tabulate lagged return correlations between BTC and ETH.

    For every window in ``WINDOW_DAYS`` and every lag in (1, 2, 4, 8, 16)
    bars, one row reports the correlation of lagged BTC returns with ETH
    returns, the reverse direction, and the same-bar correlation.
    ``lag_minutes`` is the lag multiplied by 15.
    """
    records: list[dict[str, object]] = []
    latest = frame.index[-1]
    for days in WINDOW_DAYS:
        window = frame.loc[frame.index >= latest - pd.Timedelta(days=days)]
        btc_ret = window["btc_ret"]
        eth_ret = window["eth_ret"]
        # The same-bar figure does not depend on the lag; compute it once.
        same_bar = float(btc_ret.corr(eth_ret))
        for lag in (1, 2, 4, 8, 16):
            record: dict[str, object] = {
                "days": days,
                "lag_bars": lag,
                "lag_minutes": lag * 15,
                "btc_leads_eth_corr": float(btc_ret.shift(lag).corr(eth_ret)),
                "eth_leads_btc_corr": float(eth_ret.shift(lag).corr(btc_ret)),
                "same_bar_corr": same_bar,
            }
            records.append(record)
    return pd.DataFrame(records)
def squeeze_signal_rows(frame: pd.DataFrame) -> pd.DataFrame:
    """Measure how ETH moved after Bollinger-band squeezes.

    Band width is (upper - lower) / mid for a 96-bar mean with bands two
    population standard deviations away. A bar is in a squeeze when its
    width is at or below the 20% quantile of the previous 960 widths. For
    each window in ``WINDOW_DAYS`` the median absolute 96-bar forward move
    after squeeze bars is compared with the median over all bars of the
    window. The ``current_*`` columns describe the last bar of ``frame``
    and are therefore identical on every row.
    """
    close = frame["eth_close"]
    mid = close.rolling(96).mean()
    band = 2 * close.rolling(96).std(ddof=0)
    width = ((mid + band) - (mid - band)) / mid
    threshold = width.rolling(960).quantile(0.20)
    squeeze = width <= threshold
    forward_move = close.shift(-96) / close - 1.0

    current_flag = bool(squeeze.iloc[-1])
    current_width = float(width.iloc[-1])
    current_limit = float(threshold.iloc[-1])

    latest = frame.index[-1]
    records: list[dict[str, object]] = []
    for days in WINDOW_DAYS:
        window_index = frame.index[frame.index >= latest - pd.Timedelta(days=days)]
        flags = squeeze.loc[window_index].fillna(False)
        window_moves = forward_move.loc[window_index]
        after_squeeze = window_moves[flags].abs().dropna()
        baseline = window_moves.abs().dropna()

        event_median = float(after_squeeze.median()) if len(after_squeeze) else 0.0
        base_median = float(baseline.median()) if len(baseline) else 0.0
        if len(after_squeeze) and base_median:
            lift = event_median / base_median
        else:
            lift = 0.0

        records.append(
            {
                "days": days,
                "squeeze_bars": int(flags.sum()),
                "squeeze_bar_rate": float(flags.mean()),
                "median_24h_abs_move_after_squeeze": event_median,
                "baseline_median_24h_abs_move": base_median,
                "signal_lift": lift,
                "current_squeeze": current_flag,
                "current_bandwidth": current_width,
                "current_threshold": current_limit,
            }
        )
    return pd.DataFrame(records)
def build_candidates() -> list[Candidate]:
    """Enumerate the parameter grid of strategy variants to backtest.

    For every (lookback, threshold, stop/take) combination the grid holds
    one momentum variant, one reversal variant and three BTC lead/follow
    variants (leads of 1, 4 and 8 bars), all held for at most 64 bars. A
    second grid adds squeeze-breakout variants over quantile, threshold and
    stop/take, with a 96-bar lookback and a 96-bar maximum hold.
    """
    grid: list[Candidate] = []

    exits = ((0.006, 0.010), (0.008, 0.014), (0.010, 0.018))
    for lookback in (16, 32, 64):
        for threshold in (0.006, 0.010, 0.014):
            for stop, take in exits:
                shared = {
                    "side": "long",
                    "lookback": lookback,
                    "threshold": threshold,
                    "stop": stop,
                    "take": take,
                    "hold": 64,
                }
                grid.append(
                    Candidate(
                        family="eth_momentum",
                        name=f"eth-momo-lb{lookback}-th{threshold:g}-sl{stop:g}-tp{take:g}",
                        **shared,
                    )
                )
                grid.append(
                    Candidate(
                        family="eth_reversal",
                        name=f"eth-revert-lb{lookback}-th{threshold:g}-sl{stop:g}-tp{take:g}",
                        **shared,
                    )
                )
                grid.extend(
                    Candidate(
                        family="btc_lead_eth_follow",
                        name=f"btc-lead{btc_lead}-lb{lookback}-th{threshold:g}-sl{stop:g}-tp{take:g}",
                        btc_lead=btc_lead,
                        **shared,
                    )
                    for btc_lead in (1, 4, 8)
                )

    squeeze_exits = ((0.006, 0.012), (0.008, 0.016))
    for quantile in (0.15, 0.20, 0.25):
        for threshold in (0.004, 0.007):
            for stop, take in squeeze_exits:
                grid.append(
                    Candidate(
                        family="squeeze_breakout",
                        name=f"squeeze-q{quantile:g}-th{threshold:g}-sl{stop:g}-tp{take:g}",
                        side="long",
                        lookback=96,
                        threshold=threshold,
                        stop=stop,
                        take=take,
                        hold=96,
                        squeeze_quantile=quantile,
                    )
                )
    return grid
def candidate_signal(candidate: Candidate, frame: pd.DataFrame, index: int, squeeze: pd.Series | None) -> str | None:
    """Evaluate a candidate's entry rule at bar position ``index``.

    Rules by ``candidate.family``:

    * ``eth_momentum``: ETH return over ``lookback`` bars is at least
      ``threshold``.
    * ``eth_reversal``: that return is at most ``-threshold``.
    * ``btc_lead_eth_follow``: the BTC return over ``lookback`` bars, taken
      ``btc_lead`` bars earlier, is at least ``threshold`` and the ETH
      return is non-negative.
    * ``squeeze_breakout``: the squeeze flag is set at ``index`` and the
      ETH return over the last 4 bars is at least ``threshold``.

    Positions are addressed with ``.iloc``, where a negative position wraps
    around to the end of the series. Evaluating a rule without enough
    history would therefore read *future* closes, so the function returns
    ``None`` whenever any required position would fall before the first bar.

    Args:
        candidate: Parameters of the strategy variant.
        frame: Frame with ``eth_close`` and ``btc_close`` columns.
        index: Integer position of the bar being evaluated.
        squeeze: Boolean squeeze flags aligned with ``frame`` (only used by
            the squeeze family), or ``None``.

    Returns:
        ``"long"`` when the rule fires, otherwise ``None``.
    """
    eth_start = index - candidate.lookback
    if eth_start < 0:
        return None

    eth_close = frame["eth_close"]
    eth_return = eth_close.iloc[index] / eth_close.iloc[eth_start] - 1.0
    family = candidate.family

    if family == "eth_momentum":
        return "long" if eth_return >= candidate.threshold else None

    if family == "eth_reversal":
        return "long" if eth_return <= -candidate.threshold else None

    if family == "btc_lead_eth_follow":
        btc_index = index - int(candidate.btc_lead)
        btc_start = btc_index - candidate.lookback
        if btc_start < 0:
            return None
        btc_close = frame["btc_close"]
        btc_return = btc_close.iloc[btc_index] / btc_close.iloc[btc_start] - 1.0
        if btc_return >= candidate.threshold and eth_return >= 0.0:
            return "long"
        return None

    if family == "squeeze_breakout" and squeeze is not None:
        if index - 4 < 0:
            return None
        prior_return = eth_close.iloc[index] / eth_close.iloc[index - 4] - 1.0
        if bool(squeeze.iloc[index]) and prior_return >= candidate.threshold:
            return "long"

    return None
def run_candidate(candidate: Candidate, frame: pd.DataFrame) -> tuple[pd.Series, list[dict[str, object]]]:
    """Backtest one candidate bar by bar on ETH.

    A signal seen on one bar is executed at the next bar's open. An open
    position is closed when the bar's low reaches the stop, its high
    reaches the take-profit, or the position has been held for
    ``candidate.hold`` bars; when stop and take are both touched in the same
    bar the stop is assumed to fill. Stop and take exits are priced at their
    levels, time exits at the bar's close. ``ROUNDTRIP_COST`` is deducted
    from every closed trade. No signal is taken on the final bar, and a
    position still open at the end is only marked to market.

    The loop starts after a warm-up that covers every look-back the entry
    rule needs, including the extra ``btc_lead`` offset of the lead/follow
    family. Without that offset the first bars would address BTC closes at
    negative positions, which wrap to the end of the data (look-ahead).

    Args:
        candidate: Parameters of the strategy variant.
        frame: Frame with ``eth_open``, ``eth_high``, ``eth_low``,
            ``eth_close`` (and ``btc_close`` for the lead/follow family),
            indexed by timestamp.

    Returns:
        The marked-to-market equity curve (starting from 10,000) indexed by
        timestamp, and a list of closed trades with ``entry_time``,
        ``exit_time`` and net ``return``.
    """
    close = frame["eth_close"]
    squeeze: pd.Series | None = None
    warmup = candidate.lookback + 2
    if candidate.family == "squeeze_breakout":
        mid = close.rolling(96).mean()
        std = close.rolling(96).std(ddof=0)
        width = ((mid + 2 * std) - (mid - 2 * std)) / mid
        squeeze = width <= width.rolling(960).quantile(float(candidate.squeeze_quantile))
        warmup = max(warmup, 960)
    elif candidate.family == "btc_lead_eth_follow":
        # The BTC return is measured btc_lead bars earlier than the ETH one.
        warmup += int(candidate.btc_lead or 0)

    equity = 10_000.0
    position: dict[str, object] | None = None
    pending_entry: str | None = None
    curve: list[tuple[pd.Timestamp, float]] = []
    trades: list[dict[str, object]] = []
    rows = list(frame.itertuples())
    last_index = len(rows) - 1

    for index in range(warmup, len(rows)):
        row = rows[index]
        ts = frame.index[index]

        # Execute the signal raised on the previous bar at this bar's open.
        if pending_entry is not None and position is None:
            entry_price = float(row.eth_open)
            position = {
                "entry_index": index,
                "entry_time": ts,
                "entry_price": entry_price,
                "stop": entry_price * (1.0 - candidate.stop),
                "take": entry_price * (1.0 + candidate.take),
            }
            pending_entry = None

        mark = equity
        if position is not None:
            stop_hit = float(row.eth_low) <= float(position["stop"])
            take_hit = float(row.eth_high) >= float(position["take"])
            hold_hit = index - int(position["entry_index"]) >= candidate.hold
            if stop_hit or take_hit or hold_hit:
                # Stop has priority over take when both are touched.
                if stop_hit:
                    exit_price = float(position["stop"])
                elif take_hit:
                    exit_price = float(position["take"])
                else:
                    exit_price = float(row.eth_close)
                gross = exit_price / float(position["entry_price"]) - 1.0
                net = gross - ROUNDTRIP_COST
                equity *= 1.0 + net
                trades.append({"entry_time": position["entry_time"], "exit_time": ts, "return": net})
                position = None
                mark = equity
            else:
                mark = equity * (float(row.eth_close) / float(position["entry_price"]))
        curve.append((ts, mark))

        # A signal on the final bar could never be executed.
        if index == last_index:
            continue
        if position is None and candidate_signal(candidate, frame, index, squeeze) == "long":
            pending_entry = "long"

    return pd.Series(dict(curve)).sort_index(), trades
def metrics_for(equity: pd.Series, trades: list[dict[str, object]], days: int) -> dict[str, object]:
    """Compute performance figures for the trailing ``days``-day window.

    The window ends at the last timestamp of ``equity``. Only trades whose
    entry time falls inside the window are counted. The profit factor is
    the sum of winning returns over the absolute sum of losing returns; it
    is ``999.0`` when there are wins but no losses and ``0.0`` when there
    are neither.

    Returns:
        A dict keyed ``"{days}d_<metric>"`` for total return, annualized
        return, max drawdown, trade count, win rate and profit factor.
    """
    cutoff = equity.index[-1] - pd.Timedelta(days=days)
    window = equity.loc[equity.index >= cutoff]
    window_start = window.index[0]
    first_value = float(window.iloc[0])
    last_value = float(window.iloc[-1])

    returns = [
        float(trade["return"])
        for trade in trades
        if pd.Timestamp(trade["entry_time"]) >= window_start
    ]
    gains = [value for value in returns if value > 0.0]
    setbacks = [value for value in returns if value < 0.0]

    total = last_value / first_value - 1.0
    win_rate = len(gains) / len(returns) if returns else 0.0
    if setbacks:
        profit_factor = sum(gains) / abs(sum(setbacks))
    elif gains:
        profit_factor = 999.0
    else:
        profit_factor = 0.0

    return {
        f"{days}d_total_return": total,
        f"{days}d_annualized_return": annualized_return(total, days),
        f"{days}d_max_drawdown": max_drawdown(window),
        f"{days}d_trades": len(returns),
        f"{days}d_win_rate": win_rate,
        f"{days}d_profit_factor": profit_factor,
    }
def search_candidates(frame: pd.DataFrame) -> pd.DataFrame:
    """Backtest every candidate and rank the results.

    Each row holds the candidate's family and name, its metrics for every
    window in ``WINDOW_DAYS``, and three aggregates over the 30/14/7-day
    windows: the worst total return, the worst drawdown and the summed
    trade count. Candidates that produced no equity curve are skipped.
    Rows are sorted by worst recent return (descending), then 90-day return
    (descending), then worst recent drawdown (ascending).
    """
    recent_windows = (30, 14, 7)
    results: list[dict[str, object]] = []
    for candidate in build_candidates():
        equity, trades = run_candidate(candidate, frame)
        if len(equity) == 0:
            continue
        record: dict[str, object] = {"family": candidate.family, "name": candidate.name}
        for days in WINDOW_DAYS:
            record.update(metrics_for(equity, trades, days))
        record["min_recent_return"] = min(float(record[f"{days}d_total_return"]) for days in recent_windows)
        record["max_recent_drawdown"] = max(float(record[f"{days}d_max_drawdown"]) for days in recent_windows)
        record["recent_trades"] = sum(int(record[f"{days}d_trades"]) for days in recent_windows)
        results.append(record)

    table = pd.DataFrame(results)
    sort_keys = ["min_recent_return", "90d_total_return", "max_recent_drawdown"]
    return table.sort_values(sort_keys, ascending=[False, False, True])
def pct(value: float) -> str:
    """Format a fraction as a percentage with two decimals."""
    return format(value, ".2%")
def markdown_table(frame: pd.DataFrame) -> str:
    """Render a DataFrame as a pipe-delimited markdown table.

    Floats are printed with four decimals, missing values as empty cells,
    and literal ``|`` characters in other values are backslash-escaped.
    """

    def render(cell: object) -> str:
        if isinstance(cell, float):
            return f"{cell:.4f}"
        return str(cell).replace("|", "\\|")

    header = list(frame.columns)
    divider = ["---"] * len(header)
    body = frame.astype(object).where(pd.notna(frame), "").values.tolist()
    return "\n".join(
        "| " + " | ".join(render(cell) for cell in line) + " |"
        for line in [header, divider, *body]
    )
def write_report(style: pd.DataFrame, false_breakouts: pd.DataFrame, lead_lag: pd.DataFrame, squeeze: pd.DataFrame, candidates: pd.DataFrame, paths: list[Path]) -> str:
    """Assemble the markdown report text from the analysis tables.

    The summary bullets describe the last row of ``style`` and the last row
    of ``squeeze``. The window label in the return bullet is taken from
    that ``style`` row's ``days`` value, so the text stays correct if the
    window order changes; it falls back to ``"7d"`` when the row has no
    ``days`` entry. The lead/lag section shows, per window, the lag with
    the highest BTC-leads-ETH correlation; the candidate section shows the
    first ten rows of ``candidates``.

    Args:
        style: Regime table, one row per window.
        false_breakouts: False-breakout table.
        lead_lag: Lead/lag correlation table.
        squeeze: Squeeze-quality table.
        candidates: Ranked candidate table.
        paths: Output files to list at the top of the report.

    Returns:
        The report as a single string ending with a newline.
    """
    latest = style.iloc[-1]
    window_label = f"{int(latest['days'])}d" if "days" in latest.index else "7d"
    top = candidates.head(10)
    best_lag = lead_lag.sort_values(["days", "btc_leads_eth_corr"], ascending=[True, False]).groupby("days", observed=True).head(1)
    current_squeeze = squeeze.iloc[-1]
    lines = [
        "# Recent ETH/BTC Regime Analysis",
        "",
        "Scope: local OKX 15m CSV only; no network, no live executor, no orders.",
        "",
        "Output files:",
        *[f"- `{path}`" for path in paths],
        "",
        "## Regime Windows",
        markdown_table(style),
        "",
        "## BTC Lead/Lag Best Rows",
        markdown_table(best_lag[["days", "lag_minutes", "btc_leads_eth_corr", "eth_leads_btc_corr", "same_bar_corr"]]),
        "",
        "## False Breakout Profile",
        markdown_table(false_breakouts),
        "",
        "## BB Squeeze Quality",
        markdown_table(squeeze),
        "",
        "## Candidate Search Top 10",
        markdown_table(top[["family", "name", "90d_total_return", "30d_total_return", "14d_total_return", "7d_total_return", "max_recent_drawdown", "recent_trades"]]),
        "",
        "## Read",
        f"- Latest window in this run ends at `{latest['end']}`.",
        f"- ETH {window_label} return is {pct(float(latest['eth_total_return']))}; BTC {window_label} return is {pct(float(latest['btc_total_return']))}; ETH/BTC ratio {window_label} return is {pct(float(latest['eth_btc_ratio_return']))}.",
        f"- Current ETH BB squeeze flag is `{bool(current_squeeze['current_squeeze'])}` with bandwidth {float(current_squeeze['current_bandwidth']):.4f} vs threshold {float(current_squeeze['current_threshold']):.4f}.",
        "- Ranking is deliberately recent-regime first: min(30d, 14d, 7d) return, then 90d return, then lower recent drawdown.",
    ]
    return "\n".join(lines) + "\n"
def main() -> int:
    """Run the full analysis and write the CSV tables and markdown report.

    Command-line options: ``--bar`` (default ``"15m"``) selects the candle
    file, ``--output-dir`` (default ``OUTPUT_DIR``) the destination folder.
    The candidate search only sees the last 120 days of data. Output file
    names carry the timestamp of the last bar. The report path is printed.

    Returns:
        ``0`` on completion.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--bar", default="15m")
    parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR)
    args = parser.parse_args()

    frame = aligned_frames(args.bar)
    last_bar = frame.index[-1]

    style = pd.DataFrame([style_row(frame, days) for days in WINDOW_DAYS])
    false_breakouts = false_breakout_rows(frame)
    lead_lag = lead_lag_rows(frame)
    squeeze = squeeze_signal_rows(frame)
    search_frame = frame.loc[frame.index >= last_bar - pd.Timedelta(days=120)]
    candidates = search_candidates(search_frame)

    out_dir = args.output_dir
    out_dir.mkdir(parents=True, exist_ok=True)
    stamp = last_bar.strftime("%Y%m%d-%H%M")
    outputs = [
        (style, out_dir / f"analysis-{stamp}-regime.csv"),
        (false_breakouts, out_dir / f"analysis-{stamp}-false-breakouts.csv"),
        (lead_lag, out_dir / f"analysis-{stamp}-lead-lag.csv"),
        (squeeze, out_dir / f"analysis-{stamp}-squeeze.csv"),
        (candidates, out_dir / f"analysis-{stamp}-candidates.csv"),
    ]
    report_path = out_dir / f"analysis-{stamp}-report.md"
    paths = [target for _, target in outputs] + [report_path]

    for table, target in outputs:
        table.to_csv(target, index=False)
    report = write_report(style, false_breakouts, lead_lag, squeeze, candidates, paths)
    report_path.write_text(report, encoding="utf-8")
    print(report_path)
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)
|