| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- from __future__ import annotations
- from datetime import datetime
- import pandas as pd
- from freqtrade.persistence import Trade
- from freqtrade.strategy import IStrategy
class EthFocusedInformativeDry(IStrategy):
    """Long-only 5m strategy combining a 15m RSI dip filter with BTC lead/lag entries.

    Three entry setups are produced, distinguished by ``enter_tag``:

    * ``eth_btc_rsi_filter_15m`` - the traded pair's 15m RSI is at or below
      ``eth_rsi_threshold`` while both the pair and BTC close above their 15m
      SMAs and BTC's 15m momentum is at least ``btc_min_momentum``.
    * ``btc_lead_eth_lag_15m`` / ``btc_lead_eth_lag_5m`` - BTC's return over the
      lookback window reaches the configured threshold and exceeds the traded
      pair's return over the same number of bars by at least ``lag_gap``.

    Columns prefixed ``eth_`` are computed from the pair being analysed
    (``metadata["pair"]``); columns prefixed ``btc_`` come from ``BTC/USDT:USDT``.
    """

    INTERFACE_VERSION = 3

    timeframe = "5m"
    can_short = False
    # Equals the longest rolling window used below (``btc_trend_sma``).
    startup_candle_count = 480
    process_only_new_candles = True

    # Single ROI entry of 100.0 (10,000 %) - presumably meant to disable ROI exits; confirm.
    minimal_roi = {"0": 100.0}
    stoploss = -0.02
    use_exit_signal = True
    exit_profit_only = False
    ignore_roi_if_entry_signal = False

    # RSI-filter setup (15m candles of the traded pair, gated by BTC 15m trend/momentum).
    eth_rsi_trend_sma = 120
    eth_rsi_length = 2
    eth_rsi_threshold = 3.0
    eth_exit_rsi = 55.0
    btc_trend_sma = 480
    btc_momentum_lookback = 240
    btc_min_momentum = 0.0

    # Lead/lag setup: lookbacks are bar counts on the respective timeframe.
    lead_lookback_15m = 8
    lead_lookback_5m = 16
    btc_return_threshold_15m = 0.018
    btc_return_threshold_5m = 0.012
    lag_gap = 0.006
    lead_lag_max_hold_bars = 8
    lead_lag_stop_loss = -0.006
    lead_lag_take_profit = 0.018

    rsi_filter_leverage = 3.0
    lead_lag_leverage = 3.0

    # Shared literals. custom_exit() and leverage() branch on these exact tag
    # strings, so they are defined once to keep all call sites in sync.
    _BTC_PAIR = "BTC/USDT:USDT"
    _RSI_FILTER_TAG = "eth_btc_rsi_filter_15m"
    _LEAD_LAG_15M_TAG = "btc_lead_eth_lag_15m"
    _LEAD_LAG_5M_TAG = "btc_lead_eth_lag_5m"
    _LEAD_LAG_TAGS = frozenset({_LEAD_LAG_15M_TAG, _LEAD_LAG_5M_TAG})

    def informative_pairs(self) -> list[tuple[str, str]]:
        """Return the extra ``(pair, timeframe)`` combinations this strategy reads.

        Besides the fixed BTC and ETH entries, the 15m timeframe of every
        whitelisted pair is declared, because ``populate_indicators`` requests
        15m candles for ``metadata["pair"]`` rather than for ETH specifically.
        """
        pairs = [
            (self._BTC_PAIR, "5m"),
            (self._BTC_PAIR, "15m"),
            ("ETH/USDT:USDT", "15m"),
        ]
        if self.dp:
            for pair in self.dp.current_whitelist():
                candidate = (pair, "15m")
                if candidate not in pairs:
                    pairs.append(candidate)
        return pairs

    def populate_indicators(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
        """Add the pair's 5m return and merge BTC 5m/15m and pair 15m indicators.

        The informative columns are only added when a data provider is attached;
        ``populate_entry_trend`` and ``populate_exit_trend`` read those columns.
        """
        dataframe["eth_return_5m"] = dataframe["close"].pct_change(self.lead_lookback_5m)
        if not self.dp:
            return dataframe

        btc_5m = self.dp.get_pair_dataframe(pair=self._BTC_PAIR, timeframe="5m")
        btc_5m["btc_return"] = btc_5m["close"].pct_change(self.lead_lookback_5m)
        dataframe = self._merge_informative(dataframe, btc_5m, "btc", "5m")

        btc_15m = self.dp.get_pair_dataframe(pair=self._BTC_PAIR, timeframe="15m")
        btc_15m["btc_trend"] = btc_15m["close"].rolling(self.btc_trend_sma).mean()
        btc_15m["btc_momentum"] = btc_15m["close"].pct_change(self.btc_momentum_lookback)
        btc_15m["btc_return"] = btc_15m["close"].pct_change(self.lead_lookback_15m)
        dataframe = self._merge_informative(dataframe, btc_15m, "btc", "15m")

        eth_15m = self.dp.get_pair_dataframe(pair=metadata["pair"], timeframe="15m")
        eth_15m["eth_trend"] = eth_15m["close"].rolling(self.eth_rsi_trend_sma).mean()
        eth_15m["eth_rsi2"] = self._rsi(eth_15m["close"], self.eth_rsi_length)
        eth_15m["eth_return"] = eth_15m["close"].pct_change(self.lead_lookback_15m)
        dataframe = self._merge_informative(dataframe, eth_15m, "eth", "15m")

        return dataframe

    def populate_entry_trend(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
        """Flag long entries for the RSI-filter and the two lead/lag setups.

        The assignments run in a fixed order, so when several setups fire on the
        same row the later one wins the ``enter_tag`` (5m lead/lag last).
        """
        rsi_filter = (
            (dataframe["eth_close_15m"] > dataframe["eth_trend_15m"])
            & (dataframe["eth_rsi2_15m"] <= self.eth_rsi_threshold)
            & (dataframe["btc_close_15m"] > dataframe["btc_trend_15m"])
            & (dataframe["btc_momentum_15m"] >= self.btc_min_momentum)
        )
        lead_lag_15m = (
            (dataframe["btc_return_15m"] >= self.btc_return_threshold_15m)
            & ((dataframe["btc_return_15m"] - dataframe["eth_return_15m"]) >= self.lag_gap)
        )
        lead_lag_5m = (
            (dataframe["btc_return_5m"] >= self.btc_return_threshold_5m)
            & ((dataframe["btc_return_5m"] - dataframe["eth_return_5m"]) >= self.lag_gap)
        )

        signal_columns = ["enter_long", "enter_tag"]
        dataframe.loc[rsi_filter, signal_columns] = (1, self._RSI_FILTER_TAG)
        dataframe.loc[lead_lag_15m, signal_columns] = (1, self._LEAD_LAG_15M_TAG)
        dataframe.loc[lead_lag_5m, signal_columns] = (1, self._LEAD_LAG_5M_TAG)
        return dataframe

    def populate_exit_trend(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
        """Flag a long exit when 15m RSI recovers or BTC closes at/below its 15m SMA."""
        # NOTE(review): this signal is not tag-specific, so it presumably also
        # closes lead/lag trades alongside custom_exit() - confirm that is intended.
        exit_condition = (
            (dataframe["eth_rsi2_15m"] >= self.eth_exit_rsi)
            | (dataframe["btc_close_15m"] <= dataframe["btc_trend_15m"])
        )
        dataframe.loc[exit_condition, ["exit_long", "exit_tag"]] = (1, "rsi_or_btc_trend_exit")
        return dataframe

    def custom_exit(
        self,
        pair: str,
        trade: Trade,
        current_time: datetime,
        current_rate: float,
        current_profit: float,
        **kwargs,
    ) -> str | None:
        """Apply stop, take-profit and max-hold rules to lead/lag trades only.

        Returns the exit reason string, or ``None`` to keep the trade open.
        Trades opened with any other tag are never closed here. Checks run in
        the order stop loss, take profit, max hold.
        """
        if trade.enter_tag not in self._LEAD_LAG_TAGS:
            return None
        if current_profit <= self.lead_lag_stop_loss:
            return "lead_lag_stop"
        if current_profit >= self.lead_lag_take_profit:
            return "lead_lag_take_profit"

        # Bars are counted as whole 5-minute spans, matching ``timeframe = "5m"``.
        held_bars = int((current_time - trade.open_date_utc).total_seconds() // (5 * 60))
        if held_bars >= self.lead_lag_max_hold_bars:
            return "lead_lag_max_hold"
        return None

    def leverage(
        self,
        pair: str,
        current_time: datetime,
        current_rate: float,
        proposed_leverage: float,
        max_leverage: float,
        entry_tag: str | None,
        side: str,
        **kwargs,
    ) -> float:
        """Return the per-setup leverage, capped at ``max_leverage``."""
        if entry_tag in self._LEAD_LAG_TAGS:
            return min(self.lead_lag_leverage, max_leverage)
        return min(self.rsi_filter_leverage, max_leverage)

    @staticmethod
    def _merge_informative(
        dataframe: pd.DataFrame,
        informative: pd.DataFrame,
        prefix: str,
        timeframe: str,
    ) -> pd.DataFrame:
        """Merge ``informative`` candles into ``dataframe`` as ``{prefix}_*_{timeframe}`` columns.

        OHLCV columns become ``{prefix}_{name}_{timeframe}``; columns that already
        start with ``{prefix}_`` just get the ``_{timeframe}`` suffix. Only
        ``"5m"`` and ``"15m"`` are supported; any other timeframe raises ``KeyError``.
        The input frames are not modified.
        """
        minutes = {"5m": 5, "15m": 15}[timeframe]
        informative = informative.copy()
        # Each informative row is keyed by its ``date`` plus one full candle, so the
        # backward as-of join only exposes it to base rows stamped at or after that.
        # NOTE(review): assuming ``date`` is the candle open time, this is one base
        # bar later than strictly required to avoid lookahead - confirm intended.
        informative["merge_date"] = informative["date"] + pd.to_timedelta(minutes, unit="m")

        marker = f"{prefix}_"
        suffix = f"_{timeframe}"
        indicator_columns = [name for name in informative.columns if name.startswith(marker)]
        selected = ["merge_date", "open", "high", "low", "close", "volume", *indicator_columns]

        renamed: dict[str, str] = {}
        for name in selected[1:]:
            if not name.startswith(marker):
                renamed[name] = f"{marker}{name}{suffix}"
            elif not name.endswith(suffix):
                renamed[name] = f"{name}{suffix}"
        informative = informative[selected].rename(columns=renamed)

        merged = pd.merge_asof(
            dataframe.sort_values("date"),
            informative.sort_values("merge_date"),
            left_on="date",
            right_on="merge_date",
            direction="backward",
        ).ffill()
        helper_columns = [name for name in merged.columns if name.startswith("merge_date")]
        return merged.drop(columns=helper_columns)

    @staticmethod
    def _rsi(close: pd.Series, length: int) -> pd.Series:
        """Compute an RSI with a simple-average seed followed by recursive smoothing.

        The first ``length`` positions are NaN. When the average loss is zero the
        result is 100.0 if there were gains and 50.0 otherwise. Once an average
        becomes NaN it stays NaN, so all later values are NaN as well.

        Raises:
            ValueError: if ``length`` is smaller than 1.
        """
        if length < 1:
            raise ValueError(f"RSI length must be >= 1, got {length}")

        values = [float("nan")] * len(close)
        if len(close) <= length:
            return pd.Series(values, index=close.index)

        deltas = close.diff()
        gains = deltas.clip(lower=0.0)
        losses = -deltas.clip(upper=0.0)

        # Seed with the plain mean of the first ``length`` changes (position 0 has no diff).
        average_gain = float(gains.iloc[1 : length + 1].mean())
        average_loss = float(losses.iloc[1 : length + 1].mean())

        # Plain lists avoid a pandas positional lookup on every loop iteration.
        gain_values = gains.tolist()
        loss_values = losses.tolist()

        for index in range(length, len(close)):
            if index > length:
                average_gain = ((average_gain * (length - 1)) + gain_values[index]) / length
                average_loss = ((average_loss * (length - 1)) + loss_values[index]) / length
            if pd.isna(average_gain) or pd.isna(average_loss):
                continue
            if average_loss == 0.0:
                values[index] = 100.0 if average_gain > 0.0 else 50.0
                continue
            relative_strength = average_gain / average_loss
            values[index] = 100.0 - (100.0 / (1.0 + relative_strength))

        return pd.Series(values, index=close.index)
|