from __future__ import annotations import json import sys from dataclasses import dataclass from pathlib import Path import pandas as pd sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from scripts import explore_ultrashort as explore from scripts import search_eth_btc_nextgen_variants as nextgen from scripts import search_eth_microstructure_variants as micro REPORT_DIR = Path("reports/eth-exploration") TARGET_NAME = "switch-l30-r96_q0.15_mf0.25_us" COST_MODEL = "maker_taker" ROUNDTRIP_COST_ON_MARGIN = 0.0021 NEXTGEN_LEGS = ( "btc_trend_eth_rsi:15m:eth-btc-rsi-filter-et50-l3.0-x55.0-bt480-bm240-br0.0", "btc_shock_guard_eth_rsi:15m:eth-btc-shock-filter-et50-l3.0-x55.0-bt480-bm240-br0.01-sw96-sv0.01-sd0.05", ) MICRO_NAME = "atr-compress-expand-r96-q0.15-sl0.008-tp0.016-mf0.25-us" LOOKBACK_DAYS = 30 @dataclass(frozen=True) class TradeEvent: source: str leg: str side: str entry_time: pd.Timestamp exit_time: pd.Timestamp exit_date: pd.Timestamp entry_price: float exit_price: float net_return: float def daily_equity(frame: pd.DataFrame, index: pd.DatetimeIndex) -> pd.Series: series = frame.set_index("ts")["equity"].sort_index() daily = series.reindex(index.union(series.index)).sort_index().ffill().reindex(index).ffill() daily.iloc[0] = explore.INITIAL_EQUITY return daily def parse_time(value: object) -> pd.Timestamp: return pd.to_datetime(str(value), utc=True) def trade_from_nextgen(leg: str, trade: dict[str, object]) -> TradeEvent: net_return = (float(trade["return_pct"]) / 100.0 - ROUNDTRIP_COST_ON_MARGIN * float(trade.get("cost_weight", 1.0))) * 0.5 exit_time = parse_time(trade["exit_time"]) return TradeEvent( source="nextgen", leg=leg, side=str(trade["side"]).lower(), entry_time=parse_time(trade["entry_time"]), exit_time=exit_time, exit_date=exit_time.normalize(), entry_price=float(trade["entry_price"]), exit_price=float(trade["exit_price"]), net_return=net_return, ) def trade_from_micro(trade: dict[str, object]) -> TradeEvent: net_return = float(trade["return_pct"]) / 100.0 - ROUNDTRIP_COST_ON_MARGIN * float(trade["cost_weight"]) exit_time = parse_time(trade["exit_time"]) return TradeEvent( source="micro", leg=MICRO_NAME, side=str(trade["side"]).lower(), entry_time=parse_time(trade["entry_time"]), exit_time=exit_time, exit_date=exit_time.normalize(), entry_price=float(trade["entry_price"]), exit_price=float(trade["exit_price"]), net_return=net_return, ) def load_components() -> tuple[pd.Series, pd.Series, pd.Series, list[TradeEvent], list[TradeEvent]]: published_equity = pd.read_csv(REPORT_DIR / "eth-btc-nextgen-equity.csv") base = published_equity[(published_equity["name"] == "equal-2-c0003") & (published_equity["cost_model"] == COST_MODEL)] index = pd.DatetimeIndex(pd.to_datetime(base["date"], utc=True)) strategies = { f"{strategy.family}:{strategy.bar}:{strategy.candidate.name}": strategy for strategy in nextgen.build_strategies() } data = { (symbol, "15m"): nextgen.load_candles(symbol, "15m", 10.0) for symbol in ("ETH-USDT-SWAP", "BTC-USDT-SWAP") } leg_series = [] nextgen_trades = [] for leg in NEXTGEN_LEGS: result = nextgen.run_strategy(strategies[leg], data) leg_series.append(daily_equity(explore.cost_adjusted_trade_equity_frame(result, ROUNDTRIP_COST_ON_MARGIN), index)) nextgen_trades.extend(trade_from_nextgen(leg, trade) for trade in result.trades) nextgen_returns = pd.DataFrame([series.pct_change().fillna(0.0) for series in leg_series]).T.mean(axis=1) nextgen_series = explore.INITIAL_EQUITY * (1.0 + nextgen_returns).cumprod() nextgen_series.iloc[0] = explore.INITIAL_EQUITY candles = micro._load_candles(micro.SYMBOL, micro.BAR) requested = int(10.0 * 365 * 24 * 60 / 15) candles = candles[-requested:] variant = {variant.name: variant for variant in micro.build_variants()}[MICRO_NAME] micro_result = variant.run(candles) micro_series = daily_equity(micro.cost_equity_frame(micro_result, ROUNDTRIP_COST_ON_MARGIN), index) micro_trades = [trade_from_micro(trade) for trade in micro_result.trades] nextgen_regime = nextgen_series / nextgen_series.shift(LOOKBACK_DAYS) - 1.0 micro_regime = micro_series / micro_series.shift(LOOKBACK_DAYS) - 1.0 active = ((nextgen_regime < 0.0) & (micro_regime > 0.0)).shift(1).fillna(False).astype(bool) return active, nextgen_series, micro_series, nextgen_trades, micro_trades def selected_trades(active: pd.Series, nextgen_trades: list[TradeEvent], micro_trades: list[TradeEvent]) -> list[TradeEvent]: selected = [ trade for trade in nextgen_trades if not bool(active.reindex([trade.exit_date]).fillna(False).iloc[0]) ] selected.extend( trade for trade in micro_trades if bool(active.reindex([trade.exit_date]).fillna(False).iloc[0]) ) return sorted(selected, key=lambda trade: (trade.entry_time, trade.exit_time, trade.source, trade.leg)) def trade_frame(trades: list[TradeEvent]) -> pd.DataFrame: return pd.DataFrame( [ { "source": trade.source, "leg": trade.leg, "side": trade.side, "entry_time": trade.entry_time.strftime("%Y-%m-%d %H:%M:%S%z"), "exit_time": trade.exit_time.strftime("%Y-%m-%d %H:%M:%S%z"), "exit_date": trade.exit_date.strftime("%Y-%m-%d"), "entry_price": trade.entry_price, "exit_price": trade.exit_price, "net_return": trade.net_return, } for trade in trades ] ) def event_maps(trades: list[TradeEvent]) -> tuple[dict[pd.Timestamp, list[TradeEvent]], dict[pd.Timestamp, list[TradeEvent]]]: entries: dict[pd.Timestamp, list[TradeEvent]] = {} exits: dict[pd.Timestamp, list[TradeEvent]] = {} for trade in trades: entries.setdefault(trade.entry_time, []).append(trade) exits.setdefault(trade.exit_time, []).append(trade) return entries, exits def source_count(trades: list[TradeEvent], source: str) -> int: return sum(1 for trade in trades if trade.source == source) def labels(trades: list[TradeEvent]) -> str: return ";".join(f"{trade.source}:{trade.side}" for trade in trades) def stream_frame(active: pd.Series, selected: list[TradeEvent], raw_trades: list[TradeEvent]) -> pd.DataFrame: candles = micro._load_candles(micro.SYMBOL, micro.BAR) first = active.index[0] last = active.index[-1] + pd.Timedelta(days=1) selected_entries, selected_exits = event_maps(selected) raw_entries, raw_exits = event_maps(raw_trades) rows = [] for candle in candles: ts = pd.to_datetime(candle.ts, unit="ms", utc=True) if ts < first or ts >= last: continue date = ts.normalize() micro_active = bool(active.reindex([date]).fillna(False).iloc[0]) selected_entry_trades = selected_entries.get(ts, []) selected_exit_trades = selected_exits.get(ts, []) raw_entry_trades = raw_entries.get(ts, []) raw_exit_trades = raw_exits.get(ts, []) rows.append( { "time": ts.strftime("%Y-%m-%d %H:%M:%S%z"), "date": date.strftime("%Y-%m-%d"), "active_engine": "micro" if micro_active else "nextgen", "open": candle.open, "high": candle.high, "low": candle.low, "close": candle.close, "selected_entry_count": len(selected_entry_trades), "selected_exit_count": len(selected_exit_trades), "selected_entry_labels": labels(selected_entry_trades), "selected_exit_labels": labels(selected_exit_trades), "raw_nextgen_entry_count": source_count(raw_entry_trades, "nextgen"), "raw_nextgen_exit_count": source_count(raw_exit_trades, "nextgen"), "raw_micro_entry_count": source_count(raw_entry_trades, "micro"), "raw_micro_exit_count": source_count(raw_exit_trades, "micro"), } ) return pd.DataFrame(rows) def write_report(stream: pd.DataFrame, trades: pd.DataFrame, active: pd.Series) -> str: source_counts = trades["source"].value_counts().to_dict() side_counts = trades["side"].value_counts().to_dict() entry_rows = stream[stream["selected_entry_count"] > 0] exit_rows = stream[stream["selected_exit_count"] > 0] lines = [ "# ETH nextgen micro signal stream", "", f"Target: `{TARGET_NAME}` / `{COST_MODEL}`", "", "This is a read-only signal stream for cross-checking strategy timing. It does not call OKX private APIs and does not place orders.", "", "## Outputs", "", "- `reports/eth-exploration/eth-nextgen-micro-signal-stream.csv`", "- `reports/eth-exploration/eth-nextgen-micro-selected-trades.csv`", "- `reports/eth-exploration/eth-nextgen-micro-signal-stream-summary.json`", "", "## Summary", "", f"- 15m rows: `{len(stream)}`", f"- Selected trades: `{len(trades)}`", f"- Active micro days: `{int(active.sum())}`", f"- Active nextgen days: `{int((~active).sum())}`", f"- Selected source counts: `{json.dumps(source_counts, sort_keys=True)}`", f"- Selected side counts: `{json.dumps(side_counts, sort_keys=True)}`", f"- Entry candles with selected trades: `{len(entry_rows)}`", f"- Exit candles with selected trades: `{len(exit_rows)}`", "", "## Interpretation", "", "The stream records which engine the switch rule selects on each 15m candle's UTC date. A trade is selected by the engine active on its exit date, matching the validated portfolio accounting path.", "", ] return "\n".join(lines) def main() -> int: active, _, _, nextgen_trades, micro_trades = load_components() raw_trades = sorted([*nextgen_trades, *micro_trades], key=lambda trade: (trade.entry_time, trade.exit_time)) selected = selected_trades(active, nextgen_trades, micro_trades) trades = trade_frame(selected) stream = stream_frame(active, selected, raw_trades) stream_path = REPORT_DIR / "eth-nextgen-micro-signal-stream.csv" trades_path = REPORT_DIR / "eth-nextgen-micro-selected-trades.csv" summary_path = REPORT_DIR / "eth-nextgen-micro-signal-stream-summary.json" report_path = REPORT_DIR / "eth-nextgen-micro-signal-stream.md" summary = { "target": TARGET_NAME, "cost_model": COST_MODEL, "stream_rows": len(stream), "selected_trades": len(trades), "active_micro_days": int(active.sum()), "active_nextgen_days": int((~active).sum()), "source_counts": trades["source"].value_counts().to_dict(), "side_counts": trades["side"].value_counts().to_dict(), "selected_entry_candles": int((stream["selected_entry_count"] > 0).sum()), "selected_exit_candles": int((stream["selected_exit_count"] > 0).sum()), } stream.to_csv(stream_path, index=False) trades.to_csv(trades_path, index=False) summary_path.write_text(json.dumps(summary, indent=2, sort_keys=True), encoding="utf-8") report_path.write_text(write_report(stream, trades, active), encoding="utf-8") print(report_path) print(json.dumps(summary, indent=2, sort_keys=True)) return 0 if __name__ == "__main__": raise SystemExit(main())