"""Sample non-overlapping history windows, backtest each, and render a report.

The module picks reproducible pseudo-random windows from a candle history,
delegates the per-window backtest to a caller-supplied callable, builds Bokeh
price/equity plots for every window and writes a single report file.
"""

from __future__ import annotations

from dataclasses import dataclass
from html import escape
from pathlib import Path
from random import Random
from statistics import median
from typing import Callable

import pandas as pd
from bokeh.embed import components
from bokeh.layouts import column
from bokeh.plotting import figure
from bokeh.resources import INLINE

from okx_codex_trader.models import Candle

# Default number of candles placed in front of every sampled window.
WARMUP_BARS = 69
# Default seed for the window sampler so repeated runs pick the same windows.
SAMPLER_SEED = 7


@dataclass(frozen=True)
class SampledSegment:
    """Index and timestamp bounds of one sampled window inside the history."""

    # Index of the first candle of the block, warm-up included.
    context_start: int
    # Index of the first reported candle (context_start + warmup_bars).
    report_start: int
    # Exclusive end index of the block (context_start + block size).
    report_end: int
    # ``ts`` of the candle at ``report_start``.
    start_ts: int
    # ``ts`` of the last candle of the block (index ``report_end - 1``).
    end_ts: int


@dataclass(frozen=True)
class SegmentResult:
    """Outcome of running a strategy over one sampled window."""

    trade_count: int
    total_return: float
    win_rate: float
    max_drawdown: float
    # The report reads the keys "side", "entry_time", "exit_time",
    # "entry_price", "exit_price", "pnl" and "return_pct" from each trade.
    trades: list[dict[str, object]]
    open_position: dict[str, object] | None
    candles: list[Candle]
    # The plot reads the keys "ts", "close" and "equity" from each point.
    equity_curve: list[dict[str, float | int]]
    # The plot reads the keys "ts" and "price" from each entry / exit marker.
    entries: list[dict[str, object]]
    exits: list[dict[str, object]]


@dataclass(frozen=True)
class ReportSegment:
    """A segment result bundled with the data needed to render its panel."""

    index: int
    # Pre-formatted start / end times of the reported window.
    start_time: str
    end_time: str
    result: SegmentResult
    # Markup returned by ``bokeh.embed.components`` for this segment's plot.
    plot_div: str


def _format_ts(ts: int) -> str:
    """Format a millisecond epoch timestamp as ``YYYY-MM-DD HH:MM`` in UTC."""
    return pd.to_datetime(ts, unit="ms", utc=True).strftime("%Y-%m-%d %H:%M")


def sample_segments(
    *,
    candles: list[Candle],
    segments: int,
    window_size: int,
    warmup_bars: int = WARMUP_BARS,
    seed: int = SAMPLER_SEED,
) -> list[SampledSegment]:
    """Pick ``segments`` non-overlapping blocks from ``candles``.

    The history is cut into consecutive blocks of ``window_size + warmup_bars``
    candles. The block start offsets are shuffled with a ``Random`` seeded by
    ``seed``; the first ``segments`` of them are kept and returned in ascending
    order.

    Args:
        candles: Full candle history to sample from.
        segments: Number of blocks to return; must not be negative.
        window_size: Number of reported candles per block; at least 1.
        warmup_bars: Number of candles preceding each reported window; must
            not be negative.
        seed: Seed for the shuffle.

    Returns:
        One ``SampledSegment`` per selected block, ordered by position.

    Raises:
        ValueError: If an argument is out of range or the history holds fewer
            than ``segments`` whole blocks.
    """
    if segments < 0:
        raise ValueError("segments must not be negative")
    if window_size < 1:
        # A zero-length window would make start_ts index one candle past the
        # block, and a zero block size would be an invalid range() step.
        raise ValueError("window_size must be at least 1")
    if warmup_bars < 0:
        raise ValueError("warmup_bars must not be negative")

    block_size = window_size + warmup_bars
    # With block_size >= 1 this single check guarantees that at least
    # ``segments`` whole blocks fit into the history.
    if len(candles) < segments * block_size:
        raise ValueError("history pool is too small")

    context_starts = list(range(0, len(candles) - block_size + 1, block_size))
    Random(seed).shuffle(context_starts)
    selected_context_starts = sorted(context_starts[:segments])

    return [
        SampledSegment(
            context_start=context_start,
            report_start=context_start + warmup_bars,
            report_end=context_start + block_size,
            start_ts=candles[context_start + warmup_bars].ts,
            end_ts=candles[context_start + block_size - 1].ts,
        )
        for context_start in selected_context_starts
    ]


def trade_equity(
    *,
    side: str,
    margin_used: float,
    entry_price: float,
    exit_price: float,
    leverage: int,
) -> float:
    """Return the margin plus leveraged profit or loss of a position.

    Args:
        side: ``"long"`` for a long position; any other value is handled as a
            short position.
        margin_used: Margin committed to the position.
        entry_price: Price at which the position was opened.
        exit_price: Price at which the position is valued.
        leverage: Multiplier applied to the relative price move.

    Returns:
        ``margin_used + margin_used * leverage * price_return``.
    """
    if side == "long":
        price_return = (exit_price - entry_price) / entry_price
    else:
        price_return = (entry_price - exit_price) / entry_price
    return margin_used + (margin_used * leverage * price_return)


def mark_to_market(
    *,
    side: str,
    margin_used: float,
    entry_price: float,
    mark_price: float,
    leverage: int,
) -> float:
    """Value an open position at ``mark_price``.

    Delegates to ``trade_equity`` with ``mark_price`` as the exit price.
    """
    return trade_equity(
        side=side,
        margin_used=margin_used,
        entry_price=entry_price,
        exit_price=mark_price,
        leverage=leverage,
    )


def build_segment_plot(segment: SegmentResult):
    """Build a stacked price / equity Bokeh layout for one segment.

    The price figure draws the ``close`` series of the equity curve and, when
    present, entry markers (triangles) and exit markers (inverted triangles).
    The equity figure draws the ``equity`` series on the same timestamps.

    Returns:
        The Bokeh ``column`` layout holding both figures.
    """
    timestamps = [
        pd.to_datetime(point["ts"], unit="ms", utc=True)
        for point in segment.equity_curve
    ]
    closes = [point["close"] for point in segment.equity_curve]
    equities = [point["equity"] for point in segment.equity_curve]

    price_fig = figure(
        height=320,
        sizing_mode="stretch_width",
        x_axis_type="datetime",
        title="Price",
    )
    price_fig.line(timestamps, closes, line_width=2, color="#1f6f78")
    if segment.entries:
        price_fig.scatter(
            [
                pd.to_datetime(entry["ts"], unit="ms", utc=True)
                for entry in segment.entries
            ],
            [entry["price"] for entry in segment.entries],
            marker="triangle",
            size=12,
            color="#1d7c44",
        )
    if segment.exits:
        price_fig.scatter(
            [
                pd.to_datetime(exit_point["ts"], unit="ms", utc=True)
                for exit_point in segment.exits
            ],
            [exit_point["price"] for exit_point in segment.exits],
            marker="inverted_triangle",
            size=12,
            color="#a13d2d",
        )

    equity_fig = figure(
        height=220,
        sizing_mode="stretch_width",
        x_axis_type="datetime",
        title="Equity",
    )
    equity_fig.line(timestamps, equities, line_width=2, color="#7b4f9d")
    return column(price_fig, equity_fig, sizing_mode="stretch_width")


def render_sampled_report(
    *,
    symbol: str,
    bar: str,
    leverage: int,
    history_limit: int,
    segments: int,
    window_size: int,
    report_title: str,
    strategy_label: str,
    strategy_description: str,
    strategy_params: dict[str, object],
    aggregate_summary: dict[str, object],
    segment_results: list[ReportSegment],
    bokeh_script: str,
) -> str:
    """Render the report text for a sampled backtest run.

    Every caller-supplied value is passed through ``html.escape`` except
    ``plot_div`` of each segment and ``bokeh_script``, which are inserted
    verbatim.

    Args:
        symbol: Instrument name shown in the header.
        bar: Candle interval label shown in the header.
        leverage: Leverage shown in the header.
        history_limit: Value of the "History Limit" summary card.
        segments: Value of the "Segment Count" summary card.
        window_size: Value of the "Window Size" summary card.
        report_title: Report title.
        strategy_label: Short strategy name.
        strategy_description: Free-text strategy description.
        strategy_params: Strategy parameters; keys are title-cased for display.
        aggregate_summary: Must provide "average_return", "median_return",
            "best_segment_return", "worst_segment_return" and
            "aggregate_trade_count".
        segment_results: Per-segment results with their plot markup.
        bokeh_script: Script markup appended at the end of the report.

    Returns:
        The rendered report as a single string.
    """
    # NOTE(review): the templates below contain no HTML tags and the selector
    # entry is an empty string - presumably the markup was lost somewhere
    # upstream. The text is kept as found; confirm against the intended
    # template before relying on the rendered layout.
    summary_cards = "".join(
        f"""
{escape(label)}
{escape(str(value))}
"""
        for label, value in (
            ("History Limit", history_limit),
            ("Segment Count", segments),
            ("Window Size", window_size),
            ("Average Return Across Segments", aggregate_summary["average_return"]),
            ("Median Return Across Segments", aggregate_summary["median_return"]),
            ("Best Segment Return", aggregate_summary["best_segment_return"]),
            ("Worst Segment Return", aggregate_summary["worst_segment_return"]),
            ("Aggregate Trade Count", aggregate_summary["aggregate_trade_count"]),
        )
    )
    params_markup = "".join(
        f"""
{escape(key.replace('_', ' ').title())}
{escape(str(value))}
"""
        for key, value in strategy_params.items()
    )
    selector = "".join(f"" for segment in segment_results)

    panels = []
    for segment in segment_results:
        rows = "".join(
            f""" {escape(str(trade["side"]))} {escape(str(trade["entry_time"]))} {escape(str(trade["exit_time"]))} {escape(str(trade["entry_price"]))} {escape(str(trade["exit_price"]))} {escape(str(trade["pnl"]))} {escape(str(trade["return_pct"]))} """
            for trade in segment.result.trades
        )
        panels.append(
            f"""
Sampled Range Start Time{escape(segment.start_time)}
Sampled Range End Time{escape(segment.end_time)}
Trade Count{escape(str(segment.result.trade_count))}
Total Return{escape(str(round(segment.result.total_return, 6)))}
Win Rate{escape(str(round(segment.result.win_rate, 6)))}
Max Drawdown{escape(str(round(segment.result.max_drawdown, 6)))}

Trade Journal

{rows}
Side Entry Time Exit Time Entry Exit PnL Return %
{segment.plot_div}
"""
        )

    # The separator is written as an escape (U+00B7 MIDDLE DOT) so the source
    # stays ASCII; the original character had been corrupted by a wrong decode.
    return f""" {escape(symbol)} {escape(report_title)}
{escape(strategy_label)} sampled report

{escape(symbol)}

Bar: {escape(bar)} \u00b7 Leverage: {escape(str(leverage))}x
{escape(report_title)}

{escape(strategy_description)}

{params_markup}
{summary_cards}
{selector}
{''.join(panels)}
{bokeh_script} """


def generate_sampled_report(
    *,
    candles: list[Candle],
    leverage: int,
    output_file: Path,
    symbol: str,
    bar: str,
    segments: int,
    window_size: int,
    report_title: str,
    strategy_label: str,
    strategy_description: str,
    strategy_params: dict[str, object],
    run_segment: Callable[..., SegmentResult],
    warmup_bars: int = WARMUP_BARS,
) -> dict[str, object]:
    """Sample windows, run the strategy on each, and write the report file.

    Args:
        candles: Full candle history to sample from.
        leverage: Leverage forwarded to ``run_segment`` and shown in the report.
        output_file: Destination path; missing parent directories are created.
        symbol: Instrument name shown in the report.
        bar: Candle interval label shown in the report.
        segments: Number of windows to sample; at least 1.
        window_size: Number of reported candles per window.
        report_title: Report title.
        strategy_label: Short strategy name.
        strategy_description: Free-text strategy description.
        strategy_params: Strategy parameters listed in the report.
        run_segment: Called once per window with the keyword arguments
            ``candles`` (the window including its warm-up candles),
            ``leverage`` and ``warmup_bars``.
        warmup_bars: Number of candles preceding each reported window.

    Returns:
        A summary with the keys "report_file", "segment_count", "window_size",
        "aggregate_trade_count" and "average_return".

    Raises:
        ValueError: If ``segments`` is below 1, the history is too small, or
            the sampler returns an unexpected number of windows.
    """
    if segments < 1:
        # The aggregate statistics below are undefined for zero segments;
        # fail before touching the filesystem.
        raise ValueError("segments must be at least 1")

    sampled = sample_segments(
        candles=candles,
        segments=segments,
        window_size=window_size,
        warmup_bars=warmup_bars,
    )
    if len(sampled) != segments:
        raise ValueError("invalid sampling result")

    output_file.parent.mkdir(parents=True, exist_ok=True)

    segment_results: list[SegmentResult] = []
    plots: dict[str, object] = {}
    for index, segment in enumerate(sampled):
        result = run_segment(
            candles=candles[segment.context_start : segment.report_end],
            leverage=leverage,
            warmup_bars=warmup_bars,
        )
        segment_results.append(result)
        plots[f"segment_{index}"] = build_segment_plot(result)

    # Passing a dict to components() returns the divs keyed the same way.
    plot_script, plot_divs = components(plots)
    report_segments = [
        ReportSegment(
            index=index,
            start_time=_format_ts(segment.start_ts),
            end_time=_format_ts(segment.end_ts),
            result=result,
            plot_div=plot_divs[f"segment_{index}"],
        )
        for index, (segment, result) in enumerate(
            zip(sampled, segment_results, strict=True)
        )
    ]

    returns = [result.total_return for result in segment_results]
    aggregate_summary = {
        "aggregate_trade_count": sum(
            result.trade_count for result in segment_results
        ),
        "average_return": round(sum(returns) / len(returns), 6),
        "median_return": round(float(median(returns)), 6),
        "best_segment_return": round(max(returns), 6),
        "worst_segment_return": round(min(returns), 6),
    }

    report_text = render_sampled_report(
        symbol=symbol,
        bar=bar,
        leverage=leverage,
        history_limit=len(candles),
        segments=segments,
        window_size=window_size,
        report_title=report_title,
        strategy_label=strategy_label,
        strategy_description=strategy_description,
        strategy_params=strategy_params,
        aggregate_summary=aggregate_summary,
        segment_results=report_segments,
        bokeh_script=INLINE.render_js() + plot_script,
    )
    # Explicit UTF-8: the report contains non-ASCII text and must not depend
    # on the platform's default encoding.
    output_file.write_text(report_text, encoding="utf-8")

    return {
        "report_file": str(output_file),
        "segment_count": segments,
        "window_size": window_size,
        "aggregate_trade_count": aggregate_summary["aggregate_trade_count"],
        "average_return": aggregate_summary["average_return"],
    }