from __future__ import annotations import argparse import json import sys from dataclasses import dataclass from pathlib import Path import pandas as pd sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from okx_codex_trader.models import Candle from okx_codex_trader.sampled_report import SegmentResult, mark_to_market, trade_equity from scripts import explore_ultrashort as explore from scripts.search_eth_btc_nextgen_variants import format_cell, markdown_table from scripts.search_expansion_trend_swing import ( Params, frame_to_candles, load_15m_frame, resample_frame, true_range, ) OUTPUT_DIR = Path("reports/strategy-expansion") INPUT_TOTALS = OUTPUT_DIR / "trend-swing-totals.csv" PREFIX = "trend-validation" SYMBOLS = ("BTC-USDT-SWAP", "ETH-USDT-SWAP") BARS = ("1D", "4H") YEARS = 10.0 FEE_SINGLE_SIDE = 0.0004 HORIZONS = ( ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ) MARKET_PHASES = ( ("2020_2021_bull", "2020-03-13", "2021-11-10"), ("2021_2022_bear", "2021-11-10", "2022-11-21"), ("2022_2024_bull", "2022-11-21", "2024-03-14"), ("2024_2026_late_cycle", "2024-03-14", "2026-05-31"), ) @dataclass(frozen=True) class Risk: leverage: int position_fraction: float @property def label(self) -> str: return f"lev{self.leverage}-pos{self.position_fraction:g}" RISK_PROFILES = ( Risk(leverage=1, position_fraction=1.0), Risk(leverage=2, position_fraction=0.5), Risk(leverage=1, position_fraction=0.5), ) def close_trade( *, trades: list[dict[str, object]], exits: list[dict[str, object]], account_equity: float, position: dict[str, object], candle: Candle, exit_price: float, risk: Risk, ) -> tuple[float, bool]: margin_used = float(position["margin_used"]) exit_equity = trade_equity( side=str(position["side"]), margin_used=margin_used, entry_price=float(position["entry_price"]), exit_price=exit_price, leverage=risk.leverage, ) gross_pnl = exit_equity - margin_used fee = margin_used * FEE_SINGLE_SIDE * 2.0 * risk.leverage pnl = gross_pnl - fee next_equity = account_equity + pnl trades.append( { "side": "Long" if position["side"] == "long" else "Short", "entry_time": explore._format_ts(int(position["entry_time"])), "exit_time": explore._format_ts(candle.ts), "entry_price": round(float(position["entry_price"]), 4), "exit_price": round(exit_price, 4), "pnl": round(pnl, 4), "return_pct": round(pnl / account_equity * 100.0, 4), "return_on_margin_pct": round(pnl / margin_used * 100.0, 4), } ) exits.append({"ts": candle.ts, "price": exit_price, "side": position["side"]}) return next_equity, pnl > 0.0 def run_segment(candles: list[Candle], params: Params, risk: Risk) -> SegmentResult: highs = pd.Series([c.high for c in candles], dtype=float) lows = pd.Series([c.low for c in candles], dtype=float) closes = pd.Series([c.close for c in candles], dtype=float) fast = closes.ewm(span=params.fast, adjust=False).mean() slow = closes.ewm(span=params.slow, adjust=False).mean() atr = true_range(highs, lows, closes).rolling(params.atr).mean() entry_high = highs.shift(1).rolling(params.entry).max() entry_low = lows.shift(1).rolling(params.entry).min() exit_high = highs.shift(1).rolling(params.exit).max() exit_low = lows.shift(1).rolling(params.exit).min() rsi = explore._compute_rsi(closes, 5) warmup = max(params.slow, params.entry, params.exit, params.atr, 8) account_equity = explore.INITIAL_EQUITY ending_equity = account_equity peak_equity = account_equity max_drawdown = 0.0 wins = 0 trades: list[dict[str, object]] = [] entries: list[dict[str, object]] = [] exits: list[dict[str, object]] = [] equity_curve: list[dict[str, float | int]] = [] position: dict[str, object] | None = None pending_side: str | None = None pending_exit = False for index in range(warmup, len(candles)): candle = candles[index] if pending_exit and position is not None: account_equity, won = close_trade( trades=trades, exits=exits, account_equity=account_equity, position=position, candle=candle, exit_price=candle.open, risk=risk, ) wins += 1 if won else 0 position = None pending_exit = False if pending_side is not None and position is None and account_equity > 0.0: side = pending_side current_atr = float(atr.iloc[index - 1]) margin_used = account_equity * risk.position_fraction position = { "side": side, "entry_time": candle.ts, "entry_price": candle.open, "entry_index": index, "margin_used": margin_used, "stop_price": candle.open - params.stop_atr * current_atr if side == "long" else candle.open + params.stop_atr * current_atr, "take_price": candle.open + params.take_atr * current_atr if side == "long" else candle.open - params.take_atr * current_atr, } entries.append({"ts": candle.ts, "price": candle.open, "side": side}) pending_side = None current_equity = account_equity if position is not None: side = str(position["side"]) stop_hit = (side == "long" and candle.low <= float(position["stop_price"])) or ( side == "short" and candle.high >= float(position["stop_price"]) ) take_hit = (side == "long" and candle.high >= float(position["take_price"])) or ( side == "short" and candle.low <= float(position["take_price"]) ) if stop_hit or take_hit: exit_price = float(position["stop_price"] if stop_hit else position["take_price"]) account_equity, won = close_trade( trades=trades, exits=exits, account_equity=account_equity, position=position, candle=candle, exit_price=exit_price, risk=risk, ) wins += 1 if won else 0 current_equity = account_equity position = None if position is not None: margin_used = float(position["margin_used"]) position_equity = mark_to_market( side=str(position["side"]), margin_used=margin_used, entry_price=float(position["entry_price"]), mark_price=candle.close, leverage=risk.leverage, ) current_equity = account_equity - margin_used + position_equity peak_equity = max(peak_equity, current_equity) max_drawdown = max(max_drawdown, (peak_equity - current_equity) / peak_equity) equity_curve.append({"ts": candle.ts, "equity": current_equity, "close": candle.close}) ending_equity = current_equity if index == len(candles) - 1 or account_equity <= 0.0: continue if position is not None: held = index - int(position["entry_index"]) side = str(position["side"]) if side == "long": pending_exit = candle.close < float(exit_low.iloc[index]) or fast.iloc[index] < slow.iloc[index] or held >= params.max_hold else: pending_exit = candle.close > float(exit_high.iloc[index]) or fast.iloc[index] > slow.iloc[index] or held >= params.max_hold continue if params.family == "donchian": if candle.close > float(entry_high.iloc[index]) and fast.iloc[index] > slow.iloc[index]: pending_side = "long" elif candle.close < float(entry_low.iloc[index]) and fast.iloc[index] < slow.iloc[index]: pending_side = "short" elif params.family == "ema_cross": prev_fast = fast.iloc[index - 1] prev_slow = slow.iloc[index - 1] if prev_fast <= prev_slow and fast.iloc[index] > slow.iloc[index]: pending_side = "long" elif prev_fast >= prev_slow and fast.iloc[index] < slow.iloc[index]: pending_side = "short" elif params.family == "trend_pullback": if fast.iloc[index] > slow.iloc[index] and candle.close <= fast.iloc[index] and rsi[index] <= 45: pending_side = "long" elif fast.iloc[index] < slow.iloc[index] and candle.close >= fast.iloc[index] and rsi[index] >= 55: pending_side = "short" trade_count = len(trades) return SegmentResult( trade_count=trade_count, total_return=(ending_equity - explore.INITIAL_EQUITY) / explore.INITIAL_EQUITY, win_rate=wins / trade_count if trade_count else 0.0, max_drawdown=max_drawdown, trades=trades, open_position=position, candles=candles[warmup:], equity_curve=equity_curve, entries=entries, exits=exits, ) def daily_equity(result: SegmentResult) -> pd.Series: frame = pd.DataFrame( { "ts": [pd.to_datetime(point["ts"], unit="ms", utc=True) for point in result.equity_curve], "equity": [float(point["equity"]) for point in result.equity_curve], } ) series = frame.set_index("ts")["equity"].sort_index() index = pd.date_range(series.index[0].normalize(), series.index[-1].normalize(), freq="1D", tz="UTC") return series.reindex(index.union(series.index)).sort_index().ffill().reindex(index).fillna(explore.INITIAL_EQUITY) def metrics_from_series(series: pd.Series) -> dict[str, float]: if len(series) < 2: return {"total_return": 0.0, "annualized_return": 0.0, "max_drawdown": 0.0, "calmar": 0.0} years = (series.index[-1] - series.index[0]).total_seconds() / 86_400 / 365 total = float(series.iloc[-1] / series.iloc[0] - 1.0) annual = (1.0 + total) ** (1.0 / years) - 1.0 if total > -1.0 and years > 0.0 else 0.0 drawdown = explore.max_drawdown_from_equity([float(value) for value in series]) return { "total_return": total, "annualized_return": annual, "max_drawdown": drawdown, "calmar": annual / drawdown if drawdown else 0.0, } def trade_stats(trades: list[dict[str, object]]) -> dict[str, float | int]: returns = [float(trade["return_pct"]) / 100.0 for trade in trades] wins = [value for value in returns if value > 0.0] losses = [value for value in returns if value < 0.0] avg_win = sum(wins) / len(wins) if wins else 0.0 avg_loss = abs(sum(losses) / len(losses)) if losses else 0.0 return { "trades": len(returns), "win_rate": len(wins) / len(returns) if returns else 0.0, "payoff_ratio": avg_win / avg_loss if avg_loss else 0.0, } def trades_between(trades: list[dict[str, object]], start: pd.Timestamp, end: pd.Timestamp) -> list[dict[str, object]]: rows = [] for trade in trades: exit_time = pd.to_datetime(str(trade["exit_time"]), utc=True) if start <= exit_time <= end: rows.append(trade) return rows def horizon_returns(series: pd.Series) -> dict[str, float]: out: dict[str, float] = {} end = series.index[-1] for label, offset in HORIZONS: scoped = series[series.index >= end - offset] out[f"return_{label}"] = float(scoped.iloc[-1] / scoped.iloc[0] - 1.0) if len(scoped) >= 2 else 0.0 return out def monthly_rows(name: str, params: Params, risk: Risk, series: pd.Series) -> pd.DataFrame: monthly = series.resample("ME").last() frame = pd.DataFrame( { "name": name, "symbol": params.symbol, "bar": params.bar, "family": params.family, "risk": risk.label, "month": monthly.index.strftime("%Y-%m"), "start_equity": monthly.shift(1).fillna(series.iloc[0]).to_numpy(), "end_equity": monthly.to_numpy(), } ) frame["return"] = frame["end_equity"] / frame["start_equity"] - 1.0 return frame def period_row( *, name: str, period_type: str, period: str, params: Params, risk: Risk, series: pd.Series, trades: list[dict[str, object]], ) -> dict[str, object]: metrics = metrics_from_series(series) stats = trade_stats(trades) return { "name": name, "period_type": period_type, "period": period, "symbol": params.symbol, "bar": params.bar, "family": params.family, "risk": risk.label, "start": series.index[0].strftime("%Y-%m-%d"), "end": series.index[-1].strftime("%Y-%m-%d"), **metrics, **stats, } def yearly_rows(name: str, params: Params, risk: Risk, series: pd.Series, trades: list[dict[str, object]]) -> list[dict[str, object]]: rows: list[dict[str, object]] = [] for year, scoped in series.groupby(series.index.year): if len(scoped) < 2: continue start = scoped.index[0] end = scoped.index[-1] rows.append( period_row( name=name, period_type="year", period=str(year), params=params, risk=risk, series=scoped, trades=trades_between(trades, start, end), ) ) return rows def phase_rows(name: str, params: Params, risk: Risk, series: pd.Series, trades: list[dict[str, object]]) -> list[dict[str, object]]: rows: list[dict[str, object]] = [] for label, raw_start, raw_end in MARKET_PHASES: start = pd.Timestamp(raw_start, tz="UTC") end = pd.Timestamp(raw_end, tz="UTC") scoped = series[(series.index >= start) & (series.index <= end)] if len(scoped) < 2: continue rows.append( period_row( name=name, period_type="market_phase", period=label, params=params, risk=risk, series=scoped, trades=trades_between(trades, scoped.index[0], scoped.index[-1]), ) ) return rows def rolling_3y_worst(series: pd.Series) -> dict[str, object]: monthly = series.resample("ME").last() rows: list[dict[str, object]] = [] for start in monthly.index: end = start + pd.DateOffset(years=3) scoped = series[(series.index >= start) & (series.index <= end)] if len(scoped) < 365 * 2: continue metrics = metrics_from_series(scoped) rows.append( { "rolling_3y_start": scoped.index[0].strftime("%Y-%m-%d"), "rolling_3y_end": scoped.index[-1].strftime("%Y-%m-%d"), "rolling_3y_total_return": metrics["total_return"], "rolling_3y_annualized_return": metrics["annualized_return"], "rolling_3y_max_drawdown": metrics["max_drawdown"], "rolling_3y_calmar": metrics["calmar"], } ) if not rows: return { "rolling_3y_start": "", "rolling_3y_end": "", "rolling_3y_total_return": 0.0, "rolling_3y_annualized_return": 0.0, "rolling_3y_max_drawdown": 0.0, "rolling_3y_calmar": 0.0, } return min(rows, key=lambda row: float(row["rolling_3y_total_return"])) def params_from_totals(path: Path) -> list[Params]: totals = pd.read_csv(path) scoped = totals[totals["bar"].isin(BARS)].copy() scoped = scoped.sort_values(["symbol", "bar", "family", "fast", "slow", "entry", "exit", "stop_atr", "take_atr"]) rows: list[Params] = [] for row in scoped.itertuples(index=False): rows.append( Params( symbol=str(row.symbol), bar=str(row.bar), family=str(row.family), fast=int(row.fast), slow=int(row.slow), entry=int(row.entry), exit=int(row.exit), atr=int(row.atr), stop_atr=float(row.stop_atr), take_atr=float(row.take_atr), max_hold=int(row.max_hold), ) ) return rows def candidate_name(params: Params, risk: Risk) -> str: return f"{params.name}-{risk.label}" def verdict(totals: pd.DataFrame) -> str: daily_ema = totals[(totals["bar"] == "1D") & (totals["family"] == "ema_cross")] daily_ema_pass = daily_ema[ (daily_ema["calmar"] > 0.8) & (daily_ema["return_1y"] >= 0.0) & (daily_ema["return_6m"] >= 0.0) & (daily_ema["return_3m"] >= 0.0) ] liquid_pass = totals[ (totals["trades"] >= 60) & (totals["calmar"] > 0.8) & (totals["return_1y"] >= 0.0) & (totals["return_6m"] >= 0.0) & (totals["return_3m"] >= 0.0) ] if daily_ema.empty: return "No 1D EMA branch was present in the validation input." max_daily_trades = int(daily_ema["trades"].max()) if daily_ema_pass.empty: return ( "Exclude the 1D EMA branch: it does not pass Calmar > 0.8 with non-negative 1y/6m/3m returns " f"under lower-risk validation. Its maximum trade count is {max_daily_trades}, so the sample is also too small." ) if max_daily_trades < 30: return ( "Exclude the 1D EMA branch despite headline metrics: every passing 1D EMA row has fewer than 30 trades, " "so the result is dominated by a small number of exits rather than repeatable evidence." ) if liquid_pass.empty: return "No validated candidate has both adequate trade count and non-negative 1y/6m/3m returns." return "Keep only the liquid candidates listed in the validated table; 1D EMA is not the preferred branch." def markdown_report( command: str, paths: list[Path], totals: pd.DataFrame, periods: pd.DataFrame, monthly: pd.DataFrame, ) -> str: validated = totals[ (totals["trades"] >= 60) & (totals["calmar"] > 0.8) & (totals["return_1y"] >= 0.0) & (totals["return_6m"] >= 0.0) & (totals["return_3m"] >= 0.0) ].sort_values(["calmar", "trades"], ascending=[False, False]) daily_ema = totals[(totals["bar"] == "1D") & (totals["family"] == "ema_cross")].sort_values("calmar", ascending=False) top = totals.sort_values(["calmar", "trades"], ascending=[False, False]).head(12) lines = [ "# Trend validation", "", f"Run command: `{command}`", "", "Output files:", *[f"- `{path}`" for path in paths], "", "Scope: existing trend-swing 1D and 4H parameter rows, rerun on BTC-USDT-SWAP and ETH-USDT-SWAP local 15m cache.", "Risk profiles: 1x full notional, 2x half notional, 1x half notional. Fee is 0.04% per side.", "Validation pass: trades >= 60, Calmar > 0.8, and non-negative 1y/6m/3m returns.", "", "## Verdict", "", verdict(totals), "", "## Validated candidates", "", markdown_table( validated[ [ "name", "symbol", "bar", "family", "risk", "trades", "total_return", "annualized_return", "max_drawdown", "calmar", "win_rate", "payoff_ratio", "return_3y", "return_1y", "return_6m", "return_3m", "rolling_3y_total_return", "rolling_3y_calmar", ] ].head(20) if not validated.empty else pd.DataFrame(columns=["result"]) ), "", "## Headline ranking", "", markdown_table( top[ [ "name", "symbol", "bar", "family", "risk", "trades", "total_return", "annualized_return", "max_drawdown", "calmar", "return_1y", "return_6m", "return_3m", "rolling_3y_total_return", ] ] ), "", "## 1D EMA sample check", "", markdown_table( daily_ema[ [ "name", "symbol", "risk", "trades", "total_return", "annualized_return", "max_drawdown", "calmar", "return_1y", "return_6m", "return_3m", "rolling_3y_total_return", ] ].head(12) ), "", "## Worst rolling 3-year by symbol/bar", "", markdown_table( totals.sort_values("rolling_3y_total_return")[ [ "name", "symbol", "bar", "family", "risk", "trades", "rolling_3y_start", "rolling_3y_end", "rolling_3y_total_return", "rolling_3y_annualized_return", "rolling_3y_max_drawdown", "rolling_3y_calmar", ] ].head(16) ), "", "## Year and market phase files", "", "Yearly and bull/bear phase metrics are written to the periods CSV. Monthly returns are written to the monthly CSV.", ] return "\n".join(lines) + "\n" def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--years", type=float, default=YEARS) parser.add_argument("--input-totals", type=Path, default=INPUT_TOTALS) parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR) parser.add_argument("--max-candidates", type=int, default=0) args = parser.parse_args() params_grid = params_from_totals(args.input_totals) if args.max_candidates: params_grid = params_grid[: args.max_candidates] raw = {symbol: load_15m_frame(symbol, args.years) for symbol in SYMBOLS} candles = { (symbol, bar): frame_to_candles(symbol, resample_frame(raw[symbol], bar)) for symbol in SYMBOLS for bar in BARS } total_rows: list[dict[str, object]] = [] period_rows: list[dict[str, object]] = [] monthly_frames: list[pd.DataFrame] = [] tasks = [(params, risk) for params in params_grid for risk in RISK_PROFILES] for index, (params, risk) in enumerate(tasks, start=1): result = run_segment(candles[(params.symbol, params.bar)], params, risk) series = daily_equity(result) name = candidate_name(params, risk) monthly = monthly_rows(name, params, risk, series) row = { "name": name, **params.__dict__, "risk": risk.label, "leverage": risk.leverage, "position_fraction": risk.position_fraction, "first_candle": series.index[0].strftime("%Y-%m-%d"), "last_candle": series.index[-1].strftime("%Y-%m-%d"), "fee_single_side": FEE_SINGLE_SIDE, "worst_month_return": float(monthly["return"].min()), **metrics_from_series(series), **trade_stats(result.trades), **horizon_returns(series), **rolling_3y_worst(series), } total_rows.append(row) period_rows.extend(yearly_rows(name, params, risk, series, result.trades)) period_rows.extend(phase_rows(name, params, risk, series, result.trades)) monthly_frames.append(monthly) print(f"done {index}/{len(tasks)} {name}", flush=True) totals = pd.DataFrame(total_rows).sort_values( ["calmar", "annualized_return", "trades"], ascending=[False, False, False], ) periods = pd.DataFrame(period_rows) monthly_all = pd.concat(monthly_frames, ignore_index=True) validated = totals[ (totals["trades"] >= 60) & (totals["calmar"] > 0.8) & (totals["return_1y"] >= 0.0) & (totals["return_6m"] >= 0.0) & (totals["return_3m"] >= 0.0) ].sort_values(["calmar", "trades"], ascending=[False, False]) args.output_dir.mkdir(parents=True, exist_ok=True) totals_path = args.output_dir / f"{PREFIX}-totals.csv" periods_path = args.output_dir / f"{PREFIX}-periods.csv" monthly_path = args.output_dir / f"{PREFIX}-monthly-returns.csv" validated_path = args.output_dir / f"{PREFIX}-validated.csv" best_path = args.output_dir / f"{PREFIX}-best.json" report_path = args.output_dir / f"{PREFIX}-report.md" totals.to_csv(totals_path, index=False) periods.to_csv(periods_path, index=False) monthly_all.to_csv(monthly_path, index=False) validated.to_csv(validated_path, index=False) best_path.write_text(json.dumps(validated.head(20).to_dict(orient="records"), indent=2), encoding="utf-8") command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --years {args.years}" report_path.write_text( markdown_report(command, [totals_path, periods_path, monthly_path, validated_path, best_path, report_path], totals, periods, monthly_all), encoding="utf-8", ) print(validated.head(10).to_string(index=False, formatters={col: format_cell for col in validated.columns})) print(verdict(totals)) return 0 if __name__ == "__main__": raise SystemExit(main())