| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300 |
- from dataclasses import replace
- import pytest
- from okx_codex_trader.live_execution import TargetPosition
- from okx_codex_trader.models import Candle
- from scripts.run_bb_squeeze_executor import EMPTY_STATE, aligned_frame_from_candles, refresh_live_frame, run_once, save_state, signal_from_frame, strategy_name
def candles_with_latest_breakout() -> list[Candle]:
    """Return 1,001 flat ETH-USDT-SWAP candles whose final bar jumps to 101.5.

    Every candle has open == high == low == close and volume 1_000.0.
    Timestamps start at 1_700_000_000_000 and advance by 900_000 per candle.
    The first 1,000 candles sit at 100.0; only the last one moves.
    """
    first_ts = 1_700_000_000_000
    spacing = 900_000
    breakout_index = 1_000

    def flat_candle(position: int) -> Candle:
        # Only the very last bar leaves the 100.0 baseline.
        level = 101.5 if position == breakout_index else 100.0
        return Candle(
            symbol="ETH-USDT-SWAP",
            ts=first_ts + (position * spacing),
            open=level,
            high=level,
            low=level,
            close=level,
            volume=1_000.0,
        )

    return [flat_candle(position) for position in range(breakout_index + 1)]
def btc_up_candles(timestamps: list[int]) -> list[Candle]:
    """Return one flat BTC-USDT-SWAP candle per timestamp on a rising price path.

    The price starts at 100.0 and increases by 0.01 with each successive
    timestamp; open, high, low and close all share that price.
    """
    rising: list[Candle] = []
    for position, stamp in enumerate(timestamps):
        level = 100.0 + position * 0.01
        rising.append(
            Candle(
                symbol="BTC-USDT-SWAP",
                ts=stamp,
                open=level,
                high=level,
                low=level,
                close=level,
                volume=1_000.0,
            )
        )
    return rising
def btc_down_candles(timestamps: list[int]) -> list[Candle]:
    """Return one flat BTC-USDT-SWAP candle per timestamp on a falling price path.

    The price starts at 200.0 and decreases by 0.01 with each successive
    timestamp; open, high, low and close all share that price.
    """
    falling: list[Candle] = []
    for position, stamp in enumerate(timestamps):
        level = 200.0 - position * 0.01
        falling.append(
            Candle(
                symbol="BTC-USDT-SWAP",
                ts=stamp,
                open=level,
                high=level,
                low=level,
                close=level,
                volume=1_000.0,
            )
        )
    return falling
def live_frame(candles: list[Candle]):
    """Align ``candles`` with a rising BTC series built on the same timestamps.

    Returns whatever ``aligned_frame_from_candles`` produces for that pair.
    """
    matching_timestamps = [bar.ts for bar in candles]
    btc_series = btc_up_candles(matching_timestamps)
    return aligned_frame_from_candles(candles, btc_series)
def test_strategy_name_matches_live_parameters() -> None:
    """Pin the exact strategy name string reported by ``strategy_name()``."""
    expected_name = (
        "bb-rr-time-l96-bw960-q0.25-sl0.01-rr3-hybrid_signal_rr-both-btc-up-"
        "vc0.006-ddnone-cd24-mxbuf0.001-mxc1-entryweekday-openexitskip-be0.008-0.001"
    )
    actual_name = strategy_name()
    assert actual_name == expected_name
def test_signal_uses_latest_confirmed_candle() -> None:
    """From an empty state, the decision and the saved state both point at the last candle."""
    bars = candles_with_latest_breakout()
    next_state, signal = signal_from_frame(live_frame(bars), EMPTY_STATE)
    newest_ts = bars[-1].ts
    assert signal["decision_candle_ts"] == newest_ts
    assert next_state.last_candle_ts == newest_ts
def test_seen_latest_candle_replays_without_order_signal() -> None:
    """A state that already recorded the last candle produces a ``state_replay`` signal."""
    bars = candles_with_latest_breakout()
    frame = live_frame(bars)
    newest_ts = bars[-1].ts
    already_seen = replace(EMPTY_STATE, last_candle_ts=newest_ts)
    result = signal_from_frame(frame, already_seen)
    signal = result[1]
    assert signal["decision_candle_ts"] == newest_ts
    assert signal["signal"] == "state_replay"
def test_loop_predicate_skips_seen_decision_candle() -> None:
    """A decision candle already stored in state is reported as ``state_replay``.

    NOTE(review): this only checks the signal value; the loop predicate named
    in the test title is not exercised directly here - confirm intent.
    """
    bars = candles_with_latest_breakout()
    frame = live_frame(bars)
    seen_state = replace(EMPTY_STATE, last_candle_ts=bars[-1].ts)
    outcome = signal_from_frame(frame, seen_state)
    assert outcome[1]["signal"] == "state_replay"
def test_refresh_live_frame_fetches_recent_candles_after_initial_load() -> None:
    """The first load requests 1_200 candles; a later refresh requests only 20.

    The stub client records every ``limit`` it is asked for. Its recent-candle
    endpoint returns the last 19 known candles plus one new candle 900_000
    after the latest, so the refreshed frame must end one step later.
    """

    class Client:
        """Stub market-data client that logs requested limits."""

        def __init__(self) -> None:
            self.limits = []

        def get_candles(self, symbol: str, bar: str, limit: int) -> list[Candle]:
            # Full-history endpoint: always the same 1,001-candle fixture.
            self.limits.append(limit)
            return candles_with_latest_breakout()

        def get_recent_candles(self, symbol: str, bar: str, limit: int) -> list[Candle]:
            # Recent endpoint: 19 overlapping candles followed by one new bar.
            self.limits.append(limit)
            history = candles_with_latest_breakout()
            newest = Candle(
                symbol="ETH-USDT-SWAP",
                ts=history[-1].ts + 900_000,
                open=102.0,
                high=102.0,
                low=102.0,
                close=102.0,
                volume=1_000.0,
            )
            window = list(history[-19:])
            window.append(newest)
            return window

    client = Client()
    initial = refresh_live_frame(client, None)
    refreshed = refresh_live_frame(client, initial)

    assert client.limits == [1_200, 1_200, 20, 20]
    refreshed_last_ts = int(refreshed.iloc[-1]["ts"])
    initial_last_ts = int(initial.iloc[-1]["ts"])
    assert refreshed_last_ts == initial_last_ts + 900_000
def middle_exit_candles(close_multiplier: float) -> list[Candle]:
    """Return 1,001 flat ETH-USDT-SWAP candles with a scaled final bar.

    All candles sit at 100.0 except the last, whose price is
    ``100.0 * close_multiplier``. Open, high, low and close are equal on every
    candle; timestamps start at 1_700_000_000_000 and step by 900_000.
    """
    final_index = 1_000

    def level_at(position: int) -> float:
        # Only the final candle is moved away from the 100.0 baseline.
        if position == final_index:
            return 100.0 * close_multiplier
        return 100.0

    series: list[Candle] = []
    for position in range(final_index + 1):
        level = level_at(position)
        series.append(
            Candle(
                symbol="ETH-USDT-SWAP",
                ts=1_700_000_000_000 + (position * 900_000),
                open=level,
                high=level,
                low=level,
                close=level,
                volume=1_000.0,
            )
        )
    return series
def shift_candles(candles: list[Candle], offset_ms: int) -> list[Candle]:
    """Return copies of ``candles`` with ``offset_ms`` added to each timestamp.

    All other fields are carried over unchanged; the input list is not modified.
    """
    shifted: list[Candle] = []
    for bar in candles:
        moved_ts = bar.ts + offset_ms
        # Positional construction, in the same argument order as the source candle fields.
        shifted.append(Candle(bar.symbol, moved_ts, bar.open, bar.high, bar.low, bar.close, bar.volume))
    return shifted
def test_short_middle_exit_requires_buffer_break() -> None:
    """A 0.04% rise on the last candle keeps an open short position held."""
    bars = middle_exit_candles(1.0004)
    frame = live_frame(bars)
    previous_ts = bars[-2].ts
    open_short = replace(
        EMPTY_STATE,
        active_side="short",
        entry_price=101.0,
        entry_candle_ts=previous_ts,
        last_candle_ts=previous_ts,
    )
    signal = signal_from_frame(frame, open_short)[1]
    assert signal["signal"] == "hold"
    assert signal["target_side"] == "short"
def test_short_middle_exit_triggers_after_buffer_break() -> None:
    """A 0.2% rise on the last candle closes an open short via ``exit_middle``.

    The fixture is shifted forward by 12 * 3_600_000 before the frame is built.
    """
    shift = 12 * 3_600_000
    bars = shift_candles(middle_exit_candles(1.002), shift)
    frame = live_frame(bars)
    previous_ts = bars[-2].ts
    open_short = replace(
        EMPTY_STATE,
        active_side="short",
        entry_price=101.0,
        entry_candle_ts=previous_ts,
        last_candle_ts=previous_ts,
    )
    signal = signal_from_frame(frame, open_short)[1]
    assert signal["signal"] == "exit_middle"
    assert signal["target_side"] == "flat"
def test_us_open_middle_exit_is_skipped() -> None:
    """A 0.1% rise on the unshifted fixture leaves an open short position held.

    NOTE(review): presumably the unshifted final timestamp falls inside the
    executor's US-open skip window - confirm against the executor's rules.
    """
    bars = middle_exit_candles(1.001)
    frame = live_frame(bars)
    previous_ts = bars[-2].ts
    open_short = replace(
        EMPTY_STATE,
        active_side="short",
        entry_price=101.0,
        entry_candle_ts=previous_ts,
        last_candle_ts=previous_ts,
    )
    signal = signal_from_frame(frame, open_short)[1]
    assert signal["signal"] == "hold"
    assert signal["target_side"] == "short"
def test_long_take_profit_triggers_exit() -> None:
    """A last candle with high 103.1 closes a long entered at 100.0 via take profit."""
    bars = middle_exit_candles(1.0)
    final_ts = bars[-1].ts
    # Swap the flat final bar for one that spikes to 103.1 and closes at 102.0.
    bars[-1] = Candle(
        symbol="ETH-USDT-SWAP",
        ts=final_ts,
        open=100.0,
        high=103.1,
        low=100.0,
        close=102.0,
        volume=1_000.0,
    )
    frame = live_frame(bars)
    previous_ts = bars[-2].ts
    open_long = replace(
        EMPTY_STATE,
        active_side="long",
        entry_price=100.0,
        entry_candle_ts=previous_ts,
        last_candle_ts=previous_ts,
    )
    signal = signal_from_frame(frame, open_long)[1]
    assert signal["signal"] == "exit_take_profit"
    assert signal["target_side"] == "flat"
def test_long_breakeven_protection_updates_after_favorable_move() -> None:
    """A high of 100.9 on a long from 100.0 records a 0.009 favorable move and holds."""
    bars = middle_exit_candles(1.0)
    final_ts = bars[-1].ts
    # Final bar reaches 100.9 intrabar but closes at 100.4.
    bars[-1] = Candle(
        symbol="ETH-USDT-SWAP",
        ts=final_ts,
        open=100.0,
        high=100.9,
        low=100.0,
        close=100.4,
        volume=1_000.0,
    )
    frame = live_frame(bars)
    previous_ts = bars[-2].ts
    open_long = replace(
        EMPTY_STATE,
        active_side="long",
        entry_price=100.0,
        entry_candle_ts=previous_ts,
        last_candle_ts=previous_ts,
        max_favorable_move_pct=0.0,
    )
    next_state, signal = signal_from_frame(frame, open_long)
    assert signal["signal"] == "hold"
    assert next_state.max_favorable_move_pct == pytest.approx(0.009)
def test_long_breakeven_protection_triggers_exit_after_prior_favorable_move() -> None:
    """With a stored 0.008 favorable move, a dip to 100.05 exits the long at breakeven.

    The resulting state must no longer carry an active side.
    """
    bars = middle_exit_candles(1.0)
    final_ts = bars[-1].ts
    # Final bar pulls back to a low of 100.05 after the earlier favorable move.
    bars[-1] = Candle(
        symbol="ETH-USDT-SWAP",
        ts=final_ts,
        open=100.4,
        high=100.5,
        low=100.05,
        close=100.2,
        volume=1_000.0,
    )
    frame = live_frame(bars)
    previous_ts = bars[-2].ts
    open_long = replace(
        EMPTY_STATE,
        active_side="long",
        entry_price=100.0,
        entry_candle_ts=previous_ts,
        last_candle_ts=previous_ts,
        max_favorable_move_pct=0.008,
    )
    next_state, signal = signal_from_frame(frame, open_long)
    assert signal["signal"] == "exit_breakeven"
    assert signal["target_side"] == "flat"
    assert next_state.active_side is None
def test_btc_down_filter_blocks_new_breakout_entry() -> None:
    """The breakout fixture paired with a falling BTC series stays flat on ``hold``."""
    eth_bars = candles_with_latest_breakout()
    btc_bars = btc_down_candles([bar.ts for bar in eth_bars])
    frame = aligned_frame_from_candles(eth_bars, btc_bars)
    signal = signal_from_frame(frame, EMPTY_STATE)[1]
    assert signal["signal"] == "hold"
    assert signal["target_side"] == "flat"
def test_run_once_syncs_external_flat_without_reopening(monkeypatch, tmp_path) -> None:
    """A saved short with a flat exchange position syncs to flat without new orders.

    The persisted state says a short is open, while the patched account lookup
    reports no open position. ``run_once`` is expected to emit
    ``external_flat_sync``, target flat, render no orders and clear the
    active side in the next state.
    """
    frame = live_frame(middle_exit_candles(0.999))
    state_dir = tmp_path / "state"
    prior_ts = int(frame.iloc[-2]["ts"])
    previous = replace(
        EMPTY_STATE,
        active_side="short",
        entry_price=101.0,
        entry_candle_ts=prior_ts,
        last_candle_ts=prior_ts,
        middle_exit_streak=0,
    )
    save_state(state_dir / "runtime-state.json", previous)

    def fake_load_config():
        # The config content is irrelevant here; any placeholder object will do.
        return object()

    def fake_account_current_position(client, margin):
        # Report a known, flat exchange position alongside minimal account metadata.
        flat_position = TargetPosition(
            side="flat", unit=0.0, known=True, reason="no open OKX position", contracts=0.0
        )
        account_info = {
            "positions": [],
            "instrument_meta": {"ct_val": 0.1, "lot_sz": 0.01, "min_sz": 0.01},
            "mark_price": 100.0,
        }
        return (flat_position, account_info)

    monkeypatch.setattr("scripts.run_bb_squeeze_executor.load_config", fake_load_config)
    monkeypatch.setattr(
        "scripts.run_bb_squeeze_executor.account_current_position",
        fake_account_current_position,
    )

    snapshot = run_once(
        state_dir=state_dir,
        margin_per_unit_usdt=100.0,
        max_new_margin_usdt=100.0,
        max_total_margin_usdt=200.0,
        submit_live=False,
        frame=frame,
    )

    assert snapshot["signal"]["signal"] == "external_flat_sync"
    assert snapshot["target_position"]["side"] == "flat"
    assert snapshot["rendered_orders"] == []
    assert snapshot["next_state"]["active_side"] is None
|