Revert "offload initial df computation to thread"

This reverts commit f268187e9b.

parent  7b0a76fb70
commit  fcf13580f1
@@ -1,11 +1,9 @@
 """
 Various tool function for Freqtrade and scripts
 """
-import asyncio
 import gzip
 import logging
 import re
-import threading
 from datetime import datetime
 from pathlib import Path
 from typing import Any, Dict, Iterator, List, Mapping, Union
@@ -303,44 +301,3 @@ def remove_entry_exit_signals(dataframe: pd.DataFrame):
     dataframe[SignalTagType.EXIT_TAG.value] = None

     return dataframe
-
-
-def sync_to_async_iter(iter):
-    """
-    Wrap blocking iterator into an asynchronous by
-    offloading computation to thread and using
-    pubsub pattern for yielding results
-
-    :param iter: A synchronous iterator
-    :returns: An asynchronous iterator
-    """
-
-    loop = asyncio.get_event_loop()
-    q = asyncio.Queue(1)
-    exception = None
-    _END = object()
-
-    async def yield_queue_items():
-        while True:
-            next_item = await q.get()
-            if next_item is _END:
-                break
-            yield next_item
-        if exception is not None:
-            # The iterator has raised, propagate the exception
-            raise exception
-
-    def iter_to_queue():
-        nonlocal exception
-        try:
-            for item in iter:
-                # This runs outside the event loop thread, so we
-                # must use thread-safe API to talk to the queue.
-                asyncio.run_coroutine_threadsafe(q.put(item), loop).result()
-        except Exception as e:
-            exception = e
-        finally:
-            asyncio.run_coroutine_threadsafe(q.put(_END), loop).result()
-
-    threading.Thread(target=iter_to_queue).start()
-    return yield_queue_items()
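For context, a minimal usage sketch of the helper removed above, assuming a checkout from before this revert where sync_to_async_iter is still importable; slow_items and main are illustrative names, not part of the codebase. The blocking generator runs in a background thread, and each item reaches the event loop through an asyncio.Queue.

    import asyncio
    import time

    def slow_items():
        # Stand-in for the expensive per-pair dataframe computation.
        for i in range(3):
            time.sleep(0.1)
            yield i

    async def main():
        # sync_to_async_iter is the helper shown (removed) in the hunk above.
        async for item in sync_to_async_iter(slow_items()):
            print(item)

    asyncio.run(main())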
@@ -7,7 +7,6 @@ from fastapi.websockets import WebSocket
 from pydantic import ValidationError

 from freqtrade.enums import RPCMessageType, RPCRequestType
-from freqtrade.misc import sync_to_async_iter
 from freqtrade.rpc.api_server.api_auth import validate_ws_token
 from freqtrade.rpc.api_server.deps import get_message_stream, get_rpc
 from freqtrade.rpc.api_server.ws.channel import WebSocketChannel, create_channel
@@ -94,7 +93,7 @@ async def _process_consumer_request(
         limit = min(data.get('limit', 1500), 1500) if data else None

         # For every pair in the generator, send a separate message
-        async for message in sync_to_async_iter(rpc._ws_request_analyzed_df(limit)):
+        for message in rpc._ws_request_analyzed_df(limit):
            # Format response
            response = WSAnalyzedDFMessage(data=message)
            await channel.send(response.dict(exclude_none=True))
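The changed line above is the behavioral core of the revert: iterating rpc._ws_request_analyzed_df(limit) directly runs the dataframe generation on the event loop thread, whereas the reverted version pulled it through a worker thread. A hedged sketch of one alternative way to get similar offloading without the removed helper, assuming Python 3.9+ for asyncio.to_thread; iter_in_thread and _SENTINEL are illustrative names, not part of the codebase.

    import asyncio

    _SENTINEL = object()

    async def iter_in_thread(blocking_gen):
        # Pull each item from the blocking generator in a worker thread,
        # so the event loop stays responsive between items.
        while True:
            item = await asyncio.to_thread(next, blocking_gen, _SENTINEL)
            if item is _SENTINEL:
                break
            yield item

    # Usage inside the handler would then look like:
    #     async for message in iter_in_thread(rpc._ws_request_analyzed_df(limit)):
    #         ...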