client implementation, minor fixes

Timothy Pogue 2022-08-30 19:21:34 -06:00
parent 418bd26a80
commit 346e73dd75
22 changed files with 323 additions and 959 deletions

View File

@@ -172,7 +172,18 @@
"jwt_secret_key": "somethingrandom",
"CORS_origins": [],
"username": "freqtrader",
"password": "SuperSecurePassword"
"password": "SuperSecurePassword",
"ws_token": "a_secret_ws_token",
"enable_message_ws": false
},
"external_message_consumer": {
"enabled": false,
"producers": [
{
"url": "ws://some.freqtrade.bot/api/v1/message/ws",
"ws_token": "a_secret_ws_token"
}
]
},
"bot_name": "freqtrade",
"db_url": "sqlite:///tradesv3.sqlite",

View File

@@ -61,7 +61,6 @@ USERPATH_FREQAIMODELS = 'freqaimodels'
TELEGRAM_SETTING_OPTIONS = ['on', 'off', 'silent']
WEBHOOK_FORMAT_OPTIONS = ['form', 'json', 'raw']
FOLLOWER_MODE_OPTIONS = ['follower', 'leader']
WAIT_DATA_POLICY_OPTIONS = ['none', 'first', 'all']
ENV_VAR_PREFIX = 'FREQTRADE__'
@@ -246,7 +245,7 @@ CONF_SCHEMA = {
'exchange': {'$ref': '#/definitions/exchange'},
'edge': {'$ref': '#/definitions/edge'},
'freqai': {'$ref': '#/definitions/freqai'},
'external_signal': {'$ref': '#/definitions/external_signal'},
'external_message_consumer': {'$ref': '#/definitions/external_message_consumer'},
'experimental': {
'type': 'object',
'properties': {
@@ -404,7 +403,8 @@ CONF_SCHEMA = {
},
'username': {'type': 'string'},
'password': {'type': 'string'},
'api_token': {'type': 'string'},
'ws_token': {'type': 'string'},
'enable_message_ws': {'type': 'boolean', 'default': False},
'jwt_secret_key': {'type': 'string'},
'CORS_origins': {'type': 'array', 'items': {'type': 'string'}},
'verbosity': {'type': 'string', 'enum': ['error', 'info']},
@@ -489,35 +489,31 @@ CONF_SCHEMA = {
},
'required': ['process_throttle_secs', 'allowed_risk']
},
'external_signal': {
'external_message_consumer': {
'type': 'object',
'properties': {
'enabled': {'type': 'boolean', 'default': False},
'mode': {
'type': 'string',
'enum': FOLLOWER_MODE_OPTIONS
},
'api_token': {'type': 'string', 'default': ''},
'leaders': {
'producers': {
'type': 'array',
'items': {
'type': 'object',
'properties': {
'url': {'type': 'string', 'default': ''},
'api_token': {'type': 'string', 'default': ''},
'ws_token': {'type': 'string', 'default': ''},
}
}
},
'follower_reply_timeout': {'type': 'integer'},
'follower_sleep_time': {'type': 'integer'},
'follower_ping_timeout': {'type': 'integer'},
'reply_timeout': {'type': 'integer'},
'sleep_time': {'type': 'integer'},
'ping_timeout': {'type': 'integer'},
'wait_data_policy': {
'type': 'string',
'enum': WAIT_DATA_POLICY_OPTIONS
},
'wait_data_timeout': {'type': 'integer', 'default': 5},
'remove_signals_analyzed_df': {'type': 'boolean', 'default': False}
},
'required': ['mode']
'required': ['producers']
},
"freqai": {
"type": "object",

View File

@@ -48,11 +48,13 @@ class DataProvider:
self.__msg_cache = PeriodicCache(
maxsize=1000, ttl=timeframe_to_seconds(self._config.get('timeframe', '1h')))
self._num_sources = len(self._config.get('external_signal', {}).get('leader_list', []))
self._wait_data_policy = self._config.get('external_signal', {}).get(
self._num_sources = len(
self._config.get('external_message_consumer', {}).get('producers', [])
)
self._wait_data_policy = self._config.get('external_message_consumer', {}).get(
'wait_data_policy', WaitDataPolicy.all)
self._wait_data_timeout = self._config.get(
'external_signal', {}).get('wait_data_timeout', 5)
self._wait_data_timeout = self._config.get('external_message_consumer', {}).get(
'wait_data_timeout', 5)
def _set_dataframe_max_index(self, limit_index: int):
"""

View File

@@ -3,7 +3,7 @@ from freqtrade.enums.backteststate import BacktestState
from freqtrade.enums.candletype import CandleType
from freqtrade.enums.exitchecktuple import ExitCheckTuple
from freqtrade.enums.exittype import ExitType
from freqtrade.enums.externalsignal import ExternalSignalModeType, LeaderMessageType, WaitDataPolicy
from freqtrade.enums.externalmessages import WaitDataPolicy
from freqtrade.enums.marginmode import MarginMode
from freqtrade.enums.ordertypevalue import OrderTypeValues
from freqtrade.enums.rpcmessagetype import RPCMessageType, RPCRequestType

View File

@@ -0,0 +1,7 @@
from enum import Enum
class WaitDataPolicy(str, Enum):
none = "none"
one = "one"
all = "all"

View File

@@ -1,18 +0,0 @@
from enum import Enum
class ExternalSignalModeType(str, Enum):
leader = "leader"
follower = "follower"
class LeaderMessageType(str, Enum):
default = "default"
pairlist = "pairlist"
analyzed_df = "analyzed_df"
class WaitDataPolicy(str, Enum):
none = "none"
one = "one"
all = "all"

View File

@@ -30,6 +30,7 @@ from freqtrade.plugins.pairlistmanager import PairListManager
from freqtrade.plugins.protectionmanager import ProtectionManager
from freqtrade.resolvers import ExchangeResolver, StrategyResolver
from freqtrade.rpc import RPCManager
from freqtrade.rpc.emc import ExternalMessageConsumer
from freqtrade.strategy.interface import IStrategy
from freqtrade.strategy.strategy_wrapper import strategy_safe_wrapper
from freqtrade.util import FtPrecise
@@ -90,11 +91,17 @@ class FreqtradeBot(LoggingMixin):
self.strategy.dp = self.dataprovider
# Attach Wallets to strategy instance
self.strategy.wallets = self.wallets
# Attach rpc to strategy instance
self.strategy.rpc = self.rpc
# Initializing Edge only if enabled
self.edge = Edge(self.config, self.exchange, self.strategy) if \
self.config.get('edge', {}).get('enabled', False) else None
# Init ExternalMessageConsumer if enabled
self.emc = ExternalMessageConsumer(self.rpc._rpc, self.config) if \
self.config.get('external_message_consumer', {}).get('enabled', False) else None
self.active_pair_whitelist = self._refresh_active_whitelist()
# Set initial bot state from config
@@ -150,6 +157,8 @@ class FreqtradeBot(LoggingMixin):
self.check_for_open_trades()
self.rpc.cleanup()
if self.emc:
self.emc.shutdown()
Trade.commit()
self.exchange.close()
@@ -192,7 +201,11 @@ class FreqtradeBot(LoggingMixin):
strategy_safe_wrapper(self.strategy.bot_loop_start, supress_error=True)()
self.strategy.analyze(self.active_pair_whitelist)
if self.emc:
leader_pairs = self.pairlists._whitelist
self.strategy.analyze_external(self.active_pair_whitelist, leader_pairs)
else:
self.strategy.analyze(self.active_pair_whitelist)
with self._exit_lock:
# Check for exchange cancelations, timeouts and user requested replace
@@ -255,7 +268,7 @@ class FreqtradeBot(LoggingMixin):
self.pairlists.refresh_pairlist()
_whitelist = self.pairlists.whitelist
self.rpc.send_msg({'type': RPCMessageType.WHITELIST, 'msg': _whitelist})
self.rpc.send_msg({'type': RPCMessageType.WHITELIST, 'data': _whitelist})
# Calculating Edge positioning
if self.edge:
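With the whitelist payload now sent under 'data' instead of 'msg', the envelope that leaves this bot looks roughly like the sketch below (pair names are illustrative); on the consuming side, handle_producer_message in freqtrade/rpc/emc.py passes that same list to the ExternalPairlist handler via add_pairlist_data:

from freqtrade.enums import RPCMessageType

whitelist_msg = {
    'type': RPCMessageType.WHITELIST,
    'data': ['BTC/USDT', 'ETH/USDT'],   # the refreshed whitelist
}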

View File

@@ -26,7 +26,7 @@ async def message_endpoint(
# Return a channel ID, pass that instead of ws to the rest of the methods
channel = await channel_manager.on_connect(ws)
# Keep connection open until explicitly closed, and sleep
# Keep connection open until explicitly closed, and process requests
try:
while not channel.is_closed():
request = await channel.recv()

View File

@@ -112,9 +112,6 @@ class ApiServer(RPCHandler):
# Cancel the queue task
self._background_task.cancel()
# Finally stop the loop
self._loop.call_soon_threadsafe(self._loop.stop)
self._thread.join()
@classmethod
@@ -127,7 +124,6 @@ class ApiServer(RPCHandler):
def send_msg(self, msg: Dict[str, str]) -> None:
if self._queue:
logger.info(f"Adding message to queue: {msg}")
sync_q = self._queue.sync_q
sync_q.put(msg)
@@ -155,9 +151,11 @@ class ApiServer(RPCHandler):
app.include_router(api_backtest, prefix="/api/v1",
dependencies=[Depends(http_basic_or_jwt_token)],
)
app.include_router(ws_router, prefix="/api/v1",
dependencies=[Depends(get_ws_token)]
)
if self._config.get('api_server', {}).get('enable_message_ws', False):
logger.info("Enabling Message WebSocket")
app.include_router(ws_router, prefix="/api/v1",
dependencies=[Depends(get_ws_token)]
)
app.include_router(router_login, prefix="/api/v1", tags=["auth"])
# UI Router MUST be last!
app.include_router(router_ui, prefix='')
@@ -194,17 +192,19 @@ class ApiServer(RPCHandler):
try:
while True:
logger.debug("Getting queue data...")
logger.debug("Getting queue messages...")
# Get data from queue
data = await async_queue.get()
logger.debug(f"Found data: {data}")
message = await async_queue.get()
logger.debug(f"Found message of type: {message.get('type')}")
# Broadcast it
await self._channel_manager.broadcast(data)
await self._channel_manager.broadcast(message)
# Sleep, make this configurable?
await asyncio.sleep(0.1)
except asyncio.CancelledError:
# Silently stop
pass
# Disconnect channels and stop the loop on cancel
await self._channel_manager.disconnect_all()
self._loop.stop()
# For testing, shouldn't happen when stable
except Exception as e:
logger.info(f"Exception happened in background task: {e}")
@@ -246,7 +246,8 @@ class ApiServer(RPCHandler):
if self._standalone:
self._server.run()
else:
self.start_message_queue()
if self._config.get('api_server', {}).get('enable_message_ws', False):
self.start_message_queue()
self._server.run_in_thread()
except Exception:
logger.exception("Api server failed to start.")

View File

@@ -116,8 +116,6 @@ class ChannelManager:
with self._lock:
channel = self.channels.get(websocket)
if channel:
logger.debug(f"Disconnecting channel - {channel}")
if not channel.is_closed():
await channel.close()
@@ -142,7 +140,6 @@ class ChannelManager:
"""
with self._lock:
message_type = data.get('type')
logger.debug(f"Broadcasting data: {message_type} - {data}")
for websocket, channel in self.channels.items():
try:
if channel.subscribed_to(message_type):

freqtrade/rpc/emc.py (new file, 229 lines added)
View File

@@ -0,0 +1,229 @@
"""
ExternalMessageConsumer module
Main purpose is to connect to an external bot's message websocket and consume data
from it
"""
import asyncio
import logging
import socket
from threading import Thread
from typing import Any, Dict
import websockets
from freqtrade.enums import RPCMessageType, RPCRequestType
from freqtrade.misc import json_to_dataframe, remove_entry_exit_signals
from freqtrade.rpc import RPC
from freqtrade.rpc.api_server.ws.channel import WebSocketChannel
logger = logging.getLogger(__name__)
class ExternalMessageConsumer:
"""
The main controller class for consuming external messages from
other FreqTrade bots
"""
def __init__(
self,
rpc: RPC,
config: Dict[str, Any],
):
self._rpc = rpc
self._config = config
self._running = False
self._thread = None
self._loop = None
self._main_task = None
self._sub_tasks = None
self._emc_config = self._config.get('external_message_consumer', {})
self.enabled = self._emc_config.get('enabled', False)
self.producers = self._emc_config.get('producers', [])
if self.enabled and len(self.producers) < 1:
raise ValueError("You must specify at least 1 Producer to connect to.")
self.reply_timeout = self._emc_config.get('reply_timeout', 10)
self.ping_timeout = self._emc_config.get('ping_timeout', 2)
self.sleep_time = self._emc_config.get('sleep_time', 5)
# Setting these explicitly as they probably shouldn't be changed by a user,
# unless we somehow integrate this with the strategy to allow creating
# callbacks for the messages
self.topics = [RPCMessageType.WHITELIST, RPCMessageType.ANALYZED_DF]
self.start()
def start(self):
"""
Start the main internal loop in another thread to run coroutines
"""
self._loop = asyncio.new_event_loop()
if not self._thread:
logger.info("Starting ExternalMessageConsumer")
self._thread = Thread(target=self._loop.run_forever)
self._thread.start()
self._running = True
else:
raise RuntimeError("A loop is already running")
self._main_task = asyncio.run_coroutine_threadsafe(self._main(), loop=self._loop)
def shutdown(self):
"""
Shutdown the loop, thread, and tasks
"""
if self._thread and self._loop:
logger.info("Stopping ExternalMessageConsumer")
if self._sub_tasks:
# Cancel sub tasks
for task in self._sub_tasks:
task.cancel()
if self._main_task:
# Cancel the main task
self._main_task.cancel()
self._thread.join()
async def _main(self):
"""
The main task coroutine
"""
rpc_lock = asyncio.Lock()
try:
# Create a connection to each producer
self._sub_tasks = [
self._loop.create_task(self._handle_producer_connection(producer, rpc_lock))
for producer in self.producers
]
await asyncio.gather(*self._sub_tasks)
except asyncio.CancelledError:
pass
finally:
# Stop the loop once we are done
self._loop.stop()
async def _handle_producer_connection(self, producer, lock):
"""
Main connection loop for the consumer
"""
try:
while True:
try:
url, token = producer['url'], producer['ws_token']
ws_url = f"{url}?token={token}"
async with websockets.connect(ws_url) as ws:
logger.info("Connection successful")
channel = WebSocketChannel(ws)
# Tell the producer we only want these topics
# Should always be the first thing we send
await channel.send(
self.compose_consumer_request(RPCRequestType.SUBSCRIBE, self.topics)
)
# Now receive data, if none is within the time limit, ping
while True:
try:
message = await asyncio.wait_for(
channel.recv(),
timeout=5
)
async with lock:
# Handle the data here
# We use a lock because it will call RPC methods
self.handle_producer_message(message)
except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed):
# We haven't received data yet. Check the connection and continue.
try:
# ping
ping = await channel.ping()
await asyncio.wait_for(ping, timeout=self.ping_timeout)
logger.debug(f"Connection to {url} still alive...")
continue
except Exception:
logger.info(
f"Ping error {url} - retrying in {self.sleep_time}s")
await asyncio.sleep(self.sleep_time)
break
except (
socket.gaierror,
ConnectionRefusedError,
websockets.exceptions.InvalidStatusCode
) as e:
logger.error(f"Connection Refused - {e} retrying in {self.sleep_time}s")
await asyncio.sleep(self.sleep_time)
continue
except asyncio.CancelledError:
# Exit silently
pass
def compose_consumer_request(self, type_: str, data: Any) -> Dict[str, Any]:
"""
Create a request for sending to a producer
:param type_: The RPCRequestType
:param data: The data to send
:returns: Dict[str, Any]
"""
return {'type': type_, 'data': data}
# How we do things here isn't set in stone. There seems to be some interest
# in figuring out a better way, but we shall do this for now.
def handle_producer_message(self, message: Dict[str, Any]):
"""
Handles external messages from a Producer
"""
# Should we have a default message type?
message_type = message.get('type', RPCMessageType.STATUS)
message_data = message.get('data')
logger.debug(f"Received message of type {message_type}")
# Handle Whitelists
if message_type == RPCMessageType.WHITELIST:
pairlist = message_data
# Add the pairlist data to the ExternalPairlist plugin
external_pairlist = self._rpc._freqtrade.pairlists._pairlist_handlers[0]
external_pairlist.add_pairlist_data(pairlist)
# Handle analyzed dataframes
elif message_type == RPCMessageType.ANALYZED_DF:
# This shouldn't happen
if message_data is None:
return
key, value = message_data.get('key'), message_data.get('data')
pair, timeframe, candle_type = key
# Convert the JSON to a pandas DataFrame
dataframe = json_to_dataframe(value)
# If set, remove the Entry and Exit signals from the Producer
if self._emc_config.get('remove_entry_exit_signals', False):
dataframe = remove_entry_exit_signals(dataframe)
# Add the dataframe to the dataprovider
dataprovider = self._rpc._freqtrade.dataprovider
dataprovider.add_external_df(pair, timeframe, dataframe, candle_type)
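The analyzed_df payloads travel as JSON text: the producing strategy serialises each analysed dataframe with dataframe_to_json before handing it to send_msg, and handle_producer_message above turns it back with json_to_dataframe before attaching it to the dataprovider. A small round-trip sketch with a made-up dataframe (only the 'date' column convention is assumed):

import pandas as pd

from freqtrade.misc import dataframe_to_json, json_to_dataframe

df = pd.DataFrame({
    'date': pd.date_range('2022-08-30', periods=3, freq='5min', tz='UTC'),
    'open': [1.0, 1.1, 1.2],
    'close': [1.1, 1.2, 1.3],
})

payload = dataframe_to_json(df)        # JSON string, safe to embed in a websocket message
restored = json_to_dataframe(payload)  # back to a DataFrame, 'date' re-parsed by the helper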

View File

@@ -1,5 +0,0 @@
# # flake8: noqa: F401
# from freqtrade.rpc.external_signal.controller import ExternalSignalController
#
#
# __all__ = ('ExternalSignalController')

View File

@@ -1,145 +0,0 @@
# import logging
# from threading import RLock
# from typing import Type
#
# from freqtrade.rpc.external_signal.proxy import WebSocketProxy
# from freqtrade.rpc.external_signal.serializer import MsgPackWebSocketSerializer
# from freqtrade.rpc.external_signal.types import WebSocketType
#
#
# logger = logging.getLogger(__name__)
#
#
# class WebSocketChannel:
# """
# Object to help facilitate managing a websocket connection
# """
#
# def __init__(
# self,
# websocket: WebSocketType,
# serializer_cls: Type[WebSocketSerializer] = MsgPackWebSocketSerializer
# ):
# # The WebSocket object
# self._websocket = WebSocketProxy(websocket)
# # The Serializing class for the WebSocket object
# self._serializer_cls = serializer_cls
#
# # Internal event to signify a closed websocket
# self._closed = False
#
# # Wrap the WebSocket in the Serializing class
# self._wrapped_ws = self._serializer_cls(self._websocket)
#
# async def send(self, data):
# """
# Send data on the wrapped websocket
# """
# # logger.info(f"Serialized Send - {self._wrapped_ws._serialize(data)}")
# await self._wrapped_ws.send(data)
#
# async def recv(self):
# """
# Receive data on the wrapped websocket
# """
# return await self._wrapped_ws.recv()
#
# async def ping(self):
# """
# Ping the websocket
# """
# return await self._websocket.ping()
#
# async def close(self):
# """
# Close the WebSocketChannel
# """
#
# self._closed = True
#
# def is_closed(self):
# return self._closed
#
#
# class ChannelManager:
# def __init__(self):
# self.channels = dict()
# self._lock = RLock() # Re-entrant Lock
#
# async def on_connect(self, websocket: WebSocketType):
# """
# Wrap websocket connection into Channel and add to list
#
# :param websocket: The WebSocket object to attach to the Channel
# """
# if hasattr(websocket, "accept"):
# try:
# await websocket.accept()
# except RuntimeError:
# # The connection was closed before we could accept it
# return
#
# ws_channel = WebSocketChannel(websocket)
#
# with self._lock:
# self.channels[websocket] = ws_channel
#
# return ws_channel
#
# async def on_disconnect(self, websocket: WebSocketType):
# """
# Call close on the channel if it's not, and remove from channel list
#
# :param websocket: The WebSocket object attached to the Channel
# """
# with self._lock:
# channel = self.channels.get(websocket)
# if channel:
# logger.debug(f"Disconnecting channel - {channel}")
#
# if not channel.is_closed():
# await channel.close()
#
# del self.channels[websocket]
#
# async def disconnect_all(self):
# """
# Disconnect all Channels
# """
# with self._lock:
# for websocket, channel in self.channels.items():
# if not channel.is_closed():
# await channel.close()
#
# self.channels = dict()
#
# async def broadcast(self, data):
# """
# Broadcast data on all Channels
#
# :param data: The data to send
# """
# with self._lock:
# for websocket, channel in self.channels.items():
# try:
# await channel.send(data)
# except RuntimeError:
# # Handle cannot send after close cases
# await self.on_disconnect(websocket)
#
# async def send_direct(self, channel, data):
# """
# Send data directly through direct_channel only
#
# :param direct_channel: The WebSocketChannel object to send data through
# :param data: The data to send
# """
# # We iterate over the channels to get reference to the websocket object
# so we can disconnect in case of failure
# await channel.send(data)
#
# def has_channels(self):
# """
# Flag for more than 0 channels
# """
# return len(self.channels) > 0

View File

@@ -1,449 +0,0 @@
# """
# This module manages replicate mode communication
# """
# import asyncio
# import logging
# import secrets
# import socket
# from threading import Thread
# from typing import Any, Callable, Coroutine, Dict, Union
#
# import websockets
# from fastapi import Depends
# from fastapi import WebSocket as FastAPIWebSocket
# from fastapi import WebSocketDisconnect, status
# from janus import Queue as ThreadedQueue
#
# from freqtrade.enums import ExternalSignalModeType, LeaderMessageType, RPCMessageType
# from freqtrade.rpc import RPC, RPCHandler
# from freqtrade.rpc.external_signal.channel import ChannelManager
# from freqtrade.rpc.external_signal.types import MessageType
# from freqtrade.rpc.external_signal.utils import is_websocket_alive
#
#
# logger = logging.getLogger(__name__)
#
#
# class ExternalSignalController(RPCHandler):
# """ This class handles all websocket communication """
#
# def __init__(
# self,
# rpc: RPC,
# config: Dict[str, Any],
# api_server: Union[Any, None] = None
# ) -> None:
# """
# Init the ExternalSignalController class, and init the super class RPCHandler
# :param rpc: instance of RPC Helper class
# :param config: Configuration object
# :param api_server: The ApiServer object
# :return: None
# """
# super().__init__(rpc, config)
#
# self.freqtrade = rpc._freqtrade
# self.api_server = api_server
#
# if not self.api_server:
# raise RuntimeError("The API server must be enabled for external signals to work")
#
# self._loop = None
# self._running = False
# self._thread = None
# self._queue = None
#
# self._main_task = None
# self._sub_tasks = None
#
# self._message_handlers = {
# LeaderMessageType.pairlist: self._rpc._handle_pairlist_message,
# LeaderMessageType.analyzed_df: self._rpc._handle_analyzed_df_message,
# LeaderMessageType.default: self._rpc._handle_default_message
# }
#
# self.channel_manager = ChannelManager()
# self.external_signal_config = config.get('external_signal', {})
#
# # What the config should look like
# # "external_signal": {
# # "enabled": true,
# # "mode": "follower",
# # "leaders": [
# # {
# # "url": "ws://localhost:8080/signals/ws",
# # "api_token": "test"
# # }
# # ]
# # }
#
# # "external_signal": {
# # "enabled": true,
# # "mode": "leader",
# # "api_token": "test"
# # }
#
# self.mode = ExternalSignalModeType[
# self.external_signal_config.get('mode', 'leader').lower()
# ]
#
# self.leaders_list = self.external_signal_config.get('leaders', [])
# self.push_throttle_secs = self.external_signal_config.get('push_throttle_secs', 0.1)
#
# self.reply_timeout = self.external_signal_config.get('follower_reply_timeout', 10)
# self.ping_timeout = self.external_signal_config.get('follower_ping_timeout', 2)
# self.sleep_time = self.external_signal_config.get('follower_sleep_time', 5)
#
# # Validate external_signal_config here?
#
# if self.mode == ExternalSignalModeType.follower and len(self.leaders_list) == 0:
# raise ValueError("You must specify at least 1 leader in follower mode.")
#
# # This is only used by the leader, the followers use the tokens specified
# # in each of the leaders
# # If you do not specify an API key in the config, one will be randomly
# # generated and logged on startup
# default_api_key = secrets.token_urlsafe(16)
# self.secret_api_key = self.external_signal_config.get('api_token', default_api_key)
#
# self.start()
#
# def is_leader(self):
# """
# Leader flag
# """
# return self.enabled() and self.mode == ExternalSignalModeType.leader
#
# def enabled(self):
# """
# Enabled flag
# """
# return self.external_signal_config.get('enabled', False)
#
# def num_leaders(self):
# """
# The number of leaders we should be connected to
# """
# return len(self.leaders_list)
#
# def start_threaded_loop(self):
# """
# Start the main internal loop in another thread to run coroutines
# """
# self._loop = asyncio.new_event_loop()
#
# if not self._thread:
# self._thread = Thread(target=self._loop.run_forever)
# self._thread.start()
# self._running = True
# else:
# raise RuntimeError("A loop is already running")
#
# def submit_coroutine(self, coroutine: Coroutine):
# """
# Submit a coroutine to the threaded loop
# """
# if not self._running:
# raise RuntimeError("Cannot schedule new futures after shutdown")
#
# if not self._loop or not self._loop.is_running():
# raise RuntimeError("Loop must be started before any function can"
# " be submitted")
#
# return asyncio.run_coroutine_threadsafe(coroutine, self._loop)
#
# def start(self):
# """
# Start the controller main loop
# """
# self.start_threaded_loop()
# self._main_task = self.submit_coroutine(self.main())
#
# async def shutdown(self):
# """
# Shutdown all tasks and close up
# """
# logger.info("Stopping rpc.externalsignalcontroller")
#
# # Flip running flag
# self._running = False
#
# # Cancel sub tasks
# for task in self._sub_tasks:
# task.cancel()
#
# # Then disconnect all channels
# await self.channel_manager.disconnect_all()
#
# def cleanup(self) -> None:
# """
# Cleanup pending module resources.
# """
# if self._thread:
# if self._loop.is_running():
# self._main_task.cancel()
# self._thread.join()
#
# async def main(self):
# """
# Main coro
#
# Start the loop based on what mode we're in
# """
# try:
# if self.mode == ExternalSignalModeType.leader:
# logger.info("Starting rpc.externalsignalcontroller in Leader mode")
#
# await self.run_leader_mode()
# elif self.mode == ExternalSignalModeType.follower:
# logger.info("Starting rpc.externalsignalcontroller in Follower mode")
#
# await self.run_follower_mode()
#
# except asyncio.CancelledError:
# # We're cancelled
# await self.shutdown()
# except Exception as e:
# # Log the error
# logger.error(f"Exception occurred in main task: {e}")
# logger.exception(e)
# finally:
# # This coroutine is the last thing to be ended, so it should stop the loop
# self._loop.stop()
#
# def log_api_token(self):
# """
# Log the API token
# """
# logger.info("-" * 15)
# logger.info(f"API_KEY: {self.secret_api_key}")
# logger.info("-" * 15)
#
# def send_msg(self, msg: MessageType) -> None:
# """
# Support RPC calls
# """
# if msg["type"] == RPCMessageType.EMIT_DATA:
# message = msg.get("message")
# if message:
# self.send_message(message)
# else:
# logger.error(f"Message is empty! {msg}")
#
# def send_message(self, msg: MessageType) -> None:
# """
# Broadcast message over all channels if there are any
# """
#
# if self.channel_manager.has_channels():
# self._send_message(msg)
# else:
# logger.debug("No listening followers, skipping...")
# pass
#
# def _send_message(self, msg: MessageType):
# """
# Add data to the internal queue to be broadcasted. This func will block
# if the queue is full. This is meant to be called in the main thread.
# """
# if self._queue:
# queue = self._queue.sync_q
# queue.put(msg) # This will block if the queue is full
# else:
# logger.warning("Can not send data, leader loop has not started yet!")
#
# async def send_initial_data(self, channel):
# logger.info("Sending initial data through channel")
#
# data = self._rpc._initial_leader_data()
#
# for message in data:
# await channel.send(message)
#
# async def _handle_leader_message(self, message: MessageType):
# """
# Handle message received from a Leader
# """
# type = message.get("data_type", LeaderMessageType.default)
# data = message.get("data")
#
# handler: Callable = self._message_handlers[type]
# handler(type, data)
#
# # ----------------------------------------------------------------------
#
# async def run_leader_mode(self):
# """
# Main leader coroutine
#
# This starts all of the leader coros and registers the endpoint on
# the ApiServer
# """
# self.register_leader_endpoint()
# self.log_api_token()
#
# self._sub_tasks = [
# self._loop.create_task(self._broadcast_queue_data())
# ]
#
# return await asyncio.gather(*self._sub_tasks)
#
# async def run_follower_mode(self):
# """
# Main follower coroutine
#
# This starts all of the follower connection coros
# """
#
# rpc_lock = asyncio.Lock()
#
# self._sub_tasks = [
# self._loop.create_task(self._handle_leader_connection(leader, rpc_lock))
# for leader in self.leaders_list
# ]
#
# return await asyncio.gather(*self._sub_tasks)
#
# async def _broadcast_queue_data(self):
# """
# Loop over queue data and broadcast it
# """
# # Instantiate the queue in this coroutine so it's attached to our loop
# self._queue = ThreadedQueue()
# async_queue = self._queue.async_q
#
# try:
# while self._running:
# # Get data from queue
# data = await async_queue.get()
#
# # Broadcast it to everyone
# await self.channel_manager.broadcast(data)
#
# # Sleep
# await asyncio.sleep(self.push_throttle_secs)
#
# except asyncio.CancelledError:
# # Silently stop
# pass
#
# async def get_api_token(
# self,
# websocket: FastAPIWebSocket,
# token: Union[str, None] = None
# ):
# """
# Extract the API key from query param. Must match the
# set secret_api_key or the websocket connection will be closed.
# """
# if token == self.secret_api_key:
# return token
# else:
# logger.info("Denying websocket request...")
# await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
#
# def register_leader_endpoint(self, path: str = "/signals/ws"):
# """
# Attach and start the main leader loop to the ApiServer
#
# :param path: The endpoint path
# """
# if not self.api_server:
# raise RuntimeError("The leader needs the ApiServer to be active")
#
# # The endpoint function for running the main leader loop
# @self.api_server.app.websocket(path)
# async def leader_endpoint(
# websocket: FastAPIWebSocket,
# api_key: str = Depends(self.get_api_token)
# ):
# await self.leader_endpoint_loop(websocket)
#
# async def leader_endpoint_loop(self, websocket: FastAPIWebSocket):
# """
# The WebSocket endpoint served by the ApiServer. This handles connections,
# and adding them to the channel manager.
# """
# try:
# if is_websocket_alive(websocket):
# logger.info(f"Follower connected - {websocket.client}")
# channel = await self.channel_manager.on_connect(websocket)
#
# # Send initial data here
# Data is being broadcast right away as soon as the bot starts up,
# so we may not have to send initial data at all. Further testing
# # required.
# await self.send_initial_data(channel)
#
# # Keep connection open until explicitly closed, and sleep
# try:
# while not channel.is_closed():
# request = await channel.recv()
# logger.info(f"Follower request - {request}")
#
# except WebSocketDisconnect:
# # Handle client disconnects
# logger.info(f"Follower disconnected - {websocket.client}")
# await self.channel_manager.on_disconnect(websocket)
# except Exception as e:
# logger.info(f"Follower connection failed - {websocket.client}")
# logger.exception(e)
# # Handle cases like -
# # RuntimeError('Cannot call "send" once a closed message has been sent')
# await self.channel_manager.on_disconnect(websocket)
#
# except Exception:
# logger.error(f"Failed to serve - {websocket.client}")
# await self.channel_manager.on_disconnect(websocket)
#
# async def _handle_leader_connection(self, leader, lock):
# """
# Given a leader, connect and wait on data. If connection is lost,
# it will attempt to reconnect.
# """
# try:
# url, token = leader["url"], leader["api_token"]
# websocket_url = f"{url}?token={token}"
#
# logger.info(f"Attempting to connect to Leader at: {url}")
# while True:
# try:
# async with websockets.connect(websocket_url) as ws:
# channel = await self.channel_manager.on_connect(ws)
# logger.info(f"Connection to Leader at {url} successful")
# while True:
# try:
# data = await asyncio.wait_for(
# channel.recv(),
# timeout=self.reply_timeout
# )
# except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed):
# # We haven't received data yet. Check the connection and continue.
# try:
# # ping
# ping = await channel.ping()
# await asyncio.wait_for(ping, timeout=self.ping_timeout)
# logger.debug(f"Connection to {url} still alive...")
# continue
# except Exception:
# logger.info(
# f"Ping error {url} - retrying in {self.sleep_time}s")
# asyncio.sleep(self.sleep_time)
# break
#
# async with lock:
# # Acquire lock so only 1 coro handling at a time
# # as we call the RPC module in the main thread
# await self._handle_leader_message(data)
#
# except (socket.gaierror, ConnectionRefusedError):
# logger.info(f"Connection Refused - retrying connection in {self.sleep_time}s")
# await asyncio.sleep(self.sleep_time)
# continue
# except websockets.exceptions.InvalidStatusCode as e:
# logger.error(f"Connection Refused - {e}")
# await asyncio.sleep(self.sleep_time)
# continue
#
# except asyncio.CancelledError:
# pass

View File

@@ -1,61 +0,0 @@
# from typing import Union
#
# from fastapi import WebSocket as FastAPIWebSocket
# from websockets import WebSocketClientProtocol as WebSocket
#
# from freqtrade.rpc.external_signal.types import WebSocketType
#
#
# class WebSocketProxy:
# """
# WebSocketProxy object to bring the FastAPIWebSocket and websockets.WebSocketClientProtocol
# under the same API
# """
#
# def __init__(self, websocket: WebSocketType):
# self._websocket: Union[FastAPIWebSocket, WebSocket] = websocket
#
# async def send(self, data):
# """
# Send data on the wrapped websocket
# """
# if isinstance(data, str):
# data = data.encode()
#
# if hasattr(self._websocket, "send_bytes"):
# await self._websocket.send_bytes(data)
# else:
# await self._websocket.send(data)
#
# async def recv(self):
# """
# Receive data on the wrapped websocket
# """
# if hasattr(self._websocket, "receive_bytes"):
# return await self._websocket.receive_bytes()
# else:
# return await self._websocket.recv()
#
# async def ping(self):
# """
# Ping the websocket, not supported by FastAPI WebSockets
# """
# if hasattr(self._websocket, "ping"):
# return await self._websocket.ping()
# return False
#
# async def close(self, code: int = 1000):
# """
# Close the websocket connection, only supported by FastAPI WebSockets
# """
# if hasattr(self._websocket, "close"):
# return await self._websocket.close(code)
# pass
#
# async def accept(self):
# """
# Accept the WebSocket connection, only supported by FastAPI WebSockets
# """
# if hasattr(self._websocket, "accept"):
# return await self._websocket.accept()
# pass

View File

@@ -1,65 +0,0 @@
# import json
# import logging
# from abc import ABC, abstractmethod
#
# import msgpack
# import orjson
#
# from freqtrade.rpc.external_signal.proxy import WebSocketProxy
#
#
# logger = logging.getLogger(__name__)
#
#
# class WebSocketSerializer(ABC):
# def __init__(self, websocket: WebSocketProxy):
# self._websocket: WebSocketProxy = websocket
#
# @abstractmethod
# def _serialize(self, data):
# raise NotImplementedError()
#
# @abstractmethod
# def _deserialize(self, data):
# raise NotImplementedError()
#
# async def send(self, data: bytes):
# await self._websocket.send(self._serialize(data))
#
# async def recv(self) -> bytes:
# data = await self._websocket.recv()
#
# return self._deserialize(data)
#
# async def close(self, code: int = 1000):
# await self._websocket.close(code)
#
# # Going to explore using MsgPack as the serialization,
# # as that might be the best method for sending pandas
# # dataframes over the wire
#
#
# class JSONWebSocketSerializer(WebSocketSerializer):
# def _serialize(self, data):
# return json.dumps(data)
#
# def _deserialize(self, data):
# return json.loads(data)
#
#
# class ORJSONWebSocketSerializer(WebSocketSerializer):
# ORJSON_OPTIONS = orjson.OPT_NAIVE_UTC | orjson.OPT_SERIALIZE_NUMPY
#
# def _serialize(self, data):
# return orjson.dumps(data, option=self.ORJSON_OPTIONS)
#
# def _deserialize(self, data):
# return orjson.loads(data, option=self.ORJSON_OPTIONS)
#
#
# class MsgPackWebSocketSerializer(WebSocketSerializer):
# def _serialize(self, data):
# return msgpack.packb(data, use_bin_type=True)
#
# def _deserialize(self, data):
# return msgpack.unpackb(data, raw=False)

View File

@@ -1,8 +0,0 @@
# from typing import Any, Dict, TypeVar
#
# from fastapi import WebSocket as FastAPIWebSocket
# from websockets import WebSocketClientProtocol as WebSocket
#
#
# WebSocketType = TypeVar("WebSocketType", FastAPIWebSocket, WebSocket)
# MessageType = Dict[str, Any]

View File

@@ -1,10 +0,0 @@
# from starlette.websockets import WebSocket, WebSocketState
#
#
# async def is_websocket_alive(ws: WebSocket) -> bool:
# if (
# ws.application_state == WebSocketState.CONNECTED and
# ws.client_state == WebSocketState.CONNECTED
# ):
# return True
# return False

View File

@@ -1099,65 +1099,3 @@ class RPC:
if all([any(x.value == topic for x in RPCMessageType) for topic in data]):
logger.debug(f"{channel} subscribed to topics: {data}")
channel.set_subscriptions(data)
#
# # ------------------------------ EXTERNAL SIGNALS -----------------------
#
# def _initial_leader_data(self):
# # We create a list of Messages to send to the follower on connect
# data = []
#
# # Send Pairlist data
# data.append({
# "data_type": LeaderMessageType.pairlist,
# "data": self._freqtrade.pairlists._whitelist
# })
#
# return data
#
# def _handle_pairlist_message(self, type, data):
# """
# Handles the emitted pairlists from the Leaders
#
# :param type: The data_type of the data
# :param data: The data
# """
# pairlist = data
#
# logger.debug(f"Handling Pairlist message: {pairlist}")
#
# external_pairlist = self._freqtrade.pairlists._pairlist_handlers[0]
# external_pairlist.add_pairlist_data(pairlist)
#
# def _handle_analyzed_df_message(self, type, data):
# """
# Handles the analyzed dataframes from the Leaders
#
# :param type: The data_type of the data
# :param data: The data
# """
# key, value = data["key"], data["value"]
# pair, timeframe, candle_type = key
#
# # Skip any pairs that we don't have in the pairlist?
# # leader_pairlist = self._freqtrade.pairlists._whitelist
# # if pair not in leader_pairlist:
# # return
#
# dataframe = json_to_dataframe(value)
#
# if self._config.get('external_signal', {}).get('remove_signals_analyzed_df', False):
# dataframe = remove_entry_exit_signals(dataframe)
#
# logger.debug(f"Handling analyzed dataframe for {pair}")
# logger.debug(dataframe.tail())
#
# # Add the dataframe to the dataprovider
# dataprovider = self._freqtrade.dataprovider
# dataprovider.add_external_df(pair, timeframe, dataframe, candle_type)
#
# def _handle_default_message(self, type, data):
# """
# Default leader message handler, just logs it. We should never have to
# run this unless the leader sends us some weird message.
# """
# logger.debug(f"Received message from Leader of type {type}: {data}")

View File

@@ -78,7 +78,9 @@ class RPCManager:
'status': 'stopping bot'
}
"""
logger.info('Sending rpc message: %s', msg)
# Don't log the whole message here, or the logs would be completely
# spammed with the json dataframe
logger.info('Sending rpc message of type: %s', msg.get('type'))
if 'pair' in msg:
msg.update({
'base_currency': self._rpc._freqtrade.exchange.get_pair_base_currency(msg['pair'])

View File

@@ -5,19 +5,21 @@ This module defines the interface to apply for strategies
import logging
from abc import ABC, abstractmethod
from datetime import datetime, timedelta, timezone
from typing import Callable, Dict, List, Optional, Tuple, Union
from typing import Dict, List, Optional, Tuple, Union
import arrow
from pandas import DataFrame
from freqtrade.constants import ListPairsWithTimeframes
from freqtrade.data.dataprovider import DataProvider
from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, SignalDirection, SignalTagType,
SignalType, TradingMode)
from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, RPCMessageType, SignalDirection,
SignalTagType, SignalType, TradingMode)
from freqtrade.enums.runmode import RunMode
from freqtrade.exceptions import OperationalException, StrategyError
from freqtrade.exchange import timeframe_to_minutes, timeframe_to_next_date, timeframe_to_seconds
from freqtrade.misc import dataframe_to_json, remove_entry_exit_signals
from freqtrade.persistence import Order, PairLocks, Trade
from freqtrade.rpc import RPCManager
from freqtrade.strategy.hyper import HyperStrategyMixin
from freqtrade.strategy.informative_decorator import (InformativeData, PopulateIndicators,
_create_and_merge_informative_pair,
@@ -111,6 +113,7 @@ class IStrategy(ABC, HyperStrategyMixin):
# and wallets - access to the current balance.
dp: DataProvider
wallets: Optional[Wallets] = None
rpc: RPCManager
# Filled from configuration
stake_currency: str
# container variable for strategy source code
@@ -702,8 +705,7 @@ class IStrategy(ABC, HyperStrategyMixin):
self,
dataframe: DataFrame,
metadata: dict,
external_data: bool = False,
finish_callback: Optional[Callable] = None,
external_data: bool = False
) -> DataFrame:
"""
Parses the given candle (OHLCV) data and returns a populated DataFrame
@@ -729,17 +731,20 @@ class IStrategy(ABC, HyperStrategyMixin):
candle_type = self.config.get('candle_type_def', CandleType.SPOT)
self.dp._set_cached_df(pair, self.timeframe, dataframe, candle_type=candle_type)
if finish_callback:
finish_callback(pair, dataframe, self.timeframe, candle_type)
if not external_data:
self.rpc.send_msg(
{
'type': RPCMessageType.ANALYZED_DF,
'data': {
'key': (pair, self.timeframe, candle_type),
'value': dataframe_to_json(dataframe)
}
}
)
else:
logger.debug("Skipping TA Analysis for already analyzed candle")
dataframe[SignalType.ENTER_LONG.value] = 0
dataframe[SignalType.EXIT_LONG.value] = 0
dataframe[SignalType.ENTER_SHORT.value] = 0
dataframe[SignalType.EXIT_SHORT.value] = 0
dataframe[SignalTagType.ENTER_TAG.value] = None
dataframe[SignalTagType.EXIT_TAG.value] = None
dataframe = remove_entry_exit_signals(dataframe)
logger.debug("Loop Analysis Launched")
@@ -748,8 +753,7 @@ class IStrategy(ABC, HyperStrategyMixin):
def analyze_pair(
self,
pair: str,
external_data: bool = False,
finish_callback: Optional[Callable] = None,
external_data: bool = False
) -> None:
"""
Fetch data for this pair from dataprovider and analyze.
@@ -773,7 +777,7 @@ class IStrategy(ABC, HyperStrategyMixin):
dataframe = strategy_safe_wrapper(
self._analyze_ticker_internal, message=""
)(dataframe, {'pair': pair}, external_data, finish_callback)
)(dataframe, {'pair': pair}, external_data)
self.assert_df(dataframe, df_len, df_close, df_date)
except StrategyError as error:
@@ -786,15 +790,14 @@ class IStrategy(ABC, HyperStrategyMixin):
def analyze(
self,
pairs: List[str],
finish_callback: Optional[Callable] = None
pairs: List[str]
) -> None:
"""
Analyze all pairs using analyze_pair().
:param pairs: List of pairs to analyze
"""
for pair in pairs:
self.analyze_pair(pair, finish_callback=finish_callback)
self.analyze_pair(pair)
def analyze_external(self, pairs: List[str], leader_pairs: List[str]) -> None:
"""
@@ -808,10 +811,10 @@ class IStrategy(ABC, HyperStrategyMixin):
# them normally.
# List order is not preserved when doing this!
# We use ^ instead of - for symmetric difference
# What do we do with these?
extra_pairs = list(set(pairs) ^ set(leader_pairs))
# These would be the pairs that we have trades in, which means
# we would have to analyze them normally
# Eventually maybe request data from the Leader if we don't have it?
for pair in leader_pairs:
# Analyze the pairs, but get the dataframe from the external data

View File

@@ -1,74 +0,0 @@
import asyncio
import logging
import socket
from typing import Any
import websockets
from freqtrade.enums import RPCMessageType
from freqtrade.rpc.api_server.ws.channel import WebSocketChannel
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
def compose_consumer_request(type_: str, data: Any):
return {"type": type_, "data": data}
async def _client():
# Trying to recreate the multiple-topic issue. Wait until the first whitelist message,
# then CTRL-C to get the status message.
topics = [RPCMessageType.WHITELIST, RPCMessageType.STATUS]
try:
while True:
try:
url = "ws://localhost:8080/api/v1/message/ws?token=testtoken"
async with websockets.connect(url) as ws:
channel = WebSocketChannel(ws)
logger.info("Connection successful")
# Tell the producer we only want these topics
await channel.send(compose_consumer_request("subscribe", topics))
while True:
try:
data = await asyncio.wait_for(
channel.recv(),
timeout=5
)
logger.info(f"Data received - {data}")
except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed):
# We haven't received data yet. Check the connection and continue.
try:
# ping
ping = await channel.ping()
await asyncio.wait_for(ping, timeout=2)
logger.debug(f"Connection to {url} still alive...")
continue
except Exception:
logger.info(
f"Ping error {url} - retrying in 5s")
await asyncio.sleep(2)
break
except (socket.gaierror, ConnectionRefusedError):
logger.info("Connection Refused - retrying connection in 5s")
await asyncio.sleep(2)
continue
except websockets.exceptions.InvalidStatusCode as e:
logger.error(f"Connection Refused - {e}")
await asyncio.sleep(2)
continue
except (asyncio.CancelledError, KeyboardInterrupt):
pass
def main():
asyncio.run(_client())
if __name__ == "__main__":
main()