from __future__ import annotations import argparse import json import sys from dataclasses import dataclass from pathlib import Path from typing import Callable import pandas as pd sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from okx_codex_trader.models import Candle DATA_DIR = Path("data/okx-candles") OUTPUT_DIR = Path("reports/ultrashort") SYMBOLS = ("ETH-USDT-SWAP", "BTC-USDT-SWAP") BARS = ("15m",) INITIAL_EQUITY = 10_000.0 LEVERAGE = 3 ROUNDTRIP_COST = 0.0021 HORIZONS = ( ("full", None), ("3y", pd.DateOffset(years=3)), ("1y", pd.DateOffset(years=1)), ("6m", pd.DateOffset(months=6)), ("3m", pd.DateOffset(months=3)), ) @dataclass(frozen=True) class Strategy: family: str name: str warmup_bars: int params: dict[str, object] build_signals: Callable[[list[Candle]], tuple[list[str | None], list[str | None], list[float | None], list[float | None]]] @dataclass(frozen=True) class BacktestResult: trades: list[dict[str, object]] equity: pd.DataFrame long_entries: int short_entries: int def load_candles(symbol: str, bar: str) -> list[Candle]: frame = pd.read_csv(DATA_DIR / symbol / f"{bar}.csv") return [ Candle( symbol=symbol, ts=int(row.ts), open=float(row.open), high=float(row.high), low=float(row.low), close=float(row.close), volume=float(row.volume), ) for row in frame.itertuples(index=False) ] def format_ts(ts: int) -> str: return pd.to_datetime(ts, unit="ms", utc=True).strftime("%Y-%m-%d %H:%M") def rsi(values: pd.Series, length: int) -> pd.Series: delta = values.diff() gain = delta.clip(lower=0).rolling(length).mean() loss = (-delta.clip(upper=0)).rolling(length).mean() rs = gain / loss return 100.0 - (100.0 / (1.0 + rs)) def crossing_up(left: pd.Series, right: pd.Series) -> pd.Series: return (left.shift(1) <= right.shift(1)) & (left > right) def crossing_down(left: pd.Series, right: pd.Series) -> pd.Series: return (left.shift(1) >= right.shift(1)) & (left < right) def build_rsi2_signals( candles: list[Candle], *, trend_sma: int, entry_rsi: float, exit_rsi: float, stop_loss_pct: float, max_hold_bars: int, ) -> tuple[list[str | None], list[str | None], list[float | None], list[float | None]]: close = pd.Series([candle.close for candle in candles], dtype=float) trend = close.rolling(trend_sma).mean() rsi2 = rsi(close, 2) entries: list[str | None] = [None] * len(candles) exits: list[str | None] = [None] * len(candles) stops: list[float | None] = [stop_loss_pct] * len(candles) takes: list[float | None] = [None] * len(candles) for index, candle in enumerate(candles): if trend.iloc[index] != trend.iloc[index] or rsi2.iloc[index] != rsi2.iloc[index]: continue if candle.close > float(trend.iloc[index]) and float(rsi2.iloc[index]) <= entry_rsi: entries[index] = "long" elif candle.close < float(trend.iloc[index]) and float(rsi2.iloc[index]) >= 100.0 - entry_rsi: entries[index] = "short" if float(rsi2.iloc[index]) >= exit_rsi: exits[index] = "long" elif float(rsi2.iloc[index]) <= 100.0 - exit_rsi: exits[index] = "short" return with_max_hold(entries, exits, stops, takes, max_hold_bars) def build_ma_cross_signals( candles: list[Candle], *, fast: int, slow: int, stop_loss_pct: float, ) -> tuple[list[str | None], list[str | None], list[float | None], list[float | None]]: close = pd.Series([candle.close for candle in candles], dtype=float) fast_ma = close.rolling(fast).mean() slow_ma = close.rolling(slow).mean() entries: list[str | None] = [None] * len(candles) exits: list[str | None] = [None] * len(candles) stops: list[float | None] = [stop_loss_pct] * len(candles) takes: list[float | None] = [None] * len(candles) up = crossing_up(fast_ma, slow_ma) down = crossing_down(fast_ma, slow_ma) for index in range(len(candles)): if up.iloc[index]: entries[index] = "long" exits[index] = "both" elif down.iloc[index]: entries[index] = "short" exits[index] = "both" return entries, exits, stops, takes def build_vwap_reversion_signals( candles: list[Candle], *, window: int, entry_z: float, exit_z: float, stop_loss_pct: float, max_hold_bars: int, ) -> tuple[list[str | None], list[str | None], list[float | None], list[float | None]]: close = pd.Series([candle.close for candle in candles], dtype=float) volume = pd.Series([candle.volume for candle in candles], dtype=float) vwap = (close * volume).rolling(window).sum() / volume.rolling(window).sum() stdev = close.rolling(window).std(ddof=0) z = (close - vwap) / stdev entries: list[str | None] = [None] * len(candles) exits: list[str | None] = [None] * len(candles) stops: list[float | None] = [stop_loss_pct] * len(candles) takes: list[float | None] = [None] * len(candles) for index in range(len(candles)): if z.iloc[index] != z.iloc[index]: continue if float(z.iloc[index]) <= -entry_z: entries[index] = "long" elif float(z.iloc[index]) >= entry_z: entries[index] = "short" if abs(float(z.iloc[index])) <= exit_z: exits[index] = "both" return with_max_hold(entries, exits, stops, takes, max_hold_bars) def build_bbmr_signals( candles: list[Candle], *, trend_sma: int, band_length: int, stdev_mult: float, atr_length: int, max_atr_pct: float, stop_loss_pct: float, take_profit_pct: float, max_hold_bars: int, ) -> tuple[list[str | None], list[str | None], list[float | None], list[float | None]]: close = pd.Series([candle.close for candle in candles], dtype=float) high = pd.Series([candle.high for candle in candles], dtype=float) low = pd.Series([candle.low for candle in candles], dtype=float) trend = close.rolling(trend_sma).mean() middle = close.rolling(band_length).mean() stdev = close.rolling(band_length).std(ddof=0) upper = middle + stdev_mult * stdev lower = middle - stdev_mult * stdev true_range = pd.concat([(high - low), (high - close.shift(1)).abs(), (low - close.shift(1)).abs()], axis=1).max(axis=1) atr_pct = true_range.rolling(atr_length).mean() / close entries: list[str | None] = [None] * len(candles) exits: list[str | None] = [None] * len(candles) stops: list[float | None] = [stop_loss_pct] * len(candles) takes: list[float | None] = [take_profit_pct] * len(candles) for index, candle in enumerate(candles): values = (trend.iloc[index], middle.iloc[index], upper.iloc[index], lower.iloc[index], atr_pct.iloc[index]) if any(value != value for value in values): continue if float(atr_pct.iloc[index]) > max_atr_pct: continue if candle.close > float(trend.iloc[index]) and candle.close <= float(lower.iloc[index]): entries[index] = "long" elif candle.close < float(trend.iloc[index]) and candle.close >= float(upper.iloc[index]): entries[index] = "short" if candle.close >= float(middle.iloc[index]): exits[index] = "long" elif candle.close <= float(middle.iloc[index]): exits[index] = "short" return with_max_hold(entries, exits, stops, takes, max_hold_bars) def build_donchian_false_breakout_signals( candles: list[Candle], *, lookback: int, reclaim_bars: int, stop_loss_pct: float, take_profit_pct: float, max_hold_bars: int, ) -> tuple[list[str | None], list[str | None], list[float | None], list[float | None]]: close = pd.Series([candle.close for candle in candles], dtype=float) high = pd.Series([candle.high for candle in candles], dtype=float) low = pd.Series([candle.low for candle in candles], dtype=float) prior_high = high.shift(1).rolling(lookback).max() prior_low = low.shift(1).rolling(lookback).min() entries: list[str | None] = [None] * len(candles) exits: list[str | None] = [None] * len(candles) stops: list[float | None] = [stop_loss_pct] * len(candles) takes: list[float | None] = [take_profit_pct] * len(candles) broke_high_at: int | None = None broke_low_at: int | None = None for index, candle in enumerate(candles): if prior_high.iloc[index] != prior_high.iloc[index] or prior_low.iloc[index] != prior_low.iloc[index]: continue if candle.high > float(prior_high.iloc[index]): broke_high_at = index if candle.low < float(prior_low.iloc[index]): broke_low_at = index if broke_high_at is not None and index - broke_high_at <= reclaim_bars and candle.close < float(prior_high.iloc[index]): entries[index] = "short" broke_high_at = None if broke_low_at is not None and index - broke_low_at <= reclaim_bars and candle.close > float(prior_low.iloc[index]): entries[index] = "long" broke_low_at = None if broke_high_at is not None and index - broke_high_at > reclaim_bars: broke_high_at = None if broke_low_at is not None and index - broke_low_at > reclaim_bars: broke_low_at = None return with_max_hold(entries, exits, stops, takes, max_hold_bars) def with_max_hold( entries: list[str | None], exits: list[str | None], stops: list[float | None], takes: list[float | None], max_hold_bars: int, ) -> tuple[list[str | None], list[str | None], list[float | None], list[float | None]]: return entries, exits, stops, takes def exit_equity(side: str, margin: float, entry_price: float, exit_price: float) -> float: if side == "long": price_return = exit_price / entry_price - 1.0 else: price_return = (entry_price - exit_price) / entry_price return margin * (1.0 + LEVERAGE * price_return) def run_strategy(candles: list[Candle], strategy: Strategy) -> BacktestResult: entries, exits, stops, takes = strategy.build_signals(candles) trades: list[dict[str, object]] = [] equity_points: list[dict[str, object]] = [{"ts": pd.to_datetime(candles[0].ts, unit="ms", utc=True), "equity": INITIAL_EQUITY}] equity = INITIAL_EQUITY position: dict[str, object] | None = None pending_entry: str | None = None pending_exit = False long_entries = 0 short_entries = 0 for index in range(strategy.warmup_bars, len(candles)): candle = candles[index] if pending_exit and position is not None: equity = close_position(trades, position, candle.ts, candle.open, "signal") position = None pending_exit = False equity_points.append({"ts": pd.to_datetime(candle.ts, unit="ms", utc=True), "equity": equity}) if pending_entry is not None and position is None and equity > 0.0: position = { "side": pending_entry, "entry_index": index, "entry_ts": candle.ts, "entry_price": candle.open, "margin": equity, "stop_price": candle.open * (1.0 - float(stops[index]) if pending_entry == "long" else 1.0 + float(stops[index])) if stops[index] is not None else None, "take_price": candle.open * (1.0 + float(takes[index]) if pending_entry == "long" else 1.0 - float(takes[index])) if takes[index] is not None else None, } long_entries += 1 if pending_entry == "long" else 0 short_entries += 1 if pending_entry == "short" else 0 pending_entry = None if position is not None: side = str(position["side"]) stop_price = position["stop_price"] take_price = position["take_price"] stop_hit = stop_price is not None and ( (side == "long" and candle.low <= float(stop_price)) or (side == "short" and candle.high >= float(stop_price)) ) take_hit = take_price is not None and ( (side == "long" and candle.high >= float(take_price)) or (side == "short" and candle.low <= float(take_price)) ) if stop_hit or take_hit: exit_price = float(stop_price if stop_hit else take_price) equity = close_position(trades, position, candle.ts, exit_price, "stop" if stop_hit else "take_profit") position = None equity_points.append({"ts": pd.to_datetime(candle.ts, unit="ms", utc=True), "equity": equity}) if index == len(candles) - 1 or equity <= 0.0: continue entry_side = entries[index] if position is not None: max_hold = int(strategy.params.get("max_hold_bars", 0) or 0) reverse = entry_side is not None and entry_side != position["side"] stale = max_hold > 0 and index - int(position["entry_index"]) >= max_hold exit_signal = exits[index] in ("both", position["side"]) if exit_signal or reverse or stale: pending_exit = True pending_entry = entry_side if reverse else None continue if entry_side is not None: pending_entry = entry_side if position is not None: equity = close_position(trades, position, candles[-1].ts, candles[-1].close, "final") equity_points.append({"ts": pd.to_datetime(candles[-1].ts, unit="ms", utc=True), "equity": equity}) return BacktestResult(trades=trades, equity=pd.DataFrame(equity_points), long_entries=long_entries, short_entries=short_entries) def close_position( trades: list[dict[str, object]], position: dict[str, object], exit_ts: int, exit_price: float, reason: str, ) -> float: margin = float(position["margin"]) gross = exit_equity(str(position["side"]), margin, float(position["entry_price"]), exit_price) net = gross - margin * ROUNDTRIP_COST pnl = net - margin trades.append( { "side": str(position["side"]), "entry_time": format_ts(int(position["entry_ts"])), "exit_time": format_ts(exit_ts), "entry_ts": int(position["entry_ts"]), "exit_ts": exit_ts, "entry_price": float(position["entry_price"]), "exit_price": exit_price, "return": pnl / margin, "pnl": pnl, "exit_reason": reason, } ) return net def max_drawdown(values: list[float]) -> float: peak = values[0] drawdown = 0.0 for value in values: peak = max(peak, value) drawdown = max(drawdown, (peak - value) / peak if peak > 0.0 else 0.0) return drawdown def metrics_for(equity: pd.DataFrame, trades: list[dict[str, object]], first_ts: int, last_ts: int) -> dict[str, object]: years = (last_ts - first_ts) / 86_400_000 / 365 total_return = float(equity["equity"].iloc[-1] / equity["equity"].iloc[0] - 1.0) annualized = (1.0 + total_return) ** (1.0 / years) - 1.0 if total_return > -1.0 and years > 0.0 else -1.0 dd = max_drawdown([float(value) for value in equity["equity"]]) wins = [float(trade["return"]) for trade in trades if float(trade["return"]) > 0.0] losses = [float(trade["return"]) for trade in trades if float(trade["return"]) < 0.0] avg_win = sum(wins) / len(wins) if wins else 0.0 avg_loss = abs(sum(losses) / len(losses)) if losses else 0.0 months = max(years * 12.0, 1.0 / 30.0) worst_month_label, worst_month_return = worst_month(equity) return { "net_total_return": total_return, "net_annualized_return": annualized, "net_max_drawdown": dd, "net_calmar": annualized / dd if dd else 0.0, "win_rate": len(wins) / len(trades) if trades else 0.0, "payoff_ratio": avg_win / avg_loss if avg_loss else 0.0, "profit_factor": sum(wins) / abs(sum(losses)) if losses else 0.0, "risk_reward_ratio": total_return / dd if dd else 0.0, "trades": len(trades), "trades_per_month": len(trades) / months, "worst_month": worst_month_label, "worst_month_return": worst_month_return, } def worst_month(equity: pd.DataFrame) -> tuple[str, float]: monthly = equity.set_index("ts")["equity"].resample("ME").last().ffill().pct_change().dropna() if not len(monthly): return "", 0.0 index = monthly.idxmin() return index.strftime("%Y-%m"), float(monthly.loc[index]) def horizon_slice(equity: pd.DataFrame, trades: list[dict[str, object]], last_ts: int, offset: pd.DateOffset | None) -> tuple[pd.DataFrame, list[dict[str, object]], int]: if offset is None: return equity.copy(), trades, int(equity["ts"].iloc[0].timestamp() * 1000) cutoff = pd.to_datetime(last_ts, unit="ms", utc=True) - offset before = equity[equity["ts"] <= cutoff] start_equity = float(before["equity"].iloc[-1]) if len(before) else float(equity["equity"].iloc[0]) frame = pd.concat( [ pd.DataFrame([{"ts": cutoff, "equity": start_equity}]), equity[equity["ts"] > cutoff][["ts", "equity"]], ], ignore_index=True, ) return frame, [trade for trade in trades if int(trade["exit_ts"]) >= int(cutoff.timestamp() * 1000)], int(cutoff.timestamp() * 1000) def build_strategies() -> list[Strategy]: strategies: list[Strategy] = [] for trend in (96, 192): for entry in (5.0, 10.0): for exit_level in (55.0,): for stop in (0.006, 0.01): params = {"trend_sma": trend, "entry_rsi": entry, "exit_rsi": exit_level, "stop_loss_pct": stop, "max_hold_bars": 96} strategies.append(Strategy("RSI2 both", f"rsi2-both-t{trend}-e{entry:g}-x{exit_level:g}-sl{stop:g}", trend, params, lambda c, p=params: build_rsi2_signals(c, **p))) for fast, slow in ((8, 34), (13, 55), (21, 89)): for stop in (0.01,): params = {"fast": fast, "slow": slow, "stop_loss_pct": stop} strategies.append(Strategy("MA cross both", f"ma-cross-both-f{fast}-s{slow}-sl{stop:g}", slow, params, lambda c, p=params: build_ma_cross_signals(c, **p))) for window in (48, 96): for entry_z in (1.5, 2.0): for stop in (0.006,): params = {"window": window, "entry_z": entry_z, "exit_z": 0.25, "stop_loss_pct": stop, "max_hold_bars": window} strategies.append(Strategy("VWAP reversion", f"vwap-reversion-w{window}-z{entry_z:g}-sl{stop:g}", window, params, lambda c, p=params: build_vwap_reversion_signals(c, **p))) for trend in (192,): for length in (48, 96): for max_atr in (0.01,): for stop, take in ((0.006, 0.009), (0.01, 0.015)): params = { "trend_sma": trend, "band_length": length, "stdev_mult": 2.0, "atr_length": length, "max_atr_pct": max_atr, "stop_loss_pct": stop, "take_profit_pct": take, "max_hold_bars": length, } strategies.append(Strategy("BBMR risk-filtered", f"bbmr-risk-t{trend}-l{length}-atr{max_atr:g}-sl{stop:g}-tp{take:g}", max(trend, length), params, lambda c, p=params: build_bbmr_signals(c, **p))) for lookback in (48, 96): for reclaim in (2, 4): for stop, take in ((0.006, 0.009),): params = {"lookback": lookback, "reclaim_bars": reclaim, "stop_loss_pct": stop, "take_profit_pct": take, "max_hold_bars": lookback} strategies.append(Strategy("Donchian false breakout", f"donchian-false-l{lookback}-r{reclaim}-sl{stop:g}-tp{take:g}", lookback, params, lambda c, p=params: build_donchian_false_breakout_signals(c, **p))) return strategies def markdown_table(frame: pd.DataFrame) -> str: columns = list(frame.columns) rows = [columns, ["---" for _ in columns]] for record in frame.to_dict("records"): rows.append([record[column] for column in columns]) return "\n".join("| " + " | ".join(format_cell(value) for value in row) + " |" for row in rows) def format_cell(value: object) -> str: if isinstance(value, float): return f"{value:.6g}" return str(value).replace("|", "\\|") def write_report(total: pd.DataFrame, horizons: pd.DataFrame, report_files: list[Path], command: str) -> str: executable = total[total["directly_liveable"] & (total["supports_short"]) & (total["trades_per_month"] >= 10.0)] positive_fast = executable[executable["net_total_return"] > 0.0] positive_slow = total[ (total["directly_liveable"]) & (total["supports_short"]) & (~total["needs_synthetic_bookkeeping"]) & (total["net_total_return"] > 0.0) & (total["trades_per_month"] < 10.0) ].head(10) top = executable.head(15) family_best = executable.sort_values(["family", "net_calmar", "net_annualized_return"], ascending=[True, False, False]).groupby("family", as_index=False).head(1) lines = [ "# Executable bidirectional strategy search", "", f"Run command: `{command}`", "Execution safety: local CSV backtest only; no private API, no order submission, no commit.", f"Cost model: fixed roundtrip cost on margin `{ROUNDTRIP_COST}`; leverage `{LEVERAGE}x`.", "", "Output files:", *[f"- `{path}`" for path in report_files], "", "Eligibility: directly liveable, supports short, no synthetic portfolio bookkeeping, at least 10 trades/month.", f"Decision: positive >=10 trades/month candidates found: `{len(positive_fast)}`.", "", "Top executable candidates:", markdown_table(top[["family", "symbol", "bar", "name", "trades_per_month", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "win_rate", "payoff_ratio", "profit_factor", "risk_reward_ratio", "worst_month_return"]]), "", "Family leaders:", markdown_table(family_best[["family", "symbol", "bar", "name", "trades_per_month", "net_calmar", "net_annualized_return", "net_max_drawdown", "profit_factor"]]), "", "Positive but below 10 trades/month:", markdown_table(positive_slow[["family", "symbol", "bar", "name", "trades_per_month", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "profit_factor", "worst_month_return"]]), "", "Recent horizon leaders:", markdown_table( horizons[horizons["name"].isin(set(top.head(8)["name"]))] .sort_values(["horizon", "net_calmar", "net_annualized_return"], ascending=[True, False, False]) .groupby("horizon", observed=True) .head(5)[["horizon", "family", "symbol", "bar", "name", "trades_per_month", "net_total_return", "net_annualized_return", "net_max_drawdown", "net_calmar", "worst_month_return"]] ), ] return "\n".join(lines) + "\n" def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--output-dir", type=Path, default=OUTPUT_DIR) parser.add_argument("--symbols", nargs="+", default=list(SYMBOLS)) parser.add_argument("--bars", nargs="+", default=list(BARS)) args = parser.parse_args() strategies = build_strategies() total_rows: list[dict[str, object]] = [] horizon_rows: list[dict[str, object]] = [] for symbol in args.symbols: for bar in args.bars: candles = load_candles(symbol, bar) for index, strategy in enumerate(strategies, start=1): result = run_strategy(candles, strategy) if not len(result.equity): continue full_metrics = metrics_for(result.equity, result.trades, candles[0].ts, candles[-1].ts) row_base = { "family": strategy.family, "symbol": symbol, "bar": bar, "name": strategy.name, "first_candle": format_ts(candles[0].ts), "last_candle": format_ts(candles[-1].ts), "years": (candles[-1].ts - candles[0].ts) / 86_400_000 / 365, "directly_liveable": True, "needs_synthetic_bookkeeping": False, "supports_short": result.short_entries > 0, "long_entries": result.long_entries, "short_entries": result.short_entries, "order_intent": "single_symbol_long_short_entry_exit", "params_json": json.dumps(strategy.params, separators=(",", ":")), } total_rows.append({**row_base, **full_metrics}) for horizon, offset in HORIZONS: frame, trades, start_ts = horizon_slice(result.equity, result.trades, candles[-1].ts, offset) horizon_rows.append( { **row_base, "horizon": horizon, "horizon_start": format_ts(start_ts), "horizon_end": format_ts(candles[-1].ts), **metrics_for(frame, trades, start_ts, candles[-1].ts), } ) print(f"done {symbol} {bar} {index}/{len(strategies)} {strategy.family} {strategy.name}") total = pd.DataFrame(total_rows).sort_values( ["directly_liveable", "supports_short", "trades_per_month", "net_calmar", "net_annualized_return", "profit_factor"], ascending=[False, False, False, False, False, False], ) total["candidate_tier"] = "other" total.loc[ (total["directly_liveable"]) & (total["supports_short"]) & (total["trades_per_month"] >= 10.0) & (total["net_total_return"] > 0.0), "candidate_tier", ] = "positive_10pm" total.loc[ (total["directly_liveable"]) & (total["supports_short"]) & (total["trades_per_month"] < 10.0) & (total["net_total_return"] > 0.0), "candidate_tier", ] = "positive_sub10pm" total.loc[ (total["directly_liveable"]) & (total["supports_short"]) & (total["trades_per_month"] >= 10.0) & (total["net_total_return"] <= 0.0), "candidate_tier", ] = "nonpositive_10pm" total["candidate_tier"] = pd.Categorical( total["candidate_tier"], categories=["positive_10pm", "positive_sub10pm", "nonpositive_10pm", "other"], ordered=True, ) executable = total[(total["directly_liveable"]) & (total["supports_short"]) & (total["trades_per_month"] >= 10.0)] executable = executable.sort_values(["net_calmar", "net_annualized_return", "profit_factor"], ascending=[False, False, False]) total = total.sort_values( ["candidate_tier", "net_calmar", "net_annualized_return", "trades_per_month", "profit_factor"], ascending=[True, False, False, False, False], ).reset_index(drop=True) horizons = pd.DataFrame(horizon_rows) horizons["horizon"] = pd.Categorical(horizons["horizon"], categories=[label for label, _ in HORIZONS], ordered=True) horizons = horizons.sort_values(["horizon", "net_calmar", "net_annualized_return"], ascending=[True, False, False]) args.output_dir.mkdir(parents=True, exist_ok=True) total_path = args.output_dir / "executable-bidir-total.csv" horizon_path = args.output_dir / "executable-bidir-horizons.csv" top_path = args.output_dir / "executable-bidir-top.csv" json_path = args.output_dir / "executable-bidir-top.json" report_path = args.output_dir / "executable-bidir-report.md" output_files = [total_path, horizon_path, top_path, json_path, report_path] total.to_csv(total_path, index=False) horizons.to_csv(horizon_path, index=False) total.head(50).to_csv(top_path, index=False) json_path.write_text(json.dumps(total.head(20).to_dict("records"), indent=2), encoding="utf-8") command = f"rtk .venv/bin/python {Path(__file__).as_posix()} --symbols {' '.join(args.symbols)} --bars {' '.join(args.bars)}" report_path.write_text(write_report(total, horizons, output_files, command), encoding="utf-8") print(total.head(20).to_string(index=False)) return 0 if __name__ == "__main__": raise SystemExit(main())