Merge branch 'freqtrade:develop' into develop

This commit is contained in:
lolong
2022-10-27 23:26:13 +02:00
committed by GitHub
29 changed files with 328 additions and 96 deletions

View File

@@ -303,7 +303,7 @@ class IDataHandler(ABC):
timerange=timerange_startup,
candle_type=candle_type
)
if self._check_empty_df(pairdf, pair, timeframe, candle_type, warn_no_data):
if self._check_empty_df(pairdf, pair, timeframe, candle_type, warn_no_data, True):
return pairdf
else:
enddate = pairdf.iloc[-1]['date']
@@ -323,8 +323,9 @@ class IDataHandler(ABC):
self._check_empty_df(pairdf, pair, timeframe, candle_type, warn_no_data)
return pairdf
def _check_empty_df(self, pairdf: DataFrame, pair: str, timeframe: str,
candle_type: CandleType, warn_no_data: bool):
def _check_empty_df(
self, pairdf: DataFrame, pair: str, timeframe: str, candle_type: CandleType,
warn_no_data: bool, warn_price: bool = False) -> bool:
"""
Warn on empty dataframe
"""
@@ -335,6 +336,20 @@ class IDataHandler(ABC):
"Use `freqtrade download-data` to download the data"
)
return True
elif warn_price:
candle_price_gap = 0
if (candle_type in (CandleType.SPOT, CandleType.FUTURES) and
not pairdf.empty
and 'close' in pairdf.columns and 'open' in pairdf.columns):
# Detect gaps between prior close and open
gaps = ((pairdf['open'] - pairdf['close'].shift(1)) / pairdf['close'].shift(1))
gaps = gaps.dropna()
if len(gaps):
candle_price_gap = max(abs(gaps))
if candle_price_gap > 0.1:
logger.info(f"Price jump in {pair}, {timeframe}, {candle_type} between two candles "
f"of {candle_price_gap:.2%} detected.")
return False
def _validate_pairdata(self, pair, pairdata: DataFrame, timeframe: str,

View File

@@ -1996,9 +1996,9 @@ class Exchange:
# Timeframe in seconds
interval_in_sec = timeframe_to_seconds(timeframe)
return not (
return (
(self._pairs_last_refresh_time.get((pair, timeframe, candle_type), 0)
+ interval_in_sec) >= arrow.utcnow().int_timestamp
+ interval_in_sec) < arrow.utcnow().int_timestamp
)
@retrier_async

View File

@@ -210,7 +210,10 @@ class FreqaiDataKitchen:
const_cols = list((filtered_df.nunique() == 1).loc[lambda x: x].index)
if const_cols:
filtered_df = filtered_df.filter(filtered_df.columns.difference(const_cols))
self.data['constant_features_list'] = const_cols
logger.warning(f"Removed features {const_cols} with constant values.")
else:
self.data['constant_features_list'] = []
# we don't care about total row number (total no. datapoints) in training, we only care
# about removing any row with NaNs
# if labels has multiple columns (user wants to train multiple modelEs), we detect here
@@ -241,7 +244,8 @@ class FreqaiDataKitchen:
self.data["filter_drop_index_training"] = drop_index
else:
filtered_df = self.check_pred_labels(filtered_df)
if len(self.data['constant_features_list']):
filtered_df = self.check_pred_labels(filtered_df)
# we are backtesting so we need to preserve row number to send back to strategy,
# so now we use do_predict to avoid any prediction based on a NaN
drop_index = pd.isnull(filtered_df).any(axis=1)
@@ -464,18 +468,16 @@ class FreqaiDataKitchen:
def check_pred_labels(self, df_predictions: DataFrame) -> DataFrame:
"""
Check that prediction feature labels match training feature labels.
:params:
:df_predictions: incoming predictions
:param df_predictions: incoming predictions
"""
train_labels = self.data_dictionary["train_features"].columns
pred_labels = df_predictions.columns
num_diffs = len(pred_labels.difference(train_labels))
if num_diffs != 0:
df_predictions = df_predictions[train_labels]
logger.warning(
f"Removed {num_diffs} features from prediction features, "
f"these were likely considered constant values during most recent training."
)
constant_labels = self.data['constant_features_list']
df_predictions = df_predictions.filter(
df_predictions.columns.difference(constant_labels)
)
logger.warning(
f"Removed {len(constant_labels)} features from prediction features, "
f"these were considered constant values during most recent training."
)
return df_predictions

View File

@@ -26,9 +26,8 @@ class XGBoostRFClassifier(BaseClassifierModel):
def fit(self, data_dictionary: Dict, dk: FreqaiDataKitchen, **kwargs) -> Any:
"""
User sets up the training and test data to fit their desired model here
:params:
:data_dictionary: the dictionary constructed by DataHandler to hold
all the training and test data/labels.
:param data_dictionary: the dictionary constructed by DataHandler to hold
all the training and test data/labels.
"""
X = data_dictionary["train_features"].to_numpy()
@@ -65,7 +64,7 @@ class XGBoostRFClassifier(BaseClassifierModel):
) -> Tuple[DataFrame, npt.NDArray[np.int_]]:
"""
Filter the prediction features data and predict with it.
:param: unfiltered_df: Full dataframe for the current backtest period.
:param unfiltered_df: Full dataframe for the current backtest period.
:return:
:pred_df: dataframe containing the predictions
:do_predict: np.array of 1s and 0s to indicate places where freqai needed to remove

View File

@@ -29,6 +29,7 @@ class XGBoostRFRegressor(BaseRegressionModel):
if self.freqai_info.get("data_split_parameters", {}).get("test_size", 0.1) == 0:
eval_set = None
eval_weights = None
else:
eval_set = [(data_dictionary["test_features"], data_dictionary["test_labels"])]
eval_weights = [data_dictionary['test_weights']]

View File

@@ -29,6 +29,7 @@ class XGBoostRegressor(BaseRegressionModel):
if self.freqai_info.get("data_split_parameters", {}).get("test_size", 0.1) == 0:
eval_set = None
eval_weights = None
else:
eval_set = [(data_dictionary["test_features"], data_dictionary["test_labels"])]
eval_weights = [data_dictionary['test_weights']]

View File

@@ -1,4 +1,3 @@
import asyncio
import logging
from typing import Any, Dict
@@ -11,6 +10,7 @@ from freqtrade.enums import RPCMessageType, RPCRequestType
from freqtrade.rpc.api_server.api_auth import validate_ws_token
from freqtrade.rpc.api_server.deps import get_channel_manager, get_rpc
from freqtrade.rpc.api_server.ws import WebSocketChannel
from freqtrade.rpc.api_server.ws.channel import ChannelManager
from freqtrade.rpc.api_server.ws_schemas import (WSAnalyzedDFMessage, WSMessageSchema,
WSRequestSchema, WSWhitelistMessage)
from freqtrade.rpc.rpc import RPC
@@ -37,7 +37,8 @@ async def is_websocket_alive(ws: WebSocket) -> bool:
async def _process_consumer_request(
request: Dict[str, Any],
channel: WebSocketChannel,
rpc: RPC
rpc: RPC,
channel_manager: ChannelManager
):
"""
Validate and handle a request from a websocket consumer
@@ -74,7 +75,7 @@ async def _process_consumer_request(
# Format response
response = WSWhitelistMessage(data=whitelist)
# Send it back
await channel.send(response.dict(exclude_none=True))
await channel_manager.send_direct(channel, response.dict(exclude_none=True))
elif type == RPCRequestType.ANALYZED_DF:
limit = None
@@ -89,9 +90,7 @@ async def _process_consumer_request(
# For every dataframe, send as a separate message
for _, message in analyzed_df.items():
response = WSAnalyzedDFMessage(data=message)
await channel.send(response.dict(exclude_none=True))
# Throttle the messages to 50/s
await asyncio.sleep(0.02)
await channel_manager.send_direct(channel, response.dict(exclude_none=True))
@router.websocket("/message/ws")
@@ -116,7 +115,7 @@ async def message_endpoint(
request = await channel.recv()
# Process the request here
await _process_consumer_request(request, channel, rpc)
await _process_consumer_request(request, channel, rpc, channel_manager)
except (WebSocketDisconnect, WebSocketException):
# Handle client disconnects

View File

@@ -16,6 +16,7 @@ from freqtrade.constants import Config
from freqtrade.exceptions import OperationalException
from freqtrade.rpc.api_server.uvicorn_threaded import UvicornServer
from freqtrade.rpc.api_server.ws import ChannelManager
from freqtrade.rpc.api_server.ws_schemas import WSMessageSchemaType
from freqtrade.rpc.rpc import RPC, RPCException, RPCHandler
@@ -127,7 +128,7 @@ class ApiServer(RPCHandler):
cls._has_rpc = False
cls._rpc = None
def send_msg(self, msg: Dict[str, str]) -> None:
def send_msg(self, msg: Dict[str, Any]) -> None:
if self._ws_queue:
sync_q = self._ws_queue.sync_q
sync_q.put(msg)
@@ -194,14 +195,10 @@ class ApiServer(RPCHandler):
while True:
logger.debug("Getting queue messages...")
# Get data from queue
message = await async_queue.get()
message: WSMessageSchemaType = await async_queue.get()
logger.debug(f"Found message of type: {message.get('type')}")
# Broadcast it
await self._ws_channel_manager.broadcast(message)
# Limit messages per sec.
# Could cause problems with queue size if too low, and
# problems with network traffik if too high.
await asyncio.sleep(0.001)
except asyncio.CancelledError:
pass

View File

@@ -1,7 +1,7 @@
import asyncio
import logging
from threading import RLock
from typing import Any, Dict, List, Optional, Type
from typing import Any, Dict, List, Optional, Type, Union
from uuid import uuid4
from fastapi import WebSocket as FastAPIWebSocket
@@ -10,6 +10,7 @@ from freqtrade.rpc.api_server.ws.proxy import WebSocketProxy
from freqtrade.rpc.api_server.ws.serializer import (HybridJSONWebSocketSerializer,
WebSocketSerializer)
from freqtrade.rpc.api_server.ws.types import WebSocketType
from freqtrade.rpc.api_server.ws_schemas import WSMessageSchemaType
logger = logging.getLogger(__name__)
@@ -24,6 +25,8 @@ class WebSocketChannel:
self,
websocket: WebSocketType,
channel_id: Optional[str] = None,
drain_timeout: int = 3,
throttle: float = 0.01,
serializer_cls: Type[WebSocketSerializer] = HybridJSONWebSocketSerializer
):
@@ -34,7 +37,11 @@ class WebSocketChannel:
# The Serializing class for the WebSocket object
self._serializer_cls = serializer_cls
self.drain_timeout = drain_timeout
self.throttle = throttle
self._subscriptions: List[str] = []
# 32 is the size of the receiving queue in websockets package
self.queue: asyncio.Queue[Dict[str, Any]] = asyncio.Queue(maxsize=32)
self._relay_task = asyncio.create_task(self.relay())
@@ -47,6 +54,10 @@ class WebSocketChannel:
def __repr__(self):
return f"WebSocketChannel({self.channel_id}, {self.remote_addr})"
@property
def raw_websocket(self):
return self._websocket.raw_websocket
@property
def remote_addr(self):
return self._websocket.remote_addr
@@ -57,11 +68,19 @@ class WebSocketChannel:
"""
await self._wrapped_ws.send(data)
async def send(self, data):
async def send(self, data) -> bool:
"""
Add the data to the queue to be sent
Add the data to the queue to be sent.
:returns: True if data added to queue, False otherwise
"""
self.queue.put_nowait(data)
try:
await asyncio.wait_for(
self.queue.put(data),
timeout=self.drain_timeout
)
return True
except asyncio.TimeoutError:
return False
async def recv(self):
"""
@@ -119,8 +138,8 @@ class WebSocketChannel:
# Limit messages per sec.
# Could cause problems with queue size if too low, and
# problems with network traffik if too high.
# 0.001 = 1000/s
await asyncio.sleep(0.001)
# 0.01 = 100/s
await asyncio.sleep(self.throttle)
except RuntimeError:
# The connection was closed, just exit the task
return
@@ -160,6 +179,7 @@ class ChannelManager:
with self._lock:
channel = self.channels.get(websocket)
if channel:
logger.info(f"Disconnecting channel {channel}")
if not channel.is_closed():
await channel.close()
@@ -170,36 +190,30 @@ class ChannelManager:
Disconnect all Channels
"""
with self._lock:
for websocket, channel in self.channels.copy().items():
if not channel.is_closed():
await channel.close()
for websocket in self.channels.copy().keys():
await self.on_disconnect(websocket)
self.channels = dict()
async def broadcast(self, data):
async def broadcast(self, message: WSMessageSchemaType):
"""
Broadcast data on all Channels
Broadcast a message on all Channels
:param data: The data to send
:param message: The message to send
"""
with self._lock:
message_type = data.get('type')
for websocket, channel in self.channels.copy().items():
if channel.subscribed_to(message_type):
if not channel.queue.full():
await channel.send(data)
else:
logger.info(f"Channel {channel} is too far behind, disconnecting")
await self.on_disconnect(websocket)
for channel in self.channels.copy().values():
if channel.subscribed_to(message.get('type')):
await self.send_direct(channel, message)
async def send_direct(self, channel, data):
async def send_direct(
self, channel: WebSocketChannel, message: Union[WSMessageSchemaType, Dict[str, Any]]):
"""
Send data directly through direct_channel only
Send a message directly through direct_channel only
:param direct_channel: The WebSocketChannel object to send data through
:param data: The data to send
:param direct_channel: The WebSocketChannel object to send the message through
:param message: The message to send
"""
await channel.send(data)
if not await channel.send(message):
await self.on_disconnect(channel.raw_websocket)
def has_channels(self):
"""

View File

@@ -15,6 +15,10 @@ class WebSocketProxy:
def __init__(self, websocket: WebSocketType):
self._websocket: Union[FastAPIWebSocket, WebSocket] = websocket
@property
def raw_websocket(self):
return self._websocket
@property
def remote_addr(self) -> Tuple[Any, ...]:
if isinstance(self._websocket, WebSocket):

View File

@@ -1,5 +1,5 @@
from datetime import datetime
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, TypedDict
from pandas import DataFrame
from pydantic import BaseModel
@@ -18,6 +18,12 @@ class WSRequestSchema(BaseArbitraryModel):
data: Optional[Any] = None
class WSMessageSchemaType(TypedDict):
# Type for typing to avoid doing pydantic typechecks.
type: RPCMessageType
data: Optional[Dict[str, Any]]
class WSMessageSchema(BaseArbitraryModel):
type: RPCMessageType
data: Optional[Any] = None

View File

@@ -270,6 +270,11 @@ class ExternalMessageConsumer:
logger.debug(f"Connection to {channel} still alive...")
continue
except (websockets.exceptions.ConnectionClosed):
# Just eat the error and continue reconnecting
logger.warning(f"Disconnection in {channel} - retrying in {self.sleep_time}s")
await asyncio.sleep(self.sleep_time)
break
except Exception as e:
logger.warning(f"Ping error {channel} - retrying in {self.sleep_time}s")
logger.debug(e, exc_info=e)

View File

@@ -1072,26 +1072,26 @@ class IStrategy(ABC, HyperStrategyMixin):
trade.stop_loss > (high or current_rate)
)
# Make sure current_profit is calculated using high for backtesting.
bound = (low if trade.is_short else high)
bound_profit = current_profit if not bound else trade.calc_profit_ratio(bound)
if self.use_custom_stoploss and dir_correct:
stop_loss_value = strategy_safe_wrapper(self.custom_stoploss, default_retval=None
)(pair=trade.pair, trade=trade,
current_time=current_time,
current_rate=current_rate,
current_profit=current_profit)
current_rate=(bound or current_rate),
current_profit=bound_profit)
# Sanity check - error cases will return None
if stop_loss_value:
# logger.info(f"{trade.pair} {stop_loss_value=} {current_profit=}")
trade.adjust_stop_loss(current_rate, stop_loss_value)
# logger.info(f"{trade.pair} {stop_loss_value=} {bound_profit=}")
trade.adjust_stop_loss(bound or current_rate, stop_loss_value)
else:
logger.warning("CustomStoploss function did not return valid stoploss")
if self.trailing_stop and dir_correct:
# trailing stoploss handling
sl_offset = self.trailing_stop_positive_offset
# Make sure current_profit is calculated using high for backtesting.
bound = low if trade.is_short else high
bound_profit = current_profit if not bound else trade.calc_profit_ratio(bound)
# Don't update stoploss if trailing_only_offset_is_reached is true.
if not (self.trailing_only_offset_is_reached and bound_profit < sl_offset):

View File

@@ -14,6 +14,7 @@ from freqtrade.configuration import Configuration
from freqtrade.constants import PROCESS_THROTTLE_SECS, RETRY_TIMEOUT, Config
from freqtrade.enums import State
from freqtrade.exceptions import OperationalException, TemporaryError
from freqtrade.exchange import timeframe_to_next_date
from freqtrade.freqtradebot import FreqtradeBot
@@ -35,7 +36,6 @@ class Worker:
self._config = config
self._init(False)
self.last_throttle_start_time: float = 0
self._heartbeat_msg: float = 0
# Tell systemd that we completed initialization phase
@@ -112,7 +112,10 @@ class Worker:
# Ping systemd watchdog before throttling
self._notify("WATCHDOG=1\nSTATUS=State: RUNNING.")
self._throttle(func=self._process_running, throttle_secs=self._throttle_secs)
# Use an offset of 1s to ensure a new candle has been issued
self._throttle(func=self._process_running, throttle_secs=self._throttle_secs,
timeframe=self._config['timeframe'] if self._config else None,
timeframe_offset=1)
if self._heartbeat_interval:
now = time.time()
@@ -127,24 +130,42 @@ class Worker:
return state
def _throttle(self, func: Callable[..., Any], throttle_secs: float, *args, **kwargs) -> Any:
def _throttle(self, func: Callable[..., Any], throttle_secs: float,
timeframe: Optional[str] = None, timeframe_offset: float = 1.0,
*args, **kwargs) -> Any:
"""
Throttles the given callable that it
takes at least `min_secs` to finish execution.
:param func: Any callable
:param throttle_secs: throttling interation execution time limit in seconds
:param timeframe: ensure iteration is executed at the beginning of the next candle.
:param timeframe_offset: offset in seconds to apply to the next candle time.
:return: Any (result of execution of func)
"""
self.last_throttle_start_time = time.time()
last_throttle_start_time = time.time()
logger.debug("========================================")
result = func(*args, **kwargs)
time_passed = time.time() - self.last_throttle_start_time
sleep_duration = max(throttle_secs - time_passed, 0.0)
time_passed = time.time() - last_throttle_start_time
sleep_duration = throttle_secs - time_passed
if timeframe:
next_tf = timeframe_to_next_date(timeframe)
# Maximum throttling should be until new candle arrives
# Offset of 0.2s is added to ensure a new candle has been issued.
next_tf_with_offset = next_tf.timestamp() - time.time() + timeframe_offset
sleep_duration = min(sleep_duration, next_tf_with_offset)
sleep_duration = max(sleep_duration, 0.0)
# next_iter = datetime.now(timezone.utc) + timedelta(seconds=sleep_duration)
logger.debug(f"Throttling with '{func.__name__}()': sleep for {sleep_duration:.2f} s, "
f"last iteration took {time_passed:.2f} s.")
time.sleep(sleep_duration)
self._sleep(sleep_duration)
return result
@staticmethod
def _sleep(sleep_duration: float) -> None:
"""Local sleep method - to improve testability"""
time.sleep(sleep_duration)
def _process_stopped(self) -> None:
self.freqtrade.process_stopped()