stable/freqtrade/clients/client_manager.py
creslinux 631df64c68 To better align with multiple client protocols, this PR
Refactors the existing RPC directory, classes, defs as below.
This is to better reflect the nature of the files/classes/defs to support other clients - such as rest

The code is functional, ive tested every RPC button in Telegram.
1 unit test is failing ( they all were till find_replace) in test_rpc_telegram.

Can I ask for anothers opinion on why this 1 test fails, im at loss to debug
- mocker is stretching my working knowledge of python -

Ive renamed common methods from RPC_method_name to server_method_name
e.g rpc_stop becomes server_stop, the super class is CLIENTS not RPC as it is agnostic to RPC

freqtrade/rcp/ - rpc.py
......................- rpc_manager.py
......................- telegram.py

to:

freqtrade/clients/client_manager.py (was rpc_manager
......................-common/client.py (was rpc.py - but is agnostic to what client calls)
......................-rpc/telegram.py
......................-rpc/*discord.py (example future RPC client
......................-rest/*cmd line.py (example future rest cmdline rest client
......................-rest/*web.py (example future rest web client
2018-06-09 09:03:22 +00:00

58 lines
1.5 KiB
Python

"""
This module contains class to manage CLIENT communications (Telegram, Slack, ...)
"""
from typing import Any, List
import logging
from freqtrade.clients.rpc.telegram import Telegram
logger = logging.getLogger(__name__)
class ClientManager(object):
"""
Class to manage CLIENT objects (Telegram, Slack, ...)
"""
def __init__(self, freqtrade) -> None:
"""
Initializes all enabled rpc modules
:param config: config to use
:return: None
"""
self.freqtrade = freqtrade
self.registered_modules: List[str] = []
self.telegram: Any = None
self._init()
def _init(self) -> None:
"""
Init CLIENT modules
:return:
"""
if self.freqtrade.config['telegram'].get('enabled', False):
logger.info('Enabling rpc.telegram ...')
self.registered_modules.append('telegram')
self.telegram = Telegram(self.freqtrade)
def cleanup(self) -> None:
"""
Stops all enabled rpc modules
:return: None
"""
if 'telegram' in self.registered_modules:
logger.info('Cleaning up rpc.telegram ...')
self.registered_modules.remove('telegram')
self.telegram.cleanup()
def send_msg(self, msg: str) -> None:
"""
Send given markdown message to all registered rpc modules
:param msg: message
:return: None
"""
logger.info(msg)
if 'telegram' in self.registered_modules:
self.telegram.send_msg(msg)