remove old constant, add initial_data requesting, minor changes

This commit is contained in:
Timothy Pogue 2022-09-01 20:06:36 -06:00
parent 57e9078727
commit 00f35f4870
10 changed files with 136 additions and 56 deletions

View File

@ -61,8 +61,6 @@ USERPATH_FREQAIMODELS = 'freqaimodels'
TELEGRAM_SETTING_OPTIONS = ['on', 'off', 'silent'] TELEGRAM_SETTING_OPTIONS = ['on', 'off', 'silent']
WEBHOOK_FORMAT_OPTIONS = ['form', 'json', 'raw'] WEBHOOK_FORMAT_OPTIONS = ['form', 'json', 'raw']
WAIT_DATA_POLICY_OPTIONS = ['none', 'first', 'all']
ENV_VAR_PREFIX = 'FREQTRADE__' ENV_VAR_PREFIX = 'FREQTRADE__'
NON_OPEN_EXCHANGE_STATES = ('cancelled', 'canceled', 'closed', 'expired') NON_OPEN_EXCHANGE_STATES = ('cancelled', 'canceled', 'closed', 'expired')
@ -404,7 +402,6 @@ CONF_SCHEMA = {
'username': {'type': 'string'}, 'username': {'type': 'string'},
'password': {'type': 'string'}, 'password': {'type': 'string'},
'ws_token': {'type': 'string'}, 'ws_token': {'type': 'string'},
'enable_message_ws': {'type': 'boolean', 'default': False},
'jwt_secret_key': {'type': 'string'}, 'jwt_secret_key': {'type': 'string'},
'CORS_origins': {'type': 'array', 'items': {'type': 'string'}}, 'CORS_origins': {'type': 'array', 'items': {'type': 'string'}},
'verbosity': {'type': 'string', 'enum': ['error', 'info']}, 'verbosity': {'type': 'string', 'enum': ['error', 'info']},

View File

@ -144,7 +144,6 @@ class DataProvider:
if producer_name not in self.__producer_pairs_df: if producer_name not in self.__producer_pairs_df:
self.__producer_pairs_df[producer_name] = {} self.__producer_pairs_df[producer_name] = {}
# For multiple leaders, if the data already exists, we'd merge
self.__producer_pairs_df[producer_name][pair_key] = (dataframe, datetime.now(timezone.utc)) self.__producer_pairs_df[producer_name][pair_key] = (dataframe, datetime.now(timezone.utc))
def get_external_df( def get_external_df(

View File

@ -33,3 +33,4 @@ class RPCMessageType(str, Enum):
# Enum for parsing requests from ws consumers # Enum for parsing requests from ws consumers
class RPCRequestType(str, Enum): class RPCRequestType(str, Enum):
SUBSCRIBE = 'subscribe' SUBSCRIBE = 'subscribe'
INITIAL_DATA = 'initial_data'

View File

@ -203,8 +203,7 @@ class FreqtradeBot(LoggingMixin):
# Doesn't necessarily NEED to be this way, as maybe we'd like to broadcast # Doesn't necessarily NEED to be this way, as maybe we'd like to broadcast
# even if we are using external dataframes in the future. # even if we are using external dataframes in the future.
self.strategy.analyze(self.active_pair_whitelist, self.strategy.analyze(self.active_pair_whitelist)
emit_df=self.dataprovider.external_data_enabled)
with self._exit_lock: with self._exit_lock:
# Check for exchange cancelations, timeouts and user requested replace # Check for exchange cancelations, timeouts and user requested replace
@ -264,11 +263,10 @@ class FreqtradeBot(LoggingMixin):
pairs that have open trades. pairs that have open trades.
""" """
# Refresh whitelist # Refresh whitelist
_prev_whitelist = self.pairlists.whitelist
self.pairlists.refresh_pairlist() self.pairlists.refresh_pairlist()
_whitelist = self.pairlists.whitelist _whitelist = self.pairlists.whitelist
self.rpc.send_msg({'type': RPCMessageType.WHITELIST, 'data': _whitelist})
# Calculating Edge positioning # Calculating Edge positioning
if self.edge: if self.edge:
self.edge.calculate(_whitelist) self.edge.calculate(_whitelist)
@ -279,6 +277,10 @@ class FreqtradeBot(LoggingMixin):
# It ensures that candle (OHLCV) data are downloaded for open trades as well # It ensures that candle (OHLCV) data are downloaded for open trades as well
_whitelist.extend([trade.pair for trade in trades if trade.pair not in _whitelist]) _whitelist.extend([trade.pair for trade in trades if trade.pair not in _whitelist])
# Called last to include the included pairs
if _prev_whitelist != _whitelist:
self.rpc.send_msg({'type': RPCMessageType.WHITELIST, 'data': _whitelist})
return _whitelist return _whitelist
def get_free_open_trades(self) -> int: def get_free_open_trades(self) -> int:

View File

@ -4,9 +4,10 @@ from typing import Any, Dict
from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect
from freqtrade.enums import RPCMessageType, RPCRequestType from freqtrade.enums import RPCMessageType, RPCRequestType
from freqtrade.rpc.api_server.deps import get_channel_manager from freqtrade.rpc.api_server.deps import get_channel_manager, get_rpc
from freqtrade.rpc.api_server.ws.channel import WebSocketChannel from freqtrade.rpc.api_server.ws.channel import WebSocketChannel
from freqtrade.rpc.api_server.ws.utils import is_websocket_alive from freqtrade.rpc.api_server.ws.utils import is_websocket_alive
from freqtrade.rpc.rpc import RPC
# from typing import Any, Dict # from typing import Any, Dict
@ -18,17 +19,20 @@ logger = logging.getLogger(__name__)
router = APIRouter() router = APIRouter()
# We are passed a Channel object, we can only do sync functions on that channel object async def _process_consumer_request(
def _process_consumer_request(request: Dict[str, Any], channel: WebSocketChannel): request: Dict[str, Any],
channel: WebSocketChannel,
rpc: RPC
):
type, data = request.get('type'), request.get('data') type, data = request.get('type'), request.get('data')
# If we have a request of type SUBSCRIBE, set the topics in this channel
if type == RPCRequestType.SUBSCRIBE:
# If the request is empty, do nothing # If the request is empty, do nothing
if not data: if not data:
return return
# If we have a request of type SUBSCRIBE, set the topics in this channel if not isinstance(data, list):
if type == RPCRequestType.SUBSCRIBE:
if isinstance(data, list):
logger.error(f"Improper request from channel: {channel} - {request}") logger.error(f"Improper request from channel: {channel} - {request}")
return return
@ -38,11 +42,26 @@ def _process_consumer_request(request: Dict[str, Any], channel: WebSocketChannel
logger.debug(f"{channel} subscribed to topics: {data}") logger.debug(f"{channel} subscribed to topics: {data}")
channel.set_subscriptions(data) channel.set_subscriptions(data)
elif type == RPCRequestType.INITIAL_DATA:
# Acquire the data
initial_data = rpc._ws_initial_data()
# We now loop over it sending it in pieces
whitelist_data, analyzed_df = initial_data.get('whitelist'), initial_data.get('analyzed_df')
if whitelist_data:
await channel.send({"type": RPCMessageType.WHITELIST, "data": whitelist_data})
if analyzed_df:
for pair, message in analyzed_df.items():
await channel.send({"type": RPCMessageType.ANALYZED_DF, "data": message})
@router.websocket("/message/ws") @router.websocket("/message/ws")
async def message_endpoint( async def message_endpoint(
ws: WebSocket, ws: WebSocket,
channel_manager=Depends(get_channel_manager) rpc: RPC = Depends(get_rpc),
channel_manager=Depends(get_channel_manager),
): ):
try: try:
if is_websocket_alive(ws): if is_websocket_alive(ws):
@ -59,7 +78,7 @@ async def message_endpoint(
# Process the request here. Should this be a method of RPC? # Process the request here. Should this be a method of RPC?
logger.info(f"Request: {request}") logger.info(f"Request: {request}")
_process_consumer_request(request, channel) await _process_consumer_request(request, channel, rpc)
except WebSocketDisconnect: except WebSocketDisconnect:
# Handle client disconnects # Handle client disconnects

View File

@ -151,8 +151,6 @@ class ApiServer(RPCHandler):
app.include_router(api_backtest, prefix="/api/v1", app.include_router(api_backtest, prefix="/api/v1",
dependencies=[Depends(http_basic_or_jwt_token)], dependencies=[Depends(http_basic_or_jwt_token)],
) )
if self._config.get('api_server', {}).get('enable_message_ws', False):
logger.info("Enabling Message WebSocket")
app.include_router(ws_router, prefix="/api/v1", app.include_router(ws_router, prefix="/api/v1",
dependencies=[Depends(get_ws_token)] dependencies=[Depends(get_ws_token)]
) )

View File

@ -8,7 +8,7 @@ import asyncio
import logging import logging
import socket import socket
from threading import Thread from threading import Thread
from typing import Any, Dict from typing import Any, Dict, Optional
import websockets import websockets
@ -58,6 +58,11 @@ class ExternalMessageConsumer:
# callbacks for the messages # callbacks for the messages
self.topics = [RPCMessageType.WHITELIST, RPCMessageType.ANALYZED_DF] self.topics = [RPCMessageType.WHITELIST, RPCMessageType.ANALYZED_DF]
self._message_handlers = {
RPCMessageType.WHITELIST: self._consume_whitelist_message,
RPCMessageType.ANALYZED_DF: self._consume_analyzed_df_message,
}
self.start() self.start()
def start(self): def start(self):
@ -152,6 +157,11 @@ class ExternalMessageConsumer:
self.compose_consumer_request(RPCRequestType.SUBSCRIBE, self.topics) self.compose_consumer_request(RPCRequestType.SUBSCRIBE, self.topics)
) )
# Now request the initial data from this Producer
await channel.send(
self.compose_consumer_request(RPCRequestType.INITIAL_DATA)
)
# Now receive data, if none is within the time limit, ping # Now receive data, if none is within the time limit, ping
while True: while True:
try: try:
@ -198,7 +208,11 @@ class ExternalMessageConsumer:
logger.error(f"{ws_url} is an invalid WebSocket URL - {e}") logger.error(f"{ws_url} is an invalid WebSocket URL - {e}")
break break
def compose_consumer_request(self, type_: RPCRequestType, data: Any) -> Dict[str, Any]: def compose_consumer_request(
self,
type_: RPCRequestType,
data: Optional[Any] = None
) -> Dict[str, Any]:
""" """
Create a request for sending to a producer Create a request for sending to a producer
@ -208,8 +222,6 @@ class ExternalMessageConsumer:
""" """
return {'type': type_, 'data': data} return {'type': type_, 'data': data}
# How we do things here isn't set in stone. There seems to be some interest
# in figuring out a better way, but we shall do this for now.
def handle_producer_message(self, producer: Dict[str, Any], message: Dict[str, Any]): def handle_producer_message(self, producer: Dict[str, Any], message: Dict[str, Any]):
""" """
Handles external messages from a Producer Handles external messages from a Producer
@ -225,15 +237,29 @@ class ExternalMessageConsumer:
logger.debug(f"Received message of type {message_type}") logger.debug(f"Received message of type {message_type}")
# Handle Whitelists message_handler = self._message_handlers.get(message_type)
if message_type == RPCMessageType.WHITELIST:
pairlist = message_data if not message_handler:
logger.info(f"Received unhandled message: {message_data}, ignoring...")
return
message_handler(producer_name, message_data)
def _consume_whitelist_message(self, producer_name: str, message_data: Any):
# We expect List[str]
if not isinstance(message_data, list):
return
# Add the pairlist data to the DataProvider # Add the pairlist data to the DataProvider
self._dp.set_producer_pairs(pairlist, producer_name=producer_name) self._dp.set_producer_pairs(message_data, producer_name=producer_name)
logger.debug(f"Consumed message from {producer_name} of type RPCMessageType.WHITELIST")
def _consume_analyzed_df_message(self, producer_name: str, message_data: Any):
# We expect a Dict[str, Any]
if not isinstance(message_data, dict):
return
# Handle analyzed dataframes
elif message_type == RPCMessageType.ANALYZED_DF:
key, value = message_data.get('key'), message_data.get('value') key, value = message_data.get('key'), message_data.get('value')
if key and value: if key and value:
@ -249,3 +275,6 @@ class ExternalMessageConsumer:
# Add the dataframe to the dataprovider # Add the dataframe to the dataprovider
self._dp.add_external_df(pair, timeframe, dataframe, self._dp.add_external_df(pair, timeframe, dataframe,
candle_type, producer_name=producer_name) candle_type, producer_name=producer_name)
logger.debug(
f"Consumed message from {producer_name} of type RPCMessageType.ANALYZED_DF")

View File

@ -24,7 +24,7 @@ from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, SignalDirecti
from freqtrade.exceptions import ExchangeError, PricingError from freqtrade.exceptions import ExchangeError, PricingError
from freqtrade.exchange import timeframe_to_minutes, timeframe_to_msecs from freqtrade.exchange import timeframe_to_minutes, timeframe_to_msecs
from freqtrade.loggers import bufferHandler from freqtrade.loggers import bufferHandler
from freqtrade.misc import decimals_per_coin, shorten_date from freqtrade.misc import dataframe_to_json, decimals_per_coin, shorten_date
from freqtrade.persistence import PairLocks, Trade from freqtrade.persistence import PairLocks, Trade
from freqtrade.persistence.models import PairLock from freqtrade.persistence.models import PairLock
from freqtrade.plugins.pairlist.pairlist_helpers import expand_pairlist from freqtrade.plugins.pairlist.pairlist_helpers import expand_pairlist
@ -1035,14 +1035,49 @@ class RPC:
def _rpc_analysed_dataframe(self, pair: str, timeframe: str, def _rpc_analysed_dataframe(self, pair: str, timeframe: str,
limit: Optional[int]) -> Dict[str, Any]: limit: Optional[int]) -> Dict[str, Any]:
""" Analyzed dataframe in Dict form """
_data, last_analyzed = self.__rpc_analysed_dataframe_raw(pair, timeframe, limit)
return self._convert_dataframe_to_dict(self._freqtrade.config['strategy'],
pair, timeframe, _data, last_analyzed)
def __rpc_analysed_dataframe_raw(self, pair: str, timeframe: str,
limit: Optional[int]) -> Tuple[DataFrame, datetime]:
""" Get the dataframe and last analyze from the dataprovider """
_data, last_analyzed = self._freqtrade.dataprovider.get_analyzed_dataframe( _data, last_analyzed = self._freqtrade.dataprovider.get_analyzed_dataframe(
pair, timeframe) pair, timeframe)
_data = _data.copy() _data = _data.copy()
if limit: if limit:
_data = _data.iloc[-limit:] _data = _data.iloc[-limit:]
return self._convert_dataframe_to_dict(self._freqtrade.config['strategy'], return _data, last_analyzed
pair, timeframe, _data, last_analyzed)
def _ws_all_analysed_dataframes(
self,
pairlist: List[str],
limit: Optional[int]
) -> Dict[str, Any]:
""" Get the analysed dataframes of each pair in the pairlist """
timeframe = self._freqtrade.config['timeframe']
candle_type = self._freqtrade.config.get('candle_type_def', CandleType.SPOT)
_data = {}
for pair in pairlist:
dataframe, last_analyzed = self.__rpc_analysed_dataframe_raw(pair, timeframe, limit)
_data[pair] = {
"key": (pair, timeframe, candle_type),
"value": dataframe_to_json(dataframe)
}
return _data
def _ws_initial_data(self):
""" Websocket friendly initial data, whitelists and all analyzed dataframes """
whitelist = self._freqtrade.active_pair_whitelist
# We only get the last 500 candles, should we remove the limit?
analyzed_df = self._ws_all_analysed_dataframes(whitelist, 500)
return {"whitelist": whitelist, "analyzed_df": analyzed_df}
@ staticmethod @ staticmethod
def _rpc_analysed_history_full(config, pair: str, timeframe: str, def _rpc_analysed_history_full(config, pair: str, timeframe: str,

View File

@ -81,6 +81,8 @@ class RPCManager:
# Removed actually showing the message because the logs would be # Removed actually showing the message because the logs would be
# completely spammed of the json dataframe # completely spammed of the json dataframe
logger.info('Sending rpc message of type: %s', msg.get('type')) logger.info('Sending rpc message of type: %s', msg.get('type'))
# Log actual message in debug?
# logger.debug(msg)
if 'pair' in msg: if 'pair' in msg:
msg.update({ msg.update({
'base_currency': self._rpc._freqtrade.exchange.get_pair_base_currency(msg['pair']) 'base_currency': self._rpc._freqtrade.exchange.get_pair_base_currency(msg['pair'])

View File

@ -725,8 +725,6 @@ class IStrategy(ABC, HyperStrategyMixin):
candle_type = self.config.get('candle_type_def', CandleType.SPOT) candle_type = self.config.get('candle_type_def', CandleType.SPOT)
self.dp._set_cached_df(pair, self.timeframe, dataframe, candle_type=candle_type) self.dp._set_cached_df(pair, self.timeframe, dataframe, candle_type=candle_type)
if emit_df:
self.dp.emit_df((pair, self.timeframe, candle_type), dataframe) self.dp.emit_df((pair, self.timeframe, candle_type), dataframe)
else: else: