Merge pull request #7303 from wizrds/feat/externalsignals
Producer/consumer mode
This commit is contained in:
@@ -211,6 +211,7 @@ def ask_user_config() -> Dict[str, Any]:
|
||||
)
|
||||
# Force JWT token to be a random string
|
||||
answers['api_server_jwt_key'] = secrets.token_hex()
|
||||
answers['api_server_ws_token'] = secrets.token_urlsafe(25)
|
||||
|
||||
return answers
|
||||
|
||||
|
@@ -243,6 +243,7 @@ CONF_SCHEMA = {
|
||||
'exchange': {'$ref': '#/definitions/exchange'},
|
||||
'edge': {'$ref': '#/definitions/edge'},
|
||||
'freqai': {'$ref': '#/definitions/freqai'},
|
||||
'external_message_consumer': {'$ref': '#/definitions/external_message_consumer'},
|
||||
'experimental': {
|
||||
'type': 'object',
|
||||
'properties': {
|
||||
@@ -404,6 +405,7 @@ CONF_SCHEMA = {
|
||||
},
|
||||
'username': {'type': 'string'},
|
||||
'password': {'type': 'string'},
|
||||
'ws_token': {'type': ['string', 'array'], 'items': {'type': 'string'}},
|
||||
'jwt_secret_key': {'type': 'string'},
|
||||
'CORS_origins': {'type': 'array', 'items': {'type': 'string'}},
|
||||
'verbosity': {'type': 'string', 'enum': ['error', 'info']},
|
||||
@@ -488,6 +490,47 @@ CONF_SCHEMA = {
|
||||
},
|
||||
'required': ['process_throttle_secs', 'allowed_risk']
|
||||
},
|
||||
'external_message_consumer': {
|
||||
'type': 'object',
|
||||
'properties': {
|
||||
'enabled': {'type': 'boolean', 'default': False},
|
||||
'producers': {
|
||||
'type': 'array',
|
||||
'items': {
|
||||
'type': 'object',
|
||||
'properties': {
|
||||
'name': {'type': 'string'},
|
||||
'host': {'type': 'string'},
|
||||
'port': {
|
||||
'type': 'integer',
|
||||
'default': 8080,
|
||||
'minimum': 0,
|
||||
'maximum': 65535
|
||||
},
|
||||
'ws_token': {'type': 'string'},
|
||||
},
|
||||
'required': ['name', 'host', 'ws_token']
|
||||
}
|
||||
},
|
||||
'wait_timeout': {'type': 'integer', 'minimum': 0},
|
||||
'sleep_time': {'type': 'integer', 'minimum': 0},
|
||||
'ping_timeout': {'type': 'integer', 'minimum': 0},
|
||||
'remove_entry_exit_signals': {'type': 'boolean', 'default': False},
|
||||
'initial_candle_limit': {
|
||||
'type': 'integer',
|
||||
'minimum': 0,
|
||||
'maximum': 1500,
|
||||
'default': 1500
|
||||
},
|
||||
'message_size_limit': { # In megabytes
|
||||
'type': 'integer',
|
||||
'minimum': 1,
|
||||
'maxmium': 20,
|
||||
'default': 8,
|
||||
}
|
||||
},
|
||||
'required': ['producers']
|
||||
},
|
||||
"freqai": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
|
@@ -14,9 +14,10 @@ from pandas import DataFrame
|
||||
from freqtrade.configuration import TimeRange
|
||||
from freqtrade.constants import Config, ListPairsWithTimeframes, PairWithTimeframe
|
||||
from freqtrade.data.history import load_pair_history
|
||||
from freqtrade.enums import CandleType, RunMode
|
||||
from freqtrade.enums import CandleType, RPCMessageType, RunMode
|
||||
from freqtrade.exceptions import ExchangeError, OperationalException
|
||||
from freqtrade.exchange import Exchange, timeframe_to_seconds
|
||||
from freqtrade.rpc import RPCManager
|
||||
from freqtrade.util import PeriodicCache
|
||||
|
||||
|
||||
@@ -28,17 +29,33 @@ MAX_DATAFRAME_CANDLES = 1000
|
||||
|
||||
class DataProvider:
|
||||
|
||||
def __init__(self, config: Config, exchange: Optional[Exchange], pairlists=None) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
config: Config,
|
||||
exchange: Optional[Exchange],
|
||||
pairlists=None,
|
||||
rpc: Optional[RPCManager] = None
|
||||
) -> None:
|
||||
self._config = config
|
||||
self._exchange = exchange
|
||||
self._pairlists = pairlists
|
||||
self.__rpc = rpc
|
||||
self.__cached_pairs: Dict[PairWithTimeframe, Tuple[DataFrame, datetime]] = {}
|
||||
self.__slice_index: Optional[int] = None
|
||||
self.__cached_pairs_backtesting: Dict[PairWithTimeframe, DataFrame] = {}
|
||||
self.__producer_pairs_df: Dict[str,
|
||||
Dict[PairWithTimeframe, Tuple[DataFrame, datetime]]] = {}
|
||||
self.__producer_pairs: Dict[str, List[str]] = {}
|
||||
self._msg_queue: deque = deque()
|
||||
|
||||
self._default_candle_type = self._config.get('candle_type_def', CandleType.SPOT)
|
||||
self._default_timeframe = self._config.get('timeframe', '1h')
|
||||
|
||||
self.__msg_cache = PeriodicCache(
|
||||
maxsize=1000, ttl=timeframe_to_seconds(self._config.get('timeframe', '1h')))
|
||||
maxsize=1000, ttl=timeframe_to_seconds(self._default_timeframe))
|
||||
|
||||
self.producers = self._config.get('external_message_consumer', {}).get('producers', [])
|
||||
self.external_data_enabled = len(self.producers) > 0
|
||||
|
||||
def _set_dataframe_max_index(self, limit_index: int):
|
||||
"""
|
||||
@@ -63,9 +80,110 @@ class DataProvider:
|
||||
:param dataframe: analyzed dataframe
|
||||
:param candle_type: Any of the enum CandleType (must match trading mode!)
|
||||
"""
|
||||
self.__cached_pairs[(pair, timeframe, candle_type)] = (
|
||||
pair_key = (pair, timeframe, candle_type)
|
||||
self.__cached_pairs[pair_key] = (
|
||||
dataframe, datetime.now(timezone.utc))
|
||||
|
||||
# For multiple producers we will want to merge the pairlists instead of overwriting
|
||||
def _set_producer_pairs(self, pairlist: List[str], producer_name: str = "default"):
|
||||
"""
|
||||
Set the pairs received to later be used.
|
||||
|
||||
:param pairlist: List of pairs
|
||||
"""
|
||||
self.__producer_pairs[producer_name] = pairlist
|
||||
|
||||
def get_producer_pairs(self, producer_name: str = "default") -> List[str]:
|
||||
"""
|
||||
Get the pairs cached from the producer
|
||||
|
||||
:returns: List of pairs
|
||||
"""
|
||||
return self.__producer_pairs.get(producer_name, []).copy()
|
||||
|
||||
def _emit_df(
|
||||
self,
|
||||
pair_key: PairWithTimeframe,
|
||||
dataframe: DataFrame
|
||||
) -> None:
|
||||
"""
|
||||
Send this dataframe as an ANALYZED_DF message to RPC
|
||||
|
||||
:param pair_key: PairWithTimeframe tuple
|
||||
:param data: Tuple containing the DataFrame and the datetime it was cached
|
||||
"""
|
||||
if self.__rpc:
|
||||
self.__rpc.send_msg(
|
||||
{
|
||||
'type': RPCMessageType.ANALYZED_DF,
|
||||
'data': {
|
||||
'key': pair_key,
|
||||
'df': dataframe,
|
||||
'la': datetime.now(timezone.utc)
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
def _add_external_df(
|
||||
self,
|
||||
pair: str,
|
||||
dataframe: DataFrame,
|
||||
last_analyzed: datetime,
|
||||
timeframe: str,
|
||||
candle_type: CandleType,
|
||||
producer_name: str = "default"
|
||||
) -> None:
|
||||
"""
|
||||
Add the pair data to this class from an external source.
|
||||
|
||||
:param pair: pair to get the data for
|
||||
:param timeframe: Timeframe to get data for
|
||||
:param candle_type: Any of the enum CandleType (must match trading mode!)
|
||||
"""
|
||||
pair_key = (pair, timeframe, candle_type)
|
||||
|
||||
if producer_name not in self.__producer_pairs_df:
|
||||
self.__producer_pairs_df[producer_name] = {}
|
||||
|
||||
_last_analyzed = datetime.now(timezone.utc) if not last_analyzed else last_analyzed
|
||||
|
||||
self.__producer_pairs_df[producer_name][pair_key] = (dataframe, _last_analyzed)
|
||||
logger.debug(f"External DataFrame for {pair_key} from {producer_name} added.")
|
||||
|
||||
def get_producer_df(
|
||||
self,
|
||||
pair: str,
|
||||
timeframe: Optional[str] = None,
|
||||
candle_type: Optional[CandleType] = None,
|
||||
producer_name: str = "default"
|
||||
) -> Tuple[DataFrame, datetime]:
|
||||
"""
|
||||
Get the pair data from producers.
|
||||
|
||||
:param pair: pair to get the data for
|
||||
:param timeframe: Timeframe to get data for
|
||||
:param candle_type: Any of the enum CandleType (must match trading mode!)
|
||||
:returns: Tuple of the DataFrame and last analyzed timestamp
|
||||
"""
|
||||
_timeframe = self._default_timeframe if not timeframe else timeframe
|
||||
_candle_type = self._default_candle_type if not candle_type else candle_type
|
||||
|
||||
pair_key = (pair, _timeframe, _candle_type)
|
||||
|
||||
# If we have no data from this Producer yet
|
||||
if producer_name not in self.__producer_pairs_df:
|
||||
# We don't have this data yet, return empty DataFrame and datetime (01-01-1970)
|
||||
return (DataFrame(), datetime.fromtimestamp(0, tz=timezone.utc))
|
||||
|
||||
# If we do have data from that Producer, but no data on this pair_key
|
||||
if pair_key not in self.__producer_pairs_df[producer_name]:
|
||||
# We don't have this data yet, return empty DataFrame and datetime (01-01-1970)
|
||||
return (DataFrame(), datetime.fromtimestamp(0, tz=timezone.utc))
|
||||
|
||||
# We have it, return this data
|
||||
df, la = self.__producer_pairs_df[producer_name][pair_key]
|
||||
return (df.copy(), la)
|
||||
|
||||
def add_pairlisthandler(self, pairlists) -> None:
|
||||
"""
|
||||
Allow adding pairlisthandler after initialization
|
||||
|
@@ -6,7 +6,7 @@ from freqtrade.enums.exittype import ExitType
|
||||
from freqtrade.enums.hyperoptstate import HyperoptState
|
||||
from freqtrade.enums.marginmode import MarginMode
|
||||
from freqtrade.enums.ordertypevalue import OrderTypeValues
|
||||
from freqtrade.enums.rpcmessagetype import RPCMessageType
|
||||
from freqtrade.enums.rpcmessagetype import RPCMessageType, RPCRequestType
|
||||
from freqtrade.enums.runmode import NON_UTIL_MODES, OPTIMIZE_MODES, TRADING_MODES, RunMode
|
||||
from freqtrade.enums.signaltype import SignalDirection, SignalTagType, SignalType
|
||||
from freqtrade.enums.state import State
|
||||
|
@@ -1,7 +1,7 @@
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class RPCMessageType(Enum):
|
||||
class RPCMessageType(str, Enum):
|
||||
STATUS = 'status'
|
||||
WARNING = 'warning'
|
||||
STARTUP = 'startup'
|
||||
@@ -19,8 +19,19 @@ class RPCMessageType(Enum):
|
||||
|
||||
STRATEGY_MSG = 'strategy_msg'
|
||||
|
||||
WHITELIST = 'whitelist'
|
||||
ANALYZED_DF = 'analyzed_df'
|
||||
|
||||
def __repr__(self):
|
||||
return self.value
|
||||
|
||||
def __str__(self):
|
||||
return self.value
|
||||
|
||||
|
||||
# Enum for parsing requests from ws consumers
|
||||
class RPCRequestType(str, Enum):
|
||||
SUBSCRIBE = 'subscribe'
|
||||
|
||||
WHITELIST = 'whitelist'
|
||||
ANALYZED_DF = 'analyzed_df'
|
||||
|
@@ -29,6 +29,7 @@ from freqtrade.plugins.pairlistmanager import PairListManager
|
||||
from freqtrade.plugins.protectionmanager import ProtectionManager
|
||||
from freqtrade.resolvers import ExchangeResolver, StrategyResolver
|
||||
from freqtrade.rpc import RPCManager
|
||||
from freqtrade.rpc.external_message_consumer import ExternalMessageConsumer
|
||||
from freqtrade.strategy.interface import IStrategy
|
||||
from freqtrade.strategy.strategy_wrapper import strategy_safe_wrapper
|
||||
from freqtrade.util import FtPrecise
|
||||
@@ -72,6 +73,8 @@ class FreqtradeBot(LoggingMixin):
|
||||
|
||||
PairLocks.timeframe = self.config['timeframe']
|
||||
|
||||
self.pairlists = PairListManager(self.exchange, self.config)
|
||||
|
||||
# RPC runs in separate threads, can start handling external commands just after
|
||||
# initialization, even before Freqtradebot has a chance to start its throttling,
|
||||
# so anything in the Freqtradebot instance should be ready (initialized), including
|
||||
@@ -79,9 +82,7 @@ class FreqtradeBot(LoggingMixin):
|
||||
# Keep this at the end of this initialization method.
|
||||
self.rpc: RPCManager = RPCManager(self)
|
||||
|
||||
self.pairlists = PairListManager(self.exchange, self.config)
|
||||
|
||||
self.dataprovider = DataProvider(self.config, self.exchange, self.pairlists)
|
||||
self.dataprovider = DataProvider(self.config, self.exchange, self.pairlists, self.rpc)
|
||||
|
||||
# Attach Dataprovider to strategy instance
|
||||
self.strategy.dp = self.dataprovider
|
||||
@@ -92,6 +93,10 @@ class FreqtradeBot(LoggingMixin):
|
||||
self.edge = Edge(self.config, self.exchange, self.strategy) if \
|
||||
self.config.get('edge', {}).get('enabled', False) else None
|
||||
|
||||
# Init ExternalMessageConsumer if enabled
|
||||
self.emc = ExternalMessageConsumer(self.config, self.dataprovider) if \
|
||||
self.config.get('external_message_consumer', {}).get('enabled', False) else None
|
||||
|
||||
self.active_pair_whitelist = self._refresh_active_whitelist()
|
||||
|
||||
# Set initial bot state from config
|
||||
@@ -151,9 +156,11 @@ class FreqtradeBot(LoggingMixin):
|
||||
finally:
|
||||
self.strategy.ft_bot_cleanup()
|
||||
|
||||
self.rpc.cleanup()
|
||||
Trade.commit()
|
||||
self.exchange.close()
|
||||
self.rpc.cleanup()
|
||||
if self.emc:
|
||||
self.emc.shutdown()
|
||||
Trade.commit()
|
||||
self.exchange.close()
|
||||
|
||||
def startup(self) -> None:
|
||||
"""
|
||||
@@ -254,6 +261,7 @@ class FreqtradeBot(LoggingMixin):
|
||||
pairs that have open trades.
|
||||
"""
|
||||
# Refresh whitelist
|
||||
_prev_whitelist = self.pairlists.whitelist
|
||||
self.pairlists.refresh_pairlist()
|
||||
_whitelist = self.pairlists.whitelist
|
||||
|
||||
@@ -266,6 +274,11 @@ class FreqtradeBot(LoggingMixin):
|
||||
# Extend active-pair whitelist with pairs of open trades
|
||||
# It ensures that candle (OHLCV) data are downloaded for open trades as well
|
||||
_whitelist.extend([trade.pair for trade in trades if trade.pair not in _whitelist])
|
||||
|
||||
# Called last to include the included pairs
|
||||
if _prev_whitelist != _whitelist:
|
||||
self.rpc.send_msg({'type': RPCMessageType.WHITELIST, 'data': _whitelist})
|
||||
|
||||
return _whitelist
|
||||
|
||||
def get_free_open_trades(self) -> int:
|
||||
|
@@ -10,9 +10,11 @@ from typing import Any, Iterator, List
|
||||
from typing.io import IO
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import pandas
|
||||
import rapidjson
|
||||
|
||||
from freqtrade.constants import DECIMAL_PER_COIN_FALLBACK, DECIMALS_PER_COIN
|
||||
from freqtrade.enums import SignalTagType, SignalType
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -249,3 +251,41 @@ def parse_db_uri_for_logging(uri: str):
|
||||
return uri
|
||||
pwd = parsed_db_uri.netloc.split(':')[1].split('@')[0]
|
||||
return parsed_db_uri.geturl().replace(f':{pwd}@', ':*****@')
|
||||
|
||||
|
||||
def dataframe_to_json(dataframe: pandas.DataFrame) -> str:
|
||||
"""
|
||||
Serialize a DataFrame for transmission over the wire using JSON
|
||||
:param dataframe: A pandas DataFrame
|
||||
:returns: A JSON string of the pandas DataFrame
|
||||
"""
|
||||
return dataframe.to_json(orient='split')
|
||||
|
||||
|
||||
def json_to_dataframe(data: str) -> pandas.DataFrame:
|
||||
"""
|
||||
Deserialize JSON into a DataFrame
|
||||
:param data: A JSON string
|
||||
:returns: A pandas DataFrame from the JSON string
|
||||
"""
|
||||
dataframe = pandas.read_json(data, orient='split')
|
||||
if 'date' in dataframe.columns:
|
||||
dataframe['date'] = pandas.to_datetime(dataframe['date'], unit='ms', utc=True)
|
||||
|
||||
return dataframe
|
||||
|
||||
|
||||
def remove_entry_exit_signals(dataframe: pandas.DataFrame):
|
||||
"""
|
||||
Remove Entry and Exit signals from a DataFrame
|
||||
|
||||
:param dataframe: The DataFrame to remove signals from
|
||||
"""
|
||||
dataframe[SignalType.ENTER_LONG.value] = 0
|
||||
dataframe[SignalType.EXIT_LONG.value] = 0
|
||||
dataframe[SignalType.ENTER_SHORT.value] = 0
|
||||
dataframe[SignalType.EXIT_SHORT.value] = 0
|
||||
dataframe[SignalTagType.ENTER_TAG.value] = None
|
||||
dataframe[SignalTagType.EXIT_TAG.value] = None
|
||||
|
||||
return dataframe
|
||||
|
@@ -1,8 +1,10 @@
|
||||
import logging
|
||||
import secrets
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Any, Dict, Union
|
||||
|
||||
import jwt
|
||||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
from fastapi import APIRouter, Depends, HTTPException, Query, WebSocket, status
|
||||
from fastapi.security import OAuth2PasswordBearer
|
||||
from fastapi.security.http import HTTPBasic, HTTPBasicCredentials
|
||||
|
||||
@@ -10,6 +12,8 @@ from freqtrade.rpc.api_server.api_schemas import AccessAndRefreshToken, AccessTo
|
||||
from freqtrade.rpc.api_server.deps import get_api_config
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
ALGORITHM = "HS256"
|
||||
|
||||
router_login = APIRouter()
|
||||
@@ -25,7 +29,7 @@ httpbasic = HTTPBasic(auto_error=False)
|
||||
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False)
|
||||
|
||||
|
||||
def get_user_from_token(token, secret_key: str, token_type: str = "access"):
|
||||
def get_user_from_token(token, secret_key: str, token_type: str = "access") -> str:
|
||||
credentials_exception = HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Could not validate credentials",
|
||||
@@ -44,6 +48,45 @@ def get_user_from_token(token, secret_key: str, token_type: str = "access"):
|
||||
return username
|
||||
|
||||
|
||||
# This should be reimplemented to better realign with the existing tools provided
|
||||
# by FastAPI regarding API Tokens
|
||||
# https://github.com/tiangolo/fastapi/blob/master/fastapi/security/api_key.py
|
||||
async def validate_ws_token(
|
||||
ws: WebSocket,
|
||||
ws_token: Union[str, None] = Query(default=None, alias="token"),
|
||||
api_config: Dict[str, Any] = Depends(get_api_config)
|
||||
):
|
||||
secret_ws_token = api_config.get('ws_token', None)
|
||||
secret_jwt_key = api_config.get('jwt_secret_key', 'super-secret')
|
||||
|
||||
# Check if ws_token is/in secret_ws_token
|
||||
if ws_token and secret_ws_token:
|
||||
is_valid_ws_token = False
|
||||
if isinstance(secret_ws_token, str):
|
||||
is_valid_ws_token = secrets.compare_digest(secret_ws_token, ws_token)
|
||||
elif isinstance(secret_ws_token, list):
|
||||
is_valid_ws_token = any([
|
||||
secrets.compare_digest(potential, ws_token)
|
||||
for potential in secret_ws_token
|
||||
])
|
||||
|
||||
if is_valid_ws_token:
|
||||
return ws_token
|
||||
|
||||
# Check if ws_token is a JWT
|
||||
try:
|
||||
user = get_user_from_token(ws_token, secret_jwt_key)
|
||||
return user
|
||||
# If the token is a jwt, and it's valid return the user
|
||||
except HTTPException:
|
||||
pass
|
||||
|
||||
# No checks passed, deny the connection
|
||||
logger.debug("Denying websocket request.")
|
||||
# If it doesn't match, close the websocket connection
|
||||
await ws.close(code=status.WS_1008_POLICY_VIOLATION)
|
||||
|
||||
|
||||
def create_token(data: dict, secret_key: str, token_type: str = "access") -> str:
|
||||
to_encode = data.copy()
|
||||
if token_type == "access":
|
||||
|
@@ -38,7 +38,8 @@ logger = logging.getLogger(__name__)
|
||||
# 2.15: Add backtest history endpoints
|
||||
# 2.16: Additional daily metrics
|
||||
# 2.17: Forceentry - leverage, partial force_exit
|
||||
API_VERSION = 2.17
|
||||
# 2.20: Add websocket endpoints
|
||||
API_VERSION = 2.20
|
||||
|
||||
# Public API, requires no auth.
|
||||
router_public = APIRouter()
|
||||
|
140
freqtrade/rpc/api_server/api_ws.py
Normal file
140
freqtrade/rpc/api_server/api_ws.py
Normal file
@@ -0,0 +1,140 @@
|
||||
import logging
|
||||
from typing import Any, Dict
|
||||
|
||||
from fastapi import APIRouter, Depends, WebSocketDisconnect
|
||||
from fastapi.websockets import WebSocket, WebSocketState
|
||||
from pydantic import ValidationError
|
||||
|
||||
from freqtrade.enums import RPCMessageType, RPCRequestType
|
||||
from freqtrade.rpc.api_server.api_auth import validate_ws_token
|
||||
from freqtrade.rpc.api_server.deps import get_channel_manager, get_rpc
|
||||
from freqtrade.rpc.api_server.ws import WebSocketChannel
|
||||
from freqtrade.rpc.api_server.ws_schemas import (WSAnalyzedDFMessage, WSMessageSchema,
|
||||
WSRequestSchema, WSWhitelistMessage)
|
||||
from freqtrade.rpc.rpc import RPC
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Private router, protected by API Key authentication
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
async def is_websocket_alive(ws: WebSocket) -> bool:
|
||||
"""
|
||||
Check if a FastAPI Websocket is still open
|
||||
"""
|
||||
if (
|
||||
ws.application_state == WebSocketState.CONNECTED and
|
||||
ws.client_state == WebSocketState.CONNECTED
|
||||
):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
async def _process_consumer_request(
|
||||
request: Dict[str, Any],
|
||||
channel: WebSocketChannel,
|
||||
rpc: RPC
|
||||
):
|
||||
"""
|
||||
Validate and handle a request from a websocket consumer
|
||||
"""
|
||||
# Validate the request, makes sure it matches the schema
|
||||
try:
|
||||
websocket_request = WSRequestSchema.parse_obj(request)
|
||||
except ValidationError as e:
|
||||
logger.error(f"Invalid request from {channel}: {e}")
|
||||
return
|
||||
|
||||
type, data = websocket_request.type, websocket_request.data
|
||||
response: WSMessageSchema
|
||||
|
||||
logger.debug(f"Request of type {type} from {channel}")
|
||||
|
||||
# If we have a request of type SUBSCRIBE, set the topics in this channel
|
||||
if type == RPCRequestType.SUBSCRIBE:
|
||||
# If the request is empty, do nothing
|
||||
if not data:
|
||||
return
|
||||
|
||||
# If all topics passed are a valid RPCMessageType, set subscriptions on channel
|
||||
if all([any(x.value == topic for x in RPCMessageType) for topic in data]):
|
||||
channel.set_subscriptions(data)
|
||||
|
||||
# We don't send a response for subscriptions
|
||||
return
|
||||
|
||||
elif type == RPCRequestType.WHITELIST:
|
||||
# Get whitelist
|
||||
whitelist = rpc._ws_request_whitelist()
|
||||
|
||||
# Format response
|
||||
response = WSWhitelistMessage(data=whitelist)
|
||||
# Send it back
|
||||
await channel.send(response.dict(exclude_none=True))
|
||||
|
||||
elif type == RPCRequestType.ANALYZED_DF:
|
||||
limit = None
|
||||
|
||||
if data:
|
||||
# Limit the amount of candles per dataframe to 'limit' or 1500
|
||||
limit = max(data.get('limit', 1500), 1500)
|
||||
|
||||
# They requested the full historical analyzed dataframes
|
||||
analyzed_df = rpc._ws_request_analyzed_df(limit)
|
||||
|
||||
# For every dataframe, send as a separate message
|
||||
for _, message in analyzed_df.items():
|
||||
response = WSAnalyzedDFMessage(data=message)
|
||||
await channel.send(response.dict(exclude_none=True))
|
||||
|
||||
|
||||
@router.websocket("/message/ws")
|
||||
async def message_endpoint(
|
||||
ws: WebSocket,
|
||||
rpc: RPC = Depends(get_rpc),
|
||||
channel_manager=Depends(get_channel_manager),
|
||||
token: str = Depends(validate_ws_token)
|
||||
):
|
||||
"""
|
||||
Message WebSocket endpoint, facilitates sending RPC messages
|
||||
"""
|
||||
try:
|
||||
channel = await channel_manager.on_connect(ws)
|
||||
|
||||
if await is_websocket_alive(ws):
|
||||
|
||||
logger.info(f"Consumer connected - {channel}")
|
||||
|
||||
# Keep connection open until explicitly closed, and process requests
|
||||
try:
|
||||
while not channel.is_closed():
|
||||
request = await channel.recv()
|
||||
|
||||
# Process the request here
|
||||
await _process_consumer_request(request, channel, rpc)
|
||||
|
||||
except WebSocketDisconnect:
|
||||
# Handle client disconnects
|
||||
logger.info(f"Consumer disconnected - {channel}")
|
||||
await channel_manager.on_disconnect(ws)
|
||||
except Exception as e:
|
||||
logger.info(f"Consumer connection failed - {channel}")
|
||||
logger.exception(e)
|
||||
# Handle cases like -
|
||||
# RuntimeError('Cannot call "send" once a closed message has been sent')
|
||||
await channel_manager.on_disconnect(ws)
|
||||
|
||||
else:
|
||||
await ws.close()
|
||||
|
||||
except RuntimeError:
|
||||
# WebSocket was closed
|
||||
await channel_manager.on_disconnect(ws)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to serve - {ws.client}")
|
||||
# Log tracebacks to keep track of what errors are happening
|
||||
logger.exception(e)
|
||||
await channel_manager.on_disconnect(ws)
|
@@ -41,6 +41,10 @@ def get_exchange(config=Depends(get_config)):
|
||||
return ApiServer._exchange
|
||||
|
||||
|
||||
def get_channel_manager():
|
||||
return ApiServer._ws_channel_manager
|
||||
|
||||
|
||||
def is_webserver_mode(config=Depends(get_config)):
|
||||
if config['runmode'] != RunMode.WEBSERVER:
|
||||
raise RPCException('Bot is not in the correct state')
|
||||
|
@@ -1,16 +1,21 @@
|
||||
import asyncio
|
||||
import logging
|
||||
from ipaddress import IPv4Address
|
||||
from threading import Thread
|
||||
from typing import Any, Dict
|
||||
|
||||
import orjson
|
||||
import uvicorn
|
||||
from fastapi import Depends, FastAPI
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
# Look into alternatives
|
||||
from janus import Queue as ThreadedQueue
|
||||
from starlette.responses import JSONResponse
|
||||
|
||||
from freqtrade.constants import Config
|
||||
from freqtrade.exceptions import OperationalException
|
||||
from freqtrade.rpc.api_server.uvicorn_threaded import UvicornServer
|
||||
from freqtrade.rpc.api_server.ws import ChannelManager
|
||||
from freqtrade.rpc.rpc import RPC, RPCException, RPCHandler
|
||||
|
||||
|
||||
@@ -44,6 +49,10 @@ class ApiServer(RPCHandler):
|
||||
_config: Config = {}
|
||||
# Exchange - only available in webserver mode.
|
||||
_exchange = None
|
||||
# websocket message queue stuff
|
||||
_ws_channel_manager = None
|
||||
_ws_thread = None
|
||||
_ws_loop = None
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
"""
|
||||
@@ -61,17 +70,21 @@ class ApiServer(RPCHandler):
|
||||
return
|
||||
self._standalone: bool = standalone
|
||||
self._server = None
|
||||
self._ws_queue = None
|
||||
self._ws_background_task = None
|
||||
|
||||
ApiServer.__initialized = True
|
||||
|
||||
api_config = self._config['api_server']
|
||||
|
||||
ApiServer._ws_channel_manager = ChannelManager()
|
||||
|
||||
self.app = FastAPI(title="Freqtrade API",
|
||||
docs_url='/docs' if api_config.get('enable_openapi', False) else None,
|
||||
redoc_url=None,
|
||||
default_response_class=FTJSONResponse,
|
||||
)
|
||||
self.configure_app(self.app, self._config)
|
||||
|
||||
self.start_api()
|
||||
|
||||
def add_rpc_handler(self, rpc: RPC):
|
||||
@@ -93,6 +106,19 @@ class ApiServer(RPCHandler):
|
||||
logger.info("Stopping API Server")
|
||||
self._server.cleanup()
|
||||
|
||||
if self._ws_thread and self._ws_loop:
|
||||
logger.info("Stopping API Server background tasks")
|
||||
|
||||
if self._ws_background_task:
|
||||
# Cancel the queue task
|
||||
self._ws_background_task.cancel()
|
||||
|
||||
self._ws_thread.join()
|
||||
|
||||
self._ws_thread = None
|
||||
self._ws_loop = None
|
||||
self._ws_background_task = None
|
||||
|
||||
@classmethod
|
||||
def shutdown(cls):
|
||||
cls.__initialized = False
|
||||
@@ -102,7 +128,9 @@ class ApiServer(RPCHandler):
|
||||
cls._rpc = None
|
||||
|
||||
def send_msg(self, msg: Dict[str, str]) -> None:
|
||||
pass
|
||||
if self._ws_queue:
|
||||
sync_q = self._ws_queue.sync_q
|
||||
sync_q.put(msg)
|
||||
|
||||
def handle_rpc_exception(self, request, exc):
|
||||
logger.exception(f"API Error calling: {exc}")
|
||||
@@ -116,6 +144,7 @@ class ApiServer(RPCHandler):
|
||||
from freqtrade.rpc.api_server.api_backtest import router as api_backtest
|
||||
from freqtrade.rpc.api_server.api_v1 import router as api_v1
|
||||
from freqtrade.rpc.api_server.api_v1 import router_public as api_v1_public
|
||||
from freqtrade.rpc.api_server.api_ws import router as ws_router
|
||||
from freqtrade.rpc.api_server.web_ui import router_ui
|
||||
|
||||
app.include_router(api_v1_public, prefix="/api/v1")
|
||||
@@ -126,6 +155,7 @@ class ApiServer(RPCHandler):
|
||||
app.include_router(api_backtest, prefix="/api/v1",
|
||||
dependencies=[Depends(http_basic_or_jwt_token)],
|
||||
)
|
||||
app.include_router(ws_router, prefix="/api/v1")
|
||||
app.include_router(router_login, prefix="/api/v1", tags=["auth"])
|
||||
# UI Router MUST be last!
|
||||
app.include_router(router_ui, prefix='')
|
||||
@@ -140,6 +170,48 @@ class ApiServer(RPCHandler):
|
||||
|
||||
app.add_exception_handler(RPCException, self.handle_rpc_exception)
|
||||
|
||||
def start_message_queue(self):
|
||||
if self._ws_thread:
|
||||
return
|
||||
|
||||
# Create a new loop, as it'll be just for the background thread
|
||||
self._ws_loop = asyncio.new_event_loop()
|
||||
|
||||
# Start the thread
|
||||
self._ws_thread = Thread(target=self._ws_loop.run_forever)
|
||||
self._ws_thread.start()
|
||||
|
||||
# Finally, submit the coro to the thread
|
||||
self._ws_background_task = asyncio.run_coroutine_threadsafe(
|
||||
self._broadcast_queue_data(), loop=self._ws_loop)
|
||||
|
||||
async def _broadcast_queue_data(self):
|
||||
# Instantiate the queue in this coroutine so it's attached to our loop
|
||||
self._ws_queue = ThreadedQueue()
|
||||
async_queue = self._ws_queue.async_q
|
||||
|
||||
try:
|
||||
while True:
|
||||
logger.debug("Getting queue messages...")
|
||||
# Get data from queue
|
||||
message = await async_queue.get()
|
||||
logger.debug(f"Found message of type: {message.get('type')}")
|
||||
# Broadcast it
|
||||
await self._ws_channel_manager.broadcast(message)
|
||||
# Sleep, make this configurable?
|
||||
await asyncio.sleep(0.1)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
# For testing, shouldn't happen when stable
|
||||
except Exception as e:
|
||||
logger.exception(f"Exception happened in background task: {e}")
|
||||
|
||||
finally:
|
||||
# Disconnect channels and stop the loop on cancel
|
||||
await self._ws_channel_manager.disconnect_all()
|
||||
self._ws_loop.stop()
|
||||
|
||||
def start_api(self):
|
||||
"""
|
||||
Start API ... should be run in thread.
|
||||
@@ -177,6 +249,7 @@ class ApiServer(RPCHandler):
|
||||
if self._standalone:
|
||||
self._server.run()
|
||||
else:
|
||||
self.start_message_queue()
|
||||
self._server.run_in_thread()
|
||||
except Exception:
|
||||
logger.exception("Api server failed to start.")
|
||||
|
6
freqtrade/rpc/api_server/ws/__init__.py
Normal file
6
freqtrade/rpc/api_server/ws/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
||||
# flake8: noqa: F401
|
||||
# isort: off
|
||||
from freqtrade.rpc.api_server.ws.types import WebSocketType
|
||||
from freqtrade.rpc.api_server.ws.proxy import WebSocketProxy
|
||||
from freqtrade.rpc.api_server.ws.serializer import HybridJSONWebSocketSerializer
|
||||
from freqtrade.rpc.api_server.ws.channel import ChannelManager, WebSocketChannel
|
178
freqtrade/rpc/api_server/ws/channel.py
Normal file
178
freqtrade/rpc/api_server/ws/channel.py
Normal file
@@ -0,0 +1,178 @@
|
||||
import logging
|
||||
from threading import RLock
|
||||
from typing import List, Optional, Type
|
||||
from uuid import uuid4
|
||||
|
||||
from fastapi import WebSocket as FastAPIWebSocket
|
||||
|
||||
from freqtrade.rpc.api_server.ws.proxy import WebSocketProxy
|
||||
from freqtrade.rpc.api_server.ws.serializer import (HybridJSONWebSocketSerializer,
|
||||
WebSocketSerializer)
|
||||
from freqtrade.rpc.api_server.ws.types import WebSocketType
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class WebSocketChannel:
|
||||
"""
|
||||
Object to help facilitate managing a websocket connection
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
websocket: WebSocketType,
|
||||
channel_id: Optional[str] = None,
|
||||
serializer_cls: Type[WebSocketSerializer] = HybridJSONWebSocketSerializer
|
||||
):
|
||||
|
||||
self.channel_id = channel_id if channel_id else uuid4().hex[:8]
|
||||
|
||||
# The WebSocket object
|
||||
self._websocket = WebSocketProxy(websocket)
|
||||
# The Serializing class for the WebSocket object
|
||||
self._serializer_cls = serializer_cls
|
||||
|
||||
self._subscriptions: List[str] = []
|
||||
|
||||
# Internal event to signify a closed websocket
|
||||
self._closed = False
|
||||
|
||||
# Wrap the WebSocket in the Serializing class
|
||||
self._wrapped_ws = self._serializer_cls(self._websocket)
|
||||
|
||||
def __repr__(self):
|
||||
return f"WebSocketChannel({self.channel_id}, {self.remote_addr})"
|
||||
|
||||
@property
|
||||
def remote_addr(self):
|
||||
return self._websocket.remote_addr
|
||||
|
||||
async def send(self, data):
|
||||
"""
|
||||
Send data on the wrapped websocket
|
||||
"""
|
||||
await self._wrapped_ws.send(data)
|
||||
|
||||
async def recv(self):
|
||||
"""
|
||||
Receive data on the wrapped websocket
|
||||
"""
|
||||
return await self._wrapped_ws.recv()
|
||||
|
||||
async def ping(self):
|
||||
"""
|
||||
Ping the websocket
|
||||
"""
|
||||
return await self._websocket.ping()
|
||||
|
||||
async def close(self):
|
||||
"""
|
||||
Close the WebSocketChannel
|
||||
"""
|
||||
|
||||
self._closed = True
|
||||
|
||||
def is_closed(self) -> bool:
|
||||
"""
|
||||
Closed flag
|
||||
"""
|
||||
return self._closed
|
||||
|
||||
def set_subscriptions(self, subscriptions: List[str] = []) -> None:
|
||||
"""
|
||||
Set which subscriptions this channel is subscribed to
|
||||
|
||||
:param subscriptions: List of subscriptions, List[str]
|
||||
"""
|
||||
self._subscriptions = subscriptions
|
||||
|
||||
def subscribed_to(self, message_type: str) -> bool:
|
||||
"""
|
||||
Check if this channel is subscribed to the message_type
|
||||
|
||||
:param message_type: The message type to check
|
||||
"""
|
||||
return message_type in self._subscriptions
|
||||
|
||||
|
||||
class ChannelManager:
|
||||
def __init__(self):
|
||||
self.channels = dict()
|
||||
self._lock = RLock() # Re-entrant Lock
|
||||
|
||||
async def on_connect(self, websocket: WebSocketType):
|
||||
"""
|
||||
Wrap websocket connection into Channel and add to list
|
||||
|
||||
:param websocket: The WebSocket object to attach to the Channel
|
||||
"""
|
||||
if isinstance(websocket, FastAPIWebSocket):
|
||||
try:
|
||||
await websocket.accept()
|
||||
except RuntimeError:
|
||||
# The connection was closed before we could accept it
|
||||
return
|
||||
|
||||
ws_channel = WebSocketChannel(websocket)
|
||||
|
||||
with self._lock:
|
||||
self.channels[websocket] = ws_channel
|
||||
|
||||
return ws_channel
|
||||
|
||||
async def on_disconnect(self, websocket: WebSocketType):
|
||||
"""
|
||||
Call close on the channel if it's not, and remove from channel list
|
||||
|
||||
:param websocket: The WebSocket objet attached to the Channel
|
||||
"""
|
||||
with self._lock:
|
||||
channel = self.channels.get(websocket)
|
||||
if channel:
|
||||
if not channel.is_closed():
|
||||
await channel.close()
|
||||
|
||||
del self.channels[websocket]
|
||||
|
||||
async def disconnect_all(self):
|
||||
"""
|
||||
Disconnect all Channels
|
||||
"""
|
||||
with self._lock:
|
||||
for websocket, channel in self.channels.items():
|
||||
if not channel.is_closed():
|
||||
await channel.close()
|
||||
|
||||
self.channels = dict()
|
||||
|
||||
async def broadcast(self, data):
|
||||
"""
|
||||
Broadcast data on all Channels
|
||||
|
||||
:param data: The data to send
|
||||
"""
|
||||
with self._lock:
|
||||
message_type = data.get('type')
|
||||
for websocket, channel in self.channels.items():
|
||||
try:
|
||||
if channel.subscribed_to(message_type):
|
||||
await channel.send(data)
|
||||
except RuntimeError:
|
||||
# Handle cannot send after close cases
|
||||
await self.on_disconnect(websocket)
|
||||
|
||||
async def send_direct(self, channel, data):
|
||||
"""
|
||||
Send data directly through direct_channel only
|
||||
|
||||
:param direct_channel: The WebSocketChannel object to send data through
|
||||
:param data: The data to send
|
||||
"""
|
||||
await channel.send(data)
|
||||
|
||||
def has_channels(self):
|
||||
"""
|
||||
Flag for more than 0 channels
|
||||
"""
|
||||
return len(self.channels) > 0
|
69
freqtrade/rpc/api_server/ws/proxy.py
Normal file
69
freqtrade/rpc/api_server/ws/proxy.py
Normal file
@@ -0,0 +1,69 @@
|
||||
from typing import Any, Tuple, Union
|
||||
|
||||
from fastapi import WebSocket as FastAPIWebSocket
|
||||
from websockets.client import WebSocketClientProtocol as WebSocket
|
||||
|
||||
from freqtrade.rpc.api_server.ws.types import WebSocketType
|
||||
|
||||
|
||||
class WebSocketProxy:
|
||||
"""
|
||||
WebSocketProxy object to bring the FastAPIWebSocket and websockets.WebSocketClientProtocol
|
||||
under the same API
|
||||
"""
|
||||
|
||||
def __init__(self, websocket: WebSocketType):
|
||||
self._websocket: Union[FastAPIWebSocket, WebSocket] = websocket
|
||||
|
||||
@property
|
||||
def remote_addr(self) -> Tuple[Any, ...]:
|
||||
if isinstance(self._websocket, WebSocket):
|
||||
return self._websocket.remote_address
|
||||
elif isinstance(self._websocket, FastAPIWebSocket):
|
||||
if self._websocket.client:
|
||||
client, port = self._websocket.client.host, self._websocket.client.port
|
||||
return (client, port)
|
||||
return ("unknown", 0)
|
||||
|
||||
async def send(self, data):
|
||||
"""
|
||||
Send data on the wrapped websocket
|
||||
"""
|
||||
if hasattr(self._websocket, "send_text"):
|
||||
await self._websocket.send_text(data)
|
||||
else:
|
||||
await self._websocket.send(data)
|
||||
|
||||
async def recv(self):
|
||||
"""
|
||||
Receive data on the wrapped websocket
|
||||
"""
|
||||
if hasattr(self._websocket, "receive_text"):
|
||||
return await self._websocket.receive_text()
|
||||
else:
|
||||
return await self._websocket.recv()
|
||||
|
||||
async def ping(self):
|
||||
"""
|
||||
Ping the websocket, not supported by FastAPI WebSockets
|
||||
"""
|
||||
if hasattr(self._websocket, "ping"):
|
||||
return await self._websocket.ping()
|
||||
return False
|
||||
|
||||
async def close(self, code: int = 1000):
|
||||
"""
|
||||
Close the websocket connection, only supported by FastAPI WebSockets
|
||||
"""
|
||||
if hasattr(self._websocket, "close"):
|
||||
try:
|
||||
return await self._websocket.close(code)
|
||||
except RuntimeError:
|
||||
pass
|
||||
|
||||
async def accept(self):
|
||||
"""
|
||||
Accept the WebSocket connection, only support by FastAPI WebSockets
|
||||
"""
|
||||
if hasattr(self._websocket, "accept"):
|
||||
return await self._websocket.accept()
|
62
freqtrade/rpc/api_server/ws/serializer.py
Normal file
62
freqtrade/rpc/api_server/ws/serializer.py
Normal file
@@ -0,0 +1,62 @@
|
||||
import logging
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
import orjson
|
||||
import rapidjson
|
||||
from pandas import DataFrame
|
||||
|
||||
from freqtrade.misc import dataframe_to_json, json_to_dataframe
|
||||
from freqtrade.rpc.api_server.ws.proxy import WebSocketProxy
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class WebSocketSerializer(ABC):
|
||||
def __init__(self, websocket: WebSocketProxy):
|
||||
self._websocket: WebSocketProxy = websocket
|
||||
|
||||
@abstractmethod
|
||||
def _serialize(self, data):
|
||||
raise NotImplementedError()
|
||||
|
||||
@abstractmethod
|
||||
def _deserialize(self, data):
|
||||
raise NotImplementedError()
|
||||
|
||||
async def send(self, data: bytes):
|
||||
await self._websocket.send(self._serialize(data))
|
||||
|
||||
async def recv(self) -> bytes:
|
||||
data = await self._websocket.recv()
|
||||
|
||||
return self._deserialize(data)
|
||||
|
||||
async def close(self, code: int = 1000):
|
||||
await self._websocket.close(code)
|
||||
|
||||
|
||||
class HybridJSONWebSocketSerializer(WebSocketSerializer):
|
||||
def _serialize(self, data) -> str:
|
||||
return str(orjson.dumps(data, default=_json_default), "utf-8")
|
||||
|
||||
def _deserialize(self, data: str):
|
||||
# RapidJSON expects strings
|
||||
return rapidjson.loads(data, object_hook=_json_object_hook)
|
||||
|
||||
|
||||
# Support serializing pandas DataFrames
|
||||
def _json_default(z):
|
||||
if isinstance(z, DataFrame):
|
||||
return {
|
||||
'__type__': 'dataframe',
|
||||
'__value__': dataframe_to_json(z)
|
||||
}
|
||||
raise TypeError
|
||||
|
||||
|
||||
# Support deserializing JSON to pandas DataFrames
|
||||
def _json_object_hook(z):
|
||||
if z.get('__type__') == 'dataframe':
|
||||
return json_to_dataframe(z.get('__value__'))
|
||||
return z
|
8
freqtrade/rpc/api_server/ws/types.py
Normal file
8
freqtrade/rpc/api_server/ws/types.py
Normal file
@@ -0,0 +1,8 @@
|
||||
from typing import Any, Dict, TypeVar
|
||||
|
||||
from fastapi import WebSocket as FastAPIWebSocket
|
||||
from websockets.client import WebSocketClientProtocol as WebSocket
|
||||
|
||||
|
||||
WebSocketType = TypeVar("WebSocketType", FastAPIWebSocket, WebSocket)
|
||||
MessageType = Dict[str, Any]
|
63
freqtrade/rpc/api_server/ws_schemas.py
Normal file
63
freqtrade/rpc/api_server/ws_schemas.py
Normal file
@@ -0,0 +1,63 @@
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from pandas import DataFrame
|
||||
from pydantic import BaseModel
|
||||
|
||||
from freqtrade.constants import PairWithTimeframe
|
||||
from freqtrade.enums.rpcmessagetype import RPCMessageType, RPCRequestType
|
||||
|
||||
|
||||
class BaseArbitraryModel(BaseModel):
|
||||
class Config:
|
||||
arbitrary_types_allowed = True
|
||||
|
||||
|
||||
class WSRequestSchema(BaseArbitraryModel):
|
||||
type: RPCRequestType
|
||||
data: Optional[Any] = None
|
||||
|
||||
|
||||
class WSMessageSchema(BaseArbitraryModel):
|
||||
type: RPCMessageType
|
||||
data: Optional[Any] = None
|
||||
|
||||
class Config:
|
||||
extra = 'allow'
|
||||
|
||||
|
||||
# ------------------------------ REQUEST SCHEMAS ----------------------------
|
||||
|
||||
|
||||
class WSSubscribeRequest(WSRequestSchema):
|
||||
type: RPCRequestType = RPCRequestType.SUBSCRIBE
|
||||
data: List[RPCMessageType]
|
||||
|
||||
|
||||
class WSWhitelistRequest(WSRequestSchema):
|
||||
type: RPCRequestType = RPCRequestType.WHITELIST
|
||||
data: None = None
|
||||
|
||||
|
||||
class WSAnalyzedDFRequest(WSRequestSchema):
|
||||
type: RPCRequestType = RPCRequestType.ANALYZED_DF
|
||||
data: Dict[str, Any] = {"limit": 1500}
|
||||
|
||||
|
||||
# ------------------------------ MESSAGE SCHEMAS ----------------------------
|
||||
|
||||
class WSWhitelistMessage(WSMessageSchema):
|
||||
type: RPCMessageType = RPCMessageType.WHITELIST
|
||||
data: List[str]
|
||||
|
||||
|
||||
class WSAnalyzedDFMessage(WSMessageSchema):
|
||||
class AnalyzedDFData(BaseArbitraryModel):
|
||||
key: PairWithTimeframe
|
||||
df: DataFrame
|
||||
la: datetime
|
||||
|
||||
type: RPCMessageType = RPCMessageType.ANALYZED_DF
|
||||
data: AnalyzedDFData
|
||||
|
||||
# --------------------------------------------------------------------------
|
341
freqtrade/rpc/external_message_consumer.py
Normal file
341
freqtrade/rpc/external_message_consumer.py
Normal file
@@ -0,0 +1,341 @@
|
||||
"""
|
||||
ExternalMessageConsumer module
|
||||
|
||||
Main purpose is to connect to external bot's message websocket to consume data
|
||||
from it
|
||||
"""
|
||||
import asyncio
|
||||
import logging
|
||||
import socket
|
||||
from threading import Thread
|
||||
from typing import TYPE_CHECKING, Any, Callable, Dict, List
|
||||
|
||||
import websockets
|
||||
from pydantic import ValidationError
|
||||
|
||||
from freqtrade.data.dataprovider import DataProvider
|
||||
from freqtrade.enums import RPCMessageType
|
||||
from freqtrade.exceptions import OperationalException
|
||||
from freqtrade.misc import remove_entry_exit_signals
|
||||
from freqtrade.rpc.api_server.ws import WebSocketChannel
|
||||
from freqtrade.rpc.api_server.ws_schemas import (WSAnalyzedDFMessage, WSAnalyzedDFRequest,
|
||||
WSMessageSchema, WSRequestSchema,
|
||||
WSSubscribeRequest, WSWhitelistMessage,
|
||||
WSWhitelistRequest)
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import websockets.connect
|
||||
import websockets.exceptions
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ExternalMessageConsumer:
|
||||
"""
|
||||
The main controller class for consuming external messages from
|
||||
other freqtrade bot's
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
config: Dict[str, Any],
|
||||
dataprovider: DataProvider
|
||||
):
|
||||
self._config = config
|
||||
self._dp = dataprovider
|
||||
|
||||
self._running = False
|
||||
self._thread = None
|
||||
self._loop = None
|
||||
self._main_task = None
|
||||
self._sub_tasks = None
|
||||
|
||||
self._emc_config = self._config.get('external_message_consumer', {})
|
||||
|
||||
self.enabled = self._emc_config.get('enabled', False)
|
||||
self.producers = self._emc_config.get('producers', [])
|
||||
|
||||
self.wait_timeout = self._emc_config.get('wait_timeout', 300) # in seconds
|
||||
self.ping_timeout = self._emc_config.get('ping_timeout', 10) # in seconds
|
||||
self.sleep_time = self._emc_config.get('sleep_time', 10) # in seconds
|
||||
|
||||
# The amount of candles per dataframe on the initial request
|
||||
self.initial_candle_limit = self._emc_config.get('initial_candle_limit', 1500)
|
||||
|
||||
# Message size limit, in megabytes. Default 8mb, Use bitwise operator << 20 to convert
|
||||
# as the websockets client expects bytes.
|
||||
self.message_size_limit = (self._emc_config.get('message_size_limit', 8) << 20)
|
||||
|
||||
self.validate_config()
|
||||
|
||||
# Setting these explicitly as they probably shouldn't be changed by a user
|
||||
# Unless we somehow integrate this with the strategy to allow creating
|
||||
# callbacks for the messages
|
||||
self.topics = [RPCMessageType.WHITELIST, RPCMessageType.ANALYZED_DF]
|
||||
|
||||
# Allow setting data for each initial request
|
||||
self._initial_requests: List[WSRequestSchema] = [
|
||||
WSSubscribeRequest(data=self.topics),
|
||||
WSWhitelistRequest(),
|
||||
WSAnalyzedDFRequest()
|
||||
]
|
||||
|
||||
# Specify which function to use for which RPCMessageType
|
||||
self._message_handlers: Dict[str, Callable[[str, WSMessageSchema], None]] = {
|
||||
RPCMessageType.WHITELIST: self._consume_whitelist_message,
|
||||
RPCMessageType.ANALYZED_DF: self._consume_analyzed_df_message,
|
||||
}
|
||||
|
||||
self.start()
|
||||
|
||||
def validate_config(self):
|
||||
"""
|
||||
Make sure values are what they are supposed to be
|
||||
"""
|
||||
if self.enabled and len(self.producers) < 1:
|
||||
raise OperationalException("You must specify at least 1 Producer to connect to.")
|
||||
|
||||
if self.enabled and self._config.get('process_only_new_candles', True):
|
||||
# Warning here or require it?
|
||||
logger.warning("To receive best performance with external data, "
|
||||
"please set `process_only_new_candles` to False")
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
Start the main internal loop in another thread to run coroutines
|
||||
"""
|
||||
if self._thread and self._loop:
|
||||
return
|
||||
|
||||
logger.info("Starting ExternalMessageConsumer")
|
||||
|
||||
self._loop = asyncio.new_event_loop()
|
||||
self._thread = Thread(target=self._loop.run_forever)
|
||||
self._running = True
|
||||
self._thread.start()
|
||||
|
||||
self._main_task = asyncio.run_coroutine_threadsafe(self._main(), loop=self._loop)
|
||||
|
||||
def shutdown(self):
|
||||
"""
|
||||
Shutdown the loop, thread, and tasks
|
||||
"""
|
||||
if self._thread and self._loop:
|
||||
logger.info("Stopping ExternalMessageConsumer")
|
||||
self._running = False
|
||||
|
||||
if self._sub_tasks:
|
||||
# Cancel sub tasks
|
||||
for task in self._sub_tasks:
|
||||
task.cancel()
|
||||
|
||||
if self._main_task:
|
||||
# Cancel the main task
|
||||
self._main_task.cancel()
|
||||
|
||||
self._thread.join()
|
||||
|
||||
self._thread = None
|
||||
self._loop = None
|
||||
self._sub_tasks = None
|
||||
self._main_task = None
|
||||
|
||||
async def _main(self):
|
||||
"""
|
||||
The main task coroutine
|
||||
"""
|
||||
lock = asyncio.Lock()
|
||||
|
||||
try:
|
||||
# Create a connection to each producer
|
||||
self._sub_tasks = [
|
||||
self._loop.create_task(self._handle_producer_connection(producer, lock))
|
||||
for producer in self.producers
|
||||
]
|
||||
|
||||
await asyncio.gather(*self._sub_tasks)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
finally:
|
||||
# Stop the loop once we are done
|
||||
self._loop.stop()
|
||||
|
||||
async def _handle_producer_connection(self, producer: Dict[str, Any], lock: asyncio.Lock):
|
||||
"""
|
||||
Main connection loop for the consumer
|
||||
|
||||
:param producer: Dictionary containing producer info
|
||||
:param lock: An asyncio Lock
|
||||
"""
|
||||
try:
|
||||
await self._create_connection(producer, lock)
|
||||
except asyncio.CancelledError:
|
||||
# Exit silently
|
||||
pass
|
||||
|
||||
async def _create_connection(self, producer: Dict[str, Any], lock: asyncio.Lock):
|
||||
"""
|
||||
Actually creates and handles the websocket connection, pinging on timeout
|
||||
and handling connection errors.
|
||||
|
||||
:param producer: Dictionary containing producer info
|
||||
:param lock: An asyncio Lock
|
||||
"""
|
||||
while self._running:
|
||||
try:
|
||||
host, port = producer['host'], producer['port']
|
||||
token = producer['ws_token']
|
||||
name = producer['name']
|
||||
ws_url = f"ws://{host}:{port}/api/v1/message/ws?token={token}"
|
||||
|
||||
# This will raise InvalidURI if the url is bad
|
||||
async with websockets.connect(ws_url, max_size=self.message_size_limit) as ws:
|
||||
channel = WebSocketChannel(ws, channel_id=name)
|
||||
|
||||
logger.info(f"Producer connection success - {channel}")
|
||||
|
||||
# Now request the initial data from this Producer
|
||||
for request in self._initial_requests:
|
||||
await channel.send(
|
||||
request.dict(exclude_none=True)
|
||||
)
|
||||
|
||||
# Now receive data, if none is within the time limit, ping
|
||||
await self._receive_messages(channel, producer, lock)
|
||||
|
||||
except (websockets.exceptions.InvalidURI, ValueError) as e:
|
||||
logger.error(f"{ws_url} is an invalid WebSocket URL - {e}")
|
||||
break
|
||||
|
||||
except (
|
||||
socket.gaierror,
|
||||
ConnectionRefusedError,
|
||||
websockets.exceptions.InvalidStatusCode,
|
||||
websockets.exceptions.InvalidMessage
|
||||
) as e:
|
||||
logger.error(f"Connection Refused - {e} retrying in {self.sleep_time}s")
|
||||
await asyncio.sleep(self.sleep_time)
|
||||
|
||||
continue
|
||||
|
||||
except websockets.exceptions.ConnectionClosedOK:
|
||||
# Successfully closed, just keep trying to connect again indefinitely
|
||||
continue
|
||||
|
||||
except Exception as e:
|
||||
# An unforseen error has occurred, log and continue
|
||||
logger.error("Unexpected error has occurred:")
|
||||
logger.exception(e)
|
||||
continue
|
||||
|
||||
async def _receive_messages(
|
||||
self,
|
||||
channel: WebSocketChannel,
|
||||
producer: Dict[str, Any],
|
||||
lock: asyncio.Lock
|
||||
):
|
||||
"""
|
||||
Loop to handle receiving messages from a Producer
|
||||
|
||||
:param channel: The WebSocketChannel object for the WebSocket
|
||||
:param producer: Dictionary containing producer info
|
||||
:param lock: An asyncio Lock
|
||||
"""
|
||||
while self._running:
|
||||
try:
|
||||
message = await asyncio.wait_for(
|
||||
channel.recv(),
|
||||
timeout=self.wait_timeout
|
||||
)
|
||||
|
||||
try:
|
||||
async with lock:
|
||||
# Handle the message
|
||||
self.handle_producer_message(producer, message)
|
||||
except Exception as e:
|
||||
logger.exception(f"Error handling producer message: {e}")
|
||||
|
||||
except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed):
|
||||
# We haven't received data yet. Check the connection and continue.
|
||||
try:
|
||||
# ping
|
||||
ping = await channel.ping()
|
||||
|
||||
await asyncio.wait_for(ping, timeout=self.ping_timeout)
|
||||
logger.debug(f"Connection to {channel} still alive...")
|
||||
|
||||
continue
|
||||
except Exception as e:
|
||||
logger.warning(f"Ping error {channel} - retrying in {self.sleep_time}s")
|
||||
logger.debug(e, exc_info=e)
|
||||
await asyncio.sleep(self.sleep_time)
|
||||
|
||||
break
|
||||
|
||||
def handle_producer_message(self, producer: Dict[str, Any], message: Dict[str, Any]):
|
||||
"""
|
||||
Handles external messages from a Producer
|
||||
"""
|
||||
producer_name = producer.get('name', 'default')
|
||||
|
||||
try:
|
||||
producer_message = WSMessageSchema.parse_obj(message)
|
||||
except ValidationError as e:
|
||||
logger.error(f"Invalid message from `{producer_name}`: {e}")
|
||||
return
|
||||
|
||||
if not producer_message.data:
|
||||
logger.error(f"Empty message received from `{producer_name}`")
|
||||
return
|
||||
|
||||
logger.info(f"Received message of type `{producer_message.type}` from `{producer_name}`")
|
||||
|
||||
message_handler = self._message_handlers.get(producer_message.type)
|
||||
|
||||
if not message_handler:
|
||||
logger.info(f"Received unhandled message: `{producer_message.data}`, ignoring...")
|
||||
return
|
||||
|
||||
message_handler(producer_name, producer_message)
|
||||
|
||||
def _consume_whitelist_message(self, producer_name: str, message: WSMessageSchema):
|
||||
try:
|
||||
# Validate the message
|
||||
whitelist_message = WSWhitelistMessage.parse_obj(message)
|
||||
except ValidationError as e:
|
||||
logger.error(f"Invalid message from `{producer_name}`: {e}")
|
||||
return
|
||||
|
||||
# Add the pairlist data to the DataProvider
|
||||
self._dp._set_producer_pairs(whitelist_message.data, producer_name=producer_name)
|
||||
|
||||
logger.debug(f"Consumed message from `{producer_name}` of type `RPCMessageType.WHITELIST`")
|
||||
|
||||
def _consume_analyzed_df_message(self, producer_name: str, message: WSMessageSchema):
|
||||
try:
|
||||
df_message = WSAnalyzedDFMessage.parse_obj(message)
|
||||
except ValidationError as e:
|
||||
logger.error(f"Invalid message from `{producer_name}`: {e}")
|
||||
return
|
||||
|
||||
key = df_message.data.key
|
||||
df = df_message.data.df
|
||||
la = df_message.data.la
|
||||
|
||||
pair, timeframe, candle_type = key
|
||||
|
||||
# If set, remove the Entry and Exit signals from the Producer
|
||||
if self._emc_config.get('remove_entry_exit_signals', False):
|
||||
df = remove_entry_exit_signals(df)
|
||||
|
||||
# Add the dataframe to the dataprovider
|
||||
self._dp._add_external_df(pair, df,
|
||||
last_analyzed=la,
|
||||
timeframe=timeframe,
|
||||
candle_type=candle_type,
|
||||
producer_name=producer_name)
|
||||
|
||||
logger.debug(
|
||||
f"Consumed message from `{producer_name}` of type `RPCMessageType.ANALYZED_DF`")
|
@@ -1039,14 +1039,52 @@ class RPC:
|
||||
|
||||
def _rpc_analysed_dataframe(self, pair: str, timeframe: str,
|
||||
limit: Optional[int]) -> Dict[str, Any]:
|
||||
""" Analyzed dataframe in Dict form """
|
||||
|
||||
_data, last_analyzed = self.__rpc_analysed_dataframe_raw(pair, timeframe, limit)
|
||||
return self._convert_dataframe_to_dict(self._freqtrade.config['strategy'],
|
||||
pair, timeframe, _data, last_analyzed)
|
||||
|
||||
def __rpc_analysed_dataframe_raw(self, pair: str, timeframe: str,
|
||||
limit: Optional[int]) -> Tuple[DataFrame, datetime]:
|
||||
""" Get the dataframe and last analyze from the dataprovider """
|
||||
_data, last_analyzed = self._freqtrade.dataprovider.get_analyzed_dataframe(
|
||||
pair, timeframe)
|
||||
_data = _data.copy()
|
||||
|
||||
if limit:
|
||||
_data = _data.iloc[-limit:]
|
||||
return self._convert_dataframe_to_dict(self._freqtrade.config['strategy'],
|
||||
pair, timeframe, _data, last_analyzed)
|
||||
return _data, last_analyzed
|
||||
|
||||
def _ws_all_analysed_dataframes(
|
||||
self,
|
||||
pairlist: List[str],
|
||||
limit: Optional[int]
|
||||
) -> Dict[str, Any]:
|
||||
""" Get the analysed dataframes of each pair in the pairlist """
|
||||
timeframe = self._freqtrade.config['timeframe']
|
||||
candle_type = self._freqtrade.config.get('candle_type_def', CandleType.SPOT)
|
||||
_data = {}
|
||||
|
||||
for pair in pairlist:
|
||||
dataframe, last_analyzed = self.__rpc_analysed_dataframe_raw(pair, timeframe, limit)
|
||||
|
||||
_data[pair] = {
|
||||
"key": (pair, timeframe, candle_type),
|
||||
"df": dataframe,
|
||||
"la": last_analyzed
|
||||
}
|
||||
|
||||
return _data
|
||||
|
||||
def _ws_request_analyzed_df(self, limit: Optional[int]):
|
||||
""" Historical Analyzed Dataframes for WebSocket """
|
||||
whitelist = self._freqtrade.active_pair_whitelist
|
||||
return self._ws_all_analysed_dataframes(whitelist, limit)
|
||||
|
||||
def _ws_request_whitelist(self):
|
||||
""" Whitelist data for WebSocket """
|
||||
return self._freqtrade.active_pair_whitelist
|
||||
|
||||
@staticmethod
|
||||
def _rpc_analysed_history_full(config, pair: str, timeframe: str,
|
||||
|
@@ -67,7 +67,8 @@ class RPCManager:
|
||||
'status': 'stopping bot'
|
||||
}
|
||||
"""
|
||||
logger.info('Sending rpc message: %s', msg)
|
||||
if msg.get('type') is not RPCMessageType.ANALYZED_DF:
|
||||
logger.info('Sending rpc message: %s', msg)
|
||||
if 'pair' in msg:
|
||||
msg.update({
|
||||
'base_currency': self._rpc._freqtrade.exchange.get_pair_base_currency(msg['pair'])
|
||||
|
@@ -16,6 +16,7 @@ from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, RunMode, Sign
|
||||
SignalTagType, SignalType, TradingMode)
|
||||
from freqtrade.exceptions import OperationalException, StrategyError
|
||||
from freqtrade.exchange import timeframe_to_minutes, timeframe_to_next_date, timeframe_to_seconds
|
||||
from freqtrade.misc import remove_entry_exit_signals
|
||||
from freqtrade.persistence import Order, PairLocks, Trade
|
||||
from freqtrade.strategy.hyper import HyperStrategyMixin
|
||||
from freqtrade.strategy.informative_decorator import (InformativeData, PopulateIndicators,
|
||||
@@ -742,20 +743,19 @@ class IStrategy(ABC, HyperStrategyMixin):
|
||||
# always run if process_only_new_candles is set to false
|
||||
if (not self.process_only_new_candles or
|
||||
self._last_candle_seen_per_pair.get(pair, None) != dataframe.iloc[-1]['date']):
|
||||
|
||||
# Defs that only make change on new candle data.
|
||||
dataframe = self.analyze_ticker(dataframe, metadata)
|
||||
|
||||
self._last_candle_seen_per_pair[pair] = dataframe.iloc[-1]['date']
|
||||
self.dp._set_cached_df(
|
||||
pair, self.timeframe, dataframe,
|
||||
candle_type=self.config.get('candle_type_def', CandleType.SPOT))
|
||||
|
||||
candle_type = self.config.get('candle_type_def', CandleType.SPOT)
|
||||
self.dp._set_cached_df(pair, self.timeframe, dataframe, candle_type=candle_type)
|
||||
self.dp._emit_df((pair, self.timeframe, candle_type), dataframe)
|
||||
|
||||
else:
|
||||
logger.debug("Skipping TA Analysis for already analyzed candle")
|
||||
dataframe[SignalType.ENTER_LONG.value] = 0
|
||||
dataframe[SignalType.EXIT_LONG.value] = 0
|
||||
dataframe[SignalType.ENTER_SHORT.value] = 0
|
||||
dataframe[SignalType.EXIT_SHORT.value] = 0
|
||||
dataframe[SignalTagType.ENTER_TAG.value] = None
|
||||
dataframe[SignalTagType.EXIT_TAG.value] = None
|
||||
dataframe = remove_entry_exit_signals(dataframe)
|
||||
|
||||
logger.debug("Loop Analysis Launched")
|
||||
|
||||
|
@@ -67,6 +67,7 @@
|
||||
"verbosity": "error",
|
||||
"enable_openapi": false,
|
||||
"jwt_secret_key": "{{ api_server_jwt_key }}",
|
||||
"ws_token": "{{ api_server_ws_token }}",
|
||||
"CORS_origins": [],
|
||||
"username": "{{ api_server_username }}",
|
||||
"password": "{{ api_server_password }}"
|
||||
|
Reference in New Issue
Block a user