Compare commits
43 Commits
JOSS_v2
...
add-spice-
Author | SHA1 | Date | |
---|---|---|---|
|
ecdb466887 | ||
|
011759d1b7 | ||
|
7cdd510cf9 | ||
|
1e5df9611b | ||
|
f3dcbb9736 | ||
|
06f4f2db0a | ||
|
d362332527 | ||
|
e337d4b78a | ||
|
bc09c812a8 | ||
|
0460f362fb | ||
|
d42fb15608 | ||
|
a5bf34587a | ||
|
fab6b2f105 | ||
|
1cabfe8d0a | ||
|
1595e5fd8a | ||
|
b92b98af29 | ||
|
3e08c6e540 | ||
|
6e179c7699 | ||
|
7c702dd106 | ||
|
92a1d58df8 | ||
|
f475c6c305 | ||
|
638515bce5 | ||
|
678272e2ef | ||
|
cea017e79f | ||
|
b7f26e4f96 | ||
|
02e238a944 | ||
|
edb942f662 | ||
|
9b1fb02df8 | ||
|
760f3f157d | ||
|
c31f322349 | ||
|
aca03e38f6 | ||
|
8b1e5daf22 | ||
|
7b390b8edb | ||
|
91e2a05aff | ||
|
793c54db9d | ||
|
b1e92933f4 | ||
|
12a9fda885 | ||
|
a7312dec03 | ||
|
ff300d5c85 | ||
|
4d93a6b757 | ||
|
dac07c5609 | ||
|
fb2d190865 | ||
|
b209490009 |
71
docs/freqai-spice-rack.md
Normal file
71
docs/freqai-spice-rack.md
Normal file
@@ -0,0 +1,71 @@
|
||||
# Using the `spice_rack`
|
||||
|
||||
!!! Note:
|
||||
`spice_rack` indicators should not be used exclusively for entries and exits, the following example is just a demonstration of syntax. `spice_rack` indicators should **always** be used to support existing strategies.
|
||||
|
||||
The `spice_rack` is aimed at users who do not wish to deal with setting up `FreqAI` confgs, but instead prefer to interact with `FreqAI` similar to a `talib` indicator. In this case, the user can instead simply add two keys to their config:
|
||||
|
||||
```json
|
||||
"freqai_spice_rack": true,
|
||||
"freqai_identifier": "spicey-id",
|
||||
```
|
||||
|
||||
Which tells `FreqAI` to set up a pre-set `FreqAI` instance automatically under the hood with preset parameters. Now the user can access a suite of custom `FreqAI` supercharged indicators inside their strategy by placing the following code into `populate_indicators`:
|
||||
|
||||
```python
|
||||
dataframe['dissimilarity_index'] = self.freqai.spice_rack(
|
||||
'DI_values', dataframe, metadata, self)
|
||||
dataframe['extrema'] = self.freqai.spice_rack(
|
||||
'&s-extrema', dataframe, metadata, self)
|
||||
self.freqai.close_spice_rack() # user must close the spicerack
|
||||
```
|
||||
|
||||
Users can then use these columns in concert with all their own additional indicators added to `populate_indicators` in their entry/exit criteria and strategy callback methods the same way as any typical indicator. For example:
|
||||
|
||||
```python
|
||||
def populate_entry_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
|
||||
|
||||
df.loc[
|
||||
(
|
||||
(df['dissimilarity_index'] < 1) &
|
||||
(df['extrema'] < -0.1)
|
||||
),
|
||||
'enter_long'] = 1
|
||||
|
||||
df.loc[
|
||||
(
|
||||
(df['dissimilarity_index'] < 1) &
|
||||
(df['extrema'] > 0.1)
|
||||
),
|
||||
'enter_short'] = 1
|
||||
|
||||
return df
|
||||
|
||||
def populate_exit_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
|
||||
|
||||
df.loc[
|
||||
(
|
||||
(df['dissimilarity_index'] < 1) &
|
||||
(df['extrema'] > 0.1)
|
||||
),
|
||||
|
||||
'exit_long'] = 1
|
||||
|
||||
df.loc[
|
||||
(
|
||||
|
||||
(df['dissimilarity_index'] < 1) &
|
||||
(df['extrema'] < -0.1)
|
||||
),
|
||||
'exit_short'] = 1
|
||||
|
||||
return df
|
||||
```
|
||||
|
||||
|
||||
## Available indicators
|
||||
|
||||
| Parameter | Description |
|
||||
|------------|-------------|
|
||||
| `DI_values` | **Required.** <br> The dissimilarity index of the current candle to the recent candles. More information available [here](freqai-feature-engineering.md#identifying-outliers-with-the-dissimilarity-index-di) <br> **Datatype:** Floats.
|
||||
| `extrema` | **Required.** <br> A continuous prediction from FreqAI which aims to help predict if the current candle is a maxima or a minma. FreqAI aims for 1 to be a maxima and -1 to be a minima - but the values should typically hover between -0.2 and 0.2. <br> **Datatype:** Floats.
|
@@ -11,7 +11,8 @@ from freqtrade.data.history import (convert_trades_to_ohlcv, refresh_backtest_oh
|
||||
refresh_backtest_trades_data)
|
||||
from freqtrade.enums import CandleType, RunMode, TradingMode
|
||||
from freqtrade.exceptions import OperationalException
|
||||
from freqtrade.exchange import market_is_active, timeframe_to_minutes
|
||||
from freqtrade.exchange import Exchange, market_is_active, timeframe_to_minutes
|
||||
from freqtrade.freqai.utils import setup_freqai_spice_rack
|
||||
from freqtrade.plugins.pairlist.pairlist_helpers import dynamic_expand_pairlist, expand_pairlist
|
||||
from freqtrade.resolvers import ExchangeResolver
|
||||
|
||||
@@ -48,6 +49,10 @@ def start_download_data(args: Dict[str, Any]) -> None:
|
||||
|
||||
# Init exchange
|
||||
exchange = ExchangeResolver.load_exchange(config['exchange']['name'], config, validate=False)
|
||||
|
||||
if config.get('freqai_spice_rack', False):
|
||||
config = setup_freqai_spice_rack(config, exchange)
|
||||
|
||||
markets = [p for p, m in exchange.markets.items() if market_is_active(m)
|
||||
or config.get('include_inactive')]
|
||||
|
||||
@@ -63,37 +68,7 @@ def start_download_data(args: Dict[str, Any]) -> None:
|
||||
exchange.validate_timeframes(timeframe)
|
||||
|
||||
try:
|
||||
|
||||
if config.get('download_trades'):
|
||||
if config.get('trading_mode') == 'futures':
|
||||
raise OperationalException("Trade download not supported for futures.")
|
||||
pairs_not_available = refresh_backtest_trades_data(
|
||||
exchange, pairs=expanded_pairs, datadir=config['datadir'],
|
||||
timerange=timerange, new_pairs_days=config['new_pairs_days'],
|
||||
erase=bool(config.get('erase')), data_format=config['dataformat_trades'])
|
||||
|
||||
# Convert downloaded trade data to different timeframes
|
||||
convert_trades_to_ohlcv(
|
||||
pairs=expanded_pairs, timeframes=config['timeframes'],
|
||||
datadir=config['datadir'], timerange=timerange, erase=bool(config.get('erase')),
|
||||
data_format_ohlcv=config['dataformat_ohlcv'],
|
||||
data_format_trades=config['dataformat_trades'],
|
||||
)
|
||||
else:
|
||||
if not exchange.get_option('ohlcv_has_history', True):
|
||||
raise OperationalException(
|
||||
f"Historic klines not available for {exchange.name}. "
|
||||
"Please use `--dl-trades` instead for this exchange "
|
||||
"(will unfortunately take a long time)."
|
||||
)
|
||||
pairs_not_available = refresh_backtest_ohlcv_data(
|
||||
exchange, pairs=expanded_pairs, timeframes=config['timeframes'],
|
||||
datadir=config['datadir'], timerange=timerange,
|
||||
new_pairs_days=config['new_pairs_days'],
|
||||
erase=bool(config.get('erase')), data_format=config['dataformat_ohlcv'],
|
||||
trading_mode=config.get('trading_mode', 'spot'),
|
||||
prepend=config.get('prepend_data', False)
|
||||
)
|
||||
pairs_not_available = download_trades(exchange, expanded_pairs, config, timerange)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
sys.exit("SIGINT received, aborting ...")
|
||||
@@ -104,6 +79,42 @@ def start_download_data(args: Dict[str, Any]) -> None:
|
||||
f"on exchange {exchange.name}.")
|
||||
|
||||
|
||||
def download_trades(exchange: Exchange, expanded_pairs: list,
|
||||
config: Dict[str, Any], timerange: TimeRange) -> list:
|
||||
if config.get('download_trades'):
|
||||
if config.get('trading_mode') == 'futures':
|
||||
raise OperationalException("Trade download not supported for futures.")
|
||||
pairs_not_available = refresh_backtest_trades_data(
|
||||
exchange, pairs=expanded_pairs, datadir=config['datadir'],
|
||||
timerange=timerange, new_pairs_days=config['new_pairs_days'],
|
||||
erase=bool(config.get('erase')), data_format=config['dataformat_trades'])
|
||||
|
||||
# Convert downloaded trade data to different timeframes
|
||||
convert_trades_to_ohlcv(
|
||||
pairs=expanded_pairs, timeframes=config['timeframes'],
|
||||
datadir=config['datadir'], timerange=timerange, erase=bool(config.get('erase')),
|
||||
data_format_ohlcv=config['dataformat_ohlcv'],
|
||||
data_format_trades=config['dataformat_trades'],
|
||||
)
|
||||
else:
|
||||
if not exchange.get_option('ohlcv_has_history', True):
|
||||
raise OperationalException(
|
||||
f"Historic klines not available for {exchange.name}. "
|
||||
"Please use `--dl-trades` instead for this exchange "
|
||||
"(will unfortunately take a long time)."
|
||||
)
|
||||
pairs_not_available = refresh_backtest_ohlcv_data(
|
||||
exchange, pairs=expanded_pairs, timeframes=config['timeframes'],
|
||||
datadir=config['datadir'], timerange=timerange,
|
||||
new_pairs_days=config['new_pairs_days'],
|
||||
erase=bool(config.get('erase')), data_format=config['dataformat_ohlcv'],
|
||||
trading_mode=config.get('trading_mode', 'spot'),
|
||||
prepend=config.get('prepend_data', False)
|
||||
)
|
||||
|
||||
return pairs_not_available
|
||||
|
||||
|
||||
def start_convert_trades(args: Dict[str, Any]) -> None:
|
||||
|
||||
config = setup_utils_configuration(args, RunMode.UTIL_EXCHANGE)
|
||||
|
@@ -18,12 +18,12 @@ import ccxt.async_support as ccxt_async
|
||||
from cachetools import TTLCache
|
||||
from ccxt import ROUND_DOWN, ROUND_UP, TICK_SIZE, TRUNCATE, decimal_to_precision
|
||||
from dateutil import parser
|
||||
from pandas import DataFrame
|
||||
from pandas import DataFrame, concat
|
||||
|
||||
from freqtrade.constants import (DEFAULT_AMOUNT_RESERVE_PERCENT, NON_OPEN_EXCHANGE_STATES, BuySell,
|
||||
Config, EntryExit, ListPairsWithTimeframes, MakerTaker,
|
||||
PairWithTimeframe)
|
||||
from freqtrade.data.converter import ohlcv_to_dataframe, trades_dict_to_list
|
||||
from freqtrade.data.converter import clean_ohlcv_dataframe, ohlcv_to_dataframe, trades_dict_to_list
|
||||
from freqtrade.enums import OPTIMIZE_MODES, CandleType, MarginMode, TradingMode
|
||||
from freqtrade.exceptions import (DDosProtection, ExchangeError, InsufficientFundsError,
|
||||
InvalidOrderException, OperationalException, PricingError,
|
||||
@@ -184,8 +184,9 @@ class Exchange:
|
||||
# Initial markets load
|
||||
self._load_markets()
|
||||
self.validate_config(config)
|
||||
self._startup_candle_count: int = config.get('startup_candle_count', 0)
|
||||
self.required_candle_call_count = self.validate_required_startup_candles(
|
||||
config.get('startup_candle_count', 0), config.get('timeframe', ''))
|
||||
self._startup_candle_count, config.get('timeframe', ''))
|
||||
|
||||
# Converts the interval provided in minutes in config to seconds
|
||||
self.markets_refresh_interval: int = exchange_config.get(
|
||||
@@ -1850,10 +1851,22 @@ class Exchange:
|
||||
return pair, timeframe, candle_type, data
|
||||
|
||||
def _build_coroutine(self, pair: str, timeframe: str, candle_type: CandleType,
|
||||
since_ms: Optional[int]) -> Coroutine:
|
||||
since_ms: Optional[int], cache: bool) -> Coroutine:
|
||||
not_all_data = self.required_candle_call_count > 1
|
||||
if cache and (pair, timeframe, candle_type) in self._klines:
|
||||
candle_limit = self.ohlcv_candle_limit(timeframe, candle_type)
|
||||
min_date = date_minus_candles(timeframe, candle_limit - 5).timestamp()
|
||||
# Check if 1 call can get us updated candles without hole in the data.
|
||||
if min_date < self._pairs_last_refresh_time.get((pair, timeframe, candle_type), 0):
|
||||
# Cache can be used - do one-off call.
|
||||
not_all_data = False
|
||||
else:
|
||||
# Time jump detected, evict cache
|
||||
logger.info(
|
||||
f"Time jump detected. Evicting cache for {pair}, {timeframe}, {candle_type}")
|
||||
del self._klines[(pair, timeframe, candle_type)]
|
||||
|
||||
if (not since_ms
|
||||
and (self._ft_has["ohlcv_require_since"] or self.required_candle_call_count > 1)):
|
||||
if (not since_ms and (self._ft_has["ohlcv_require_since"] or not_all_data)):
|
||||
# Multiple calls for one pair - to get more history
|
||||
one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit(
|
||||
timeframe, candle_type, since_ms)
|
||||
@@ -1878,10 +1891,8 @@ class Exchange:
|
||||
input_coroutines = []
|
||||
cached_pairs = []
|
||||
for pair, timeframe, candle_type in set(pair_list):
|
||||
if (
|
||||
timeframe not in self.timeframes
|
||||
and candle_type in (CandleType.SPOT, CandleType.FUTURES)
|
||||
):
|
||||
if (timeframe not in self.timeframes
|
||||
and candle_type in (CandleType.SPOT, CandleType.FUTURES)):
|
||||
logger.warning(
|
||||
f"Cannot download ({pair}, {timeframe}) combination as this timeframe is "
|
||||
f"not available on {self.name}. Available timeframes are "
|
||||
@@ -1890,8 +1901,9 @@ class Exchange:
|
||||
|
||||
if ((pair, timeframe, candle_type) not in self._klines or not cache
|
||||
or self._now_is_time_to_refresh(pair, timeframe, candle_type)):
|
||||
input_coroutines.append(self._build_coroutine(
|
||||
pair, timeframe, candle_type=candle_type, since_ms=since_ms))
|
||||
|
||||
input_coroutines.append(
|
||||
self._build_coroutine(pair, timeframe, candle_type, since_ms, cache))
|
||||
|
||||
else:
|
||||
logger.debug(
|
||||
@@ -1901,6 +1913,28 @@ class Exchange:
|
||||
|
||||
return input_coroutines, cached_pairs
|
||||
|
||||
def _process_ohlcv_df(self, pair: str, timeframe: str, c_type: CandleType, ticks: List[List],
|
||||
cache: bool, drop_incomplete: bool) -> DataFrame:
|
||||
# keeping last candle time as last refreshed time of the pair
|
||||
if ticks and cache:
|
||||
self._pairs_last_refresh_time[(pair, timeframe, c_type)] = ticks[-1][0] // 1000
|
||||
# keeping parsed dataframe in cache
|
||||
ohlcv_df = ohlcv_to_dataframe(ticks, timeframe, pair=pair, fill_missing=True,
|
||||
drop_incomplete=drop_incomplete)
|
||||
if cache:
|
||||
if (pair, timeframe, c_type) in self._klines:
|
||||
old = self._klines[(pair, timeframe, c_type)]
|
||||
# Reassign so we return the updated, combined df
|
||||
ohlcv_df = clean_ohlcv_dataframe(concat([old, ohlcv_df], axis=0), timeframe, pair,
|
||||
fill_missing=True, drop_incomplete=False)
|
||||
candle_limit = self.ohlcv_candle_limit(timeframe, self._config['candle_type_def'])
|
||||
# Age out old candles
|
||||
ohlcv_df = ohlcv_df.tail(candle_limit + self._startup_candle_count)
|
||||
self._klines[(pair, timeframe, c_type)] = ohlcv_df
|
||||
else:
|
||||
self._klines[(pair, timeframe, c_type)] = ohlcv_df
|
||||
return ohlcv_df
|
||||
|
||||
def refresh_latest_ohlcv(self, pair_list: ListPairsWithTimeframes, *,
|
||||
since_ms: Optional[int] = None, cache: bool = True,
|
||||
drop_incomplete: Optional[bool] = None
|
||||
@@ -1937,16 +1971,11 @@ class Exchange:
|
||||
continue
|
||||
# Deconstruct tuple (has 4 elements)
|
||||
pair, timeframe, c_type, ticks = res
|
||||
# keeping last candle time as last refreshed time of the pair
|
||||
if ticks:
|
||||
self._pairs_last_refresh_time[(pair, timeframe, c_type)] = ticks[-1][0] // 1000
|
||||
# keeping parsed dataframe in cache
|
||||
ohlcv_df = ohlcv_to_dataframe(
|
||||
ticks, timeframe, pair=pair, fill_missing=True,
|
||||
drop_incomplete=drop_incomplete)
|
||||
ohlcv_df = self._process_ohlcv_df(
|
||||
pair, timeframe, c_type, ticks, cache, drop_incomplete)
|
||||
|
||||
results_df[(pair, timeframe, c_type)] = ohlcv_df
|
||||
if cache:
|
||||
self._klines[(pair, timeframe, c_type)] = ohlcv_df
|
||||
|
||||
# Return cached klines
|
||||
for pair, timeframe, c_type in cached_pairs:
|
||||
results_df[(pair, timeframe, c_type)] = self.klines(
|
||||
|
@@ -520,7 +520,7 @@ class FreqaiDataDrawer:
|
||||
f"Unable to load model, ensure model exists at " f"{dk.data_path} "
|
||||
)
|
||||
|
||||
if self.config["freqai"]["feature_parameters"]["principal_component_analysis"]:
|
||||
if self.config["freqai"]["feature_parameters"].get("principal_component_analysis", False):
|
||||
dk.pca = cloudpickle.load(
|
||||
open(dk.data_path / f"{dk.model_filename}_pca_object.pkl", "rb")
|
||||
)
|
||||
@@ -616,9 +616,9 @@ class FreqaiDataDrawer:
|
||||
pairs = self.freqai_info["feature_parameters"].get(
|
||||
"include_corr_pairlist", []
|
||||
)
|
||||
|
||||
for tf in self.freqai_info["feature_parameters"].get("include_timeframes"):
|
||||
base_dataframes[tf] = dk.slice_dataframe(timerange, historic_data[pair][tf])
|
||||
base_dataframes[tf] = dk.slice_dataframe(
|
||||
timerange, historic_data[pair][tf]).reset_index(drop=True)
|
||||
if pairs:
|
||||
for p in pairs:
|
||||
if pair in p:
|
||||
@@ -627,7 +627,7 @@ class FreqaiDataDrawer:
|
||||
corr_dataframes[p] = {}
|
||||
corr_dataframes[p][tf] = dk.slice_dataframe(
|
||||
timerange, historic_data[p][tf]
|
||||
)
|
||||
).reset_index(drop=True)
|
||||
|
||||
return corr_dataframes, base_dataframes
|
||||
|
||||
|
@@ -99,6 +99,7 @@ class FreqaiDataKitchen:
|
||||
self.train_dates: DataFrame = pd.DataFrame()
|
||||
self.unique_classes: Dict[str, list] = {}
|
||||
self.unique_class_list: list = []
|
||||
self.spice_dataframe: DataFrame = None
|
||||
|
||||
def set_paths(
|
||||
self,
|
||||
@@ -1259,3 +1260,11 @@ class FreqaiDataKitchen:
|
||||
f"Could not find backtesting prediction file at {path_to_predictionfile}"
|
||||
)
|
||||
return file_exists
|
||||
|
||||
def spice_extractor(self, indicator: str, dataframe: DataFrame) -> npt.NDArray:
|
||||
if indicator in dataframe.columns:
|
||||
return np.array(dataframe[indicator])
|
||||
else:
|
||||
logger.warning(f'User asked spice_rack for {indicator}, '
|
||||
f'but it is not available. Returning 0s')
|
||||
return np.zeros(len(dataframe.index))
|
||||
|
@@ -93,7 +93,7 @@ class IFreqaiModel(ABC):
|
||||
self.base_tf_seconds = timeframe_to_seconds(self.config['timeframe'])
|
||||
self.continual_learning = self.freqai_info.get('continual_learning', False)
|
||||
self.plot_features = self.ft_params.get("plot_feature_importances", 0)
|
||||
|
||||
self.spice_rack_open: bool = False
|
||||
self._threads: List[threading.Thread] = []
|
||||
self._stop_event = threading.Event()
|
||||
|
||||
@@ -142,7 +142,7 @@ class IFreqaiModel(ABC):
|
||||
dk = self.start_backtesting(dataframe, metadata, self.dk)
|
||||
|
||||
dataframe = dk.remove_features_from_df(dk.return_dataframe)
|
||||
self.clean_up()
|
||||
# self.clean_up()
|
||||
if self.live:
|
||||
self.inference_timer('stop')
|
||||
return dataframe
|
||||
@@ -211,7 +211,8 @@ class IFreqaiModel(ABC):
|
||||
new_trained_timerange, pair, strategy, dk, data_load_timerange
|
||||
)
|
||||
except Exception as msg:
|
||||
logger.warning(f'Training {pair} raised exception {msg}, skipping.')
|
||||
logger.warning(f"Training {pair} raised exception {msg.__class__.__name__}. "
|
||||
f"Message: {msg}, skipping.")
|
||||
|
||||
self.train_timer('stop')
|
||||
|
||||
@@ -731,6 +732,18 @@ class IFreqaiModel(ABC):
|
||||
f'Best approximation queue: {best_queue}')
|
||||
return best_queue
|
||||
|
||||
def spice_rack(self, indicator: str, dataframe: DataFrame,
|
||||
metadata: dict, strategy: IStrategy) -> NDArray:
|
||||
if not self.spice_rack_open:
|
||||
dataframe = self.start(dataframe, metadata, strategy)
|
||||
self.dk.spice_dataframe = dataframe
|
||||
self.spice_rack_open = True
|
||||
return self.dk.spice_extractor(indicator, dataframe)
|
||||
else:
|
||||
return self.dk.spice_extractor(indicator, self.dk.spice_dataframe)
|
||||
|
||||
def close_spice_rack(self):
|
||||
self.spice_rack_open = False
|
||||
# Following methods which are overridden by user made prediction models.
|
||||
# See freqai/prediction_models/CatboostPredictionModel.py for an example.
|
||||
|
||||
|
37
freqtrade/freqai/spice_rack/lightgbm_config.json
Normal file
37
freqtrade/freqai/spice_rack/lightgbm_config.json
Normal file
@@ -0,0 +1,37 @@
|
||||
{
|
||||
|
||||
"freqai": {
|
||||
"enabled": true,
|
||||
"purge_old_models": true,
|
||||
"train_period_days": 4,
|
||||
"backtest_period_days": 1,
|
||||
"identifier": "spicy-id",
|
||||
"feature_parameters": {
|
||||
"include_timeframes": [
|
||||
"30m",
|
||||
"1h",
|
||||
"4h"
|
||||
],
|
||||
"include_corr_pairlist": [
|
||||
"BTC/USD",
|
||||
"ETH/USD"
|
||||
],
|
||||
"label_period_candles": 20,
|
||||
"include_shifted_candles": 2,
|
||||
"DI_threshold": 0.9,
|
||||
"weight_factor": 0.9,
|
||||
"principal_component_analysis": true,
|
||||
"indicator_periods_candles": [
|
||||
10,
|
||||
20
|
||||
]
|
||||
},
|
||||
"data_split_parameters": {
|
||||
"test_size": 0,
|
||||
"random_state": 1
|
||||
},
|
||||
"model_training_parameters": {
|
||||
"n_estimators": 800
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,19 +1,24 @@
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
import numpy as np
|
||||
# for spice rack
|
||||
import pandas as pd
|
||||
import talib.abstract as ta
|
||||
from scipy.signal import argrelextrema
|
||||
from technical import qtpylib
|
||||
|
||||
from freqtrade.configuration import TimeRange
|
||||
from freqtrade.constants import Config
|
||||
from freqtrade.data.dataprovider import DataProvider
|
||||
from freqtrade.data.history.history_utils import refresh_backtest_ohlcv_data
|
||||
from freqtrade.exceptions import OperationalException
|
||||
from freqtrade.exchange import timeframe_to_seconds
|
||||
from freqtrade.exchange import Exchange, timeframe_to_seconds
|
||||
from freqtrade.exchange.exchange import market_is_active
|
||||
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
|
||||
from freqtrade.plugins.pairlist.pairlist_helpers import dynamic_expand_pairlist
|
||||
from freqtrade.strategy import merge_informative_pair
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -89,6 +94,136 @@ def get_required_data_timerange(config: Config) -> TimeRange:
|
||||
return data_load_timerange
|
||||
|
||||
|
||||
def auto_populate_any_indicators(
|
||||
self, pair, df, tf, informative=None, set_generalized_indicators=False
|
||||
):
|
||||
"""
|
||||
This is a premade `populate_any_indicators()` function which is set in
|
||||
the user strategy is they enable `freqai_spice_rack: true` in their
|
||||
configuration file.
|
||||
"""
|
||||
|
||||
coin = pair.split('/')[0]
|
||||
|
||||
if informative is None:
|
||||
informative = self.dp.get_pair_dataframe(pair, tf)
|
||||
|
||||
# first loop is automatically duplicating indicators for time periods
|
||||
for t in self.freqai_info["feature_parameters"]["indicator_periods_candles"]:
|
||||
|
||||
t = int(t)
|
||||
informative[f"%-{coin}rsi-period_{t}"] = ta.RSI(informative, timeperiod=t)
|
||||
informative[f"%-{coin}mfi-period_{t}"] = ta.MFI(informative, timeperiod=t)
|
||||
informative[f"%-{coin}adx-period_{t}"] = ta.ADX(informative, timeperiod=t)
|
||||
informative[f"%-{coin}sma-period_{t}"] = ta.SMA(informative, timeperiod=t)
|
||||
informative[f"%-{coin}ema-period_{t}"] = ta.EMA(informative, timeperiod=t)
|
||||
|
||||
bollinger = qtpylib.bollinger_bands(
|
||||
qtpylib.typical_price(informative), window=t, stds=2.2
|
||||
)
|
||||
informative[f"{coin}bb_lowerband-period_{t}"] = bollinger["lower"]
|
||||
informative[f"{coin}bb_middleband-period_{t}"] = bollinger["mid"]
|
||||
informative[f"{coin}bb_upperband-period_{t}"] = bollinger["upper"]
|
||||
|
||||
informative[f"%-{coin}bb_width-period_{t}"] = (
|
||||
informative[f"{coin}bb_upperband-period_{t}"]
|
||||
- informative[f"{coin}bb_lowerband-period_{t}"]
|
||||
) / informative[f"{coin}bb_middleband-period_{t}"]
|
||||
informative[f"%-{coin}close-bb_lower-period_{t}"] = (
|
||||
informative["close"] / informative[f"{coin}bb_lowerband-period_{t}"]
|
||||
)
|
||||
|
||||
informative[f"%-{coin}roc-period_{t}"] = ta.ROC(informative, timeperiod=t)
|
||||
|
||||
informative[f"%-{coin}relative_volume-period_{t}"] = (
|
||||
informative["volume"] / informative["volume"].rolling(t).mean()
|
||||
)
|
||||
|
||||
informative[f"%-{coin}pct-change"] = informative["close"].pct_change()
|
||||
informative[f"%-{coin}raw_volume"] = informative["volume"]
|
||||
informative[f"%-{coin}raw_price"] = informative["close"]
|
||||
|
||||
indicators = [col for col in informative if col.startswith("%")]
|
||||
# This loop duplicates and shifts all indicators to add a sense of recency to data
|
||||
for n in range(self.freqai_info["feature_parameters"]["include_shifted_candles"] + 1):
|
||||
if n == 0:
|
||||
continue
|
||||
informative_shift = informative[indicators].shift(n)
|
||||
informative_shift = informative_shift.add_suffix("_shift-" + str(n))
|
||||
informative = pd.concat((informative, informative_shift), axis=1)
|
||||
|
||||
df = merge_informative_pair(df, informative, self.config["timeframe"], tf, ffill=True)
|
||||
skip_columns = [
|
||||
(s + "_" + tf) for s in ["date", "open", "high", "low", "close", "volume"]
|
||||
]
|
||||
df = df.drop(columns=skip_columns)
|
||||
if set_generalized_indicators:
|
||||
df["%-day_of_week"] = (df["date"].dt.dayofweek + 1) / 7
|
||||
df["%-hour_of_day"] = (df["date"].dt.hour + 1) / 25
|
||||
df["&s-extrema"] = 0
|
||||
min_peaks = argrelextrema(df["close"].values, np.less, order=80)
|
||||
max_peaks = argrelextrema(df["close"].values, np.greater, order=80)
|
||||
for mp in min_peaks[0]:
|
||||
df.at[mp, "&s-extrema"] = -1
|
||||
for mp in max_peaks[0]:
|
||||
df.at[mp, "&s-extrema"] = 1
|
||||
|
||||
return df
|
||||
|
||||
|
||||
def setup_freqai_spice_rack(config: dict, exchange: Optional[Exchange]) -> Dict[str, Any]:
|
||||
import difflib
|
||||
import json
|
||||
from pathlib import Path
|
||||
auto_config = config.get('freqai_config', 'lightgbm_config.json')
|
||||
with open(Path(__file__).parent / Path('spice_rack') / auto_config) as json_file:
|
||||
freqai_config = json.load(json_file)
|
||||
config['freqai'] = freqai_config['freqai']
|
||||
config['freqai']['identifier'] = config['freqai_identifier']
|
||||
corr_pairs = config['freqai']['feature_parameters']['include_corr_pairlist']
|
||||
timeframes = config['freqai']['feature_parameters']['include_timeframes']
|
||||
new_corr_pairs = []
|
||||
new_tfs = []
|
||||
|
||||
if not exchange:
|
||||
logger.warning('No dataprovider available.')
|
||||
config['freqai']['enabled'] = False
|
||||
return config
|
||||
# find the closest pairs to what the default config wants
|
||||
for pair in corr_pairs:
|
||||
closest_pair = difflib.get_close_matches(
|
||||
pair,
|
||||
exchange.markets
|
||||
)
|
||||
if not closest_pair:
|
||||
logger.warning(f'Could not find {pair} in markets, removing from '
|
||||
f'corr_pairlist.')
|
||||
else:
|
||||
closest_pair = closest_pair[0]
|
||||
|
||||
new_corr_pairs.append(closest_pair)
|
||||
logger.info(f'Spice rack will use {closest_pair} as informative in FreqAI model.')
|
||||
|
||||
# find the closest matching timeframes to what the default config wants
|
||||
if timeframe_to_seconds(config['timeframe']) > timeframe_to_seconds('15m'):
|
||||
logger.warning('Default spice rack is designed for lower base timeframes (e.g. > '
|
||||
f'15m). But user passed {config["timeframe"]}.')
|
||||
new_tfs.append(config['timeframe'])
|
||||
|
||||
list_tfs = [timeframe_to_seconds(tf) for tf
|
||||
in exchange.timeframes]
|
||||
for tf in timeframes:
|
||||
tf_secs = timeframe_to_seconds(tf)
|
||||
closest_index = min(range(len(list_tfs)), key=lambda i: abs(list_tfs[i] - tf_secs))
|
||||
closest_tf = exchange.timeframes[closest_index]
|
||||
logger.info(f'Spice rack will use {closest_tf} as informative tf in FreqAI model.')
|
||||
new_tfs.append(closest_tf)
|
||||
|
||||
config['freqai']['feature_parameters'].update({'include_timeframes': new_tfs})
|
||||
config['freqai']['feature_parameters'].update({'include_corr_pairlist': new_corr_pairs})
|
||||
config.update({"freqaimodel": 'LightGBMRegressor'})
|
||||
return config
|
||||
|
||||
# Keep below for when we wish to download heterogeneously lengthed data for FreqAI.
|
||||
# def download_all_data_for_training(dp: DataProvider, config: Config) -> None:
|
||||
# """
|
||||
|
@@ -89,6 +89,10 @@ class Backtesting:
|
||||
self._exchange_name, self.config, load_leverage_tiers=True)
|
||||
self.dataprovider = DataProvider(self.config, self.exchange)
|
||||
|
||||
if config.get('freqai_spice_rack', False):
|
||||
from freqtrade.freqai.utils import setup_freqai_spice_rack
|
||||
self.config = setup_freqai_spice_rack(self.config, self.exchange)
|
||||
|
||||
if self.config.get('strategy_list'):
|
||||
if self.config.get('freqai', {}).get('enabled', False):
|
||||
logger.warning("Using --strategy-list with FreqAI REQUIRES all strategies "
|
||||
|
@@ -3,8 +3,8 @@ Module that define classes to convert Crypto-currency to FIAT
|
||||
e.g BTC to USD
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from typing import Dict, List
|
||||
|
||||
from cachetools import TTLCache
|
||||
@@ -46,7 +46,9 @@ class CryptoToFiatConverter(LoggingMixin):
|
||||
if CryptoToFiatConverter.__instance is None:
|
||||
CryptoToFiatConverter.__instance = object.__new__(cls)
|
||||
try:
|
||||
CryptoToFiatConverter._coingekko = CoinGeckoAPI()
|
||||
# Limit retires to 1 (0 and 1)
|
||||
# otherwise we risk bot impact if coingecko is down.
|
||||
CryptoToFiatConverter._coingekko = CoinGeckoAPI(retries=1)
|
||||
except BaseException:
|
||||
CryptoToFiatConverter._coingekko = None
|
||||
return CryptoToFiatConverter.__instance
|
||||
@@ -67,7 +69,7 @@ class CryptoToFiatConverter(LoggingMixin):
|
||||
logger.warning(
|
||||
"Too many requests for CoinGecko API, backing off and trying again later.")
|
||||
# Set backoff timestamp to 60 seconds in the future
|
||||
self._backoff = datetime.datetime.now().timestamp() + 60
|
||||
self._backoff = datetime.now().timestamp() + 60
|
||||
return
|
||||
# If the request is not a 429 error we want to raise the normal error
|
||||
logger.error(
|
||||
@@ -81,7 +83,7 @@ class CryptoToFiatConverter(LoggingMixin):
|
||||
|
||||
def _get_gekko_id(self, crypto_symbol):
|
||||
if not self._coinlistings:
|
||||
if self._backoff <= datetime.datetime.now().timestamp():
|
||||
if self._backoff <= datetime.now().timestamp():
|
||||
self._load_cryptomap()
|
||||
# Still not loaded.
|
||||
if not self._coinlistings:
|
||||
|
@@ -146,12 +146,28 @@ class IStrategy(ABC, HyperStrategyMixin):
|
||||
self._ft_informative.append((informative_data, cls_method))
|
||||
|
||||
def load_freqAI_model(self) -> None:
|
||||
if self.config.get('freqai', {}).get('enabled', False):
|
||||
spice_rack = self.config.get('freqai_spice_rack', False)
|
||||
if self.config.get('freqai', {}).get('enabled', False) or spice_rack:
|
||||
if spice_rack:
|
||||
from freqtrade.freqai.utils import setup_freqai_spice_rack
|
||||
self.config = setup_freqai_spice_rack(self.config, self.dp._exchange)
|
||||
# Import here to avoid importing this if freqAI is disabled
|
||||
from freqtrade.freqai.utils import download_all_data_for_training
|
||||
from freqtrade.resolvers.freqaimodel_resolver import FreqaiModelResolver
|
||||
self.freqai = FreqaiModelResolver.load_freqaimodel(self.config)
|
||||
self.freqai_info = self.config["freqai"]
|
||||
if not self.process_only_new_candles:
|
||||
logger.warning('User set process_only_new_candles to false, '
|
||||
'FreqAI requires true. Changing to true.')
|
||||
self.process_only_new_candles = True
|
||||
|
||||
if spice_rack:
|
||||
import types
|
||||
|
||||
from freqtrade.freqai.utils import auto_populate_any_indicators
|
||||
self.populate_any_indicators = types.MethodType( # type: ignore
|
||||
auto_populate_any_indicators, self)
|
||||
|
||||
self.freqai_info = self.config["freqai"]
|
||||
|
||||
# download the desired data in dry/live
|
||||
if self.config.get('runmode') in (RunMode.DRY_RUN, RunMode.LIVE):
|
||||
@@ -161,6 +177,7 @@ class IStrategy(ABC, HyperStrategyMixin):
|
||||
"already on disk."
|
||||
)
|
||||
download_all_data_for_training(self.dp, self.config)
|
||||
|
||||
else:
|
||||
# Gracious failures if freqAI is disabled but "start" is called.
|
||||
class DummyClass():
|
||||
|
@@ -5,6 +5,7 @@
|
||||
import numpy as np # noqa
|
||||
import pandas as pd # noqa
|
||||
from pandas import DataFrame
|
||||
from typing import Optional, Union
|
||||
|
||||
from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter,
|
||||
IStrategy, IntParameter)
|
||||
|
@@ -29,6 +29,7 @@ nav:
|
||||
- Parameter table: freqai-parameter-table.md
|
||||
- Feature engineering: freqai-feature-engineering.md
|
||||
- Running FreqAI: freqai-running.md
|
||||
- Spice Rack: freqai-spice-rack.md
|
||||
- Developer guide: freqai-developers.md
|
||||
- Short / Leverage: leverage.md
|
||||
- Utility Sub-commands: utils.md
|
||||
|
327
scripts/ws_client.py
Normal file
327
scripts/ws_client.py
Normal file
@@ -0,0 +1,327 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Simple command line client for Testing/debugging
|
||||
a Freqtrade bot's message websocket
|
||||
|
||||
Should not import anything from freqtrade,
|
||||
so it can be used as a standalone script.
|
||||
"""
|
||||
import argparse
|
||||
import asyncio
|
||||
import logging
|
||||
import socket
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import orjson
|
||||
import pandas
|
||||
import rapidjson
|
||||
import websockets
|
||||
from dateutil.relativedelta import relativedelta
|
||||
|
||||
|
||||
logger = logging.getLogger("WebSocketClient")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def setup_logging(filename: str):
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||
handlers=[
|
||||
logging.FileHandler(filename),
|
||||
logging.StreamHandler()
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def parse_args():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
'-c',
|
||||
'--config',
|
||||
help='Specify configuration file (default: %(default)s). ',
|
||||
dest='config',
|
||||
type=str,
|
||||
metavar='PATH',
|
||||
default='config.json'
|
||||
)
|
||||
parser.add_argument(
|
||||
'-l',
|
||||
'--logfile',
|
||||
help='The filename to log to.',
|
||||
dest='logfile',
|
||||
type=str,
|
||||
default='ws_client.log'
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
return vars(args)
|
||||
|
||||
|
||||
def load_config(configfile):
|
||||
file = Path(configfile)
|
||||
if file.is_file():
|
||||
with file.open("r") as f:
|
||||
config = rapidjson.load(f, parse_mode=rapidjson.PM_COMMENTS |
|
||||
rapidjson.PM_TRAILING_COMMAS)
|
||||
return config
|
||||
else:
|
||||
logger.warning(f"Could not load config file {file}.")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def readable_timedelta(delta):
|
||||
"""
|
||||
Convert a dateutil.relativedelta to a readable format
|
||||
|
||||
:param delta: A dateutil.relativedelta
|
||||
:returns: The readable time difference string
|
||||
"""
|
||||
attrs = ['years', 'months', 'days', 'hours', 'minutes', 'seconds', 'microseconds']
|
||||
return ", ".join([
|
||||
'%d %s' % (getattr(delta, attr), attr if getattr(delta, attr) > 1 else attr[:-1])
|
||||
for attr in attrs if getattr(delta, attr)
|
||||
])
|
||||
|
||||
# ----------------------------------------------------------------------------
|
||||
|
||||
|
||||
def json_serialize(message):
|
||||
"""
|
||||
Serialize a message to JSON using orjson
|
||||
:param message: The message to serialize
|
||||
"""
|
||||
return str(orjson.dumps(message), "utf-8")
|
||||
|
||||
|
||||
def json_deserialize(message):
|
||||
"""
|
||||
Deserialize JSON to a dict
|
||||
:param message: The message to deserialize
|
||||
"""
|
||||
def json_to_dataframe(data: str) -> pandas.DataFrame:
|
||||
dataframe = pandas.read_json(data, orient='split')
|
||||
if 'date' in dataframe.columns:
|
||||
dataframe['date'] = pandas.to_datetime(dataframe['date'], unit='ms', utc=True)
|
||||
|
||||
return dataframe
|
||||
|
||||
def _json_object_hook(z):
|
||||
if z.get('__type__') == 'dataframe':
|
||||
return json_to_dataframe(z.get('__value__'))
|
||||
return z
|
||||
|
||||
return rapidjson.loads(message, object_hook=_json_object_hook)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class ClientProtocol:
|
||||
logger = logging.getLogger("WebSocketClient.Protocol")
|
||||
_MESSAGE_COUNT = 0
|
||||
_LAST_RECEIVED_AT = 0 # The epoch we received a message most recently
|
||||
|
||||
async def on_connect(self, websocket):
|
||||
# On connection we have to send our initial requests
|
||||
initial_requests = [
|
||||
{
|
||||
"type": "subscribe", # The subscribe request should always be first
|
||||
"data": ["analyzed_df", "whitelist"] # The message types we want
|
||||
},
|
||||
{
|
||||
"type": "whitelist",
|
||||
"data": None,
|
||||
},
|
||||
{
|
||||
"type": "analyzed_df",
|
||||
"data": {"limit": 1500}
|
||||
}
|
||||
]
|
||||
|
||||
for request in initial_requests:
|
||||
await websocket.send(json_serialize(request))
|
||||
|
||||
async def on_message(self, websocket, name, message):
|
||||
deserialized = json_deserialize(message)
|
||||
|
||||
message_size = sys.getsizeof(message)
|
||||
message_type = deserialized.get('type')
|
||||
message_data = deserialized.get('data')
|
||||
|
||||
self.logger.info(
|
||||
f"Received message of type {message_type} [{message_size} bytes] @ [{name}]"
|
||||
)
|
||||
|
||||
time_difference = self._calculate_time_difference()
|
||||
|
||||
if self._MESSAGE_COUNT > 0:
|
||||
self.logger.info(f"Time since last message: {time_difference}")
|
||||
|
||||
message_handler = getattr(self, f"_handle_{message_type}", None) or self._handle_default
|
||||
await message_handler(name, message_type, message_data)
|
||||
|
||||
self._MESSAGE_COUNT += 1
|
||||
self.logger.info(f"[{self._MESSAGE_COUNT}] total messages..")
|
||||
self.logger.info("-" * 80)
|
||||
|
||||
def _calculate_time_difference(self):
|
||||
old_last_received_at = self._LAST_RECEIVED_AT
|
||||
self._LAST_RECEIVED_AT = time.time() * 1000
|
||||
time_delta = relativedelta(microseconds=(self._LAST_RECEIVED_AT - old_last_received_at))
|
||||
|
||||
return readable_timedelta(time_delta)
|
||||
|
||||
async def _handle_whitelist(self, name, type, data):
|
||||
self.logger.info(data)
|
||||
|
||||
async def _handle_analyzed_df(self, name, type, data):
|
||||
key, la, df = data['key'], data['la'], data['df']
|
||||
|
||||
if not df.empty:
|
||||
columns = ", ".join([str(column) for column in df.columns])
|
||||
|
||||
self.logger.info(key)
|
||||
self.logger.info(f"Last analyzed datetime: {la}")
|
||||
self.logger.info(f"Latest candle datetime: {df.iloc[-1]['date']}")
|
||||
self.logger.info(f"DataFrame length: {len(df)}")
|
||||
self.logger.info(f"DataFrame columns: {columns}")
|
||||
else:
|
||||
self.logger.info("Empty DataFrame")
|
||||
|
||||
async def _handle_default(self, name, type, data):
|
||||
self.logger.info("Unkown message of type {type} received...")
|
||||
self.logger.info(data)
|
||||
|
||||
|
||||
async def create_client(
|
||||
host,
|
||||
port,
|
||||
token,
|
||||
name='default',
|
||||
protocol=ClientProtocol(),
|
||||
sleep_time=10,
|
||||
ping_timeout=10,
|
||||
wait_timeout=30,
|
||||
**kwargs
|
||||
):
|
||||
"""
|
||||
Create a websocket client and listen for messages
|
||||
:param host: The host
|
||||
:param port: The port
|
||||
:param token: The websocket auth token
|
||||
:param name: The name of the producer
|
||||
:param **kwargs: Any extra kwargs passed to websockets.connect
|
||||
"""
|
||||
|
||||
while 1:
|
||||
try:
|
||||
websocket_url = f"ws://{host}:{port}/api/v1/message/ws?token={token}"
|
||||
logger.info(f"Attempting to connect to {name} @ {host}:{port}")
|
||||
|
||||
async with websockets.connect(websocket_url, **kwargs) as ws:
|
||||
logger.info("Connection successful...")
|
||||
await protocol.on_connect(ws)
|
||||
|
||||
# Now listen for messages
|
||||
while 1:
|
||||
try:
|
||||
message = await asyncio.wait_for(
|
||||
ws.recv(),
|
||||
timeout=wait_timeout
|
||||
)
|
||||
|
||||
await protocol.on_message(ws, name, message)
|
||||
|
||||
except (
|
||||
asyncio.TimeoutError,
|
||||
websockets.exceptions.ConnectionClosed
|
||||
):
|
||||
# Try pinging
|
||||
try:
|
||||
pong = ws.ping()
|
||||
await asyncio.wait_for(
|
||||
pong,
|
||||
timeout=ping_timeout
|
||||
)
|
||||
logger.info("Connection still alive...")
|
||||
|
||||
continue
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
logger.error(f"Ping timed out, retrying in {sleep_time}s")
|
||||
await asyncio.sleep(sleep_time)
|
||||
|
||||
break
|
||||
|
||||
except (
|
||||
socket.gaierror,
|
||||
ConnectionRefusedError,
|
||||
websockets.exceptions.InvalidStatusCode,
|
||||
websockets.exceptions.InvalidMessage
|
||||
) as e:
|
||||
logger.error(f"Connection Refused - {e} retrying in {sleep_time}s")
|
||||
await asyncio.sleep(sleep_time)
|
||||
|
||||
continue
|
||||
|
||||
except (
|
||||
websockets.exceptions.ConnectionClosedError,
|
||||
websockets.exceptions.ConnectionClosedOK
|
||||
):
|
||||
# Just keep trying to connect again indefinitely
|
||||
await asyncio.sleep(sleep_time)
|
||||
|
||||
continue
|
||||
|
||||
except Exception as e:
|
||||
# An unforseen error has occurred, log and try reconnecting again
|
||||
logger.error("Unexpected error has occurred:")
|
||||
logger.exception(e)
|
||||
|
||||
await asyncio.sleep(sleep_time)
|
||||
continue
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def _main(args):
|
||||
setup_logging(args['logfile'])
|
||||
config = load_config(args['config'])
|
||||
|
||||
emc_config = config.get('external_message_consumer', {})
|
||||
|
||||
producers = emc_config.get('producers', [])
|
||||
producer = producers[0]
|
||||
|
||||
wait_timeout = emc_config.get('wait_timeout', 300)
|
||||
ping_timeout = emc_config.get('ping_timeout', 10)
|
||||
sleep_time = emc_config.get('sleep_time', 10)
|
||||
message_size_limit = (emc_config.get('message_size_limit', 8) << 20)
|
||||
|
||||
await create_client(
|
||||
producer['host'],
|
||||
producer['port'],
|
||||
producer['ws_token'],
|
||||
producer['name'],
|
||||
sleep_time=sleep_time,
|
||||
ping_timeout=ping_timeout,
|
||||
wait_timeout=wait_timeout,
|
||||
max_size=message_size_limit
|
||||
)
|
||||
|
||||
|
||||
def main():
|
||||
args = parse_args()
|
||||
try:
|
||||
asyncio.run(_main(args))
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Exiting...")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
@@ -10,6 +10,7 @@ from unittest.mock import MagicMock, Mock, PropertyMock
|
||||
|
||||
import arrow
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import pytest
|
||||
from telegram import Chat, Message, Update
|
||||
|
||||
@@ -19,6 +20,7 @@ from freqtrade.data.converter import ohlcv_to_dataframe
|
||||
from freqtrade.edge import PairInfo
|
||||
from freqtrade.enums import CandleType, MarginMode, RunMode, SignalDirection, TradingMode
|
||||
from freqtrade.exchange import Exchange
|
||||
from freqtrade.exchange.exchange import timeframe_to_minutes
|
||||
from freqtrade.freqtradebot import FreqtradeBot
|
||||
from freqtrade.persistence import LocalTrade, Order, Trade, init_db
|
||||
from freqtrade.resolvers import ExchangeResolver
|
||||
@@ -82,6 +84,33 @@ def get_args(args):
|
||||
return Arguments(args).get_parsed_arg()
|
||||
|
||||
|
||||
def generate_test_data(timeframe: str, size: int, start: str = '2020-07-05'):
|
||||
np.random.seed(42)
|
||||
tf_mins = timeframe_to_minutes(timeframe)
|
||||
|
||||
base = np.random.normal(20, 2, size=size)
|
||||
|
||||
date = pd.date_range(start, periods=size, freq=f'{tf_mins}min', tz='UTC')
|
||||
df = pd.DataFrame({
|
||||
'date': date,
|
||||
'open': base,
|
||||
'high': base + np.random.normal(2, 1, size=size),
|
||||
'low': base - np.random.normal(2, 1, size=size),
|
||||
'close': base + np.random.normal(0, 1, size=size),
|
||||
'volume': np.random.normal(200, size=size)
|
||||
}
|
||||
)
|
||||
df = df.dropna()
|
||||
return df
|
||||
|
||||
|
||||
def generate_test_data_raw(timeframe: str, size: int, start: str = '2020-07-05'):
|
||||
""" Generates data in the ohlcv format used by ccxt """
|
||||
df = generate_test_data(timeframe, size, start)
|
||||
df['date'] = df.loc[:, 'date'].view(np.int64) // 1000 // 1000
|
||||
return list(list(x) for x in zip(*(df[x].values.tolist() for x in df.columns)))
|
||||
|
||||
|
||||
# Source: https://stackoverflow.com/questions/29881236/how-to-mock-asyncio-coroutines
|
||||
# TODO: This should be replaced with AsyncMock once support for python 3.7 is dropped.
|
||||
def get_mock_coro(return_value=None, side_effect=None):
|
||||
|
@@ -22,7 +22,8 @@ from freqtrade.exchange.common import (API_FETCH_ORDER_RETRY_COUNT, API_RETRY_CO
|
||||
calculate_backoff, remove_credentials)
|
||||
from freqtrade.exchange.exchange import amount_to_contract_precision
|
||||
from freqtrade.resolvers.exchange_resolver import ExchangeResolver
|
||||
from tests.conftest import get_mock_coro, get_patched_exchange, log_has, log_has_re, num_log_has_re
|
||||
from tests.conftest import (generate_test_data_raw, get_mock_coro, get_patched_exchange, log_has,
|
||||
log_has_re, num_log_has_re)
|
||||
|
||||
|
||||
# Make sure to always keep one exchange here which is NOT subclassed!!
|
||||
@@ -2083,7 +2084,7 @@ async def test__async_get_historic_ohlcv(default_conf, mocker, caplog, exchange_
|
||||
def test_refresh_latest_ohlcv(mocker, default_conf, caplog, candle_type) -> None:
|
||||
ohlcv = [
|
||||
[
|
||||
(arrow.utcnow().int_timestamp - 1) * 1000, # unix timestamp ms
|
||||
(arrow.utcnow().shift(minutes=-5).int_timestamp) * 1000, # unix timestamp ms
|
||||
1, # open
|
||||
2, # high
|
||||
3, # low
|
||||
@@ -2140,10 +2141,22 @@ def test_refresh_latest_ohlcv(mocker, default_conf, caplog, candle_type) -> None
|
||||
assert len(res) == len(pairs)
|
||||
|
||||
assert exchange._api_async.fetch_ohlcv.call_count == 0
|
||||
exchange.required_candle_call_count = 1
|
||||
assert log_has(f"Using cached candle (OHLCV) data for {pairs[0][0]}, "
|
||||
f"{pairs[0][1]}, {candle_type} ...",
|
||||
caplog)
|
||||
caplog.clear()
|
||||
# Reset refresh times - must do 2 call per pair as cache is expired
|
||||
exchange._pairs_last_refresh_time = {}
|
||||
res = exchange.refresh_latest_ohlcv(
|
||||
[('IOTA/ETH', '5m', candle_type), ('XRP/ETH', '5m', candle_type)])
|
||||
assert len(res) == len(pairs)
|
||||
|
||||
assert exchange._api_async.fetch_ohlcv.call_count == 4
|
||||
|
||||
# cache - but disabled caching
|
||||
exchange._api_async.fetch_ohlcv.reset_mock()
|
||||
exchange.required_candle_call_count = 1
|
||||
|
||||
pairlist = [
|
||||
('IOTA/ETH', '5m', candle_type),
|
||||
('XRP/ETH', '5m', candle_type),
|
||||
@@ -2159,6 +2172,7 @@ def test_refresh_latest_ohlcv(mocker, default_conf, caplog, candle_type) -> None
|
||||
assert exchange._api_async.fetch_ohlcv.call_count == 3
|
||||
exchange._api_async.fetch_ohlcv.reset_mock()
|
||||
caplog.clear()
|
||||
|
||||
# Call with invalid timeframe
|
||||
res = exchange.refresh_latest_ohlcv([('IOTA/ETH', '3m', candle_type)], cache=False)
|
||||
if candle_type != CandleType.MARK:
|
||||
@@ -2169,6 +2183,91 @@ def test_refresh_latest_ohlcv(mocker, default_conf, caplog, candle_type) -> None
|
||||
assert len(res) == 1
|
||||
|
||||
|
||||
@pytest.mark.parametrize('candle_type', [CandleType.FUTURES, CandleType.MARK, CandleType.SPOT])
|
||||
def test_refresh_latest_ohlcv_cache(mocker, default_conf, candle_type, time_machine) -> None:
|
||||
start = datetime(2021, 8, 1, 0, 0, 0, 0, tzinfo=timezone.utc)
|
||||
ohlcv = generate_test_data_raw('1h', 100, start.strftime('%Y-%m-%d'))
|
||||
time_machine.move_to(start + timedelta(hours=99, minutes=30))
|
||||
|
||||
exchange = get_patched_exchange(mocker, default_conf)
|
||||
exchange._api_async.fetch_ohlcv = get_mock_coro(ohlcv)
|
||||
pair1 = ('IOTA/ETH', '1h', candle_type)
|
||||
pair2 = ('XRP/ETH', '1h', candle_type)
|
||||
pairs = [pair1, pair2]
|
||||
|
||||
# No caching
|
||||
assert not exchange._klines
|
||||
res = exchange.refresh_latest_ohlcv(pairs, cache=False)
|
||||
assert exchange._api_async.fetch_ohlcv.call_count == 2
|
||||
assert len(res) == 2
|
||||
assert len(res[pair1]) == 99
|
||||
assert len(res[pair2]) == 99
|
||||
assert not exchange._klines
|
||||
exchange._api_async.fetch_ohlcv.reset_mock()
|
||||
|
||||
# With caching
|
||||
res = exchange.refresh_latest_ohlcv(pairs)
|
||||
assert exchange._api_async.fetch_ohlcv.call_count == 2
|
||||
assert len(res) == 2
|
||||
assert len(res[pair1]) == 99
|
||||
assert len(res[pair2]) == 99
|
||||
assert exchange._klines
|
||||
assert exchange._pairs_last_refresh_time[pair1] == ohlcv[-1][0] // 1000
|
||||
exchange._api_async.fetch_ohlcv.reset_mock()
|
||||
|
||||
# Returned from cache
|
||||
res = exchange.refresh_latest_ohlcv(pairs)
|
||||
assert exchange._api_async.fetch_ohlcv.call_count == 0
|
||||
assert len(res) == 2
|
||||
assert len(res[pair1]) == 99
|
||||
assert len(res[pair2]) == 99
|
||||
assert exchange._pairs_last_refresh_time[pair1] == ohlcv[-1][0] // 1000
|
||||
|
||||
# Move time 1 candle further but result didn't change yet
|
||||
time_machine.move_to(start + timedelta(hours=101))
|
||||
res = exchange.refresh_latest_ohlcv(pairs)
|
||||
assert exchange._api_async.fetch_ohlcv.call_count == 2
|
||||
assert len(res) == 2
|
||||
assert len(res[pair1]) == 99
|
||||
assert len(res[pair2]) == 99
|
||||
assert exchange._pairs_last_refresh_time[pair1] == ohlcv[-1][0] // 1000
|
||||
refresh_pior = exchange._pairs_last_refresh_time[pair1]
|
||||
|
||||
# New candle on exchange - only return 50 candles (but one candle further)
|
||||
new_startdate = (start + timedelta(hours=51)).strftime('%Y-%m-%d %H:%M')
|
||||
ohlcv = generate_test_data_raw('1h', 50, new_startdate)
|
||||
exchange._api_async.fetch_ohlcv = get_mock_coro(ohlcv)
|
||||
res = exchange.refresh_latest_ohlcv(pairs)
|
||||
assert exchange._api_async.fetch_ohlcv.call_count == 2
|
||||
assert len(res) == 2
|
||||
assert len(res[pair1]) == 100
|
||||
assert len(res[pair2]) == 100
|
||||
assert refresh_pior != exchange._pairs_last_refresh_time[pair1]
|
||||
|
||||
assert exchange._pairs_last_refresh_time[pair1] == ohlcv[-1][0] // 1000
|
||||
assert exchange._pairs_last_refresh_time[pair2] == ohlcv[-1][0] // 1000
|
||||
exchange._api_async.fetch_ohlcv.reset_mock()
|
||||
|
||||
# Retry same call - no action.
|
||||
res = exchange.refresh_latest_ohlcv(pairs)
|
||||
assert exchange._api_async.fetch_ohlcv.call_count == 0
|
||||
assert len(res) == 2
|
||||
assert len(res[pair1]) == 100
|
||||
assert len(res[pair2]) == 100
|
||||
|
||||
# Move to distant future (so a 1 call would cause a hole in the data)
|
||||
time_machine.move_to(start + timedelta(hours=2000))
|
||||
ohlcv = generate_test_data_raw('1h', 100, start + timedelta(hours=1900))
|
||||
exchange._api_async.fetch_ohlcv = get_mock_coro(ohlcv)
|
||||
res = exchange.refresh_latest_ohlcv(pairs)
|
||||
|
||||
assert exchange._api_async.fetch_ohlcv.call_count == 2
|
||||
assert len(res) == 2
|
||||
# Cache eviction - new data.
|
||||
assert len(res[pair1]) == 99
|
||||
assert len(res[pair2]) == 99
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("exchange_name", EXCHANGES)
|
||||
async def test__async_get_candle_history(default_conf, mocker, caplog, exchange_name):
|
||||
|
@@ -158,3 +158,28 @@ def test_make_train_test_datasets(mocker, freqai_conf):
|
||||
assert data_dictionary
|
||||
assert len(data_dictionary) == 7
|
||||
assert len(data_dictionary['train_features'].index) == 1916
|
||||
|
||||
|
||||
@pytest.mark.parametrize('indicator', [
|
||||
'%-ADArsi-period_10_5m',
|
||||
'doesnt_exist',
|
||||
])
|
||||
def test_spice_extractor(mocker, freqai_conf, indicator, caplog):
|
||||
freqai, unfiltered_dataframe = make_unfiltered_dataframe(mocker, freqai_conf)
|
||||
freqai.dk.find_features(unfiltered_dataframe)
|
||||
|
||||
features_filtered, labels_filtered = freqai.dk.filter_features(
|
||||
unfiltered_dataframe,
|
||||
freqai.dk.training_features_list,
|
||||
freqai.dk.label_list,
|
||||
training_filter=True,
|
||||
)
|
||||
|
||||
vec = freqai.dk.spice_extractor(indicator, features_filtered)
|
||||
if 'doesnt_exist' in indicator:
|
||||
assert log_has_re(
|
||||
"User asked spice_rack for",
|
||||
caplog,
|
||||
)
|
||||
else:
|
||||
assert len(vec) == 2860
|
||||
|
@@ -1,3 +1,4 @@
|
||||
import copy
|
||||
import platform
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
@@ -382,6 +383,31 @@ def test_plot_feature_importance(mocker, freqai_conf):
|
||||
shutil.rmtree(Path(freqai.dk.full_path))
|
||||
|
||||
|
||||
def test_spice_rack(mocker, default_conf, tmpdir, caplog):
|
||||
|
||||
strategy = get_patched_freqai_strategy(mocker, default_conf)
|
||||
exchange = get_patched_exchange(mocker, default_conf)
|
||||
strategy.dp = DataProvider(default_conf, exchange)
|
||||
|
||||
default_conf.update({"freqai_spice_rack": "true"})
|
||||
default_conf.update({"freqai_identifier": "spicy-id"})
|
||||
default_conf["config_files"] = [Path('config_examples', 'config_freqai.example.json')]
|
||||
default_conf["timerange"] = "20180110-20180115"
|
||||
default_conf["datadir"] = Path(default_conf["datadir"])
|
||||
default_conf['exchange'].update({'pair_whitelist':
|
||||
['ADA/BTC', 'DASH/BTC', 'ETH/BTC', 'LTC/BTC']})
|
||||
default_conf["user_data_dir"] = Path(tmpdir)
|
||||
freqai_conf = copy.deepcopy(default_conf)
|
||||
|
||||
strategy.config = freqai_conf
|
||||
strategy.load_freqAI_model()
|
||||
|
||||
assert log_has_re("Spice rack will use LTC/USD", caplog)
|
||||
assert log_has_re("Spice rack will use 15m", caplog)
|
||||
assert 'freqai' in freqai_conf
|
||||
assert strategy.freqai
|
||||
|
||||
|
||||
@pytest.mark.parametrize('timeframes,corr_pairs', [
|
||||
(['5m'], ['ADA/BTC', 'DASH/BTC']),
|
||||
(['5m'], ['ADA/BTC', 'DASH/BTC', 'ETH/USDT']),
|
||||
|
@@ -5,29 +5,8 @@ import pytest
|
||||
from freqtrade.data.dataprovider import DataProvider
|
||||
from freqtrade.enums import CandleType
|
||||
from freqtrade.resolvers.strategy_resolver import StrategyResolver
|
||||
from freqtrade.strategy import (merge_informative_pair, stoploss_from_absolute, stoploss_from_open,
|
||||
timeframe_to_minutes)
|
||||
from tests.conftest import get_patched_exchange
|
||||
|
||||
|
||||
def generate_test_data(timeframe: str, size: int, start: str = '2020-07-05'):
|
||||
np.random.seed(42)
|
||||
tf_mins = timeframe_to_minutes(timeframe)
|
||||
|
||||
base = np.random.normal(20, 2, size=size)
|
||||
|
||||
date = pd.date_range(start, periods=size, freq=f'{tf_mins}min', tz='UTC')
|
||||
df = pd.DataFrame({
|
||||
'date': date,
|
||||
'open': base,
|
||||
'high': base + np.random.normal(2, 1, size=size),
|
||||
'low': base - np.random.normal(2, 1, size=size),
|
||||
'close': base + np.random.normal(0, 1, size=size),
|
||||
'volume': np.random.normal(200, size=size)
|
||||
}
|
||||
)
|
||||
df = df.dropna()
|
||||
return df
|
||||
from freqtrade.strategy import merge_informative_pair, stoploss_from_absolute, stoploss_from_open
|
||||
from tests.conftest import generate_test_data, get_patched_exchange
|
||||
|
||||
|
||||
def test_merge_informative_pair():
|
||||
|
Reference in New Issue
Block a user