fix constants, update config example, add emc config validation
This commit is contained in:
@@ -101,8 +101,6 @@ async def message_endpoint(
|
||||
Message WebSocket endpoint, facilitates sending RPC messages
|
||||
"""
|
||||
try:
|
||||
# TODO:
|
||||
# Return a channel ID, pass that instead of ws to the rest of the methods
|
||||
channel = await channel_manager.on_connect(ws)
|
||||
|
||||
if await is_websocket_alive(ws):
|
||||
|
@@ -57,9 +57,6 @@ class ExternalMessageConsumer:
|
||||
self.enabled = self._emc_config.get('enabled', False)
|
||||
self.producers = self._emc_config.get('producers', [])
|
||||
|
||||
if self.enabled and len(self.producers) < 1:
|
||||
raise OperationalException("You must specify at least 1 Producer to connect to.")
|
||||
|
||||
self.wait_timeout = self._emc_config.get('wait_timeout', 300) # in seconds
|
||||
self.ping_timeout = self._emc_config.get('ping_timeout', 10) # in seconds
|
||||
self.sleep_time = self._emc_config.get('sleep_time', 10) # in seconds
|
||||
@@ -71,6 +68,8 @@ class ExternalMessageConsumer:
|
||||
# as the websockets client expects bytes.
|
||||
self.message_size_limit = (self._emc_config.get('message_size_limit', 8) << 20)
|
||||
|
||||
self.validate_config()
|
||||
|
||||
# Setting these explicitly as they probably shouldn't be changed by a user
|
||||
# Unless we somehow integrate this with the strategy to allow creating
|
||||
# callbacks for the messages
|
||||
@@ -91,6 +90,18 @@ class ExternalMessageConsumer:
|
||||
|
||||
self.start()
|
||||
|
||||
def validate_config(self):
|
||||
"""
|
||||
Make sure values are what they are supposed to be
|
||||
"""
|
||||
if self.enabled and len(self.producers) < 1:
|
||||
raise OperationalException("You must specify at least 1 Producer to connect to.")
|
||||
|
||||
if self.enabled and self._config.get('process_only_new_candles', True):
|
||||
# Warning here or require it?
|
||||
logger.warning("To receive best performance with external data,"
|
||||
"please set `process_only_new_candles` to False")
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
Start the main internal loop in another thread to run coroutines
|
||||
@@ -174,12 +185,10 @@ class ExternalMessageConsumer:
|
||||
"""
|
||||
while self._running:
|
||||
try:
|
||||
url, token = producer['url'], producer['ws_token']
|
||||
name = producer["name"]
|
||||
ws_url = f"{url}?token={token}"
|
||||
|
||||
logger.info(
|
||||
f"Connecting to {name} @ {url}, max message size: {self.message_size_limit}")
|
||||
host, port = producer['host'], producer['port']
|
||||
token = producer['ws_token']
|
||||
name = producer['name']
|
||||
ws_url = f"ws://{host}:{port}/api/v1/message/ws?token={token}"
|
||||
|
||||
# This will raise InvalidURI if the url is bad
|
||||
async with websockets.connect(ws_url, max_size=self.message_size_limit) as ws:
|
||||
|
Reference in New Issue
Block a user