Refactor async-refresh to it's own function

This commit is contained in:
Matthias 2018-07-31 20:43:32 +02:00
parent a486b1d01c
commit 31870abd25
2 changed files with 21 additions and 28 deletions

View File

@ -2,7 +2,7 @@
""" Cryptocurrency Exchanges support """ """ Cryptocurrency Exchanges support """
import logging import logging
from random import randint from random import randint
from typing import List, Dict, Any, Optional from typing import List, Dict, Tuple, Any, Optional
from datetime import datetime from datetime import datetime
from math import floor, ceil from math import floor, ceil
@ -334,23 +334,23 @@ class Exchange(object):
logger.info("returning cached ticker-data for %s", pair) logger.info("returning cached ticker-data for %s", pair)
return self._cached_ticker[pair] return self._cached_ticker[pair]
async def async_get_tickers_history(self, pairs, tick_interval): async def async_get_tickers_history(self, pairs, tick_interval):
# COMMENTED CODE IS FOR DISCUSSION: where should we close the loop on async ? # COMMENTED CODE IS FOR DISCUSSION: where should we close the loop on async ?
# loop = asyncio.new_event_loop() # loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop) # asyncio.set_event_loop(loop)
input_coroutines = [self.async_get_ticker_history(symbol, tick_interval) for symbol in pairs] input_coroutines = [self.async_get_ticker_history(
symbol, tick_interval) for symbol in pairs]
tickers = await asyncio.gather(*input_coroutines, return_exceptions=True) tickers = await asyncio.gather(*input_coroutines, return_exceptions=True)
# await self._api_async.close() # await self._api_async.close()
return tickers return tickers
async def async_get_ticker_history(self, pair: str, tick_interval: str, async def async_get_ticker_history(self, pair: str, tick_interval: str,
since_ms: Optional[int] = None) -> List[Dict]: since_ms: Optional[int] = None) -> Tuple[str, List]:
try: try:
# fetch ohlcv asynchronously # fetch ohlcv asynchronously
print("fetching %s ..." % pair) logger.debug("fetching %s ...", pair)
data = await self._api_async.fetch_ohlcv(pair, timeframe=tick_interval, since=since_ms) data = await self._api_async.fetch_ohlcv(pair, timeframe=tick_interval, since=since_ms)
print("done fetching %s ..." % pair) logger.debug("done fetching %s ...", pair)
return pair, data return pair, data
except ccxt.NotSupported as e: except ccxt.NotSupported as e:
@ -363,6 +363,12 @@ class Exchange(object):
except ccxt.BaseError as e: except ccxt.BaseError as e:
raise OperationalException(f'Could not fetch ticker data. Msg: {e}') raise OperationalException(f'Could not fetch ticker data. Msg: {e}')
def refresh_tickers(self, pair_list: List[str], ticker_interval: str) -> Dict:
logger.debug("Refreshing klines for %d pairs", len(pair_list))
datatups = asyncio.get_event_loop().run_until_complete(
self.async_get_tickers_history(pair_list, ticker_interval))
return {pair: data for (pair, data) in datatups}
@retrier @retrier
def get_ticker_history(self, pair: str, tick_interval: str, def get_ticker_history(self, pair: str, tick_interval: str,
since_ms: Optional[int] = None) -> List[Dict]: since_ms: Optional[int] = None) -> List[Dict]:

View File

@ -8,7 +8,6 @@ import time
import traceback import traceback
from datetime import datetime from datetime import datetime
from typing import Any, Callable, Dict, List, Optional from typing import Any, Callable, Dict, List, Optional
import asyncio
import arrow import arrow
import requests import requests
@ -149,9 +148,7 @@ class FreqtradeBot(object):
final_list = sanitized_list[:nb_assets] if nb_assets else sanitized_list final_list = sanitized_list[:nb_assets] if nb_assets else sanitized_list
self.config['exchange']['pair_whitelist'] = final_list self.config['exchange']['pair_whitelist'] = final_list
datatups = asyncio.get_event_loop().run_until_complete( self._klines = self.exchange.refresh_tickers(final_list, self.strategy.ticker_interval)
self.exchange.async_get_tickers_history(final_list, self.strategy.ticker_interval))
self._klines = {pair: data for (pair, data) in datatups}
# Query trades from persistence layer # Query trades from persistence layer
trades = Trade.query.filter(Trade.is_open.is_(True)).all() trades = Trade.query.filter(Trade.is_open.is_(True)).all()
@ -308,12 +305,6 @@ class FreqtradeBot(object):
amount_reserve_percent = max(amount_reserve_percent, 0.5) amount_reserve_percent = max(amount_reserve_percent, 0.5)
return min(min_stake_amounts) / amount_reserve_percent return min(min_stake_amounts) / amount_reserve_percent
async def async_get_tickers(self, exchange, pairs):
input_coroutines = [exchange.async_get_ticker_history(symbol, self.strategy.ticker_interval) for symbol in pairs]
tickers = await asyncio.gather(*input_coroutines, return_exceptions=True)
return tickers
#await exchange.close()
def create_trade(self) -> bool: def create_trade(self) -> bool:
""" """
Checks the implemented trading indicator(s) for a randomly picked pair, Checks the implemented trading indicator(s) for a randomly picked pair,
@ -341,17 +332,13 @@ class FreqtradeBot(object):
if not whitelist: if not whitelist:
raise DependencyException('No currency pairs in whitelist') raise DependencyException('No currency pairs in whitelist')
# fetching kline history for all pairs asynchronously and wait till all done
# data = asyncio.get_event_loop().run_until_complete(self.exchange.async_get_tickers_history(whitelist, self.strategy.ticker_interval))
# list of pairs having buy signals # list of pairs having buy signals
buy_pairs = [] buy_pairs = []
# running get_signal on historical data fetched # running get_signal on historical data fetched
# to find buy signals # to find buy signals
for _pair, thistory in self._klines.items(): for _pair in whitelist:
(buy, sell) = self.strategy.get_signal(_pair, interval, thistory) (buy, sell) = self.strategy.get_signal(_pair, interval, self._klines[_pair])
if buy and not sell: if buy and not sell:
buy_pairs.append(_pair) buy_pairs.append(_pair)