This commit is contained in:
Samuel Husso 2018-08-01 16:55:28 +00:00 committed by GitHub
commit 75604c358d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 61 additions and 4 deletions

View File

@ -7,7 +7,9 @@ from datetime import datetime
from math import floor, ceil from math import floor, ceil
import ccxt import ccxt
import ccxt.async_support as ccxt_async
import arrow import arrow
import asyncio
from freqtrade import constants, OperationalException, DependencyException, TemporaryError from freqtrade import constants, OperationalException, DependencyException, TemporaryError
@ -41,10 +43,21 @@ def retrier(f):
return wrapper return wrapper
def run_async_task(f, **kwargs):
"""
Run async task and its kwargs.
Usage:
run_async_task(async_f, arg1='foo', arg2='bar')
"""
logger.info('Running async task method %s with args %s', f, kwargs)
return asyncio.get_event_loop().run_until_complete(f(**kwargs))
class Exchange(object): class Exchange(object):
# Current selected exchange # Current selected exchange
_api: ccxt.Exchange = None _api: ccxt.Exchange = None
_api_async: ccxt_async.Exchange = None
_conf: Dict = {} _conf: Dict = {}
_cached_ticker: Dict[str, Any] = {} _cached_ticker: Dict[str, Any] = {}
@ -65,6 +78,7 @@ class Exchange(object):
exchange_config = config['exchange'] exchange_config = config['exchange']
self._api = self._init_ccxt(exchange_config) self._api = self._init_ccxt(exchange_config)
self._api_async = self._init_ccxt(exchange_config, ccxt_async)
logger.info('Using Exchange "%s"', self.name) logger.info('Using Exchange "%s"', self.name)
@ -75,7 +89,25 @@ class Exchange(object):
# Check if timeframe is available # Check if timeframe is available
self.validate_timeframes(config['ticker_interval']) self.validate_timeframes(config['ticker_interval'])
def _init_ccxt(self, exchange_config: dict) -> ccxt.Exchange: self.__loop = None
self._ticker_history = {}
self._fetching_progress = False
for _pair in config['exchange']['pair_whitelist']:
print(f'initting {_pair} for history cache')
self._ticker_history[_pair] = {}
logger.info('Ticker_history cache for given pairs%s', self._ticker_history)
@property
def loop(self):
return self.__loop
@loop.setter
def loop(self, loop):
if not self.__loop:
logger.info('** Setting exchange asyncio loop **')
self.__loop = loop
def _init_ccxt(self, exchange_config: dict, ccxt_module=ccxt) -> ccxt.Exchange:
""" """
Initialize ccxt with given config and return valid Initialize ccxt with given config and return valid
ccxt instance. ccxt instance.
@ -83,10 +115,10 @@ class Exchange(object):
# Find matching class for the given exchange name # Find matching class for the given exchange name
name = exchange_config['name'] name = exchange_config['name']
if name not in ccxt.exchanges: if name not in ccxt_module.exchanges:
raise OperationalException(f'Exchange {name} is not supported') raise OperationalException(f'Exchange {name} is not supported')
try: try:
api = getattr(ccxt, name.lower())({ api = getattr(ccxt_module, name.lower())({
'apiKey': exchange_config.get('key'), 'apiKey': exchange_config.get('key'),
'secret': exchange_config.get('secret'), 'secret': exchange_config.get('secret'),
'password': exchange_config.get('password'), 'password': exchange_config.get('password'),
@ -329,6 +361,23 @@ class Exchange(object):
logger.info("returning cached ticker-data for %s", pair) logger.info("returning cached ticker-data for %s", pair)
return self._cached_ticker[pair] return self._cached_ticker[pair]
async def get_ticker_history_async(self, tick_interval: str,
since_ms: Optional[int] = None):
"""Fetch exchange ticker history for configured pairs"""
if self._fetching_progress:
return self._ticker_history
data = []
self._fetching_progress = True
for _pair in self._ticker_history:
logger.info('*'*20)
logger.info('Async: Fetching history for pair %s', _pair)
logger.info('*'*20)
data = await self._api_async.fetch_ohlcv(_pair, timeframe=tick_interval)
logger.info('Async data fetched len %s', len(data))
print(f'***{_pair} Some data appears...... {data[0]}')
return data
@retrier @retrier
def get_ticker_history(self, pair: str, tick_interval: str, def get_ticker_history(self, pair: str, tick_interval: str,
since_ms: Optional[int] = None) -> List[Dict]: since_ms: Optional[int] = None) -> List[Dict]:

View File

@ -12,10 +12,11 @@ from typing import Any, Callable, Dict, List, Optional
import arrow import arrow
import requests import requests
from cachetools import TTLCache, cached from cachetools import TTLCache, cached
import asyncio
from freqtrade import (DependencyException, OperationalException, from freqtrade import (DependencyException, OperationalException,
TemporaryError, __version__, constants, persistence) TemporaryError, __version__, constants, persistence)
from freqtrade.exchange import Exchange from freqtrade.exchange import Exchange, run_async_task
from freqtrade.persistence import Trade from freqtrade.persistence import Trade
from freqtrade.rpc import RPCManager, RPCMessageType from freqtrade.rpc import RPCManager, RPCMessageType
from freqtrade.state import State from freqtrade.state import State
@ -43,6 +44,8 @@ class FreqtradeBot(object):
__version__, __version__,
) )
self.loop = asyncio.get_event_loop()
# Init bot states # Init bot states
self.state = State.STOPPED self.state = State.STOPPED
@ -52,6 +55,7 @@ class FreqtradeBot(object):
self.rpc: RPCManager = RPCManager(self) self.rpc: RPCManager = RPCManager(self)
self.persistence = None self.persistence = None
self.exchange = Exchange(self.config) self.exchange = Exchange(self.config)
self.exchange.loop = self.loop
self._init_modules() self._init_modules()
def _init_modules(self) -> None: def _init_modules(self) -> None:
@ -146,6 +150,10 @@ class FreqtradeBot(object):
final_list = sanitized_list[:nb_assets] if nb_assets else sanitized_list final_list = sanitized_list[:nb_assets] if nb_assets else sanitized_list
self.config['exchange']['pair_whitelist'] = final_list self.config['exchange']['pair_whitelist'] = final_list
# async update ticker history
run_async_task(self.exchange.get_ticker_history_async,
tick_interval = self.strategy.ticker_interval)
# Query trades from persistence layer # Query trades from persistence layer
trades = Trade.query.filter(Trade.is_open.is_(True)).all() trades = Trade.query.filter(Trade.is_open.is_(True)).all()