Use List of Lists instead of list of Dicts for trades data

This commit is contained in:
Matthias 2020-03-31 20:20:10 +02:00
parent 1659ddcc5d
commit d76bb1ccf4
4 changed files with 58 additions and 34 deletions

View File

@ -261,10 +261,14 @@ def _download_trades_history(exchange: Exchange,
trades = data_handler.trades_load(pair) trades = data_handler.trades_load(pair)
from_id = trades[-1]['id'] if trades else None # TradesList columns are defined in constants.DEFAULT_TRADES_COLUMNS
# DEFAULT_TRADES_COLUMNS: 0 -> timestamp
# DEFAULT_TRADES_COLUMNS: 1 -> id
from_id = trades[-1][1] if trades else None
logger.debug("Current Start: %s", trades[0]['datetime'] if trades else 'None') logger.debug("Current Start: %s", trades[0][0] if trades else 'None')
logger.debug("Current End: %s", trades[-1]['datetime'] if trades else 'None') logger.debug("Current End: %s", trades[-1][0] if trades else 'None')
logger.info(f"Current Amount of trades: {len(trades)}")
# Default since_ms to 30 days if nothing is given # Default since_ms to 30 days if nothing is given
new_trades = exchange.get_historic_trades(pair=pair, new_trades = exchange.get_historic_trades(pair=pair,
@ -276,8 +280,8 @@ def _download_trades_history(exchange: Exchange,
trades.extend(new_trades[1]) trades.extend(new_trades[1])
data_handler.trades_store(pair, data=trades) data_handler.trades_store(pair, data=trades)
logger.debug("New Start: %s", trades[0]['datetime']) logger.debug("New Start: %s", trades[0][0])
logger.debug("New End: %s", trades[-1]['datetime']) logger.debug("New End: %s", trades[-1][0])
logger.info(f"New Amount of trades: {len(trades)}") logger.info(f"New Amount of trades: {len(trades)}")
return True return True

View File

@ -8,7 +8,7 @@ from abc import ABC, abstractclassmethod, abstractmethod
from copy import deepcopy from copy import deepcopy
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import Dict, List, Optional, Type from typing import List, Optional, Type
from pandas import DataFrame from pandas import DataFrame
@ -18,6 +18,9 @@ from freqtrade.exchange import timeframe_to_seconds
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Type for trades list
TradeList = List[List]
class IDataHandler(ABC): class IDataHandler(ABC):
@ -89,23 +92,25 @@ class IDataHandler(ABC):
""" """
@abstractmethod @abstractmethod
def trades_store(self, pair: str, data: List[Dict]) -> None: def trades_store(self, pair: str, data: TradeList) -> None:
""" """
Store trades data (list of Dicts) to file Store trades data (list of Dicts) to file
:param pair: Pair - used for filename :param pair: Pair - used for filename
:param data: List of Dicts containing trade data :param data: List of Lists containing trade data,
column sequence as in DEFAULT_TRADES_COLUMNS
""" """
@abstractmethod @abstractmethod
def trades_append(self, pair: str, data: List[Dict]): def trades_append(self, pair: str, data: TradeList):
""" """
Append data to existing files Append data to existing files
:param pair: Pair - used for filename :param pair: Pair - used for filename
:param data: List of Dicts containing trade data :param data: List of Lists containing trade data,
column sequence as in DEFAULT_TRADES_COLUMNS
""" """
@abstractmethod @abstractmethod
def trades_load(self, pair: str, timerange: Optional[TimeRange] = None) -> List[Dict]: def trades_load(self, pair: str, timerange: Optional[TimeRange] = None) -> TradeList:
""" """
Load a pair from file, either .json.gz or .json Load a pair from file, either .json.gz or .json
:param pair: Load trades for this pair :param pair: Load trades for this pair

View File

@ -1,6 +1,7 @@
import logging
import re import re
from pathlib import Path from pathlib import Path
from typing import Dict, List, Optional from typing import List, Optional
import numpy as np import numpy as np
from pandas import DataFrame, read_json, to_datetime from pandas import DataFrame, read_json, to_datetime
@ -8,8 +9,11 @@ from pandas import DataFrame, read_json, to_datetime
from freqtrade import misc from freqtrade import misc
from freqtrade.configuration import TimeRange from freqtrade.configuration import TimeRange
from freqtrade.constants import DEFAULT_DATAFRAME_COLUMNS from freqtrade.constants import DEFAULT_DATAFRAME_COLUMNS
from freqtrade.converter import trades_dict_to_list
from .idatahandler import IDataHandler from .idatahandler import IDataHandler, TradeList
logger = logging.getLogger(__name__)
class JsonDataHandler(IDataHandler): class JsonDataHandler(IDataHandler):
@ -113,24 +117,26 @@ class JsonDataHandler(IDataHandler):
# Check if regex found something and only return these results to avoid exceptions. # Check if regex found something and only return these results to avoid exceptions.
return [match[0].replace('_', '/') for match in _tmp if match] return [match[0].replace('_', '/') for match in _tmp if match]
def trades_store(self, pair: str, data: List[Dict]) -> None: def trades_store(self, pair: str, data: TradeList) -> None:
""" """
Store trades data (list of Dicts) to file Store trades data (list of Dicts) to file
:param pair: Pair - used for filename :param pair: Pair - used for filename
:param data: List of Dicts containing trade data :param data: List of Lists containing trade data,
column sequence as in DEFAULT_TRADES_COLUMNS
""" """
filename = self._pair_trades_filename(self._datadir, pair) filename = self._pair_trades_filename(self._datadir, pair)
misc.file_dump_json(filename, data, is_zip=self._use_zip) misc.file_dump_json(filename, data, is_zip=self._use_zip)
def trades_append(self, pair: str, data: List[Dict]): def trades_append(self, pair: str, data: TradeList):
""" """
Append data to existing files Append data to existing files
:param pair: Pair - used for filename :param pair: Pair - used for filename
:param data: List of Dicts containing trade data :param data: List of Lists containing trade data,
column sequence as in DEFAULT_TRADES_COLUMNS
""" """
raise NotImplementedError() raise NotImplementedError()
def trades_load(self, pair: str, timerange: Optional[TimeRange] = None) -> List[Dict]: def trades_load(self, pair: str, timerange: Optional[TimeRange] = None) -> TradeList:
""" """
Load a pair from file, either .json.gz or .json Load a pair from file, either .json.gz or .json
# TODO: respect timerange ... # TODO: respect timerange ...
@ -140,9 +146,15 @@ class JsonDataHandler(IDataHandler):
""" """
filename = self._pair_trades_filename(self._datadir, pair) filename = self._pair_trades_filename(self._datadir, pair)
tradesdata = misc.file_load_json(filename) tradesdata = misc.file_load_json(filename)
if not tradesdata: if not tradesdata:
return [] return []
if isinstance(tradesdata[0], dict):
# Convert trades dict to list
logger.info("Old trades format detected - converting")
tradesdata = trades_dict_to_list(tradesdata)
pass
return tradesdata return tradesdata
def trades_purge(self, pair: str) -> bool: def trades_purge(self, pair: str) -> bool:

View File

@ -18,13 +18,12 @@ from ccxt.base.decimal_to_precision import (ROUND_DOWN, ROUND_UP, TICK_SIZE,
TRUNCATE, decimal_to_precision) TRUNCATE, decimal_to_precision)
from pandas import DataFrame from pandas import DataFrame
from freqtrade.data.converter import ohlcv_to_dataframe from freqtrade.data.converter import ohlcv_to_dataframe, trades_dict_to_list
from freqtrade.exceptions import (DependencyException, InvalidOrderException, from freqtrade.exceptions import (DependencyException, InvalidOrderException,
OperationalException, TemporaryError) OperationalException, TemporaryError)
from freqtrade.exchange.common import BAD_EXCHANGES, retrier, retrier_async from freqtrade.exchange.common import BAD_EXCHANGES, retrier, retrier_async
from freqtrade.misc import deep_merge_dicts from freqtrade.misc import deep_merge_dicts
CcxtModuleType = Any CcxtModuleType = Any
@ -758,7 +757,7 @@ class Exchange:
@retrier_async @retrier_async
async def _async_fetch_trades(self, pair: str, async def _async_fetch_trades(self, pair: str,
since: Optional[int] = None, since: Optional[int] = None,
params: Optional[dict] = None) -> List[Dict]: params: Optional[dict] = None) -> List[List]:
""" """
Asyncronously gets trade history using fetch_trades. Asyncronously gets trade history using fetch_trades.
Handles exchange errors, does one call to the exchange. Handles exchange errors, does one call to the exchange.
@ -778,7 +777,7 @@ class Exchange:
'(' + arrow.get(since // 1000).isoformat() + ') ' if since is not None else '' '(' + arrow.get(since // 1000).isoformat() + ') ' if since is not None else ''
) )
trades = await self._api_async.fetch_trades(pair, since=since, limit=1000) trades = await self._api_async.fetch_trades(pair, since=since, limit=1000)
return trades return trades_dict_to_list(trades)
except ccxt.NotSupported as e: except ccxt.NotSupported as e:
raise OperationalException( raise OperationalException(
f'Exchange {self._api.name} does not support fetching historical trade data.' f'Exchange {self._api.name} does not support fetching historical trade data.'
@ -792,7 +791,7 @@ class Exchange:
async def _async_get_trade_history_id(self, pair: str, async def _async_get_trade_history_id(self, pair: str,
until: int, until: int,
since: Optional[int] = None, since: Optional[int] = None,
from_id: Optional[str] = None) -> Tuple[str, List[Dict]]: from_id: Optional[str] = None) -> Tuple[str, List[List]]:
""" """
Asyncronously gets trade history using fetch_trades Asyncronously gets trade history using fetch_trades
use this when exchange uses id-based iteration (check `self._trades_pagination`) use this when exchange uses id-based iteration (check `self._trades_pagination`)
@ -803,7 +802,7 @@ class Exchange:
returns tuple: (pair, trades-list) returns tuple: (pair, trades-list)
""" """
trades: List[Dict] = [] trades: List[List] = []
if not from_id: if not from_id:
# Fetch first elements using timebased method to get an ID to paginate on # Fetch first elements using timebased method to get an ID to paginate on
@ -812,7 +811,9 @@ class Exchange:
# e.g. Binance returns the "last 1000" candles within a 1h time interval # e.g. Binance returns the "last 1000" candles within a 1h time interval
# - so we will miss the first trades. # - so we will miss the first trades.
t = await self._async_fetch_trades(pair, since=since) t = await self._async_fetch_trades(pair, since=since)
from_id = t[-1]['id'] # DEFAULT_TRADES_COLUMNS: 0 -> timestamp
# DEFAULT_TRADES_COLUMNS: 1 -> id
from_id = t[-1][1]
trades.extend(t[:-1]) trades.extend(t[:-1])
while True: while True:
t = await self._async_fetch_trades(pair, t = await self._async_fetch_trades(pair,
@ -820,21 +821,21 @@ class Exchange:
if len(t): if len(t):
# Skip last id since its the key for the next call # Skip last id since its the key for the next call
trades.extend(t[:-1]) trades.extend(t[:-1])
if from_id == t[-1]['id'] or t[-1]['timestamp'] > until: if from_id == t[-1][1] or t[-1][0] > until:
logger.debug(f"Stopping because from_id did not change. " logger.debug(f"Stopping because from_id did not change. "
f"Reached {t[-1]['timestamp']} > {until}") f"Reached {t[-1][0]} > {until}")
# Reached the end of the defined-download period - add last trade as well. # Reached the end of the defined-download period - add last trade as well.
trades.extend(t[-1:]) trades.extend(t[-1:])
break break
from_id = t[-1]['id'] from_id = t[-1][1]
else: else:
break break
return (pair, trades) return (pair, trades)
async def _async_get_trade_history_time(self, pair: str, until: int, async def _async_get_trade_history_time(self, pair: str, until: int,
since: Optional[int] = None) -> Tuple[str, List]: since: Optional[int] = None) -> Tuple[str, List[List]]:
""" """
Asyncronously gets trade history using fetch_trades, Asyncronously gets trade history using fetch_trades,
when the exchange uses time-based iteration (check `self._trades_pagination`) when the exchange uses time-based iteration (check `self._trades_pagination`)
@ -844,16 +845,18 @@ class Exchange:
returns tuple: (pair, trades-list) returns tuple: (pair, trades-list)
""" """
trades: List[Dict] = [] trades: List[List] = []
# DEFAULT_TRADES_COLUMNS: 0 -> timestamp
# DEFAULT_TRADES_COLUMNS: 1 -> id
while True: while True:
t = await self._async_fetch_trades(pair, since=since) t = await self._async_fetch_trades(pair, since=since)
if len(t): if len(t):
since = t[-1]['timestamp'] since = t[-1][1]
trades.extend(t) trades.extend(t)
# Reached the end of the defined-download period # Reached the end of the defined-download period
if until and t[-1]['timestamp'] > until: if until and t[-1][0] > until:
logger.debug( logger.debug(
f"Stopping because until was reached. {t[-1]['timestamp']} > {until}") f"Stopping because until was reached. {t[-1][0]} > {until}")
break break
else: else:
break break
@ -863,7 +866,7 @@ class Exchange:
async def _async_get_trade_history(self, pair: str, async def _async_get_trade_history(self, pair: str,
since: Optional[int] = None, since: Optional[int] = None,
until: Optional[int] = None, until: Optional[int] = None,
from_id: Optional[str] = None) -> Tuple[str, List[Dict]]: from_id: Optional[str] = None) -> Tuple[str, List[List]]:
""" """
Async wrapper handling downloading trades using either time or id based methods. Async wrapper handling downloading trades using either time or id based methods.
""" """