diff --git a/.gitignore b/.gitignore index b8c4c3846..6a47a7f81 100644 --- a/.gitignore +++ b/.gitignore @@ -115,6 +115,3 @@ target/ !config_examples/config_freqai.example.json !config_examples/config_leader.example.json !config_examples/config_follower.example.json - -*-config.json -*.db* diff --git a/config_examples/config_follower.example.json b/config_examples/config_follower.example.json deleted file mode 100644 index 646310d9a..000000000 --- a/config_examples/config_follower.example.json +++ /dev/null @@ -1,87 +0,0 @@ - -{ - "db_url": "sqlite:///follower.db", - "strategy": "SampleStrategy", - "max_open_trades": 3, - "stake_currency": "USDT", - "stake_amount": 100, - "tradable_balance_ratio": 0.99, - "fiat_display_currency": "USD", - "dry_run": true, - "cancel_open_orders_on_exit": false, - "trading_mode": "spot", - "margin_mode": "", - "unfilledtimeout": { - "entry": 10, - "exit": 10, - "exit_timeout_count": 0, - "unit": "minutes" - }, - "entry_pricing": { - "price_side": "same", - "use_order_book": true, - "order_book_top": 1, - "price_last_balance": 0.0, - "check_depth_of_market": { - "enabled": false, - "bids_to_ask_delta": 1 - } - }, - "exit_pricing":{ - "price_side": "same", - "use_order_book": true, - "order_book_top": 1 - }, - "exchange": { - "name": "kucoin", - "key": "", - "secret": "", - "password": "", - "ccxt_config": {}, - "ccxt_async_config": {}, - "pair_whitelist": [ - ], - "pair_blacklist": [ - ] - }, - "pairlists": [ - { - "method": "ExternalPairList", // ExternalPairList is required in follower mode - "number_assets": 5, // We can limit the amount of pairs to use from the leaders - } - ], - "telegram": { - "enabled": false, - "token": "", - "chat_id": "" - }, - "api_server": { - "enabled": true, - "listen_ip_address": "127.0.0.1", - "listen_port": 8081, - "verbosity": "error", - "enable_openapi": false, - "jwt_secret_key": "fcc24d31d6581ad2c90c3fc438c8a8b2ccce1393126959934568707f0bd2d647", - "CORS_origins": [], - "username": "freqtrader", - "password": "testing123" - }, - "external_signal": { - "enabled": true, - "mode": "follower", - "leaders": [ - { - "url": "ws://localhost:8080/signals/ws", - "api_token": "testtoken" - } - ], - "wait_data_policy": "all", // ['all', 'first', none] defaults to all - "remove_signals_analyzed_df": true, // Remove entry/exit signals from Leader df, Defaults to false - }, - "bot_name": "freqtrade", - "initial_state": "running", - "force_entry_enable": false, - "internals": { - "process_throttle_secs": 5, - } -} diff --git a/config_examples/config_leader.example.json b/config_examples/config_leader.example.json deleted file mode 100644 index 5103fdbd4..000000000 --- a/config_examples/config_leader.example.json +++ /dev/null @@ -1,97 +0,0 @@ - -{ - "db_url": "sqlite:///leader.db", - "strategy": "SampleStrategy", - "max_open_trades": 3, - "stake_currency": "USDT", - "stake_amount": 100, - "tradable_balance_ratio": 0.99, - "fiat_display_currency": "USD", - "dry_run": true, - "cancel_open_orders_on_exit": false, - "trading_mode": "spot", - "margin_mode": "", - "unfilledtimeout": { - "entry": 10, - "exit": 10, - "exit_timeout_count": 0, - "unit": "minutes" - }, - "entry_pricing": { - "price_side": "same", - "use_order_book": true, - "order_book_top": 1, - "price_last_balance": 0.0, - "check_depth_of_market": { - "enabled": false, - "bids_to_ask_delta": 1 - } - }, - "exit_pricing":{ - "price_side": "same", - "use_order_book": true, - "order_book_top": 1 - }, - "exchange": { - "name": "kucoin", - "key": "", - "secret": "", - "password": "", - "ccxt_config": {}, - "ccxt_async_config": {}, - "pair_whitelist": [ - ], - "pair_blacklist": [ - ] - }, - "pairlists": [ - { - "method": "VolumePairList", - "number_assets": 20, - "sort_key": "quoteVolume", - "min_value": 0, - "refresh_period": 1800 - } - ], - "edge": { - "enabled": false, - "process_throttle_secs": 3600, - "calculate_since_number_of_days": 7, - "allowed_risk": 0.01, - "stoploss_range_min": -0.01, - "stoploss_range_max": -0.1, - "stoploss_range_step": -0.01, - "minimum_winrate": 0.60, - "minimum_expectancy": 0.20, - "min_trade_number": 10, - "max_trade_duration_minute": 1440, - "remove_pumps": false - }, - "telegram": { - "enabled": false, - "token": "", - "chat_id": "" - }, - "api_server": { - "enabled": true, - "listen_ip_address": "127.0.0.1", - "listen_port": 8080, - "verbosity": "error", - "enable_openapi": false, - "jwt_secret_key": "fcc24d31d6581ad2c90c3fc438c8a8b2ccce1393126959934568707f0bd2d647", - "CORS_origins": [], - "username": "freqtrader", - "password": "testing123" - }, - "external_signal": { - "enabled": true, - "mode": "leader", - "api_token": "testtoken", - }, - "bot_name": "freqtrade", - "initial_state": "running", - "force_entry_enable": false, - "internals": { - "process_throttle_secs": 5, - } -} diff --git a/freqtrade/constants.py b/freqtrade/constants.py index b1f189093..96f8413b0 100644 --- a/freqtrade/constants.py +++ b/freqtrade/constants.py @@ -404,6 +404,7 @@ CONF_SCHEMA = { }, 'username': {'type': 'string'}, 'password': {'type': 'string'}, + 'api_token': {'type': 'string'}, 'jwt_secret_key': {'type': 'string'}, 'CORS_origins': {'type': 'array', 'items': {'type': 'string'}}, 'verbosity': {'type': 'string', 'enum': ['error', 'info']}, diff --git a/freqtrade/enums/rpcmessagetype.py b/freqtrade/enums/rpcmessagetype.py index d5b3ce89c..8e4182b33 100644 --- a/freqtrade/enums/rpcmessagetype.py +++ b/freqtrade/enums/rpcmessagetype.py @@ -1,7 +1,8 @@ from enum import Enum -class RPCMessageType(Enum): +# We need to inherit from str so we can use as a str +class RPCMessageType(str, Enum): STATUS = 'status' WARNING = 'warning' STARTUP = 'startup' @@ -19,7 +20,7 @@ class RPCMessageType(Enum): STRATEGY_MSG = 'strategy_msg' - EMIT_DATA = 'emit_data' + WHITELIST = 'whitelist' def __repr__(self): return self.value diff --git a/freqtrade/freqtradebot.py b/freqtrade/freqtradebot.py index 6aee3d104..c9caaace6 100644 --- a/freqtrade/freqtradebot.py +++ b/freqtrade/freqtradebot.py @@ -17,13 +17,13 @@ from freqtrade.constants import BuySell, LongShort from freqtrade.data.converter import order_book_to_dataframe from freqtrade.data.dataprovider import DataProvider from freqtrade.edge import Edge -from freqtrade.enums import (ExitCheckTuple, ExitType, LeaderMessageType, RPCMessageType, RunMode, - SignalDirection, State, TradingMode) +from freqtrade.enums import (ExitCheckTuple, ExitType, RPCMessageType, RunMode, SignalDirection, + State, TradingMode) from freqtrade.exceptions import (DependencyException, ExchangeError, InsufficientFundsError, InvalidOrderException, PricingError) from freqtrade.exchange import timeframe_to_minutes, timeframe_to_seconds from freqtrade.exchange.exchange import timeframe_to_next_date -from freqtrade.misc import dataframe_to_json, safe_value_fallback, safe_value_fallback2 +from freqtrade.misc import safe_value_fallback, safe_value_fallback2 from freqtrade.mixins import LoggingMixin from freqtrade.persistence import Order, PairLocks, Trade, init_db from freqtrade.plugins.pairlistmanager import PairListManager @@ -75,8 +75,6 @@ class FreqtradeBot(LoggingMixin): PairLocks.timeframe = self.config['timeframe'] - self.external_signal_controller = None - self.pairlists = PairListManager(self.exchange, self.config) # RPC runs in separate threads, can start handling external commands just after @@ -194,28 +192,7 @@ class FreqtradeBot(LoggingMixin): strategy_safe_wrapper(self.strategy.bot_loop_start, supress_error=True)() - if self.external_signal_controller: - if not self.external_signal_controller.is_leader(): - # Run Follower mode analyzing - leader_pairs = self.pairlists._whitelist - self.strategy.analyze_external(self.active_pair_whitelist, leader_pairs) - else: - # We are leader, make sure to pass callback func to emit data - def emit_on_finish(pair, dataframe, timeframe, candle_type): - logger.debug(f"Emitting dataframe for {pair}") - return self.rpc.emit_data( - { - "data_type": LeaderMessageType.analyzed_df, - "data": { - "key": (pair, timeframe, candle_type), - "value": dataframe_to_json(dataframe) - } - } - ) - - self.strategy.analyze(self.active_pair_whitelist, finish_callback=emit_on_finish) - else: - self.strategy.analyze(self.active_pair_whitelist) + self.strategy.analyze(self.active_pair_whitelist) with self._exit_lock: # Check for exchange cancelations, timeouts and user requested replace @@ -278,15 +255,7 @@ class FreqtradeBot(LoggingMixin): self.pairlists.refresh_pairlist() _whitelist = self.pairlists.whitelist - # If external signal leader, broadcast whitelist data - # Should we broadcast before trade pairs are added? - - if self.external_signal_controller: - if self.external_signal_controller.is_leader(): - self.rpc.emit_data({ - "data_type": LeaderMessageType.pairlist, - "data": _whitelist - }) + self.rpc.send_msg({'type': RPCMessageType.WHITELIST, 'msg': _whitelist}) # Calculating Edge positioning if self.edge: diff --git a/freqtrade/rpc/api_server/api_auth.py b/freqtrade/rpc/api_server/api_auth.py index a39e31b85..fd90918e1 100644 --- a/freqtrade/rpc/api_server/api_auth.py +++ b/freqtrade/rpc/api_server/api_auth.py @@ -1,8 +1,10 @@ +import logging import secrets from datetime import datetime, timedelta +from typing import Any, Dict, Union import jwt -from fastapi import APIRouter, Depends, HTTPException, status +from fastapi import APIRouter, Depends, HTTPException, WebSocket, status from fastapi.security import OAuth2PasswordBearer from fastapi.security.http import HTTPBasic, HTTPBasicCredentials @@ -10,6 +12,8 @@ from freqtrade.rpc.api_server.api_schemas import AccessAndRefreshToken, AccessTo from freqtrade.rpc.api_server.deps import get_api_config +logger = logging.getLogger(__name__) + ALGORITHM = "HS256" router_login = APIRouter() @@ -44,6 +48,24 @@ def get_user_from_token(token, secret_key: str, token_type: str = "access"): return username +# This should be reimplemented to better realign with the existing tools provided +# by FastAPI regarding API Tokens +async def get_ws_token( + ws: WebSocket, + token: Union[str, None] = None, + api_config: Dict[str, Any] = Depends(get_api_config) +): + secret_ws_token = api_config['ws_token'] + + if token == secret_ws_token: + # Just return the token if it matches + return token + else: + logger.debug("Denying websocket request") + # If it doesn't match, close the websocket connection + await ws.close(code=status.WS_1008_POLICY_VIOLATION) + + def create_token(data: dict, secret_key: str, token_type: str = "access") -> str: to_encode = data.copy() if token_type == "access": diff --git a/freqtrade/rpc/api_server/api_ws.py b/freqtrade/rpc/api_server/api_ws.py new file mode 100644 index 000000000..464ea22b2 --- /dev/null +++ b/freqtrade/rpc/api_server/api_ws.py @@ -0,0 +1,52 @@ +import logging + +from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect + +from freqtrade.rpc.api_server.deps import get_channel_manager +from freqtrade.rpc.api_server.ws.utils import is_websocket_alive + + +logger = logging.getLogger(__name__) + +# Private router, protected by API Key authentication +router = APIRouter() + + +@router.websocket("/message/ws") +async def message_endpoint( + ws: WebSocket, + channel_manager=Depends(get_channel_manager) +): + try: + if is_websocket_alive(ws): + logger.info(f"Consumer connected - {ws.client}") + + # TODO: + # Return a channel ID, pass that instead of ws to the rest of the methods + channel = await channel_manager.on_connect(ws) + + # Keep connection open until explicitly closed, and sleep + try: + while not channel.is_closed(): + request = await channel.recv() + + # This is where we'd parse the request. For now this should only + # be a list of topics to subscribe too. List[str] + # Maybe allow the consumer to update the topics subscribed + # during runtime? + logger.info(f"Consumer request - {request}") + + except WebSocketDisconnect: + # Handle client disconnects + logger.info(f"Consumer disconnected - {ws.client}") + await channel_manager.on_disconnect(ws) + except Exception as e: + logger.info(f"Consumer connection failed - {ws.client}") + logger.exception(e) + # Handle cases like - + # RuntimeError('Cannot call "send" once a closed message has been sent') + await channel_manager.on_disconnect(ws) + + except Exception: + logger.error(f"Failed to serve - {ws.client}") + await channel_manager.on_disconnect(ws) diff --git a/freqtrade/rpc/api_server/deps.py b/freqtrade/rpc/api_server/deps.py index 66654c0b1..360771d77 100644 --- a/freqtrade/rpc/api_server/deps.py +++ b/freqtrade/rpc/api_server/deps.py @@ -41,6 +41,10 @@ def get_exchange(config=Depends(get_config)): return ApiServer._exchange +def get_channel_manager(): + return ApiServer._channel_manager + + def is_webserver_mode(config=Depends(get_config)): if config['runmode'] != RunMode.WEBSERVER: raise RPCException('Bot is not in the correct state') diff --git a/freqtrade/rpc/api_server/webserver.py b/freqtrade/rpc/api_server/webserver.py index 049e7dbc2..94cb8cd45 100644 --- a/freqtrade/rpc/api_server/webserver.py +++ b/freqtrade/rpc/api_server/webserver.py @@ -1,15 +1,20 @@ +import asyncio import logging from ipaddress import IPv4Address +from threading import Thread from typing import Any, Dict import orjson import uvicorn from fastapi import Depends, FastAPI from fastapi.middleware.cors import CORSMiddleware +# Look into alternatives +from janus import Queue as ThreadedQueue from starlette.responses import JSONResponse from freqtrade.exceptions import OperationalException from freqtrade.rpc.api_server.uvicorn_threaded import UvicornServer +from freqtrade.rpc.api_server.ws.channel import ChannelManager from freqtrade.rpc.rpc import RPC, RPCException, RPCHandler @@ -43,6 +48,10 @@ class ApiServer(RPCHandler): _config: Dict[str, Any] = {} # Exchange - only available in webserver mode. _exchange = None + # websocket message queue stuff + _channel_manager = None + _thread = None + _loop = None def __new__(cls, *args, **kwargs): """ @@ -64,10 +73,15 @@ class ApiServer(RPCHandler): return self._standalone: bool = standalone self._server = None + self._queue = None + self._background_task = None + ApiServer.__initialized = True api_config = self._config['api_server'] + ApiServer._channel_manager = ChannelManager() + self.app = FastAPI(title="Freqtrade API", docs_url='/docs' if api_config.get('enable_openapi', False) else None, redoc_url=None, @@ -95,6 +109,18 @@ class ApiServer(RPCHandler): logger.info("Stopping API Server") self._server.cleanup() + if self._thread and self._loop: + logger.info("Stopping API Server background tasks") + + if self._background_task: + # Cancel the queue task + self._background_task.cancel() + + # Finally stop the loop + self._loop.call_soon_threadsafe(self._loop.stop) + + self._thread.join() + @classmethod def shutdown(cls): cls.__initialized = False @@ -104,7 +130,10 @@ class ApiServer(RPCHandler): cls._rpc = None def send_msg(self, msg: Dict[str, str]) -> None: - pass + if self._queue: + logger.info(f"Adding message to queue: {msg}") + sync_q = self._queue.sync_q + sync_q.put(msg) def handle_rpc_exception(self, request, exc): logger.exception(f"API Error calling: {exc}") @@ -114,10 +143,12 @@ class ApiServer(RPCHandler): ) def configure_app(self, app: FastAPI, config): - from freqtrade.rpc.api_server.api_auth import http_basic_or_jwt_token, router_login + from freqtrade.rpc.api_server.api_auth import (get_ws_token, http_basic_or_jwt_token, + router_login) from freqtrade.rpc.api_server.api_backtest import router as api_backtest from freqtrade.rpc.api_server.api_v1 import router as api_v1 from freqtrade.rpc.api_server.api_v1 import router_public as api_v1_public + from freqtrade.rpc.api_server.api_ws import router as ws_router from freqtrade.rpc.api_server.web_ui import router_ui app.include_router(api_v1_public, prefix="/api/v1") @@ -128,6 +159,9 @@ class ApiServer(RPCHandler): app.include_router(api_backtest, prefix="/api/v1", dependencies=[Depends(http_basic_or_jwt_token)], ) + app.include_router(ws_router, prefix="/api/v1", + dependencies=[Depends(get_ws_token)] + ) app.include_router(router_login, prefix="/api/v1", tags=["auth"]) # UI Router MUST be last! app.include_router(router_ui, prefix='') @@ -142,6 +176,43 @@ class ApiServer(RPCHandler): app.add_exception_handler(RPCException, self.handle_rpc_exception) + def start_message_queue(self): + # Create a new loop, as it'll be just for the background thread + self._loop = asyncio.new_event_loop() + + # Start the thread + if not self._thread: + self._thread = Thread(target=self._loop.run_forever) + self._thread.start() + else: + raise RuntimeError("Threaded loop is already running") + + # Finally, submit the coro to the thread + self._background_task = asyncio.run_coroutine_threadsafe( + self._broadcast_queue_data(), loop=self._loop) + + async def _broadcast_queue_data(self): + # Instantiate the queue in this coroutine so it's attached to our loop + self._queue = ThreadedQueue() + async_queue = self._queue.async_q + + try: + while True: + logger.debug("Getting queue data...") + # Get data from queue + data = await async_queue.get() + logger.debug(f"Found data: {data}") + # Broadcast it + await self._channel_manager.broadcast(data) + # Sleep, make this configurable? + await asyncio.sleep(0.1) + except asyncio.CancelledError: + # Silently stop + pass + # For testing, shouldn't happen when stable + except Exception as e: + logger.info(f"Exception happened in background task: {e}") + def start_api(self): """ Start API ... should be run in thread. @@ -179,6 +250,7 @@ class ApiServer(RPCHandler): if self._standalone: self._server.run() else: + self.start_message_queue() self._server.run_in_thread() except Exception: logger.exception("Api server failed to start.") diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py new file mode 100644 index 000000000..486e8657b --- /dev/null +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -0,0 +1,146 @@ +import logging +from threading import RLock +from typing import Type + +from freqtrade.rpc.api_server.ws.proxy import WebSocketProxy +from freqtrade.rpc.api_server.ws.serializer import ORJSONWebSocketSerializer, WebSocketSerializer +from freqtrade.rpc.api_server.ws.types import WebSocketType + + +logger = logging.getLogger(__name__) + + +class WebSocketChannel: + """ + Object to help facilitate managing a websocket connection + """ + + def __init__( + self, + websocket: WebSocketType, + serializer_cls: Type[WebSocketSerializer] = ORJSONWebSocketSerializer + ): + # The WebSocket object + self._websocket = WebSocketProxy(websocket) + # The Serializing class for the WebSocket object + self._serializer_cls = serializer_cls + + # Internal event to signify a closed websocket + self._closed = False + + # Wrap the WebSocket in the Serializing class + self._wrapped_ws = self._serializer_cls(self._websocket) + + async def send(self, data): + """ + Send data on the wrapped websocket + """ + # logger.info(f"Serialized Send - {self._wrapped_ws._serialize(data)}") + await self._wrapped_ws.send(data) + + async def recv(self): + """ + Receive data on the wrapped websocket + """ + return await self._wrapped_ws.recv() + + async def ping(self): + """ + Ping the websocket + """ + return await self._websocket.ping() + + async def close(self): + """ + Close the WebSocketChannel + """ + + self._closed = True + + def is_closed(self): + return self._closed + + +class ChannelManager: + def __init__(self): + self.channels = dict() + self._lock = RLock() # Re-entrant Lock + + async def on_connect(self, websocket: WebSocketType): + """ + Wrap websocket connection into Channel and add to list + + :param websocket: The WebSocket object to attach to the Channel + """ + if hasattr(websocket, "accept"): + try: + await websocket.accept() + except RuntimeError: + # The connection was closed before we could accept it + return + + ws_channel = WebSocketChannel(websocket) + + with self._lock: + self.channels[websocket] = ws_channel + + return ws_channel + + async def on_disconnect(self, websocket: WebSocketType): + """ + Call close on the channel if it's not, and remove from channel list + + :param websocket: The WebSocket objet attached to the Channel + """ + with self._lock: + channel = self.channels.get(websocket) + if channel: + logger.debug(f"Disconnecting channel - {channel}") + + if not channel.is_closed(): + await channel.close() + + del self.channels[websocket] + + async def disconnect_all(self): + """ + Disconnect all Channels + """ + with self._lock: + for websocket, channel in self.channels.items(): + if not channel.is_closed(): + await channel.close() + + self.channels = dict() + + async def broadcast(self, data): + """ + Broadcast data on all Channels + + :param data: The data to send + """ + with self._lock: + logger.debug(f"Broadcasting data: {data}") + for websocket, channel in self.channels.items(): + try: + await channel.send(data) + except RuntimeError: + # Handle cannot send after close cases + await self.on_disconnect(websocket) + + async def send_direct(self, channel, data): + """ + Send data directly through direct_channel only + + :param direct_channel: The WebSocketChannel object to send data through + :param data: The data to send + """ + # We iterate over the channels to get reference to the websocket object + # so we can disconnect incase of failure + await channel.send(data) + + def has_channels(self): + """ + Flag for more than 0 channels + """ + return len(self.channels) > 0 diff --git a/freqtrade/rpc/api_server/ws/proxy.py b/freqtrade/rpc/api_server/ws/proxy.py new file mode 100644 index 000000000..6acc1d363 --- /dev/null +++ b/freqtrade/rpc/api_server/ws/proxy.py @@ -0,0 +1,61 @@ +from typing import Union + +from fastapi import WebSocket as FastAPIWebSocket +from websockets import WebSocketClientProtocol as WebSocket + +from freqtrade.rpc.api_server.ws.types import WebSocketType + + +class WebSocketProxy: + """ + WebSocketProxy object to bring the FastAPIWebSocket and websockets.WebSocketClientProtocol + under the same API + """ + + def __init__(self, websocket: WebSocketType): + self._websocket: Union[FastAPIWebSocket, WebSocket] = websocket + + async def send(self, data): + """ + Send data on the wrapped websocket + """ + if isinstance(data, str): + data = data.encode() + + if hasattr(self._websocket, "send_bytes"): + await self._websocket.send_bytes(data) + else: + await self._websocket.send(data) + + async def recv(self): + """ + Receive data on the wrapped websocket + """ + if hasattr(self._websocket, "receive_bytes"): + return await self._websocket.receive_bytes() + else: + return await self._websocket.recv() + + async def ping(self): + """ + Ping the websocket, not supported by FastAPI WebSockets + """ + if hasattr(self._websocket, "ping"): + return await self._websocket.ping() + return False + + async def close(self, code: int = 1000): + """ + Close the websocket connection, only supported by FastAPI WebSockets + """ + if hasattr(self._websocket, "close"): + return await self._websocket.close(code) + pass + + async def accept(self): + """ + Accept the WebSocket connection, only support by FastAPI WebSockets + """ + if hasattr(self._websocket, "accept"): + return await self._websocket.accept() + pass diff --git a/freqtrade/rpc/api_server/ws/serializer.py b/freqtrade/rpc/api_server/ws/serializer.py new file mode 100644 index 000000000..40cbbfad7 --- /dev/null +++ b/freqtrade/rpc/api_server/ws/serializer.py @@ -0,0 +1,65 @@ +import json +import logging +from abc import ABC, abstractmethod + +import msgpack +import orjson + +from freqtrade.rpc.api_server.ws.proxy import WebSocketProxy + + +logger = logging.getLogger(__name__) + + +class WebSocketSerializer(ABC): + def __init__(self, websocket: WebSocketProxy): + self._websocket: WebSocketProxy = websocket + + @abstractmethod + def _serialize(self, data): + raise NotImplementedError() + + @abstractmethod + def _deserialize(self, data): + raise NotImplementedError() + + async def send(self, data: bytes): + await self._websocket.send(self._serialize(data)) + + async def recv(self) -> bytes: + data = await self._websocket.recv() + + return self._deserialize(data) + + async def close(self, code: int = 1000): + await self._websocket.close(code) + +# Going to explore using MsgPack as the serialization, +# as that might be the best method for sending pandas +# dataframes over the wire + + +class JSONWebSocketSerializer(WebSocketSerializer): + def _serialize(self, data): + return json.dumps(data) + + def _deserialize(self, data): + return json.loads(data) + + +class ORJSONWebSocketSerializer(WebSocketSerializer): + ORJSON_OPTIONS = orjson.OPT_NAIVE_UTC | orjson.OPT_SERIALIZE_NUMPY + + def _serialize(self, data): + return orjson.dumps(data, option=self.ORJSON_OPTIONS) + + def _deserialize(self, data): + return orjson.loads(data, option=self.ORJSON_OPTIONS) + + +class MsgPackWebSocketSerializer(WebSocketSerializer): + def _serialize(self, data): + return msgpack.packb(data, use_bin_type=True) + + def _deserialize(self, data): + return msgpack.unpackb(data, raw=False) diff --git a/freqtrade/rpc/api_server/ws/types.py b/freqtrade/rpc/api_server/ws/types.py new file mode 100644 index 000000000..814fe6649 --- /dev/null +++ b/freqtrade/rpc/api_server/ws/types.py @@ -0,0 +1,8 @@ +from typing import Any, Dict, TypeVar + +from fastapi import WebSocket as FastAPIWebSocket +from websockets import WebSocketClientProtocol as WebSocket + + +WebSocketType = TypeVar("WebSocketType", FastAPIWebSocket, WebSocket) +MessageType = Dict[str, Any] diff --git a/freqtrade/rpc/api_server/ws/utils.py b/freqtrade/rpc/api_server/ws/utils.py new file mode 100644 index 000000000..1ceecab88 --- /dev/null +++ b/freqtrade/rpc/api_server/ws/utils.py @@ -0,0 +1,12 @@ +from fastapi import WebSocket +# fastapi does not make this available through it, so import directly from starlette +from starlette.websockets import WebSocketState + + +async def is_websocket_alive(ws: WebSocket) -> bool: + if ( + ws.application_state == WebSocketState.CONNECTED and + ws.client_state == WebSocketState.CONNECTED + ): + return True + return False diff --git a/freqtrade/rpc/external_signal/__init__.py b/freqtrade/rpc/external_signal/__init__.py index c1b05b3f0..decc51551 100644 --- a/freqtrade/rpc/external_signal/__init__.py +++ b/freqtrade/rpc/external_signal/__init__.py @@ -1,5 +1,5 @@ -# flake8: noqa: F401 -from freqtrade.rpc.external_signal.controller import ExternalSignalController - - -__all__ = ('ExternalSignalController') +# # flake8: noqa: F401 +# from freqtrade.rpc.external_signal.controller import ExternalSignalController +# +# +# __all__ = ('ExternalSignalController') diff --git a/freqtrade/rpc/external_signal/channel.py b/freqtrade/rpc/external_signal/channel.py index 4ccb2d864..5b278dfed 100644 --- a/freqtrade/rpc/external_signal/channel.py +++ b/freqtrade/rpc/external_signal/channel.py @@ -1,145 +1,145 @@ -import logging -from threading import RLock -from typing import Type - -from freqtrade.rpc.external_signal.proxy import WebSocketProxy -from freqtrade.rpc.external_signal.serializer import MsgPackWebSocketSerializer, WebSocketSerializer -from freqtrade.rpc.external_signal.types import WebSocketType - - -logger = logging.getLogger(__name__) - - -class WebSocketChannel: - """ - Object to help facilitate managing a websocket connection - """ - - def __init__( - self, - websocket: WebSocketType, - serializer_cls: Type[WebSocketSerializer] = MsgPackWebSocketSerializer - ): - # The WebSocket object - self._websocket = WebSocketProxy(websocket) - # The Serializing class for the WebSocket object - self._serializer_cls = serializer_cls - - # Internal event to signify a closed websocket - self._closed = False - - # Wrap the WebSocket in the Serializing class - self._wrapped_ws = self._serializer_cls(self._websocket) - - async def send(self, data): - """ - Send data on the wrapped websocket - """ - # logger.info(f"Serialized Send - {self._wrapped_ws._serialize(data)}") - await self._wrapped_ws.send(data) - - async def recv(self): - """ - Receive data on the wrapped websocket - """ - return await self._wrapped_ws.recv() - - async def ping(self): - """ - Ping the websocket - """ - return await self._websocket.ping() - - async def close(self): - """ - Close the WebSocketChannel - """ - - self._closed = True - - def is_closed(self): - return self._closed - - -class ChannelManager: - def __init__(self): - self.channels = dict() - self._lock = RLock() # Re-entrant Lock - - async def on_connect(self, websocket: WebSocketType): - """ - Wrap websocket connection into Channel and add to list - - :param websocket: The WebSocket object to attach to the Channel - """ - if hasattr(websocket, "accept"): - try: - await websocket.accept() - except RuntimeError: - # The connection was closed before we could accept it - return - - ws_channel = WebSocketChannel(websocket) - - with self._lock: - self.channels[websocket] = ws_channel - - return ws_channel - - async def on_disconnect(self, websocket: WebSocketType): - """ - Call close on the channel if it's not, and remove from channel list - - :param websocket: The WebSocket objet attached to the Channel - """ - with self._lock: - channel = self.channels.get(websocket) - if channel: - logger.debug(f"Disconnecting channel - {channel}") - - if not channel.is_closed(): - await channel.close() - - del self.channels[websocket] - - async def disconnect_all(self): - """ - Disconnect all Channels - """ - with self._lock: - for websocket, channel in self.channels.items(): - if not channel.is_closed(): - await channel.close() - - self.channels = dict() - - async def broadcast(self, data): - """ - Broadcast data on all Channels - - :param data: The data to send - """ - with self._lock: - for websocket, channel in self.channels.items(): - try: - await channel.send(data) - except RuntimeError: - # Handle cannot send after close cases - await self.on_disconnect(websocket) - - async def send_direct(self, channel, data): - """ - Send data directly through direct_channel only - - :param direct_channel: The WebSocketChannel object to send data through - :param data: The data to send - """ - # We iterate over the channels to get reference to the websocket object - # so we can disconnect incase of failure - await channel.send(data) - - def has_channels(self): - """ - Flag for more than 0 channels - """ - return len(self.channels) > 0 +# import logging +# from threading import RLock +# from typing import Type +# +# from freqtrade.rpc.external_signal.proxy import WebSocketProxy +# from freqtrade.rpc.external_signal.serializer import MsgPackWebSocketSerializer +# from freqtrade.rpc.external_signal.types import WebSocketType +# +# +# logger = logging.getLogger(__name__) +# +# +# class WebSocketChannel: +# """ +# Object to help facilitate managing a websocket connection +# """ +# +# def __init__( +# self, +# websocket: WebSocketType, +# serializer_cls: Type[WebSocketSerializer] = MsgPackWebSocketSerializer +# ): +# # The WebSocket object +# self._websocket = WebSocketProxy(websocket) +# # The Serializing class for the WebSocket object +# self._serializer_cls = serializer_cls +# +# # Internal event to signify a closed websocket +# self._closed = False +# +# # Wrap the WebSocket in the Serializing class +# self._wrapped_ws = self._serializer_cls(self._websocket) +# +# async def send(self, data): +# """ +# Send data on the wrapped websocket +# """ +# # logger.info(f"Serialized Send - {self._wrapped_ws._serialize(data)}") +# await self._wrapped_ws.send(data) +# +# async def recv(self): +# """ +# Receive data on the wrapped websocket +# """ +# return await self._wrapped_ws.recv() +# +# async def ping(self): +# """ +# Ping the websocket +# """ +# return await self._websocket.ping() +# +# async def close(self): +# """ +# Close the WebSocketChannel +# """ +# +# self._closed = True +# +# def is_closed(self): +# return self._closed +# +# +# class ChannelManager: +# def __init__(self): +# self.channels = dict() +# self._lock = RLock() # Re-entrant Lock +# +# async def on_connect(self, websocket: WebSocketType): +# """ +# Wrap websocket connection into Channel and add to list +# +# :param websocket: The WebSocket object to attach to the Channel +# """ +# if hasattr(websocket, "accept"): +# try: +# await websocket.accept() +# except RuntimeError: +# # The connection was closed before we could accept it +# return +# +# ws_channel = WebSocketChannel(websocket) +# +# with self._lock: +# self.channels[websocket] = ws_channel +# +# return ws_channel +# +# async def on_disconnect(self, websocket: WebSocketType): +# """ +# Call close on the channel if it's not, and remove from channel list +# +# :param websocket: The WebSocket objet attached to the Channel +# """ +# with self._lock: +# channel = self.channels.get(websocket) +# if channel: +# logger.debug(f"Disconnecting channel - {channel}") +# +# if not channel.is_closed(): +# await channel.close() +# +# del self.channels[websocket] +# +# async def disconnect_all(self): +# """ +# Disconnect all Channels +# """ +# with self._lock: +# for websocket, channel in self.channels.items(): +# if not channel.is_closed(): +# await channel.close() +# +# self.channels = dict() +# +# async def broadcast(self, data): +# """ +# Broadcast data on all Channels +# +# :param data: The data to send +# """ +# with self._lock: +# for websocket, channel in self.channels.items(): +# try: +# await channel.send(data) +# except RuntimeError: +# # Handle cannot send after close cases +# await self.on_disconnect(websocket) +# +# async def send_direct(self, channel, data): +# """ +# Send data directly through direct_channel only +# +# :param direct_channel: The WebSocketChannel object to send data through +# :param data: The data to send +# """ +# # We iterate over the channels to get reference to the websocket object +# # so we can disconnect incase of failure +# await channel.send(data) +# +# def has_channels(self): +# """ +# Flag for more than 0 channels +# """ +# return len(self.channels) > 0 diff --git a/freqtrade/rpc/external_signal/controller.py b/freqtrade/rpc/external_signal/controller.py index 2b29cde6f..616ea7801 100644 --- a/freqtrade/rpc/external_signal/controller.py +++ b/freqtrade/rpc/external_signal/controller.py @@ -1,449 +1,449 @@ -""" -This module manages replicate mode communication -""" -import asyncio -import logging -import secrets -import socket -from threading import Thread -from typing import Any, Callable, Coroutine, Dict, Union - -import websockets -from fastapi import Depends -from fastapi import WebSocket as FastAPIWebSocket -from fastapi import WebSocketDisconnect, status -from janus import Queue as ThreadedQueue - -from freqtrade.enums import ExternalSignalModeType, LeaderMessageType, RPCMessageType -from freqtrade.rpc import RPC, RPCHandler -from freqtrade.rpc.external_signal.channel import ChannelManager -from freqtrade.rpc.external_signal.types import MessageType -from freqtrade.rpc.external_signal.utils import is_websocket_alive - - -logger = logging.getLogger(__name__) - - -class ExternalSignalController(RPCHandler): - """ This class handles all websocket communication """ - - def __init__( - self, - rpc: RPC, - config: Dict[str, Any], - api_server: Union[Any, None] = None - ) -> None: - """ - Init the ExternalSignalController class, and init the super class RPCHandler - :param rpc: instance of RPC Helper class - :param config: Configuration object - :param api_server: The ApiServer object - :return: None - """ - super().__init__(rpc, config) - - self.freqtrade = rpc._freqtrade - self.api_server = api_server - - if not self.api_server: - raise RuntimeError("The API server must be enabled for external signals to work") - - self._loop = None - self._running = False - self._thread = None - self._queue = None - - self._main_task = None - self._sub_tasks = None - - self._message_handlers = { - LeaderMessageType.pairlist: self._rpc._handle_pairlist_message, - LeaderMessageType.analyzed_df: self._rpc._handle_analyzed_df_message, - LeaderMessageType.default: self._rpc._handle_default_message - } - - self.channel_manager = ChannelManager() - self.external_signal_config = config.get('external_signal', {}) - - # What the config should look like - # "external_signal": { - # "enabled": true, - # "mode": "follower", - # "leaders": [ - # { - # "url": "ws://localhost:8080/signals/ws", - # "api_token": "test" - # } - # ] - # } - - # "external_signal": { - # "enabled": true, - # "mode": "leader", - # "api_token": "test" - # } - - self.mode = ExternalSignalModeType[ - self.external_signal_config.get('mode', 'leader').lower() - ] - - self.leaders_list = self.external_signal_config.get('leaders', []) - self.push_throttle_secs = self.external_signal_config.get('push_throttle_secs', 0.1) - - self.reply_timeout = self.external_signal_config.get('follower_reply_timeout', 10) - self.ping_timeout = self.external_signal_config.get('follower_ping_timeout', 2) - self.sleep_time = self.external_signal_config.get('follower_sleep_time', 5) - - # Validate external_signal_config here? - - if self.mode == ExternalSignalModeType.follower and len(self.leaders_list) == 0: - raise ValueError("You must specify at least 1 leader in follower mode.") - - # This is only used by the leader, the followers use the tokens specified - # in each of the leaders - # If you do not specify an API key in the config, one will be randomly - # generated and logged on startup - default_api_key = secrets.token_urlsafe(16) - self.secret_api_key = self.external_signal_config.get('api_token', default_api_key) - - self.start() - - def is_leader(self): - """ - Leader flag - """ - return self.enabled() and self.mode == ExternalSignalModeType.leader - - def enabled(self): - """ - Enabled flag - """ - return self.external_signal_config.get('enabled', False) - - def num_leaders(self): - """ - The number of leaders we should be connected to - """ - return len(self.leaders_list) - - def start_threaded_loop(self): - """ - Start the main internal loop in another thread to run coroutines - """ - self._loop = asyncio.new_event_loop() - - if not self._thread: - self._thread = Thread(target=self._loop.run_forever) - self._thread.start() - self._running = True - else: - raise RuntimeError("A loop is already running") - - def submit_coroutine(self, coroutine: Coroutine): - """ - Submit a coroutine to the threaded loop - """ - if not self._running: - raise RuntimeError("Cannot schedule new futures after shutdown") - - if not self._loop or not self._loop.is_running(): - raise RuntimeError("Loop must be started before any function can" - " be submitted") - - return asyncio.run_coroutine_threadsafe(coroutine, self._loop) - - def start(self): - """ - Start the controller main loop - """ - self.start_threaded_loop() - self._main_task = self.submit_coroutine(self.main()) - - async def shutdown(self): - """ - Shutdown all tasks and close up - """ - logger.info("Stopping rpc.externalsignalcontroller") - - # Flip running flag - self._running = False - - # Cancel sub tasks - for task in self._sub_tasks: - task.cancel() - - # Then disconnect all channels - await self.channel_manager.disconnect_all() - - def cleanup(self) -> None: - """ - Cleanup pending module resources. - """ - if self._thread: - if self._loop.is_running(): - self._main_task.cancel() - self._thread.join() - - async def main(self): - """ - Main coro - - Start the loop based on what mode we're in - """ - try: - if self.mode == ExternalSignalModeType.leader: - logger.info("Starting rpc.externalsignalcontroller in Leader mode") - - await self.run_leader_mode() - elif self.mode == ExternalSignalModeType.follower: - logger.info("Starting rpc.externalsignalcontroller in Follower mode") - - await self.run_follower_mode() - - except asyncio.CancelledError: - # We're cancelled - await self.shutdown() - except Exception as e: - # Log the error - logger.error(f"Exception occurred in main task: {e}") - logger.exception(e) - finally: - # This coroutine is the last thing to be ended, so it should stop the loop - self._loop.stop() - - def log_api_token(self): - """ - Log the API token - """ - logger.info("-" * 15) - logger.info(f"API_KEY: {self.secret_api_key}") - logger.info("-" * 15) - - def send_msg(self, msg: MessageType) -> None: - """ - Support RPC calls - """ - if msg["type"] == RPCMessageType.EMIT_DATA: - message = msg.get("message") - if message: - self.send_message(message) - else: - logger.error(f"Message is empty! {msg}") - - def send_message(self, msg: MessageType) -> None: - """ - Broadcast message over all channels if there are any - """ - - if self.channel_manager.has_channels(): - self._send_message(msg) - else: - logger.debug("No listening followers, skipping...") - pass - - def _send_message(self, msg: MessageType): - """ - Add data to the internal queue to be broadcasted. This func will block - if the queue is full. This is meant to be called in the main thread. - """ - if self._queue: - queue = self._queue.sync_q - queue.put(msg) # This will block if the queue is full - else: - logger.warning("Can not send data, leader loop has not started yet!") - - async def send_initial_data(self, channel): - logger.info("Sending initial data through channel") - - data = self._rpc._initial_leader_data() - - for message in data: - await channel.send(message) - - async def _handle_leader_message(self, message: MessageType): - """ - Handle message received from a Leader - """ - type = message.get("data_type", LeaderMessageType.default) - data = message.get("data") - - handler: Callable = self._message_handlers[type] - handler(type, data) - - # ---------------------------------------------------------------------- - - async def run_leader_mode(self): - """ - Main leader coroutine - - This starts all of the leader coros and registers the endpoint on - the ApiServer - """ - self.register_leader_endpoint() - self.log_api_token() - - self._sub_tasks = [ - self._loop.create_task(self._broadcast_queue_data()) - ] - - return await asyncio.gather(*self._sub_tasks) - - async def run_follower_mode(self): - """ - Main follower coroutine - - This starts all of the follower connection coros - """ - - rpc_lock = asyncio.Lock() - - self._sub_tasks = [ - self._loop.create_task(self._handle_leader_connection(leader, rpc_lock)) - for leader in self.leaders_list - ] - - return await asyncio.gather(*self._sub_tasks) - - async def _broadcast_queue_data(self): - """ - Loop over queue data and broadcast it - """ - # Instantiate the queue in this coroutine so it's attached to our loop - self._queue = ThreadedQueue() - async_queue = self._queue.async_q - - try: - while self._running: - # Get data from queue - data = await async_queue.get() - - # Broadcast it to everyone - await self.channel_manager.broadcast(data) - - # Sleep - await asyncio.sleep(self.push_throttle_secs) - - except asyncio.CancelledError: - # Silently stop - pass - - async def get_api_token( - self, - websocket: FastAPIWebSocket, - token: Union[str, None] = None - ): - """ - Extract the API key from query param. Must match the - set secret_api_key or the websocket connection will be closed. - """ - if token == self.secret_api_key: - return token - else: - logger.info("Denying websocket request...") - await websocket.close(code=status.WS_1008_POLICY_VIOLATION) - - def register_leader_endpoint(self, path: str = "/signals/ws"): - """ - Attach and start the main leader loop to the ApiServer - - :param path: The endpoint path - """ - if not self.api_server: - raise RuntimeError("The leader needs the ApiServer to be active") - - # The endpoint function for running the main leader loop - @self.api_server.app.websocket(path) - async def leader_endpoint( - websocket: FastAPIWebSocket, - api_key: str = Depends(self.get_api_token) - ): - await self.leader_endpoint_loop(websocket) - - async def leader_endpoint_loop(self, websocket: FastAPIWebSocket): - """ - The WebSocket endpoint served by the ApiServer. This handles connections, - and adding them to the channel manager. - """ - try: - if is_websocket_alive(websocket): - logger.info(f"Follower connected - {websocket.client}") - channel = await self.channel_manager.on_connect(websocket) - - # Send initial data here - # Data is being broadcasted right away as soon as startup, - # we may not have to send initial data at all. Further testing - # required. - await self.send_initial_data(channel) - - # Keep connection open until explicitly closed, and sleep - try: - while not channel.is_closed(): - request = await channel.recv() - logger.info(f"Follower request - {request}") - - except WebSocketDisconnect: - # Handle client disconnects - logger.info(f"Follower disconnected - {websocket.client}") - await self.channel_manager.on_disconnect(websocket) - except Exception as e: - logger.info(f"Follower connection failed - {websocket.client}") - logger.exception(e) - # Handle cases like - - # RuntimeError('Cannot call "send" once a closed message has been sent') - await self.channel_manager.on_disconnect(websocket) - - except Exception: - logger.error(f"Failed to serve - {websocket.client}") - await self.channel_manager.on_disconnect(websocket) - - async def _handle_leader_connection(self, leader, lock): - """ - Given a leader, connect and wait on data. If connection is lost, - it will attempt to reconnect. - """ - try: - url, token = leader["url"], leader["api_token"] - websocket_url = f"{url}?token={token}" - - logger.info(f"Attempting to connect to Leader at: {url}") - while True: - try: - async with websockets.connect(websocket_url) as ws: - channel = await self.channel_manager.on_connect(ws) - logger.info(f"Connection to Leader at {url} successful") - while True: - try: - data = await asyncio.wait_for( - channel.recv(), - timeout=self.reply_timeout - ) - except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed): - # We haven't received data yet. Check the connection and continue. - try: - # ping - ping = await channel.ping() - await asyncio.wait_for(ping, timeout=self.ping_timeout) - logger.debug(f"Connection to {url} still alive...") - continue - except Exception: - logger.info( - f"Ping error {url} - retrying in {self.sleep_time}s") - asyncio.sleep(self.sleep_time) - break - - async with lock: - # Acquire lock so only 1 coro handling at a time - # as we call the RPC module in the main thread - await self._handle_leader_message(data) - - except (socket.gaierror, ConnectionRefusedError): - logger.info(f"Connection Refused - retrying connection in {self.sleep_time}s") - await asyncio.sleep(self.sleep_time) - continue - except websockets.exceptions.InvalidStatusCode as e: - logger.error(f"Connection Refused - {e}") - await asyncio.sleep(self.sleep_time) - continue - - except asyncio.CancelledError: - pass +# """ +# This module manages replicate mode communication +# """ +# import asyncio +# import logging +# import secrets +# import socket +# from threading import Thread +# from typing import Any, Callable, Coroutine, Dict, Union +# +# import websockets +# from fastapi import Depends +# from fastapi import WebSocket as FastAPIWebSocket +# from fastapi import WebSocketDisconnect, status +# from janus import Queue as ThreadedQueue +# +# from freqtrade.enums import ExternalSignalModeType, LeaderMessageType, RPCMessageType +# from freqtrade.rpc import RPC, RPCHandler +# from freqtrade.rpc.external_signal.channel import ChannelManager +# from freqtrade.rpc.external_signal.types import MessageType +# from freqtrade.rpc.external_signal.utils import is_websocket_alive +# +# +# logger = logging.getLogger(__name__) +# +# +# class ExternalSignalController(RPCHandler): +# """ This class handles all websocket communication """ +# +# def __init__( +# self, +# rpc: RPC, +# config: Dict[str, Any], +# api_server: Union[Any, None] = None +# ) -> None: +# """ +# Init the ExternalSignalController class, and init the super class RPCHandler +# :param rpc: instance of RPC Helper class +# :param config: Configuration object +# :param api_server: The ApiServer object +# :return: None +# """ +# super().__init__(rpc, config) +# +# self.freqtrade = rpc._freqtrade +# self.api_server = api_server +# +# if not self.api_server: +# raise RuntimeError("The API server must be enabled for external signals to work") +# +# self._loop = None +# self._running = False +# self._thread = None +# self._queue = None +# +# self._main_task = None +# self._sub_tasks = None +# +# self._message_handlers = { +# LeaderMessageType.pairlist: self._rpc._handle_pairlist_message, +# LeaderMessageType.analyzed_df: self._rpc._handle_analyzed_df_message, +# LeaderMessageType.default: self._rpc._handle_default_message +# } +# +# self.channel_manager = ChannelManager() +# self.external_signal_config = config.get('external_signal', {}) +# +# # What the config should look like +# # "external_signal": { +# # "enabled": true, +# # "mode": "follower", +# # "leaders": [ +# # { +# # "url": "ws://localhost:8080/signals/ws", +# # "api_token": "test" +# # } +# # ] +# # } +# +# # "external_signal": { +# # "enabled": true, +# # "mode": "leader", +# # "api_token": "test" +# # } +# +# self.mode = ExternalSignalModeType[ +# self.external_signal_config.get('mode', 'leader').lower() +# ] +# +# self.leaders_list = self.external_signal_config.get('leaders', []) +# self.push_throttle_secs = self.external_signal_config.get('push_throttle_secs', 0.1) +# +# self.reply_timeout = self.external_signal_config.get('follower_reply_timeout', 10) +# self.ping_timeout = self.external_signal_config.get('follower_ping_timeout', 2) +# self.sleep_time = self.external_signal_config.get('follower_sleep_time', 5) +# +# # Validate external_signal_config here? +# +# if self.mode == ExternalSignalModeType.follower and len(self.leaders_list) == 0: +# raise ValueError("You must specify at least 1 leader in follower mode.") +# +# # This is only used by the leader, the followers use the tokens specified +# # in each of the leaders +# # If you do not specify an API key in the config, one will be randomly +# # generated and logged on startup +# default_api_key = secrets.token_urlsafe(16) +# self.secret_api_key = self.external_signal_config.get('api_token', default_api_key) +# +# self.start() +# +# def is_leader(self): +# """ +# Leader flag +# """ +# return self.enabled() and self.mode == ExternalSignalModeType.leader +# +# def enabled(self): +# """ +# Enabled flag +# """ +# return self.external_signal_config.get('enabled', False) +# +# def num_leaders(self): +# """ +# The number of leaders we should be connected to +# """ +# return len(self.leaders_list) +# +# def start_threaded_loop(self): +# """ +# Start the main internal loop in another thread to run coroutines +# """ +# self._loop = asyncio.new_event_loop() +# +# if not self._thread: +# self._thread = Thread(target=self._loop.run_forever) +# self._thread.start() +# self._running = True +# else: +# raise RuntimeError("A loop is already running") +# +# def submit_coroutine(self, coroutine: Coroutine): +# """ +# Submit a coroutine to the threaded loop +# """ +# if not self._running: +# raise RuntimeError("Cannot schedule new futures after shutdown") +# +# if not self._loop or not self._loop.is_running(): +# raise RuntimeError("Loop must be started before any function can" +# " be submitted") +# +# return asyncio.run_coroutine_threadsafe(coroutine, self._loop) +# +# def start(self): +# """ +# Start the controller main loop +# """ +# self.start_threaded_loop() +# self._main_task = self.submit_coroutine(self.main()) +# +# async def shutdown(self): +# """ +# Shutdown all tasks and close up +# """ +# logger.info("Stopping rpc.externalsignalcontroller") +# +# # Flip running flag +# self._running = False +# +# # Cancel sub tasks +# for task in self._sub_tasks: +# task.cancel() +# +# # Then disconnect all channels +# await self.channel_manager.disconnect_all() +# +# def cleanup(self) -> None: +# """ +# Cleanup pending module resources. +# """ +# if self._thread: +# if self._loop.is_running(): +# self._main_task.cancel() +# self._thread.join() +# +# async def main(self): +# """ +# Main coro +# +# Start the loop based on what mode we're in +# """ +# try: +# if self.mode == ExternalSignalModeType.leader: +# logger.info("Starting rpc.externalsignalcontroller in Leader mode") +# +# await self.run_leader_mode() +# elif self.mode == ExternalSignalModeType.follower: +# logger.info("Starting rpc.externalsignalcontroller in Follower mode") +# +# await self.run_follower_mode() +# +# except asyncio.CancelledError: +# # We're cancelled +# await self.shutdown() +# except Exception as e: +# # Log the error +# logger.error(f"Exception occurred in main task: {e}") +# logger.exception(e) +# finally: +# # This coroutine is the last thing to be ended, so it should stop the loop +# self._loop.stop() +# +# def log_api_token(self): +# """ +# Log the API token +# """ +# logger.info("-" * 15) +# logger.info(f"API_KEY: {self.secret_api_key}") +# logger.info("-" * 15) +# +# def send_msg(self, msg: MessageType) -> None: +# """ +# Support RPC calls +# """ +# if msg["type"] == RPCMessageType.EMIT_DATA: +# message = msg.get("message") +# if message: +# self.send_message(message) +# else: +# logger.error(f"Message is empty! {msg}") +# +# def send_message(self, msg: MessageType) -> None: +# """ +# Broadcast message over all channels if there are any +# """ +# +# if self.channel_manager.has_channels(): +# self._send_message(msg) +# else: +# logger.debug("No listening followers, skipping...") +# pass +# +# def _send_message(self, msg: MessageType): +# """ +# Add data to the internal queue to be broadcasted. This func will block +# if the queue is full. This is meant to be called in the main thread. +# """ +# if self._queue: +# queue = self._queue.sync_q +# queue.put(msg) # This will block if the queue is full +# else: +# logger.warning("Can not send data, leader loop has not started yet!") +# +# async def send_initial_data(self, channel): +# logger.info("Sending initial data through channel") +# +# data = self._rpc._initial_leader_data() +# +# for message in data: +# await channel.send(message) +# +# async def _handle_leader_message(self, message: MessageType): +# """ +# Handle message received from a Leader +# """ +# type = message.get("data_type", LeaderMessageType.default) +# data = message.get("data") +# +# handler: Callable = self._message_handlers[type] +# handler(type, data) +# +# # ---------------------------------------------------------------------- +# +# async def run_leader_mode(self): +# """ +# Main leader coroutine +# +# This starts all of the leader coros and registers the endpoint on +# the ApiServer +# """ +# self.register_leader_endpoint() +# self.log_api_token() +# +# self._sub_tasks = [ +# self._loop.create_task(self._broadcast_queue_data()) +# ] +# +# return await asyncio.gather(*self._sub_tasks) +# +# async def run_follower_mode(self): +# """ +# Main follower coroutine +# +# This starts all of the follower connection coros +# """ +# +# rpc_lock = asyncio.Lock() +# +# self._sub_tasks = [ +# self._loop.create_task(self._handle_leader_connection(leader, rpc_lock)) +# for leader in self.leaders_list +# ] +# +# return await asyncio.gather(*self._sub_tasks) +# +# async def _broadcast_queue_data(self): +# """ +# Loop over queue data and broadcast it +# """ +# # Instantiate the queue in this coroutine so it's attached to our loop +# self._queue = ThreadedQueue() +# async_queue = self._queue.async_q +# +# try: +# while self._running: +# # Get data from queue +# data = await async_queue.get() +# +# # Broadcast it to everyone +# await self.channel_manager.broadcast(data) +# +# # Sleep +# await asyncio.sleep(self.push_throttle_secs) +# +# except asyncio.CancelledError: +# # Silently stop +# pass +# +# async def get_api_token( +# self, +# websocket: FastAPIWebSocket, +# token: Union[str, None] = None +# ): +# """ +# Extract the API key from query param. Must match the +# set secret_api_key or the websocket connection will be closed. +# """ +# if token == self.secret_api_key: +# return token +# else: +# logger.info("Denying websocket request...") +# await websocket.close(code=status.WS_1008_POLICY_VIOLATION) +# +# def register_leader_endpoint(self, path: str = "/signals/ws"): +# """ +# Attach and start the main leader loop to the ApiServer +# +# :param path: The endpoint path +# """ +# if not self.api_server: +# raise RuntimeError("The leader needs the ApiServer to be active") +# +# # The endpoint function for running the main leader loop +# @self.api_server.app.websocket(path) +# async def leader_endpoint( +# websocket: FastAPIWebSocket, +# api_key: str = Depends(self.get_api_token) +# ): +# await self.leader_endpoint_loop(websocket) +# +# async def leader_endpoint_loop(self, websocket: FastAPIWebSocket): +# """ +# The WebSocket endpoint served by the ApiServer. This handles connections, +# and adding them to the channel manager. +# """ +# try: +# if is_websocket_alive(websocket): +# logger.info(f"Follower connected - {websocket.client}") +# channel = await self.channel_manager.on_connect(websocket) +# +# # Send initial data here +# # Data is being broadcasted right away as soon as startup, +# # we may not have to send initial data at all. Further testing +# # required. +# await self.send_initial_data(channel) +# +# # Keep connection open until explicitly closed, and sleep +# try: +# while not channel.is_closed(): +# request = await channel.recv() +# logger.info(f"Follower request - {request}") +# +# except WebSocketDisconnect: +# # Handle client disconnects +# logger.info(f"Follower disconnected - {websocket.client}") +# await self.channel_manager.on_disconnect(websocket) +# except Exception as e: +# logger.info(f"Follower connection failed - {websocket.client}") +# logger.exception(e) +# # Handle cases like - +# # RuntimeError('Cannot call "send" once a closed message has been sent') +# await self.channel_manager.on_disconnect(websocket) +# +# except Exception: +# logger.error(f"Failed to serve - {websocket.client}") +# await self.channel_manager.on_disconnect(websocket) +# +# async def _handle_leader_connection(self, leader, lock): +# """ +# Given a leader, connect and wait on data. If connection is lost, +# it will attempt to reconnect. +# """ +# try: +# url, token = leader["url"], leader["api_token"] +# websocket_url = f"{url}?token={token}" +# +# logger.info(f"Attempting to connect to Leader at: {url}") +# while True: +# try: +# async with websockets.connect(websocket_url) as ws: +# channel = await self.channel_manager.on_connect(ws) +# logger.info(f"Connection to Leader at {url} successful") +# while True: +# try: +# data = await asyncio.wait_for( +# channel.recv(), +# timeout=self.reply_timeout +# ) +# except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed): +# # We haven't received data yet. Check the connection and continue. +# try: +# # ping +# ping = await channel.ping() +# await asyncio.wait_for(ping, timeout=self.ping_timeout) +# logger.debug(f"Connection to {url} still alive...") +# continue +# except Exception: +# logger.info( +# f"Ping error {url} - retrying in {self.sleep_time}s") +# asyncio.sleep(self.sleep_time) +# break +# +# async with lock: +# # Acquire lock so only 1 coro handling at a time +# # as we call the RPC module in the main thread +# await self._handle_leader_message(data) +# +# except (socket.gaierror, ConnectionRefusedError): +# logger.info(f"Connection Refused - retrying connection in {self.sleep_time}s") +# await asyncio.sleep(self.sleep_time) +# continue +# except websockets.exceptions.InvalidStatusCode as e: +# logger.error(f"Connection Refused - {e}") +# await asyncio.sleep(self.sleep_time) +# continue +# +# except asyncio.CancelledError: +# pass diff --git a/freqtrade/rpc/external_signal/proxy.py b/freqtrade/rpc/external_signal/proxy.py index 36ff4a74e..df2a07da0 100644 --- a/freqtrade/rpc/external_signal/proxy.py +++ b/freqtrade/rpc/external_signal/proxy.py @@ -1,61 +1,61 @@ -from typing import Union - -from fastapi import WebSocket as FastAPIWebSocket -from websockets import WebSocketClientProtocol as WebSocket - -from freqtrade.rpc.external_signal.types import WebSocketType - - -class WebSocketProxy: - """ - WebSocketProxy object to bring the FastAPIWebSocket and websockets.WebSocketClientProtocol - under the same API - """ - - def __init__(self, websocket: WebSocketType): - self._websocket: Union[FastAPIWebSocket, WebSocket] = websocket - - async def send(self, data): - """ - Send data on the wrapped websocket - """ - if isinstance(data, str): - data = data.encode() - - if hasattr(self._websocket, "send_bytes"): - await self._websocket.send_bytes(data) - else: - await self._websocket.send(data) - - async def recv(self): - """ - Receive data on the wrapped websocket - """ - if hasattr(self._websocket, "receive_bytes"): - return await self._websocket.receive_bytes() - else: - return await self._websocket.recv() - - async def ping(self): - """ - Ping the websocket, not supported by FastAPI WebSockets - """ - if hasattr(self._websocket, "ping"): - return await self._websocket.ping() - return False - - async def close(self, code: int = 1000): - """ - Close the websocket connection, only supported by FastAPI WebSockets - """ - if hasattr(self._websocket, "close"): - return await self._websocket.close(code) - pass - - async def accept(self): - """ - Accept the WebSocket connection, only support by FastAPI WebSockets - """ - if hasattr(self._websocket, "accept"): - return await self._websocket.accept() - pass +# from typing import Union +# +# from fastapi import WebSocket as FastAPIWebSocket +# from websockets import WebSocketClientProtocol as WebSocket +# +# from freqtrade.rpc.external_signal.types import WebSocketType +# +# +# class WebSocketProxy: +# """ +# WebSocketProxy object to bring the FastAPIWebSocket and websockets.WebSocketClientProtocol +# under the same API +# """ +# +# def __init__(self, websocket: WebSocketType): +# self._websocket: Union[FastAPIWebSocket, WebSocket] = websocket +# +# async def send(self, data): +# """ +# Send data on the wrapped websocket +# """ +# if isinstance(data, str): +# data = data.encode() +# +# if hasattr(self._websocket, "send_bytes"): +# await self._websocket.send_bytes(data) +# else: +# await self._websocket.send(data) +# +# async def recv(self): +# """ +# Receive data on the wrapped websocket +# """ +# if hasattr(self._websocket, "receive_bytes"): +# return await self._websocket.receive_bytes() +# else: +# return await self._websocket.recv() +# +# async def ping(self): +# """ +# Ping the websocket, not supported by FastAPI WebSockets +# """ +# if hasattr(self._websocket, "ping"): +# return await self._websocket.ping() +# return False +# +# async def close(self, code: int = 1000): +# """ +# Close the websocket connection, only supported by FastAPI WebSockets +# """ +# if hasattr(self._websocket, "close"): +# return await self._websocket.close(code) +# pass +# +# async def accept(self): +# """ +# Accept the WebSocket connection, only support by FastAPI WebSockets +# """ +# if hasattr(self._websocket, "accept"): +# return await self._websocket.accept() +# pass diff --git a/freqtrade/rpc/external_signal/serializer.py b/freqtrade/rpc/external_signal/serializer.py index 2a0f53037..a23469ef4 100644 --- a/freqtrade/rpc/external_signal/serializer.py +++ b/freqtrade/rpc/external_signal/serializer.py @@ -1,65 +1,65 @@ -import json -import logging -from abc import ABC, abstractmethod - -import msgpack -import orjson - -from freqtrade.rpc.external_signal.proxy import WebSocketProxy - - -logger = logging.getLogger(__name__) - - -class WebSocketSerializer(ABC): - def __init__(self, websocket: WebSocketProxy): - self._websocket: WebSocketProxy = websocket - - @abstractmethod - def _serialize(self, data): - raise NotImplementedError() - - @abstractmethod - def _deserialize(self, data): - raise NotImplementedError() - - async def send(self, data: bytes): - await self._websocket.send(self._serialize(data)) - - async def recv(self) -> bytes: - data = await self._websocket.recv() - - return self._deserialize(data) - - async def close(self, code: int = 1000): - await self._websocket.close(code) - -# Going to explore using MsgPack as the serialization, -# as that might be the best method for sending pandas -# dataframes over the wire - - -class JSONWebSocketSerializer(WebSocketSerializer): - def _serialize(self, data): - return json.dumps(data) - - def _deserialize(self, data): - return json.loads(data) - - -class ORJSONWebSocketSerializer(WebSocketSerializer): - ORJSON_OPTIONS = orjson.OPT_NAIVE_UTC | orjson.OPT_SERIALIZE_NUMPY - - def _serialize(self, data): - return orjson.dumps(data, option=self.ORJSON_OPTIONS) - - def _deserialize(self, data): - return orjson.loads(data, option=self.ORJSON_OPTIONS) - - -class MsgPackWebSocketSerializer(WebSocketSerializer): - def _serialize(self, data): - return msgpack.packb(data, use_bin_type=True) - - def _deserialize(self, data): - return msgpack.unpackb(data, raw=False) +# import json +# import logging +# from abc import ABC, abstractmethod +# +# import msgpack +# import orjson +# +# from freqtrade.rpc.external_signal.proxy import WebSocketProxy +# +# +# logger = logging.getLogger(__name__) +# +# +# class WebSocketSerializer(ABC): +# def __init__(self, websocket: WebSocketProxy): +# self._websocket: WebSocketProxy = websocket +# +# @abstractmethod +# def _serialize(self, data): +# raise NotImplementedError() +# +# @abstractmethod +# def _deserialize(self, data): +# raise NotImplementedError() +# +# async def send(self, data: bytes): +# await self._websocket.send(self._serialize(data)) +# +# async def recv(self) -> bytes: +# data = await self._websocket.recv() +# +# return self._deserialize(data) +# +# async def close(self, code: int = 1000): +# await self._websocket.close(code) +# +# # Going to explore using MsgPack as the serialization, +# # as that might be the best method for sending pandas +# # dataframes over the wire +# +# +# class JSONWebSocketSerializer(WebSocketSerializer): +# def _serialize(self, data): +# return json.dumps(data) +# +# def _deserialize(self, data): +# return json.loads(data) +# +# +# class ORJSONWebSocketSerializer(WebSocketSerializer): +# ORJSON_OPTIONS = orjson.OPT_NAIVE_UTC | orjson.OPT_SERIALIZE_NUMPY +# +# def _serialize(self, data): +# return orjson.dumps(data, option=self.ORJSON_OPTIONS) +# +# def _deserialize(self, data): +# return orjson.loads(data, option=self.ORJSON_OPTIONS) +# +# +# class MsgPackWebSocketSerializer(WebSocketSerializer): +# def _serialize(self, data): +# return msgpack.packb(data, use_bin_type=True) +# +# def _deserialize(self, data): +# return msgpack.unpackb(data, raw=False) diff --git a/freqtrade/rpc/external_signal/types.py b/freqtrade/rpc/external_signal/types.py index 814fe6649..38e43f667 100644 --- a/freqtrade/rpc/external_signal/types.py +++ b/freqtrade/rpc/external_signal/types.py @@ -1,8 +1,8 @@ -from typing import Any, Dict, TypeVar - -from fastapi import WebSocket as FastAPIWebSocket -from websockets import WebSocketClientProtocol as WebSocket - - -WebSocketType = TypeVar("WebSocketType", FastAPIWebSocket, WebSocket) -MessageType = Dict[str, Any] +# from typing import Any, Dict, TypeVar +# +# from fastapi import WebSocket as FastAPIWebSocket +# from websockets import WebSocketClientProtocol as WebSocket +# +# +# WebSocketType = TypeVar("WebSocketType", FastAPIWebSocket, WebSocket) +# MessageType = Dict[str, Any] diff --git a/freqtrade/rpc/external_signal/utils.py b/freqtrade/rpc/external_signal/utils.py index e5469dce3..72c8d2ef8 100644 --- a/freqtrade/rpc/external_signal/utils.py +++ b/freqtrade/rpc/external_signal/utils.py @@ -1,22 +1,10 @@ -from pandas import DataFrame -from starlette.websockets import WebSocket, WebSocketState - -from freqtrade.enums.signaltype import SignalTagType, SignalType - - -async def is_websocket_alive(ws: WebSocket) -> bool: - if ( - ws.application_state == WebSocketState.CONNECTED and - ws.client_state == WebSocketState.CONNECTED - ): - return True - return False - - -def remove_entry_exit_signals(dataframe: DataFrame): - dataframe[SignalType.ENTER_LONG.value] = 0 - dataframe[SignalType.EXIT_LONG.value] = 0 - dataframe[SignalType.ENTER_SHORT.value] = 0 - dataframe[SignalType.EXIT_SHORT.value] = 0 - dataframe[SignalTagType.ENTER_TAG.value] = None - dataframe[SignalTagType.EXIT_TAG.value] = None +# from starlette.websockets import WebSocket, WebSocketState +# +# +# async def is_websocket_alive(ws: WebSocket) -> bool: +# if ( +# ws.application_state == WebSocketState.CONNECTED and +# ws.client_state == WebSocketState.CONNECTED +# ): +# return True +# return False diff --git a/freqtrade/rpc/rpc.py b/freqtrade/rpc/rpc.py index 82d50f33c..3c7558158 100644 --- a/freqtrade/rpc/rpc.py +++ b/freqtrade/rpc/rpc.py @@ -19,13 +19,12 @@ from freqtrade.configuration.timerange import TimeRange from freqtrade.constants import CANCEL_REASON, DATETIME_PRINT_FORMAT from freqtrade.data.history import load_data from freqtrade.data.metrics import calculate_max_drawdown -from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, LeaderMessageType, - SignalDirection, State, TradingMode) +from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, SignalDirection, State, + TradingMode) from freqtrade.exceptions import ExchangeError, PricingError from freqtrade.exchange import timeframe_to_minutes, timeframe_to_msecs from freqtrade.loggers import bufferHandler -from freqtrade.misc import (decimals_per_coin, json_to_dataframe, remove_entry_exit_signals, - shorten_date) +from freqtrade.misc import decimals_per_coin, shorten_date from freqtrade.persistence import PairLocks, Trade from freqtrade.persistence.models import PairLock from freqtrade.plugins.pairlist.pairlist_helpers import expand_pairlist @@ -1090,65 +1089,65 @@ class RPC: 'last_process_loc': last_p.astimezone(tzlocal()).strftime(DATETIME_PRINT_FORMAT), 'last_process_ts': int(last_p.timestamp()), } - - # ------------------------------ EXTERNAL SIGNALS ----------------------- - - def _initial_leader_data(self): - # We create a list of Messages to send to the follower on connect - data = [] - - # Send Pairlist data - data.append({ - "data_type": LeaderMessageType.pairlist, - "data": self._freqtrade.pairlists._whitelist - }) - - return data - - def _handle_pairlist_message(self, type, data): - """ - Handles the emitted pairlists from the Leaders - - :param type: The data_type of the data - :param data: The data - """ - pairlist = data - - logger.debug(f"Handling Pairlist message: {pairlist}") - - external_pairlist = self._freqtrade.pairlists._pairlist_handlers[0] - external_pairlist.add_pairlist_data(pairlist) - - def _handle_analyzed_df_message(self, type, data): - """ - Handles the analyzed dataframes from the Leaders - - :param type: The data_type of the data - :param data: The data - """ - key, value = data["key"], data["value"] - pair, timeframe, candle_type = key - - # Skip any pairs that we don't have in the pairlist? - # leader_pairlist = self._freqtrade.pairlists._whitelist - # if pair not in leader_pairlist: - # return - - dataframe = json_to_dataframe(value) - - if self._config.get('external_signal', {}).get('remove_signals_analyzed_df', False): - dataframe = remove_entry_exit_signals(dataframe) - - logger.debug(f"Handling analyzed dataframe for {pair}") - logger.debug(dataframe.tail()) - - # Add the dataframe to the dataprovider - dataprovider = self._freqtrade.dataprovider - dataprovider.add_external_df(pair, timeframe, dataframe, candle_type) - - def _handle_default_message(self, type, data): - """ - Default leader message handler, just logs it. We should never have to - run this unless the leader sends us some weird message. - """ - logger.debug(f"Received message from Leader of type {type}: {data}") + # + # # ------------------------------ EXTERNAL SIGNALS ----------------------- + # + # def _initial_leader_data(self): + # # We create a list of Messages to send to the follower on connect + # data = [] + # + # # Send Pairlist data + # data.append({ + # "data_type": LeaderMessageType.pairlist, + # "data": self._freqtrade.pairlists._whitelist + # }) + # + # return data + # + # def _handle_pairlist_message(self, type, data): + # """ + # Handles the emitted pairlists from the Leaders + # + # :param type: The data_type of the data + # :param data: The data + # """ + # pairlist = data + # + # logger.debug(f"Handling Pairlist message: {pairlist}") + # + # external_pairlist = self._freqtrade.pairlists._pairlist_handlers[0] + # external_pairlist.add_pairlist_data(pairlist) + # + # def _handle_analyzed_df_message(self, type, data): + # """ + # Handles the analyzed dataframes from the Leaders + # + # :param type: The data_type of the data + # :param data: The data + # """ + # key, value = data["key"], data["value"] + # pair, timeframe, candle_type = key + # + # # Skip any pairs that we don't have in the pairlist? + # # leader_pairlist = self._freqtrade.pairlists._whitelist + # # if pair not in leader_pairlist: + # # return + # + # dataframe = json_to_dataframe(value) + # + # if self._config.get('external_signal', {}).get('remove_signals_analyzed_df', False): + # dataframe = remove_entry_exit_signals(dataframe) + # + # logger.debug(f"Handling analyzed dataframe for {pair}") + # logger.debug(dataframe.tail()) + # + # # Add the dataframe to the dataprovider + # dataprovider = self._freqtrade.dataprovider + # dataprovider.add_external_df(pair, timeframe, dataframe, candle_type) + # + # def _handle_default_message(self, type, data): + # """ + # Default leader message handler, just logs it. We should never have to + # run this unless the leader sends us some weird message. + # """ + # logger.debug(f"Received message from Leader of type {type}: {data}") diff --git a/freqtrade/rpc/rpc_manager.py b/freqtrade/rpc/rpc_manager.py index 11e21da6f..c5e93e3b4 100644 --- a/freqtrade/rpc/rpc_manager.py +++ b/freqtrade/rpc/rpc_manager.py @@ -51,14 +51,14 @@ class RPCManager: # Enable External Signals mode # For this to be enabled, the API server must also be enabled - if config.get('external_signal', {}).get('enabled', False): - logger.info('Enabling RPC.ExternalSignalController') - from freqtrade.rpc.external_signal import ExternalSignalController - external_signals = ExternalSignalController(self._rpc, config, apiserver) - self.registered_modules.append(external_signals) - - # Attach the controller to FreqTrade - freqtrade.external_signal_controller = external_signals + # if config.get('external_signal', {}).get('enabled', False): + # logger.info('Enabling RPC.ExternalSignalController') + # from freqtrade.rpc.external_signal import ExternalSignalController + # external_signals = ExternalSignalController(self._rpc, config, apiserver) + # self.registered_modules.append(external_signals) + # + # # Attach the controller to FreqTrade + # freqtrade.external_signal_controller = external_signals def cleanup(self) -> None: """ Stops all enabled rpc modules """ @@ -78,8 +78,7 @@ class RPCManager: 'status': 'stopping bot' } """ - if msg.get("type") != RPCMessageType.EMIT_DATA: - logger.info('Sending rpc message: %s', msg) + logger.info('Sending rpc message: %s', msg) if 'pair' in msg: msg.update({ 'base_currency': self._rpc._freqtrade.exchange.get_pair_base_currency(msg['pair']) @@ -138,12 +137,3 @@ class RPCManager: 'type': RPCMessageType.STARTUP, 'status': f'Using Protections: \n{prots}' }) - - def emit_data(self, data: Dict[str, Any]): - """ - Send a message via RPC with type RPCMessageType.EMIT_DATA - """ - self.send_msg({ - "type": RPCMessageType.EMIT_DATA, - "message": data - }) diff --git a/scripts/test_ws_client.py b/scripts/test_ws_client.py new file mode 100644 index 000000000..caa495a19 --- /dev/null +++ b/scripts/test_ws_client.py @@ -0,0 +1,58 @@ +import asyncio +import logging +import socket + +import websockets + + +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + + +async def _client(): + try: + while True: + try: + url = "ws://localhost:8080/api/v1/message/ws?token=testtoken" + async with websockets.connect(url) as ws: + logger.info("Connection successful") + while True: + try: + data = await asyncio.wait_for( + ws.recv(), + timeout=5 + ) + logger.info(f"Data received - {data}") + except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed): + # We haven't received data yet. Check the connection and continue. + try: + # ping + ping = await ws.ping() + await asyncio.wait_for(ping, timeout=2) + logger.debug(f"Connection to {url} still alive...") + continue + except Exception: + logger.info( + f"Ping error {url} - retrying in 5s") + asyncio.sleep(2) + break + + except (socket.gaierror, ConnectionRefusedError): + logger.info("Connection Refused - retrying connection in 5s") + await asyncio.sleep(2) + continue + except websockets.exceptions.InvalidStatusCode as e: + logger.error(f"Connection Refused - {e}") + await asyncio.sleep(2) + continue + + except (asyncio.CancelledError, KeyboardInterrupt): + pass + + +def main(): + asyncio.run(_client()) + + +if __name__ == "__main__": + main()