| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278 |
- from __future__ import annotations
- import json
- import sys
- from dataclasses import dataclass
- from pathlib import Path
- import pandas as pd
- sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
- from scripts import explore_ultrashort as explore
- from scripts import search_eth_btc_nextgen_variants as nextgen
- from scripts import search_eth_microstructure_variants as micro
- REPORT_DIR = Path("reports/eth-exploration")
- TARGET_NAME = "switch-l30-r96_q0.15_mf0.25_us"
- COST_MODEL = "maker_taker"
- ROUNDTRIP_COST_ON_MARGIN = 0.0021
- NEXTGEN_LEGS = (
- "btc_trend_eth_rsi:15m:eth-btc-rsi-filter-et50-l3.0-x55.0-bt480-bm240-br0.0",
- "btc_shock_guard_eth_rsi:15m:eth-btc-shock-filter-et50-l3.0-x55.0-bt480-bm240-br0.01-sw96-sv0.01-sd0.05",
- )
- MICRO_NAME = "atr-compress-expand-r96-q0.15-sl0.008-tp0.016-mf0.25-us"
- LOOKBACK_DAYS = 30
- @dataclass(frozen=True)
- class TradeEvent:
- source: str
- leg: str
- side: str
- entry_time: pd.Timestamp
- exit_time: pd.Timestamp
- exit_date: pd.Timestamp
- entry_price: float
- exit_price: float
- net_return: float
- def daily_equity(frame: pd.DataFrame, index: pd.DatetimeIndex) -> pd.Series:
- series = frame.set_index("ts")["equity"].sort_index()
- daily = series.reindex(index.union(series.index)).sort_index().ffill().reindex(index).ffill()
- daily.iloc[0] = explore.INITIAL_EQUITY
- return daily
- def parse_time(value: object) -> pd.Timestamp:
- return pd.to_datetime(str(value), utc=True)
- def trade_from_nextgen(leg: str, trade: dict[str, object]) -> TradeEvent:
- net_return = (float(trade["return_pct"]) / 100.0 - ROUNDTRIP_COST_ON_MARGIN * float(trade.get("cost_weight", 1.0))) * 0.5
- exit_time = parse_time(trade["exit_time"])
- return TradeEvent(
- source="nextgen",
- leg=leg,
- side=str(trade["side"]).lower(),
- entry_time=parse_time(trade["entry_time"]),
- exit_time=exit_time,
- exit_date=exit_time.normalize(),
- entry_price=float(trade["entry_price"]),
- exit_price=float(trade["exit_price"]),
- net_return=net_return,
- )
- def trade_from_micro(trade: dict[str, object]) -> TradeEvent:
- net_return = float(trade["return_pct"]) / 100.0 - ROUNDTRIP_COST_ON_MARGIN * float(trade["cost_weight"])
- exit_time = parse_time(trade["exit_time"])
- return TradeEvent(
- source="micro",
- leg=MICRO_NAME,
- side=str(trade["side"]).lower(),
- entry_time=parse_time(trade["entry_time"]),
- exit_time=exit_time,
- exit_date=exit_time.normalize(),
- entry_price=float(trade["entry_price"]),
- exit_price=float(trade["exit_price"]),
- net_return=net_return,
- )
- def load_components() -> tuple[pd.Series, pd.Series, pd.Series, list[TradeEvent], list[TradeEvent]]:
- published_equity = pd.read_csv(REPORT_DIR / "eth-btc-nextgen-equity.csv")
- base = published_equity[(published_equity["name"] == "equal-2-c0003") & (published_equity["cost_model"] == COST_MODEL)]
- index = pd.DatetimeIndex(pd.to_datetime(base["date"], utc=True))
- strategies = {
- f"{strategy.family}:{strategy.bar}:{strategy.candidate.name}": strategy
- for strategy in nextgen.build_strategies()
- }
- data = {
- (symbol, "15m"): nextgen.load_candles(symbol, "15m", 10.0)
- for symbol in ("ETH-USDT-SWAP", "BTC-USDT-SWAP")
- }
- leg_series = []
- nextgen_trades = []
- for leg in NEXTGEN_LEGS:
- result = nextgen.run_strategy(strategies[leg], data)
- leg_series.append(daily_equity(explore.cost_adjusted_trade_equity_frame(result, ROUNDTRIP_COST_ON_MARGIN), index))
- nextgen_trades.extend(trade_from_nextgen(leg, trade) for trade in result.trades)
- nextgen_returns = pd.DataFrame([series.pct_change().fillna(0.0) for series in leg_series]).T.mean(axis=1)
- nextgen_series = explore.INITIAL_EQUITY * (1.0 + nextgen_returns).cumprod()
- nextgen_series.iloc[0] = explore.INITIAL_EQUITY
- candles = micro._load_candles(micro.SYMBOL, micro.BAR)
- requested = int(10.0 * 365 * 24 * 60 / 15)
- candles = candles[-requested:]
- variant = {variant.name: variant for variant in micro.build_variants()}[MICRO_NAME]
- micro_result = variant.run(candles)
- micro_series = daily_equity(micro.cost_equity_frame(micro_result, ROUNDTRIP_COST_ON_MARGIN), index)
- micro_trades = [trade_from_micro(trade) for trade in micro_result.trades]
- nextgen_regime = nextgen_series / nextgen_series.shift(LOOKBACK_DAYS) - 1.0
- micro_regime = micro_series / micro_series.shift(LOOKBACK_DAYS) - 1.0
- active = ((nextgen_regime < 0.0) & (micro_regime > 0.0)).shift(1).fillna(False).astype(bool)
- return active, nextgen_series, micro_series, nextgen_trades, micro_trades
- def selected_trades(active: pd.Series, nextgen_trades: list[TradeEvent], micro_trades: list[TradeEvent]) -> list[TradeEvent]:
- selected = [
- trade for trade in nextgen_trades if not bool(active.reindex([trade.exit_date]).fillna(False).iloc[0])
- ]
- selected.extend(
- trade for trade in micro_trades if bool(active.reindex([trade.exit_date]).fillna(False).iloc[0])
- )
- return sorted(selected, key=lambda trade: (trade.entry_time, trade.exit_time, trade.source, trade.leg))
- def trade_frame(trades: list[TradeEvent]) -> pd.DataFrame:
- return pd.DataFrame(
- [
- {
- "source": trade.source,
- "leg": trade.leg,
- "side": trade.side,
- "entry_time": trade.entry_time.strftime("%Y-%m-%d %H:%M:%S%z"),
- "exit_time": trade.exit_time.strftime("%Y-%m-%d %H:%M:%S%z"),
- "exit_date": trade.exit_date.strftime("%Y-%m-%d"),
- "entry_price": trade.entry_price,
- "exit_price": trade.exit_price,
- "net_return": trade.net_return,
- }
- for trade in trades
- ]
- )
- def event_maps(trades: list[TradeEvent]) -> tuple[dict[pd.Timestamp, list[TradeEvent]], dict[pd.Timestamp, list[TradeEvent]]]:
- entries: dict[pd.Timestamp, list[TradeEvent]] = {}
- exits: dict[pd.Timestamp, list[TradeEvent]] = {}
- for trade in trades:
- entries.setdefault(trade.entry_time, []).append(trade)
- exits.setdefault(trade.exit_time, []).append(trade)
- return entries, exits
- def source_count(trades: list[TradeEvent], source: str) -> int:
- return sum(1 for trade in trades if trade.source == source)
- def labels(trades: list[TradeEvent]) -> str:
- return ";".join(f"{trade.source}:{trade.side}" for trade in trades)
- def stream_frame(active: pd.Series, selected: list[TradeEvent], raw_trades: list[TradeEvent]) -> pd.DataFrame:
- candles = micro._load_candles(micro.SYMBOL, micro.BAR)
- first = active.index[0]
- last = active.index[-1] + pd.Timedelta(days=1)
- selected_entries, selected_exits = event_maps(selected)
- raw_entries, raw_exits = event_maps(raw_trades)
- rows = []
- for candle in candles:
- ts = pd.to_datetime(candle.ts, unit="ms", utc=True)
- if ts < first or ts >= last:
- continue
- date = ts.normalize()
- micro_active = bool(active.reindex([date]).fillna(False).iloc[0])
- selected_entry_trades = selected_entries.get(ts, [])
- selected_exit_trades = selected_exits.get(ts, [])
- raw_entry_trades = raw_entries.get(ts, [])
- raw_exit_trades = raw_exits.get(ts, [])
- rows.append(
- {
- "time": ts.strftime("%Y-%m-%d %H:%M:%S%z"),
- "date": date.strftime("%Y-%m-%d"),
- "active_engine": "micro" if micro_active else "nextgen",
- "open": candle.open,
- "high": candle.high,
- "low": candle.low,
- "close": candle.close,
- "selected_entry_count": len(selected_entry_trades),
- "selected_exit_count": len(selected_exit_trades),
- "selected_entry_labels": labels(selected_entry_trades),
- "selected_exit_labels": labels(selected_exit_trades),
- "raw_nextgen_entry_count": source_count(raw_entry_trades, "nextgen"),
- "raw_nextgen_exit_count": source_count(raw_exit_trades, "nextgen"),
- "raw_micro_entry_count": source_count(raw_entry_trades, "micro"),
- "raw_micro_exit_count": source_count(raw_exit_trades, "micro"),
- }
- )
- return pd.DataFrame(rows)
- def write_report(stream: pd.DataFrame, trades: pd.DataFrame, active: pd.Series) -> str:
- source_counts = trades["source"].value_counts().to_dict()
- side_counts = trades["side"].value_counts().to_dict()
- entry_rows = stream[stream["selected_entry_count"] > 0]
- exit_rows = stream[stream["selected_exit_count"] > 0]
- lines = [
- "# ETH nextgen micro signal stream",
- "",
- f"Target: `{TARGET_NAME}` / `{COST_MODEL}`",
- "",
- "This is a read-only signal stream for cross-checking strategy timing. It does not call OKX private APIs and does not place orders.",
- "",
- "## Outputs",
- "",
- "- `reports/eth-exploration/eth-nextgen-micro-signal-stream.csv`",
- "- `reports/eth-exploration/eth-nextgen-micro-selected-trades.csv`",
- "- `reports/eth-exploration/eth-nextgen-micro-signal-stream-summary.json`",
- "",
- "## Summary",
- "",
- f"- 15m rows: `{len(stream)}`",
- f"- Selected trades: `{len(trades)}`",
- f"- Active micro days: `{int(active.sum())}`",
- f"- Active nextgen days: `{int((~active).sum())}`",
- f"- Selected source counts: `{json.dumps(source_counts, sort_keys=True)}`",
- f"- Selected side counts: `{json.dumps(side_counts, sort_keys=True)}`",
- f"- Entry candles with selected trades: `{len(entry_rows)}`",
- f"- Exit candles with selected trades: `{len(exit_rows)}`",
- "",
- "## Interpretation",
- "",
- "The stream records which engine the switch rule selects on each 15m candle's UTC date. A trade is selected by the engine active on its exit date, matching the validated portfolio accounting path.",
- "",
- ]
- return "\n".join(lines)
- def main() -> int:
- active, _, _, nextgen_trades, micro_trades = load_components()
- raw_trades = sorted([*nextgen_trades, *micro_trades], key=lambda trade: (trade.entry_time, trade.exit_time))
- selected = selected_trades(active, nextgen_trades, micro_trades)
- trades = trade_frame(selected)
- stream = stream_frame(active, selected, raw_trades)
- stream_path = REPORT_DIR / "eth-nextgen-micro-signal-stream.csv"
- trades_path = REPORT_DIR / "eth-nextgen-micro-selected-trades.csv"
- summary_path = REPORT_DIR / "eth-nextgen-micro-signal-stream-summary.json"
- report_path = REPORT_DIR / "eth-nextgen-micro-signal-stream.md"
- summary = {
- "target": TARGET_NAME,
- "cost_model": COST_MODEL,
- "stream_rows": len(stream),
- "selected_trades": len(trades),
- "active_micro_days": int(active.sum()),
- "active_nextgen_days": int((~active).sum()),
- "source_counts": trades["source"].value_counts().to_dict(),
- "side_counts": trades["side"].value_counts().to_dict(),
- "selected_entry_candles": int((stream["selected_entry_count"] > 0).sum()),
- "selected_exit_candles": int((stream["selected_exit_count"] > 0).sum()),
- }
- stream.to_csv(stream_path, index=False)
- trades.to_csv(trades_path, index=False)
- summary_path.write_text(json.dumps(summary, indent=2, sort_keys=True), encoding="utf-8")
- report_path.write_text(write_report(stream, trades, active), encoding="utf-8")
- print(report_path)
- print(json.dumps(summary, indent=2, sort_keys=True))
- return 0
- if __name__ == "__main__":
- raise SystemExit(main())
|