updated historic data filenames to include the candle type

This commit is contained in:
Sam Germain 2021-11-07 00:35:27 -06:00
parent c8162479d6
commit ee2ad8ca97
8 changed files with 247 additions and 125 deletions

View File

@ -5,7 +5,7 @@ import itertools
import logging
from datetime import datetime, timezone
from operator import itemgetter
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional
import pandas as pd
from pandas import DataFrame, to_datetime
@ -17,7 +17,8 @@ logger = logging.getLogger(__name__)
def ohlcv_to_dataframe(ohlcv: list, timeframe: str, pair: str, *,
fill_missing: bool = True, drop_incomplete: bool = True) -> DataFrame:
fill_missing: bool = True, drop_incomplete: bool = True,
candle_type: Optional[str] = "") -> DataFrame:
"""
Converts a list with candle (OHLCV) data (in format returned by ccxt.fetch_ohlcv)
to a Dataframe
@ -42,12 +43,14 @@ def ohlcv_to_dataframe(ohlcv: list, timeframe: str, pair: str, *,
'volume': 'float'})
return clean_ohlcv_dataframe(df, timeframe, pair,
fill_missing=fill_missing,
drop_incomplete=drop_incomplete)
drop_incomplete=drop_incomplete,
candle_type=candle_type)
def clean_ohlcv_dataframe(data: DataFrame, timeframe: str, pair: str, *,
fill_missing: bool = True,
drop_incomplete: bool = True) -> DataFrame:
drop_incomplete: bool = True,
candle_type: Optional[str] = "") -> DataFrame:
"""
Cleanse a OHLCV dataframe by
* Grouping it by date (removes duplicate tics)
@ -75,12 +78,17 @@ def clean_ohlcv_dataframe(data: DataFrame, timeframe: str, pair: str, *,
logger.debug('Dropping last candle')
if fill_missing:
return ohlcv_fill_up_missing_data(data, timeframe, pair)
return ohlcv_fill_up_missing_data(data, timeframe, pair, candle_type)
else:
return data
def ohlcv_fill_up_missing_data(dataframe: DataFrame, timeframe: str, pair: str) -> DataFrame:
def ohlcv_fill_up_missing_data(
dataframe: DataFrame,
timeframe: str,
pair: str,
candle_type: Optional[str] = ""
) -> DataFrame:
"""
Fills up missing data with 0 volume rows,
using the previous close as price for "open", "high" "low" and "close", volume is set to 0
@ -261,7 +269,13 @@ def convert_trades_format(config: Dict[str, Any], convert_from: str, convert_to:
src.trades_purge(pair=pair)
def convert_ohlcv_format(config: Dict[str, Any], convert_from: str, convert_to: str, erase: bool):
def convert_ohlcv_format(
config: Dict[str, Any],
convert_from: str,
convert_to: str,
erase: bool,
candle_type: Optional[str] = ""
):
"""
Convert OHLCV from one format to another
:param config: Config dictionary
@ -279,8 +293,11 @@ def convert_ohlcv_format(config: Dict[str, Any], convert_from: str, convert_to:
config['pairs'] = []
# Check timeframes or fall back to timeframe.
for timeframe in timeframes:
config['pairs'].extend(src.ohlcv_get_pairs(config['datadir'],
timeframe))
config['pairs'].extend(src.ohlcv_get_pairs(
config['datadir'],
timeframe,
candle_type=candle_type
))
logger.info(f"Converting candle (OHLCV) data for {config['pairs']}")
for timeframe in timeframes:
@ -289,10 +306,16 @@ def convert_ohlcv_format(config: Dict[str, Any], convert_from: str, convert_to:
timerange=None,
fill_missing=False,
drop_incomplete=False,
startup_candles=0)
logger.info(f"Converting {len(data)} candles for {pair}")
startup_candles=0,
candle_type=candle_type)
logger.info(f"Converting {len(data)} {candle_type} candles for {pair}")
if len(data) > 0:
trg.ohlcv_store(pair=pair, timeframe=timeframe, data=data)
trg.ohlcv_store(
pair=pair,
timeframe=timeframe,
data=data,
candle_type=candle_type
)
if erase and convert_from != convert_to:
logger.info(f"Deleting source data for {pair} / {timeframe}")
src.ohlcv_purge(pair=pair, timeframe=timeframe)
src.ohlcv_purge(pair=pair, timeframe=timeframe, candle_type=candle_type)

View File

@ -34,7 +34,12 @@ class HDF5DataHandler(IDataHandler):
if match and len(match.groups()) > 1]
@classmethod
def ohlcv_get_pairs(cls, datadir: Path, timeframe: str) -> List[str]:
def ohlcv_get_pairs(
cls,
datadir: Path,
timeframe: str,
candle_type: Optional[str] = ""
) -> List[str]:
"""
Returns a list of all pairs with ohlcv data available in this datadir
for the specified timeframe
@ -43,12 +48,23 @@ class HDF5DataHandler(IDataHandler):
:return: List of Pairs
"""
_tmp = [re.search(r'^(\S+)(?=\-' + timeframe + '.h5)', p.name)
if candle_type:
candle_type = f"-{candle_type}"
else:
candle_type = ""
_tmp = [re.search(r'^(\S+)(?=\-' + timeframe + candle_type + '.h5)', p.name)
for p in datadir.glob(f"*{timeframe}.h5")]
# Check if regex found something and only return these results
return [match[0].replace('_', '/') for match in _tmp if match]
def ohlcv_store(self, pair: str, timeframe: str, data: pd.DataFrame) -> None:
def ohlcv_store(
self,
pair: str,
timeframe: str,
data: pd.DataFrame,
candle_type: Optional[str] = ""
) -> None:
"""
Store data in hdf5 file.
:param pair: Pair - used to generate filename
@ -59,7 +75,7 @@ class HDF5DataHandler(IDataHandler):
key = self._pair_ohlcv_key(pair, timeframe)
_data = data.copy()
filename = self._pair_data_filename(self._datadir, pair, timeframe)
filename = self._pair_data_filename(self._datadir, pair, timeframe, candle_type)
ds = pd.HDFStore(filename, mode='a', complevel=9, complib='blosc')
ds.put(key, _data.loc[:, self._columns], format='table', data_columns=['date'])
@ -67,7 +83,8 @@ class HDF5DataHandler(IDataHandler):
ds.close()
def _ohlcv_load(self, pair: str, timeframe: str,
timerange: Optional[TimeRange] = None) -> pd.DataFrame:
timerange: Optional[TimeRange] = None,
candle_type: Optional[str] = "") -> pd.DataFrame:
"""
Internal method used to load data for one pair from disk.
Implements the loading and conversion to a Pandas dataframe.
@ -80,7 +97,12 @@ class HDF5DataHandler(IDataHandler):
:return: DataFrame with ohlcv data, or empty DataFrame
"""
key = self._pair_ohlcv_key(pair, timeframe)
filename = self._pair_data_filename(self._datadir, pair, timeframe)
filename = self._pair_data_filename(
self._datadir,
pair,
timeframe,
candle_type=candle_type
)
if not filename.exists():
return pd.DataFrame(columns=self._columns)
@ -99,20 +121,26 @@ class HDF5DataHandler(IDataHandler):
'low': 'float', 'close': 'float', 'volume': 'float'})
return pairdata
def ohlcv_purge(self, pair: str, timeframe: str) -> bool:
def ohlcv_purge(self, pair: str, timeframe: str, candle_type: Optional[str] = "") -> bool:
"""
Remove data for this pair
:param pair: Delete data for this pair.
:param timeframe: Timeframe (e.g. "5m")
:return: True when deleted, false if file did not exist.
"""
filename = self._pair_data_filename(self._datadir, pair, timeframe)
filename = self._pair_data_filename(self._datadir, pair, timeframe, candle_type)
if filename.exists():
filename.unlink()
return True
return False
def ohlcv_append(self, pair: str, timeframe: str, data: pd.DataFrame) -> None:
def ohlcv_append(
self,
pair: str,
timeframe: str,
data: pd.DataFrame,
candle_type: Optional[str] = ""
) -> None:
"""
Append data to existing data structures
:param pair: Pair
@ -201,9 +229,17 @@ class HDF5DataHandler(IDataHandler):
return f"{pair}/trades"
@classmethod
def _pair_data_filename(cls, datadir: Path, pair: str, timeframe: str) -> Path:
def _pair_data_filename(
cls,
datadir: Path,
pair: str,
timeframe: str,
candle_type: Optional[str] = ""
) -> Path:
pair_s = misc.pair_to_filename(pair)
filename = datadir.joinpath(f'{pair_s}-{timeframe}.h5')
if candle_type:
candle_type = f"-{candle_type}"
filename = datadir.joinpath(f'{pair_s}-{timeframe}{candle_type}.h5')
return filename
@classmethod

View File

@ -161,7 +161,8 @@ def _download_pair_history(pair: str, *,
process: str = '',
new_pairs_days: int = 30,
data_handler: IDataHandler = None,
timerange: Optional[TimeRange] = None) -> bool:
timerange: Optional[TimeRange] = None,
candle_type: Optional[str] = "") -> bool:
"""
Download latest candles from the exchange for the pair and timeframe passed in parameters
The data is downloaded starting from the last correct data that
@ -198,25 +199,28 @@ def _download_pair_history(pair: str, *,
since_ms=since_ms if since_ms else
arrow.utcnow().shift(
days=-new_pairs_days).int_timestamp * 1000,
is_new_pair=data.empty
is_new_pair=data.empty,
candle_type=candle_type,
)
# TODO: Maybe move parsing to exchange class (?)
new_dataframe = ohlcv_to_dataframe(new_data, timeframe, pair,
fill_missing=False, drop_incomplete=True)
fill_missing=False, drop_incomplete=True,
candle_type=candle_type)
if data.empty:
data = new_dataframe
else:
# Run cleaning again to ensure there were no duplicate candles
# Especially between existing and new data.
data = clean_ohlcv_dataframe(data.append(new_dataframe), timeframe, pair,
fill_missing=False, drop_incomplete=False)
fill_missing=False, drop_incomplete=False,
candle_type=candle_type)
logger.debug("New Start: %s",
f"{data.iloc[0]['date']:%Y-%m-%d %H:%M:%S}" if not data.empty else 'None')
logger.debug("New End: %s",
f"{data.iloc[-1]['date']:%Y-%m-%d %H:%M:%S}" if not data.empty else 'None')
data_handler.ohlcv_store(pair, timeframe, data=data)
data_handler.ohlcv_store(pair, timeframe, data=data, candle_type=candle_type)
return True
except Exception:
@ -229,7 +233,8 @@ def _download_pair_history(pair: str, *,
def refresh_backtest_ohlcv_data(exchange: Exchange, pairs: List[str], timeframes: List[str],
datadir: Path, timerange: Optional[TimeRange] = None,
new_pairs_days: int = 30, erase: bool = False,
data_format: str = None) -> List[str]:
data_format: str = None,
candle_type: Optional[str] = "") -> List[str]:
"""
Refresh stored ohlcv data for backtesting and hyperopt operations.
Used by freqtrade download-data subcommand.
@ -245,7 +250,7 @@ def refresh_backtest_ohlcv_data(exchange: Exchange, pairs: List[str], timeframes
for timeframe in timeframes:
if erase:
if data_handler.ohlcv_purge(pair, timeframe):
if data_handler.ohlcv_purge(pair, timeframe, candle_type=candle_type):
logger.info(
f'Deleting existing data for pair {pair}, interval {timeframe}.')
@ -254,7 +259,8 @@ def refresh_backtest_ohlcv_data(exchange: Exchange, pairs: List[str], timeframes
_download_pair_history(pair=pair, process=process,
datadir=datadir, exchange=exchange,
timerange=timerange, data_handler=data_handler,
timeframe=str(timeframe), new_pairs_days=new_pairs_days)
timeframe=str(timeframe), new_pairs_days=new_pairs_days,
candle_type=candle_type)
return pairs_not_available
@ -353,10 +359,16 @@ def refresh_backtest_trades_data(exchange: Exchange, pairs: List[str], datadir:
return pairs_not_available
def convert_trades_to_ohlcv(pairs: List[str], timeframes: List[str],
datadir: Path, timerange: TimeRange, erase: bool = False,
data_format_ohlcv: str = 'json',
data_format_trades: str = 'jsongz') -> None:
def convert_trades_to_ohlcv(
pairs: List[str],
timeframes: List[str],
datadir: Path,
timerange: TimeRange,
erase: bool = False,
data_format_ohlcv: str = 'json',
data_format_trades: str = 'jsongz',
candle_type: Optional[str] = ""
) -> None:
"""
Convert stored trades data to ohlcv data
"""
@ -367,12 +379,12 @@ def convert_trades_to_ohlcv(pairs: List[str], timeframes: List[str],
trades = data_handler_trades.trades_load(pair)
for timeframe in timeframes:
if erase:
if data_handler_ohlcv.ohlcv_purge(pair, timeframe):
if data_handler_ohlcv.ohlcv_purge(pair, timeframe, candle_type=candle_type):
logger.info(f'Deleting existing data for pair {pair}, interval {timeframe}.')
try:
ohlcv = trades_to_ohlcv(trades, timeframe)
# Store ohlcv
data_handler_ohlcv.ohlcv_store(pair, timeframe, data=ohlcv)
data_handler_ohlcv.ohlcv_store(pair, timeframe, data=ohlcv, candle_type=candle_type)
except ValueError:
logger.exception(f'Could not convert {pair} to OHLCV.')

View File

@ -35,7 +35,12 @@ class IDataHandler(ABC):
"""
@abstractclassmethod
def ohlcv_get_pairs(cls, datadir: Path, timeframe: str) -> List[str]:
def ohlcv_get_pairs(
cls,
datadir: Path,
timeframe: str,
candle_type: Optional[str] = ""
) -> List[str]:
"""
Returns a list of all pairs with ohlcv data available in this datadir
for the specified timeframe
@ -45,7 +50,13 @@ class IDataHandler(ABC):
"""
@abstractmethod
def ohlcv_store(self, pair: str, timeframe: str, data: DataFrame) -> None:
def ohlcv_store(
self,
pair: str,
timeframe: str,
data: DataFrame,
candle_type: Optional[str] = ""
) -> None:
"""
Store ohlcv data.
:param pair: Pair - used to generate filename
@ -57,6 +68,7 @@ class IDataHandler(ABC):
@abstractmethod
def _ohlcv_load(self, pair: str, timeframe: str,
timerange: Optional[TimeRange] = None,
candle_type: Optional[str] = ""
) -> DataFrame:
"""
Internal method used to load data for one pair from disk.
@ -71,7 +83,7 @@ class IDataHandler(ABC):
"""
@abstractmethod
def ohlcv_purge(self, pair: str, timeframe: str) -> bool:
def ohlcv_purge(self, pair: str, timeframe: str, candle_type: Optional[str] = "") -> bool:
"""
Remove data for this pair
:param pair: Delete data for this pair.
@ -80,7 +92,13 @@ class IDataHandler(ABC):
"""
@abstractmethod
def ohlcv_append(self, pair: str, timeframe: str, data: DataFrame) -> None:
def ohlcv_append(
self,
pair: str,
timeframe: str,
data: DataFrame,
candle_type: Optional[str] = ""
) -> None:
"""
Append data to existing data structures
:param pair: Pair
@ -146,7 +164,8 @@ class IDataHandler(ABC):
fill_missing: bool = True,
drop_incomplete: bool = True,
startup_candles: int = 0,
warn_no_data: bool = True
warn_no_data: bool = True,
candle_type: Optional[str] = ""
) -> DataFrame:
"""
Load cached candle (OHLCV) data for the given pair.
@ -165,9 +184,13 @@ class IDataHandler(ABC):
if startup_candles > 0 and timerange_startup:
timerange_startup.subtract_start(timeframe_to_seconds(timeframe) * startup_candles)
pairdf = self._ohlcv_load(pair, timeframe,
timerange=timerange_startup)
if self._check_empty_df(pairdf, pair, timeframe, warn_no_data):
pairdf = self._ohlcv_load(
pair,
timeframe,
timerange=timerange_startup,
candle_type=candle_type
)
if self._check_empty_df(pairdf, pair, timeframe, warn_no_data, candle_type):
return pairdf
else:
enddate = pairdf.iloc[-1]['date']
@ -175,7 +198,13 @@ class IDataHandler(ABC):
if timerange_startup:
self._validate_pairdata(pair, pairdf, timerange_startup)
pairdf = trim_dataframe(pairdf, timerange_startup)
if self._check_empty_df(pairdf, pair, timeframe, warn_no_data):
if self._check_empty_df(
pairdf,
pair,
timeframe,
warn_no_data,
candle_type
):
return pairdf
# incomplete candles should only be dropped if we didn't trim the end beforehand.
@ -183,11 +212,19 @@ class IDataHandler(ABC):
pair=pair,
fill_missing=fill_missing,
drop_incomplete=(drop_incomplete and
enddate == pairdf.iloc[-1]['date']))
self._check_empty_df(pairdf, pair, timeframe, warn_no_data)
enddate == pairdf.iloc[-1]['date']),
candle_type=candle_type)
self._check_empty_df(pairdf, pair, timeframe, warn_no_data, candle_type=candle_type)
return pairdf
def _check_empty_df(self, pairdf: DataFrame, pair: str, timeframe: str, warn_no_data: bool):
def _check_empty_df(
self,
pairdf: DataFrame,
pair: str,
timeframe: str,
warn_no_data: bool,
candle_type: Optional[str] = ""
):
"""
Warn on empty dataframe
"""
@ -200,7 +237,13 @@ class IDataHandler(ABC):
return True
return False
def _validate_pairdata(self, pair, pairdata: DataFrame, timerange: TimeRange):
def _validate_pairdata(
self,
pair,
pairdata: DataFrame,
timerange: TimeRange,
candle_type: Optional[str] = ""
):
"""
Validates pairdata for missing data at start end end and logs warnings.
:param pairdata: Dataframe to validate

View File

@ -35,7 +35,12 @@ class JsonDataHandler(IDataHandler):
if match and len(match.groups()) > 1]
@classmethod
def ohlcv_get_pairs(cls, datadir: Path, timeframe: str) -> List[str]:
def ohlcv_get_pairs(
cls,
datadir: Path,
timeframe: str,
candle_type: Optional[str] = ""
) -> List[str]:
"""
Returns a list of all pairs with ohlcv data available in this datadir
for the specified timeframe
@ -43,13 +48,23 @@ class JsonDataHandler(IDataHandler):
:param timeframe: Timeframe to search pairs for
:return: List of Pairs
"""
if candle_type:
candle_type = f"-{candle_type}"
else:
candle_type = ""
_tmp = [re.search(r'^(\S+)(?=\-' + timeframe + '.json)', p.name)
_tmp = [re.search(r'^(\S+)(?=\-' + timeframe + candle_type + '.json)', p.name)
for p in datadir.glob(f"*{timeframe}.{cls._get_file_extension()}")]
# Check if regex found something and only return these results
return [match[0].replace('_', '/') for match in _tmp if match]
def ohlcv_store(self, pair: str, timeframe: str, data: DataFrame) -> None:
def ohlcv_store(
self,
pair: str,
timeframe: str,
data: DataFrame,
candle_type: Optional[str] = ""
) -> None:
"""
Store data in json format "values".
format looks as follows:
@ -59,7 +74,12 @@ class JsonDataHandler(IDataHandler):
:param data: Dataframe containing OHLCV data
:return: None
"""
filename = self._pair_data_filename(self._datadir, pair, timeframe)
filename = self._pair_data_filename(
self._datadir,
pair,
timeframe,
candle_type
)
_data = data.copy()
# Convert date to int
_data['date'] = _data['date'].view(np.int64) // 1000 // 1000
@ -71,6 +91,7 @@ class JsonDataHandler(IDataHandler):
def _ohlcv_load(self, pair: str, timeframe: str,
timerange: Optional[TimeRange] = None,
candle_type: Optional[str] = ""
) -> DataFrame:
"""
Internal method used to load data for one pair from disk.
@ -83,7 +104,7 @@ class JsonDataHandler(IDataHandler):
all data where possible.
:return: DataFrame with ohlcv data, or empty DataFrame
"""
filename = self._pair_data_filename(self._datadir, pair, timeframe)
filename = self._pair_data_filename(self._datadir, pair, timeframe, candle_type=candle_type)
if not filename.exists():
return DataFrame(columns=self._columns)
try:
@ -100,20 +121,26 @@ class JsonDataHandler(IDataHandler):
infer_datetime_format=True)
return pairdata
def ohlcv_purge(self, pair: str, timeframe: str) -> bool:
def ohlcv_purge(self, pair: str, timeframe: str, candle_type: Optional[str] = "") -> bool:
"""
Remove data for this pair
:param pair: Delete data for this pair.
:param timeframe: Timeframe (e.g. "5m")
:return: True when deleted, false if file did not exist.
"""
filename = self._pair_data_filename(self._datadir, pair, timeframe)
filename = self._pair_data_filename(self._datadir, pair, timeframe, candle_type=candle_type)
if filename.exists():
filename.unlink()
return True
return False
def ohlcv_append(self, pair: str, timeframe: str, data: DataFrame) -> None:
def ohlcv_append(
self,
pair: str,
timeframe: str,
data: DataFrame,
candle_type: Optional[str] = ""
) -> None:
"""
Append data to existing data structures
:param pair: Pair
@ -187,9 +214,18 @@ class JsonDataHandler(IDataHandler):
return False
@classmethod
def _pair_data_filename(cls, datadir: Path, pair: str, timeframe: str) -> Path:
def _pair_data_filename(
cls,
datadir: Path,
pair: str,
timeframe: str,
candle_type: Optional[str] = ""
) -> Path:
pair_s = misc.pair_to_filename(pair)
filename = datadir.joinpath(f'{pair_s}-{timeframe}.{cls._get_file_extension()}')
if candle_type:
candle_type = f"-{candle_type}"
filename = datadir.joinpath(
f'{pair_s}-{timeframe}{candle_type}.{cls._get_file_extension()}')
return filename
@classmethod

View File

@ -207,15 +207,15 @@ class Binance(Exchange):
since_ms: int,
is_new_pair: bool,
raise_: bool = False,
price: Optional[str] = None
candle_type: Optional[str] = ""
) -> Tuple[str, str, List]:
"""
Overwrite to introduce "fast new pair" functionality by detecting the pair's listing date
Does not work for other exchanges, which don't return the earliest data when called with "0"
:param price: "mark" if retrieving the mark price cnadles
:param candle_type: "mark" if retrieving the mark price cnadles
"""
if is_new_pair:
x = await self._async_get_candle_history(pair, timeframe, 0, price)
x = await self._async_get_candle_history(pair, timeframe, 0, candle_type)
if x and x[2] and x[2][0] and x[2][0][0] > since_ms:
# Set starting date to first available candle.
since_ms = x[2][0][0]
@ -228,7 +228,7 @@ class Binance(Exchange):
since_ms=since_ms,
is_new_pair=is_new_pair,
raise_=raise_,
price=price
candle_type=candle_type
)
def funding_fee_cutoff(self, open_date: datetime):

View File

@ -1309,14 +1309,9 @@ class Exchange:
# Historic data
def get_historic_ohlcv(
self,
pair: str,
timeframe: str,
since_ms: int,
is_new_pair: bool = False,
price: Optional[str] = None
) -> List:
def get_historic_ohlcv(self, pair: str, timeframe: str,
since_ms: int, is_new_pair: bool = False,
candle_type: Optional[str] = "") -> List:
"""
Get candle history using asyncio and returns the list of candles.
Handles all async work for this.
@ -1324,52 +1319,37 @@ class Exchange:
:param pair: Pair to download
:param timeframe: Timeframe to get data for
:param since_ms: Timestamp in milliseconds to get history from
:param price: "mark" if retrieving the mark price cnadles, "index" for index price candles
:return: List with candle (OHLCV) data
"""
pair, timeframe, data = asyncio.get_event_loop().run_until_complete(
self._async_get_historic_ohlcv(
pair=pair,
timeframe=timeframe,
since_ms=since_ms,
is_new_pair=is_new_pair,
price=price
))
self._async_get_historic_ohlcv(pair=pair, timeframe=timeframe,
since_ms=since_ms, is_new_pair=is_new_pair,
candle_type=candle_type))
logger.info(f"Downloaded data for {pair} with length {len(data)}.")
return data
def get_historic_ohlcv_as_df(
self,
pair: str,
timeframe: str,
since_ms: int,
price: Optional[str] = None
) -> DataFrame:
def get_historic_ohlcv_as_df(self, pair: str, timeframe: str,
since_ms: int, candle_type: Optional[str] = "") -> DataFrame:
"""
Minimal wrapper around get_historic_ohlcv - converting the result into a dataframe
:param pair: Pair to download
:param timeframe: Timeframe to get data for
:param since_ms: Timestamp in milliseconds to get history from
:param price: "mark" if retrieving the mark price candles, "index" for index price candles
:return: OHLCV DataFrame
"""
ticks = self.get_historic_ohlcv(pair, timeframe, since_ms=since_ms, price=price)
ticks = self.get_historic_ohlcv(pair, timeframe, since_ms=since_ms)
return ohlcv_to_dataframe(ticks, timeframe, pair=pair, fill_missing=True,
drop_incomplete=self._ohlcv_partial_candle)
drop_incomplete=self._ohlcv_partial_candle,
candle_type=candle_type)
async def _async_get_historic_ohlcv(
self,
pair: str,
timeframe: str,
since_ms: int,
is_new_pair: bool,
raise_: bool = False,
price: Optional[str] = None
) -> Tuple[str, str, List]:
async def _async_get_historic_ohlcv(self, pair: str, timeframe: str,
since_ms: int, is_new_pair: bool,
raise_: bool = False,
candle_type: Optional[str] = ""
) -> Tuple[str, str, List]:
"""
Download historic ohlcv
:param is_new_pair: used by binance subclass to allow "fast" new pair downloading
:param price: "mark" if retrieving the mark price cnadles, "index" for index price candles
"""
one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit(timeframe)
@ -1378,13 +1358,8 @@ class Exchange:
one_call,
arrow.utcnow().shift(seconds=one_call // 1000).humanize(only_distance=True)
)
input_coroutines = [
self._async_get_candle_history(
pair,
timeframe,
since,
price
) for since in
input_coroutines = [self._async_get_candle_history(
pair, timeframe, since, candle_type=candle_type) for since in
range(since_ms, arrow.utcnow().int_timestamp * 1000, one_call)]
data: List = []
@ -1407,13 +1382,10 @@ class Exchange:
data = sorted(data, key=lambda x: x[0])
return pair, timeframe, data
def refresh_latest_ohlcv(
self,
pair_list: ListPairsWithTimeframes, *,
since_ms: Optional[int] = None,
cache: bool = True,
price: Optional[str] = None
) -> Dict[Tuple[str, str], DataFrame]:
def refresh_latest_ohlcv(self, pair_list: ListPairsWithTimeframes, *,
since_ms: Optional[int] = None, cache: bool = True,
candle_type: Optional[str] = ""
) -> Dict[Tuple[str, str], DataFrame]:
"""
Refresh in-memory OHLCV asynchronously and set `_klines` with the result
Loops asynchronously over pair_list and downloads all pairs async (semi-parallel).
@ -1421,7 +1393,6 @@ class Exchange:
:param pair_list: List of 2 element tuples containing pair, interval to refresh
:param since_ms: time since when to download, in milliseconds
:param cache: Assign result to _klines. Usefull for one-off downloads like for pairlists
:param price: "mark" if retrieving the mark price cnadles, "index" for index price candles
:return: Dict of [{(pair, timeframe): Dataframe}]
"""
logger.debug("Refreshing candle (OHLCV) data for %d pairs", len(pair_list))
@ -1445,7 +1416,7 @@ class Exchange:
else:
# One call ... "regular" refresh
input_coroutines.append(self._async_get_candle_history(
pair, timeframe, since_ms=since_ms, price=price))
pair, timeframe, since_ms=since_ms, candle_type=candle_type,))
else:
logger.debug(
"Using cached candle (OHLCV) data for pair %s, timeframe %s ...",
@ -1470,7 +1441,8 @@ class Exchange:
# keeping parsed dataframe in cache
ohlcv_df = ohlcv_to_dataframe(
ticks, timeframe, pair=pair, fill_missing=True,
drop_incomplete=self._ohlcv_partial_candle)
drop_incomplete=self._ohlcv_partial_candle,
candle_type=candle_type)
results_df[(pair, timeframe)] = ohlcv_df
if cache:
self._klines[(pair, timeframe)] = ohlcv_df
@ -1493,11 +1465,11 @@ class Exchange:
pair: str,
timeframe: str,
since_ms: Optional[int] = None,
price: Optional[str] = None
candle_type: Optional[str] = "",
) -> Tuple[str, str, List]:
"""
Asynchronously get candle history data using fetch_ohlcv
:param price: "mark" if retrieving the mark price cnadles, "index" for index price candles
:param candle_type: "mark" if retrieving the mark price cnadles, "index" for index price candles
returns tuple: (pair, timeframe, ohlcv_list)
"""
try:
@ -1507,12 +1479,12 @@ class Exchange:
"Fetching pair %s, interval %s, since %s %s...",
pair, timeframe, since_ms, s
)
# TODO-lev: Does this put price into params correctly?
params = self._ft_has.get('ohlcv_params', {price: price})
params = self._ft_has.get('ohlcv_params', {})
data = await self._api_async.fetch_ohlcv(pair, timeframe=timeframe,
since=since_ms,
limit=self.ohlcv_candle_limit(timeframe),
params=params)
params=params,
candle_type=candle_type)
# Some exchanges sort OHLCV in ASC order and others in DESC.
# Ex: Bittrex returns the list of OHLCV in ASC order (oldest first, newest last)

View File

@ -1568,7 +1568,7 @@ def test_get_historic_ohlcv(default_conf, mocker, caplog, exchange_name):
]
pair = 'ETH/BTC'
async def mock_candle_hist(pair, timeframe, since_ms, price=None):
async def mock_candle_hist(pair, timeframe, since_ms, candle_type=None):
return pair, timeframe, ohlcv
exchange._async_get_candle_history = Mock(wraps=mock_candle_hist)
@ -1625,7 +1625,7 @@ def test_get_historic_ohlcv_as_df(default_conf, mocker, exchange_name):
]
pair = 'ETH/BTC'
async def mock_candle_hist(pair, timeframe, since_ms, price=None):
async def mock_candle_hist(pair, timeframe, since_ms, candle_type=None):
return pair, timeframe, ohlcv
exchange._async_get_candle_history = Mock(wraps=mock_candle_hist)