From c6f045afa96c30bbd612af4b5877aeb41834d8fe Mon Sep 17 00:00:00 2001 From: hippocritical Date: Thu, 29 Dec 2022 22:17:52 +0100 Subject: [PATCH] fixing issues of the maintainer found a bug meaning elts could contain lists of elts (now recurively gone through) Next in line: writing tests based on StrategyUpdater.update_code --- freqtrade/commands/strategy_utils_commands.py | 6 +- freqtrade/strategy/strategy_updater.py | 213 ----------------- freqtrade/strategy/strategyupdater.py | 225 ++++++++++++++++++ 3 files changed, 228 insertions(+), 216 deletions(-) delete mode 100644 freqtrade/strategy/strategy_updater.py create mode 100644 freqtrade/strategy/strategyupdater.py diff --git a/freqtrade/commands/strategy_utils_commands.py b/freqtrade/commands/strategy_utils_commands.py index b9431fecf..13405089f 100644 --- a/freqtrade/commands/strategy_utils_commands.py +++ b/freqtrade/commands/strategy_utils_commands.py @@ -17,7 +17,7 @@ def start_strategy_update(args: Dict[str, Any]) -> None: """ # Import here to avoid loading backtesting module when it's not used - from freqtrade.strategy.strategy_updater import strategy_updater + from freqtrade.strategy.strategyupdater import StrategyUpdater config = setup_utils_configuration(args, RunMode.UTIL_NO_EXCHANGE) @@ -33,5 +33,5 @@ def start_strategy_update(args: Dict[str, Any]) -> None: for filtered_strategy_obj in filtered_strategy_objs: # Initialize backtesting object - instance_strategy_updater = strategy_updater() - strategy_updater.start(instance_strategy_updater, filtered_strategy_obj) + instance_strategy_updater = StrategyUpdater() + StrategyUpdater.start(instance_strategy_updater, config, filtered_strategy_obj) diff --git a/freqtrade/strategy/strategy_updater.py b/freqtrade/strategy/strategy_updater.py deleted file mode 100644 index fea8edf31..000000000 --- a/freqtrade/strategy/strategy_updater.py +++ /dev/null @@ -1,213 +0,0 @@ -import ast -import os -import shutil - - -class strategy_updater: - name_mapping = { - 'ticker_interval': 'timeframe', - 'buy': 'enter_long', - 'sell': 'exit_long', - 'buy_tag': 'enter_tag', - 'sell_reason': 'exit_reason', - - 'sell_signal': 'exit_signal', - 'custom_sell': 'custom_exit', - 'force_sell': 'force_exit', - 'emergency_sell': 'emergency_exit', - - # Strategy/config settings: - 'use_sell_signal': 'use_exit_signal', - 'sell_profit_only': 'exit_profit_only', - 'sell_profit_offset': 'exit_profit_offset', - 'ignore_roi_if_buy_signal': 'ignore_roi_if_entry_signal', - 'forcebuy_enable': 'force_entry_enable', - } - - function_mapping = { - 'populate_buy_trend': 'populate_entry_trend', - 'populate_sell_trend': 'populate_exit_trend', - 'custom_sell': 'custom_exit', - 'check_buy_timeout': 'check_entry_timeout', - 'check_sell_timeout': 'check_exit_timeout', - # '': '', - } - # order_time_in_force, order_types, unfilledtimeout - otif_ot_unfilledtimeout = { - 'buy': 'entry', - 'sell': 'exit', - } - - # create a dictionary that maps the old column names to the new ones - rename_dict = {'buy': 'enter_long', 'sell': 'exit_long', 'buy_tag': 'enter_tag'} - - def start(self, strategy_obj: dict) -> None: - """ - Run strategy updater - It updates a strategy to v3 with the help of the ast-module - :return: None - """ - - self.cwd = os.getcwd() - self.strategies_backup_folder = f'{os.getcwd()}user_data/strategies_orig_updater' - source_file = strategy_obj['location'] - - # read the file - with open(source_file, 'r') as f: - old_code = f.read() - if not os.path.exists(self.strategies_backup_folder): - os.makedirs(self.strategies_backup_folder) - - # backup original - # => currently no date after the filename, - # could get overridden pretty fast if this is fired twice! - shutil.copy(source_file, f"{self.strategies_backup_folder}/{strategy_obj['location_rel']}") - - # update the code - new_code = strategy_updater.update_code(self, old_code, - strategy_updater.name_mapping, - strategy_updater.function_mapping, - strategy_updater.rename_dict) - - # write the modified code to the destination folder - with open(source_file, 'w') as f: - f.write(new_code) - print(f"conversion of file {source_file} successful.") - - # define the function to update the code - def update_code(self, code, _name_mapping, _function_mapping, _rename_dict): - # parse the code into an AST - tree = ast.parse(code) - - # use the AST to update the code - updated_code = strategy_updater.modify_ast( - tree, - _name_mapping, - _function_mapping, - _rename_dict) - - # return the modified code without executing it - return updated_code - - # function that uses the ast module to update the code - def modify_ast(node, _name_mapping, _function_mapping, _rename_dict): # noqa - # create a visitor that will update the names and functions - class NameUpdater(ast.NodeTransformer): - def generic_visit(self, node): - # traverse the AST recursively by calling the visitor method for each child node - if hasattr(node, "_fields"): - for field_name, field_value in ast.iter_fields(node): - self.check_fields(field_value) - self.check_strategy_and_config_settings(field_value) - # add this check to handle the case where field_value is a slice - if isinstance(field_value, ast.Slice): - self.visit(field_value) - # add this check to handle the case where field_value is a target - if isinstance(field_value, ast.expr_context): - self.visit(field_value) - - def check_fields(self, field_value): - if isinstance(field_value, list): - for item in field_value: - if isinstance(item, ast.AST): - self.visit(item) - - def check_strategy_and_config_settings(self, field_value): - if (isinstance(field_value, ast.AST) and - hasattr(node, "targets") and - isinstance(node.targets, list)): - for target in node.targets: - if (hasattr(target, "id") and - (target.id == "order_time_in_force" or - target.id == "order_types" or - target.id == "unfilledtimeout") and - hasattr(field_value, "keys") and - isinstance(field_value.keys, list)): - for key in field_value.keys: - self.visit(key) - - def visit_Name(self, node): - # if the name is in the mapping, update it - if node.id in _name_mapping: - node.id = _name_mapping[node.id] - return node - - def visit_Import(self, node): - # do not update the names in import statements - return node - - def visit_ImportFrom(self, node): - # do not update the names in import statements - if hasattr(node, "module"): - if node.module == "freqtrade.strategy.hyper": - node.module = "freqtrade.strategy" - return node - - def visit_FunctionDef(self, node): - # if the function name is in the mapping, update it - if node.name in _function_mapping: - node.name = _function_mapping[node.name] - return self.generic_visit(node) - - def visit_Attribute(self, node): - # if the attribute name is 'nr_of_successful_buys', - # update it to 'nr_of_successful_entries' - if isinstance(node.value, ast.Name) and \ - node.value.id == 'trades' and \ - node.attr == 'nr_of_successful_buys': - node.attr = 'nr_of_successful_entries' - return self.generic_visit(node) - - def visit_ClassDef(self, node): - # check if the class is derived from IStrategy - if any(isinstance(base, ast.Name) and - base.id == 'IStrategy' for base in node.bases): - # check if the INTERFACE_VERSION variable exists - has_interface_version = any( - isinstance(child, ast.Assign) and - isinstance(child.targets[0], ast.Name) and - child.targets[0].id == 'INTERFACE_VERSION' - for child in node.body - ) - - # if the INTERFACE_VERSION variable does not exist, add it as the first child - if not has_interface_version: - node.body.insert(0, ast.parse('INTERFACE_VERSION = 3').body[0]) - # otherwise, update its value to 3 - else: - for child in node.body: - if isinstance(child, ast.Assign) and \ - isinstance(child.targets[0], ast.Name) and \ - child.targets[0].id == 'INTERFACE_VERSION': - child.value = ast.parse('3').body[0].value - return self.generic_visit(node) - - def visit_Subscript(self, node): - if isinstance(node.slice, ast.Constant): - if node.slice.value in strategy_updater.rename_dict: - # Replace the slice attributes with the values from rename_dict - node.slice.value = strategy_updater.rename_dict[node.slice.value] - if hasattr(node.slice, "elts"): - for elt in node.slice.elts: - if isinstance(elt, ast.Constant) and \ - elt.value in strategy_updater.rename_dict: - elt.value = strategy_updater.rename_dict[elt.value] - return node - - def visit_Constant(self, node): - # do not update the names in import statements - if node.value in \ - strategy_updater.otif_ot_unfilledtimeout: - node.value = \ - strategy_updater.otif_ot_unfilledtimeout[node.value] - return node - - # use the visitor to update the names and functions in the AST - NameUpdater().visit(node) - - # first fix the comments so it understands "\n" properly inside multi line comments. - ast.fix_missing_locations(node) - ast.increment_lineno(node, n=1) - - # generate the new code from the updated AST - return ast.unparse(node) diff --git a/freqtrade/strategy/strategyupdater.py b/freqtrade/strategy/strategyupdater.py new file mode 100644 index 000000000..b62bc7822 --- /dev/null +++ b/freqtrade/strategy/strategyupdater.py @@ -0,0 +1,225 @@ +import ast +import os +import shutil +from pathlib import Path + +import astor + + +class StrategyUpdater: + name_mapping = { + 'ticker_interval': 'timeframe', + 'buy': 'enter_long', + 'sell': 'exit_long', + 'buy_tag': 'enter_tag', + 'sell_reason': 'exit_reason', + + 'sell_signal': 'exit_signal', + 'custom_sell': 'custom_exit', + 'force_sell': 'force_exit', + 'emergency_sell': 'emergency_exit', + + # Strategy/config settings: + 'use_sell_signal': 'use_exit_signal', + 'sell_profit_only': 'exit_profit_only', + 'sell_profit_offset': 'exit_profit_offset', + 'ignore_roi_if_buy_signal': 'ignore_roi_if_entry_signal', + 'forcebuy_enable': 'force_entry_enable', + } + + function_mapping = { + 'populate_buy_trend': 'populate_entry_trend', + 'populate_sell_trend': 'populate_exit_trend', + 'custom_sell': 'custom_exit', + 'check_buy_timeout': 'check_entry_timeout', + 'check_sell_timeout': 'check_exit_timeout', + # '': '', + } + # order_time_in_force, order_types, unfilledtimeout + otif_ot_unfilledtimeout = { + 'buy': 'entry', + 'sell': 'exit', + } + + # create a dictionary that maps the old column names to the new ones + rename_dict = {'buy': 'enter_long', 'sell': 'exit_long', 'buy_tag': 'enter_tag'} + + def start(self, config, strategy_obj: dict) -> None: + """ + Run strategy updater + It updates a strategy to v3 with the help of the ast-module + :return: None + """ + + source_file = strategy_obj['location'] + strategies_backup_folder = Path.joinpath(config['user_data_dir'], "strategies_orig_updater") + target_file = Path.joinpath(strategies_backup_folder, strategy_obj['location_rel']) + + # read the file + with open(source_file, 'r') as f: + old_code = f.read() + if not os.path.exists(strategies_backup_folder): + os.makedirs(strategies_backup_folder) + + # backup original + # => currently no date after the filename, + # could get overridden pretty fast if this is fired twice! + # The folder is always the same and the file name too (currently). + shutil.copy(source_file, target_file) + + # update the code + new_code = StrategyUpdater.update_code(self, old_code) + + # write the modified code to the destination folder + with open(source_file, 'w') as f: + f.write(new_code) + print(f"conversion of file {source_file} successful.") + + # define the function to update the code + def update_code(self, code): + # parse the code into an AST + tree = ast.parse(code) + + # use the AST to update the code + updated_code = self.modify_ast( + tree) + + # return the modified code without executing it + return updated_code + + # function that uses the ast module to update the code + def modify_ast(self, tree): # noqa + # use the visitor to update the names and functions in the AST + NameUpdater().visit(tree) + + # first fix the comments, so it understands "\n" properly inside multi line comments. + ast.fix_missing_locations(tree) + ast.increment_lineno(tree, n=1) + + # generate the new code from the updated AST + return astor.to_source(tree) + + +# Here we go through each respective node, slice, elt, key ... to replace outdated entries. +class NameUpdater(ast.NodeTransformer): + def generic_visit(self, node): + # traverse the AST recursively by calling the visitor method for each child node + if hasattr(node, "_fields"): + for field_name, field_value in ast.iter_fields(node): + self.visit(field_value) + self.generic_visit(field_value) + self.check_fields(field_value) + self.check_strategy_and_config_settings(node, field_value) + # add this check to handle the case where field_value is a slice + if isinstance(field_value, ast.Slice): + self.visit(field_value) + # add this check to handle the case where field_value is a target + if isinstance(field_value, ast.expr_context): + self.visit(field_value) + + def check_fields(self, field_value): + if isinstance(field_value, list): + for item in field_value: + if isinstance(item, ast.AST): + self.visit(item) + + def check_strategy_and_config_settings(self, node, field_value): + if (isinstance(field_value, ast.AST) and + hasattr(node, "targets") and + isinstance(node.targets, list)): + for target in node.targets: + if (hasattr(target, "id") and + hasattr(field_value, "keys") and + isinstance(field_value.keys, list)): + if (target.id == "order_time_in_force" or + target.id == "order_types" or + target.id == "unfilledtimeout"): + for key in field_value.keys: + self.visit(key) + # Check if the target is a Subscript object with a "value" attribute + if isinstance(target, ast.Subscript) and hasattr(target.value, "attr"): + if target.value.attr == "loc": + self.visit(target) + + def visit_Name(self, node): + # if the name is in the mapping, update it + if node.id in StrategyUpdater.name_mapping: + node.id = StrategyUpdater.name_mapping[node.id] + return node + + def visit_Import(self, node): + # do not update the names in import statements + return node + + def visit_ImportFrom(self, node): + # do not update the names in import statements + if hasattr(node, "module"): + if node.module == "freqtrade.strategy.hyper": + node.module = "freqtrade.strategy" + return node + + def visit_FunctionDef(self, node): + # if the function name is in the mapping, update it + if node.name in StrategyUpdater.function_mapping: + node.name = StrategyUpdater.function_mapping[node.name] + return self.generic_visit(node) + + def visit_Attribute(self, node): + # if the attribute name is 'nr_of_successful_buys', + # update it to 'nr_of_successful_entries' + if isinstance(node.value, ast.Name) and \ + node.value.id == 'trades' and \ + node.attr == 'nr_of_successful_buys': + node.attr = 'nr_of_successful_entries' + return self.generic_visit(node) + + def visit_ClassDef(self, node): + # check if the class is derived from IStrategy + if any(isinstance(base, ast.Name) and + base.id == 'IStrategy' for base in node.bases): + # check if the INTERFACE_VERSION variable exists + has_interface_version = any( + isinstance(child, ast.Assign) and + isinstance(child.targets[0], ast.Name) and + child.targets[0].id == 'INTERFACE_VERSION' + for child in node.body + ) + + # if the INTERFACE_VERSION variable does not exist, add it as the first child + if not has_interface_version: + node.body.insert(0, ast.parse('INTERFACE_VERSION = 3').body[0]) + # otherwise, update its value to 3 + else: + for child in node.body: + if isinstance(child, ast.Assign) and \ + isinstance(child.targets[0], ast.Name) and \ + child.targets[0].id == 'INTERFACE_VERSION': + child.value = ast.parse('3').body[0].value + return self.generic_visit(node) + + def visit_Subscript(self, node): + if isinstance(node.slice, ast.Constant): + if node.slice.value in StrategyUpdater.rename_dict: + # Replace the slice attributes with the values from rename_dict + node.slice.value = StrategyUpdater.rename_dict[node.slice.value] + if hasattr(node.slice, "elts"): + self.visit_slice_elts(node.slice.elts) + if hasattr(node.slice.value, "elts"): + self.visit_slice_elts(node.slice.value.elts) + return node + + # elts can have elts (technically recursively) + def visit_slice_elts(self, elts): + for elt in elts: + if isinstance(elt, ast.Constant) and elt.value in StrategyUpdater.rename_dict: + elt.value = StrategyUpdater.rename_dict[elt.value] + elif hasattr(elt, "elts"): + self.visit_slice_elts(elt.elts) + + def visit_Constant(self, node): + # do not update the names in import statements + if node.value in \ + StrategyUpdater.otif_ot_unfilledtimeout: + node.value = \ + StrategyUpdater.otif_ot_unfilledtimeout[node.value] + return node