Renamed to external signals, controller class refactored

This commit is contained in:
Timothy Pogue 2022-08-24 22:42:29 -06:00
parent 592373f096
commit d474111a65
17 changed files with 203 additions and 194 deletions

View File

@ -245,7 +245,7 @@ CONF_SCHEMA = {
'exchange': {'$ref': '#/definitions/exchange'},
'edge': {'$ref': '#/definitions/edge'},
'freqai': {'$ref': '#/definitions/freqai'},
'replicate': {'$ref': '#/definitions/replicate'},
'external_signal': {'$ref': '#/definitions/external_signal'},
'experimental': {
'type': 'object',
'properties': {
@ -487,7 +487,7 @@ CONF_SCHEMA = {
},
'required': ['process_throttle_secs', 'allowed_risk']
},
'replicate': {
'external_signal': {
'type': 'object',
'properties': {
'enabled': {'type': 'boolean', 'default': False},
@ -495,14 +495,14 @@ CONF_SCHEMA = {
'type': 'string',
'enum': FOLLOWER_MODE_OPTIONS
},
'api_key': {'type': 'string', 'default': ''},
'api_token': {'type': 'string', 'default': ''},
'leaders': {
'type': 'array',
'items': {
'type': 'object',
'properties': {
'url': {'type': 'string', 'default': ''},
'token': {'type': 'string', 'default': ''},
'api_token': {'type': 'string', 'default': ''},
}
}
},

View File

@ -29,8 +29,7 @@ MAX_DATAFRAME_CANDLES = 1000
class DataProvider:
def __init__(self, config: dict, exchange: Optional[Exchange],
pairlists=None, replicate_controller=None) -> None:
def __init__(self, config: dict, exchange: Optional[Exchange], pairlists=None) -> None:
self._config = config
self._exchange = exchange
self._pairlists = pairlists
@ -99,25 +98,33 @@ class DataProvider:
self,
pair: str,
timeframe: str,
candle_type: CandleType
candle_type: CandleType,
wait: bool = True
) -> DataFrame:
"""
If the pair exists in __external_pairs_df, return it. If it doesn't,
create a new threading Event in __external_pairs_event and wait on it.
If the pair exists in __external_pairs_df, return it.
If it doesn't, and wait is False, then return an empty df with the columns filled.
If it doesn't, and wait is True (default) create a new threading Event
in __external_pairs_event and wait on it.
"""
pair_key = (pair, timeframe, candle_type)
if pair_key not in self.__external_pairs_df:
if wait:
pair_event = Event()
self.__external_pairs_event[pair] = pair_event
logger.debug(f"Waiting on Leader data for: {pair_key}")
self.__external_pairs_event[pair].wait()
self.__external_pairs_event[pair].wait(timeout=5)
if pair_key in self.__external_pairs_df:
return self.__external_pairs_df[pair_key]
# Because of the waiting mechanism, this should never return
if pair_key not in self.__external_pairs_df:
# Return empty dataframe but with expected columns merged and filled with NaN
return (DataFrame(), datetime.fromtimestamp(0, tz=timezone.utc))
else:
# Return empty dataframe but with expected columns merged and filled with NaN
return (DataFrame(), datetime.fromtimestamp(0, tz=timezone.utc))
return self.__external_pairs_df[pair_key]
def add_pairlisthandler(self, pairlists) -> None:
"""

View File

@ -3,9 +3,9 @@ from freqtrade.enums.backteststate import BacktestState
from freqtrade.enums.candletype import CandleType
from freqtrade.enums.exitchecktuple import ExitCheckTuple
from freqtrade.enums.exittype import ExitType
from freqtrade.enums.externalsignal import ExternalSignalModeType, LeaderMessageType
from freqtrade.enums.marginmode import MarginMode
from freqtrade.enums.ordertypevalue import OrderTypeValues
from freqtrade.enums.replicate import LeaderMessageType, ReplicateModeType
from freqtrade.enums.rpcmessagetype import RPCMessageType
from freqtrade.enums.runmode import NON_UTIL_MODES, OPTIMIZE_MODES, TRADING_MODES, RunMode
from freqtrade.enums.signaltype import SignalDirection, SignalTagType, SignalType

View File

@ -1,7 +1,7 @@
from enum import Enum
class ReplicateModeType(str, Enum):
class ExternalSignalModeType(str, Enum):
leader = "leader"
follower = "follower"

View File

@ -75,7 +75,7 @@ class FreqtradeBot(LoggingMixin):
PairLocks.timeframe = self.config['timeframe']
self.replicate_controller = None
self.external_signal_controller = None
self.pairlists = PairListManager(self.exchange, self.config)
@ -93,9 +93,6 @@ class FreqtradeBot(LoggingMixin):
# Attach Wallets to strategy instance
self.strategy.wallets = self.wallets
# Attach ReplicateController to the strategy
# self.strategy.replicate_controller = self.replicate_controller
# Initializing Edge only if enabled
self.edge = Edge(self.config, self.exchange, self.strategy) if \
self.config.get('edge', {}).get('enabled', False) else None
@ -197,8 +194,8 @@ class FreqtradeBot(LoggingMixin):
strategy_safe_wrapper(self.strategy.bot_loop_start, supress_error=True)()
if self.replicate_controller:
if not self.replicate_controller.is_leader():
if self.external_signal_controller:
if not self.external_signal_controller.is_leader():
# Run Follower mode analyzing
leader_pairs = self.pairlists._whitelist
self.strategy.analyze_external(self.active_pair_whitelist, leader_pairs)
@ -281,16 +278,14 @@ class FreqtradeBot(LoggingMixin):
self.pairlists.refresh_pairlist()
_whitelist = self.pairlists.whitelist
# If replicate leader, broadcast whitelist data
# Should we broadcast before trade pairs are added? What if
# the follower doesn't have trades with those pairs. They would be added for
# no reason.
# If external signal leader, broadcast whitelist data
# Should we broadcast before trade pairs are added?
# Or should this class be made available to the PairListManager and ran
# when filter_pairlist is called?
if self.replicate_controller:
if self.replicate_controller.is_leader():
if self.external_signal_controller:
if self.external_signal_controller.is_leader():
self.rpc.emit_data({
"data_type": LeaderMessageType.pairlist,
"data": _whitelist

View File

@ -14,7 +14,7 @@ logger = logging.getLogger(__name__)
class ExternalPairList(IPairList):
"""
PairList plugin for use with replicate follower mode.
PairList plugin for use with external signal follower mode.
Will use pairs given from leader data.
Usage:
@ -67,6 +67,8 @@ class ExternalPairList(IPairList):
def add_pairlist_data(self, pairlist: List[str]):
"""
Add pairs from Leader
:param pairlist: List of pairs
"""
# If some pairs were removed on Leader, remove them here

View File

@ -0,0 +1,5 @@
# flake8: noqa: F401
from freqtrade.rpc.external_signal.controller import ExternalSignalController
__all__ = ('ExternalSignalController')

View File

@ -1,9 +1,9 @@
import logging
from typing import Type
from freqtrade.rpc.replicate.proxy import WebSocketProxy
from freqtrade.rpc.replicate.serializer import MsgPackWebSocketSerializer, WebSocketSerializer
from freqtrade.rpc.replicate.types import WebSocketType
from freqtrade.rpc.external_signal.proxy import WebSocketProxy
from freqtrade.rpc.external_signal.serializer import MsgPackWebSocketSerializer, WebSocketSerializer
from freqtrade.rpc.external_signal.types import WebSocketType
logger = logging.getLogger(__name__)

View File

@ -5,8 +5,7 @@ import asyncio
import logging
import secrets
import socket
import traceback
from threading import Event, Thread
from threading import Thread
from typing import Any, Coroutine, Dict, Union
import websockets
@ -14,18 +13,18 @@ from fastapi import Depends
from fastapi import WebSocket as FastAPIWebSocket
from fastapi import WebSocketDisconnect, status
from freqtrade.enums import LeaderMessageType, ReplicateModeType, RPCMessageType
from freqtrade.enums import ExternalSignalModeType, LeaderMessageType, RPCMessageType
from freqtrade.rpc import RPC, RPCHandler
from freqtrade.rpc.replicate.channel import ChannelManager
from freqtrade.rpc.replicate.thread_queue import Queue as ThreadedQueue
from freqtrade.rpc.replicate.types import MessageType
from freqtrade.rpc.replicate.utils import is_websocket_alive
from freqtrade.rpc.external_signal.channel import ChannelManager
from freqtrade.rpc.external_signal.thread_queue import Queue as ThreadedQueue
from freqtrade.rpc.external_signal.types import MessageType
from freqtrade.rpc.external_signal.utils import is_websocket_alive
logger = logging.getLogger(__name__)
class ReplicateController(RPCHandler):
class ExternalSignalController(RPCHandler):
""" This class handles all websocket communication """
def __init__(
@ -35,9 +34,10 @@ class ReplicateController(RPCHandler):
api_server: Union[Any, None] = None
) -> None:
"""
Init the ReplicateRPC class, and init the super class RPCHandler
Init the ExternalSignalController class, and init the super class RPCHandler
:param rpc: instance of RPC Helper class
:param config: Configuration object
:param api_server: The ApiServer object
:return: None
"""
super().__init__(rpc, config)
@ -46,48 +46,50 @@ class ReplicateController(RPCHandler):
self.api_server = api_server
if not self.api_server:
raise RuntimeError("The API server must be enabled for replicate to work")
raise RuntimeError("The API server must be enabled for external signals to work")
self._loop = None
self._running = False
self._thread = None
self._queue = None
self._stop_event = Event()
self._follower_tasks = None
self._main_task = None
self._sub_tasks = None
self.channel_manager = ChannelManager()
self.replicate_config = config.get('replicate', {})
self.external_signal_config = config.get('external_signal', {})
# What the config should look like
# "replicate": {
# "external_signal": {
# "enabled": true,
# "mode": "follower",
# "leaders": [
# {
# "url": "ws://localhost:8080/replicate/ws",
# "token": "test"
# "url": "ws://localhost:8080/signals/ws",
# "api_token": "test"
# }
# ]
# }
# "replicate": {
# "external_signal": {
# "enabled": true,
# "mode": "leader",
# "api_key": "test"
# "api_token": "test"
# }
self.mode = ReplicateModeType[self.replicate_config.get('mode', 'leader').lower()]
self.mode = ExternalSignalModeType[
self.external_signal_config.get('mode', 'leader').lower()
]
self.leaders_list = self.replicate_config.get('leaders', [])
self.push_throttle_secs = self.replicate_config.get('push_throttle_secs', 0.1)
self.leaders_list = self.external_signal_config.get('leaders', [])
self.push_throttle_secs = self.external_signal_config.get('push_throttle_secs', 0.1)
self.reply_timeout = self.replicate_config.get('follower_reply_timeout', 10)
self.ping_timeout = self.replicate_config.get('follower_ping_timeout', 2)
self.sleep_time = self.replicate_config.get('follower_sleep_time', 5)
self.reply_timeout = self.external_signal_config.get('follower_reply_timeout', 10)
self.ping_timeout = self.external_signal_config.get('follower_ping_timeout', 2)
self.sleep_time = self.external_signal_config.get('follower_sleep_time', 5)
if self.mode == ReplicateModeType.follower and len(self.leaders_list) == 0:
if self.mode == ExternalSignalModeType.follower and len(self.leaders_list) == 0:
raise ValueError("You must specify at least 1 leader in follower mode.")
# This is only used by the leader, the followers use the tokens specified
@ -95,12 +97,23 @@ class ReplicateController(RPCHandler):
# If you do not specify an API key in the config, one will be randomly
# generated and logged on startup
default_api_key = secrets.token_urlsafe(16)
self.secret_api_key = self.replicate_config.get('api_key', default_api_key)
self.secret_api_key = self.external_signal_config.get('api_token', default_api_key)
self.start_threaded_loop()
self.start()
def is_leader(self):
"""
Leader flag
"""
return self.enabled() and self.mode == ExternalSignalModeType.leader
def enabled(self):
"""
Enabled flag
"""
return self.external_signal_config.get('enabled', False)
def start_threaded_loop(self):
"""
Start the main internal loop in another thread to run coroutines
@ -125,36 +138,29 @@ class ReplicateController(RPCHandler):
raise RuntimeError("Loop must be started before any function can"
" be submitted")
try:
return asyncio.run_coroutine_threadsafe(coroutine, self._loop)
except Exception as e:
logger.error(f"Error running coroutine - {str(e)}")
return None
async def main_loop(self):
"""
Main loop coro
Start the loop based on what mode we're in
"""
try:
if self.mode == ReplicateModeType.leader:
await self.leader_loop()
elif self.mode == ReplicateModeType.follower:
await self.follower_loop()
except asyncio.CancelledError:
pass
except Exception:
pass
finally:
self._loop.stop()
def start(self):
"""
Start the controller main loop
"""
self.submit_coroutine(self.main_loop())
self._main_task = self.submit_coroutine(self.main())
async def shutdown(self):
"""
Shutdown all tasks and close up
"""
logger.info("Stopping rpc.externalsignalcontroller")
# Flip running flag
self._running = False
# Cancel sub tasks
for task in self._sub_tasks:
task.cancel()
# Then disconnect all channels
await self.channel_manager.disconnect_all()
def cleanup(self) -> None:
"""
@ -162,18 +168,44 @@ class ReplicateController(RPCHandler):
"""
if self._thread:
if self._loop.is_running():
self._running = False
# Tell all coroutines submitted to the loop they're cancelled
pending = asyncio.all_tasks(loop=self._loop)
for task in pending:
task.cancel()
self._loop.call_soon_threadsafe(self.channel_manager.disconnect_all)
self._main_task.cancel()
self._thread.join()
async def main(self):
"""
Main coro
Start the loop based on what mode we're in
"""
try:
if self.mode == ExternalSignalModeType.leader:
logger.info("Starting rpc.externalsignalcontroller in Leader mode")
await self.run_leader_mode()
elif self.mode == ExternalSignalModeType.follower:
logger.info("Starting rpc.externalsignalcontroller in Follower mode")
await self.run_follower_mode()
except asyncio.CancelledError:
# We're cancelled
await self.shutdown()
except Exception as e:
# Log the error
logger.error(f"Exception occurred in main task: {e}")
logger.exception(e)
finally:
# This coroutine is the last thing to be ended, so it should stop the loop
self._loop.stop()
def log_api_token(self):
"""
Log the API token
"""
logger.info("-" * 15)
logger.info(f"API_KEY: {self.secret_api_key}")
logger.info("-" * 15)
def send_msg(self, msg: MessageType) -> None:
"""
Support RPC calls
@ -186,7 +218,9 @@ class ReplicateController(RPCHandler):
logger.error(f"Message is empty! {msg}")
def send_message(self, msg: MessageType) -> None:
""" Broadcast message over all channels if there are any """
"""
Broadcast message over all channels if there are any
"""
if self.channel_manager.has_channels():
self._send_message(msg)
@ -205,39 +239,60 @@ class ReplicateController(RPCHandler):
else:
logger.warning("Can not send data, leader loop has not started yet!")
def is_leader(self):
"""
Leader flag
"""
return self.enabled() and self.mode == ReplicateModeType.leader
async def send_initial_data(self, channel):
logger.info("Sending initial data through channel")
def enabled(self):
"""
Enabled flag
"""
return self.replicate_config.get('enabled', False)
# We first send pairlist data
# We should move this to a func in the RPC object
initial_data = {
"data_type": LeaderMessageType.pairlist,
"data": self.freqtrade.pairlists.whitelist
}
# ----------------------- LEADER LOGIC ------------------------------
await channel.send(initial_data)
async def leader_loop(self):
async def _handle_leader_message(self, message: MessageType):
"""
Handle message received from a Leader
"""
type = message.get("data_type")
data = message.get("data")
self._rpc._handle_emitted_data(type, data)
# ----------------------------------------------------------------------
async def run_leader_mode(self):
"""
Main leader coroutine
This starts all of the leader coros and registers the endpoint on
the ApiServer
"""
logger.info("Running rpc.replicate in Leader mode")
logger.info("-" * 15)
logger.info(f"API_KEY: {self.secret_api_key}")
logger.info("-" * 15)
self.register_leader_endpoint()
self.log_api_token()
try:
await self._broadcast_queue_data()
except Exception as e:
logger.error("Exception occurred in Leader loop: ")
logger.exception(e)
self._sub_tasks = [
self._loop.create_task(self._broadcast_queue_data())
]
return await asyncio.gather(*self._sub_tasks)
async def run_follower_mode(self):
"""
Main follower coroutine
This starts all of the follower connection coros
"""
rpc_lock = asyncio.Lock()
self._sub_tasks = [
self._loop.create_task(self._handle_leader_connection(leader, rpc_lock))
for leader in self.leaders_list
]
return await asyncio.gather(*self._sub_tasks)
async def _broadcast_queue_data(self):
"""
@ -261,8 +316,6 @@ class ReplicateController(RPCHandler):
except asyncio.CancelledError:
# Silently stop
pass
except Exception as e:
logger.exception(e)
async def get_api_token(
self,
@ -279,7 +332,7 @@ class ReplicateController(RPCHandler):
logger.info("Denying websocket request...")
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
def register_leader_endpoint(self, path: str = "/replicate/ws"):
def register_leader_endpoint(self, path: str = "/signals/ws"):
"""
Attach and start the main leader loop to the ApiServer
@ -334,73 +387,16 @@ class ReplicateController(RPCHandler):
logger.error(f"Failed to serve - {websocket.client}")
await self.channel_manager.on_disconnect(websocket)
async def send_initial_data(self, channel):
logger.info("Sending initial data through channel")
# We first send pairlist data
initial_data = {
"data_type": LeaderMessageType.pairlist,
"data": self.freqtrade.pairlists.whitelist
}
await channel.send(initial_data)
# -------------------------------FOLLOWER LOGIC----------------------------
async def follower_loop(self):
"""
Main follower coroutine
This starts all of the follower connection coros
"""
logger.info("Starting rpc.replicate in Follower mode")
responses = await self._connect_to_leaders()
# Eventually add the ability to send requests to the Leader
# await self._send_requests()
for result in responses:
if isinstance(result, Exception):
logger.debug(f"Exception in Follower loop: {result}")
traceback_message = ''.join(traceback.format_tb(result.__traceback__))
logger.error(traceback_message)
async def _handle_leader_message(self, message: MessageType):
"""
Handle message received from a Leader
"""
type = message.get("data_type")
data = message.get("data")
self._rpc._handle_emitted_data(type, data)
async def _connect_to_leaders(self):
"""
For each leader in `self.leaders_list` create a connection and
listen for data.
"""
rpc_lock = asyncio.Lock()
logger.info("Starting connections to Leaders...")
self.follower_tasks = [
self._loop.create_task(self._handle_leader_connection(leader, rpc_lock))
for leader in self.leaders_list
]
return await asyncio.gather(*self.follower_tasks, return_exceptions=True)
async def _handle_leader_connection(self, leader, lock):
"""
Given a leader, connect and wait on data. If connection is lost,
it will attempt to reconnect.
"""
try:
url, token = leader["url"], leader["token"]
url, token = leader["url"], leader["api_token"]
websocket_url = f"{url}?token={token}"
logger.info(f"Attempting to connect to Leader at: {url}")
# TODO: limit the amount of connection retries
while True:
try:
async with websockets.connect(websocket_url) as ws:

View File

@ -3,7 +3,7 @@ from typing import Union
from fastapi import WebSocket as FastAPIWebSocket
from websockets import WebSocketClientProtocol as WebSocket
from freqtrade.rpc.replicate.types import WebSocketType
from freqtrade.rpc.external_signal.types import WebSocketType
class WebSocketProxy:

View File

@ -5,7 +5,7 @@ from abc import ABC, abstractmethod
import msgpack
import orjson
from freqtrade.rpc.replicate.proxy import WebSocketProxy
from freqtrade.rpc.external_signal.proxy import WebSocketProxy
logger = logging.getLogger(__name__)

View File

@ -1109,16 +1109,22 @@ class RPC:
external_pairlist.add_pairlist_data(pairlist)
elif type == LeaderMessageType.analyzed_df:
# Convert the dataframe back from json
key, value = data["key"], data["value"]
pair, timeframe, candle_type = key
dataframe = json_to_dataframe(value)
dataprovider = self._freqtrade.dataprovider
# Skip any pairs that we don't have in the pairlist?
# leader_pairlist = self._freqtrade.pairlists._whitelist
# if pair not in leader_pairlist:
# return
dataframe = json_to_dataframe(value)
logger.debug(f"Received analyzed dataframe for {pair}")
logger.debug(dataframe.tail())
# Add the dataframe to the dataprovider
dataprovider = self._freqtrade.dataprovider
dataprovider.add_external_df(pair, timeframe, dataframe, candle_type)

View File

@ -54,14 +54,14 @@ class RPCManager:
# Enable Replicate mode
# For this to be enabled, the API server must also be enabled
if config.get('replicate', {}).get('enabled', False):
logger.info('Enabling rpc.replicate')
from freqtrade.rpc.replicate import ReplicateController
replicate_rpc = ReplicateController(self._rpc, config, apiserver)
self.registered_modules.append(replicate_rpc)
if config.get('external_signal', {}).get('enabled', False):
logger.info('Enabling RPC.ExternalSignalController')
from freqtrade.rpc.external_signal import ExternalSignalController
external_signal_rpc = ExternalSignalController(self._rpc, config, apiserver)
self.registered_modules.append(external_signal_rpc)
# Attach the controller to FreqTrade
freqtrade.replicate_controller = replicate_rpc
freqtrade.external_signal_controller = external_signal_rpc
apiserver.start_api()

View File

@ -18,7 +18,6 @@ from freqtrade.enums.runmode import RunMode
from freqtrade.exceptions import OperationalException, StrategyError
from freqtrade.exchange import timeframe_to_minutes, timeframe_to_next_date, timeframe_to_seconds
from freqtrade.persistence import Order, PairLocks, Trade
from freqtrade.rpc.replicate import ReplicateController
from freqtrade.strategy.hyper import HyperStrategyMixin
from freqtrade.strategy.informative_decorator import (InformativeData, PopulateIndicators,
_create_and_merge_informative_pair,
@ -111,7 +110,6 @@ class IStrategy(ABC, HyperStrategyMixin):
# the dataprovider (dp) (access to other candles, historic data, ...)
# and wallets - access to the current balance.
dp: DataProvider
replicate_controller: Optional[ReplicateController]
wallets: Optional[Wallets] = None
# Filled from configuration
stake_currency: str
@ -764,7 +762,7 @@ class IStrategy(ABC, HyperStrategyMixin):
if not external_data:
dataframe = self.dp.ohlcv(pair, self.timeframe, candle_type)
else:
dataframe, last_analyzed = self.dp.get_external_df(pair, self.timeframe, candle_type)
dataframe, _ = self.dp.get_external_df(pair, self.timeframe, candle_type)
if not isinstance(dataframe, DataFrame) or dataframe.empty:
logger.warning('Empty candle (OHLCV) data for pair %s', pair)