Rename datahandler module to history module

Also move previous history.py into this module - so everything is
bundled
This commit is contained in:
Matthias
2019-12-28 09:59:47 +01:00
parent b37b5c3d90
commit 6860491189
8 changed files with 71 additions and 68 deletions

View File

@@ -0,0 +1,14 @@
"""
Handle historic data (ohlcv).
Includes:
* load data for a pair (or a list of pairs) from disk
* download data from exchange and store to disk
"""
from .history_utils import (convert_trades_to_ohlcv, # noqa: F401
get_timerange, load_data, load_pair_history,
refresh_backtest_ohlcv_data,
refresh_backtest_trades_data, refresh_data,
validate_backtest_data)
from .idatahandler import get_datahandler, get_datahandlerclass # noqa: F401

View File

@@ -0,0 +1,375 @@
import logging
import operator
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import arrow
from pandas import DataFrame
from freqtrade import OperationalException
from freqtrade.configuration import TimeRange
from freqtrade.constants import DEFAULT_DATAFRAME_COLUMNS
from freqtrade.data.converter import parse_ticker_dataframe, trades_to_ohlcv
from freqtrade.data.history.idatahandler import IDataHandler, get_datahandler
from freqtrade.exchange import Exchange
logger = logging.getLogger(__name__)
def load_pair_history(pair: str,
timeframe: str,
datadir: Path, *,
timerange: Optional[TimeRange] = None,
fill_up_missing: bool = True,
drop_incomplete: bool = True,
startup_candles: int = 0,
data_format: str = None,
data_handler: IDataHandler = None,
) -> DataFrame:
"""
Load cached ticker history for the given pair.
:param pair: Pair to load data for
:param timeframe: Ticker timeframe (e.g. "5m")
:param datadir: Path to the data storage location.
:param data_format: Format of the data. Ignored if data_handler is set.
:param timerange: Limit data to be loaded to this timerange
:param fill_up_missing: Fill missing values with "No action"-candles
:param drop_incomplete: Drop last candle assuming it may be incomplete.
:param startup_candles: Additional candles to load at the start of the period
:param data_handler: Initialized data-handler to use.
Will be initialized from data_format if not set
:return: DataFrame with ohlcv data, or empty DataFrame
"""
data_handler = get_datahandler(datadir, data_format, data_handler)
return data_handler.ohlcv_load(pair=pair,
timeframe=timeframe,
timerange=timerange,
fill_missing=fill_up_missing,
drop_incomplete=drop_incomplete,
startup_candles=startup_candles,
)
def load_data(datadir: Path,
timeframe: str,
pairs: List[str], *,
timerange: Optional[TimeRange] = None,
fill_up_missing: bool = True,
startup_candles: int = 0,
fail_without_data: bool = False,
data_format: str = 'json',
) -> Dict[str, DataFrame]:
"""
Load ticker history data for a list of pairs.
:param datadir: Path to the data storage location.
:param timeframe: Ticker Timeframe (e.g. "5m")
:param pairs: List of pairs to load
:param timerange: Limit data to be loaded to this timerange
:param fill_up_missing: Fill missing values with "No action"-candles
:param startup_candles: Additional candles to load at the start of the period
:param fail_without_data: Raise OperationalException if no data is found.
:param data_handler: Initialized data-handler to use.
:return: dict(<pair>:<Dataframe>)
"""
result: Dict[str, DataFrame] = {}
if startup_candles > 0 and timerange:
logger.info(f'Using indicator startup period: {startup_candles} ...')
data_handler = get_datahandler(datadir, data_format)
for pair in pairs:
hist = load_pair_history(pair=pair, timeframe=timeframe,
datadir=datadir, timerange=timerange,
fill_up_missing=fill_up_missing,
startup_candles=startup_candles,
data_handler=data_handler
)
if not hist.empty:
result[pair] = hist
if fail_without_data and not result:
raise OperationalException("No data found. Terminating.")
return result
def refresh_data(datadir: Path,
timeframe: str,
pairs: List[str],
exchange: Exchange,
data_format: str = None,
timerange: Optional[TimeRange] = None,
) -> None:
"""
Refresh ticker history data for a list of pairs.
:param datadir: Path to the data storage location.
:param timeframe: Ticker Timeframe (e.g. "5m")
:param pairs: List of pairs to load
:param exchange: Exchange object
:param timerange: Limit data to be loaded to this timerange
"""
data_handler = get_datahandler(datadir, data_format)
for pair in pairs:
_download_pair_history(pair=pair, timeframe=timeframe,
datadir=datadir, timerange=timerange,
exchange=exchange, data_handler=data_handler)
def _load_cached_data_for_updating(pair: str, timeframe: str, timerange: Optional[TimeRange],
data_handler: IDataHandler) -> Tuple[DataFrame, Optional[int]]:
"""
Load cached data to download more data.
If timerange is passed in, checks whether data from an before the stored data will be
downloaded.
If that's the case then what's available should be completely overwritten.
Otherwise downloads always start at the end of the available data to avoid data gaps.
Note: Only used by download_pair_history().
"""
start = None
if timerange:
if timerange.starttype == 'date':
# TODO: convert to date for conversation
start = datetime.fromtimestamp(timerange.startts, tz=timezone.utc)
# Intentionally don't pass timerange in - since we need to load the full dataset.
data = data_handler.ohlcv_load(pair, timeframe=timeframe,
timerange=None, fill_missing=False,
drop_incomplete=True, warn_no_data=False)
if not data.empty:
if start and start < data.iloc[0]['date']:
# Earlier data than existing data requested, redownload all
data = DataFrame(columns=DEFAULT_DATAFRAME_COLUMNS)
else:
start = data.iloc[-1]['date']
start_ms = int(start.timestamp() * 1000) if start else None
return data, start_ms
def _download_pair_history(datadir: Path,
exchange: Exchange,
pair: str, *,
timeframe: str = '5m',
timerange: Optional[TimeRange] = None,
data_handler: IDataHandler = None) -> bool:
"""
Download latest candles from the exchange for the pair and timeframe passed in parameters
The data is downloaded starting from the last correct data that
exists in a cache. If timerange starts earlier than the data in the cache,
the full data will be redownloaded
Based on @Rybolov work: https://github.com/rybolov/freqtrade-data
:param pair: pair to download
:param timeframe: Ticker Timeframe (e.g 5m)
:param timerange: range of time to download
:return: bool with success state
"""
data_handler = get_datahandler(datadir, data_handler=data_handler)
try:
logger.info(
f'Download history data for pair: "{pair}", timeframe: {timeframe} '
f'and store in {datadir}.'
)
# data, since_ms = _load_cached_data_for_updating_old(datadir, pair, timeframe, timerange)
data, since_ms = _load_cached_data_for_updating(pair, timeframe, timerange,
data_handler=data_handler)
logger.debug("Current Start: %s",
f"{data.iloc[0]['date']:%Y-%m-%d %H:%M:%S}" if not data.empty else 'None')
logger.debug("Current End: %s",
f"{data.iloc[-1]['date']:%Y-%m-%d %H:%M:%S}" if not data.empty else 'None')
# Default since_ms to 30 days if nothing is given
new_data = exchange.get_historic_ohlcv(pair=pair,
timeframe=timeframe,
since_ms=since_ms if since_ms else
int(arrow.utcnow().shift(
days=-30).float_timestamp) * 1000
)
# TODO: Maybe move parsing to exchange class (?)
new_dataframe = parse_ticker_dataframe(new_data, timeframe, pair,
fill_missing=False, drop_incomplete=True)
if data.empty:
data = new_dataframe
else:
data = data.append(new_dataframe)
logger.debug("New Start: %s",
f"{data.iloc[0]['date']:%Y-%m-%d %H:%M:%S}" if not data.empty else 'None')
logger.debug("New End: %s",
f"{data.iloc[-1]['date']:%Y-%m-%d %H:%M:%S}" if not data.empty else 'None')
data_handler.ohlcv_store(pair, timeframe, data=data)
return True
except Exception as e:
logger.error(
f'Failed to download history data for pair: "{pair}", timeframe: {timeframe}. '
f'Error: {e}'
)
return False
def refresh_backtest_ohlcv_data(exchange: Exchange, pairs: List[str], timeframes: List[str],
datadir: Path, timerange: Optional[TimeRange] = None,
erase=False, data_format: str = None) -> List[str]:
"""
Refresh stored ohlcv data for backtesting and hyperopt operations.
Used by freqtrade download-data subcommand.
:return: List of pairs that are not available.
"""
pairs_not_available = []
data_handler = get_datahandler(datadir, data_format)
for pair in pairs:
if pair not in exchange.markets:
pairs_not_available.append(pair)
logger.info(f"Skipping pair {pair}...")
continue
for timeframe in timeframes:
if erase:
if data_handler.ohlcv_purge(pair, timeframe):
logger.info(
f'Deleting existing data for pair {pair}, interval {timeframe}.')
logger.info(f'Downloading pair {pair}, interval {timeframe}.')
_download_pair_history(datadir=datadir, exchange=exchange,
pair=pair, timeframe=str(timeframe),
timerange=timerange, data_handler=data_handler)
return pairs_not_available
def _download_trades_history(exchange: Exchange,
pair: str, *,
timerange: Optional[TimeRange] = None,
data_handler: IDataHandler
) -> bool:
"""
Download trade history from the exchange.
Appends to previously downloaded trades data.
"""
try:
since = timerange.startts * 1000 if timerange and timerange.starttype == 'date' else None
trades = data_handler.trades_load(pair)
from_id = trades[-1]['id'] if trades else None
logger.debug("Current Start: %s", trades[0]['datetime'] if trades else 'None')
logger.debug("Current End: %s", trades[-1]['datetime'] if trades else 'None')
# Default since_ms to 30 days if nothing is given
new_trades = exchange.get_historic_trades(pair=pair,
since=since if since else
int(arrow.utcnow().shift(
days=-30).float_timestamp) * 1000,
from_id=from_id,
)
trades.extend(new_trades[1])
data_handler.trades_store(pair, data=trades)
logger.debug("New Start: %s", trades[0]['datetime'])
logger.debug("New End: %s", trades[-1]['datetime'])
logger.info(f"New Amount of trades: {len(trades)}")
return True
except Exception as e:
logger.error(
f'Failed to download historic trades for pair: "{pair}". '
f'Error: {e}'
)
return False
def refresh_backtest_trades_data(exchange: Exchange, pairs: List[str], datadir: Path,
timerange: TimeRange, erase=False,
data_format: str = 'jsongz') -> List[str]:
"""
Refresh stored trades data for backtesting and hyperopt operations.
Used by freqtrade download-data subcommand.
:return: List of pairs that are not available.
"""
pairs_not_available = []
data_handler = get_datahandler(datadir, data_format=data_format)
for pair in pairs:
if pair not in exchange.markets:
pairs_not_available.append(pair)
logger.info(f"Skipping pair {pair}...")
continue
if erase:
if data_handler.trades_purge(pair):
logger.info(f'Deleting existing data for pair {pair}.')
logger.info(f'Downloading trades for pair {pair}.')
_download_trades_history(exchange=exchange,
pair=pair,
timerange=timerange,
data_handler=data_handler)
return pairs_not_available
def convert_trades_to_ohlcv(pairs: List[str], timeframes: List[str],
datadir: Path, timerange: TimeRange, erase=False,
data_format_ohlcv: str = 'json',
data_format_trades: str = 'jsongz') -> None:
"""
Convert stored trades data to ohlcv data
"""
data_handler_trades = get_datahandler(datadir, data_format=data_format_trades)
data_handler_ohlcv = get_datahandler(datadir, data_format=data_format_ohlcv)
for pair in pairs:
trades = data_handler_trades.trades_load(pair)
for timeframe in timeframes:
if erase:
if data_handler_ohlcv.ohlcv_purge(pair, timeframe):
logger.info(f'Deleting existing data for pair {pair}, interval {timeframe}.')
ohlcv = trades_to_ohlcv(trades, timeframe)
# Store ohlcv
data_handler_ohlcv.ohlcv_store(pair, timeframe, data=ohlcv)
def get_timerange(data: Dict[str, DataFrame]) -> Tuple[arrow.Arrow, arrow.Arrow]:
"""
Get the maximum common timerange for the given backtest data.
:param data: dictionary with preprocessed backtesting data
:return: tuple containing min_date, max_date
"""
timeranges = [
(arrow.get(frame['date'].min()), arrow.get(frame['date'].max()))
for frame in data.values()
]
return (min(timeranges, key=operator.itemgetter(0))[0],
max(timeranges, key=operator.itemgetter(1))[1])
def validate_backtest_data(data: DataFrame, pair: str, min_date: datetime,
max_date: datetime, timeframe_min: int) -> bool:
"""
Validates preprocessed backtesting data for missing values and shows warnings about it that.
:param data: preprocessed backtesting data (as DataFrame)
:param pair: pair used for log output.
:param min_date: start-date of the data
:param max_date: end-date of the data
:param timeframe_min: ticker Timeframe in minutes
"""
# total difference in minutes / timeframe-minutes
expected_frames = int((max_date - min_date).total_seconds() // 60 // timeframe_min)
found_missing = False
dflen = len(data)
if dflen < expected_frames:
found_missing = True
logger.warning("%s has missing frames: expected %s, got %s, that's %s missing values",
pair, expected_frames, dflen, expected_frames - dflen)
return found_missing

View File

@@ -0,0 +1,125 @@
"""
Abstract datahandler interface.
It's subclasses handle and storing data from disk.
"""
import logging
from abc import ABC, abstractclassmethod, abstractmethod
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Type
from pandas import DataFrame
from freqtrade.configuration import TimeRange
from freqtrade.data.converter import clean_ohlcv_dataframe, trim_dataframe
from freqtrade.exchange import timeframe_to_seconds
logger = logging.getLogger(__name__)
class IDataHandler(ABC):
def __init__(self, datadir: Path) -> None:
self._datadir = datadir
# TODO: create abstract interface
def ohlcv_load(self, pair, timeframe: str,
timerange: Optional[TimeRange] = None,
fill_missing: bool = True,
drop_incomplete: bool = True,
startup_candles: int = 0,
warn_no_data: bool = True
) -> DataFrame:
"""
Load cached ticker history for the given pair.
:param pair: Pair to load data for
:param timeframe: Ticker timeframe (e.g. "5m")
:param timerange: Limit data to be loaded to this timerange
:param fill_missing: Fill missing values with "No action"-candles
:param drop_incomplete: Drop last candle assuming it may be incomplete.
:param startup_candles: Additional candles to load at the start of the period
:param warn_no_data: Log a warning message when no data is found
:return: DataFrame with ohlcv data, or empty DataFrame
"""
# Fix startup period
timerange_startup = deepcopy(timerange)
if startup_candles > 0 and timerange_startup:
timerange_startup.subtract_start(timeframe_to_seconds(timeframe) * startup_candles)
pairdf = self._ohlcv_load(pair, timeframe,
timerange=timerange_startup)
if pairdf.empty:
if warn_no_data:
logger.warning(
f'No history data for pair: "{pair}", timeframe: {timeframe}. '
'Use `freqtrade download-data` to download the data'
)
return pairdf
else:
enddate = pairdf.iloc[-1]['date']
if timerange_startup:
self._validate_pairdata(pair, pairdf, timerange_startup)
pairdf = trim_dataframe(pairdf, timerange_startup)
# incomplete candles should only be dropped if we didn't trim the end beforehand.
return clean_ohlcv_dataframe(pairdf, timeframe,
pair=pair,
fill_missing=fill_missing,
drop_incomplete=(drop_incomplete and
enddate == pairdf.iloc[-1]['date']))
def _validate_pairdata(self, pair, pairdata: DataFrame, timerange: TimeRange):
"""
Validates pairdata for missing data at start end end and logs warnings.
:param pairdata: Dataframe to validate
:param timerange: Timerange specified for start and end dates
"""
if timerange.starttype == 'date':
start = datetime.fromtimestamp(timerange.startts, tz=timezone.utc)
if pairdata.iloc[0]['date'] > start:
logger.warning(f"Missing data at start for pair {pair}, "
f"data starts at {pairdata.iloc[0]['date']:%Y-%m-%d %H:%M:%S}")
if timerange.stoptype == 'date':
stop = datetime.fromtimestamp(timerange.stopts, tz=timezone.utc)
if pairdata.iloc[-1]['date'] < stop:
logger.warning(f"Missing data at end for pair {pair}, "
f"data ends at {pairdata.iloc[-1]['date']:%Y-%m-%d %H:%M:%S}")
def get_datahandlerclass(datatype: str) -> Type[IDataHandler]:
"""
Get datahandler class.
Could be done using Resolvers, but since this may be called often and resolvers
are rather expensive, doing this directly should improve performance.
:param datatype: datatype to use.
:return: Datahandler class
"""
if datatype == 'json':
from .jsondatahandler import JsonDataHandler
return JsonDataHandler
elif datatype == 'jsongz':
from .jsondatahandler import JsonGzDataHandler
return JsonGzDataHandler
else:
raise ValueError(f"No datahandler for datatype {datatype} available.")
def get_datahandler(datadir: Path, data_format: str = None,
data_handler: IDataHandler = None) -> IDataHandler:
"""
:param datadir: Folder to save data
:data_format: dataformat to use
:data_handler: returns this datahandler if it exists or initializes a new one
"""
if not data_handler:
HandlerClass = get_datahandlerclass(data_format or 'json')
data_handler = HandlerClass(datadir)
return data_handler

View File

@@ -0,0 +1,176 @@
import re
from pathlib import Path
from typing import Dict, List, Optional
import numpy as np
from pandas import DataFrame, read_json, to_datetime
from freqtrade import misc
from freqtrade.configuration import TimeRange
from freqtrade.constants import DEFAULT_DATAFRAME_COLUMNS
from .idatahandler import IDataHandler
class JsonDataHandler(IDataHandler):
_use_zip = False
_columns = DEFAULT_DATAFRAME_COLUMNS
@classmethod
def ohlcv_get_pairs(cls, datadir: Path, timeframe: str) -> List[str]:
"""
Returns a list of all pairs with ohlcv data available in this datadir
for the specified timeframe
:param datadir: Directory to search for ohlcv files
:param timeframe: Timeframe to search pairs for
:return: List of Pairs
"""
_tmp = [re.search(r'^(\S+)(?=\-' + timeframe + '.json)', p.name)
for p in datadir.glob(f"*{timeframe}.{cls._get_file_extension()}")]
# Check if regex found something and only return these results
return [match[0].replace('_', '/') for match in _tmp if match]
def ohlcv_store(self, pair: str, timeframe: str, data: DataFrame) -> None:
"""
Store data in json format "values".
format looks as follows:
[[<date>,<open>,<high>,<low>,<close>]]
:param pair: Pair - used to generate filename
:timeframe: Timeframe - used to generate filename
:data: Dataframe containing OHLCV data
:return: None
"""
filename = self._pair_data_filename(self._datadir, pair, timeframe)
_data = data.copy()
# Convert date to int
_data['date'] = _data['date'].astype(np.int64) // 1000 // 1000
# Reset index, select only appropriate columns and save as json
_data.reset_index(drop=True).loc[:, self._columns].to_json(
filename, orient="values",
compression='gzip' if self._use_zip else None)
def _ohlcv_load(self, pair: str, timeframe: str,
timerange: Optional[TimeRange] = None,
) -> DataFrame:
"""
Internal method used to load data for one pair from disk.
Implements the loading and conversation to a Pandas dataframe.
Timerange trimming and dataframe validation happens outside of this method.
:param pair: Pair to load data
:param timeframe: Ticker timeframe (e.g. "5m")
:param timerange: Limit data to be loaded to this timerange.
:return: DataFrame with ohlcv data, or empty DataFrame
"""
filename = self._pair_data_filename(self._datadir, pair, timeframe)
if not filename.exists():
return DataFrame(columns=self._columns)
pairdata = read_json(filename, orient='values')
pairdata.columns = self._columns
pairdata['date'] = to_datetime(pairdata['date'],
unit='ms',
utc=True,
infer_datetime_format=True)
return pairdata
def ohlcv_purge(self, pair: str, timeframe: str) -> bool:
"""
Remove data for this pair
:param pair: Delete data for this pair.
:param timeframe: Ticker timeframe (e.g. "5m")
:return: True when deleted, false if file did not exist.
"""
filename = self._pair_data_filename(self._datadir, pair, timeframe)
if filename.exists():
filename.unlink()
return True
return False
def ohlcv_append(self, pair: str, timeframe: str, data: DataFrame) -> None:
"""
Append data to existing data structures
:param pair: Pair
:param timeframe: Timeframe this ohlcv data is for
:param data: Data to append.
"""
raise NotImplementedError()
@classmethod
def trades_get_pairs(cls, datadir: Path) -> List[str]:
"""
Returns a list of all pairs for which trade data is available in this
:param datadir: Directory to search for ohlcv files
:return: List of Pairs
"""
_tmp = [re.search(r'^(\S+)(?=\-trades.json)', p.name)
for p in datadir.glob(f"*trades.{cls._get_file_extension()}")]
# Check if regex found something and only return these results to avoid exceptions.
return [match[0].replace('_', '/') for match in _tmp if match]
def trades_store(self, pair: str, data: List[Dict]) -> None:
"""
Store trades data (list of Dicts) to file
:param pair: Pair - used for filename
:param data: List of Dicts containing trade data
"""
filename = self._pair_trades_filename(self._datadir, pair)
misc.file_dump_json(filename, data, is_zip=self._use_zip)
def trades_append(self, pair: str, data: List[Dict]):
"""
Append data to existing files
:param pair: Pair - used for filename
:param data: List of Dicts containing trade data
"""
raise NotImplementedError()
def trades_load(self, pair: str, timerange: Optional[TimeRange] = None) -> List[Dict]:
"""
Load a pair from file, either .json.gz or .json
# TODO: respect timerange ...
:param pair: Load trades for this pair
:param timerange: Timerange to load trades for - currently not implemented
:return: List of trades
"""
filename = self._pair_trades_filename(self._datadir, pair)
tradesdata = misc.file_load_json(filename)
if not tradesdata:
return []
return tradesdata
def trades_purge(self, pair: str) -> bool:
"""
Remove data for this pair
:param pair: Delete data for this pair.
:return: True when deleted, false if file did not exist.
"""
filename = self._pair_trades_filename(self._datadir, pair)
if filename.exists():
filename.unlink()
return True
return False
@classmethod
def _pair_data_filename(cls, datadir: Path, pair: str, timeframe: str) -> Path:
pair_s = pair.replace("/", "_")
filename = datadir.joinpath(f'{pair_s}-{timeframe}.{cls._get_file_extension()}')
return filename
@classmethod
def _get_file_extension(cls):
return "json.gz" if cls._use_zip else "json"
@classmethod
def _pair_trades_filename(cls, datadir: Path, pair: str) -> Path:
pair_s = pair.replace("/", "_")
filename = datadir.joinpath(f'{pair_s}-trades.{cls._get_file_extension()}')
return filename
class JsonGzDataHandler(JsonDataHandler):
_use_zip = True