diff --git a/freqtrade/constants.py b/freqtrade/constants.py index 416b4646f..55363cca1 100644 --- a/freqtrade/constants.py +++ b/freqtrade/constants.py @@ -33,7 +33,8 @@ HYPEROPT_LOSS_BUILTIN = ['ShortTradeDurHyperOptLoss', 'OnlyProfitHyperOptLoss', AVAILABLE_PAIRLISTS = ['StaticPairList', 'VolumePairList', 'AgeFilter', 'OffsetFilter', 'PerformanceFilter', 'PrecisionFilter', 'PriceFilter', 'RangeStabilityFilter', - 'ShuffleFilter', 'SpreadFilter', 'VolatilityFilter'] + 'ShuffleFilter', 'SpreadFilter', 'VolatilityFilter', + 'ExternalPairList'] AVAILABLE_PROTECTIONS = ['CooldownPeriod', 'LowProfitPairs', 'MaxDrawdown', 'StoplossGuard'] AVAILABLE_DATAHANDLERS = ['json', 'jsongz', 'hdf5'] BACKTEST_BREAKDOWNS = ['day', 'week', 'month'] diff --git a/freqtrade/enums/replicate.py b/freqtrade/enums/replicate.py index 501d119f3..73be996c0 100644 --- a/freqtrade/enums/replicate.py +++ b/freqtrade/enums/replicate.py @@ -7,4 +7,4 @@ class ReplicateModeType(str, Enum): class LeaderMessageType(str, Enum): - whitelist = "whitelist" + pairlist = "pairlist" diff --git a/freqtrade/freqtradebot.py b/freqtrade/freqtradebot.py index ac6a998c5..b2ec1448e 100644 --- a/freqtrade/freqtradebot.py +++ b/freqtrade/freqtradebot.py @@ -17,8 +17,8 @@ from freqtrade.constants import BuySell, LongShort from freqtrade.data.converter import order_book_to_dataframe from freqtrade.data.dataprovider import DataProvider from freqtrade.edge import Edge -from freqtrade.enums import (ExitCheckTuple, ExitType, RPCMessageType, RunMode, SignalDirection, - State, TradingMode) +from freqtrade.enums import (ExitCheckTuple, ExitType, LeaderMessageType, RPCMessageType, RunMode, + SignalDirection, State, TradingMode) from freqtrade.exceptions import (DependencyException, ExchangeError, InsufficientFundsError, InvalidOrderException, PricingError) from freqtrade.exchange import timeframe_to_minutes, timeframe_to_seconds @@ -257,6 +257,22 @@ class FreqtradeBot(LoggingMixin): self.pairlists.refresh_pairlist() _whitelist = self.pairlists.whitelist + # If replicate leader, broadcast whitelist data + # Should we broadcast before trade pairs are added? What if + # the follower doesn't have trades with those pairs. They would be added for + # no reason. + + # Or should this class be made available to the PairListManager and ran + # when filter_pairlist is called? + if self.replicate_controller: + if self.replicate_controller.is_leader(): + self.replicate_controller.send_message( + { + "data_type": LeaderMessageType.pairlist, + "data": _whitelist + } + ) + # Calculating Edge positioning if self.edge: self.edge.calculate(_whitelist) @@ -267,16 +283,6 @@ class FreqtradeBot(LoggingMixin): # It ensures that candle (OHLCV) data are downloaded for open trades as well _whitelist.extend([trade.pair for trade in trades if trade.pair not in _whitelist]) - # If replicate leader, broadcast whitelist data - if self.replicate_controller: - if self.replicate_controller.is_leader(): - self.replicate_controller.send_message( - { - "data_type": "whitelist", - "data": _whitelist - } - ) - return _whitelist def get_free_open_trades(self) -> int: diff --git a/freqtrade/plugins/pairlist/ExternalPairList.py b/freqtrade/plugins/pairlist/ExternalPairList.py index 832c3d5eb..82fc12ff9 100644 --- a/freqtrade/plugins/pairlist/ExternalPairList.py +++ b/freqtrade/plugins/pairlist/ExternalPairList.py @@ -4,6 +4,7 @@ External Pair List provider Provides pair list from Leader data """ import logging +from threading import Event from typing import Any, Dict, List from freqtrade.plugins.pairlist.IPairList import IPairList @@ -13,16 +14,41 @@ logger = logging.getLogger(__name__) class ExternalPairList(IPairList): + """ + PairList plugin for use with replicate follower mode. + Will use pairs given from leader data. + + Usage: + "pairlists": [ + { + "method": "ExternalPairList", + "number_assets": 5, # We can limit the amount of pairs to use from leader + } + ], + """ def __init__(self, exchange, pairlistmanager, config: Dict[str, Any], pairlistconfig: Dict[str, Any], pairlist_pos: int) -> None: super().__init__(exchange, pairlistmanager, config, pairlistconfig, pairlist_pos) - self._num_assets = self._pairlistconfig.get('num_assets') - self._allow_inactive = self._pairlistconfig.get('allow_inactive', False) + # Not sure how to enforce ExternalPairList as the only PairList + + self._num_assets = self._pairlistconfig.get('number_assets') self._leader_pairs: List[str] = [] + self._has_data = Event() + + def _clamped_pairlist(self): + """ + Return the self._leader_pairs pairlist limited to the maximum set num_assets + or the length of it. + """ + length = len(self._leader_pairs) + if self._num_assets: + return self._leader_pairs[:min(length, self._num_assets)] + else: + return self._leader_pairs @property def needstickers(self) -> bool: @@ -40,13 +66,40 @@ class ExternalPairList(IPairList): """ return f"{self.name}" + def add_pairlist_data(self, pairlist: List[str]): + """ + Add pairs from Leader + """ + + # If some pairs were removed on Leader, remove them here + for pair in self._leader_pairs: + if pair not in pairlist: + logger.debug(f"Leader removed pair: {pair}") + self._leader_pairs.remove(pair) + + # Only add new pairs + seen = set(self._leader_pairs) + for pair in pairlist: + if pair in seen: + logger.debug(f"Encountered already existing pair {pair}") + continue + self._leader_pairs.append(pair) + + if not self._has_data.is_set(): + self._has_data.set() + def gen_pairlist(self, tickers: Dict) -> List[str]: """ Generate the pairlist :param tickers: Tickers (from exchange.get_tickers()). May be cached. :return: List of pairs """ - pass + if not self._has_data.is_set(): + logger.info("Waiting on pairlists from Leaders...") + self._has_data.wait() + logger.info("Pairlist data received...") + + return self._clamped_pairlist() def filter_pairlist(self, pairlist: List[str], tickers: Dict) -> List[str]: """ @@ -56,4 +109,4 @@ class ExternalPairList(IPairList): :param tickers: Tickers (from exchange.get_tickers()). May be cached. :return: new whitelist """ - pass + return self._clamped_pairlist() diff --git a/freqtrade/rpc/replicate/__init__.py b/freqtrade/rpc/replicate/__init__.py index 80ac0836c..fd718197e 100644 --- a/freqtrade/rpc/replicate/__init__.py +++ b/freqtrade/rpc/replicate/__init__.py @@ -40,6 +40,7 @@ class ReplicateController(RPCHandler): """ super().__init__(rpc, config) + self.freqtrade = rpc._freqtrade self.api_server = api_server if not self.api_server: @@ -122,7 +123,6 @@ class ReplicateController(RPCHandler): raise RuntimeError("Loop must be started before any function can" " be submitted") - logger.debug(f"Running coroutine {repr(coroutine)} in loop") try: return asyncio.run_coroutine_threadsafe(coroutine, self._loop) except Exception as e: @@ -185,6 +185,8 @@ class ReplicateController(RPCHandler): def send_message(self, msg: Dict[str, Any]) -> None: """ Push message through """ + # We should probably do some type of schema validation here + if self.channel_manager.has_channels(): self._send_message(msg) else: @@ -199,7 +201,7 @@ class ReplicateController(RPCHandler): if self._queue: queue = self._queue.sync_q - queue.put(msg) + queue.put(msg) # This will block if the queue is full else: logger.warning("Can not send data, leader loop has not started yet!") @@ -235,7 +237,7 @@ class ReplicateController(RPCHandler): try: await self._broadcast_queue_data() except Exception as e: - logger.error("Exception occurred in leader loop: ") + logger.error("Exception occurred in Leader loop: ") logger.exception(e) async def _broadcast_queue_data(self): @@ -342,10 +344,14 @@ class ReplicateController(RPCHandler): logger.info("Starting rpc.replicate in Follower mode") try: - await self._connect_to_leaders() + results = await self._connect_to_leaders() except Exception as e: - logger.error("Exception occurred in follower loop: ") + logger.error("Exception occurred in Follower loop: ") logger.exception(e) + finally: + for result in results: + if isinstance(result, Exception): + logger.debug(f"Exception in Follower loop: {result}") async def _connect_to_leaders(self): """ @@ -372,7 +378,7 @@ class ReplicateController(RPCHandler): websocket_url = f"{url}?token={token}" - logger.info(f"Attempting to connect to leader at: {url}") + logger.info(f"Attempting to connect to Leader at: {url}") # TODO: limit the amount of connection retries while True: try: @@ -415,9 +421,12 @@ class ReplicateController(RPCHandler): except asyncio.CancelledError: pass - async def _handle_leader_message(self, message): + async def _handle_leader_message(self, message: Dict[str, Any]): type = message.get('data_type') data = message.get('data') - if type == LeaderMessageType.whitelist: - logger.info(f"Received whitelist from Leader: {data}") + logger.info(f"Received message from Leader: {type} - {data}") + + if type == LeaderMessageType.pairlist: + # Add the data to the ExternalPairlist + self.freqtrade.pairlists._pairlist_handlers[0].add_pairlist_data(data)