From 15d33c147866987903edd63e9243ea25cdf1b30f Mon Sep 17 00:00:00 2001 From: Axel-CH Date: Sun, 2 Sep 2018 00:11:47 +0200 Subject: [PATCH] add feature multithreading for create trade with Dask distributed --- freqtrade/freqtradebot.py | 67 ++++++++++++++++++++------ freqtrade/multithreading.py | 18 +++++++ freqtrade/tests/conftest.py | 4 ++ freqtrade/tests/test_multithreading.py | 11 +++++ 4 files changed, 84 insertions(+), 16 deletions(-) create mode 100644 freqtrade/multithreading.py create mode 100644 freqtrade/tests/test_multithreading.py diff --git a/freqtrade/freqtradebot.py b/freqtrade/freqtradebot.py index 3d3737372..3dbb8cc9a 100644 --- a/freqtrade/freqtradebot.py +++ b/freqtrade/freqtradebot.py @@ -12,9 +12,10 @@ from typing import Any, Callable, Dict, List, Optional import arrow import requests from cachetools import TTLCache, cached +from dask.distributed import as_completed from freqtrade import (DependencyException, OperationalException, - TemporaryError, __version__, constants, persistence) + TemporaryError, __version__, constants, persistence, multithreading) from freqtrade.exchange import Exchange from freqtrade.persistence import Trade from freqtrade.rpc import RPCManager, RPCMessageType @@ -64,6 +65,10 @@ class FreqtradeBot(object): persistence.init(self.config) + if self.config['multithreading']['enabled']: + global client + client = multithreading.init(self.config) + # Set initial application state initial_state = self.config.get('initial_state') @@ -359,6 +364,12 @@ class FreqtradeBot(object): # it should not be more than 50% amount_reserve_percent = max(amount_reserve_percent, 0.5) return min(min_stake_amounts)/amount_reserve_percent + + def get_history_and_signal(self, pair, interval): + history = self.exchange.get_candle_history(pair, interval) + signal = self.strategy.get_signal(pair, interval, history) + result = {'signal': signal, 'pair': pair} + return result def create_trade(self) -> bool: """ @@ -387,22 +398,46 @@ class FreqtradeBot(object): if not whitelist: raise DependencyException('No currency pairs in whitelist') - # Pick pair based on buy signals - for _pair in whitelist: - thistory = self.exchange.get_candle_history(_pair, interval) - (buy, sell) = self.strategy.get_signal(_pair, interval, thistory) - - if buy and not sell: - bidstrat_check_depth_of_market = self.config.get('bid_strategy', {}).\ - get('check_depth_of_market', {}) - if (bidstrat_check_depth_of_market.get('enabled', False)) and\ - (bidstrat_check_depth_of_market.get('bids_to_ask_delta', 0) > 0): - if self._check_depth_of_market_buy(_pair, bidstrat_check_depth_of_market): - return self.execute_buy(_pair, stake_amount) + if self.config['multithreading']['enabled']: # Multithreaded LOOP + loop_start_time = time.time() + signals = client.map(self.get_history_and_signal, whitelist, interval=interval) + for future, result in as_completed(signals, with_results=True): + (buy, sell) = result['signal'] + _pair = result['pair'] + if buy and not sell: + bidstrat_check_depth_of_market = self.config.get('bid_strategy', {}).\ + get('check_depth_of_market', {}) + if (bidstrat_check_depth_of_market.get('enabled', False)) and\ + (bidstrat_check_depth_of_market.get('bids_to_ask_delta', 0) > 0): + if self._check_depth_of_market_buy(_pair, bidstrat_check_depth_of_market): + self.execute_buy(_pair, stake_amount) else: - return False - return self.execute_buy(_pair, stake_amount) - return False + self.execute_buy(_pair, stake_amount) + + client.restart() # restart workers to prevent side effects of memory leaks + + total_time = (time.time() - loop_start_time) + pairs_number = len(whitelist) + print("Done gathering histories and checking signals for " + str(pairs_number) + " \ + pairs of whitelist in " + str(total_time) + " seconds") + + else: # Synchronous single threaded LOOP + # Pick pair based on buy signals + for _pair in whitelist: + thistory = self.exchange.get_candle_history(_pair, interval) + (buy, sell) = self.strategy.get_signal(_pair, interval, thistory) + + if buy and not sell: + bidstrat_check_depth_of_market = self.config.get('bid_strategy', {}).\ + get('check_depth_of_market', {}) + if (bidstrat_check_depth_of_market.get('enabled', False)) and\ + (bidstrat_check_depth_of_market.get('bids_to_ask_delta', 0) > 0): + if self._check_depth_of_market_buy(_pair, bidstrat_check_depth_of_market): + return self.execute_buy(_pair, stake_amount) + else: + return False + return self.execute_buy(_pair, stake_amount) + return False def _check_depth_of_market_buy(self, pair: str, conf: Dict) -> bool: """ diff --git a/freqtrade/multithreading.py b/freqtrade/multithreading.py new file mode 100644 index 000000000..f8d815ec6 --- /dev/null +++ b/freqtrade/multithreading.py @@ -0,0 +1,18 @@ +""" +This module contains the class to use the dask module +""" + +from typing import Dict +from dask.distributed import Client + + +def init(config: Dict) -> object: + """ + Initialise Dask Distributed Client + """ + workers_number = config['multithreading']['workers_number'] + client = Client(processes=False, threads_per_worker=1, n_workers=workers_number) + + return client + + diff --git a/freqtrade/tests/conftest.py b/freqtrade/tests/conftest.py index af9062cab..3d861ce6e 100644 --- a/freqtrade/tests/conftest.py +++ b/freqtrade/tests/conftest.py @@ -127,6 +127,10 @@ def default_conf(): "NEO/BTC" ] }, + "multithreading": { + "enabled": False, + "workers_number": 4 + }, "telegram": { "enabled": True, "token": "token", diff --git a/freqtrade/tests/test_multithreading.py b/freqtrade/tests/test_multithreading.py new file mode 100644 index 000000000..4eafa5684 --- /dev/null +++ b/freqtrade/tests/test_multithreading.py @@ -0,0 +1,11 @@ +from freqtrade import (multithreading) + + +def test_client_init(): + # Check if create a Dask Client + default_conf = {} + default_conf.update({ + 'multithreading': {'use_multithreading': True, 'workers_number': 4} + }) + client = multithreading.init(default_conf) + assert (str(type(client))) == ""