use parquet in favor of pickle to improve performance for historic_prediction saving, loading, and active post processing

This commit is contained in:
robcaulk 2023-03-25 11:38:16 +01:00
parent 4053ee4581
commit 0883198f40

View File

@ -74,8 +74,10 @@ class FreqaiDataDrawer:
self.historic_predictions: Dict[str, DataFrame] = {} self.historic_predictions: Dict[str, DataFrame] = {}
self.full_path = full_path self.full_path = full_path
self.historic_predictions_path = Path(self.full_path / "historic_predictions.pkl") self.historic_predictions_path = Path(self.full_path / "historic_predictions.pkl")
self.historic_predictions_bkp_path = Path( self.historic_predictions_path_parquet = Path(
self.full_path / "historic_predictions.backup.pkl") self.full_path / "historic_predictions.parquet")
self.historic_predictions_bkp_path_parquet = Path(
self.full_path / "historic_predictions.backup.parquet")
self.pair_dictionary_path = Path(self.full_path / "pair_dictionary.json") self.pair_dictionary_path = Path(self.full_path / "pair_dictionary.json")
self.global_metadata_path = Path(self.full_path / "global_metadata.json") self.global_metadata_path = Path(self.full_path / "global_metadata.json")
self.metric_tracker_path = Path(self.full_path / "metric_tracker.json") self.metric_tracker_path = Path(self.full_path / "metric_tracker.json")
@ -163,11 +165,12 @@ class FreqaiDataDrawer:
Locate and load a previously saved historic predictions. Locate and load a previously saved historic predictions.
:return: bool - whether or not the drawer was located :return: bool - whether or not the drawer was located
""" """
exists = self.historic_predictions_path.is_file() exists = self.historic_predictions_path_parquet.is_file()
convert = self.historic_predictions_path.is_file()
if exists: if exists:
try: try:
with self.historic_predictions_path.open("rb") as fp: self.historic_predictions = pd.read_parquet(self.historic_predictions_path_parquet)
self.historic_predictions = cloudpickle.load(fp)
logger.info( logger.info(
f"Found existing historic predictions at {self.full_path}, but beware " f"Found existing historic predictions at {self.full_path}, but beware "
"that statistics may be inaccurate if the bot has been offline for " "that statistics may be inaccurate if the bot has been offline for "
@ -176,12 +179,23 @@ class FreqaiDataDrawer:
except EOFError: except EOFError:
logger.warning( logger.warning(
'Historical prediction file was corrupted. Trying to load backup file.') 'Historical prediction file was corrupted. Trying to load backup file.')
with self.historic_predictions_bkp_path.open("rb") as fp: self.historic_predictions = pd.read_parquet(
self.historic_predictions = cloudpickle.load(fp) self.historic_predictions_bkp_path_parquet)
logger.warning('FreqAI successfully loaded the backup historical predictions file.') logger.warning('FreqAI successfully loaded the backup historical predictions file.')
elif not exists and convert:
logger.info("Converting your historic predictions pkl to parquet"
"to improve performance.")
with Path.open(self.historic_predictions_path, "rb") as fp:
self.historic_predictions = cloudpickle.load(fp)
self.historic_predictions.to_parquet(self.historic_predictions_path_parquet)
exists = True
else: else:
logger.info("Could not find existing historic_predictions, starting from scratch") logger.warning(
f"Follower could not find historic predictions at {self.full_path} "
"sending null values back to strategy"
)
return exists return exists
@ -189,11 +203,10 @@ class FreqaiDataDrawer:
""" """
Save historic predictions pickle to disk Save historic predictions pickle to disk
""" """
with self.historic_predictions_path.open("wb") as fp: self.historic_predictions.to_parquet(self.historic_predictions_path_parquet)
cloudpickle.dump(self.historic_predictions, fp, protocol=cloudpickle.DEFAULT_PROTOCOL)
# create a backup # create a backup
shutil.copy(self.historic_predictions_path, self.historic_predictions_bkp_path) shutil.copy(self.historic_predictions_path, self.historic_predictions_bkp_path_parquet)
def save_metric_tracker_to_disk(self): def save_metric_tracker_to_disk(self):
""" """