From ed55296d202d725b5d2d5e95e60273a8d21dcf1a Mon Sep 17 00:00:00 2001 From: hippocritical Date: Wed, 4 Jan 2023 23:49:33 +0100 Subject: [PATCH] removed prints for strategy could not be loaded Changed logic to contain much less if conditions currently still missing: Webhook terminology, Telegram notification settings, Strategy/Config settings --- freqtrade/strategy/strategyupdater.py | 124 ++++++++++++-------------- tests/test_strategy_updater.py | 31 ++++--- 2 files changed, 73 insertions(+), 82 deletions(-) diff --git a/freqtrade/strategy/strategyupdater.py b/freqtrade/strategy/strategyupdater.py index d12d3eaaa..ad14bb903 100644 --- a/freqtrade/strategy/strategyupdater.py +++ b/freqtrade/strategy/strategyupdater.py @@ -3,6 +3,8 @@ import os import shutil from pathlib import Path +import astor + class StrategyUpdater: name_mapping = { @@ -79,7 +81,7 @@ class StrategyUpdater: tree = ast.parse(code) # use the AST to update the code - updated_code = self.modify_ast(tree) + updated_code = self.modify_ast(self, tree) # return the modified code without executing it return updated_code @@ -94,67 +96,67 @@ class StrategyUpdater: ast.increment_lineno(tree, n=1) # generate the new code from the updated AST - return ast.dump(tree) + return astor.to_source(tree) # Here we go through each respective node, slice, elt, key ... to replace outdated entries. class NameUpdater(ast.NodeTransformer): def generic_visit(self, node): - # traverse the AST recursively by calling the visitor method for each child node - if hasattr(node, "_fields"): - for field_name, field_value in ast.iter_fields(node): - self.check_strategy_and_config_settings(node, field_value) - self.check_fields(field_value) - for child in ast.iter_child_nodes(node): - self.generic_visit(child) - def check_fields(self, field_value): - if isinstance(field_value, list): - for item in field_value: - if (isinstance(item, ast.AST) or isinstance(item, ast.If) or - isinstance(item, ast.Expr)): - self.visit(item) - if isinstance(field_value, ast.Name): - self.visit_Name(field_value) + # space is not yet transferred from buy/sell to entry/exit and thereby has to be skipped. + if isinstance(node, ast.keyword): + if node.arg == "space": + return node - def check_strategy_and_config_settings(self, node, field_value): - if (isinstance(field_value, ast.AST) and - hasattr(node, "targets") and - isinstance(node.targets, list)): - for target in node.targets: - if (hasattr(target, "id") and - hasattr(field_value, "keys") and - isinstance(field_value.keys, list)): - if (target.id == "order_time_in_force" or - target.id == "order_types" or - target.id == "unfilledtimeout"): - for key in field_value.keys: - self.visit(key) + # from here on this is the original function. + for field, old_value in ast.iter_fields(node): + if isinstance(old_value, list): + new_values = [] + for value in old_value: + if isinstance(value, ast.AST): + value = self.visit(value) + if value is None: + continue + elif not isinstance(value, ast.AST): + new_values.extend(value) + continue + new_values.append(value) + old_value[:] = new_values + elif isinstance(old_value, ast.AST): + new_node = self.visit(old_value) + if new_node is None: + delattr(node, field) + else: + setattr(node, field, new_node) + return node - def check_args(self, node): - if isinstance(node.args, ast.arguments): - self.check_args(node.args) - if hasattr(node, "args"): - if isinstance(node.args, list): - for arg in node.args: - if arg.arg in StrategyUpdater.name_mapping: - arg.arg = StrategyUpdater.name_mapping[arg.arg] + def visit_Expr(self, node): + node.value.left.id = self.check_dict(StrategyUpdater.name_mapping, node.value.left.id) + self.visit(node.value) + return node + + # Renames an element if contained inside a dictionary. + @staticmethod + def check_dict(current_dict: dict, element: str): + if element in current_dict: + element = current_dict[element] + return element + + def visit_arguments(self, node): + if isinstance(node.args, list): + for arg in node.args: + arg.arg = self.check_dict(StrategyUpdater.name_mapping, arg.arg) return node def visit_Name(self, node): # if the name is in the mapping, update it - if node.id in StrategyUpdater.name_mapping: - node.id = StrategyUpdater.name_mapping[node.id] + node.id = self.check_dict(StrategyUpdater.name_mapping, node.id) return node def visit_Import(self, node): # do not update the names in import statements return node - # This function is currently never successfully triggered - # since freqtrade currently only allows valid code to be processed. - # The module .hyper does not anymore exist and by that fails to even - # reach this function to be updated currently. def visit_ImportFrom(self, node): # if hasattr(node, "module"): # if node.module == "freqtrade.strategy.hyper": @@ -164,33 +166,21 @@ class NameUpdater(ast.NodeTransformer): def visit_If(self, node: ast.If): for child in ast.iter_child_nodes(node): self.visit(child) - return self.generic_visit(node) + return node def visit_FunctionDef(self, node): - # if the function name is in the mapping, update it - if node.name in StrategyUpdater.function_mapping: - node.name = StrategyUpdater.function_mapping[node.name] - if hasattr(node, "args"): - self.check_args(node) - return self.generic_visit(node) - - def visit_Assign(self, node): - if hasattr(node, "targets") and isinstance(node.targets, list): - for target in node.targets: - if hasattr(target, "id") and target.id in StrategyUpdater.name_mapping: - target.id = StrategyUpdater.name_mapping[target.id] + node.name = self.check_dict(StrategyUpdater.function_mapping, node.name) + self.generic_visit(node) return node def visit_Attribute(self, node): - # if the attribute name is 'nr_of_successful_buys', - # update it to 'nr_of_successful_entries' if ( isinstance(node.value, ast.Name) and node.value.id == 'trades' and node.attr == 'nr_of_successful_buys' ): node.attr = 'nr_of_successful_entries' - return self.generic_visit(node) + return node def visit_ClassDef(self, node): # check if the class is derived from IStrategy @@ -216,7 +206,8 @@ class NameUpdater(ast.NodeTransformer): and child.targets[0].id == 'INTERFACE_VERSION' ): child.value = ast.parse('3').body[0].value - return self.generic_visit(node) + self.generic_visit(node) + return node def visit_Subscript(self, node): if isinstance(node.slice, ast.Constant): @@ -228,10 +219,6 @@ class NameUpdater(ast.NodeTransformer): if hasattr(node.slice, "value"): if hasattr(node.slice.value, "elts"): self.visit_elts(node.slice.value.elts) - # Check if the target is a Subscript object with a "value" attribute - # if isinstance(target, ast.Subscript) and hasattr(target.value, "attr"): - # if target.value.attr == "loc": - # self.visit(target) return node # elts can have elts (technically recursively) @@ -241,6 +228,7 @@ class NameUpdater(ast.NodeTransformer): self.visit_elt(elt) else: self.visit_elt(elts) + return elts # sub function again needed since the structure itself is highly flexible ... def visit_elt(self, elt): @@ -254,9 +242,9 @@ class NameUpdater(ast.NodeTransformer): else: for arg in elt.args: self.visit_elts(arg) + return elt def visit_Constant(self, node): - # do not update the names in import statements - if node.value in StrategyUpdater.otif_ot_unfilledtimeout: - node.value = StrategyUpdater.otif_ot_unfilledtimeout[node.value] + node.value = self.check_dict(StrategyUpdater.otif_ot_unfilledtimeout, node.value) + node.value = self.check_dict(StrategyUpdater.name_mapping, node.value) return node diff --git a/tests/test_strategy_updater.py b/tests/test_strategy_updater.py index 682c715fe..3ece2b3d8 100644 --- a/tests/test_strategy_updater.py +++ b/tests/test_strategy_updater.py @@ -4,11 +4,6 @@ from freqtrade.strategy.strategyupdater import StrategyUpdater def test_strategy_updater(default_conf, caplog) -> None: - modified_code2 = StrategyUpdater.update_code(StrategyUpdater, """ -ticker_interval = '15m' -buy_some_parameter = IntParameter(space='buy') -sell_some_parameter = IntParameter(space='sell') -""") modified_code1 = StrategyUpdater.update_code(StrategyUpdater, """ class testClass(IStrategy): def populate_buy_trend(): @@ -21,6 +16,11 @@ class testClass(IStrategy): pass def custom_sell(): pass +""") + modified_code2 = StrategyUpdater.update_code(StrategyUpdater, """ +ticker_interval = '15m' +buy_some_parameter = IntParameter(space='buy') +sell_some_parameter = IntParameter(space='sell') """) modified_code3 = StrategyUpdater.update_code(StrategyUpdater, """ use_sell_signal = True @@ -59,11 +59,14 @@ def confirm_trade_exit(sell_reason): if (sell_reason == 'stop_loss'): pass """) - # modified_code8 = StrategyUpdater.update_code(StrategyUpdater, """ - # sell_reason == 'sell_signal' - # sell_reason == 'force_sell' - # sell_reason == 'emergency_sell' - # """) + modified_code8 = StrategyUpdater.update_code(StrategyUpdater, """ +sell_reason == 'sell_signal' +sell_reason == 'force_sell' +sell_reason == 'emergency_sell' +""") + + # currently still missing: + # Webhook terminology, Telegram notification settings, Strategy/Config settings assert "populate_entry_trend" in modified_code1 assert "populate_exit_trend" in modified_code1 @@ -100,7 +103,7 @@ def confirm_trade_exit(sell_reason): assert "exit_reason == 'stop_loss'" in modified_code7 # those tests currently don't work, next in line. - # assert "exit_signal" in modified_code8 - # assert "exit_reason" in modified_code8 - # assert "force_exit" in modified_code8 - # assert "emergency_exit" in modified_code8 + assert "exit_signal" in modified_code8 + assert "exit_reason" in modified_code8 + assert "force_exit" in modified_code8 + assert "emergency_exit" in modified_code8