ExternalPairList plugin

This commit is contained in:
Timothy Pogue 2022-08-19 22:40:01 -06:00
parent 6834db11f3
commit 739b68f8fd
5 changed files with 96 additions and 27 deletions

View File

@ -33,7 +33,8 @@ HYPEROPT_LOSS_BUILTIN = ['ShortTradeDurHyperOptLoss', 'OnlyProfitHyperOptLoss',
AVAILABLE_PAIRLISTS = ['StaticPairList', 'VolumePairList', AVAILABLE_PAIRLISTS = ['StaticPairList', 'VolumePairList',
'AgeFilter', 'OffsetFilter', 'PerformanceFilter', 'AgeFilter', 'OffsetFilter', 'PerformanceFilter',
'PrecisionFilter', 'PriceFilter', 'RangeStabilityFilter', 'PrecisionFilter', 'PriceFilter', 'RangeStabilityFilter',
'ShuffleFilter', 'SpreadFilter', 'VolatilityFilter'] 'ShuffleFilter', 'SpreadFilter', 'VolatilityFilter',
'ExternalPairList']
AVAILABLE_PROTECTIONS = ['CooldownPeriod', 'LowProfitPairs', 'MaxDrawdown', 'StoplossGuard'] AVAILABLE_PROTECTIONS = ['CooldownPeriod', 'LowProfitPairs', 'MaxDrawdown', 'StoplossGuard']
AVAILABLE_DATAHANDLERS = ['json', 'jsongz', 'hdf5'] AVAILABLE_DATAHANDLERS = ['json', 'jsongz', 'hdf5']
BACKTEST_BREAKDOWNS = ['day', 'week', 'month'] BACKTEST_BREAKDOWNS = ['day', 'week', 'month']

View File

@ -7,4 +7,4 @@ class ReplicateModeType(str, Enum):
class LeaderMessageType(str, Enum): class LeaderMessageType(str, Enum):
whitelist = "whitelist" pairlist = "pairlist"

View File

@ -17,8 +17,8 @@ from freqtrade.constants import BuySell, LongShort
from freqtrade.data.converter import order_book_to_dataframe from freqtrade.data.converter import order_book_to_dataframe
from freqtrade.data.dataprovider import DataProvider from freqtrade.data.dataprovider import DataProvider
from freqtrade.edge import Edge from freqtrade.edge import Edge
from freqtrade.enums import (ExitCheckTuple, ExitType, RPCMessageType, RunMode, SignalDirection, from freqtrade.enums import (ExitCheckTuple, ExitType, LeaderMessageType, RPCMessageType, RunMode,
State, TradingMode) SignalDirection, State, TradingMode)
from freqtrade.exceptions import (DependencyException, ExchangeError, InsufficientFundsError, from freqtrade.exceptions import (DependencyException, ExchangeError, InsufficientFundsError,
InvalidOrderException, PricingError) InvalidOrderException, PricingError)
from freqtrade.exchange import timeframe_to_minutes, timeframe_to_seconds from freqtrade.exchange import timeframe_to_minutes, timeframe_to_seconds
@ -257,6 +257,22 @@ class FreqtradeBot(LoggingMixin):
self.pairlists.refresh_pairlist() self.pairlists.refresh_pairlist()
_whitelist = self.pairlists.whitelist _whitelist = self.pairlists.whitelist
# If replicate leader, broadcast whitelist data
# Should we broadcast before trade pairs are added? What if
# the follower doesn't have trades with those pairs. They would be added for
# no reason.
# Or should this class be made available to the PairListManager and ran
# when filter_pairlist is called?
if self.replicate_controller:
if self.replicate_controller.is_leader():
self.replicate_controller.send_message(
{
"data_type": LeaderMessageType.pairlist,
"data": _whitelist
}
)
# Calculating Edge positioning # Calculating Edge positioning
if self.edge: if self.edge:
self.edge.calculate(_whitelist) self.edge.calculate(_whitelist)
@ -267,16 +283,6 @@ class FreqtradeBot(LoggingMixin):
# It ensures that candle (OHLCV) data are downloaded for open trades as well # It ensures that candle (OHLCV) data are downloaded for open trades as well
_whitelist.extend([trade.pair for trade in trades if trade.pair not in _whitelist]) _whitelist.extend([trade.pair for trade in trades if trade.pair not in _whitelist])
# If replicate leader, broadcast whitelist data
if self.replicate_controller:
if self.replicate_controller.is_leader():
self.replicate_controller.send_message(
{
"data_type": "whitelist",
"data": _whitelist
}
)
return _whitelist return _whitelist
def get_free_open_trades(self) -> int: def get_free_open_trades(self) -> int:

View File

@ -4,6 +4,7 @@ External Pair List provider
Provides pair list from Leader data Provides pair list from Leader data
""" """
import logging import logging
from threading import Event
from typing import Any, Dict, List from typing import Any, Dict, List
from freqtrade.plugins.pairlist.IPairList import IPairList from freqtrade.plugins.pairlist.IPairList import IPairList
@ -13,16 +14,41 @@ logger = logging.getLogger(__name__)
class ExternalPairList(IPairList): class ExternalPairList(IPairList):
"""
PairList plugin for use with replicate follower mode.
Will use pairs given from leader data.
Usage:
"pairlists": [
{
"method": "ExternalPairList",
"number_assets": 5, # We can limit the amount of pairs to use from leader
}
],
"""
def __init__(self, exchange, pairlistmanager, def __init__(self, exchange, pairlistmanager,
config: Dict[str, Any], pairlistconfig: Dict[str, Any], config: Dict[str, Any], pairlistconfig: Dict[str, Any],
pairlist_pos: int) -> None: pairlist_pos: int) -> None:
super().__init__(exchange, pairlistmanager, config, pairlistconfig, pairlist_pos) super().__init__(exchange, pairlistmanager, config, pairlistconfig, pairlist_pos)
self._num_assets = self._pairlistconfig.get('num_assets') # Not sure how to enforce ExternalPairList as the only PairList
self._allow_inactive = self._pairlistconfig.get('allow_inactive', False)
self._num_assets = self._pairlistconfig.get('number_assets')
self._leader_pairs: List[str] = [] self._leader_pairs: List[str] = []
self._has_data = Event()
def _clamped_pairlist(self):
"""
Return the self._leader_pairs pairlist limited to the maximum set num_assets
or the length of it.
"""
length = len(self._leader_pairs)
if self._num_assets:
return self._leader_pairs[:min(length, self._num_assets)]
else:
return self._leader_pairs
@property @property
def needstickers(self) -> bool: def needstickers(self) -> bool:
@ -40,13 +66,40 @@ class ExternalPairList(IPairList):
""" """
return f"{self.name}" return f"{self.name}"
def add_pairlist_data(self, pairlist: List[str]):
"""
Add pairs from Leader
"""
# If some pairs were removed on Leader, remove them here
for pair in self._leader_pairs:
if pair not in pairlist:
logger.debug(f"Leader removed pair: {pair}")
self._leader_pairs.remove(pair)
# Only add new pairs
seen = set(self._leader_pairs)
for pair in pairlist:
if pair in seen:
logger.debug(f"Encountered already existing pair {pair}")
continue
self._leader_pairs.append(pair)
if not self._has_data.is_set():
self._has_data.set()
def gen_pairlist(self, tickers: Dict) -> List[str]: def gen_pairlist(self, tickers: Dict) -> List[str]:
""" """
Generate the pairlist Generate the pairlist
:param tickers: Tickers (from exchange.get_tickers()). May be cached. :param tickers: Tickers (from exchange.get_tickers()). May be cached.
:return: List of pairs :return: List of pairs
""" """
pass if not self._has_data.is_set():
logger.info("Waiting on pairlists from Leaders...")
self._has_data.wait()
logger.info("Pairlist data received...")
return self._clamped_pairlist()
def filter_pairlist(self, pairlist: List[str], tickers: Dict) -> List[str]: def filter_pairlist(self, pairlist: List[str], tickers: Dict) -> List[str]:
""" """
@ -56,4 +109,4 @@ class ExternalPairList(IPairList):
:param tickers: Tickers (from exchange.get_tickers()). May be cached. :param tickers: Tickers (from exchange.get_tickers()). May be cached.
:return: new whitelist :return: new whitelist
""" """
pass return self._clamped_pairlist()

View File

@ -40,6 +40,7 @@ class ReplicateController(RPCHandler):
""" """
super().__init__(rpc, config) super().__init__(rpc, config)
self.freqtrade = rpc._freqtrade
self.api_server = api_server self.api_server = api_server
if not self.api_server: if not self.api_server:
@ -122,7 +123,6 @@ class ReplicateController(RPCHandler):
raise RuntimeError("Loop must be started before any function can" raise RuntimeError("Loop must be started before any function can"
" be submitted") " be submitted")
logger.debug(f"Running coroutine {repr(coroutine)} in loop")
try: try:
return asyncio.run_coroutine_threadsafe(coroutine, self._loop) return asyncio.run_coroutine_threadsafe(coroutine, self._loop)
except Exception as e: except Exception as e:
@ -185,6 +185,8 @@ class ReplicateController(RPCHandler):
def send_message(self, msg: Dict[str, Any]) -> None: def send_message(self, msg: Dict[str, Any]) -> None:
""" Push message through """ """ Push message through """
# We should probably do some type of schema validation here
if self.channel_manager.has_channels(): if self.channel_manager.has_channels():
self._send_message(msg) self._send_message(msg)
else: else:
@ -199,7 +201,7 @@ class ReplicateController(RPCHandler):
if self._queue: if self._queue:
queue = self._queue.sync_q queue = self._queue.sync_q
queue.put(msg) queue.put(msg) # This will block if the queue is full
else: else:
logger.warning("Can not send data, leader loop has not started yet!") logger.warning("Can not send data, leader loop has not started yet!")
@ -235,7 +237,7 @@ class ReplicateController(RPCHandler):
try: try:
await self._broadcast_queue_data() await self._broadcast_queue_data()
except Exception as e: except Exception as e:
logger.error("Exception occurred in leader loop: ") logger.error("Exception occurred in Leader loop: ")
logger.exception(e) logger.exception(e)
async def _broadcast_queue_data(self): async def _broadcast_queue_data(self):
@ -342,10 +344,14 @@ class ReplicateController(RPCHandler):
logger.info("Starting rpc.replicate in Follower mode") logger.info("Starting rpc.replicate in Follower mode")
try: try:
await self._connect_to_leaders() results = await self._connect_to_leaders()
except Exception as e: except Exception as e:
logger.error("Exception occurred in follower loop: ") logger.error("Exception occurred in Follower loop: ")
logger.exception(e) logger.exception(e)
finally:
for result in results:
if isinstance(result, Exception):
logger.debug(f"Exception in Follower loop: {result}")
async def _connect_to_leaders(self): async def _connect_to_leaders(self):
""" """
@ -372,7 +378,7 @@ class ReplicateController(RPCHandler):
websocket_url = f"{url}?token={token}" websocket_url = f"{url}?token={token}"
logger.info(f"Attempting to connect to leader at: {url}") logger.info(f"Attempting to connect to Leader at: {url}")
# TODO: limit the amount of connection retries # TODO: limit the amount of connection retries
while True: while True:
try: try:
@ -415,9 +421,12 @@ class ReplicateController(RPCHandler):
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
async def _handle_leader_message(self, message): async def _handle_leader_message(self, message: Dict[str, Any]):
type = message.get('data_type') type = message.get('data_type')
data = message.get('data') data = message.get('data')
if type == LeaderMessageType.whitelist: logger.info(f"Received message from Leader: {type} - {data}")
logger.info(f"Received whitelist from Leader: {data}")
if type == LeaderMessageType.pairlist:
# Add the data to the ExternalPairlist
self.freqtrade.pairlists._pairlist_handlers[0].add_pairlist_data(data)