Refactoring, minor improvements, data provider improvements

This commit is contained in:
Timothy Pogue 2022-08-26 23:40:13 -06:00
parent a998d6d773
commit 2b5f067877
14 changed files with 218 additions and 98 deletions

View File

@ -62,6 +62,7 @@ TELEGRAM_SETTING_OPTIONS = ['on', 'off', 'silent']
WEBHOOK_FORMAT_OPTIONS = ['form', 'json', 'raw']
FOLLOWER_MODE_OPTIONS = ['follower', 'leader']
WAIT_DATA_POLICY_OPTIONS = ['none', 'first', 'all']
ENV_VAR_PREFIX = 'FREQTRADE__'
@ -509,6 +510,11 @@ CONF_SCHEMA = {
'follower_reply_timeout': {'type': 'integer'},
'follower_sleep_time': {'type': 'integer'},
'follower_ping_timeout': {'type': 'integer'},
'wait_data_policy': {
'type': 'string',
'enum': WAIT_DATA_POLICY_OPTIONS
},
'remove_signals_analyzed_df': {'type': 'boolean', 'default': False}
},
'required': ['mode']
},

View File

@ -15,7 +15,7 @@ from pandas import DataFrame
from freqtrade.configuration import TimeRange
from freqtrade.constants import ListPairsWithTimeframes, PairWithTimeframe
from freqtrade.data.history import load_pair_history
from freqtrade.enums import CandleType, RunMode
from freqtrade.enums import CandleType, RunMode, WaitDataPolicy
from freqtrade.exceptions import ExchangeError, OperationalException
from freqtrade.exchange import Exchange, timeframe_to_seconds
from freqtrade.util import PeriodicCache
@ -29,7 +29,12 @@ MAX_DATAFRAME_CANDLES = 1000
class DataProvider:
def __init__(self, config: dict, exchange: Optional[Exchange], pairlists=None) -> None:
def __init__(
self,
config: dict,
exchange: Optional[Exchange],
pairlists=None
) -> None:
self._config = config
self._exchange = exchange
self._pairlists = pairlists
@ -37,12 +42,18 @@ class DataProvider:
self.__slice_index: Optional[int] = None
self.__cached_pairs_backtesting: Dict[PairWithTimeframe, DataFrame] = {}
self.__external_pairs_df: Dict[PairWithTimeframe, Tuple[DataFrame, datetime]] = {}
self.__external_pairs_event: Dict[str, Event] = {}
self.__external_pairs_event: Dict[PairWithTimeframe, Tuple[int, Event]] = {}
self._msg_queue: deque = deque()
self.__msg_cache = PeriodicCache(
maxsize=1000, ttl=timeframe_to_seconds(self._config.get('timeframe', '1h')))
self._num_sources = len(self._config.get('external_signal', {}).get('leader_list', []))
self._wait_data_policy = self._config.get('external_signal', {}).get(
'wait_data_policy', WaitDataPolicy.all)
self._wait_data_timeout = self._config.get(
'external_signal', {}).get('wait_data_timeout', 5)
def _set_dataframe_max_index(self, limit_index: int):
"""
Limit analyzed dataframe to max specified index.
@ -75,57 +86,88 @@ class DataProvider:
pair: str,
timeframe: str,
dataframe: DataFrame,
candle_type: CandleType
candle_type: CandleType,
) -> None:
"""
Add the DataFrame to the __external_pairs_df. If a pair event exists,
set it to release the main thread from waiting.
Add the pair data to this class from an external source.
:param pair: pair to get the data for
:param timeframe: Timeframe to get data for
:param candle_type: Any of the enum CandleType (must match trading mode!)
"""
pair_key = (pair, timeframe, candle_type)
# Delete stale data
if pair_key in self.__external_pairs_df:
del self.__external_pairs_df[pair_key]
# For multiple leaders, if the data already exists, we'd merge
self.__external_pairs_df[pair_key] = (dataframe, datetime.now(timezone.utc))
pair_event = self.__external_pairs_event.get(pair)
if pair_event:
logger.debug(f"Leader data for pair {pair_key} has been added")
pair_event.set()
self._set_data_event(pair_key)
def get_external_df(
self,
pair: str,
timeframe: str,
candle_type: CandleType,
wait: bool = True
candle_type: CandleType
) -> DataFrame:
"""
If the pair exists in __external_pairs_df, return it.
If it doesn't, and wait is False, then return an empty df with the columns filled.
If it doesn't, and wait is True (default) create a new threading Event
in __external_pairs_event and wait on it.
Get the pair data from the external sources. Will wait if the policy is
set to, and data is not available.
:param pair: pair to get the data for
:param timeframe: Timeframe to get data for
:param candle_type: Any of the enum CandleType (must match trading mode!)
"""
pair_key = (pair, timeframe, candle_type)
if pair_key not in self.__external_pairs_df:
if wait:
pair_event = Event()
self.__external_pairs_event[pair] = pair_event
self._wait_on_data(pair_key)
logger.debug(f"Waiting on Leader data for: {pair_key}")
self.__external_pairs_event[pair].wait(timeout=5)
if pair_key not in self.__external_pairs_df:
# Return empty dataframe but with expected columns merged and filled with NaN
return (DataFrame(), datetime.fromtimestamp(0, tz=timezone.utc))
else:
# Return empty dataframe but with expected columns merged and filled with NaN
if pair_key not in self.__external_pairs_df:
return (DataFrame(), datetime.fromtimestamp(0, tz=timezone.utc))
return self.__external_pairs_df[pair_key]
def _set_data_event(self, key: PairWithTimeframe):
"""
Depending on the WaitDataPolicy, if an event exists for this PairWithTimeframe
then set the event to release main thread from waiting.
:param key: PairWithTimeframe
"""
pair_event = self.__external_pairs_event.get(key)
if pair_event:
num_concat, event = pair_event
self.__external_pairs_event[key] = (num_concat + 1, event)
if self._wait_data_policy == WaitDataPolicy.one:
logger.debug("Setting Data as policy is One")
event.set()
elif self._wait_data_policy == WaitDataPolicy.all and num_concat == self._num_sources:
logger.debug("Setting Data as policy is all, and is complete")
event.set()
del self.__external_pairs_event[key]
def _wait_on_data(self, key: PairWithTimeframe):
"""
Depending on the WaitDataPolicy, we will create and wait on an event until
set that determines the full amount of data is available
:param key: PairWithTimeframe
"""
if self._wait_data_policy is not WaitDataPolicy.none:
pair, timeframe, candle_type = key
pair_event = Event()
self.__external_pairs_event[key] = (0, pair_event)
timeout = self._wait_data_timeout \
if self._wait_data_policy is not WaitDataPolicy.all else 0
timeout_str = f"for {timeout} seconds" if timeout > 0 else "indefinitely"
logger.debug(f"Waiting for external data on {pair} for {timeout_str}")
pair_event.wait(timeout=timeout)
def add_pairlisthandler(self, pairlists) -> None:
"""
Allow adding pairlisthandler after initialization

View File

@ -3,7 +3,7 @@ from freqtrade.enums.backteststate import BacktestState
from freqtrade.enums.candletype import CandleType
from freqtrade.enums.exitchecktuple import ExitCheckTuple
from freqtrade.enums.exittype import ExitType
from freqtrade.enums.externalsignal import ExternalSignalModeType, LeaderMessageType
from freqtrade.enums.externalsignal import ExternalSignalModeType, LeaderMessageType, WaitDataPolicy
from freqtrade.enums.marginmode import MarginMode
from freqtrade.enums.ordertypevalue import OrderTypeValues
from freqtrade.enums.rpcmessagetype import RPCMessageType

View File

@ -7,5 +7,12 @@ class ExternalSignalModeType(str, Enum):
class LeaderMessageType(str, Enum):
default = "default"
pairlist = "pairlist"
analyzed_df = "analyzed_df"
class WaitDataPolicy(str, Enum):
none = "none"
one = "one"
all = "all"

View File

@ -281,9 +281,6 @@ class FreqtradeBot(LoggingMixin):
# If external signal leader, broadcast whitelist data
# Should we broadcast before trade pairs are added?
# Or should this class be made available to the PairListManager and ran
# when filter_pairlist is called?
if self.external_signal_controller:
if self.external_signal_controller.is_leader():
self.rpc.emit_data({

View File

@ -14,6 +14,7 @@ import pandas
import rapidjson
from freqtrade.constants import DECIMAL_PER_COIN_FALLBACK, DECIMALS_PER_COIN
from freqtrade.enums.signaltype import SignalTagType, SignalType
logger = logging.getLogger(__name__)
@ -271,3 +272,19 @@ def json_to_dataframe(data: str) -> pandas.DataFrame:
dataframe['date'] = pandas.to_datetime(dataframe['date'], unit='ms', utc=True)
return dataframe
def remove_entry_exit_signals(dataframe: pandas.DataFrame):
"""
Remove Entry and Exit signals from a DataFrame
:param dataframe: The DataFrame to remove signals from
"""
dataframe[SignalType.ENTER_LONG.value] = 0
dataframe[SignalType.EXIT_LONG.value] = 0
dataframe[SignalType.ENTER_SHORT.value] = 0
dataframe[SignalType.EXIT_SHORT.value] = 0
dataframe[SignalTagType.ENTER_TAG.value] = None
dataframe[SignalTagType.EXIT_TAG.value] = None
return dataframe

View File

@ -74,6 +74,7 @@ class ApiServer(RPCHandler):
default_response_class=FTJSONResponse,
)
self.configure_app(self.app, self._config)
self.start_api()
def add_rpc_handler(self, rpc: RPC):
"""

View File

@ -1,4 +1,5 @@
import logging
from threading import RLock
from typing import Type
from freqtrade.rpc.external_signal.proxy import WebSocketProxy
@ -63,6 +64,7 @@ class WebSocketChannel:
class ChannelManager:
def __init__(self):
self.channels = dict()
self._lock = RLock() # Re-entrant Lock
async def on_connect(self, websocket: WebSocketType):
"""
@ -78,7 +80,9 @@ class ChannelManager:
return
ws_channel = WebSocketChannel(websocket)
self.channels[websocket] = ws_channel
with self._lock:
self.channels[websocket] = ws_channel
return ws_channel
@ -88,21 +92,26 @@ class ChannelManager:
:param websocket: The WebSocket objet attached to the Channel
"""
if websocket in self.channels.keys():
channel = self.channels[websocket]
with self._lock:
channel = self.channels.get(websocket)
if channel:
logger.debug(f"Disconnecting channel - {channel}")
logger.debug(f"Disconnecting channel - {channel}")
if not channel.is_closed():
await channel.close()
if not channel.is_closed():
await channel.close()
del self.channels[websocket]
del self.channels[websocket]
async def disconnect_all(self):
"""
Disconnect all Channels
"""
for websocket in self.channels.keys():
await self.on_disconnect(websocket)
with self._lock:
for websocket, channel in self.channels.items():
if not channel.is_closed():
await channel.close()
self.channels = dict()
async def broadcast(self, data):
"""
@ -110,12 +119,13 @@ class ChannelManager:
:param data: The data to send
"""
for websocket, channel in self.channels.items():
try:
await channel.send(data)
except RuntimeError:
# Handle cannot send after close cases
await self.on_disconnect(websocket)
with self._lock:
for websocket, channel in self.channels.items():
try:
await channel.send(data)
except RuntimeError:
# Handle cannot send after close cases
await self.on_disconnect(websocket)
async def send_direct(self, channel, data):
"""

View File

@ -6,7 +6,7 @@ import logging
import secrets
import socket
from threading import Thread
from typing import Any, Coroutine, Dict, Union
from typing import Any, Callable, Coroutine, Dict, Union
import websockets
from fastapi import Depends
@ -56,8 +56,13 @@ class ExternalSignalController(RPCHandler):
self._main_task = None
self._sub_tasks = None
self.channel_manager = ChannelManager()
self._message_handlers = {
LeaderMessageType.pairlist: self._rpc._handle_pairlist_message,
LeaderMessageType.analyzed_df: self._rpc._handle_analyzed_df_message,
LeaderMessageType.default: self._rpc._handle_default_message
}
self.channel_manager = ChannelManager()
self.external_signal_config = config.get('external_signal', {})
# What the config should look like
@ -89,6 +94,8 @@ class ExternalSignalController(RPCHandler):
self.ping_timeout = self.external_signal_config.get('follower_ping_timeout', 2)
self.sleep_time = self.external_signal_config.get('follower_sleep_time', 5)
# Validate external_signal_config here?
if self.mode == ExternalSignalModeType.follower and len(self.leaders_list) == 0:
raise ValueError("You must specify at least 1 leader in follower mode.")
@ -99,7 +106,6 @@ class ExternalSignalController(RPCHandler):
default_api_key = secrets.token_urlsafe(16)
self.secret_api_key = self.external_signal_config.get('api_token', default_api_key)
self.start_threaded_loop()
self.start()
def is_leader(self):
@ -114,6 +120,12 @@ class ExternalSignalController(RPCHandler):
"""
return self.external_signal_config.get('enabled', False)
def num_leaders(self):
"""
The number of leaders we should be connected to
"""
return len(self.leaders_list)
def start_threaded_loop(self):
"""
Start the main internal loop in another thread to run coroutines
@ -144,6 +156,7 @@ class ExternalSignalController(RPCHandler):
"""
Start the controller main loop
"""
self.start_threaded_loop()
self._main_task = self.submit_coroutine(self.main())
async def shutdown(self):
@ -242,23 +255,20 @@ class ExternalSignalController(RPCHandler):
async def send_initial_data(self, channel):
logger.info("Sending initial data through channel")
# We first send pairlist data
# We should move this to a func in the RPC object
initial_data = {
"data_type": LeaderMessageType.pairlist,
"data": self.freqtrade.pairlists.whitelist
}
data = self._rpc._initial_leader_data()
await channel.send(initial_data)
for message in data:
await channel.send(message)
async def _handle_leader_message(self, message: MessageType):
"""
Handle message received from a Leader
"""
type = message.get("data_type")
type = message.get("data_type", LeaderMessageType.default)
data = message.get("data")
self._rpc._handle_emitted_data(type, data)
handler: Callable = self._message_handlers[type]
handler(type, data)
# ----------------------------------------------------------------------

View File

@ -1,5 +1,8 @@
from pandas import DataFrame
from starlette.websockets import WebSocket, WebSocketState
from freqtrade.enums.signaltype import SignalTagType, SignalType
async def is_websocket_alive(ws: WebSocket) -> bool:
if (
@ -8,3 +11,12 @@ async def is_websocket_alive(ws: WebSocket) -> bool:
):
return True
return False
def remove_entry_exit_signals(dataframe: DataFrame):
dataframe[SignalType.ENTER_LONG.value] = 0
dataframe[SignalType.EXIT_LONG.value] = 0
dataframe[SignalType.ENTER_SHORT.value] = 0
dataframe[SignalType.EXIT_SHORT.value] = 0
dataframe[SignalTagType.ENTER_TAG.value] = None
dataframe[SignalTagType.EXIT_TAG.value] = None

View File

@ -24,7 +24,8 @@ from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, LeaderMessage
from freqtrade.exceptions import ExchangeError, PricingError
from freqtrade.exchange import timeframe_to_minutes, timeframe_to_msecs
from freqtrade.loggers import bufferHandler
from freqtrade.misc import decimals_per_coin, json_to_dataframe, shorten_date
from freqtrade.misc import (decimals_per_coin, json_to_dataframe, remove_entry_exit_signals,
shorten_date)
from freqtrade.persistence import PairLocks, Trade
from freqtrade.persistence.models import PairLock
from freqtrade.plugins.pairlist.pairlist_helpers import expand_pairlist
@ -1090,41 +1091,64 @@ class RPC:
'last_process_ts': int(last_p.timestamp()),
}
def _handle_emitted_data(self, type, data):
# ------------------------------ EXTERNAL SIGNALS -----------------------
def _initial_leader_data(self):
# We create a list of Messages to send to the follower on connect
data = []
# Send Pairlist data
data.append({
"data_type": LeaderMessageType.pairlist,
"data": self._freqtrade.pairlists._whitelist
})
return data
def _handle_pairlist_message(self, type, data):
"""
Handles the emitted data from the Leaders
Handles the emitted pairlists from the Leaders
:param type: The data_type of the data
:param data: The data
"""
logger.debug(f"Handling emitted data of type ({type})")
pairlist = data
if type == LeaderMessageType.pairlist:
pairlist = data
logger.debug(f"Handling Pairlist message: {pairlist}")
logger.debug(pairlist)
external_pairlist = self._freqtrade.pairlists._pairlist_handlers[0]
external_pairlist.add_pairlist_data(pairlist)
# Add the pairlist data to the ExternalPairList object
external_pairlist = self._freqtrade.pairlists._pairlist_handlers[0]
external_pairlist.add_pairlist_data(pairlist)
def _handle_analyzed_df_message(self, type, data):
"""
Handles the analyzed dataframes from the Leaders
elif type == LeaderMessageType.analyzed_df:
:param type: The data_type of the data
:param data: The data
"""
key, value = data["key"], data["value"]
pair, timeframe, candle_type = key
# Convert the dataframe back from json
key, value = data["key"], data["value"]
# Skip any pairs that we don't have in the pairlist?
# leader_pairlist = self._freqtrade.pairlists._whitelist
# if pair not in leader_pairlist:
# return
pair, timeframe, candle_type = key
dataframe = json_to_dataframe(value)
# Skip any pairs that we don't have in the pairlist?
# leader_pairlist = self._freqtrade.pairlists._whitelist
# if pair not in leader_pairlist:
# return
if self._config.get('external_signal', {}).get('remove_signals_analyzed_df', False):
dataframe = remove_entry_exit_signals(dataframe)
dataframe = json_to_dataframe(value)
logger.debug(f"Handling analyzed dataframe for {pair}")
logger.debug(dataframe.tail())
logger.debug(f"Received analyzed dataframe for {pair}")
logger.debug(dataframe.tail())
# Add the dataframe to the dataprovider
dataprovider = self._freqtrade.dataprovider
dataprovider.add_external_df(pair, timeframe, dataframe, candle_type)
# Add the dataframe to the dataprovider
dataprovider = self._freqtrade.dataprovider
dataprovider.add_external_df(pair, timeframe, dataframe, candle_type)
def _handle_default_message(self, type, data):
"""
Default leader message handler, just logs it. We should never have to
run this unless the leader sends us some weird message.
"""
logger.debug(f"Received message from Leader of type {type}: {data}")

View File

@ -45,25 +45,20 @@ class RPCManager:
if config.get('api_server', {}).get('enabled', False):
logger.info('Enabling rpc.api_server')
from freqtrade.rpc.api_server import ApiServer
# Pass replicate_rpc as param or defer starting api_server
# until we register the replicate rpc enpoint?
apiserver = ApiServer(config)
apiserver.add_rpc_handler(self._rpc)
self.registered_modules.append(apiserver)
# Enable Replicate mode
# Enable External Signals mode
# For this to be enabled, the API server must also be enabled
if config.get('external_signal', {}).get('enabled', False):
logger.info('Enabling RPC.ExternalSignalController')
from freqtrade.rpc.external_signal import ExternalSignalController
external_signal_rpc = ExternalSignalController(self._rpc, config, apiserver)
self.registered_modules.append(external_signal_rpc)
external_signals = ExternalSignalController(self._rpc, config, apiserver)
self.registered_modules.append(external_signals)
# Attach the controller to FreqTrade
freqtrade.external_signal_controller = external_signal_rpc
apiserver.start_api()
freqtrade.external_signal_controller = external_signals
def cleanup(self) -> None:
""" Stops all enabled rpc modules """

View File

@ -4,3 +4,4 @@
# Required for follower
websockets
msgpack
janus

View File

@ -52,7 +52,6 @@ def botclient(default_conf, mocker):
try:
apiserver = ApiServer(default_conf)
apiserver.add_rpc_handler(rpc)
apiserver.start_api()
yield ftbot, TestClient(apiserver.app)
# Cleanup ... ?
finally:
@ -333,7 +332,6 @@ def test_api_run(default_conf, mocker, caplog):
apiserver = ApiServer(default_conf)
apiserver.add_rpc_handler(RPC(get_patched_freqtradebot(mocker, default_conf)))
apiserver.start_api()
assert server_mock.call_count == 1
assert apiserver._config == default_conf
apiserver.start_api()