Add send_msg capability to dataprovider
This commit is contained in:
parent
a0b9388757
commit
229e8864bb
@ -5,12 +5,14 @@ including ticker and orderbook data, live and historical candle (OHLCV) data
|
|||||||
Common Interface for bot and strategy to access data.
|
Common Interface for bot and strategy to access data.
|
||||||
"""
|
"""
|
||||||
import logging
|
import logging
|
||||||
|
from collections import deque
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from typing import Any, Dict, List, Optional, Tuple
|
from typing import Any, Dict, List, Optional, Tuple
|
||||||
|
|
||||||
from pandas import DataFrame
|
from pandas import DataFrame
|
||||||
|
|
||||||
from freqtrade.configuration import TimeRange
|
from freqtrade.configuration import TimeRange
|
||||||
|
from freqtrade.configuration.PeriodicCache import PeriodicCache
|
||||||
from freqtrade.constants import ListPairsWithTimeframes, PairWithTimeframe
|
from freqtrade.constants import ListPairsWithTimeframes, PairWithTimeframe
|
||||||
from freqtrade.data.history import load_pair_history
|
from freqtrade.data.history import load_pair_history
|
||||||
from freqtrade.enums import CandleType, RunMode
|
from freqtrade.enums import CandleType, RunMode
|
||||||
@ -33,6 +35,9 @@ class DataProvider:
|
|||||||
self.__cached_pairs: Dict[PairWithTimeframe, Tuple[DataFrame, datetime]] = {}
|
self.__cached_pairs: Dict[PairWithTimeframe, Tuple[DataFrame, datetime]] = {}
|
||||||
self.__slice_index: Optional[int] = None
|
self.__slice_index: Optional[int] = None
|
||||||
self.__cached_pairs_backtesting: Dict[PairWithTimeframe, DataFrame] = {}
|
self.__cached_pairs_backtesting: Dict[PairWithTimeframe, DataFrame] = {}
|
||||||
|
self._msg_queue: deque = deque()
|
||||||
|
self.__msg_cache = PeriodicCache(
|
||||||
|
maxsize=1000, ttl=timeframe_to_seconds(self._config['timeframe']))
|
||||||
|
|
||||||
def _set_dataframe_max_index(self, limit_index: int):
|
def _set_dataframe_max_index(self, limit_index: int):
|
||||||
"""
|
"""
|
||||||
@ -265,3 +270,19 @@ class DataProvider:
|
|||||||
if self._exchange is None:
|
if self._exchange is None:
|
||||||
raise OperationalException(NO_EXCHANGE_EXCEPTION)
|
raise OperationalException(NO_EXCHANGE_EXCEPTION)
|
||||||
return self._exchange.fetch_l2_order_book(pair, maximum)
|
return self._exchange.fetch_l2_order_book(pair, maximum)
|
||||||
|
|
||||||
|
def send_msg(self, message: str, always_send: bool = False) -> None:
|
||||||
|
"""
|
||||||
|
TODO: Document me
|
||||||
|
:param message: Message to be sent. Must be below 4096.
|
||||||
|
:param always_send: If False, will send the message only once per candle, and surpress
|
||||||
|
identical messages.
|
||||||
|
Careful as this can end up spaming your chat.
|
||||||
|
Defaults to False
|
||||||
|
"""
|
||||||
|
if self.runmode not in (RunMode.DRY_RUN, RunMode.LIVE):
|
||||||
|
return
|
||||||
|
|
||||||
|
if always_send or message not in self.__msg_cache:
|
||||||
|
self._msg_queue.append(message)
|
||||||
|
self.__msg_cache[message] = True
|
||||||
|
@ -17,6 +17,8 @@ class RPCMessageType(Enum):
|
|||||||
PROTECTION_TRIGGER = 'protection_trigger'
|
PROTECTION_TRIGGER = 'protection_trigger'
|
||||||
PROTECTION_TRIGGER_GLOBAL = 'protection_trigger_global'
|
PROTECTION_TRIGGER_GLOBAL = 'protection_trigger_global'
|
||||||
|
|
||||||
|
STRATEGY_MSG = 'strategy_msg'
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return self.value
|
return self.value
|
||||||
|
|
||||||
|
@ -214,6 +214,7 @@ class FreqtradeBot(LoggingMixin):
|
|||||||
if self.trading_mode == TradingMode.FUTURES:
|
if self.trading_mode == TradingMode.FUTURES:
|
||||||
self._schedule.run_pending()
|
self._schedule.run_pending()
|
||||||
Trade.commit()
|
Trade.commit()
|
||||||
|
self.rpc.process_msg_queue(self.dataprovider._msg_queue)
|
||||||
self.last_process = datetime.now(timezone.utc)
|
self.last_process = datetime.now(timezone.utc)
|
||||||
|
|
||||||
def process_stopped(self) -> None:
|
def process_stopped(self) -> None:
|
||||||
|
@ -2,6 +2,7 @@
|
|||||||
This module contains class to manage RPC communications (Telegram, API, ...)
|
This module contains class to manage RPC communications (Telegram, API, ...)
|
||||||
"""
|
"""
|
||||||
import logging
|
import logging
|
||||||
|
from collections import deque
|
||||||
from typing import Any, Dict, List
|
from typing import Any, Dict, List
|
||||||
|
|
||||||
from freqtrade.enums import RPCMessageType
|
from freqtrade.enums import RPCMessageType
|
||||||
@ -77,6 +78,17 @@ class RPCManager:
|
|||||||
except NotImplementedError:
|
except NotImplementedError:
|
||||||
logger.error(f"Message type '{msg['type']}' not implemented by handler {mod.name}.")
|
logger.error(f"Message type '{msg['type']}' not implemented by handler {mod.name}.")
|
||||||
|
|
||||||
|
def process_msg_queue(self, queue: deque) -> None:
|
||||||
|
"""
|
||||||
|
Process all messages in the queue.
|
||||||
|
"""
|
||||||
|
while queue:
|
||||||
|
msg = queue.popleft()
|
||||||
|
self.send_msg({
|
||||||
|
'type': RPCMessageType.STRATEGY_MSG,
|
||||||
|
'msg': msg,
|
||||||
|
})
|
||||||
|
|
||||||
def startup_messages(self, config: Dict[str, Any], pairlist, protections) -> None:
|
def startup_messages(self, config: Dict[str, Any], pairlist, protections) -> None:
|
||||||
if config['dry_run']:
|
if config['dry_run']:
|
||||||
self.send_msg({
|
self.send_msg({
|
||||||
|
@ -376,7 +376,8 @@ class Telegram(RPCHandler):
|
|||||||
|
|
||||||
elif msg_type == RPCMessageType.STARTUP:
|
elif msg_type == RPCMessageType.STARTUP:
|
||||||
message = f"{msg['status']}"
|
message = f"{msg['status']}"
|
||||||
|
elif msg_type == RPCMessageType.STRATEGY_MSG:
|
||||||
|
message = f"{msg['msg']}"
|
||||||
else:
|
else:
|
||||||
raise NotImplementedError(f"Unknown message type: {msg_type}")
|
raise NotImplementedError(f"Unknown message type: {msg_type}")
|
||||||
return message
|
return message
|
||||||
|
Loading…
Reference in New Issue
Block a user