2018-02-04 07:33:54 +00:00
|
|
|
"""
|
|
|
|
Various tool functions for Freqtrade and scripts
|
|
|
|
"""
|
2018-07-04 07:31:35 +00:00
|
|
|
import gzip
|
2017-11-08 21:43:47 +00:00
|
|
|
import logging
|
2018-03-17 21:44:47 +00:00
|
|
|
import re
|
2018-01-21 12:44:30 +00:00
|
|
|
from datetime import datetime
|
2019-08-16 11:04:07 +00:00
|
|
|
from pathlib import Path
|
2020-02-02 04:00:40 +00:00
|
|
|
from typing import Any
|
2019-08-21 04:58:56 +00:00
|
|
|
from typing.io import IO
|
2018-03-17 21:12:42 +00:00
|
|
|
|
2018-01-21 12:44:30 +00:00
|
|
|
import numpy as np
|
2018-12-28 09:01:16 +00:00
|
|
|
import rapidjson
|
2017-11-08 21:43:47 +00:00
|
|
|
|
2020-09-28 17:39:41 +00:00
|
|
|
|
2017-11-11 15:47:19 +00:00
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
2017-09-08 22:31:40 +00:00
|
|
|
|
2018-03-17 21:12:42 +00:00
|
|
|
def shorten_date(_date: str) -> str:
    """
    Trim the date so it fits on small screens

    Abbreviates the time units of a humanized date string
    ("seconds" -> "sec", "minutes" -> "min", "hours" -> "h", "days" -> "d")
    and replaces a leading article "a" / "an" with "1".
    :param _date: humanized date string, e.g. "an hour ago"
    :return: shortened string, e.g. "1 h ago"
    """
    unit_abbreviations = (
        ('seconds?', 'sec'),
        ('minutes?', 'min'),
        ('hours?', 'h'),
        ('days?', 'd'),
    )
    new_date = _date
    for pattern, short in unit_abbreviations:
        new_date = re.sub(pattern, short, new_date)
    # Replace a leading "a"/"an" only when it is a whole word. Without the
    # word boundary any text starting with "a" (e.g. "about 3 hours") would
    # have its first letter(s) turned into "1".
    new_date = re.sub(r'^an?\b', '1', new_date)
    return new_date
|
2017-09-08 22:31:40 +00:00
|
|
|
|
|
|
|
|
2018-01-21 12:44:30 +00:00
|
|
|
############################################
|
|
|
|
# Used by scripts #
|
|
|
|
# Matplotlib doesn't support ::datetime64, #
|
|
|
|
# so we need to convert it into ::datetime #
|
|
|
|
############################################
|
2018-03-17 21:12:42 +00:00
|
|
|
def datesarray_to_datetimearray(dates: np.ndarray) -> np.ndarray:
    """
    Convert a pandas-array of timestamps into
    a collection of python datetime objects.
    :param dates: object exposing the `.dt` accessor
    :return: result of `dates.dt.to_pydatetime()`
    """
    # NOTE(review): the annotation says np.ndarray, but the `.dt` accessor used
    # below is not available on plain numpy arrays - presumably callers pass a
    # pandas Series of datetimes; confirm against callers.
    as_pydatetime = dates.dt.to_pydatetime()
    return as_pydatetime
|
2018-01-21 12:44:30 +00:00
|
|
|
|
|
|
|
|
2020-10-03 11:27:06 +00:00
|
|
|
def file_dump_json(filename: Path, data: Any, is_zip: bool = False, log: bool = True) -> None:
    """
    Dump JSON data into a file
    :param filename: file to create
    :param data: JSON Data to save
    :param is_zip: if True, write through gzip; a filename whose suffix is not
        '.gz' has its suffix changed to '.gz' (via Path.with_suffix) first
    :param log: if True, log the target filename at info level before writing
    :return:
    """
    # Only the compressed variant adjusts the target name.
    if is_zip and filename.suffix != '.gz':
        filename = filename.with_suffix('.gz')

    if log:
        logger.info(f'dumping json to "{filename}"')

    # Both variants open the target with mode 'w'; only the opener differs.
    opener = gzip.open if is_zip else open
    with opener(filename, 'w') as fp:
        # default=str: objects rapidjson cannot serialize are passed through str()
        rapidjson.dump(data, fp, default=str, number_mode=rapidjson.NM_NATIVE)

    logger.debug(f'done json to "{filename}"')
|
|
|
|
|
2018-03-25 11:38:17 +00:00
|
|
|
|
2020-02-02 04:00:40 +00:00
|
|
|
def json_load(datafile: IO) -> Any:
    """
    Load data with rapidjson.
    Use this helper to get a consistent experience across the codebase:
    number_mode is always set to "NM_NATIVE" for greatest speed.
    :param datafile: open file-like object to read the JSON document from
    :return: the decoded data
    """
    decoded = rapidjson.load(datafile, number_mode=rapidjson.NM_NATIVE)
    return decoded
|
|
|
|
|
|
|
|
|
|
|
|
def file_load_json(file):
    """
    Load JSON data from `file`, preferring a gzip-compressed variant.
    :param file: path object (needs `.suffix`, `.with_suffix()` and `.is_file()`)
    :return: decoded data, or None when neither the gzip nor the plain file exists
    """
    # The gzip candidate is the file itself when it already ends in ".gz",
    # otherwise the same name with ".gz" appended to the existing suffix.
    if file.suffix == ".gz":
        gz_candidate = file
    else:
        gz_candidate = file.with_suffix(file.suffix + '.gz')

    # Try gzip file first, otherwise regular json file.
    if gz_candidate.is_file():
        logger.debug(f"Loading historical data from file {gz_candidate}")
        with gzip.open(gz_candidate) as handle:
            return json_load(handle)

    if file.is_file():
        logger.debug(f"Loading historical data from file {file}")
        with open(file) as handle:
            return json_load(handle)

    return None
|
2018-12-28 09:04:28 +00:00
|
|
|
|
|
|
|
|
2020-01-04 02:07:51 +00:00
|
|
|
def pair_to_filename(pair: str) -> str:
    """
    Replace every '/', '-', ' ', '.', '@', '$', '+' and ':' in pair with '_'.
    :param pair: pair name, e.g. "ETH/BTC"
    :return: the pair with the characters above replaced, e.g. "ETH_BTC"
    """
    # One translation table maps each of the listed characters to "_".
    replacements = str.maketrans(dict.fromkeys('/- .@$+:', '_'))
    return pair.translate(replacements)
|
|
|
|
|
|
|
|
|
2018-05-30 20:38:09 +00:00
|
|
|
def format_ms_time(date: int) -> str:
    """
    Convert an epoch timestamp in milliseconds to a readable string.
    :param date: epoch timestamp in ms
    :return: string formatted as '%Y-%m-%dT%H:%M:%S'
    """
    epoch_seconds = date / 1000.0
    # fromtimestamp() without a tz argument yields local time.
    moment = datetime.fromtimestamp(epoch_seconds)
    return moment.strftime('%Y-%m-%dT%H:%M:%S')
|
2019-02-19 12:14:47 +00:00
|
|
|
|
|
|
|
|
|
|
|
def deep_merge_dicts(source, destination):
    """
    Values from Source override destination, destination is returned (and modified!!)
    Nested dicts are merged recursively. If destination holds a non-dict value
    under a key for which source holds a dict, that value is replaced by the
    merged dict.
    Sample:
    >>> a = { 'first' : { 'rows' : { 'pass' : 'dog', 'number' : '1' } } }
    >>> b = { 'first' : { 'rows' : { 'fail' : 'cat', 'number' : '5' } } }
    >>> expected = { 'first' : { 'rows' : { 'pass' : 'dog', 'fail' : 'cat', 'number' : '5' } } }
    >>> deep_merge_dicts(b, a) == expected
    True

    :param source: dict whose values take precedence
    :param destination: dict that is updated in place
    :return: destination (the same object that was passed in)
    """
    for key, value in source.items():
        if isinstance(value, dict):
            # get node or create one
            node = destination.setdefault(key, {})
            if not isinstance(node, dict):
                # The existing value cannot be merged into (recursing into it
                # would fail on item assignment); source wins, so start from
                # an empty dict.
                node = destination[key] = {}
            deep_merge_dicts(value, node)
        else:
            destination[key] = value

    return destination
|
2019-08-20 19:17:21 +00:00
|
|
|
|
|
|
|
|
|
|
|
def round_dict(d, n):
    """
    Build a new dict from d in which every float value is rounded to n digits
    after the decimal point; all other values are carried over unchanged.
    :param d: dict to process (not modified)
    :param n: number of digits passed to round()
    :return: new dict with the same keys
    """
    rounded = {}
    for key, val in d.items():
        if isinstance(val, float):
            rounded[key] = round(val, n)
        else:
            rounded[key] = val
    return rounded
|
2019-10-13 10:12:20 +00:00
|
|
|
|
|
|
|
|
2020-07-15 17:49:51 +00:00
|
|
|
def safe_value_fallback(obj: dict, key1: str, key2: str, default_value=None):
    """
    Look up key1 in obj and return its value if it is not None.
    Otherwise look up key2 in obj and return that value if it is not None.
    If neither yields a non-None value, return default_value (None by default).
    """
    # Keys are tried in priority order; the first non-None hit wins.
    for candidate_key in (key1, key2):
        if candidate_key in obj and obj[candidate_key] is not None:
            return obj[candidate_key]
    return default_value
|
|
|
|
|
|
|
|
|
|
|
|
def safe_value_fallback2(dict1: dict, dict2: dict, key1: str, key2: str, default_value=None):
    """
    Look up key1 in dict1 and return its value if it is not None.
    Otherwise look up key2 in dict2 and return that value if it is not None.
    If neither yields a non-None value, return default_value (None by default).
    """
    # (mapping, key) pairs are tried in priority order; the first non-None hit wins.
    for mapping, wanted in ((dict1, key1), (dict2, key2)):
        if wanted in mapping and mapping[wanted] is not None:
            return mapping[wanted]
    return default_value
|
|
|
|
|
|
|
|
|
2020-02-02 04:00:40 +00:00
|
|
|
def plural(num: float, singular: str, plural: str = None) -> str:
    """
    Pick the singular or plural form of a word for the given number.
    :param num: quantity the word refers to
    :param singular: singular form, used when num is 1 or -1
    :param plural: plural form; when not given (or empty), singular + 's' is used
    :return: the selected word form
    """
    if num == 1 or num == -1:
        return singular
    if plural:
        return plural
    return singular + 's'
|
2019-11-01 15:04:44 +00:00
|
|
|
|
|
|
|
|
2020-02-02 04:00:40 +00:00
|
|
|
def render_template(templatefile: str, arguments: dict = {}) -> str:
    """
    Render a jinja2 template from the 'templates' directory of the 'freqtrade' package.
    :param templatefile: name of the template to load
    :param arguments: values passed to the template as keyword arguments
    :return: the rendered template text
    """
    # jinja2 is imported at call time rather than at module import.
    from jinja2 import Environment, PackageLoader, select_autoescape

    package_loader = PackageLoader('freqtrade', 'templates')
    # Autoescaping is enabled for html and xml templates.
    escape_policy = select_autoescape(['html', 'xml'])
    jinja_env = Environment(loader=package_loader, autoescape=escape_policy)
    return jinja_env.get_template(templatefile).render(**arguments)
|
2020-03-01 08:30:30 +00:00
|
|
|
|
|
|
|
|
|
|
|
def render_template_with_fallback(templatefile: str, templatefallbackfile: str,
                                  arguments: dict = {}) -> str:
    """
    Render templatefile; if jinja2 reports it as not found, render
    templatefallbackfile instead.
    :param templatefile: preferred template name
    :param templatefallbackfile: template name used when the preferred one is missing
    :param arguments: values handed to render_template for either template
    :return: the rendered template text
    """
    from jinja2.exceptions import TemplateNotFound

    try:
        rendered = render_template(templatefile, arguments)
    except TemplateNotFound:
        # Only a missing template triggers the fallback; other errors propagate.
        rendered = render_template(templatefallbackfile, arguments)
    return rendered
|