Add typedDict for RPC messages

Currently not fully functional.
This commit is contained in:
Matthias
2022-09-20 19:51:42 +02:00
parent b317524ed7
commit 0ece73578c
7 changed files with 101 additions and 12 deletions

View File

@@ -1,6 +1,6 @@
import logging
from ipaddress import IPv4Address
from typing import Any, Dict, Optional
from typing import Any, Optional
import orjson
import uvicorn
@@ -13,6 +13,7 @@ from freqtrade.exceptions import OperationalException
from freqtrade.rpc.api_server.uvicorn_threaded import UvicornServer
from freqtrade.rpc.api_server.ws.message_stream import MessageStream
from freqtrade.rpc.rpc import RPC, RPCException, RPCHandler
from freqtrade.rpc.rpc_types import RPCSendMsg
logger = logging.getLogger(__name__)
@@ -108,7 +109,7 @@ class ApiServer(RPCHandler):
cls._has_rpc = False
cls._rpc = None
def send_msg(self, msg: Dict[str, Any]) -> None:
def send_msg(self, msg: RPCSendMsg) -> None:
"""
Publish the message to the message stream
"""