add feature multithreading for create trade with Dask distributed

This commit is contained in:
Axel-CH 2018-09-02 00:11:47 +02:00
parent 77935b833b
commit 15d33c1478
4 changed files with 84 additions and 16 deletions

View File

@ -12,9 +12,10 @@ from typing import Any, Callable, Dict, List, Optional
import arrow
import requests
from cachetools import TTLCache, cached
from dask.distributed import as_completed
from freqtrade import (DependencyException, OperationalException,
TemporaryError, __version__, constants, persistence)
TemporaryError, __version__, constants, persistence, multithreading)
from freqtrade.exchange import Exchange
from freqtrade.persistence import Trade
from freqtrade.rpc import RPCManager, RPCMessageType
@ -64,6 +65,10 @@ class FreqtradeBot(object):
persistence.init(self.config)
if self.config['multithreading']['enabled']:
global client
client = multithreading.init(self.config)
# Set initial application state
initial_state = self.config.get('initial_state')
@ -359,6 +364,12 @@ class FreqtradeBot(object):
# it should not be more than 50%
amount_reserve_percent = max(amount_reserve_percent, 0.5)
return min(min_stake_amounts)/amount_reserve_percent
def get_history_and_signal(self, pair, interval):
history = self.exchange.get_candle_history(pair, interval)
signal = self.strategy.get_signal(pair, interval, history)
result = {'signal': signal, 'pair': pair}
return result
def create_trade(self) -> bool:
"""
@ -387,22 +398,46 @@ class FreqtradeBot(object):
if not whitelist:
raise DependencyException('No currency pairs in whitelist')
# Pick pair based on buy signals
for _pair in whitelist:
thistory = self.exchange.get_candle_history(_pair, interval)
(buy, sell) = self.strategy.get_signal(_pair, interval, thistory)
if buy and not sell:
bidstrat_check_depth_of_market = self.config.get('bid_strategy', {}).\
get('check_depth_of_market', {})
if (bidstrat_check_depth_of_market.get('enabled', False)) and\
(bidstrat_check_depth_of_market.get('bids_to_ask_delta', 0) > 0):
if self._check_depth_of_market_buy(_pair, bidstrat_check_depth_of_market):
return self.execute_buy(_pair, stake_amount)
if self.config['multithreading']['enabled']: # Multithreaded LOOP
loop_start_time = time.time()
signals = client.map(self.get_history_and_signal, whitelist, interval=interval)
for future, result in as_completed(signals, with_results=True):
(buy, sell) = result['signal']
_pair = result['pair']
if buy and not sell:
bidstrat_check_depth_of_market = self.config.get('bid_strategy', {}).\
get('check_depth_of_market', {})
if (bidstrat_check_depth_of_market.get('enabled', False)) and\
(bidstrat_check_depth_of_market.get('bids_to_ask_delta', 0) > 0):
if self._check_depth_of_market_buy(_pair, bidstrat_check_depth_of_market):
self.execute_buy(_pair, stake_amount)
else:
return False
return self.execute_buy(_pair, stake_amount)
return False
self.execute_buy(_pair, stake_amount)
client.restart() # restart workers to prevent side effects of memory leaks
total_time = (time.time() - loop_start_time)
pairs_number = len(whitelist)
print("Done gathering histories and checking signals for " + str(pairs_number) + " \
pairs of whitelist in " + str(total_time) + " seconds")
else: # Synchronous single threaded LOOP
# Pick pair based on buy signals
for _pair in whitelist:
thistory = self.exchange.get_candle_history(_pair, interval)
(buy, sell) = self.strategy.get_signal(_pair, interval, thistory)
if buy and not sell:
bidstrat_check_depth_of_market = self.config.get('bid_strategy', {}).\
get('check_depth_of_market', {})
if (bidstrat_check_depth_of_market.get('enabled', False)) and\
(bidstrat_check_depth_of_market.get('bids_to_ask_delta', 0) > 0):
if self._check_depth_of_market_buy(_pair, bidstrat_check_depth_of_market):
return self.execute_buy(_pair, stake_amount)
else:
return False
return self.execute_buy(_pair, stake_amount)
return False
def _check_depth_of_market_buy(self, pair: str, conf: Dict) -> bool:
"""

View File

@ -0,0 +1,18 @@
"""
This module contains the class to use the dask module
"""
from typing import Dict
from dask.distributed import Client
def init(config: Dict) -> object:
"""
Initialise Dask Distributed Client
"""
workers_number = config['multithreading']['workers_number']
client = Client(processes=False, threads_per_worker=1, n_workers=workers_number)
return client

View File

@ -127,6 +127,10 @@ def default_conf():
"NEO/BTC"
]
},
"multithreading": {
"enabled": False,
"workers_number": 4
},
"telegram": {
"enabled": True,
"token": "token",

View File

@ -0,0 +1,11 @@
from freqtrade import (multithreading)
def test_client_init():
# Check if create a Dask Client
default_conf = {}
default_conf.update({
'multithreading': {'use_multithreading': True, 'workers_number': 4}
})
client = multithreading.init(default_conf)
assert (str(type(client))) == "<class 'distributed.client.Client'>"