| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- from __future__ import annotations
- import os
- from datetime import datetime
- from pathlib import Path
- import pandas as pd
- from freqtrade.strategy import IStrategy
- class EthNextgenMicroLeverageUnitsShifted(IStrategy):
- INTERFACE_VERSION = 3
- timeframe = "15m"
- can_short = True
- startup_candle_count = 0
- process_only_new_candles = True
- minimal_roi = {"0": 100.0}
- stoploss = -0.99
- use_exit_signal = True
- exit_profit_only = False
- ignore_roi_if_entry_signal = False
- def __init__(self, config: dict) -> None:
- super().__init__(config)
- self._stake_units: dict[tuple[pd.Timestamp, str], float] = {}
- def populate_indicators(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
- trades_path = Path(
- os.environ.get(
- "ETH_NEXTGEN_MICRO_LEVERAGE_UNITS",
- "reports/eth-exploration/eth-nextgen-micro-leverage-units-trades.csv",
- )
- )
- trades = pd.read_csv(trades_path)
- trades["entry_date"] = pd.to_datetime(trades["entry_time"], utc=True) - pd.Timedelta(minutes=15)
- trades["exit_date_ts"] = pd.to_datetime(trades["exit_time"], utc=True) - pd.Timedelta(minutes=15)
- self._stake_units = {
- (row.entry_date, str(row.side)): float(row.position_unit)
- for row in trades.itertuples(index=False)
- }
- entries = trades[["entry_date", "side", "position_unit"]].rename(columns={"entry_date": "date"})
- exits = trades[["exit_date_ts"]].drop_duplicates().rename(columns={"exit_date_ts": "date"})
- entries["enter_long_unit"] = 0.0
- entries["enter_short_unit"] = 0.0
- entries.loc[entries["side"] == "long", "enter_long_unit"] = entries["position_unit"]
- entries.loc[entries["side"] == "short", "enter_short_unit"] = entries["position_unit"]
- entries = entries.groupby("date", as_index=False).agg(
- enter_long_unit=("enter_long_unit", "max"),
- enter_short_unit=("enter_short_unit", "max"),
- )
- exits["exit_signal"] = 1
- merged = dataframe.merge(entries, on="date", how="left").merge(exits, on="date", how="left")
- merged[["enter_long_unit", "enter_short_unit", "exit_signal"]] = merged[
- ["enter_long_unit", "enter_short_unit", "exit_signal"]
- ].fillna(0.0)
- return merged
- def populate_entry_trend(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
- dataframe.loc[dataframe["enter_long_unit"] > 0.0, ["enter_long", "enter_tag"]] = (1, "shifted_unit_long")
- dataframe.loc[dataframe["enter_short_unit"] > 0.0, ["enter_short", "enter_tag"]] = (1, "shifted_unit_short")
- return dataframe
- def populate_exit_trend(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
- dataframe.loc[dataframe["exit_signal"] > 0.0, ["exit_long", "exit_tag"]] = (1, "shifted_unit_exit")
- dataframe.loc[dataframe["exit_signal"] > 0.0, ["exit_short", "exit_tag"]] = (1, "shifted_unit_exit")
- return dataframe
- def custom_stake_amount(
- self,
- pair: str,
- current_time: datetime,
- current_rate: float,
- proposed_stake: float,
- min_stake: float | None,
- max_stake: float,
- leverage: float,
- entry_tag: str | None,
- side: str,
- **kwargs,
- ) -> float:
- timestamp = pd.Timestamp(current_time)
- timestamp = timestamp.tz_localize("UTC") if timestamp.tzinfo is None else timestamp.tz_convert("UTC")
- unit = self._stake_units.get((timestamp, side), 1.0)
- stake = max_stake * unit
- if min_stake is not None:
- stake = max(stake, min_stake)
- return min(stake, max_stake)
- def leverage(
- self,
- pair: str,
- current_time: datetime,
- current_rate: float,
- proposed_leverage: float,
- max_leverage: float,
- entry_tag: str | None,
- side: str,
- **kwargs,
- ) -> float:
- return min(3.0, max_leverage)
|