merge develop into feat/freqai-rl-dev
This commit is contained in:
@@ -512,6 +512,7 @@ CONF_SCHEMA = {
|
||||
'minimum': 0,
|
||||
'maximum': 65535
|
||||
},
|
||||
'secure': {'type': 'boolean', 'default': False},
|
||||
'ws_token': {'type': 'string'},
|
||||
},
|
||||
'required': ['name', 'host', 'ws_token']
|
||||
|
@@ -1133,10 +1133,8 @@ class FreqtradeBot(LoggingMixin):
|
||||
trade.exit_reason = ExitType.STOPLOSS_ON_EXCHANGE.value
|
||||
self.update_trade_state(trade, trade.stoploss_order_id, stoploss_order,
|
||||
stoploss_order=True)
|
||||
# Lock pair for one candle to prevent immediate rebuys
|
||||
self.strategy.lock_pair(trade.pair, datetime.now(timezone.utc),
|
||||
reason='Auto lock')
|
||||
self._notify_exit(trade, "stoploss", True)
|
||||
self.handle_protections(trade.pair, trade.trade_direction)
|
||||
return True
|
||||
|
||||
if trade.open_order_id or not trade.is_open:
|
||||
@@ -1595,11 +1593,6 @@ class FreqtradeBot(LoggingMixin):
|
||||
trade.close_rate_requested = limit
|
||||
trade.exit_reason = exit_reason
|
||||
|
||||
if not sub_trade_amt:
|
||||
# Lock pair for one candle to prevent immediate re-trading
|
||||
self.strategy.lock_pair(trade.pair, datetime.now(timezone.utc),
|
||||
reason='Auto lock')
|
||||
|
||||
self._notify_exit(trade, order_type, sub_trade=bool(sub_trade_amt), order=order_obj)
|
||||
# In case of market sell orders the order can be closed immediately
|
||||
if order.get('status', 'unknown') in ('closed', 'expired'):
|
||||
@@ -1809,6 +1802,8 @@ class FreqtradeBot(LoggingMixin):
|
||||
self._notify_enter(trade, order, fill=True, sub_trade=sub_trade)
|
||||
|
||||
def handle_protections(self, pair: str, side: LongShort) -> None:
|
||||
# Lock pair for one candle to prevent immediate rebuys
|
||||
self.strategy.lock_pair(pair, datetime.now(timezone.utc), reason='Auto lock')
|
||||
prot_trig = self.protections.stop_per_pair(pair, side=side)
|
||||
if prot_trig:
|
||||
msg = {'type': RPCMessageType.PROTECTION_TRIGGER, }
|
||||
|
@@ -10,7 +10,8 @@ from typing import Any, Dict, Iterator, List, Mapping, Union
|
||||
from typing.io import IO
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import pandas
|
||||
import orjson
|
||||
import pandas as pd
|
||||
import rapidjson
|
||||
|
||||
from freqtrade.constants import DECIMAL_PER_COIN_FALLBACK, DECIMALS_PER_COIN
|
||||
@@ -256,29 +257,37 @@ def parse_db_uri_for_logging(uri: str):
|
||||
return parsed_db_uri.geturl().replace(f':{pwd}@', ':*****@')
|
||||
|
||||
|
||||
def dataframe_to_json(dataframe: pandas.DataFrame) -> str:
|
||||
def dataframe_to_json(dataframe: pd.DataFrame) -> str:
|
||||
"""
|
||||
Serialize a DataFrame for transmission over the wire using JSON
|
||||
:param dataframe: A pandas DataFrame
|
||||
:returns: A JSON string of the pandas DataFrame
|
||||
"""
|
||||
return dataframe.to_json(orient='split')
|
||||
# https://github.com/pandas-dev/pandas/issues/24889
|
||||
# https://github.com/pandas-dev/pandas/issues/40443
|
||||
# We need to convert to a dict to avoid mem leak
|
||||
def default(z):
|
||||
if isinstance(z, pd.Timestamp):
|
||||
return z.timestamp() * 1e3
|
||||
raise TypeError
|
||||
|
||||
return str(orjson.dumps(dataframe.to_dict(orient='split'), default=default), 'utf-8')
|
||||
|
||||
|
||||
def json_to_dataframe(data: str) -> pandas.DataFrame:
|
||||
def json_to_dataframe(data: str) -> pd.DataFrame:
|
||||
"""
|
||||
Deserialize JSON into a DataFrame
|
||||
:param data: A JSON string
|
||||
:returns: A pandas DataFrame from the JSON string
|
||||
"""
|
||||
dataframe = pandas.read_json(data, orient='split')
|
||||
dataframe = pd.read_json(data, orient='split')
|
||||
if 'date' in dataframe.columns:
|
||||
dataframe['date'] = pandas.to_datetime(dataframe['date'], unit='ms', utc=True)
|
||||
dataframe['date'] = pd.to_datetime(dataframe['date'], unit='ms', utc=True)
|
||||
|
||||
return dataframe
|
||||
|
||||
|
||||
def remove_entry_exit_signals(dataframe: pandas.DataFrame):
|
||||
def remove_entry_exit_signals(dataframe: pd.DataFrame):
|
||||
"""
|
||||
Remove Entry and Exit signals from a DataFrame
|
||||
|
||||
|
@@ -194,6 +194,9 @@ class ApiServer(RPCHandler):
|
||||
try:
|
||||
while True:
|
||||
logger.debug("Getting queue messages...")
|
||||
if (qsize := async_queue.qsize()) > 20:
|
||||
# If the queue becomes too big for too long, this may indicate a problem.
|
||||
logger.warning(f"Queue size now {qsize}")
|
||||
# Get data from queue
|
||||
message: WSMessageSchemaType = await async_queue.get()
|
||||
logger.debug(f"Found message of type: {message.get('type')}")
|
||||
|
@@ -77,21 +77,24 @@ class WebSocketChannel:
|
||||
# until self.drain_timeout for the relay to drain the outgoing queue
|
||||
# We can't use asyncio.wait_for here because the queue may have been created with a
|
||||
# different eventloop
|
||||
start = time.time()
|
||||
while self.queue.full():
|
||||
await asyncio.sleep(1)
|
||||
if (time.time() - start) > self.drain_timeout:
|
||||
if not self.is_closed():
|
||||
start = time.time()
|
||||
while self.queue.full():
|
||||
await asyncio.sleep(1)
|
||||
if (time.time() - start) > self.drain_timeout:
|
||||
return False
|
||||
|
||||
# If for some reason the queue is still full, just return False
|
||||
try:
|
||||
self.queue.put_nowait(data)
|
||||
except asyncio.QueueFull:
|
||||
return False
|
||||
|
||||
# If for some reason the queue is still full, just return False
|
||||
try:
|
||||
self.queue.put_nowait(data)
|
||||
except asyncio.QueueFull:
|
||||
# If we got here everything is ok
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
# If we got here everything is ok
|
||||
return True
|
||||
|
||||
async def recv(self):
|
||||
"""
|
||||
Receive data on the wrapped websocket
|
||||
@@ -109,14 +112,14 @@ class WebSocketChannel:
|
||||
Close the WebSocketChannel
|
||||
"""
|
||||
|
||||
self._closed.set()
|
||||
self._relay_task.cancel()
|
||||
|
||||
try:
|
||||
await self.raw_websocket.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
self._closed.set()
|
||||
self._relay_task.cancel()
|
||||
|
||||
def is_closed(self) -> bool:
|
||||
"""
|
||||
Closed flag
|
||||
|
@@ -31,6 +31,7 @@ class Producer(TypedDict):
|
||||
name: str
|
||||
host: str
|
||||
port: int
|
||||
secure: bool
|
||||
ws_token: str
|
||||
|
||||
|
||||
@@ -180,7 +181,8 @@ class ExternalMessageConsumer:
|
||||
host, port = producer['host'], producer['port']
|
||||
token = producer['ws_token']
|
||||
name = producer['name']
|
||||
ws_url = f"ws://{host}:{port}/api/v1/message/ws?token={token}"
|
||||
scheme = 'wss' if producer.get('secure', False) else 'ws'
|
||||
ws_url = f"{scheme}://{host}:{port}/api/v1/message/ws?token={token}"
|
||||
|
||||
# This will raise InvalidURI if the url is bad
|
||||
async with websockets.connect(
|
||||
|
@@ -774,6 +774,9 @@ class RPC:
|
||||
is_short = trade.is_short
|
||||
if not self._freqtrade.strategy.position_adjustment_enable:
|
||||
raise RPCException(f'position for {pair} already open - id: {trade.id}')
|
||||
if trade.open_order_id is not None:
|
||||
raise RPCException(f'position for {pair} already open - id: {trade.id} '
|
||||
f'and has open order {trade.open_order_id}')
|
||||
else:
|
||||
if Trade.get_open_trade_count() >= self._config['max_open_trades']:
|
||||
raise RPCException("Maximum number of trades is reached.")
|
||||
|
Reference in New Issue
Block a user