Merge branch 'develop' into time_in_force

This commit is contained in:
Matthias
2018-12-17 06:37:46 +01:00
committed by GitHub
26 changed files with 174 additions and 139 deletions

View File

@@ -7,12 +7,14 @@ from typing import List, Dict, Tuple, Any, Optional
from datetime import datetime
from math import floor, ceil
import arrow
import asyncio
import ccxt
import ccxt.async_support as ccxt_async
import arrow
from pandas import DataFrame
from freqtrade import constants, OperationalException, DependencyException, TemporaryError
from freqtrade.exchange.exchange_helpers import parse_ticker_dataframe
logger = logging.getLogger(__name__)
@@ -64,14 +66,8 @@ def retrier(f):
class Exchange(object):
# Current selected exchange
_api: ccxt.Exchange = None
_api_async: ccxt_async.Exchange = None
_conf: Dict = {}
# Holds all open sell orders for dry_run
_dry_run_open_orders: Dict[str, Any] = {}
def __init__(self, config: dict) -> None:
"""
Initializes this module with the given config,
@@ -87,15 +83,19 @@ class Exchange(object):
self._pairs_last_refresh_time: Dict[str, int] = {}
# Holds candles
self.klines: Dict[str, Any] = {}
self._klines: Dict[str, DataFrame] = {}
# Holds all open sell orders for dry_run
self._dry_run_open_orders: Dict[str, Any] = {}
if config['dry_run']:
logger.info('Instance is running with dry_run enabled')
exchange_config = config['exchange']
self._api = self._init_ccxt(exchange_config, ccxt_kwargs=exchange_config.get('ccxt_config'))
self._api_async = self._init_ccxt(exchange_config, ccxt_async,
ccxt_kwargs=exchange_config.get('ccxt_async_config'))
self._api: ccxt.Exchange = self._init_ccxt(
exchange_config, ccxt_kwargs=exchange_config.get('ccxt_config'))
self._api_async: ccxt_async.Exchange = self._init_ccxt(
exchange_config, ccxt_async, ccxt_kwargs=exchange_config.get('ccxt_async_config'))
logger.info('Using Exchange "%s"', self.name)
@@ -129,12 +129,12 @@ class Exchange(object):
raise OperationalException(f'Exchange {name} is not supported')
ex_config = {
'apiKey': exchange_config.get('key'),
'secret': exchange_config.get('secret'),
'password': exchange_config.get('password'),
'uid': exchange_config.get('uid', ''),
'enableRateLimit': exchange_config.get('ccxt_rate_limit', True)
}
'apiKey': exchange_config.get('key'),
'secret': exchange_config.get('secret'),
'password': exchange_config.get('password'),
'uid': exchange_config.get('uid', ''),
'enableRateLimit': exchange_config.get('ccxt_rate_limit', True)
}
if ccxt_kwargs:
logger.info('Applying additional ccxt config: %s', ccxt_kwargs)
ex_config.update(ccxt_kwargs)
@@ -158,6 +158,12 @@ class Exchange(object):
"""exchange ccxt id"""
return self._api.id
def klines(self, pair: str) -> DataFrame:
if pair in self._klines:
return self._klines[pair].copy()
else:
return None
def set_sandbox(self, api, exchange_config: dict, name: str):
if exchange_config.get('sandbox'):
if api.urls.get('test'):
@@ -513,9 +519,9 @@ class Exchange(object):
# Combine tickers
data: List = []
for tick in tickers:
if tick[0] == pair:
data.extend(tick[1])
for p, ticker in tickers:
if p == pair:
data.extend(ticker)
# Sort data again after extending the result - above calls return in "async order" order
data = sorted(data, key=lambda x: x[0])
logger.info("downloaded %s with length %s.", pair, len(data))
@@ -523,7 +529,7 @@ class Exchange(object):
def refresh_tickers(self, pair_list: List[str], ticker_interval: str) -> None:
"""
Refresh tickers asyncronously and return the result.
Refresh tickers asyncronously and set `_klines` of this object with the result
"""
logger.debug("Refreshing klines for %d pairs", len(pair_list))
asyncio.get_event_loop().run_until_complete(
@@ -532,9 +538,27 @@ class Exchange(object):
async def async_get_candles_history(self, pairs: List[str],
tick_interval: str) -> List[Tuple[str, List]]:
"""Download ohlcv history for pair-list asyncronously """
input_coroutines = [self._async_get_candle_history(
symbol, tick_interval) for symbol in pairs]
# Calculating ticker interval in second
interval_in_sec = constants.TICKER_INTERVAL_MINUTES[tick_interval] * 60
input_coroutines = []
# Gather corotines to run
for pair in pairs:
if not (self._pairs_last_refresh_time.get(pair, 0) + interval_in_sec >=
arrow.utcnow().timestamp and pair in self._klines):
input_coroutines.append(self._async_get_candle_history(pair, tick_interval))
else:
logger.debug("Using cached klines data for %s ...", pair)
tickers = await asyncio.gather(*input_coroutines, return_exceptions=True)
# handle caching
for pair, ticks in tickers:
# keeping last candle time as last refreshed time of the pair
if ticks:
self._pairs_last_refresh_time[pair] = ticks[-1][0] // 1000
# keeping parsed dataframe in cache
self._klines[pair] = parse_ticker_dataframe(ticks)
return tickers
@retrier_async
@@ -544,20 +568,8 @@ class Exchange(object):
# fetch ohlcv asynchronously
logger.debug("fetching %s since %s ...", pair, since_ms)
# Calculating ticker interval in second
interval_in_sec = constants.TICKER_INTERVAL_MINUTES[tick_interval] * 60
# If (last update time) + (interval in second) is greater or equal than now
# that means we don't have to hit the API as there is no new candle
# so we fetch it from local cache
if (not since_ms and
self._pairs_last_refresh_time.get(pair, 0) + interval_in_sec >=
arrow.utcnow().timestamp):
data = self.klines[pair]
logger.debug("Using cached klines data for %s ...", pair)
else:
data = await self._api_async.fetch_ohlcv(pair, timeframe=tick_interval,
since=since_ms)
data = await self._api_async.fetch_ohlcv(pair, timeframe=tick_interval,
since=since_ms)
# Because some exchange sort Tickers ASC and other DESC.
# Ex: Bittrex returns a list of tickers ASC (oldest first, newest last)
@@ -566,13 +578,6 @@ class Exchange(object):
if data and data[0][0] > data[-1][0]:
data = sorted(data, key=lambda x: x[0])
# keeping last candle time as last refreshed time of the pair
if data:
self._pairs_last_refresh_time[pair] = data[-1][0] // 1000
# keeping candles in cache
self.klines[pair] = data
logger.debug("done fetching %s ...", pair)
return pair, data

View File

@@ -14,6 +14,7 @@ def parse_ticker_dataframe(ticker: list) -> DataFrame:
:param ticker: ticker list, as returned by exchange.async_get_candle_history
:return: DataFrame
"""
logger.debug("Parsing tickerlist to dataframe")
cols = ['date', 'open', 'high', 'low', 'close', 'volume']
frame = DataFrame(ticker, columns=cols)