Merge pull request #1400 from freqtrade/feat/exchange_styling

Feat/exchange styling
This commit is contained in:
Misagh 2018-12-11 19:32:49 +01:00 committed by GitHub
commit 5a7451a823
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 51 additions and 62 deletions

View File

@ -144,20 +144,6 @@ class Edge():
self._cached_pairs = self._process_expectancy(trades_df) self._cached_pairs = self._process_expectancy(trades_df)
self._last_updated = arrow.utcnow().timestamp self._last_updated = arrow.utcnow().timestamp
# Not a nice hack but probably simplest solution:
# When backtest load data it loads the delta between disk and exchange
# The problem is that exchange consider that recent.
# it is but it is incomplete (c.f. _async_get_candle_history)
# So it causes get_signal to exit cause incomplete ticker_hist
# A patch to that would be update _pairs_last_refresh_time of exchange
# so it will download again all pairs
# Another solution is to add new data to klines instead of reassigning it:
# self.klines[pair].update(data) instead of self.klines[pair] = data in exchange package.
# But that means indexing timestamp and having a verification so that
# there is no empty range between two timestaps (recently added and last
# one)
self.exchange._pairs_last_refresh_time = {}
return True return True
def stake_amount(self, pair: str, free_capital: float, def stake_amount(self, pair: str, free_capital: float,

View File

@ -64,14 +64,8 @@ def retrier(f):
class Exchange(object): class Exchange(object):
# Current selected exchange
_api: ccxt.Exchange = None
_api_async: ccxt_async.Exchange = None
_conf: Dict = {} _conf: Dict = {}
# Holds all open sell orders for dry_run
_dry_run_open_orders: Dict[str, Any] = {}
def __init__(self, config: dict) -> None: def __init__(self, config: dict) -> None:
""" """
Initializes this module with the given config, Initializes this module with the given config,
@ -89,13 +83,17 @@ class Exchange(object):
# Holds candles # Holds candles
self.klines: Dict[str, Any] = {} self.klines: Dict[str, Any] = {}
# Holds all open sell orders for dry_run
self._dry_run_open_orders: Dict[str, Any] = {}
if config['dry_run']: if config['dry_run']:
logger.info('Instance is running with dry_run enabled') logger.info('Instance is running with dry_run enabled')
exchange_config = config['exchange'] exchange_config = config['exchange']
self._api = self._init_ccxt(exchange_config, ccxt_kwargs=exchange_config.get('ccxt_config')) self._api: ccxt.Exchange = self._init_ccxt(
self._api_async = self._init_ccxt(exchange_config, ccxt_async, exchange_config, ccxt_kwargs=exchange_config.get('ccxt_config'))
ccxt_kwargs=exchange_config.get('ccxt_async_config')) self._api_async: ccxt_async.Exchange = self._init_ccxt(
exchange_config, ccxt_async, ccxt_kwargs=exchange_config.get('ccxt_async_config'))
logger.info('Using Exchange "%s"', self.name) logger.info('Using Exchange "%s"', self.name)
@ -128,12 +126,12 @@ class Exchange(object):
raise OperationalException(f'Exchange {name} is not supported') raise OperationalException(f'Exchange {name} is not supported')
ex_config = { ex_config = {
'apiKey': exchange_config.get('key'), 'apiKey': exchange_config.get('key'),
'secret': exchange_config.get('secret'), 'secret': exchange_config.get('secret'),
'password': exchange_config.get('password'), 'password': exchange_config.get('password'),
'uid': exchange_config.get('uid', ''), 'uid': exchange_config.get('uid', ''),
'enableRateLimit': exchange_config.get('ccxt_rate_limit', True) 'enableRateLimit': exchange_config.get('ccxt_rate_limit', True)
} }
if ccxt_kwargs: if ccxt_kwargs:
logger.info('Applying additional ccxt config: %s', ccxt_kwargs) logger.info('Applying additional ccxt config: %s', ccxt_kwargs)
ex_config.update(ccxt_kwargs) ex_config.update(ccxt_kwargs)
@ -491,9 +489,9 @@ class Exchange(object):
# Combine tickers # Combine tickers
data: List = [] data: List = []
for tick in tickers: for p, ticker in tickers:
if tick[0] == pair: if p == pair:
data.extend(tick[1]) data.extend(ticker)
# Sort data again after extending the result - above calls return in "async order" order # Sort data again after extending the result - above calls return in "async order" order
data = sorted(data, key=lambda x: x[0]) data = sorted(data, key=lambda x: x[0])
logger.info("downloaded %s with length %s.", pair, len(data)) logger.info("downloaded %s with length %s.", pair, len(data))
@ -501,7 +499,7 @@ class Exchange(object):
def refresh_tickers(self, pair_list: List[str], ticker_interval: str) -> None: def refresh_tickers(self, pair_list: List[str], ticker_interval: str) -> None:
""" """
Refresh tickers asyncronously and return the result. Refresh tickers asyncronously and set `klines` of this object with the result
""" """
logger.debug("Refreshing klines for %d pairs", len(pair_list)) logger.debug("Refreshing klines for %d pairs", len(pair_list))
asyncio.get_event_loop().run_until_complete( asyncio.get_event_loop().run_until_complete(
@ -510,9 +508,27 @@ class Exchange(object):
async def async_get_candles_history(self, pairs: List[str], async def async_get_candles_history(self, pairs: List[str],
tick_interval: str) -> List[Tuple[str, List]]: tick_interval: str) -> List[Tuple[str, List]]:
"""Download ohlcv history for pair-list asyncronously """ """Download ohlcv history for pair-list asyncronously """
input_coroutines = [self._async_get_candle_history( # Calculating ticker interval in second
symbol, tick_interval) for symbol in pairs] interval_in_sec = constants.TICKER_INTERVAL_MINUTES[tick_interval] * 60
input_coroutines = []
# Gather corotines to run
for pair in pairs:
if not (self._pairs_last_refresh_time.get(pair, 0) + interval_in_sec >=
arrow.utcnow().timestamp and pair in self.klines):
input_coroutines.append(self._async_get_candle_history(pair, tick_interval))
else:
logger.debug("Using cached klines data for %s ...", pair)
tickers = await asyncio.gather(*input_coroutines, return_exceptions=True) tickers = await asyncio.gather(*input_coroutines, return_exceptions=True)
# handle caching
for pair, ticks in tickers:
# keeping last candle time as last refreshed time of the pair
if ticks:
self._pairs_last_refresh_time[pair] = ticks[-1][0] // 1000
# keeping parsed dataframe in cache
self.klines[pair] = ticks
return tickers return tickers
@retrier_async @retrier_async
@ -522,20 +538,8 @@ class Exchange(object):
# fetch ohlcv asynchronously # fetch ohlcv asynchronously
logger.debug("fetching %s since %s ...", pair, since_ms) logger.debug("fetching %s since %s ...", pair, since_ms)
# Calculating ticker interval in second data = await self._api_async.fetch_ohlcv(pair, timeframe=tick_interval,
interval_in_sec = constants.TICKER_INTERVAL_MINUTES[tick_interval] * 60 since=since_ms)
# If (last update time) + (interval in second) is greater or equal than now
# that means we don't have to hit the API as there is no new candle
# so we fetch it from local cache
if (not since_ms and
self._pairs_last_refresh_time.get(pair, 0) + interval_in_sec >=
arrow.utcnow().timestamp):
data = self.klines[pair]
logger.debug("Using cached klines data for %s ...", pair)
else:
data = await self._api_async.fetch_ohlcv(pair, timeframe=tick_interval,
since=since_ms)
# Because some exchange sort Tickers ASC and other DESC. # Because some exchange sort Tickers ASC and other DESC.
# Ex: Bittrex returns a list of tickers ASC (oldest first, newest last) # Ex: Bittrex returns a list of tickers ASC (oldest first, newest last)
@ -544,13 +548,6 @@ class Exchange(object):
if data and data[0][0] > data[-1][0]: if data and data[0][0] > data[-1][0]:
data = sorted(data, key=lambda x: x[0]) data = sorted(data, key=lambda x: x[0])
# keeping last candle time as last refreshed time of the pair
if data:
self._pairs_last_refresh_time[pair] = data[-1][0] // 1000
# keeping candles in cache
self.klines[pair] = data
logger.debug("done fetching %s ...", pair) logger.debug("done fetching %s ...", pair)
return pair, data return pair, data

View File

@ -737,7 +737,7 @@ def test_get_history(default_conf, mocker, caplog):
def test_refresh_tickers(mocker, default_conf, caplog) -> None: def test_refresh_tickers(mocker, default_conf, caplog) -> None:
tick = [ tick = [
[ [
1511686200000, # unix timestamp ms arrow.utcnow().timestamp * 1000, # unix timestamp ms
1, # open 1, # open
2, # high 2, # high
3, # low 3, # low
@ -757,9 +757,16 @@ def test_refresh_tickers(mocker, default_conf, caplog) -> None:
assert log_has(f'Refreshing klines for {len(pairs)} pairs', caplog.record_tuples) assert log_has(f'Refreshing klines for {len(pairs)} pairs', caplog.record_tuples)
assert exchange.klines assert exchange.klines
assert exchange._api_async.fetch_ohlcv.call_count == 2
for pair in pairs: for pair in pairs:
assert exchange.klines[pair] assert exchange.klines[pair]
# test caching
exchange.refresh_tickers(['IOTA/ETH', 'XRP/ETH'], '5m')
assert exchange._api_async.fetch_ohlcv.call_count == 2
assert log_has(f"Using cached klines data for {pairs[0]} ...", caplog.record_tuples)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test__async_get_candle_history(default_conf, mocker, caplog): async def test__async_get_candle_history(default_conf, mocker, caplog):
@ -788,10 +795,6 @@ async def test__async_get_candle_history(default_conf, mocker, caplog):
assert res[1] == tick assert res[1] == tick
assert exchange._api_async.fetch_ohlcv.call_count == 1 assert exchange._api_async.fetch_ohlcv.call_count == 1
assert not log_has(f"Using cached klines data for {pair} ...", caplog.record_tuples) assert not log_has(f"Using cached klines data for {pair} ...", caplog.record_tuples)
# test caching
res = await exchange._async_get_candle_history(pair, "5m")
assert exchange._api_async.fetch_ohlcv.call_count == 1
assert log_has(f"Using cached klines data for {pair} ...", caplog.record_tuples)
# exchange = Exchange(default_conf) # exchange = Exchange(default_conf)
await async_ccxt_exception(mocker, default_conf, MagicMock(), await async_ccxt_exception(mocker, default_conf, MagicMock(),

View File

@ -283,13 +283,15 @@ def test_download_pairs_exception(ticker_history, mocker, caplog, default_conf)
def test_download_backtesting_testdata(ticker_history, mocker, default_conf) -> None: def test_download_backtesting_testdata(ticker_history, mocker, default_conf) -> None:
mocker.patch('freqtrade.exchange.Exchange.get_history', return_value=ticker_history) mocker.patch('freqtrade.exchange.Exchange.get_history', return_value=ticker_history)
exchange = get_patched_exchange(mocker, default_conf) exchange = get_patched_exchange(mocker, default_conf)
# Tst that pairs-cached is not touched.
assert not exchange._pairs_last_refresh_time
# Download a 1 min ticker file # Download a 1 min ticker file
file1 = os.path.join(os.path.dirname(__file__), '..', 'testdata', 'XEL_BTC-1m.json') file1 = os.path.join(os.path.dirname(__file__), '..', 'testdata', 'XEL_BTC-1m.json')
_backup_file(file1) _backup_file(file1)
download_backtesting_testdata(None, exchange, pair="XEL/BTC", tick_interval='1m') download_backtesting_testdata(None, exchange, pair="XEL/BTC", tick_interval='1m')
assert os.path.isfile(file1) is True assert os.path.isfile(file1) is True
_clean_test_file(file1) _clean_test_file(file1)
assert not exchange._pairs_last_refresh_time
# Download a 5 min ticker file # Download a 5 min ticker file
file2 = os.path.join(os.path.dirname(__file__), '..', 'testdata', 'STORJ_BTC-5m.json') file2 = os.path.join(os.path.dirname(__file__), '..', 'testdata', 'STORJ_BTC-5m.json')
@ -298,6 +300,7 @@ def test_download_backtesting_testdata(ticker_history, mocker, default_conf) ->
download_backtesting_testdata(None, exchange, pair="STORJ/BTC", tick_interval='5m') download_backtesting_testdata(None, exchange, pair="STORJ/BTC", tick_interval='5m')
assert os.path.isfile(file2) is True assert os.path.isfile(file2) is True
_clean_test_file(file2) _clean_test_file(file2)
assert not exchange._pairs_last_refresh_time
def test_download_backtesting_testdata2(mocker, default_conf) -> None: def test_download_backtesting_testdata2(mocker, default_conf) -> None: