| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435 |
- from __future__ import annotations
- import argparse
- import sys
- from itertools import product
- from pathlib import Path
- from typing import Iterable
- import pandas as pd
- sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
- from scripts.explore_ultrashort import (
- INITIAL_EQUITY,
- LEVERAGE,
- _format_ts,
- _compute_rsi,
- annualized_metrics_from_equity,
- get_candles_cached,
- history_bars_for_years,
- recent_horizon_metrics_from_equity,
- )
- from okx_codex_trader.models import Candle
- from okx_codex_trader.okx_client import OkxClient
- from okx_codex_trader.sampled_report import SegmentResult, mark_to_market
# Instrument and bar size handed to get_candles_cached() in run_search().
SYMBOL = "ETH-USDT-SWAP"
BAR = "15m"
# Amount of history, in years, requested through history_bars_for_years().
YEARS = 3.0
# Directory that main() creates and writes the CSV / markdown reports into.
OUTPUT_DIR = Path("reports/eth-exploration")
# Round-trip cost per closed trade, expressed as a fraction of the margin
# used (see close_position), keyed by the cost-model label used in reports.
COSTS = {
    "maker_maker": 0.0012,
    "maker_taker": 0.0021,
    "taker_taker": 0.0030,
}
# (label, lookback offset) pairs passed to recent_horizon_metrics_from_equity().
HORIZONS = (
    ("3y", pd.DateOffset(years=3)),
    ("1y", pd.DateOffset(years=1)),
    ("6m", pd.DateOffset(months=6)),
    ("3m", pd.DateOffset(months=3)),
)
# Each inner tuple is one ladder of fractional discounts below the signal
# close; run_price_twap_segment places one limit order per discount.
ENTRY_OFFSET_SETS = (
    (0.001, 0.003, 0.005),
    (0.003, 0.006, 0.009),
)
def candidate_specs() -> Iterable[dict[str, object]]:
    """Yield one parameter dict per point of the search grid.

    The grid is the Cartesian product of the value sets below, iterated in
    ``itertools.product`` order (the last field varies fastest). Every
    yielded dict has the keys listed in ``field_names``, in that order.
    """
    field_names = (
        "trend_sma",
        "rsi_threshold",
        "exit_rsi",
        "stop_loss_pct",
        "max_hold_bars",
        "entry_offsets",
        "entry_valid_bars",
        "fill_buffer",
    )
    # One value set per field, positionally aligned with field_names.
    grid = (
        (80, 160),
        (5.0, 8.0, 10.0),
        (50.0, 55.0),
        (0.008, 0.012),
        (48, 96),
        ENTRY_OFFSET_SETS,
        (2, 4),
        (0.0, 0.0002),
    )
    for combination in product(*grid):
        yield dict(zip(field_names, combination))
def annualized_return(total_return: float, first_ts: int, last_ts: int) -> float:
    """Convert a total return over a timestamp span into a yearly compound rate.

    Args:
        total_return: Fractional return over the whole span (0.25 == +25%).
        first_ts: Start of the span as an epoch timestamp in milliseconds.
        last_ts: End of the span as an epoch timestamp in milliseconds.

    Returns:
        ``(1 + total_return) ** (1 / years) - 1`` using 365-day years, or
        ``0.0`` when the span is not positive or the return is not above -100%.
    """
    elapsed_years = (last_ts - first_ts) / 86_400_000 / 365
    if total_return > -1.0 and elapsed_years > 0.0:
        growth_factor = 1.0 + total_return
        return growth_factor ** (1.0 / elapsed_years) - 1.0
    # Undefined cases (total loss or empty/negative span) are reported as zero.
    return 0.0
def strategy_name(spec: dict[str, object]) -> str:
    """Build the dash-separated identifier for one parameter spec.

    The name encodes the entry offsets (four decimals each), the limit
    validity in bars, the fill buffer (only when non-zero), and then the
    trend SMA, RSI entry threshold, RSI exit level, stop-loss fraction and
    maximum holding period.
    """
    offsets_label = "-".join(f"{offset:.4f}" for offset in tuple(spec["entry_offsets"]))
    fill_buffer = float(spec["fill_buffer"])

    segments = [
        "rsi2-long-guarded-price-twap",
        f"o{offsets_label}",
        f"v{spec['entry_valid_bars']}",
    ]
    # A zero buffer is the baseline and is left out of the name entirely.
    if fill_buffer:
        segments.append(f"fb{fill_buffer:.4f}")
    segments.extend(
        [
            f"t{spec['trend_sma']}",
            f"l{spec['rsi_threshold']}",
            f"x{spec['exit_rsi']}",
            f"sl{spec['stop_loss_pct']}",
            f"mh{spec['max_hold_bars']}",
        ]
    )
    return "-".join(segments)
def close_position(
    *,
    trades: list[dict[str, object]],
    exits: list[dict[str, object]],
    position: dict[str, object],
    account_equity: float,
    candle: Candle,
    exit_price: float,
    roundtrip_cost_on_margin: float,
) -> tuple[float, bool]:
    """Close the open long at ``exit_price`` and record the trade.

    Appends one trade record to ``trades`` and one exit marker to ``exits``
    (both lists are mutated in place).

    Args:
        trades: Trade log to append the closed trade to.
        exits: Exit-marker log to append to.
        position: Open position; ``margin_used``, ``entry_price`` and
            ``entry_time`` are read from it.
        account_equity: Account equity before the close; also the
            denominator for ``return_pct`` and ``cost_weight``.
        candle: Bar on which the exit happens (supplies the exit timestamp).
        exit_price: Price at which the position is closed.
        roundtrip_cost_on_margin: Cost charged as a fraction of the margin used.

    Returns:
        ``(new_account_equity, won)`` where ``won`` is True when the
        net-of-cost PnL is strictly positive.
    """
    margin = float(position["margin_used"])
    entry_price = float(position["entry_price"])

    value_at_exit = mark_to_market(
        side="long",
        margin_used=margin,
        entry_price=entry_price,
        mark_price=exit_price,
        leverage=LEVERAGE,
    )
    gross_pnl = value_at_exit - margin
    fees = margin * roundtrip_cost_on_margin
    net_pnl = gross_pnl - fees

    trade_record = {
        "side": "Long",
        "entry_time": _format_ts(int(position["entry_time"])),
        "exit_time": _format_ts(candle.ts),
        "entry_price": round(entry_price, 4),
        "exit_price": round(exit_price, 4),
        "pnl": round(net_pnl, 4),
        "return_pct": round(net_pnl / account_equity * 100, 4),
        "cost_weight": round(margin / account_equity, 8),
    }
    trades.append(trade_record)
    exits.append({"ts": candle.ts, "price": exit_price, "side": "long"})

    return account_equity + net_pnl, net_pnl > 0.0
def run_price_twap_segment(
    *,
    candles: list[Candle],
    spec: dict[str, object],
    roundtrip_cost_on_margin: float,
) -> SegmentResult:
    """Simulate the long-only laddered-limit strategy bar by bar.

    Per bar, in order: execute an exit scheduled on the previous bar at this
    bar's open; fill any pending limit orders the bar's low reached; apply
    the stop-loss; mark equity to the close; then evaluate exit and entry
    signals for the next bar.

    Args:
        candles: Price bars in chronological order (presumably - TODO confirm
            ordering with the loader).
        spec: Parameter dict with the keys produced by ``candidate_specs``.
        roundtrip_cost_on_margin: Cost per closed trade as a fraction of the
            margin used; forwarded to ``close_position``.

    Returns:
        A ``SegmentResult`` with trade statistics, the trade/entry/exit logs,
        the per-bar equity curve, and any position still open at the end
        (valued at the last close, not force-closed).
    """
    closes = pd.Series([candle.close for candle in candles], dtype=float)
    trend = closes.rolling(int(spec["trend_sma"])).mean().tolist()
    # Second argument is presumably the RSI period - confirm in _compute_rsi.
    rsi_values = _compute_rsi(closes, 2)
    # `equity` is realized account equity; it only changes when a trade closes.
    equity = INITIAL_EQUITY
    ending_equity = equity
    peak_equity = equity
    max_drawdown = 0.0
    wins = 0
    trades: list[dict[str, object]] = []
    entries: list[dict[str, object]] = []
    exits: list[dict[str, object]] = []
    equity_curve: list[dict[str, float | int]] = []
    position: dict[str, object] | None = None
    pending_limits: list[dict[str, float | int]] = []
    pending_exit = False
    # Skip the bars before the rolling trend window can be full.
    warmup_bars = max(int(spec["trend_sma"]), 3)
    entry_offsets = tuple(float(value) for value in spec["entry_offsets"])
    for index in range(warmup_bars, len(candles)):
        candle = candles[index]
        # An exit signalled on the previous bar is executed at this bar's open.
        if pending_exit and position is not None:
            equity, won = close_position(
                trades=trades,
                exits=exits,
                position=position,
                account_equity=equity,
                candle=candle,
                exit_price=candle.open,
                roundtrip_cost_on_margin=roundtrip_cost_on_margin,
            )
            wins += 1 if won else 0
            position = None
            pending_exit = False
            pending_limits = []
        # Limits that neither fill nor expire on this bar are carried forward.
        active_limits: list[dict[str, float | int]] = []
        for limit in pending_limits:
            # Expired orders are dropped (not re-added to active_limits).
            if index > int(limit["expires_index"]):
                continue
            limit_price = float(limit["price"])
            # A fill needs the low to trade through the limit by fill_buffer.
            if candle.low <= limit_price * (1.0 - float(spec["fill_buffer"])) and equity > 0.0:
                # Each ladder rung commits an equal share of realized equity.
                slice_margin = equity / len(entry_offsets)
                if position is None:
                    position = {
                        "side": "long",
                        "entry_time": candle.ts,
                        "entry_price": limit_price,
                        "entry_index": index,
                        "margin_used": slice_margin,
                        "stop_price": limit_price * (1 - float(spec["stop_loss_pct"])),
                    }
                else:
                    # Add to the position: margin-weighted average entry, and
                    # the stop is re-anchored to the new average. entry_time
                    # and entry_index keep the values from the first fill.
                    old_margin = float(position["margin_used"])
                    new_margin = old_margin + slice_margin
                    entry_price = (float(position["entry_price"]) * old_margin + limit_price * slice_margin) / new_margin
                    position["entry_price"] = entry_price
                    position["margin_used"] = new_margin
                    position["stop_price"] = entry_price * (1 - float(spec["stop_loss_pct"]))
                # NOTE(review): nesting reconstructed from flattened source;
                # placed so that every fill is logged - confirm.
                entries.append({"ts": candle.ts, "price": limit_price, "side": "long"})
            else:
                active_limits.append(limit)
        pending_limits = active_limits
        current_equity = equity
        # Stop-loss is checked against this bar's low, including on the bar
        # where the position was just filled; the exit is booked at the stop
        # price itself.
        if position is not None and candle.low <= float(position["stop_price"]):
            equity, won = close_position(
                trades=trades,
                exits=exits,
                position=position,
                account_equity=equity,
                candle=candle,
                exit_price=float(position["stop_price"]),
                roundtrip_cost_on_margin=roundtrip_cost_on_margin,
            )
            wins += 1 if won else 0
            current_equity = equity
            position = None
            pending_limits = []
        if position is not None:
            # Value the open position at the close for the equity curve.
            position_equity = mark_to_market(
                side="long",
                margin_used=float(position["margin_used"]),
                entry_price=float(position["entry_price"]),
                mark_price=candle.close,
                leverage=LEVERAGE,
            )
            current_equity = equity - float(position["margin_used"]) + position_equity
        peak_equity = max(peak_equity, current_equity)
        max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity)
        equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close})
        ending_equity = current_equity
        # No new signals on the final bar or once realized equity is gone.
        if index == len(candles) - 1 or equity <= 0.0:
            continue
        current_rsi = rsi_values[index]
        current_trend = trend[index]
        # x != x is true only for NaN: skip bars where an indicator is undefined.
        if current_rsi != current_rsi or current_trend != current_trend:
            continue
        if position is not None:
            # Holding time is counted from the bar of the first fill.
            held_bars = index - int(position["entry_index"])
            if current_rsi >= float(spec["exit_rsi"]) or held_bars >= int(spec["max_hold_bars"]):
                pending_exit = True
                pending_limits = []
            # NOTE(review): nesting reconstructed from flattened source; placed
            # so no new entry ladder is opened while a position is held - confirm.
            continue
        # Entry signal: close above the trend SMA with RSI at or below the
        # threshold, and no ladder already working. One limit per offset is
        # placed below the close, valid for entry_valid_bars further bars.
        if not pending_limits and candle.close > float(current_trend) and current_rsi <= float(spec["rsi_threshold"]):
            pending_limits = [
                {
                    "price": candle.close * (1.0 - offset),
                    "expires_index": index + int(spec["entry_valid_bars"]),
                }
                for offset in entry_offsets
            ]
    trade_count = len(trades)
    return SegmentResult(
        trade_count=trade_count,
        total_return=(ending_equity - INITIAL_EQUITY) / INITIAL_EQUITY,
        win_rate=(wins / trade_count) if trade_count else 0.0,
        max_drawdown=max_drawdown,
        trades=trades,
        open_position=position,
        candles=candles[warmup_bars:],
        equity_curve=equity_curve,
        entries=entries,
        exits=exits,
    )
def equity_frame(result: SegmentResult) -> pd.DataFrame:
    """Return the segment's equity curve as a two-column DataFrame.

    Args:
        result: Segment result whose ``equity_curve`` is a list of dicts
            carrying at least ``ts`` (epoch milliseconds) and ``equity``.

    Returns:
        A frame with columns ``ts`` (timezone-aware UTC timestamps) and
        ``equity``. An empty curve yields an empty frame with the same two
        columns instead of raising ``KeyError``.
    """
    if not result.equity_curve:
        # pd.DataFrame([]) has no columns at all, so frame["ts"] would fail;
        # build the empty frame with the expected schema explicitly.
        return pd.DataFrame(
            {
                "ts": pd.Series(dtype="datetime64[ns, UTC]"),
                "equity": pd.Series(dtype=float),
            }
        )
    frame = pd.DataFrame(result.equity_curve)
    frame["ts"] = pd.to_datetime(frame["ts"], unit="ms", utc=True)
    return frame[["ts", "equity"]]
def markdown_table(frame: pd.DataFrame, columns: list[str]) -> str:
    """Render the selected columns of ``frame`` as a pipe-delimited markdown table.

    Missing values (as detected by ``pd.isna``) become empty cells; every
    other value is rendered with ``str``. The result has a header line, a
    ``---`` separator line and one line per row, joined with newlines and
    without a trailing newline.
    """

    def render_line(cells: Iterable[str]) -> str:
        return "| " + " | ".join(cells) + " |"

    body_lines = [
        render_line("" if pd.isna(cell) else str(cell) for cell in record)
        for record in frame[columns].itertuples(index=False, name=None)
    ]
    header_line = render_line(columns)
    separator_line = render_line("---" for _ in columns)
    return "\n".join([header_line, separator_line] + body_lines)
def run_search(max_candidates: int | None) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Backtest every candidate spec under each cost model and rank the results.

    Candles are loaded once through ``get_candles_cached``. Each spec is run
    once with zero cost (for the gross figures) and once per entry in
    ``COSTS``. One progress line is printed per spec.

    Args:
        max_candidates: When not None, only the first ``max_candidates``
            specs from ``candidate_specs()`` are evaluated.

    Returns:
        ``(all_results, maker_taker, horizons)``:
        all rows sorted by cost model, then descending ``net_calmar`` and
        ``net_annualized_return``; the ``maker_taker`` subset sorted by the
        same two metrics descending; and the per-horizon rows sorted by cost
        model, horizon (3y, 1y, 6m, 3m) and the same two metrics descending.
    """
    candles = get_candles_cached(OkxClient(), SYMBOL, BAR, history_bars_for_years(BAR, YEARS))

    specs = list(candidate_specs())
    if max_candidates is not None:
        specs = specs[:max_candidates]

    rows: list[dict[str, object]] = []
    horizon_rows: list[dict[str, object]] = []

    for position, spec in enumerate(specs, start=1):
        name = strategy_name(spec)
        gross = run_price_twap_segment(candles=candles, spec=spec, roundtrip_cost_on_margin=0.0)
        first_ts = candles[0].ts
        last_ts = candles[-1].ts
        gross_annualized = annualized_return(gross.total_return, first_ts, last_ts)
        # Spec columns with the offsets tuple flattened to a printable label;
        # the key keeps the position it has inside the spec dict.
        spec_columns = {
            **spec,
            "entry_offsets": "-".join(f"{value:.4f}" for value in tuple(spec["entry_offsets"])),
        }

        for cost_label, roundtrip_cost in COSTS.items():
            net = run_price_twap_segment(candles=candles, spec=spec, roundtrip_cost_on_margin=roundtrip_cost)
            net_equity = equity_frame(net)
            metrics = annualized_metrics_from_equity(net_equity, first_ts, last_ts)
            identity = {
                "symbol": SYMBOL,
                "bar": BAR,
                "cost_model": cost_label,
                "roundtrip_cost_on_margin": roundtrip_cost,
                "name": name,
            }
            rows.append(
                {
                    **identity,
                    "first_candle": _format_ts(first_ts),
                    "last_candle": _format_ts(last_ts),
                    "actual_bars": len(candles),
                    "trades": net.trade_count,
                    "gross_total_return": gross.total_return,
                    "gross_annualized_return": gross_annualized,
                    "gross_max_drawdown_mark_to_market": gross.max_drawdown,
                    **spec_columns,
                    **metrics,
                }
            )
            horizon_frame = recent_horizon_metrics_from_equity(net_equity, last_ts, HORIZONS)
            horizon_rows.extend(
                {
                    **identity,
                    "trades": net.trade_count,
                    **spec_columns,
                    **horizon_row,
                }
                for horizon_row in horizon_frame.to_dict("records")
            )

        print(f"{position}/{len(specs)} {name} trades={gross.trade_count}")

    all_results = pd.DataFrame(rows).sort_values(
        ["cost_model", "net_calmar", "net_annualized_return"],
        ascending=[True, False, False],
    )
    is_maker_taker = all_results["cost_model"] == "maker_taker"
    maker_taker = all_results[is_maker_taker].sort_values(
        ["net_calmar", "net_annualized_return"],
        ascending=False,
    )

    horizons = pd.DataFrame(horizon_rows)
    # An ordered categorical makes the horizons sort longest-first rather
    # than alphabetically.
    horizons["horizon"] = pd.Categorical(horizons["horizon"], categories=["3y", "1y", "6m", "3m"], ordered=True)
    horizons = horizons.sort_values(
        ["cost_model", "horizon", "net_calmar", "net_annualized_return"],
        ascending=[True, True, False, False],
    )
    return all_results, maker_taker, horizons
def markdown_summary(maker_taker: pd.DataFrame, horizons: pd.DataFrame) -> str:
    """Compose the markdown report for the ten best maker_taker candidates.

    Args:
        maker_taker: Ranked maker_taker results; the first ten rows are used.
        horizons: Per-horizon metrics for all candidates and cost models.

    Returns:
        Markdown text with a top-10 table, a recent-horizon table restricted
        to those ten names under the maker_taker cost model (ordered by name
        then horizon), and a closing note on how to narrow the grid.
    """
    top10 = maker_taker.head(10)
    top_names = set(top10["name"])

    is_maker_taker = horizons["cost_model"] == "maker_taker"
    is_top_name = horizons["name"].isin(top_names)
    horizon_top = horizons[is_maker_taker & is_top_name].sort_values(["name", "horizon"])

    candidate_columns = [
        "name",
        "trades",
        "trend_sma",
        "rsi_threshold",
        "exit_rsi",
        "stop_loss_pct",
        "max_hold_bars",
        "entry_offsets",
        "entry_valid_bars",
        "fill_buffer",
        "net_annualized_return",
        "net_max_drawdown",
        "net_calmar",
        "net_sharpe_daily",
    ]
    horizon_columns = [
        "name",
        "horizon",
        "horizon_start",
        "horizon_end",
        "net_total_return",
        "net_annualized_return",
        "net_max_drawdown",
        "net_calmar",
        "net_sharpe_daily",
    ]

    sections = [
        "# ETH price-TWAP variant search",
        "",
        "Primary sort: maker_taker by net_calmar, then net_annualized_return.",
        "",
        "## Top 10 maker_taker candidates",
        "",
        markdown_table(top10, candidate_columns),
        "",
        "## Recent horizons for top 10",
        "",
        markdown_table(horizon_top, horizon_columns),
        "",
        "## Next narrowing direction",
        "",
        "Favor the parameter cluster shared by the top maker_taker rows: keep the strongest trend_sma/rsi_threshold/offset combinations, then rerun a narrower grid around adjacent stop_loss_pct, max_hold_bars, entry_valid_bars, and fill_buffer values.",
        "",
    ]
    return "\n".join(sections)
def main() -> int:
    """Command-line entry point: run the search and write the reports.

    Accepts an optional ``--max-candidates`` integer, creates ``OUTPUT_DIR``
    if needed, writes three CSV files and one markdown summary there, prints
    the written paths and the top-10 maker_taker table, and returns 0.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--max-candidates", type=int, default=None)
    options = parser.parse_args()

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    all_results, maker_taker, horizons = run_search(options.max_candidates)
    top10 = maker_taker.head(10)

    all_path = OUTPUT_DIR / "eth-price-twap-search.csv"
    top_path = OUTPUT_DIR / "eth-price-twap-top10.csv"
    horizon_path = OUTPUT_DIR / "eth-price-twap-horizons.csv"
    summary_path = OUTPUT_DIR / "eth-price-twap-summary.md"

    all_results.to_csv(all_path, index=False)
    top10.to_csv(top_path, index=False)
    horizons.to_csv(horizon_path, index=False)
    summary_path.write_text(markdown_summary(maker_taker, horizons), encoding="utf-8")

    for written_path in (all_path, top_path, horizon_path, summary_path):
        print(f"wrote {written_path}")
    print(top10.to_string(index=False))
    return 0
# When executed as a script, run main() and use its return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|