offload initial df computation to thread

This commit is contained in:
Timothy Pogue 2022-11-25 12:56:33 -07:00
parent afc00bc30a
commit f268187e9b
2 changed files with 45 additions and 1 deletions

View File

@ -1,9 +1,11 @@
""" """
Various tool function for Freqtrade and scripts Various tool function for Freqtrade and scripts
""" """
import asyncio
import gzip import gzip
import logging import logging
import re import re
import threading
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Any, Dict, Iterator, List, Mapping, Union from typing import Any, Dict, Iterator, List, Mapping, Union
@ -301,3 +303,44 @@ def remove_entry_exit_signals(dataframe: pd.DataFrame):
dataframe[SignalTagType.EXIT_TAG.value] = None dataframe[SignalTagType.EXIT_TAG.value] = None
return dataframe return dataframe
def sync_to_async_iter(iter):
"""
Wrap blocking iterator into an asynchronous by
offloading computation to thread and using
pubsub pattern for yielding results
:param iter: A synchronous iterator
:returns: An asynchronous iterator
"""
loop = asyncio.get_event_loop()
q = asyncio.Queue(1)
exception = None
_END = object()
async def yield_queue_items():
while True:
next_item = await q.get()
if next_item is _END:
break
yield next_item
if exception is not None:
# The iterator has raised, propagate the exception
raise exception
def iter_to_queue():
nonlocal exception
try:
for item in iter:
# This runs outside the event loop thread, so we
# must use thread-safe API to talk to the queue.
asyncio.run_coroutine_threadsafe(q.put(item), loop).result()
except Exception as e:
exception = e
finally:
asyncio.run_coroutine_threadsafe(q.put(_END), loop).result()
threading.Thread(target=iter_to_queue).start()
return yield_queue_items()

View File

@ -7,6 +7,7 @@ from fastapi.websockets import WebSocket
from pydantic import ValidationError from pydantic import ValidationError
from freqtrade.enums import RPCMessageType, RPCRequestType from freqtrade.enums import RPCMessageType, RPCRequestType
from freqtrade.misc import sync_to_async_iter
from freqtrade.rpc.api_server.api_auth import validate_ws_token from freqtrade.rpc.api_server.api_auth import validate_ws_token
from freqtrade.rpc.api_server.deps import get_message_stream, get_rpc from freqtrade.rpc.api_server.deps import get_message_stream, get_rpc
from freqtrade.rpc.api_server.ws.channel import WebSocketChannel, create_channel from freqtrade.rpc.api_server.ws.channel import WebSocketChannel, create_channel
@ -93,7 +94,7 @@ async def _process_consumer_request(
limit = min(data.get('limit', 1500), 1500) if data else None limit = min(data.get('limit', 1500), 1500) if data else None
# For every pair in the generator, send a separate message # For every pair in the generator, send a separate message
for message in rpc._ws_request_analyzed_df(limit): async for message in sync_to_async_iter(rpc._ws_request_analyzed_df(limit)):
# Format response # Format response
response = WSAnalyzedDFMessage(data=message) response = WSAnalyzedDFMessage(data=message)
await channel.send(response.dict(exclude_none=True)) await channel.send(response.dict(exclude_none=True))