diff --git a/freqtrade/misc.py b/freqtrade/misc.py index 2d2c7513a..349735dcd 100644 --- a/freqtrade/misc.py +++ b/freqtrade/misc.py @@ -1,9 +1,11 @@ """ Various tool function for Freqtrade and scripts """ +import asyncio import gzip import logging import re +import threading from datetime import datetime from pathlib import Path from typing import Any, Dict, Iterator, List, Mapping, Union @@ -301,3 +303,44 @@ def remove_entry_exit_signals(dataframe: pd.DataFrame): dataframe[SignalTagType.EXIT_TAG.value] = None return dataframe + + +def sync_to_async_iter(iter): + """ + Wrap blocking iterator into an asynchronous by + offloading computation to thread and using + pubsub pattern for yielding results + + :param iter: A synchronous iterator + :returns: An asynchronous iterator + """ + + loop = asyncio.get_event_loop() + q = asyncio.Queue(1) + exception = None + _END = object() + + async def yield_queue_items(): + while True: + next_item = await q.get() + if next_item is _END: + break + yield next_item + if exception is not None: + # The iterator has raised, propagate the exception + raise exception + + def iter_to_queue(): + nonlocal exception + try: + for item in iter: + # This runs outside the event loop thread, so we + # must use thread-safe API to talk to the queue. + asyncio.run_coroutine_threadsafe(q.put(item), loop).result() + except Exception as e: + exception = e + finally: + asyncio.run_coroutine_threadsafe(q.put(_END), loop).result() + + threading.Thread(target=iter_to_queue).start() + return yield_queue_items() diff --git a/freqtrade/rpc/api_server/api_ws.py b/freqtrade/rpc/api_server/api_ws.py index a80250c1b..6ecc1ef2a 100644 --- a/freqtrade/rpc/api_server/api_ws.py +++ b/freqtrade/rpc/api_server/api_ws.py @@ -7,6 +7,7 @@ from fastapi.websockets import WebSocket from pydantic import ValidationError from freqtrade.enums import RPCMessageType, RPCRequestType +from freqtrade.misc import sync_to_async_iter from freqtrade.rpc.api_server.api_auth import validate_ws_token from freqtrade.rpc.api_server.deps import get_message_stream, get_rpc from freqtrade.rpc.api_server.ws.channel import WebSocketChannel, create_channel @@ -93,7 +94,7 @@ async def _process_consumer_request( limit = min(data.get('limit', 1500), 1500) if data else None # For every pair in the generator, send a separate message - for message in rpc._ws_request_analyzed_df(limit): + async for message in sync_to_async_iter(rpc._ws_request_analyzed_df(limit)): # Format response response = WSAnalyzedDFMessage(data=message) await channel.send(response.dict(exclude_none=True))