from __future__ import annotations import argparse import json from dataclasses import dataclass from itertools import product from pathlib import Path import pandas as pd DATA_DIR = Path("data/okx-candles") OUTPUT_DIR = Path("reports/short-bias") PREFIX = "regime-alt" SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP") BARS = ("4H", "1D") INITIAL_EQUITY = 10_000.0 TAKER_FEE = 0.0004 ROUNDTRIP_FEE = TAKER_FEE * 2 YEARS = 10.0 HORIZONS = ( ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ) @dataclass(frozen=True) class Strategy: family: str symbol: str bar: str params: dict[str, float | int] @property def name(self) -> str: params = "-".join(f"{key}{value}" for key, value in self.params.items()) return f"{self.family}-{self.symbol.split('-')[0].lower()}-{self.bar}-{params}" def load_15m_frame(symbol: str, years: float) -> pd.DataFrame: path = DATA_DIR / symbol / "15m.csv" if not path.exists(): raise FileNotFoundError(f"missing local cache: {path}") frame = pd.read_csv(path) frame["ts"] = pd.to_datetime(frame["ts"], unit="ms", utc=True) frame = frame.sort_values("ts").drop_duplicates("ts", keep="last").set_index("ts") start = frame.index[-1] - pd.DateOffset(years=years) return frame[frame.index >= start] def resample_frame(frame: pd.DataFrame, bar: str) -> pd.DataFrame: rule = {"4H": "4h", "1D": "1D"}[bar] out = frame.resample(rule, label="left", closed="left").agg( open=("open", "first"), high=("high", "max"), low=("low", "min"), close=("close", "last"), volume=("volume", "sum"), ) return out.dropna() def true_range(frame: pd.DataFrame) -> pd.Series: previous = frame["close"].shift(1) return pd.concat( [ frame["high"] - frame["low"], (frame["high"] - previous).abs(), (frame["low"] - previous).abs(), ], axis=1, ).max(axis=1) def signal_frame(strategy: Strategy, frame: pd.DataFrame, btc_frame: pd.DataFrame | None) -> pd.DataFrame: close = frame["close"] high = frame["high"] low = frame["low"] p = strategy.params fast = close.ewm(span=int(p["fast"]), adjust=False).mean() slow = close.ewm(span=int(p["slow"]), adjust=False).mean() atr = true_range(frame).rolling(int(p["atr"])).mean() atr_pct = atr / close prior_low = low.shift(1).rolling(int(p["break_lookback"])).min() prior_high = high.shift(1).rolling(int(p["exit_lookback"])).max() broken_recently = (close < prior_low).rolling(int(p["retest_window"])).max().fillna(0.0).astype(bool) if strategy.family == "trend_break_retest_short": entry = broken_recently & (close < slow) & (high >= fast) & (close < fast) exit_ = (close > fast) | (close > prior_high) elif strategy.family == "vol_expansion_rebound_fail_short": vol_gate = atr_pct > atr_pct.rolling(int(p["vol_window"])).quantile(float(p["vol_quantile"])) failed_rebound = (high >= fast) & (close < fast) & (close < slow) entry = broken_recently & vol_gate & failed_rebound exit_ = (close > fast) | (atr_pct < atr_pct.rolling(int(p["vol_window"])).median()) elif strategy.family == "btc_weakness_transmission_short": if btc_frame is None: raise ValueError("btc_weakness_transmission_short requires BTC frame") aligned = pd.DataFrame({"asset_close": close, "asset_high": high}).join( btc_frame[["close"]].rename(columns={"close": "btc_close"}), how="inner" ) btc_close = aligned["btc_close"] btc_slow = btc_close.ewm(span=int(p["btc_slow"]), adjust=False).mean() btc_fast = btc_close.ewm(span=int(p["btc_fast"]), adjust=False).mean() btc_ret = btc_close / btc_close.shift(int(p["btc_lookback"])) - 1.0 asset_ret = aligned["asset_close"] / aligned["asset_close"].shift(int(p["asset_lookback"])) - 1.0 asset_fast = fast.reindex(aligned.index) asset_slow = slow.reindex(aligned.index) entry = ( (btc_close < btc_slow) & (btc_ret <= -float(p["btc_drop"])) & (asset_ret <= float(p["asset_max_return"])) & (aligned["asset_high"] >= asset_fast) & (aligned["asset_close"] < asset_fast) & (aligned["asset_close"] < asset_slow) ).reindex(frame.index, fill_value=False) exit_ = ((btc_close > btc_fast) | (aligned["asset_close"] > asset_fast)).reindex(frame.index, fill_value=False) else: raise ValueError(f"unknown family: {strategy.family}") return pd.DataFrame({"entry": entry.fillna(False), "exit": exit_.fillna(False), "atr": atr}, index=frame.index) def close_short(entry_price: float, exit_price: float, equity: float) -> tuple[float, float]: gross_return = entry_price / exit_price - 1.0 net_return = gross_return - ROUNDTRIP_FEE return max(0.0, equity * (1.0 + net_return)), net_return def run_strategy(strategy: Strategy, frame: pd.DataFrame, btc_frame: pd.DataFrame | None) -> dict[str, object]: signals = signal_frame(strategy, frame, btc_frame) warmup = max(int(strategy.params.get(key, 0)) for key in ("slow", "break_lookback", "exit_lookback", "atr", "vol_window", "btc_slow")) + 2 equity = INITIAL_EQUITY position: dict[str, float | int | pd.Timestamp] | None = None pending_entry = False pending_exit = False trades: list[dict[str, object]] = [] equity_curve: list[dict[str, object]] = [] rows = list(frame.itertuples()) for index in range(warmup, len(rows)): row = rows[index] ts = frame.index[index] if pending_exit and position is not None: equity, net_return = close_short(float(position["entry_price"]), float(row.open), equity) trades.append( { "side": "Short", "entry_time": pd.Timestamp(position["entry_time"]).strftime("%Y-%m-%d %H:%M"), "exit_time": ts.strftime("%Y-%m-%d %H:%M"), "entry_price": float(position["entry_price"]), "exit_price": float(row.open), "return": net_return, "hold_bars": index - int(position["entry_index"]), } ) position = None pending_exit = False if pending_entry and position is None and equity > 0.0: atr = float(signals["atr"].iloc[index - 1]) position = { "entry_time": ts, "entry_index": index, "entry_price": float(row.open), "stop_price": float(row.open) + atr * float(strategy.params["stop_atr"]), "take_price": max(0.01, float(row.open) - atr * float(strategy.params["take_atr"])), } pending_entry = False mark_equity = equity if position is not None: stop_hit = float(row.high) >= float(position["stop_price"]) take_hit = float(row.low) <= float(position["take_price"]) if stop_hit or take_hit: exit_price = float(position["stop_price"] if stop_hit else position["take_price"]) equity, net_return = close_short(float(position["entry_price"]), exit_price, equity) trades.append( { "side": "Short", "entry_time": pd.Timestamp(position["entry_time"]).strftime("%Y-%m-%d %H:%M"), "exit_time": ts.strftime("%Y-%m-%d %H:%M"), "entry_price": float(position["entry_price"]), "exit_price": exit_price, "return": net_return, "hold_bars": index - int(position["entry_index"]), } ) position = None mark_equity = equity if position is not None: gross_return = float(position["entry_price"]) / float(row.close) - 1.0 mark_equity = max(0.0, equity * (1.0 + gross_return - TAKER_FEE)) equity_curve.append({"ts": ts, "equity": mark_equity}) if index == len(rows) - 1 or equity <= 0.0: continue if position is not None: held = index - int(position["entry_index"]) if bool(signals["exit"].iloc[index]) or held >= int(strategy.params["max_hold"]): pending_exit = True elif bool(signals["entry"].iloc[index]): pending_entry = True if position is not None: last = rows[-1] ts = frame.index[-1] equity, net_return = close_short(float(position["entry_price"]), float(last.close), equity) trades.append( { "side": "Short", "entry_time": pd.Timestamp(position["entry_time"]).strftime("%Y-%m-%d %H:%M"), "exit_time": ts.strftime("%Y-%m-%d %H:%M"), "entry_price": float(position["entry_price"]), "exit_price": float(last.close), "return": net_return, "hold_bars": len(rows) - 1 - int(position["entry_index"]), } ) equity_curve.append({"ts": ts, "equity": equity}) return {"trades": trades, "equity_curve": equity_curve} def daily_equity(result: dict[str, object]) -> pd.Series: curve = pd.DataFrame(result["equity_curve"]) curve["ts"] = pd.to_datetime(curve["ts"], utc=True) series = curve.set_index("ts")["equity"].sort_index() series = pd.concat([pd.Series([INITIAL_EQUITY], index=[series.index[0].normalize()]), series]).sort_index() series = series.groupby(level=0).last() index = pd.date_range(series.index[0].normalize(), series.index[-1].normalize(), freq="1D", tz="UTC") return series.reindex(index.union(series.index)).sort_index().ffill().reindex(index) def equity_metrics(series: pd.Series) -> dict[str, float]: total = float(series.iloc[-1] / series.iloc[0] - 1.0) years = (series.index[-1] - series.index[0]).total_seconds() / 86_400 / 365 annual = (1.0 + total) ** (1.0 / years) - 1.0 if total > -1.0 and years > 0.0 else 0.0 drawdown = float(((series.cummax() - series) / series.cummax()).max()) return { "total_return": total, "annualized_return": annual, "max_drawdown": drawdown, "calmar": annual / drawdown if drawdown else 0.0, } def trade_metrics(trades: list[dict[str, object]]) -> dict[str, float | int]: returns = [float(trade["return"]) for trade in trades] wins = [value for value in returns if value > 0.0] losses = [value for value in returns if value < 0.0] gross_profit = sum(wins) gross_loss = abs(sum(losses)) return { "trades": len(returns), "win_rate": len(wins) / len(returns) if returns else 0.0, "profit_factor": gross_profit / gross_loss if gross_loss else 0.0, "short_trade_ratio": 1.0 if returns else 0.0, } def horizon_returns(series: pd.Series) -> dict[str, float]: out: dict[str, float] = {} end = series.index[-1] for label, offset in HORIZONS: scoped = series[series.index >= end - offset] out[f"return_{label}"] = float(scoped.iloc[-1] / scoped.iloc[0] - 1.0) if len(scoped) >= 2 else 0.0 return out def monthly_rows(strategy: Strategy, series: pd.Series) -> pd.DataFrame: monthly = series.resample("ME").last() out = pd.DataFrame( { "name": strategy.name, "symbol": strategy.symbol, "bar": strategy.bar, "family": strategy.family, "month": monthly.index.strftime("%Y-%m"), "start_equity": monthly.shift(1).fillna(series.iloc[0]).to_numpy(), "end_equity": monthly.to_numpy(), } ) out["return"] = out["end_equity"] / out["start_equity"] - 1.0 return out def yearly_rows(strategy: Strategy, series: pd.Series) -> pd.DataFrame: yearly = series.resample("YE").last() out = pd.DataFrame( { "name": strategy.name, "symbol": strategy.symbol, "bar": strategy.bar, "family": strategy.family, "year": yearly.index.strftime("%Y"), "start_equity": yearly.shift(1).fillna(series.iloc[0]).to_numpy(), "end_equity": yearly.to_numpy(), } ) out["return"] = out["end_equity"] / out["start_equity"] - 1.0 return out def build_strategies() -> list[Strategy]: strategies: list[Strategy] = [] holds = {"4H": 90, "1D": 60} for symbol, bar, fast, slow, break_lookback, retest_window, exit_lookback, stop_atr, take_atr in product( SYMBOLS, BARS, (20, 30, 50), (80, 120, 200), (20, 55), (3, 6), (10, 20), (2.0, 3.0), (4.0, 6.0), ): if fast >= slow: continue base = { "fast": fast, "slow": slow, "break_lookback": break_lookback, "retest_window": retest_window, "exit_lookback": exit_lookback, "atr": 14, "stop_atr": stop_atr, "take_atr": take_atr, "max_hold": holds[bar], } strategies.append(Strategy("trend_break_retest_short", symbol, bar, base)) for vol_quantile in (0.75, 0.85): strategies.append(Strategy("vol_expansion_rebound_fail_short", symbol, bar, {**base, "vol_window": 120, "vol_quantile": vol_quantile})) for symbol in ("ETH-USDT-SWAP",): for bar, btc_lookback, btc_drop, asset_max_return, stop_atr, take_atr in product( BARS, (3, 6, 12), (0.015, 0.025, 0.04), (-0.02, 0.0, 0.01), (2.0, 3.0), (4.0, 6.0), ): strategies.append( Strategy( "btc_weakness_transmission_short", symbol, bar, { "fast": 20, "slow": 80, "break_lookback": 20, "retest_window": 3, "exit_lookback": 10, "atr": 14, "stop_atr": stop_atr, "take_atr": take_atr, "max_hold": holds[bar], "btc_fast": 20, "btc_slow": 80, "btc_lookback": btc_lookback, "asset_lookback": 3, "btc_drop": btc_drop, "asset_max_return": asset_max_return, }, ) ) return strategies def format_cell(value: object) -> str: if isinstance(value, float): return f"{value:.6g}" return str(value).replace("|", "\\|") def markdown_table(frame: pd.DataFrame) -> str: rows = [list(frame.columns), ["---" for _ in frame.columns]] rows.extend(frame.astype(object).where(pd.notna(frame), "").values.tolist()) return "\n".join("| " + " | ".join(format_cell(value) for value in row) + " |" for row in rows) def read_only_decision(row: pd.Series) -> str: if ( row["trades"] >= 30 and row["total_return"] > 0.0 and row["return_3y"] > 0.0 and row["return_1y"] > 0.0 and row["return_6m"] > 0.0 and row["return_3m"] > 0.0 and row["profit_factor"] >= 1.2 and row["calmar"] >= 0.6 ): return "worth_readonly_observation" return "do_not_connect" def markdown_report(command: str, paths: list[Path], totals: pd.DataFrame, monthly: pd.DataFrame, yearly: pd.DataFrame, qualified: pd.DataFrame) -> str: top = qualified.head(10) if len(qualified) else totals.head(10) top_names = set(top.head(3)["name"]) conclusion = ( f"{len(qualified)} candidates are worth read-only observation under the explicit filter." if len(qualified) else "No candidate is worth connecting to read-only observation under the explicit filter." ) lines = [ "# Short-Bias Regime Alt Search", "", f"Run command: `{command}`", "", "Output files:", *[f"- `{path}`" for path in paths], "", "Scope: BTC-USDT-SWAP and ETH-USDT-SWAP perpetuals, resampled from local 15m cache to 4H/1D. SOL was not included because this repository has no local SOL candle cache.", f"Cost: {TAKER_FEE:.2%} single-side taker fee; closed trades subtract {ROUNDTRIP_FEE:.2%} roundtrip.", "Candidate families: trend break retest short, BTC weakness transmission to ETH short, and volatility expansion rebound-failure short.", "", f"Conclusion: {conclusion}", "", "## Top candidates", "", markdown_table( top[ [ "name", "symbol", "bar", "family", "total_return", "annualized_return", "max_drawdown", "calmar", "trades", "win_rate", "profit_factor", "short_trade_ratio", "return_3y", "return_1y", "return_6m", "return_3m", "readonly_observation", ] ] ), "", "## Monthly returns for top 3", "", markdown_table(monthly[monthly["name"].isin(top_names)].tail(120)), "", "## Year distribution for top 3", "", markdown_table(yearly[yearly["name"].isin(top_names)]), ] return "\n".join(lines) + "\n" def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--years", type=float, default=YEARS) parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR) parser.add_argument("--max-candidates", type=int, default=0) args = parser.parse_args() raw = {symbol: load_15m_frame(symbol, args.years) for symbol in SYMBOLS} data = {(symbol, bar): resample_frame(raw[symbol], bar) for symbol in SYMBOLS for bar in BARS} strategies = build_strategies() if args.max_candidates: strategies = strategies[: args.max_candidates] total_rows: list[dict[str, object]] = [] monthly_frames: list[pd.DataFrame] = [] yearly_frames: list[pd.DataFrame] = [] trade_rows: list[dict[str, object]] = [] for index, strategy in enumerate(strategies, start=1): btc_frame = data[("BTC-USDT-SWAP", strategy.bar)] if strategy.family == "btc_weakness_transmission_short" else None result = run_strategy(strategy, data[(strategy.symbol, strategy.bar)], btc_frame) if not result["equity_curve"]: continue series = daily_equity(result) monthly = monthly_rows(strategy, series) yearly = yearly_rows(strategy, series) trades = list(result["trades"]) row = { "name": strategy.name, "symbol": strategy.symbol, "bar": strategy.bar, "family": strategy.family, "first_day": series.index[0].strftime("%Y-%m-%d"), "last_day": series.index[-1].strftime("%Y-%m-%d"), "fee_single_side": TAKER_FEE, "roundtrip_fee": ROUNDTRIP_FEE, "worst_month_return": float(monthly["return"].min()), **equity_metrics(series), **trade_metrics(trades), **horizon_returns(series), **strategy.params, } total_rows.append(row) monthly_frames.append(monthly) yearly_frames.append(yearly) trade_rows.extend({"name": strategy.name, "symbol": strategy.symbol, "bar": strategy.bar, "family": strategy.family, **trade} for trade in trades) print(f"done {index}/{len(strategies)} {strategy.name}", flush=True) totals = pd.DataFrame(total_rows).sort_values(["calmar", "annualized_return", "profit_factor"], ascending=[False, False, False]) totals["readonly_observation"] = totals.apply(read_only_decision, axis=1) monthly = pd.concat(monthly_frames, ignore_index=True) yearly = pd.concat(yearly_frames, ignore_index=True) trades = pd.DataFrame(trade_rows) qualified = totals[totals["readonly_observation"] == "worth_readonly_observation"].copy() args.output_dir.mkdir(parents=True, exist_ok=True) totals_path = args.output_dir / f"{PREFIX}-totals.csv" monthly_path = args.output_dir / f"{PREFIX}-monthly-returns.csv" yearly_path = args.output_dir / f"{PREFIX}-yearly-distribution.csv" trades_path = args.output_dir / f"{PREFIX}-trades.csv" qualified_path = args.output_dir / f"{PREFIX}-qualified.csv" summary_path = args.output_dir / f"{PREFIX}-summary.json" report_path = args.output_dir / f"{PREFIX}-report.md" totals.to_csv(totals_path, index=False) monthly.to_csv(monthly_path, index=False) yearly.to_csv(yearly_path, index=False) trades.to_csv(trades_path, index=False) qualified.to_csv(qualified_path, index=False) summary = { "years_requested": args.years, "symbols": list(SYMBOLS), "bars": list(BARS), "sol_included": False, "sol_exclusion_reason": "no local SOL candle cache under data/okx-candles", "strategy_count": len(strategies), "qualified_count": int(len(qualified)), "worth_readonly_observation": bool(len(qualified)), "top_name": str((qualified if len(qualified) else totals).iloc[0]["name"]), "output_files": [str(path) for path in (totals_path, monthly_path, yearly_path, trades_path, qualified_path, summary_path, report_path)], } summary_path.write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n", encoding="utf-8") command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --years {args.years}" report_path.write_text(markdown_report(command, [totals_path, monthly_path, yearly_path, trades_path, qualified_path, summary_path, report_path], totals, monthly, yearly, qualified), encoding="utf-8") print(totals.head(10).to_string(index=False, formatters={col: format_cell for col in totals.columns})) print(f"qualified={len(qualified)} wrote={report_path}") return 0 if __name__ == "__main__": raise SystemExit(main())