diff --git a/freqtrade/freqtradebot.py b/freqtrade/freqtradebot.py index e380cf391..acf47b065 100644 --- a/freqtrade/freqtradebot.py +++ b/freqtrade/freqtradebot.py @@ -4,17 +4,15 @@ Freqtrade is the main module of this bot. It contains the class Freqtrade() import copy import logging -import time import traceback from datetime import datetime -from typing import Any, Callable, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple import arrow from requests.exceptions import RequestException -import sdnotify from freqtrade import (DependencyException, OperationalException, - TemporaryError, __version__, constants, persistence) + __version__, constants, persistence) from freqtrade.data.converter import order_book_to_dataframe from freqtrade.data.dataprovider import DataProvider from freqtrade.edge import Edge @@ -42,20 +40,14 @@ class FreqtradeBot(object): to get the config dict. """ - logger.info( - 'Starting freqtrade %s', - __version__, - ) + logger.info('Starting freqtrade %s', __version__) - # Init bot states + # Init bot state self.state = State.STOPPED # Init objects self.config = config - self._sd_notify = sdnotify.SystemdNotifier() if \ - self.config.get('internals', {}).get('sd_notify', False) else None - self.strategy: IStrategy = StrategyResolver(self.config).strategy self.rpc: RPCManager = RPCManager(self) @@ -79,29 +71,12 @@ class FreqtradeBot(object): self.config.get('edge', {}).get('enabled', False) else None self.active_pair_whitelist: List[str] = self.config['exchange']['pair_whitelist'] - self._init_modules() - - # Tell the systemd that we completed initialization phase - if self._sd_notify: - logger.debug("sd_notify: READY=1") - self._sd_notify.notify("READY=1") - - def _init_modules(self) -> None: - """ - Initializes all modules and updates the config - :return: None - """ - # Initialize all modules persistence.init(self.config) - # Set initial application state + # Set initial bot state from config initial_state = self.config.get('initial_state') - - if initial_state: - self.state = State[initial_state.upper()] - else: - self.state = State.STOPPED + self.state = State[initial_state.upper()] if initial_state else State.STOPPED def cleanup(self) -> None: """ @@ -113,130 +88,50 @@ class FreqtradeBot(object): self.rpc.cleanup() persistence.cleanup() - def stopping(self) -> None: - # Tell systemd that we are exiting now - if self._sd_notify: - logger.debug("sd_notify: STOPPING=1") - self._sd_notify.notify("STOPPING=1") - - def reconfigure(self) -> None: - # Tell systemd that we initiated reconfiguring - if self._sd_notify: - logger.debug("sd_notify: RELOADING=1") - self._sd_notify.notify("RELOADING=1") - - def worker(self, old_state: State = None) -> State: - """ - Trading routine that must be run at each loop - :param old_state: the previous service state from the previous call - :return: current service state - """ - # Log state transition - state = self.state - if state != old_state: - self.rpc.send_msg({ - 'type': RPCMessageType.STATUS_NOTIFICATION, - 'status': f'{state.name.lower()}' - }) - logger.info('Changing state to: %s', state.name) - if state == State.RUNNING: - self.rpc.startup_messages(self.config, self.pairlists) - - throttle_secs = self.config.get('internals', {}).get( - 'process_throttle_secs', - constants.PROCESS_THROTTLE_SECS - ) - - if state == State.STOPPED: - # Ping systemd watchdog before sleeping in the stopped state - if self._sd_notify: - logger.debug("sd_notify: WATCHDOG=1\\nSTATUS=State: STOPPED.") - self._sd_notify.notify("WATCHDOG=1\nSTATUS=State: STOPPED.") - - time.sleep(throttle_secs) - - elif state == State.RUNNING: - # Ping systemd watchdog before throttling - if self._sd_notify: - logger.debug("sd_notify: WATCHDOG=1\\nSTATUS=State: RUNNING.") - self._sd_notify.notify("WATCHDOG=1\nSTATUS=State: RUNNING.") - - self._throttle(func=self._process, min_secs=throttle_secs) - - return state - - def _throttle(self, func: Callable[..., Any], min_secs: float, *args, **kwargs) -> Any: - """ - Throttles the given callable that it - takes at least `min_secs` to finish execution. - :param func: Any callable - :param min_secs: minimum execution time in seconds - :return: Any - """ - start = time.time() - result = func(*args, **kwargs) - end = time.time() - duration = max(min_secs - (end - start), 0.0) - logger.debug('Throttling %s for %.2f seconds', func.__name__, duration) - time.sleep(duration) - return result - - def _process(self) -> bool: + def process(self) -> bool: """ Queries the persistence layer for open trades and handles them, otherwise a new trade is created. :return: True if one or more trades has been created or closed, False otherwise """ state_changed = False - try: - # Check whether markets have to be reloaded - self.exchange._reload_markets() - # Refresh whitelist - self.pairlists.refresh_pairlist() - self.active_pair_whitelist = self.pairlists.whitelist + # Check whether markets have to be reloaded + self.exchange._reload_markets() - # Calculating Edge positioning - if self.edge: - self.edge.calculate() - self.active_pair_whitelist = self.edge.adjust(self.active_pair_whitelist) + # Refresh whitelist + self.pairlists.refresh_pairlist() + self.active_pair_whitelist = self.pairlists.whitelist - # Query trades from persistence layer - trades = Trade.get_open_trades() + # Calculating Edge positioning + if self.edge: + self.edge.calculate() + self.active_pair_whitelist = self.edge.adjust(self.active_pair_whitelist) - # Extend active-pair whitelist with pairs from open trades - # It ensures that tickers are downloaded for open trades - self._extend_whitelist_with_trades(self.active_pair_whitelist, trades) + # Query trades from persistence layer + trades = Trade.get_open_trades() - # Refreshing candles - self.dataprovider.refresh(self._create_pair_whitelist(self.active_pair_whitelist), - self.strategy.informative_pairs()) + # Extend active-pair whitelist with pairs from open trades + # It ensures that tickers are downloaded for open trades + self._extend_whitelist_with_trades(self.active_pair_whitelist, trades) - # First process current opened trades - for trade in trades: - state_changed |= self.process_maybe_execute_sell(trade) + # Refreshing candles + self.dataprovider.refresh(self._create_pair_whitelist(self.active_pair_whitelist), + self.strategy.informative_pairs()) - # Then looking for buy opportunities - if len(trades) < self.config['max_open_trades']: - state_changed = self.process_maybe_execute_buy() + # First process current opened trades + for trade in trades: + state_changed |= self.process_maybe_execute_sell(trade) - if 'unfilledtimeout' in self.config: - # Check and handle any timed out open orders - self.check_handle_timedout() - Trade.session.flush() + # Then looking for buy opportunities + if len(trades) < self.config['max_open_trades']: + state_changed = self.process_maybe_execute_buy() + + if 'unfilledtimeout' in self.config: + # Check and handle any timed out open orders + self.check_handle_timedout() + Trade.session.flush() - except TemporaryError as error: - logger.warning(f"Error: {error}, retrying in {constants.RETRY_TIMEOUT} seconds...") - time.sleep(constants.RETRY_TIMEOUT) - except OperationalException: - tb = traceback.format_exc() - hint = 'Issue `/start` if you think it is safe to restart.' - self.rpc.send_msg({ - 'type': RPCMessageType.STATUS_NOTIFICATION, - 'status': f'OperationalException:\n```\n{tb}```{hint}' - }) - logger.exception('OperationalException. Stopping trader ...') - self.state = State.STOPPED return state_changed def _extend_whitelist_with_trades(self, whitelist: List[str], trades: List[Any]): diff --git a/freqtrade/main.py b/freqtrade/main.py index c41d54f0e..877e2921d 100755 --- a/freqtrade/main.py +++ b/freqtrade/main.py @@ -10,10 +10,9 @@ from typing import List from freqtrade import OperationalException from freqtrade.arguments import Arguments -from freqtrade.configuration import Configuration, set_loggers -from freqtrade.freqtradebot import FreqtradeBot -from freqtrade.state import State -from freqtrade.rpc import RPCMessageType +from freqtrade.configuration import set_loggers +from freqtrade.worker import Worker + logger = logging.getLogger('freqtrade') @@ -27,7 +26,7 @@ def main(sysargv: List[str]) -> None: sysargv, 'Free, open source crypto trading bot' ) - args = arguments.get_parsed_arg() + args: Namespace = arguments.get_parsed_arg() # A subcommand has been issued. # Means if Backtesting or Hyperopt have been called we exit the bot @@ -35,20 +34,12 @@ def main(sysargv: List[str]) -> None: args.func(args) return - freqtrade = None + worker = None return_code = 1 try: - # Load and validate configuration - config = Configuration(args, None).get_config() - - # Init the bot - freqtrade = FreqtradeBot(config) - - state = None - while True: - state = freqtrade.worker(old_state=state) - if state == State.RELOAD_CONF: - freqtrade = reconfigure(freqtrade, args) + # Load and run worker + worker = Worker(args) + worker.run() except KeyboardInterrupt: logger.info('SIGINT received, aborting ...') @@ -59,34 +50,11 @@ def main(sysargv: List[str]) -> None: except BaseException: logger.exception('Fatal exception!') finally: - if freqtrade: - freqtrade.stopping() - freqtrade.rpc.send_msg({ - 'type': RPCMessageType.STATUS_NOTIFICATION, - 'status': 'process died' - }) - freqtrade.cleanup() + if worker: + worker.exit() sys.exit(return_code) -def reconfigure(freqtrade: FreqtradeBot, args: Namespace) -> FreqtradeBot: - """ - Cleans up current instance, reloads the configuration and returns the new instance - """ - freqtrade.reconfigure() - - # Clean up current modules - freqtrade.cleanup() - - # Create new instance - freqtrade = FreqtradeBot(Configuration(args, None).get_config()) - freqtrade.rpc.send_msg({ - 'type': RPCMessageType.STATUS_NOTIFICATION, - 'status': 'config reloaded' - }) - return freqtrade - - if __name__ == '__main__': set_loggers() main(sys.argv[1:]) diff --git a/freqtrade/tests/conftest.py b/freqtrade/tests/conftest.py index cba754199..0bff1d5e9 100644 --- a/freqtrade/tests/conftest.py +++ b/freqtrade/tests/conftest.py @@ -16,6 +16,7 @@ from freqtrade.edge import Edge, PairInfo from freqtrade.exchange import Exchange from freqtrade.freqtradebot import FreqtradeBot from freqtrade.resolvers import ExchangeResolver +from freqtrade.worker import Worker logging.getLogger('').setLevel(logging.INFO) @@ -87,7 +88,7 @@ def get_patched_edge(mocker, config) -> Edge: # Functions for recurrent object patching -def get_patched_freqtradebot(mocker, config) -> FreqtradeBot: +def patch_freqtradebot(mocker, config) -> None: """ This function patch _init_modules() to not call dependencies :param mocker: a Mocker object to apply patches @@ -100,9 +101,17 @@ def get_patched_freqtradebot(mocker, config) -> FreqtradeBot: mocker.patch('freqtrade.freqtradebot.RPCManager._init', MagicMock()) mocker.patch('freqtrade.freqtradebot.RPCManager.send_msg', MagicMock()) + +def get_patched_freqtradebot(mocker, config) -> FreqtradeBot: + patch_freqtradebot(mocker, config) return FreqtradeBot(config) +def get_patched_worker(mocker, config) -> Worker: + patch_freqtradebot(mocker, config) + return Worker(args=None, config=config) + + @pytest.fixture(autouse=True) def patch_coinmarketcap(mocker) -> None: """ diff --git a/freqtrade/tests/test_freqtradebot.py b/freqtrade/tests/test_freqtradebot.py index 7e02bb714..b77e0a610 100644 --- a/freqtrade/tests/test_freqtradebot.py +++ b/freqtrade/tests/test_freqtradebot.py @@ -21,12 +21,13 @@ from freqtrade.state import State from freqtrade.strategy.interface import SellCheckTuple, SellType from freqtrade.tests.conftest import (log_has, log_has_re, patch_edge, patch_exchange, patch_wallet) +from freqtrade.worker import Worker # Functions for recurrent object patching -def get_patched_freqtradebot(mocker, config) -> FreqtradeBot: +def patch_freqtradebot(mocker, config) -> None: """ - This function patch _init_modules() to not call dependencies + This function patches _init_modules() to not call dependencies :param mocker: a Mocker object to apply patches :param config: Config to pass to the bot :return: None @@ -35,9 +36,29 @@ def get_patched_freqtradebot(mocker, config) -> FreqtradeBot: mocker.patch('freqtrade.freqtradebot.persistence.init', MagicMock()) patch_exchange(mocker) + +def get_patched_freqtradebot(mocker, config) -> FreqtradeBot: + """ + This function patches _init_modules() to not call dependencies + :param mocker: a Mocker object to apply patches + :param config: Config to pass to the bot + :return: FreqtradeBot + """ + patch_freqtradebot(mocker, config) return FreqtradeBot(config) +def get_patched_worker(mocker, config) -> Worker: + """ + This function patches _init_modules() to not call dependencies + :param mocker: a Mocker object to apply patches + :param config: Config to pass to the bot + :return: Worker + """ + patch_freqtradebot(mocker, config) + return Worker(args=None, config=config) + + def patch_get_signal(freqtrade: FreqtradeBot, value=(True, False)) -> None: """ :param mocker: mocker to patch IStrategy class @@ -61,7 +82,7 @@ def patch_RPCManager(mocker) -> MagicMock: # Unit tests -def test_freqtradebot(mocker, default_conf, markets) -> None: +def test_freqtradebot_state(mocker, default_conf, markets) -> None: mocker.patch('freqtrade.exchange.Exchange.markets', PropertyMock(return_value=markets)) freqtrade = get_patched_freqtradebot(mocker, default_conf) assert freqtrade.state is State.RUNNING @@ -71,6 +92,16 @@ def test_freqtradebot(mocker, default_conf, markets) -> None: assert freqtrade.state is State.STOPPED +def test_worker_state(mocker, default_conf, markets) -> None: + mocker.patch('freqtrade.exchange.Exchange.markets', PropertyMock(return_value=markets)) + worker = get_patched_worker(mocker, default_conf) + assert worker.state is State.RUNNING + + default_conf.pop('initial_state') + worker = Worker(args=None, config=default_conf) + assert worker.state is State.STOPPED + + def test_cleanup(mocker, default_conf, caplog) -> None: mock_cleanup = MagicMock() mocker.patch('freqtrade.persistence.cleanup', mock_cleanup) @@ -82,28 +113,28 @@ def test_cleanup(mocker, default_conf, caplog) -> None: def test_worker_running(mocker, default_conf, caplog) -> None: mock_throttle = MagicMock() - mocker.patch('freqtrade.freqtradebot.FreqtradeBot._throttle', mock_throttle) + mocker.patch('freqtrade.worker.Worker._throttle', mock_throttle) - freqtrade = get_patched_freqtradebot(mocker, default_conf) + worker = get_patched_worker(mocker, default_conf) - state = freqtrade.worker(old_state=None) + state = worker._worker(old_state=None) assert state is State.RUNNING assert log_has('Changing state to: RUNNING', caplog.record_tuples) assert mock_throttle.call_count == 1 # Check strategy is loaded, and received a dataprovider object - assert freqtrade.strategy - assert freqtrade.strategy.dp - assert isinstance(freqtrade.strategy.dp, DataProvider) + assert worker.freqtrade.strategy + assert worker.freqtrade.strategy.dp + assert isinstance(worker.freqtrade.strategy.dp, DataProvider) def test_worker_stopped(mocker, default_conf, caplog) -> None: mock_throttle = MagicMock() - mocker.patch('freqtrade.freqtradebot.FreqtradeBot._throttle', mock_throttle) + mocker.patch('freqtrade.worker.Worker._throttle', mock_throttle) mock_sleep = mocker.patch('time.sleep', return_value=None) - freqtrade = get_patched_freqtradebot(mocker, default_conf) - freqtrade.state = State.STOPPED - state = freqtrade.worker(old_state=State.RUNNING) + worker = get_patched_worker(mocker, default_conf) + worker.state = State.STOPPED + state = worker._worker(old_state=State.RUNNING) assert state is State.STOPPED assert log_has('Changing state to: STOPPED', caplog.record_tuples) assert mock_throttle.call_count == 0 @@ -115,17 +146,17 @@ def test_throttle(mocker, default_conf, caplog) -> None: return 42 caplog.set_level(logging.DEBUG) - freqtrade = get_patched_freqtradebot(mocker, default_conf) + worker = get_patched_worker(mocker, default_conf) start = time.time() - result = freqtrade._throttle(throttled_func, min_secs=0.1) + result = worker._throttle(throttled_func, min_secs=0.1) end = time.time() assert result == 42 assert end - start > 0.1 assert log_has('Throttling throttled_func for 0.10 seconds', caplog.record_tuples) - result = freqtrade._throttle(throttled_func, min_secs=-1) + result = worker._throttle(throttled_func, min_secs=-1) assert result == 42 @@ -133,12 +164,12 @@ def test_throttle_with_assets(mocker, default_conf) -> None: def throttled_func(nb_assets=-1): return nb_assets - freqtrade = get_patched_freqtradebot(mocker, default_conf) + worker = get_patched_worker(mocker, default_conf) - result = freqtrade._throttle(throttled_func, min_secs=0.1, nb_assets=666) + result = worker._throttle(throttled_func, min_secs=0.1, nb_assets=666) assert result == 666 - result = freqtrade._throttle(throttled_func, min_secs=0.1) + result = worker._throttle(throttled_func, min_secs=0.1) assert result == -1 @@ -224,7 +255,7 @@ def test_edge_called_in_process(mocker, edge_conf) -> None: freqtrade = FreqtradeBot(edge_conf) freqtrade.pairlists._validate_whitelist = _refresh_whitelist patch_get_signal(freqtrade) - freqtrade._process() + freqtrade.process() assert freqtrade.active_pair_whitelist == ['NEO/BTC', 'LTC/BTC'] @@ -654,7 +685,7 @@ def test_process_trade_creation(default_conf, ticker, limit_buy_order, trades = Trade.query.filter(Trade.is_open.is_(True)).all() assert not trades - result = freqtrade._process() + result = freqtrade.process() assert result is True trades = Trade.query.filter(Trade.is_open.is_(True)).all() @@ -685,10 +716,10 @@ def test_process_exchange_failures(default_conf, ticker, markets, mocker) -> Non ) sleep_mock = mocker.patch('time.sleep', side_effect=lambda _: None) - freqtrade = FreqtradeBot(default_conf) - patch_get_signal(freqtrade) + worker = Worker(args=None, config=default_conf) + patch_get_signal(worker.freqtrade) - result = freqtrade._process() + result = worker._process() assert result is False assert sleep_mock.has_calls() @@ -702,14 +733,14 @@ def test_process_operational_exception(default_conf, ticker, markets, mocker) -> markets=PropertyMock(return_value=markets), buy=MagicMock(side_effect=OperationalException) ) - freqtrade = FreqtradeBot(default_conf) - patch_get_signal(freqtrade) + worker = Worker(args=None, config=default_conf) + patch_get_signal(worker.freqtrade) - assert freqtrade.state == State.RUNNING + assert worker.state == State.RUNNING - result = freqtrade._process() + result = worker._process() assert result is False - assert freqtrade.state == State.STOPPED + assert worker.state == State.STOPPED assert 'OperationalException' in msg_mock.call_args_list[-1][0][0]['status'] @@ -730,18 +761,18 @@ def test_process_trade_handling( trades = Trade.query.filter(Trade.is_open.is_(True)).all() assert not trades - result = freqtrade._process() + result = freqtrade.process() assert result is True trades = Trade.query.filter(Trade.is_open.is_(True)).all() assert len(trades) == 1 - result = freqtrade._process() + result = freqtrade.process() assert result is False def test_process_trade_no_whitelist_pair( default_conf, ticker, limit_buy_order, markets, fee, mocker) -> None: - """ Test _process with trade not in pair list """ + """ Test process with trade not in pair list """ patch_RPCManager(mocker) patch_exchange(mocker) mocker.patch.multiple( @@ -778,7 +809,7 @@ def test_process_trade_no_whitelist_pair( )) assert pair not in freqtrade.active_pair_whitelist - result = freqtrade._process() + result = freqtrade.process() assert pair in freqtrade.active_pair_whitelist # Make sure each pair is only in the list once assert len(freqtrade.active_pair_whitelist) == len(set(freqtrade.active_pair_whitelist)) @@ -808,7 +839,7 @@ def test_process_informative_pairs_added(default_conf, ticker, markets, mocker) freqtrade.strategy.informative_pairs = inf_pairs # patch_get_signal(freqtrade) - freqtrade._process() + freqtrade.process() assert inf_pairs.call_count == 1 assert refresh_mock.call_count == 1 assert ("BTC/ETH", "1m") in refresh_mock.call_args[0][0] @@ -3056,5 +3087,5 @@ def test_startup_messages(default_conf, mocker): 'config': {'number_assets': 20} } mocker.patch('freqtrade.exchange.Exchange.exchange_has', MagicMock(return_value=True)) - freqtrade = get_patched_freqtradebot(mocker, default_conf) - assert freqtrade.state is State.RUNNING + worker = get_patched_worker(mocker, default_conf) + assert worker.state is State.RUNNING diff --git a/freqtrade/tests/test_main.py b/freqtrade/tests/test_main.py index 51c95a4a9..fc5d2e378 100644 --- a/freqtrade/tests/test_main.py +++ b/freqtrade/tests/test_main.py @@ -7,8 +7,8 @@ import pytest from freqtrade import OperationalException from freqtrade.arguments import Arguments -from freqtrade.freqtradebot import FreqtradeBot -from freqtrade.main import main, reconfigure +from freqtrade.worker import Worker +from freqtrade.main import main from freqtrade.state import State from freqtrade.tests.conftest import log_has, patch_exchange @@ -43,17 +43,14 @@ def test_main_start_hyperopt(mocker) -> None: def test_main_fatal_exception(mocker, default_conf, caplog) -> None: patch_exchange(mocker) - mocker.patch.multiple( - 'freqtrade.freqtradebot.FreqtradeBot', - _init_modules=MagicMock(), - worker=MagicMock(side_effect=Exception), - cleanup=MagicMock(), - ) + mocker.patch('freqtrade.freqtradebot.FreqtradeBot.cleanup', MagicMock()) + mocker.patch('freqtrade.worker.Worker._worker', MagicMock(side_effect=Exception)) mocker.patch( 'freqtrade.configuration.Configuration._load_config_file', lambda *args, **kwargs: default_conf ) mocker.patch('freqtrade.freqtradebot.RPCManager', MagicMock()) + mocker.patch('freqtrade.freqtradebot.persistence.init', MagicMock()) args = ['-c', 'config.json.example'] @@ -66,17 +63,14 @@ def test_main_fatal_exception(mocker, default_conf, caplog) -> None: def test_main_keyboard_interrupt(mocker, default_conf, caplog) -> None: patch_exchange(mocker) - mocker.patch.multiple( - 'freqtrade.freqtradebot.FreqtradeBot', - _init_modules=MagicMock(), - worker=MagicMock(side_effect=KeyboardInterrupt), - cleanup=MagicMock(), - ) + mocker.patch('freqtrade.freqtradebot.FreqtradeBot.cleanup', MagicMock()) + mocker.patch('freqtrade.worker.Worker._worker', MagicMock(side_effect=KeyboardInterrupt)) mocker.patch( 'freqtrade.configuration.Configuration._load_config_file', lambda *args, **kwargs: default_conf ) mocker.patch('freqtrade.freqtradebot.RPCManager', MagicMock()) + mocker.patch('freqtrade.freqtradebot.persistence.init', MagicMock()) args = ['-c', 'config.json.example'] @@ -89,17 +83,17 @@ def test_main_keyboard_interrupt(mocker, default_conf, caplog) -> None: def test_main_operational_exception(mocker, default_conf, caplog) -> None: patch_exchange(mocker) - mocker.patch.multiple( - 'freqtrade.freqtradebot.FreqtradeBot', - _init_modules=MagicMock(), - worker=MagicMock(side_effect=OperationalException('Oh snap!')), - cleanup=MagicMock(), + mocker.patch('freqtrade.freqtradebot.FreqtradeBot.cleanup', MagicMock()) + mocker.patch( + 'freqtrade.worker.Worker._worker', + MagicMock(side_effect=OperationalException('Oh snap!')) ) mocker.patch( 'freqtrade.configuration.Configuration._load_config_file', lambda *args, **kwargs: default_conf ) mocker.patch('freqtrade.freqtradebot.RPCManager', MagicMock()) + mocker.patch('freqtrade.freqtradebot.persistence.init', MagicMock()) args = ['-c', 'config.json.example'] @@ -112,21 +106,18 @@ def test_main_operational_exception(mocker, default_conf, caplog) -> None: def test_main_reload_conf(mocker, default_conf, caplog) -> None: patch_exchange(mocker) - mocker.patch.multiple( - 'freqtrade.freqtradebot.FreqtradeBot', - _init_modules=MagicMock(), - worker=MagicMock(return_value=State.RELOAD_CONF), - cleanup=MagicMock(), - ) + mocker.patch('freqtrade.freqtradebot.FreqtradeBot.cleanup', MagicMock()) + mocker.patch('freqtrade.worker.Worker._worker', MagicMock(return_value=State.RELOAD_CONF)) mocker.patch( 'freqtrade.configuration.Configuration._load_config_file', lambda *args, **kwargs: default_conf ) mocker.patch('freqtrade.freqtradebot.RPCManager', MagicMock()) + mocker.patch('freqtrade.freqtradebot.persistence.init', MagicMock()) # Raise exception as side effect to avoid endless loop reconfigure_mock = mocker.patch( - 'freqtrade.main.reconfigure', MagicMock(side_effect=Exception) + 'freqtrade.main.Worker._reconfigure', MagicMock(side_effect=Exception) ) with pytest.raises(SystemExit): @@ -138,19 +129,21 @@ def test_main_reload_conf(mocker, default_conf, caplog) -> None: def test_reconfigure(mocker, default_conf) -> None: patch_exchange(mocker) - mocker.patch.multiple( - 'freqtrade.freqtradebot.FreqtradeBot', - _init_modules=MagicMock(), - worker=MagicMock(side_effect=OperationalException('Oh snap!')), - cleanup=MagicMock(), + mocker.patch('freqtrade.freqtradebot.FreqtradeBot.cleanup', MagicMock()) + mocker.patch( + 'freqtrade.worker.Worker._worker', + MagicMock(side_effect=OperationalException('Oh snap!')) ) mocker.patch( 'freqtrade.configuration.Configuration._load_config_file', lambda *args, **kwargs: default_conf ) mocker.patch('freqtrade.freqtradebot.RPCManager', MagicMock()) + mocker.patch('freqtrade.freqtradebot.persistence.init', MagicMock()) - freqtrade = FreqtradeBot(default_conf) + args = Arguments(['-c', 'config.json.example'], '').get_parsed_arg() + worker = Worker(args=args, config=default_conf) + freqtrade = worker.freqtrade # Renew mock to return modified data conf = deepcopy(default_conf) @@ -160,11 +153,10 @@ def test_reconfigure(mocker, default_conf) -> None: lambda *args, **kwargs: conf ) + worker._config = conf # reconfigure should return a new instance - freqtrade2 = reconfigure( - freqtrade, - Arguments(['-c', 'config.json.example'], '').get_parsed_arg() - ) + worker._reconfigure() + freqtrade2 = worker.freqtrade # Verify we have a new instance with the new config assert freqtrade is not freqtrade2 diff --git a/freqtrade/worker.py b/freqtrade/worker.py new file mode 100755 index 000000000..c7afe5c97 --- /dev/null +++ b/freqtrade/worker.py @@ -0,0 +1,188 @@ +""" +Main Freqtrade worker class. +""" +import logging +import time +import traceback +from argparse import Namespace +from typing import Any, Callable, Optional +import sdnotify + +from freqtrade import (constants, OperationalException, TemporaryError, + __version__) +from freqtrade.configuration import Configuration +from freqtrade.freqtradebot import FreqtradeBot +from freqtrade.state import State +from freqtrade.rpc import RPCMessageType + + +logger = logging.getLogger(__name__) + + +class Worker(object): + """ + Freqtradebot worker class + """ + + def __init__(self, args: Namespace, config=None) -> None: + """ + Init all variables and objects the bot needs to work + """ + logger.info('Starting worker %s', __version__) + + self._args = args + self._config = config + self._init(False) + + # Tell systemd that we completed initialization phase + if self._sd_notify: + logger.debug("sd_notify: READY=1") + self._sd_notify.notify("READY=1") + + def _init(self, reconfig: bool): + """ + Also called from the _reconfigure() method (with reconfig=True). + """ + if reconfig or self._config is None: + # Load configuration + self._config = Configuration(self._args, None).get_config() + + # Init the instance of the bot + self.freqtrade = FreqtradeBot(self._config) + + self._throttle_secs = self._config.get('internals', {}).get( + 'process_throttle_secs', + constants.PROCESS_THROTTLE_SECS + ) + + self._sd_notify = sdnotify.SystemdNotifier() if \ + self._config.get('internals', {}).get('sd_notify', False) else None + + @property + def state(self) -> State: + return self.freqtrade.state + + @state.setter + def state(self, value: State): + self.freqtrade.state = value + + def run(self): + state = None + while True: + state = self._worker(old_state=state) + if state == State.RELOAD_CONF: + self.freqtrade = self._reconfigure() + + def _worker(self, old_state: State, throttle_secs: Optional[float] = None) -> State: + """ + Trading routine that must be run at each loop + :param old_state: the previous service state from the previous call + :return: current service state + """ + state = self.freqtrade.state + if throttle_secs is None: + throttle_secs = self._throttle_secs + + # Log state transition + if state != old_state: + self.freqtrade.rpc.send_msg({ + 'type': RPCMessageType.STATUS_NOTIFICATION, + 'status': f'{state.name.lower()}' + }) + logger.info('Changing state to: %s', state.name) + if state == State.RUNNING: + self.freqtrade.rpc.startup_messages(self._config, self.freqtrade.pairlists) + + if state == State.STOPPED: + # Ping systemd watchdog before sleeping in the stopped state + if self._sd_notify: + logger.debug("sd_notify: WATCHDOG=1\\nSTATUS=State: STOPPED.") + self._sd_notify.notify("WATCHDOG=1\nSTATUS=State: STOPPED.") + + time.sleep(throttle_secs) + + elif state == State.RUNNING: + # Ping systemd watchdog before throttling + if self._sd_notify: + logger.debug("sd_notify: WATCHDOG=1\\nSTATUS=State: RUNNING.") + self._sd_notify.notify("WATCHDOG=1\nSTATUS=State: RUNNING.") + + self._throttle(func=self._process, min_secs=throttle_secs) + + return state + + def _throttle(self, func: Callable[..., Any], min_secs: float, *args, **kwargs) -> Any: + """ + Throttles the given callable that it + takes at least `min_secs` to finish execution. + :param func: Any callable + :param min_secs: minimum execution time in seconds + :return: Any + """ + start = time.time() + result = func(*args, **kwargs) + end = time.time() + duration = max(min_secs - (end - start), 0.0) + logger.debug('Throttling %s for %.2f seconds', func.__name__, duration) + time.sleep(duration) + return result + + def _process(self) -> bool: + state_changed = False + try: + state_changed = self.freqtrade.process() + except TemporaryError as error: + logger.warning(f"Error: {error}, retrying in {constants.RETRY_TIMEOUT} seconds...") + time.sleep(constants.RETRY_TIMEOUT) + except OperationalException: + tb = traceback.format_exc() + hint = 'Issue `/start` if you think it is safe to restart.' + self.freqtrade.rpc.send_msg({ + 'type': RPCMessageType.STATUS_NOTIFICATION, + 'status': f'OperationalException:\n```\n{tb}```{hint}' + }) + logger.exception('OperationalException. Stopping trader ...') + self.freqtrade.state = State.STOPPED + # TODO: The return value of _process() is not used apart tests + # and should (could) be eliminated later. See PR #1689. +# state_changed = True + return state_changed + + def _reconfigure(self): + """ + Cleans up current freqtradebot instance, reloads the configuration and + replaces it with the new instance + """ + # Tell systemd that we initiated reconfiguration + if self._sd_notify: + logger.debug("sd_notify: RELOADING=1") + self._sd_notify.notify("RELOADING=1") + + # Clean up current freqtrade modules + self.freqtrade.cleanup() + + # Load and validate config and create new instance of the bot + self._init(True) + + self.freqtrade.rpc.send_msg({ + 'type': RPCMessageType.STATUS_NOTIFICATION, + 'status': 'config reloaded' + }) + + # Tell systemd that we completed reconfiguration + if self._sd_notify: + logger.debug("sd_notify: READY=1") + self._sd_notify.notify("READY=1") + + def exit(self): + # Tell systemd that we are exiting now + if self._sd_notify: + logger.debug("sd_notify: STOPPING=1") + self._sd_notify.notify("STOPPING=1") + + if self.freqtrade: + self.freqtrade.rpc.send_msg({ + 'type': RPCMessageType.STATUS_NOTIFICATION, + 'status': 'process died' + }) + self.freqtrade.cleanup()