Merge pull request #3129 from freqtrade/trades_to_list

convert dl-trades datadownload to list
This commit is contained in:
Matthias
2020-05-13 09:41:26 +02:00
committed by GitHub
12 changed files with 221 additions and 90 deletions

View File

@@ -24,6 +24,9 @@ AVAILABLE_DATAHANDLERS = ['json', 'jsongz']
DRY_RUN_WALLET = 1000
MATH_CLOSE_PREC = 1e-14 # Precision used for float comparisons
DEFAULT_DATAFRAME_COLUMNS = ['date', 'open', 'high', 'low', 'close', 'volume']
# Don't modify sequence of DEFAULT_TRADES_COLUMNS
# it has wide consequences for stored trades files
DEFAULT_TRADES_COLUMNS = ['timestamp', 'id', 'type', 'side', 'price', 'amount', 'cost']
USERPATH_HYPEROPTS = 'hyperopts'
USERPATH_STRATEGIES = 'strategies'

View File

@@ -1,14 +1,17 @@
"""
Functions to convert data from one format to another
"""
import itertools
import logging
from datetime import datetime, timezone
from typing import Any, Dict
from operator import itemgetter
from typing import Any, Dict, List
import pandas as pd
from pandas import DataFrame, to_datetime
from freqtrade.constants import DEFAULT_DATAFRAME_COLUMNS
from freqtrade.constants import (DEFAULT_DATAFRAME_COLUMNS,
DEFAULT_TRADES_COLUMNS)
logger = logging.getLogger(__name__)
@@ -154,7 +157,27 @@ def order_book_to_dataframe(bids: list, asks: list) -> DataFrame:
return frame
def trades_to_ohlcv(trades: list, timeframe: str) -> DataFrame:
def trades_remove_duplicates(trades: List[List]) -> List[List]:
"""
Removes duplicates from the trades list.
Uses itertools.groupby to avoid converting to pandas.
Tests show it as being pretty efficient on lists of 4M Lists.
:param trades: List of Lists with constants.DEFAULT_TRADES_COLUMNS as columns
:return: same format as above, but with duplicates removed
"""
return [i for i, _ in itertools.groupby(sorted(trades, key=itemgetter(0)))]
def trades_dict_to_list(trades: List[Dict]) -> List[List]:
"""
Convert fetch_trades result into a List (to be more memory efficient).
:param trades: List of trades, as returned by ccxt.fetch_trades.
:return: List of Lists, with constants.DEFAULT_TRADES_COLUMNS as columns
"""
return [[t[col] for col in DEFAULT_TRADES_COLUMNS] for t in trades]
def trades_to_ohlcv(trades: List, timeframe: str) -> DataFrame:
"""
Converts trades list to OHLCV list
TODO: This should get a dedicated test
@@ -164,9 +187,10 @@ def trades_to_ohlcv(trades: list, timeframe: str) -> DataFrame:
"""
from freqtrade.exchange import timeframe_to_minutes
timeframe_minutes = timeframe_to_minutes(timeframe)
df = pd.DataFrame(trades)
df['datetime'] = pd.to_datetime(df['datetime'])
df = df.set_index('datetime')
df = pd.DataFrame(trades, columns=DEFAULT_TRADES_COLUMNS)
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms',
utc=True,)
df = df.set_index('timestamp')
df_new = df['price'].resample(f'{timeframe_minutes}min').ohlc()
df_new['volume'] = df['amount'].resample(f'{timeframe_minutes}min').sum()

View File

@@ -9,10 +9,13 @@ from pandas import DataFrame
from freqtrade.configuration import TimeRange
from freqtrade.constants import DEFAULT_DATAFRAME_COLUMNS
from freqtrade.data.converter import ohlcv_to_dataframe, trades_to_ohlcv
from freqtrade.data.converter import (ohlcv_to_dataframe,
trades_remove_duplicates,
trades_to_ohlcv)
from freqtrade.data.history.idatahandler import IDataHandler, get_datahandler
from freqtrade.exceptions import OperationalException
from freqtrade.exchange import Exchange
from freqtrade.misc import format_ms_time
logger = logging.getLogger(__name__)
@@ -257,27 +260,40 @@ def _download_trades_history(exchange: Exchange,
"""
try:
since = timerange.startts * 1000 if timerange and timerange.starttype == 'date' else None
since = timerange.startts * 1000 if \
(timerange and timerange.starttype == 'date') else int(arrow.utcnow().shift(
days=-30).float_timestamp) * 1000
trades = data_handler.trades_load(pair)
from_id = trades[-1]['id'] if trades else None
# TradesList columns are defined in constants.DEFAULT_TRADES_COLUMNS
# DEFAULT_TRADES_COLUMNS: 0 -> timestamp
# DEFAULT_TRADES_COLUMNS: 1 -> id
logger.debug("Current Start: %s", trades[0]['datetime'] if trades else 'None')
logger.debug("Current End: %s", trades[-1]['datetime'] if trades else 'None')
from_id = trades[-1][1] if trades else None
if trades and since < trades[-1][0]:
# Reset since to the last available point
# - 5 seconds (to ensure we're getting all trades)
since = trades[-1][0] - (5 * 1000)
logger.info(f"Using last trade date -5s - Downloading trades for {pair} "
f"since: {format_ms_time(since)}.")
logger.debug(f"Current Start: {format_ms_time(trades[0][0]) if trades else 'None'}")
logger.debug(f"Current End: {format_ms_time(trades[-1][0]) if trades else 'None'}")
logger.info(f"Current Amount of trades: {len(trades)}")
# Default since_ms to 30 days if nothing is given
new_trades = exchange.get_historic_trades(pair=pair,
since=since if since else
int(arrow.utcnow().shift(
days=-30).float_timestamp) * 1000,
since=since,
from_id=from_id,
)
trades.extend(new_trades[1])
# Remove duplicates to make sure we're not storing data we don't need
trades = trades_remove_duplicates(trades)
data_handler.trades_store(pair, data=trades)
logger.debug("New Start: %s", trades[0]['datetime'])
logger.debug("New End: %s", trades[-1]['datetime'])
logger.debug(f"New Start: {format_ms_time(trades[0][0])}")
logger.debug(f"New End: {format_ms_time(trades[-1][0])}")
logger.info(f"New Amount of trades: {len(trades)}")
return True

View File

@@ -8,16 +8,20 @@ from abc import ABC, abstractclassmethod, abstractmethod
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Type
from typing import List, Optional, Type
from pandas import DataFrame
from freqtrade.configuration import TimeRange
from freqtrade.data.converter import clean_ohlcv_dataframe, trim_dataframe
from freqtrade.data.converter import (clean_ohlcv_dataframe,
trades_remove_duplicates, trim_dataframe)
from freqtrade.exchange import timeframe_to_seconds
logger = logging.getLogger(__name__)
# Type for trades list
TradeList = List[List]
class IDataHandler(ABC):
@@ -89,23 +93,25 @@ class IDataHandler(ABC):
"""
@abstractmethod
def trades_store(self, pair: str, data: List[Dict]) -> None:
def trades_store(self, pair: str, data: TradeList) -> None:
"""
Store trades data (list of Dicts) to file
:param pair: Pair - used for filename
:param data: List of Dicts containing trade data
:param data: List of Lists containing trade data,
column sequence as in DEFAULT_TRADES_COLUMNS
"""
@abstractmethod
def trades_append(self, pair: str, data: List[Dict]):
def trades_append(self, pair: str, data: TradeList):
"""
Append data to existing files
:param pair: Pair - used for filename
:param data: List of Dicts containing trade data
:param data: List of Lists containing trade data,
column sequence as in DEFAULT_TRADES_COLUMNS
"""
@abstractmethod
def trades_load(self, pair: str, timerange: Optional[TimeRange] = None) -> List[Dict]:
def _trades_load(self, pair: str, timerange: Optional[TimeRange] = None) -> TradeList:
"""
Load a pair from file, either .json.gz or .json
:param pair: Load trades for this pair
@@ -121,6 +127,16 @@ class IDataHandler(ABC):
:return: True when deleted, false if file did not exist.
"""
def trades_load(self, pair: str, timerange: Optional[TimeRange] = None) -> TradeList:
"""
Load a pair from file, either .json.gz or .json
Removes duplicates in the process.
:param pair: Load trades for this pair
:param timerange: Timerange to load trades for - currently not implemented
:return: List of trades
"""
return trades_remove_duplicates(self._trades_load(pair, timerange=timerange))
def ohlcv_load(self, pair, timeframe: str,
timerange: Optional[TimeRange] = None,
fill_missing: bool = True,

View File

@@ -1,6 +1,7 @@
import logging
import re
from pathlib import Path
from typing import Dict, List, Optional
from typing import List, Optional
import numpy as np
from pandas import DataFrame, read_json, to_datetime
@@ -8,8 +9,11 @@ from pandas import DataFrame, read_json, to_datetime
from freqtrade import misc
from freqtrade.configuration import TimeRange
from freqtrade.constants import DEFAULT_DATAFRAME_COLUMNS
from freqtrade.data.converter import trades_dict_to_list
from .idatahandler import IDataHandler
from .idatahandler import IDataHandler, TradeList
logger = logging.getLogger(__name__)
class JsonDataHandler(IDataHandler):
@@ -113,24 +117,26 @@ class JsonDataHandler(IDataHandler):
# Check if regex found something and only return these results to avoid exceptions.
return [match[0].replace('_', '/') for match in _tmp if match]
def trades_store(self, pair: str, data: List[Dict]) -> None:
def trades_store(self, pair: str, data: TradeList) -> None:
"""
Store trades data (list of Dicts) to file
:param pair: Pair - used for filename
:param data: List of Dicts containing trade data
:param data: List of Lists containing trade data,
column sequence as in DEFAULT_TRADES_COLUMNS
"""
filename = self._pair_trades_filename(self._datadir, pair)
misc.file_dump_json(filename, data, is_zip=self._use_zip)
def trades_append(self, pair: str, data: List[Dict]):
def trades_append(self, pair: str, data: TradeList):
"""
Append data to existing files
:param pair: Pair - used for filename
:param data: List of Dicts containing trade data
:param data: List of Lists containing trade data,
column sequence as in DEFAULT_TRADES_COLUMNS
"""
raise NotImplementedError()
def trades_load(self, pair: str, timerange: Optional[TimeRange] = None) -> List[Dict]:
def _trades_load(self, pair: str, timerange: Optional[TimeRange] = None) -> TradeList:
"""
Load a pair from file, either .json.gz or .json
# TODO: respect timerange ...
@@ -140,9 +146,15 @@ class JsonDataHandler(IDataHandler):
"""
filename = self._pair_trades_filename(self._datadir, pair)
tradesdata = misc.file_load_json(filename)
if not tradesdata:
return []
if isinstance(tradesdata[0], dict):
# Convert trades dict to list
logger.info("Old trades format detected - converting")
tradesdata = trades_dict_to_list(tradesdata)
pass
return tradesdata
def trades_purge(self, pair: str) -> bool:

View File

@@ -18,13 +18,12 @@ from ccxt.base.decimal_to_precision import (ROUND_DOWN, ROUND_UP, TICK_SIZE,
TRUNCATE, decimal_to_precision)
from pandas import DataFrame
from freqtrade.data.converter import ohlcv_to_dataframe
from freqtrade.data.converter import ohlcv_to_dataframe, trades_dict_to_list
from freqtrade.exceptions import (DependencyException, InvalidOrderException,
OperationalException, TemporaryError)
from freqtrade.exchange.common import BAD_EXCHANGES, retrier, retrier_async
from freqtrade.misc import deep_merge_dicts
CcxtModuleType = Any
@@ -769,7 +768,7 @@ class Exchange:
@retrier_async
async def _async_fetch_trades(self, pair: str,
since: Optional[int] = None,
params: Optional[dict] = None) -> List[Dict]:
params: Optional[dict] = None) -> List[List]:
"""
Asyncronously gets trade history using fetch_trades.
Handles exchange errors, does one call to the exchange.
@@ -789,7 +788,7 @@ class Exchange:
'(' + arrow.get(since // 1000).isoformat() + ') ' if since is not None else ''
)
trades = await self._api_async.fetch_trades(pair, since=since, limit=1000)
return trades
return trades_dict_to_list(trades)
except ccxt.NotSupported as e:
raise OperationalException(
f'Exchange {self._api.name} does not support fetching historical trade data.'
@@ -803,7 +802,7 @@ class Exchange:
async def _async_get_trade_history_id(self, pair: str,
until: int,
since: Optional[int] = None,
from_id: Optional[str] = None) -> Tuple[str, List[Dict]]:
from_id: Optional[str] = None) -> Tuple[str, List[List]]:
"""
Asyncronously gets trade history using fetch_trades
use this when exchange uses id-based iteration (check `self._trades_pagination`)
@@ -814,7 +813,7 @@ class Exchange:
returns tuple: (pair, trades-list)
"""
trades: List[Dict] = []
trades: List[List] = []
if not from_id:
# Fetch first elements using timebased method to get an ID to paginate on
@@ -823,7 +822,9 @@ class Exchange:
# e.g. Binance returns the "last 1000" candles within a 1h time interval
# - so we will miss the first trades.
t = await self._async_fetch_trades(pair, since=since)
from_id = t[-1]['id']
# DEFAULT_TRADES_COLUMNS: 0 -> timestamp
# DEFAULT_TRADES_COLUMNS: 1 -> id
from_id = t[-1][1]
trades.extend(t[:-1])
while True:
t = await self._async_fetch_trades(pair,
@@ -831,21 +832,21 @@ class Exchange:
if len(t):
# Skip last id since its the key for the next call
trades.extend(t[:-1])
if from_id == t[-1]['id'] or t[-1]['timestamp'] > until:
if from_id == t[-1][1] or t[-1][0] > until:
logger.debug(f"Stopping because from_id did not change. "
f"Reached {t[-1]['timestamp']} > {until}")
f"Reached {t[-1][0]} > {until}")
# Reached the end of the defined-download period - add last trade as well.
trades.extend(t[-1:])
break
from_id = t[-1]['id']
from_id = t[-1][1]
else:
break
return (pair, trades)
async def _async_get_trade_history_time(self, pair: str, until: int,
since: Optional[int] = None) -> Tuple[str, List]:
since: Optional[int] = None) -> Tuple[str, List[List]]:
"""
Asyncronously gets trade history using fetch_trades,
when the exchange uses time-based iteration (check `self._trades_pagination`)
@@ -855,16 +856,18 @@ class Exchange:
returns tuple: (pair, trades-list)
"""
trades: List[Dict] = []
trades: List[List] = []
# DEFAULT_TRADES_COLUMNS: 0 -> timestamp
# DEFAULT_TRADES_COLUMNS: 1 -> id
while True:
t = await self._async_fetch_trades(pair, since=since)
if len(t):
since = t[-1]['timestamp']
since = t[-1][1]
trades.extend(t)
# Reached the end of the defined-download period
if until and t[-1]['timestamp'] > until:
if until and t[-1][0] > until:
logger.debug(
f"Stopping because until was reached. {t[-1]['timestamp']} > {until}")
f"Stopping because until was reached. {t[-1][0]} > {until}")
break
else:
break
@@ -874,7 +877,7 @@ class Exchange:
async def _async_get_trade_history(self, pair: str,
since: Optional[int] = None,
until: Optional[int] = None,
from_id: Optional[str] = None) -> Tuple[str, List[Dict]]:
from_id: Optional[str] = None) -> Tuple[str, List[List]]:
"""
Async wrapper handling downloading trades using either time or id based methods.
"""