stable/freqtrade/rpc/rpc_manager.py

66 lines
1.9 KiB
Python
Raw Normal View History

2018-02-13 03:45:59 +00:00
"""
This module contains class to manage RPC communications (Telegram, Slack, ...)
"""
2018-05-31 19:08:11 +00:00
from typing import Any, List
2018-03-25 19:37:14 +00:00
import logging
import time
2018-02-13 03:45:59 +00:00
from freqtrade.rpc.telegram import Telegram
from freqtrade.rpc.local_rpc_server import LocalRPCSuperWrap
2018-02-13 03:45:59 +00:00
2018-03-25 19:37:14 +00:00
logger = logging.getLogger(__name__)
2018-02-13 03:45:59 +00:00
class RPCManager(object):
"""
Class to manage RPC objects (Telegram, Slack, ...)
"""
def __init__(self, freqtrade) -> None:
"""
Initializes all enabled rpc modules
:param config: config to use
:return: None
"""
self.freqtrade = freqtrade
2018-05-31 18:55:26 +00:00
self.registered_modules: List[str] = []
self.telegram: Any = None
2018-02-13 03:45:59 +00:00
self._init()
def _init(self) -> None:
2018-02-13 03:45:59 +00:00
"""
Init RPC modules
:return:
"""
if self.freqtrade.config['telegram'].get('enabled', False):
2018-03-25 19:37:14 +00:00
logger.info('Enabling rpc.telegram ...')
2018-02-13 03:45:59 +00:00
self.registered_modules.append('telegram')
self.telegram = Telegram(self.freqtrade)
# Added another RPC client - for cmdline local client.
# Uses existing superclass RPC build for Telegram
if self.freqtrade.config['localrpc'].get('enabled', False):
self.localRPC = LocalRPCSuperWrap(self.freqtrade)
time.sleep(1)
2018-02-13 03:45:59 +00:00
def cleanup(self) -> None:
"""
Stops all enabled rpc modules
:return: None
"""
if 'telegram' in self.registered_modules:
2018-03-25 19:37:14 +00:00
logger.info('Cleaning up rpc.telegram ...')
2018-02-13 03:45:59 +00:00
self.registered_modules.remove('telegram')
self.telegram.cleanup()
def send_msg(self, msg: str) -> None:
"""
Send given markdown message to all registered rpc modules
:param msg: message
:return: None
"""
2018-03-25 19:37:14 +00:00
logger.info(msg)
2018-02-13 03:45:59 +00:00
if 'telegram' in self.registered_modules:
self.telegram.send_msg(msg)