Merge pull request #1 from xmatthias/ccxt-async-xmatt

some fixes and improvements hopefully
This commit is contained in:
misagh 2018-07-31 21:21:45 +02:00 committed by GitHub
commit b47c5f1d9a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 37 additions and 31 deletions

View File

@ -2,7 +2,7 @@
""" Cryptocurrency Exchanges support """
import logging
from random import randint
from typing import List, Dict, Any, Optional
from typing import List, Dict, Tuple, Any, Optional
from datetime import datetime
from math import floor, ceil
@ -95,7 +95,7 @@ class Exchange(object):
'secret': exchange_config.get('secret'),
'password': exchange_config.get('password'),
'uid': exchange_config.get('uid', ''),
#'enableRateLimit': True,
# 'enableRateLimit': True,
'enableRateLimit': False,
})
except (KeyError, AttributeError):
@ -334,23 +334,23 @@ class Exchange(object):
logger.info("returning cached ticker-data for %s", pair)
return self._cached_ticker[pair]
async def async_get_tickers_history(self, pairs, tick_interval):
# COMMENTED CODE IS FOR DISCUSSION: where should we close the loop on async ?
#loop = asyncio.new_event_loop()
#asyncio.set_event_loop(loop)
input_coroutines = [self.async_get_ticker_history(symbol, tick_interval) for symbol in pairs]
# loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)
input_coroutines = [self.async_get_ticker_history(
symbol, tick_interval) for symbol in pairs]
tickers = await asyncio.gather(*input_coroutines, return_exceptions=True)
#await self._api_async.close()
# await self._api_async.close()
return tickers
async def async_get_ticker_history(self, pair: str, tick_interval: str,
since_ms: Optional[int] = None) -> List[Dict]:
since_ms: Optional[int] = None) -> Tuple[str, List]:
try:
# fetch ohlcv asynchronously
print("fetching %s ..." % pair)
logger.debug("fetching %s ...", pair)
data = await self._api_async.fetch_ohlcv(pair, timeframe=tick_interval, since=since_ms)
print("done fetching %s ..." % pair)
logger.debug("done fetching %s ...", pair)
return pair, data
except ccxt.NotSupported as e:
@ -363,6 +363,18 @@ class Exchange(object):
except ccxt.BaseError as e:
raise OperationalException(f'Could not fetch ticker data. Msg: {e}')
def refresh_tickers(self, pair_list: List[str], ticker_interval: str) -> Dict:
"""
Refresh tickers asyncronously and return the result.
"""
# TODO: maybe add since_ms to use async in the download-script?
# TODO: only refresh once per interval ? *may require this to move to freqtradebot.py
# TODO@ Add tests for this and the async stuff above
logger.debug("Refreshing klines for %d pairs", len(pair_list))
datatups = asyncio.get_event_loop().run_until_complete(
self.async_get_tickers_history(pair_list, ticker_interval))
return {pair: data for (pair, data) in datatups}
@retrier
def get_ticker_history(self, pair: str, tick_interval: str,
since_ms: Optional[int] = None) -> List[Dict]:

View File

@ -8,7 +8,6 @@ import time
import traceback
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional
import asyncio
import arrow
import requests
@ -56,6 +55,7 @@ class FreqtradeBot(object):
self.persistence = None
self.exchange = Exchange(self.config)
self._init_modules()
self._klines = {}
def _init_modules(self) -> None:
"""
@ -149,6 +149,8 @@ class FreqtradeBot(object):
final_list = sanitized_list[:nb_assets] if nb_assets else sanitized_list
self.config['exchange']['pair_whitelist'] = final_list
self._klines = self.exchange.refresh_tickers(final_list, self.strategy.ticker_interval)
# Query trades from persistence layer
trades = Trade.query.filter(Trade.is_open.is_(True)).all()
@ -302,13 +304,7 @@ class FreqtradeBot(object):
amount_reserve_percent += self.strategy.stoploss
# it should not be more than 50%
amount_reserve_percent = max(amount_reserve_percent, 0.5)
return min(min_stake_amounts)/amount_reserve_percent
async def async_get_tickers(self, exchange, pairs):
input_coroutines = [exchange.async_get_ticker_history(symbol, self.strategy.ticker_interval) for symbol in pairs]
tickers = await asyncio.gather(*input_coroutines, return_exceptions=True)
return tickers
#await exchange.close()
return min(min_stake_amounts) / amount_reserve_percent
def create_trade(self) -> bool:
"""
@ -337,17 +333,13 @@ class FreqtradeBot(object):
if not whitelist:
raise DependencyException('No currency pairs in whitelist')
# fetching kline history for all pairs asynchronously and wait till all done
data = asyncio.get_event_loop().run_until_complete(self.exchange.async_get_tickers_history(whitelist, self.strategy.ticker_interval))
# list of pairs having buy signals
buy_pairs = []
# running get_signal on historical data fetched
# to find buy signals
for _pair, thistory in data:
(buy, sell) = self.strategy.get_signal(_pair, interval, thistory)
for _pair in whitelist:
(buy, sell) = self.strategy.get_signal(_pair, interval, self._klines.get(_pair))
if buy and not sell:
buy_pairs.append(_pair)
@ -518,7 +510,8 @@ class FreqtradeBot(object):
(buy, sell) = (False, False)
experimental = self.config.get('experimental', {})
if experimental.get('use_sell_signal') or experimental.get('ignore_roi_if_buy_signal'):
ticker = self.exchange.get_ticker_history(trade.pair, self.strategy.ticker_interval)
# ticker = self.exchange.get_ticker_history(trade.pair, self.strategy.ticker_interval)
ticker = self._klines.get(trade.pair)
(buy, sell) = self.strategy.get_signal(trade.pair, self.strategy.ticker_interval,
ticker)

View File

@ -44,6 +44,7 @@ def patch_get_signal(freqtrade: FreqtradeBot, value=(True, False)) -> None:
"""
freqtrade.strategy.get_signal = lambda e, s, t: value
freqtrade.exchange.get_ticker_history = lambda p, i: None
freqtrade.exchange.refresh_tickers = lambda pl, i: {}
def patch_RPCManager(mocker) -> MagicMock: