Merge pull request #7340 from wizrds/sigint-freqai

Support SIGINT in FreqAI
This commit is contained in:
Robert Caulk 2022-09-04 16:43:36 +02:00 committed by GitHub
commit 956ea43e55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 58 additions and 18 deletions

View File

@ -76,6 +76,8 @@ class FreqaiDataDrawer:
self.full_path / f"follower_dictionary-{self.follower_name}.json"
)
self.historic_predictions_path = Path(self.full_path / "historic_predictions.pkl")
self.historic_predictions_bkp_path = Path(
self.full_path / "historic_predictions.backup.pkl")
self.pair_dictionary_path = Path(self.full_path / "pair_dictionary.json")
self.follow_mode = follow_mode
if follow_mode:
@ -118,6 +120,7 @@ class FreqaiDataDrawer:
"""
exists = self.historic_predictions_path.is_file()
if exists:
try:
with open(self.historic_predictions_path, "rb") as fp:
self.historic_predictions = cloudpickle.load(fp)
logger.info(
@ -125,6 +128,13 @@ class FreqaiDataDrawer:
"that statistics may be inaccurate if the bot has been offline for "
"an extended period of time."
)
except EOFError:
logger.warning(
'Historical prediction file was corrupted. Trying to load backup file.')
with open(self.historic_predictions_bkp_path, "rb") as fp:
self.historic_predictions = cloudpickle.load(fp)
logger.warning('FreqAI successfully loaded the backup historical predictions file.')
elif not self.follow_mode:
logger.info("Could not find existing historic_predictions, starting from scratch")
else:
@ -142,6 +152,9 @@ class FreqaiDataDrawer:
with open(self.historic_predictions_path, "wb") as fp:
cloudpickle.dump(self.historic_predictions, fp, protocol=cloudpickle.DEFAULT_PROTOCOL)
# create a backup
shutil.copy(self.historic_predictions_path, self.historic_predictions_bkp_path)
def save_drawer_to_disk(self):
"""
Save data drawer full of all pair model metadata in present model folder.

View File

@ -7,7 +7,7 @@ import time
from abc import ABC, abstractmethod
from pathlib import Path
from threading import Lock
from typing import Any, Dict, Tuple
from typing import Any, Dict, List, Tuple
import numpy as np
import pandas as pd
@ -27,13 +27,6 @@ pd.options.mode.chained_assignment = None
logger = logging.getLogger(__name__)
def threaded(fn):
def wrapper(*args, **kwargs):
threading.Thread(target=fn, args=args, kwargs=kwargs).start()
return wrapper
class IFreqaiModel(ABC):
"""
Class containing all tools for training and prediction in the strategy.
@ -94,6 +87,9 @@ class IFreqaiModel(ABC):
self.begin_time_train: float = 0
self.base_tf_seconds = timeframe_to_seconds(self.config['timeframe'])
self._threads: List[threading.Thread] = []
self._stop_event = threading.Event()
def assert_config(self, config: Dict[str, Any]) -> None:
if not config.get("freqai", {}):
@ -147,15 +143,34 @@ class IFreqaiModel(ABC):
self.model = None
self.dk = None
@threaded
def start_scanning(self, strategy: IStrategy) -> None:
def shutdown(self):
"""
Cleans up threads on Shutdown, set stop event. Join threads to wait
for current training iteration.
"""
logger.info("Stopping FreqAI")
self._stop_event.set()
logger.info("Waiting on Training iteration")
for _thread in self._threads:
_thread.join()
def start_scanning(self, *args, **kwargs) -> None:
"""
Start `self._start_scanning` in a separate thread
"""
_thread = threading.Thread(target=self._start_scanning, args=args, kwargs=kwargs)
self._threads.append(_thread)
_thread.start()
def _start_scanning(self, strategy: IStrategy) -> None:
"""
Function designed to constantly scan pairs for retraining on a separate thread (intracandle)
to improve model youth. This function is agnostic to data preparation/collection/storage,
it simply trains on what ever data is available in the self.dd.
:param strategy: IStrategy = The user defined strategy class
"""
while 1:
while not self._stop_event.is_set():
time.sleep(1)
for pair in self.config.get("exchange", {}).get("pair_whitelist"):

View File

@ -148,6 +148,8 @@ class FreqtradeBot(LoggingMixin):
self.check_for_open_trades()
self.strategy.ft_bot_cleanup()
self.rpc.cleanup()
Trade.commit()
self.exchange.close()

View File

@ -168,6 +168,10 @@ class IStrategy(ABC, HyperStrategyMixin):
raise OperationalException(
'freqAI is not enabled. '
'Please enable it in your config to use this strategy.')
def shutdown(self, *args, **kwargs):
pass
self.freqai = DummyClass() # type: ignore
def ft_bot_start(self, **kwargs) -> None:
@ -181,6 +185,12 @@ class IStrategy(ABC, HyperStrategyMixin):
self.ft_load_hyper_params(self.config.get('runmode') == RunMode.HYPEROPT)
def ft_bot_cleanup(self) -> None:
"""
Clean up FreqAI and child threads
"""
self.freqai.shutdown()
@abstractmethod
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""