Added candle_type to a lot of methods, wrote some tests

This commit is contained in:
Sam Germain
2021-11-21 01:43:05 -06:00
parent e2f98a8dab
commit 920151934a
27 changed files with 495 additions and 253 deletions

View File

@@ -5,7 +5,7 @@ import itertools
import logging
from datetime import datetime, timezone
from operator import itemgetter
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List
import pandas as pd
from pandas import DataFrame, to_datetime

View File

@@ -41,7 +41,13 @@ class DataProvider:
"""
self.__slice_index = limit_index
def _set_cached_df(self, pair: str, timeframe: str, dataframe: DataFrame) -> None:
def _set_cached_df(
self,
pair: str,
timeframe: str,
dataframe: DataFrame,
candle_type: str = ''
) -> None:
"""
Store cached Dataframe.
Using private method as this should never be used by a user
@@ -50,7 +56,8 @@ class DataProvider:
:param timeframe: Timeframe to get data for
:param dataframe: analyzed dataframe
"""
self.__cached_pairs[(pair, timeframe)] = (dataframe, datetime.now(timezone.utc))
self.__cached_pairs[(pair, timeframe, candle_type)] = (
dataframe, datetime.now(timezone.utc))
def add_pairlisthandler(self, pairlists) -> None:
"""
@@ -58,13 +65,18 @@ class DataProvider:
"""
self._pairlists = pairlists
def historic_ohlcv(self, pair: str, timeframe: str = None) -> DataFrame:
def historic_ohlcv(
self,
pair: str,
timeframe: str = None,
candle_type: str = ''
) -> DataFrame:
"""
Get stored historical candle (OHLCV) data
:param pair: pair to get the data for
:param timeframe: timeframe to get data for
"""
saved_pair = (pair, str(timeframe))
saved_pair = (pair, str(timeframe), candle_type)
if saved_pair not in self.__cached_pairs_backtesting:
timerange = TimeRange.parse_timerange(None if self._config.get(
'timerange') is None else str(self._config.get('timerange')))
@@ -77,11 +89,17 @@ class DataProvider:
timeframe=timeframe or self._config['timeframe'],
datadir=self._config['datadir'],
timerange=timerange,
data_format=self._config.get('dataformat_ohlcv', 'json')
data_format=self._config.get('dataformat_ohlcv', 'json'),
candle_type=candle_type
)
return self.__cached_pairs_backtesting[saved_pair].copy()
def get_pair_dataframe(self, pair: str, timeframe: str = None) -> DataFrame:
def get_pair_dataframe(
self,
pair: str,
timeframe: str = None,
candle_type: str = ''
) -> DataFrame:
"""
Return pair candle (OHLCV) data, either live or cached historical -- depending
on the runmode.
@@ -91,12 +109,12 @@ class DataProvider:
"""
if self.runmode in (RunMode.DRY_RUN, RunMode.LIVE):
# Get live OHLCV data.
data = self.ohlcv(pair=pair, timeframe=timeframe)
data = self.ohlcv(pair=pair, timeframe=timeframe, candle_type=candle_type)
else:
# Get historical OHLCV data (cached on disk).
data = self.historic_ohlcv(pair=pair, timeframe=timeframe)
data = self.historic_ohlcv(pair=pair, timeframe=timeframe, candle_type=candle_type)
if len(data) == 0:
logger.warning(f"No data found for ({pair}, {timeframe}).")
logger.warning(f"No data found for ({pair}, {timeframe}, {candle_type}).")
return data
def get_analyzed_dataframe(
@@ -114,7 +132,7 @@ class DataProvider:
combination.
Returns empty dataframe and Epoch 0 (1970-01-01) if no dataframe was cached.
"""
pair_key = (pair, timeframe)
pair_key = (pair, timeframe, candle_type)
if pair_key in self.__cached_pairs:
if self.runmode in (RunMode.DRY_RUN, RunMode.LIVE):
df, date = self.__cached_pairs[pair_key]
@@ -200,8 +218,10 @@ class DataProvider:
if self._exchange is None:
raise OperationalException(NO_EXCHANGE_EXCEPTION)
if self.runmode in (RunMode.DRY_RUN, RunMode.LIVE):
return self._exchange.klines((pair, timeframe or self._config['timeframe']),
copy=copy)
return self._exchange.klines(
(pair, timeframe or self._config['timeframe'], candle_type),
copy=copy
)
else:
return DataFrame()

View File

@@ -54,6 +54,7 @@ def load_pair_history(pair: str,
fill_missing=fill_up_missing,
drop_incomplete=drop_incomplete,
startup_candles=startup_candles,
candle_type=candle_type
)
@@ -91,7 +92,8 @@ def load_data(datadir: Path,
datadir=datadir, timerange=timerange,
fill_up_missing=fill_up_missing,
startup_candles=startup_candles,
data_handler=data_handler
data_handler=data_handler,
candle_type=candle_type
)
if not hist.empty:
result[pair] = hist
@@ -124,7 +126,8 @@ def refresh_data(datadir: Path,
process = f'{idx}/{len(pairs)}'
_download_pair_history(pair=pair, process=process,
timeframe=timeframe, datadir=datadir,
timerange=timerange, exchange=exchange, data_handler=data_handler)
timerange=timerange, exchange=exchange, data_handler=data_handler,
candle_type=candle_type)
def _load_cached_data_for_updating(
@@ -150,7 +153,8 @@ def _load_cached_data_for_updating(
# Intentionally don't pass timerange in - since we need to load the full dataset.
data = data_handler.ohlcv_load(pair, timeframe=timeframe,
timerange=None, fill_missing=False,
drop_incomplete=True, warn_no_data=False)
drop_incomplete=True, warn_no_data=False,
candle_type=candle_type)
if not data.empty:
if start and start < data.iloc[0]['date']:
# Earlier data than existing data requested, redownload all
@@ -194,7 +198,8 @@ def _download_pair_history(pair: str, *,
# data, since_ms = _load_cached_data_for_updating_old(datadir, pair, timeframe, timerange)
data, since_ms = _load_cached_data_for_updating(pair, timeframe, timerange,
data_handler=data_handler)
data_handler=data_handler,
candle_type=candle_type)
logger.debug("Current Start: %s",
f"{data.iloc[0]['date']:%Y-%m-%d %H:%M:%S}" if not data.empty else 'None')

View File

@@ -119,8 +119,8 @@ class Edge:
)
# Download informative pairs too
res = defaultdict(list)
for p, t in self.strategy.gather_informative_pairs():
res[t].append(p)
for pair, timeframe, _ in self.strategy.gather_informative_pairs():
res[timeframe].append(pair)
for timeframe, inf_pairs in res.items():
timerange_startup = deepcopy(self._timerange)
timerange_startup.subtract_start(timeframe_to_seconds(

View File

@@ -204,7 +204,7 @@ class Binance(Exchange):
since_ms: int, is_new_pair: bool = False,
raise_: bool = False,
candle_type: str = ''
) -> Tuple[str, str, List]:
) -> Tuple[str, str, str, List]:
"""
Overwrite to introduce "fast new pair" functionality by detecting the pair's listing date
Does not work for other exchanges, which don't return the earliest data when called with "0"

View File

@@ -92,7 +92,7 @@ class Exchange:
self._config.update(config)
# Holds last candle refreshed time of each pair
self._pairs_last_refresh_time: Dict[Tuple[str, str], int] = {}
self._pairs_last_refresh_time: Dict[Tuple[str, str, str], int] = {}
# Timestamp of last markets refresh
self._last_markets_refresh: int = 0
@@ -105,7 +105,7 @@ class Exchange:
self._buy_rate_cache: TTLCache = TTLCache(maxsize=100, ttl=1800)
# Holds candles
self._klines: Dict[Tuple[str, str], DataFrame] = {}
self._klines: Dict[Tuple[str, str, str], DataFrame] = {}
# Holds all open sell orders for dry_run
self._dry_run_open_orders: Dict[str, Any] = {}
@@ -359,7 +359,7 @@ class Exchange:
or (self.trading_mode == TradingMode.FUTURES and self.market_is_future(market))
)
def klines(self, pair_interval: Tuple[str, str], copy: bool = True) -> DataFrame:
def klines(self, pair_interval: Tuple[str, str, str], copy: bool = True) -> DataFrame:
if pair_interval in self._klines:
return self._klines[pair_interval].copy() if copy else self._klines[pair_interval]
else:
@@ -1321,7 +1321,8 @@ class Exchange:
:param since_ms: Timestamp in milliseconds to get history from
:return: List with candle (OHLCV) data
"""
pair, timeframe, data = asyncio.get_event_loop().run_until_complete(
data: List
pair, timeframe, candle_type, data = asyncio.get_event_loop().run_until_complete(
self._async_get_historic_ohlcv(pair=pair, timeframe=timeframe,
since_ms=since_ms, is_new_pair=is_new_pair,
candle_type=candle_type))
@@ -1337,15 +1338,15 @@ class Exchange:
:param since_ms: Timestamp in milliseconds to get history from
:return: OHLCV DataFrame
"""
ticks = self.get_historic_ohlcv(pair, timeframe, since_ms=since_ms)
ticks = self.get_historic_ohlcv(pair, timeframe, since_ms=since_ms, candle_type=candle_type)
return ohlcv_to_dataframe(ticks, timeframe, pair=pair, fill_missing=True,
drop_incomplete=self._ohlcv_partial_candle)
async def _async_get_historic_ohlcv(self, pair: str, timeframe: str,
since_ms: int, is_new_pair: bool,
since_ms: int, is_new_pair: bool = False,
raise_: bool = False,
candle_type: str = ''
) -> Tuple[str, str, List]:
) -> Tuple[str, str, str, List]:
"""
Download historic ohlcv
:param is_new_pair: used by binance subclass to allow "fast" new pair downloading
@@ -1374,12 +1375,12 @@ class Exchange:
continue
else:
# Deconstruct tuple if it's not an exception
p, _, new_data = res
if p == pair:
p, _, c, new_data = res
if p == pair and c == candle_type:
data.extend(new_data)
# Sort data again after extending the result - above calls return in "async order"
data = sorted(data, key=lambda x: x[0])
return pair, timeframe, data
return pair, timeframe, candle_type, data
def refresh_latest_ohlcv(self, pair_list: ListPairsWithTimeframes, *,
since_ms: Optional[int] = None, cache: bool = True,
@@ -1399,8 +1400,8 @@ class Exchange:
input_coroutines = []
cached_pairs = []
# Gather coroutines to run
for pair, timeframe in set(pair_list):
if ((pair, timeframe) not in self._klines
for pair, timeframe, candle_type in set(pair_list):
if ((pair, timeframe, candle_type) not in self._klines
or self._now_is_time_to_refresh(pair, timeframe)):
if not since_ms and self.required_candle_call_count > 1:
# Multiple calls for one pair - to get more history
@@ -1411,17 +1412,17 @@ class Exchange:
if since_ms:
input_coroutines.append(self._async_get_historic_ohlcv(
pair, timeframe, since_ms=since_ms, raise_=True))
pair, timeframe, since_ms=since_ms, raise_=True, candle_type=candle_type))
else:
# One call ... "regular" refresh
input_coroutines.append(self._async_get_candle_history(
pair, timeframe, since_ms=since_ms, candle_type=candle_type,))
pair, timeframe, since_ms=since_ms, candle_type=candle_type))
else:
logger.debug(
"Using cached candle (OHLCV) data for pair %s, timeframe %s ...",
pair, timeframe
pair, timeframe, candle_type
)
cached_pairs.append((pair, timeframe))
cached_pairs.append((pair, timeframe, candle_type))
results = asyncio.get_event_loop().run_until_complete(
asyncio.gather(*input_coroutines, return_exceptions=True))
@@ -1433,20 +1434,23 @@ class Exchange:
logger.warning("Async code raised an exception: %s", res.__class__.__name__)
continue
# Deconstruct tuple (has 3 elements)
pair, timeframe, ticks = res
pair, timeframe, c_type, ticks = res
# keeping last candle time as last refreshed time of the pair
if ticks:
self._pairs_last_refresh_time[(pair, timeframe)] = ticks[-1][0] // 1000
self._pairs_last_refresh_time[(pair, timeframe, c_type)] = ticks[-1][0] // 1000
# keeping parsed dataframe in cache
ohlcv_df = ohlcv_to_dataframe(
ticks, timeframe, pair=pair, fill_missing=True,
drop_incomplete=self._ohlcv_partial_candle)
results_df[(pair, timeframe)] = ohlcv_df
results_df[(pair, timeframe, c_type)] = ohlcv_df
if cache:
self._klines[(pair, timeframe)] = ohlcv_df
self._klines[(pair, timeframe, c_type)] = ohlcv_df
# Return cached klines
for pair, timeframe in cached_pairs:
results_df[(pair, timeframe)] = self.klines((pair, timeframe), copy=False)
for pair, timeframe, c_type in cached_pairs:
results_df[(pair, timeframe, c_type)] = self.klines(
(pair, timeframe, c_type),
copy=False
)
return results_df
@@ -1459,8 +1463,12 @@ class Exchange:
# Timeframe in seconds
interval_in_sec = timeframe_to_seconds(timeframe)
return not ((self._pairs_last_refresh_time.get((pair, timeframe), 0)
+ interval_in_sec) >= arrow.utcnow().int_timestamp)
return not (
(self._pairs_last_refresh_time.get(
(pair, timeframe, candle_type),
0
) + interval_in_sec) >= arrow.utcnow().int_timestamp
)
@retrier_async
async def _async_get_candle_history(
@@ -1501,9 +1509,9 @@ class Exchange:
data = sorted(data, key=lambda x: x[0])
except IndexError:
logger.exception("Error loading %s. Result was %s.", pair, data)
return pair, timeframe, []
return pair, timeframe, candle_type, []
logger.debug("Done fetching pair %s, interval %s ...", pair, timeframe)
return pair, timeframe, data
return pair, timeframe, candle_type, data
except ccxt.NotSupported as e:
raise OperationalException(

View File

@@ -72,7 +72,7 @@ class AgeFilter(IPairList):
:return: new allowlist
"""
needed_pairs = [
(p, '1d') for p in pairlist
(p, '1d', '') for p in pairlist
if p not in self._symbolsChecked and p not in self._symbolsCheckFailed]
if not needed_pairs:
# Remove pairs that have been removed before
@@ -88,7 +88,7 @@ class AgeFilter(IPairList):
candles = self._exchange.refresh_latest_ohlcv(needed_pairs, since_ms=since_ms, cache=False)
if self._enabled:
for p in deepcopy(pairlist):
daily_candles = candles[(p, '1d')] if (p, '1d') in candles else None
daily_candles = candles[(p, '1d', '')] if (p, '1d', '') in candles else None
if not self._validate_pair_loc(p, daily_candles):
pairlist.remove(p)
self.log_once(f"Validated {len(pairlist)} pairs.", logger.info)

View File

@@ -67,7 +67,7 @@ class VolatilityFilter(IPairList):
:param tickers: Tickers (from exchange.get_tickers()). May be cached.
:return: new allowlist
"""
needed_pairs = [(p, '1d') for p in pairlist if p not in self._pair_cache]
needed_pairs = [(p, '1d', '') for p in pairlist if p not in self._pair_cache]
since_ms = (arrow.utcnow()
.floor('day')
@@ -81,7 +81,7 @@ class VolatilityFilter(IPairList):
if self._enabled:
for p in deepcopy(pairlist):
daily_candles = candles[(p, '1d')] if (p, '1d') in candles else None
daily_candles = candles[(p, '1d', '')] if (p, '1d', '') in candles else None
if not self._validate_pair_loc(p, daily_candles):
pairlist.remove(p)
return pairlist

View File

@@ -160,10 +160,9 @@ class VolumePairList(IPairList):
f"{self._lookback_timeframe}, starting from {format_ms_time(since_ms)} "
f"till {format_ms_time(to_ms)}", logger.info)
needed_pairs = [
(p, self._lookback_timeframe) for p in
[
s['symbol'] for s in filtered_tickers
] if p not in self._pair_cache
(p, self._lookback_timeframe, '') for p in
[s['symbol'] for s in filtered_tickers]
if p not in self._pair_cache
]
# Get all candles
@@ -174,8 +173,8 @@ class VolumePairList(IPairList):
)
for i, p in enumerate(filtered_tickers):
pair_candles = candles[
(p['symbol'], self._lookback_timeframe)
] if (p['symbol'], self._lookback_timeframe) in candles else None
(p['symbol'], self._lookback_timeframe, '')
] if (p['symbol'], self._lookback_timeframe, '') in candles else None
# in case of candle data calculate typical price and quoteVolume for candle
if pair_candles is not None and not pair_candles.empty:
pair_candles['typical_price'] = (pair_candles['high'] + pair_candles['low']

View File

@@ -65,7 +65,7 @@ class RangeStabilityFilter(IPairList):
:param tickers: Tickers (from exchange.get_tickers()). May be cached.
:return: new allowlist
"""
needed_pairs = [(p, '1d') for p in pairlist if p not in self._pair_cache]
needed_pairs = [(p, '1d', '') for p in pairlist if p not in self._pair_cache]
since_ms = (arrow.utcnow()
.floor('day')
@@ -79,7 +79,7 @@ class RangeStabilityFilter(IPairList):
if self._enabled:
for p in deepcopy(pairlist):
daily_candles = candles[(p, '1d')] if (p, '1d') in candles else None
daily_candles = candles[(p, '1d', '')] if (p, '1d', '') in candles else None
if not self._validate_pair_loc(p, daily_candles):
pairlist.remove(p)
return pairlist

View File

@@ -138,4 +138,4 @@ class PairListManager():
"""
Create list of pair tuples with (pair, timeframe)
"""
return [(pair, timeframe or self._config['timeframe']) for pair in pairs]
return [(pair, timeframe or self._config['timeframe'], '') for pair in pairs]

View File

@@ -14,6 +14,7 @@ class InformativeData(NamedTuple):
timeframe: str
fmt: Union[str, Callable[[Any], str], None]
ffill: bool
candle_type: str = ''
def informative(timeframe: str, asset: str = '',

View File

@@ -424,14 +424,18 @@ class IStrategy(ABC, HyperStrategyMixin):
informative_pairs = self.informative_pairs()
for inf_data, _ in self._ft_informative:
if inf_data.asset:
pair_tf = (_format_pair_name(self.config, inf_data.asset), inf_data.timeframe)
pair_tf = (
_format_pair_name(self.config, inf_data.asset),
inf_data.timeframe,
inf_data.candle_type
)
informative_pairs.append(pair_tf)
else:
if not self.dp:
raise OperationalException('@informative decorator with unspecified asset '
'requires DataProvider instance.')
for pair in self.dp.current_whitelist():
informative_pairs.append((pair, inf_data.timeframe))
informative_pairs.append((pair, inf_data.timeframe, inf_data.candle_type))
return list(set(informative_pairs))
def get_strategy_name(self) -> str: