move worker stuff to main.py
This commit is contained in:
		| @@ -7,11 +7,10 @@ import logging | |||||||
| import time | import time | ||||||
| import traceback | import traceback | ||||||
| from datetime import datetime | from datetime import datetime | ||||||
| from typing import Any, Callable, Dict, List, Optional, Tuple | from typing import Any, Dict, List, Optional, Tuple | ||||||
|  |  | ||||||
| import arrow | import arrow | ||||||
| from requests.exceptions import RequestException | from requests.exceptions import RequestException | ||||||
| import sdnotify |  | ||||||
|  |  | ||||||
| from freqtrade import (DependencyException, OperationalException, | from freqtrade import (DependencyException, OperationalException, | ||||||
|                        TemporaryError, __version__, constants, persistence) |                        TemporaryError, __version__, constants, persistence) | ||||||
| @@ -24,6 +23,7 @@ from freqtrade.resolvers import ExchangeResolver, StrategyResolver, PairListReso | |||||||
| from freqtrade.state import State | from freqtrade.state import State | ||||||
| from freqtrade.strategy.interface import SellType, IStrategy | from freqtrade.strategy.interface import SellType, IStrategy | ||||||
| from freqtrade.wallets import Wallets | from freqtrade.wallets import Wallets | ||||||
|  | from freqtrade.main import Worker | ||||||
|  |  | ||||||
|  |  | ||||||
| logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||||
| @@ -35,26 +35,18 @@ class FreqtradeBot(object): | |||||||
|     This is from here the bot start its logic. |     This is from here the bot start its logic. | ||||||
|     """ |     """ | ||||||
|  |  | ||||||
|     def __init__(self, config: Dict[str, Any]) -> None: |     def __init__(self, config: Dict[str, Any], worker: Worker) -> None: | ||||||
|         """ |         """ | ||||||
|         Init all variables and objects the bot needs to work |         Init all variables and objects the bot needs to work | ||||||
|         :param config: configuration dict, you can use Configuration.get_config() |         :param config: configuration dict, you can use Configuration.get_config() | ||||||
|         to get the config dict. |         to get the config dict. | ||||||
|         """ |         """ | ||||||
|  |  | ||||||
|         logger.info( |         logger.info('Starting freqtrade %s', __version__) | ||||||
|             'Starting freqtrade %s', |  | ||||||
|             __version__, |  | ||||||
|         ) |  | ||||||
|  |  | ||||||
|         # Init bot states |  | ||||||
|         self.state = State.STOPPED |  | ||||||
|  |  | ||||||
|         # Init objects |         # Init objects | ||||||
|         self.config = config |         self.config = config | ||||||
|  |         self._worker: Worker = worker | ||||||
|         self._sd_notify = sdnotify.SystemdNotifier() if \ |  | ||||||
|             self.config.get('internals', {}).get('sd_notify', False) else None |  | ||||||
|  |  | ||||||
|         self.strategy: IStrategy = StrategyResolver(self.config).strategy |         self.strategy: IStrategy = StrategyResolver(self.config).strategy | ||||||
|  |  | ||||||
| @@ -79,29 +71,16 @@ class FreqtradeBot(object): | |||||||
|             self.config.get('edge', {}).get('enabled', False) else None |             self.config.get('edge', {}).get('enabled', False) else None | ||||||
|  |  | ||||||
|         self.active_pair_whitelist: List[str] = self.config['exchange']['pair_whitelist'] |         self.active_pair_whitelist: List[str] = self.config['exchange']['pair_whitelist'] | ||||||
|         self._init_modules() |  | ||||||
|  |  | ||||||
|         # Tell the systemd that we completed initialization phase |  | ||||||
|         if self._sd_notify: |  | ||||||
|             logger.debug("sd_notify: READY=1") |  | ||||||
|             self._sd_notify.notify("READY=1") |  | ||||||
|  |  | ||||||
|     def _init_modules(self) -> None: |  | ||||||
|         """ |  | ||||||
|         Initializes all modules and updates the config |  | ||||||
|         :return: None |  | ||||||
|         """ |  | ||||||
|         # Initialize all modules |  | ||||||
|  |  | ||||||
|         persistence.init(self.config) |         persistence.init(self.config) | ||||||
|  |  | ||||||
|         # Set initial application state |     @property | ||||||
|         initial_state = self.config.get('initial_state') |     def state(self) -> State: | ||||||
|  |         return self._worker.state | ||||||
|  |  | ||||||
|         if initial_state: |     @state.setter | ||||||
|             self.state = State[initial_state.upper()] |     def state(self, value: State): | ||||||
|         else: |         self._worker.state = value | ||||||
|             self.state = State.STOPPED |  | ||||||
|  |  | ||||||
|     def cleanup(self) -> None: |     def cleanup(self) -> None: | ||||||
|         """ |         """ | ||||||
| @@ -113,75 +92,7 @@ class FreqtradeBot(object): | |||||||
|         self.rpc.cleanup() |         self.rpc.cleanup() | ||||||
|         persistence.cleanup() |         persistence.cleanup() | ||||||
|  |  | ||||||
|     def stopping(self) -> None: |     def process(self) -> bool: | ||||||
|         # Tell systemd that we are exiting now |  | ||||||
|         if self._sd_notify: |  | ||||||
|             logger.debug("sd_notify: STOPPING=1") |  | ||||||
|             self._sd_notify.notify("STOPPING=1") |  | ||||||
|  |  | ||||||
|     def reconfigure(self) -> None: |  | ||||||
|         # Tell systemd that we initiated reconfiguring |  | ||||||
|         if self._sd_notify: |  | ||||||
|             logger.debug("sd_notify: RELOADING=1") |  | ||||||
|             self._sd_notify.notify("RELOADING=1") |  | ||||||
|  |  | ||||||
|     def worker(self, old_state: State = None) -> State: |  | ||||||
|         """ |  | ||||||
|         Trading routine that must be run at each loop |  | ||||||
|         :param old_state: the previous service state from the previous call |  | ||||||
|         :return: current service state |  | ||||||
|         """ |  | ||||||
|         # Log state transition |  | ||||||
|         state = self.state |  | ||||||
|         if state != old_state: |  | ||||||
|             self.rpc.send_msg({ |  | ||||||
|                 'type': RPCMessageType.STATUS_NOTIFICATION, |  | ||||||
|                 'status': f'{state.name.lower()}' |  | ||||||
|             }) |  | ||||||
|             logger.info('Changing state to: %s', state.name) |  | ||||||
|             if state == State.RUNNING: |  | ||||||
|                 self.rpc.startup_messages(self.config, self.pairlists) |  | ||||||
|  |  | ||||||
|         throttle_secs = self.config.get('internals', {}).get( |  | ||||||
|             'process_throttle_secs', |  | ||||||
|             constants.PROCESS_THROTTLE_SECS |  | ||||||
|         ) |  | ||||||
|  |  | ||||||
|         if state == State.STOPPED: |  | ||||||
|             # Ping systemd watchdog before sleeping in the stopped state |  | ||||||
|             if self._sd_notify: |  | ||||||
|                 logger.debug("sd_notify: WATCHDOG=1\\nSTATUS=State: STOPPED.") |  | ||||||
|                 self._sd_notify.notify("WATCHDOG=1\nSTATUS=State: STOPPED.") |  | ||||||
|  |  | ||||||
|             time.sleep(throttle_secs) |  | ||||||
|  |  | ||||||
|         elif state == State.RUNNING: |  | ||||||
|             # Ping systemd watchdog before throttling |  | ||||||
|             if self._sd_notify: |  | ||||||
|                 logger.debug("sd_notify: WATCHDOG=1\\nSTATUS=State: RUNNING.") |  | ||||||
|                 self._sd_notify.notify("WATCHDOG=1\nSTATUS=State: RUNNING.") |  | ||||||
|  |  | ||||||
|             self._throttle(func=self._process, min_secs=throttle_secs) |  | ||||||
|  |  | ||||||
|         return state |  | ||||||
|  |  | ||||||
|     def _throttle(self, func: Callable[..., Any], min_secs: float, *args, **kwargs) -> Any: |  | ||||||
|         """ |  | ||||||
|         Throttles the given callable that it |  | ||||||
|         takes at least `min_secs` to finish execution. |  | ||||||
|         :param func: Any callable |  | ||||||
|         :param min_secs: minimum execution time in seconds |  | ||||||
|         :return: Any |  | ||||||
|         """ |  | ||||||
|         start = time.time() |  | ||||||
|         result = func(*args, **kwargs) |  | ||||||
|         end = time.time() |  | ||||||
|         duration = max(min_secs - (end - start), 0.0) |  | ||||||
|         logger.debug('Throttling %s for %.2f seconds', func.__name__, duration) |  | ||||||
|         time.sleep(duration) |  | ||||||
|         return result |  | ||||||
|  |  | ||||||
|     def _process(self) -> bool: |  | ||||||
|         """ |         """ | ||||||
|         Queries the persistence layer for open trades and handles them, |         Queries the persistence layer for open trades and handles them, | ||||||
|         otherwise a new trade is created. |         otherwise a new trade is created. | ||||||
|   | |||||||
| @@ -5,10 +5,12 @@ Read the documentation to know what cli arguments you need. | |||||||
| """ | """ | ||||||
| import logging | import logging | ||||||
| import sys | import sys | ||||||
|  | import time | ||||||
| from argparse import Namespace | from argparse import Namespace | ||||||
| from typing import List | from typing import Any, Callable, List | ||||||
|  | import sdnotify | ||||||
|  |  | ||||||
| from freqtrade import OperationalException | from freqtrade import (constants, OperationalException, __version__) | ||||||
| from freqtrade.arguments import Arguments | from freqtrade.arguments import Arguments | ||||||
| from freqtrade.configuration import Configuration, set_loggers | from freqtrade.configuration import Configuration, set_loggers | ||||||
| from freqtrade.freqtradebot import FreqtradeBot | from freqtrade.freqtradebot import FreqtradeBot | ||||||
| @@ -35,20 +37,11 @@ def main(sysargv: List[str]) -> None: | |||||||
|         args.func(args) |         args.func(args) | ||||||
|         return |         return | ||||||
|  |  | ||||||
|     freqtrade = None |  | ||||||
|     return_code = 1 |     return_code = 1 | ||||||
|     try: |     try: | ||||||
|         # Load and validate configuration |         # Load and run worker | ||||||
|         config = Configuration(args, None).get_config() |         worker = Worker(args) | ||||||
|  |         worker.run() | ||||||
|         # Init the bot |  | ||||||
|         freqtrade = FreqtradeBot(config) |  | ||||||
|  |  | ||||||
|         state = None |  | ||||||
|         while True: |  | ||||||
|             state = freqtrade.worker(old_state=state) |  | ||||||
|             if state == State.RELOAD_CONF: |  | ||||||
|                 freqtrade = reconfigure(freqtrade, args) |  | ||||||
|  |  | ||||||
|     except KeyboardInterrupt: |     except KeyboardInterrupt: | ||||||
|         logger.info('SIGINT received, aborting ...') |         logger.info('SIGINT received, aborting ...') | ||||||
| @@ -59,32 +52,163 @@ def main(sysargv: List[str]) -> None: | |||||||
|     except BaseException: |     except BaseException: | ||||||
|         logger.exception('Fatal exception!') |         logger.exception('Fatal exception!') | ||||||
|     finally: |     finally: | ||||||
|         if freqtrade: |         if worker is not None: | ||||||
|             freqtrade.stopping() |             worker.exit() | ||||||
|             freqtrade.rpc.send_msg({ |  | ||||||
|                 'type': RPCMessageType.STATUS_NOTIFICATION, |  | ||||||
|                 'status': 'process died' |  | ||||||
|             }) |  | ||||||
|             freqtrade.cleanup() |  | ||||||
|         sys.exit(return_code) |         sys.exit(return_code) | ||||||
|  |  | ||||||
|  |  | ||||||
| def reconfigure(freqtrade: FreqtradeBot, args: Namespace) -> FreqtradeBot: | class Worker(object): | ||||||
|     """ |     """ | ||||||
|     Cleans up current instance, reloads the configuration and returns the new instance |     Freqtradebot worker class | ||||||
|     """ |     """ | ||||||
|     freqtrade.reconfigure() |  | ||||||
|  |  | ||||||
|     # Clean up current modules |     def __init__(self, args: Namespace) -> None: | ||||||
|     freqtrade.cleanup() |         """ | ||||||
|  |         Init all variables and objects the bot needs to work | ||||||
|  |         """ | ||||||
|  |         logger.info('Starting worker %s', __version__) | ||||||
|  |  | ||||||
|     # Create new instance |         self._args = args | ||||||
|     freqtrade = FreqtradeBot(Configuration(args, None).get_config()) |         self._init() | ||||||
|     freqtrade.rpc.send_msg({ |  | ||||||
|  |         # Tell systemd that we completed initialization phase | ||||||
|  |         if self._sd_notify: | ||||||
|  |             logger.debug("sd_notify: READY=1") | ||||||
|  |             self._sd_notify.notify("READY=1") | ||||||
|  |  | ||||||
|  |     def _init(self): | ||||||
|  |         """ | ||||||
|  |         Also called from the _reconfigure() method. | ||||||
|  |         """ | ||||||
|  |         # Load configuration | ||||||
|  |         self._config = Configuration(self._args, None).get_config() | ||||||
|  |  | ||||||
|  |         # Init the instance of the bot | ||||||
|  |         self.freqtrade = FreqtradeBot(self._config, self) | ||||||
|  |  | ||||||
|  |         # Set initial bot state | ||||||
|  |         initial_state = self._config.get('initial_state') | ||||||
|  |         if initial_state: | ||||||
|  |             self._state = State[initial_state.upper()] | ||||||
|  |         else: | ||||||
|  |             self._state = State.STOPPED | ||||||
|  |  | ||||||
|  |         self._throttle_secs = self._config.get('internals', {}).get( | ||||||
|  |             'process_throttle_secs', | ||||||
|  |             constants.PROCESS_THROTTLE_SECS | ||||||
|  |         ) | ||||||
|  |  | ||||||
|  |         self._sd_notify = sdnotify.SystemdNotifier() if \ | ||||||
|  |             self._config.get('internals', {}).get('sd_notify', False) else None | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def state(self) -> State: | ||||||
|  |         return self._state | ||||||
|  |  | ||||||
|  |     @state.setter | ||||||
|  |     def state(self, value: State): | ||||||
|  |         self._state = value | ||||||
|  |  | ||||||
|  |     def run(self): | ||||||
|  |         state = None | ||||||
|  |         while True: | ||||||
|  |             state = self._worker(old_state=state, throttle_secs=self._throttle_secs) | ||||||
|  |             if state == State.RELOAD_CONF: | ||||||
|  |                 self.freqtrade = self._reconfigure() | ||||||
|  |  | ||||||
|  |     def _worker(self, old_state: State, throttle_secs: float) -> State: | ||||||
|  |         """ | ||||||
|  |         Trading routine that must be run at each loop | ||||||
|  |         :param old_state: the previous service state from the previous call | ||||||
|  |         :return: current service state | ||||||
|  |         """ | ||||||
|  |         state = self._state | ||||||
|  |  | ||||||
|  |         # Log state transition | ||||||
|  |         if state != old_state: | ||||||
|  |             self.freqtrade.rpc.send_msg({ | ||||||
|  |                 'type': RPCMessageType.STATUS_NOTIFICATION, | ||||||
|  |                 'status': f'{state.name.lower()}' | ||||||
|  |             }) | ||||||
|  |             logger.info('Changing state to: %s', state.name) | ||||||
|  |             if state == State.RUNNING: | ||||||
|  |                 self.freqtrade.rpc.startup_messages(self._config, self.freqtrade.pairlists) | ||||||
|  |  | ||||||
|  |         if state == State.STOPPED: | ||||||
|  |             # Ping systemd watchdog before sleeping in the stopped state | ||||||
|  |             if self._sd_notify: | ||||||
|  |                 logger.debug("sd_notify: WATCHDOG=1\\nSTATUS=State: STOPPED.") | ||||||
|  |                 self._sd_notify.notify("WATCHDOG=1\nSTATUS=State: STOPPED.") | ||||||
|  |  | ||||||
|  |             time.sleep(throttle_secs) | ||||||
|  |  | ||||||
|  |         elif state == State.RUNNING: | ||||||
|  |             # Ping systemd watchdog before throttling | ||||||
|  |             if self._sd_notify: | ||||||
|  |                 logger.debug("sd_notify: WATCHDOG=1\\nSTATUS=State: RUNNING.") | ||||||
|  |                 self._sd_notify.notify("WATCHDOG=1\nSTATUS=State: RUNNING.") | ||||||
|  |  | ||||||
|  |             self._throttle(func=self._process, min_secs=throttle_secs) | ||||||
|  |  | ||||||
|  |         return state | ||||||
|  |  | ||||||
|  |     def _throttle(self, func: Callable[..., Any], min_secs: float, *args, **kwargs) -> Any: | ||||||
|  |         """ | ||||||
|  |         Throttles the given callable that it | ||||||
|  |         takes at least `min_secs` to finish execution. | ||||||
|  |         :param func: Any callable | ||||||
|  |         :param min_secs: minimum execution time in seconds | ||||||
|  |         :return: Any | ||||||
|  |         """ | ||||||
|  |         start = time.time() | ||||||
|  |         result = func(*args, **kwargs) | ||||||
|  |         end = time.time() | ||||||
|  |         duration = max(min_secs - (end - start), 0.0) | ||||||
|  |         logger.debug('Throttling %s for %.2f seconds', func.__name__, duration) | ||||||
|  |         time.sleep(duration) | ||||||
|  |         return result | ||||||
|  |  | ||||||
|  |     def _process(self) -> bool: | ||||||
|  |         return self.freqtrade.process() | ||||||
|  |  | ||||||
|  |     def _reconfigure(self): | ||||||
|  |         """ | ||||||
|  |         Cleans up current freqtradebot instance, reloads the configuration and | ||||||
|  |         returns the new instance | ||||||
|  |         """ | ||||||
|  |         # Tell systemd that we initiated reconfiguration | ||||||
|  |         if self._sd_notify: | ||||||
|  |             logger.debug("sd_notify: RELOADING=1") | ||||||
|  |             self._sd_notify.notify("RELOADING=1") | ||||||
|  |  | ||||||
|  |         # Clean up current freqtrade modules | ||||||
|  |         self.freqtrade.cleanup() | ||||||
|  |  | ||||||
|  |         # Load and validate config and create new instance of the bot | ||||||
|  |         self._init() | ||||||
|  |  | ||||||
|  |         self.freqtrade.rpc.send_msg({ | ||||||
|             'type': RPCMessageType.STATUS_NOTIFICATION, |             'type': RPCMessageType.STATUS_NOTIFICATION, | ||||||
|             'status': 'config reloaded' |             'status': 'config reloaded' | ||||||
|         }) |         }) | ||||||
|     return freqtrade |  | ||||||
|  |         # Tell systemd that we completed reconfiguration | ||||||
|  |         if self._sd_notify: | ||||||
|  |             logger.debug("sd_notify: READY=1") | ||||||
|  |             self._sd_notify.notify("READY=1") | ||||||
|  |  | ||||||
|  |     def exit(self): | ||||||
|  |         # Tell systemd that we are exiting now | ||||||
|  |         if self._sd_notify: | ||||||
|  |             logger.debug("sd_notify: STOPPING=1") | ||||||
|  |             self._sd_notify.notify("STOPPING=1") | ||||||
|  |  | ||||||
|  |         if self.freqtrade: | ||||||
|  |             self.freqtrade.rpc.send_msg({ | ||||||
|  |                 'type': RPCMessageType.STATUS_NOTIFICATION, | ||||||
|  |                 'status': 'process died' | ||||||
|  |             }) | ||||||
|  |             self.freqtrade.cleanup() | ||||||
|  |  | ||||||
|  |  | ||||||
| if __name__ == '__main__': | if __name__ == '__main__': | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user