From fcf13580f14aea8e889eaf1af82140eb17596d5c Mon Sep 17 00:00:00 2001 From: Matthias Date: Sat, 26 Nov 2022 13:33:54 +0100 Subject: [PATCH] Revert "offload initial df computation to thread" This reverts commit f268187e9b357127151ae45704538aed6c89f7f5. --- freqtrade/misc.py | 43 ------------------------------ freqtrade/rpc/api_server/api_ws.py | 3 +-- 2 files changed, 1 insertion(+), 45 deletions(-) diff --git a/freqtrade/misc.py b/freqtrade/misc.py index 349735dcd..2d2c7513a 100644 --- a/freqtrade/misc.py +++ b/freqtrade/misc.py @@ -1,11 +1,9 @@ """ Various tool function for Freqtrade and scripts """ -import asyncio import gzip import logging import re -import threading from datetime import datetime from pathlib import Path from typing import Any, Dict, Iterator, List, Mapping, Union @@ -303,44 +301,3 @@ def remove_entry_exit_signals(dataframe: pd.DataFrame): dataframe[SignalTagType.EXIT_TAG.value] = None return dataframe - - -def sync_to_async_iter(iter): - """ - Wrap blocking iterator into an asynchronous by - offloading computation to thread and using - pubsub pattern for yielding results - - :param iter: A synchronous iterator - :returns: An asynchronous iterator - """ - - loop = asyncio.get_event_loop() - q = asyncio.Queue(1) - exception = None - _END = object() - - async def yield_queue_items(): - while True: - next_item = await q.get() - if next_item is _END: - break - yield next_item - if exception is not None: - # The iterator has raised, propagate the exception - raise exception - - def iter_to_queue(): - nonlocal exception - try: - for item in iter: - # This runs outside the event loop thread, so we - # must use thread-safe API to talk to the queue. - asyncio.run_coroutine_threadsafe(q.put(item), loop).result() - except Exception as e: - exception = e - finally: - asyncio.run_coroutine_threadsafe(q.put(_END), loop).result() - - threading.Thread(target=iter_to_queue).start() - return yield_queue_items() diff --git a/freqtrade/rpc/api_server/api_ws.py b/freqtrade/rpc/api_server/api_ws.py index 9e7bb17a4..e183cd7e7 100644 --- a/freqtrade/rpc/api_server/api_ws.py +++ b/freqtrade/rpc/api_server/api_ws.py @@ -7,7 +7,6 @@ from fastapi.websockets import WebSocket from pydantic import ValidationError from freqtrade.enums import RPCMessageType, RPCRequestType -from freqtrade.misc import sync_to_async_iter from freqtrade.rpc.api_server.api_auth import validate_ws_token from freqtrade.rpc.api_server.deps import get_message_stream, get_rpc from freqtrade.rpc.api_server.ws.channel import WebSocketChannel, create_channel @@ -94,7 +93,7 @@ async def _process_consumer_request( limit = min(data.get('limit', 1500), 1500) if data else None # For every pair in the generator, send a separate message - async for message in sync_to_async_iter(rpc._ws_request_analyzed_df(limit)): + for message in rpc._ws_request_analyzed_df(limit): # Format response response = WSAnalyzedDFMessage(data=message) await channel.send(response.dict(exclude_none=True))