2022-08-29 19:41:15 +00:00
|
|
|
import logging
|
2022-08-31 16:40:26 +00:00
|
|
|
from typing import Any, Dict
|
2022-08-29 19:41:15 +00:00
|
|
|
|
2022-09-13 18:12:02 +00:00
|
|
|
from fastapi import APIRouter, Depends, WebSocketDisconnect
|
|
|
|
from fastapi.websockets import WebSocket, WebSocketState
|
2022-09-08 19:58:28 +00:00
|
|
|
from pydantic import ValidationError
|
2022-10-11 05:30:43 +00:00
|
|
|
from websockets.exceptions import WebSocketException
|
2022-08-29 19:41:15 +00:00
|
|
|
|
2022-08-31 16:40:26 +00:00
|
|
|
from freqtrade.enums import RPCMessageType, RPCRequestType
|
2022-09-10 12:19:11 +00:00
|
|
|
from freqtrade.rpc.api_server.api_auth import validate_ws_token
|
2022-09-02 02:06:36 +00:00
|
|
|
from freqtrade.rpc.api_server.deps import get_channel_manager, get_rpc
|
2022-09-13 18:42:24 +00:00
|
|
|
from freqtrade.rpc.api_server.ws import WebSocketChannel
|
2022-10-23 01:02:05 +00:00
|
|
|
from freqtrade.rpc.api_server.ws.channel import ChannelManager
|
2022-09-08 19:58:28 +00:00
|
|
|
from freqtrade.rpc.api_server.ws_schemas import (WSAnalyzedDFMessage, WSMessageSchema,
|
|
|
|
WSRequestSchema, WSWhitelistMessage)
|
2022-09-02 02:06:36 +00:00
|
|
|
from freqtrade.rpc.rpc import RPC
|
2022-08-29 19:41:15 +00:00
|
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
# Private router, protected by API Key authentication
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
|
|
|
2022-09-04 16:22:10 +00:00
|
|
|
async def is_websocket_alive(ws: WebSocket) -> bool:
|
2022-09-07 21:08:01 +00:00
|
|
|
"""
|
|
|
|
Check if a FastAPI Websocket is still open
|
|
|
|
"""
|
2022-09-04 16:22:10 +00:00
|
|
|
if (
|
|
|
|
ws.application_state == WebSocketState.CONNECTED and
|
|
|
|
ws.client_state == WebSocketState.CONNECTED
|
|
|
|
):
|
|
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
2022-09-02 02:06:36 +00:00
|
|
|
async def _process_consumer_request(
|
|
|
|
request: Dict[str, Any],
|
|
|
|
channel: WebSocketChannel,
|
2022-10-23 01:02:05 +00:00
|
|
|
rpc: RPC,
|
|
|
|
channel_manager: ChannelManager
|
2022-09-02 02:06:36 +00:00
|
|
|
):
|
2022-09-07 21:08:01 +00:00
|
|
|
"""
|
|
|
|
Validate and handle a request from a websocket consumer
|
|
|
|
"""
|
|
|
|
# Validate the request, makes sure it matches the schema
|
|
|
|
try:
|
|
|
|
websocket_request = WSRequestSchema.parse_obj(request)
|
|
|
|
except ValidationError as e:
|
|
|
|
logger.error(f"Invalid request from {channel}: {e}")
|
|
|
|
return
|
|
|
|
|
|
|
|
type, data = websocket_request.type, websocket_request.data
|
2022-09-08 16:34:37 +00:00
|
|
|
response: WSMessageSchema
|
2022-08-31 16:40:26 +00:00
|
|
|
|
2022-09-02 21:05:16 +00:00
|
|
|
logger.debug(f"Request of type {type} from {channel}")
|
|
|
|
|
2022-08-31 16:40:26 +00:00
|
|
|
# If we have a request of type SUBSCRIBE, set the topics in this channel
|
|
|
|
if type == RPCRequestType.SUBSCRIBE:
|
2022-09-02 02:06:36 +00:00
|
|
|
# If the request is empty, do nothing
|
|
|
|
if not data:
|
|
|
|
return
|
|
|
|
|
2022-08-31 16:40:26 +00:00
|
|
|
# If all topics passed are a valid RPCMessageType, set subscriptions on channel
|
|
|
|
if all([any(x.value == topic for x in RPCMessageType) for topic in data]):
|
|
|
|
channel.set_subscriptions(data)
|
|
|
|
|
2022-09-07 21:08:01 +00:00
|
|
|
# We don't send a response for subscriptions
|
2022-09-08 16:34:37 +00:00
|
|
|
return
|
2022-09-07 21:08:01 +00:00
|
|
|
|
2022-09-02 05:52:13 +00:00
|
|
|
elif type == RPCRequestType.WHITELIST:
|
2022-09-07 21:08:01 +00:00
|
|
|
# Get whitelist
|
2022-09-02 05:52:13 +00:00
|
|
|
whitelist = rpc._ws_request_whitelist()
|
2022-09-02 02:06:36 +00:00
|
|
|
|
2022-09-07 21:08:01 +00:00
|
|
|
# Format response
|
2022-10-23 17:42:59 +00:00
|
|
|
response = WSWhitelistMessage(data=whitelist)
|
2022-09-07 21:08:01 +00:00
|
|
|
# Send it back
|
2022-10-25 17:36:40 +00:00
|
|
|
await channel_manager.send_direct(channel, response.dict(exclude_none=True))
|
2022-09-02 02:06:36 +00:00
|
|
|
|
2022-09-02 05:52:13 +00:00
|
|
|
elif type == RPCRequestType.ANALYZED_DF:
|
2022-09-02 21:05:16 +00:00
|
|
|
limit = None
|
|
|
|
|
|
|
|
if data:
|
|
|
|
# Limit the amount of candles per dataframe to 'limit' or 1500
|
2022-09-07 21:08:01 +00:00
|
|
|
limit = max(data.get('limit', 1500), 1500)
|
2022-09-02 21:05:16 +00:00
|
|
|
|
2022-11-14 06:25:52 +00:00
|
|
|
# For every pair in the generator, send a separate message
|
|
|
|
for message in rpc._ws_request_analyzed_df(limit):
|
2022-10-23 17:42:59 +00:00
|
|
|
response = WSAnalyzedDFMessage(data=message)
|
2022-10-25 20:08:28 +00:00
|
|
|
await channel_manager.send_direct(channel, response.dict(exclude_none=True))
|
2022-09-02 02:06:36 +00:00
|
|
|
|
2022-08-31 16:40:26 +00:00
|
|
|
|
2022-08-29 19:41:15 +00:00
|
|
|
@router.websocket("/message/ws")
|
|
|
|
async def message_endpoint(
|
|
|
|
ws: WebSocket,
|
2022-09-02 02:06:36 +00:00
|
|
|
rpc: RPC = Depends(get_rpc),
|
|
|
|
channel_manager=Depends(get_channel_manager),
|
2022-09-10 12:19:11 +00:00
|
|
|
token: str = Depends(validate_ws_token)
|
2022-08-29 19:41:15 +00:00
|
|
|
):
|
2022-09-07 21:08:01 +00:00
|
|
|
"""
|
|
|
|
Message WebSocket endpoint, facilitates sending RPC messages
|
|
|
|
"""
|
2022-08-29 19:41:15 +00:00
|
|
|
try:
|
2022-09-08 19:58:28 +00:00
|
|
|
channel = await channel_manager.on_connect(ws)
|
|
|
|
if await is_websocket_alive(ws):
|
2022-09-08 17:25:30 +00:00
|
|
|
|
2022-09-02 21:05:16 +00:00
|
|
|
logger.info(f"Consumer connected - {channel}")
|
|
|
|
|
2022-08-31 01:21:34 +00:00
|
|
|
# Keep connection open until explicitly closed, and process requests
|
2022-08-29 19:41:15 +00:00
|
|
|
try:
|
|
|
|
while not channel.is_closed():
|
|
|
|
request = await channel.recv()
|
|
|
|
|
2022-09-02 21:05:16 +00:00
|
|
|
# Process the request here
|
2022-10-23 01:02:05 +00:00
|
|
|
await _process_consumer_request(request, channel, rpc, channel_manager)
|
2022-08-29 19:41:15 +00:00
|
|
|
|
2022-10-11 05:30:43 +00:00
|
|
|
except (WebSocketDisconnect, WebSocketException):
|
2022-08-29 19:41:15 +00:00
|
|
|
# Handle client disconnects
|
2022-09-02 21:05:16 +00:00
|
|
|
logger.info(f"Consumer disconnected - {channel}")
|
2022-10-09 21:04:52 +00:00
|
|
|
except RuntimeError:
|
2022-08-29 19:41:15 +00:00
|
|
|
# Handle cases like -
|
|
|
|
# RuntimeError('Cannot call "send" once a closed message has been sent')
|
2022-10-09 21:04:52 +00:00
|
|
|
pass
|
|
|
|
except Exception as e:
|
2022-10-11 05:30:43 +00:00
|
|
|
logger.info(f"Consumer connection failed - {channel}: {e}")
|
2022-10-09 21:04:52 +00:00
|
|
|
logger.debug(e, exc_info=e)
|
2022-09-08 19:58:28 +00:00
|
|
|
|
2022-09-09 17:38:42 +00:00
|
|
|
except RuntimeError:
|
|
|
|
# WebSocket was closed
|
2022-10-09 21:04:52 +00:00
|
|
|
# Do nothing
|
|
|
|
pass
|
2022-09-02 21:05:16 +00:00
|
|
|
except Exception as e:
|
2022-08-29 19:41:15 +00:00
|
|
|
logger.error(f"Failed to serve - {ws.client}")
|
2022-09-02 21:05:16 +00:00
|
|
|
# Log tracebacks to keep track of what errors are happening
|
|
|
|
logger.exception(e)
|
2022-10-09 21:04:52 +00:00
|
|
|
finally:
|
2022-11-02 19:26:27 +00:00
|
|
|
if channel:
|
|
|
|
await channel_manager.on_disconnect(ws)
|