fixing issues of the maintainer
found a bug meaning elts could contain lists of elts (now recurively gone through) Next in line: writing tests based on StrategyUpdater.update_code
This commit is contained in:
parent
70e9fa6136
commit
c6f045afa9
@ -17,7 +17,7 @@ def start_strategy_update(args: Dict[str, Any]) -> None:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
# Import here to avoid loading backtesting module when it's not used
|
# Import here to avoid loading backtesting module when it's not used
|
||||||
from freqtrade.strategy.strategy_updater import strategy_updater
|
from freqtrade.strategy.strategyupdater import StrategyUpdater
|
||||||
|
|
||||||
config = setup_utils_configuration(args, RunMode.UTIL_NO_EXCHANGE)
|
config = setup_utils_configuration(args, RunMode.UTIL_NO_EXCHANGE)
|
||||||
|
|
||||||
@ -33,5 +33,5 @@ def start_strategy_update(args: Dict[str, Any]) -> None:
|
|||||||
|
|
||||||
for filtered_strategy_obj in filtered_strategy_objs:
|
for filtered_strategy_obj in filtered_strategy_objs:
|
||||||
# Initialize backtesting object
|
# Initialize backtesting object
|
||||||
instance_strategy_updater = strategy_updater()
|
instance_strategy_updater = StrategyUpdater()
|
||||||
strategy_updater.start(instance_strategy_updater, filtered_strategy_obj)
|
StrategyUpdater.start(instance_strategy_updater, config, filtered_strategy_obj)
|
||||||
|
@ -1,213 +0,0 @@
|
|||||||
import ast
|
|
||||||
import os
|
|
||||||
import shutil
|
|
||||||
|
|
||||||
|
|
||||||
class strategy_updater:
|
|
||||||
name_mapping = {
|
|
||||||
'ticker_interval': 'timeframe',
|
|
||||||
'buy': 'enter_long',
|
|
||||||
'sell': 'exit_long',
|
|
||||||
'buy_tag': 'enter_tag',
|
|
||||||
'sell_reason': 'exit_reason',
|
|
||||||
|
|
||||||
'sell_signal': 'exit_signal',
|
|
||||||
'custom_sell': 'custom_exit',
|
|
||||||
'force_sell': 'force_exit',
|
|
||||||
'emergency_sell': 'emergency_exit',
|
|
||||||
|
|
||||||
# Strategy/config settings:
|
|
||||||
'use_sell_signal': 'use_exit_signal',
|
|
||||||
'sell_profit_only': 'exit_profit_only',
|
|
||||||
'sell_profit_offset': 'exit_profit_offset',
|
|
||||||
'ignore_roi_if_buy_signal': 'ignore_roi_if_entry_signal',
|
|
||||||
'forcebuy_enable': 'force_entry_enable',
|
|
||||||
}
|
|
||||||
|
|
||||||
function_mapping = {
|
|
||||||
'populate_buy_trend': 'populate_entry_trend',
|
|
||||||
'populate_sell_trend': 'populate_exit_trend',
|
|
||||||
'custom_sell': 'custom_exit',
|
|
||||||
'check_buy_timeout': 'check_entry_timeout',
|
|
||||||
'check_sell_timeout': 'check_exit_timeout',
|
|
||||||
# '': '',
|
|
||||||
}
|
|
||||||
# order_time_in_force, order_types, unfilledtimeout
|
|
||||||
otif_ot_unfilledtimeout = {
|
|
||||||
'buy': 'entry',
|
|
||||||
'sell': 'exit',
|
|
||||||
}
|
|
||||||
|
|
||||||
# create a dictionary that maps the old column names to the new ones
|
|
||||||
rename_dict = {'buy': 'enter_long', 'sell': 'exit_long', 'buy_tag': 'enter_tag'}
|
|
||||||
|
|
||||||
def start(self, strategy_obj: dict) -> None:
|
|
||||||
"""
|
|
||||||
Run strategy updater
|
|
||||||
It updates a strategy to v3 with the help of the ast-module
|
|
||||||
:return: None
|
|
||||||
"""
|
|
||||||
|
|
||||||
self.cwd = os.getcwd()
|
|
||||||
self.strategies_backup_folder = f'{os.getcwd()}user_data/strategies_orig_updater'
|
|
||||||
source_file = strategy_obj['location']
|
|
||||||
|
|
||||||
# read the file
|
|
||||||
with open(source_file, 'r') as f:
|
|
||||||
old_code = f.read()
|
|
||||||
if not os.path.exists(self.strategies_backup_folder):
|
|
||||||
os.makedirs(self.strategies_backup_folder)
|
|
||||||
|
|
||||||
# backup original
|
|
||||||
# => currently no date after the filename,
|
|
||||||
# could get overridden pretty fast if this is fired twice!
|
|
||||||
shutil.copy(source_file, f"{self.strategies_backup_folder}/{strategy_obj['location_rel']}")
|
|
||||||
|
|
||||||
# update the code
|
|
||||||
new_code = strategy_updater.update_code(self, old_code,
|
|
||||||
strategy_updater.name_mapping,
|
|
||||||
strategy_updater.function_mapping,
|
|
||||||
strategy_updater.rename_dict)
|
|
||||||
|
|
||||||
# write the modified code to the destination folder
|
|
||||||
with open(source_file, 'w') as f:
|
|
||||||
f.write(new_code)
|
|
||||||
print(f"conversion of file {source_file} successful.")
|
|
||||||
|
|
||||||
# define the function to update the code
|
|
||||||
def update_code(self, code, _name_mapping, _function_mapping, _rename_dict):
|
|
||||||
# parse the code into an AST
|
|
||||||
tree = ast.parse(code)
|
|
||||||
|
|
||||||
# use the AST to update the code
|
|
||||||
updated_code = strategy_updater.modify_ast(
|
|
||||||
tree,
|
|
||||||
_name_mapping,
|
|
||||||
_function_mapping,
|
|
||||||
_rename_dict)
|
|
||||||
|
|
||||||
# return the modified code without executing it
|
|
||||||
return updated_code
|
|
||||||
|
|
||||||
# function that uses the ast module to update the code
|
|
||||||
def modify_ast(node, _name_mapping, _function_mapping, _rename_dict): # noqa
|
|
||||||
# create a visitor that will update the names and functions
|
|
||||||
class NameUpdater(ast.NodeTransformer):
|
|
||||||
def generic_visit(self, node):
|
|
||||||
# traverse the AST recursively by calling the visitor method for each child node
|
|
||||||
if hasattr(node, "_fields"):
|
|
||||||
for field_name, field_value in ast.iter_fields(node):
|
|
||||||
self.check_fields(field_value)
|
|
||||||
self.check_strategy_and_config_settings(field_value)
|
|
||||||
# add this check to handle the case where field_value is a slice
|
|
||||||
if isinstance(field_value, ast.Slice):
|
|
||||||
self.visit(field_value)
|
|
||||||
# add this check to handle the case where field_value is a target
|
|
||||||
if isinstance(field_value, ast.expr_context):
|
|
||||||
self.visit(field_value)
|
|
||||||
|
|
||||||
def check_fields(self, field_value):
|
|
||||||
if isinstance(field_value, list):
|
|
||||||
for item in field_value:
|
|
||||||
if isinstance(item, ast.AST):
|
|
||||||
self.visit(item)
|
|
||||||
|
|
||||||
def check_strategy_and_config_settings(self, field_value):
|
|
||||||
if (isinstance(field_value, ast.AST) and
|
|
||||||
hasattr(node, "targets") and
|
|
||||||
isinstance(node.targets, list)):
|
|
||||||
for target in node.targets:
|
|
||||||
if (hasattr(target, "id") and
|
|
||||||
(target.id == "order_time_in_force" or
|
|
||||||
target.id == "order_types" or
|
|
||||||
target.id == "unfilledtimeout") and
|
|
||||||
hasattr(field_value, "keys") and
|
|
||||||
isinstance(field_value.keys, list)):
|
|
||||||
for key in field_value.keys:
|
|
||||||
self.visit(key)
|
|
||||||
|
|
||||||
def visit_Name(self, node):
|
|
||||||
# if the name is in the mapping, update it
|
|
||||||
if node.id in _name_mapping:
|
|
||||||
node.id = _name_mapping[node.id]
|
|
||||||
return node
|
|
||||||
|
|
||||||
def visit_Import(self, node):
|
|
||||||
# do not update the names in import statements
|
|
||||||
return node
|
|
||||||
|
|
||||||
def visit_ImportFrom(self, node):
|
|
||||||
# do not update the names in import statements
|
|
||||||
if hasattr(node, "module"):
|
|
||||||
if node.module == "freqtrade.strategy.hyper":
|
|
||||||
node.module = "freqtrade.strategy"
|
|
||||||
return node
|
|
||||||
|
|
||||||
def visit_FunctionDef(self, node):
|
|
||||||
# if the function name is in the mapping, update it
|
|
||||||
if node.name in _function_mapping:
|
|
||||||
node.name = _function_mapping[node.name]
|
|
||||||
return self.generic_visit(node)
|
|
||||||
|
|
||||||
def visit_Attribute(self, node):
|
|
||||||
# if the attribute name is 'nr_of_successful_buys',
|
|
||||||
# update it to 'nr_of_successful_entries'
|
|
||||||
if isinstance(node.value, ast.Name) and \
|
|
||||||
node.value.id == 'trades' and \
|
|
||||||
node.attr == 'nr_of_successful_buys':
|
|
||||||
node.attr = 'nr_of_successful_entries'
|
|
||||||
return self.generic_visit(node)
|
|
||||||
|
|
||||||
def visit_ClassDef(self, node):
|
|
||||||
# check if the class is derived from IStrategy
|
|
||||||
if any(isinstance(base, ast.Name) and
|
|
||||||
base.id == 'IStrategy' for base in node.bases):
|
|
||||||
# check if the INTERFACE_VERSION variable exists
|
|
||||||
has_interface_version = any(
|
|
||||||
isinstance(child, ast.Assign) and
|
|
||||||
isinstance(child.targets[0], ast.Name) and
|
|
||||||
child.targets[0].id == 'INTERFACE_VERSION'
|
|
||||||
for child in node.body
|
|
||||||
)
|
|
||||||
|
|
||||||
# if the INTERFACE_VERSION variable does not exist, add it as the first child
|
|
||||||
if not has_interface_version:
|
|
||||||
node.body.insert(0, ast.parse('INTERFACE_VERSION = 3').body[0])
|
|
||||||
# otherwise, update its value to 3
|
|
||||||
else:
|
|
||||||
for child in node.body:
|
|
||||||
if isinstance(child, ast.Assign) and \
|
|
||||||
isinstance(child.targets[0], ast.Name) and \
|
|
||||||
child.targets[0].id == 'INTERFACE_VERSION':
|
|
||||||
child.value = ast.parse('3').body[0].value
|
|
||||||
return self.generic_visit(node)
|
|
||||||
|
|
||||||
def visit_Subscript(self, node):
|
|
||||||
if isinstance(node.slice, ast.Constant):
|
|
||||||
if node.slice.value in strategy_updater.rename_dict:
|
|
||||||
# Replace the slice attributes with the values from rename_dict
|
|
||||||
node.slice.value = strategy_updater.rename_dict[node.slice.value]
|
|
||||||
if hasattr(node.slice, "elts"):
|
|
||||||
for elt in node.slice.elts:
|
|
||||||
if isinstance(elt, ast.Constant) and \
|
|
||||||
elt.value in strategy_updater.rename_dict:
|
|
||||||
elt.value = strategy_updater.rename_dict[elt.value]
|
|
||||||
return node
|
|
||||||
|
|
||||||
def visit_Constant(self, node):
|
|
||||||
# do not update the names in import statements
|
|
||||||
if node.value in \
|
|
||||||
strategy_updater.otif_ot_unfilledtimeout:
|
|
||||||
node.value = \
|
|
||||||
strategy_updater.otif_ot_unfilledtimeout[node.value]
|
|
||||||
return node
|
|
||||||
|
|
||||||
# use the visitor to update the names and functions in the AST
|
|
||||||
NameUpdater().visit(node)
|
|
||||||
|
|
||||||
# first fix the comments so it understands "\n" properly inside multi line comments.
|
|
||||||
ast.fix_missing_locations(node)
|
|
||||||
ast.increment_lineno(node, n=1)
|
|
||||||
|
|
||||||
# generate the new code from the updated AST
|
|
||||||
return ast.unparse(node)
|
|
225
freqtrade/strategy/strategyupdater.py
Normal file
225
freqtrade/strategy/strategyupdater.py
Normal file
@ -0,0 +1,225 @@
|
|||||||
|
import ast
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import astor
|
||||||
|
|
||||||
|
|
||||||
|
class StrategyUpdater:
|
||||||
|
name_mapping = {
|
||||||
|
'ticker_interval': 'timeframe',
|
||||||
|
'buy': 'enter_long',
|
||||||
|
'sell': 'exit_long',
|
||||||
|
'buy_tag': 'enter_tag',
|
||||||
|
'sell_reason': 'exit_reason',
|
||||||
|
|
||||||
|
'sell_signal': 'exit_signal',
|
||||||
|
'custom_sell': 'custom_exit',
|
||||||
|
'force_sell': 'force_exit',
|
||||||
|
'emergency_sell': 'emergency_exit',
|
||||||
|
|
||||||
|
# Strategy/config settings:
|
||||||
|
'use_sell_signal': 'use_exit_signal',
|
||||||
|
'sell_profit_only': 'exit_profit_only',
|
||||||
|
'sell_profit_offset': 'exit_profit_offset',
|
||||||
|
'ignore_roi_if_buy_signal': 'ignore_roi_if_entry_signal',
|
||||||
|
'forcebuy_enable': 'force_entry_enable',
|
||||||
|
}
|
||||||
|
|
||||||
|
function_mapping = {
|
||||||
|
'populate_buy_trend': 'populate_entry_trend',
|
||||||
|
'populate_sell_trend': 'populate_exit_trend',
|
||||||
|
'custom_sell': 'custom_exit',
|
||||||
|
'check_buy_timeout': 'check_entry_timeout',
|
||||||
|
'check_sell_timeout': 'check_exit_timeout',
|
||||||
|
# '': '',
|
||||||
|
}
|
||||||
|
# order_time_in_force, order_types, unfilledtimeout
|
||||||
|
otif_ot_unfilledtimeout = {
|
||||||
|
'buy': 'entry',
|
||||||
|
'sell': 'exit',
|
||||||
|
}
|
||||||
|
|
||||||
|
# create a dictionary that maps the old column names to the new ones
|
||||||
|
rename_dict = {'buy': 'enter_long', 'sell': 'exit_long', 'buy_tag': 'enter_tag'}
|
||||||
|
|
||||||
|
def start(self, config, strategy_obj: dict) -> None:
|
||||||
|
"""
|
||||||
|
Run strategy updater
|
||||||
|
It updates a strategy to v3 with the help of the ast-module
|
||||||
|
:return: None
|
||||||
|
"""
|
||||||
|
|
||||||
|
source_file = strategy_obj['location']
|
||||||
|
strategies_backup_folder = Path.joinpath(config['user_data_dir'], "strategies_orig_updater")
|
||||||
|
target_file = Path.joinpath(strategies_backup_folder, strategy_obj['location_rel'])
|
||||||
|
|
||||||
|
# read the file
|
||||||
|
with open(source_file, 'r') as f:
|
||||||
|
old_code = f.read()
|
||||||
|
if not os.path.exists(strategies_backup_folder):
|
||||||
|
os.makedirs(strategies_backup_folder)
|
||||||
|
|
||||||
|
# backup original
|
||||||
|
# => currently no date after the filename,
|
||||||
|
# could get overridden pretty fast if this is fired twice!
|
||||||
|
# The folder is always the same and the file name too (currently).
|
||||||
|
shutil.copy(source_file, target_file)
|
||||||
|
|
||||||
|
# update the code
|
||||||
|
new_code = StrategyUpdater.update_code(self, old_code)
|
||||||
|
|
||||||
|
# write the modified code to the destination folder
|
||||||
|
with open(source_file, 'w') as f:
|
||||||
|
f.write(new_code)
|
||||||
|
print(f"conversion of file {source_file} successful.")
|
||||||
|
|
||||||
|
# define the function to update the code
|
||||||
|
def update_code(self, code):
|
||||||
|
# parse the code into an AST
|
||||||
|
tree = ast.parse(code)
|
||||||
|
|
||||||
|
# use the AST to update the code
|
||||||
|
updated_code = self.modify_ast(
|
||||||
|
tree)
|
||||||
|
|
||||||
|
# return the modified code without executing it
|
||||||
|
return updated_code
|
||||||
|
|
||||||
|
# function that uses the ast module to update the code
|
||||||
|
def modify_ast(self, tree): # noqa
|
||||||
|
# use the visitor to update the names and functions in the AST
|
||||||
|
NameUpdater().visit(tree)
|
||||||
|
|
||||||
|
# first fix the comments, so it understands "\n" properly inside multi line comments.
|
||||||
|
ast.fix_missing_locations(tree)
|
||||||
|
ast.increment_lineno(tree, n=1)
|
||||||
|
|
||||||
|
# generate the new code from the updated AST
|
||||||
|
return astor.to_source(tree)
|
||||||
|
|
||||||
|
|
||||||
|
# Here we go through each respective node, slice, elt, key ... to replace outdated entries.
|
||||||
|
class NameUpdater(ast.NodeTransformer):
|
||||||
|
def generic_visit(self, node):
|
||||||
|
# traverse the AST recursively by calling the visitor method for each child node
|
||||||
|
if hasattr(node, "_fields"):
|
||||||
|
for field_name, field_value in ast.iter_fields(node):
|
||||||
|
self.visit(field_value)
|
||||||
|
self.generic_visit(field_value)
|
||||||
|
self.check_fields(field_value)
|
||||||
|
self.check_strategy_and_config_settings(node, field_value)
|
||||||
|
# add this check to handle the case where field_value is a slice
|
||||||
|
if isinstance(field_value, ast.Slice):
|
||||||
|
self.visit(field_value)
|
||||||
|
# add this check to handle the case where field_value is a target
|
||||||
|
if isinstance(field_value, ast.expr_context):
|
||||||
|
self.visit(field_value)
|
||||||
|
|
||||||
|
def check_fields(self, field_value):
|
||||||
|
if isinstance(field_value, list):
|
||||||
|
for item in field_value:
|
||||||
|
if isinstance(item, ast.AST):
|
||||||
|
self.visit(item)
|
||||||
|
|
||||||
|
def check_strategy_and_config_settings(self, node, field_value):
|
||||||
|
if (isinstance(field_value, ast.AST) and
|
||||||
|
hasattr(node, "targets") and
|
||||||
|
isinstance(node.targets, list)):
|
||||||
|
for target in node.targets:
|
||||||
|
if (hasattr(target, "id") and
|
||||||
|
hasattr(field_value, "keys") and
|
||||||
|
isinstance(field_value.keys, list)):
|
||||||
|
if (target.id == "order_time_in_force" or
|
||||||
|
target.id == "order_types" or
|
||||||
|
target.id == "unfilledtimeout"):
|
||||||
|
for key in field_value.keys:
|
||||||
|
self.visit(key)
|
||||||
|
# Check if the target is a Subscript object with a "value" attribute
|
||||||
|
if isinstance(target, ast.Subscript) and hasattr(target.value, "attr"):
|
||||||
|
if target.value.attr == "loc":
|
||||||
|
self.visit(target)
|
||||||
|
|
||||||
|
def visit_Name(self, node):
|
||||||
|
# if the name is in the mapping, update it
|
||||||
|
if node.id in StrategyUpdater.name_mapping:
|
||||||
|
node.id = StrategyUpdater.name_mapping[node.id]
|
||||||
|
return node
|
||||||
|
|
||||||
|
def visit_Import(self, node):
|
||||||
|
# do not update the names in import statements
|
||||||
|
return node
|
||||||
|
|
||||||
|
def visit_ImportFrom(self, node):
|
||||||
|
# do not update the names in import statements
|
||||||
|
if hasattr(node, "module"):
|
||||||
|
if node.module == "freqtrade.strategy.hyper":
|
||||||
|
node.module = "freqtrade.strategy"
|
||||||
|
return node
|
||||||
|
|
||||||
|
def visit_FunctionDef(self, node):
|
||||||
|
# if the function name is in the mapping, update it
|
||||||
|
if node.name in StrategyUpdater.function_mapping:
|
||||||
|
node.name = StrategyUpdater.function_mapping[node.name]
|
||||||
|
return self.generic_visit(node)
|
||||||
|
|
||||||
|
def visit_Attribute(self, node):
|
||||||
|
# if the attribute name is 'nr_of_successful_buys',
|
||||||
|
# update it to 'nr_of_successful_entries'
|
||||||
|
if isinstance(node.value, ast.Name) and \
|
||||||
|
node.value.id == 'trades' and \
|
||||||
|
node.attr == 'nr_of_successful_buys':
|
||||||
|
node.attr = 'nr_of_successful_entries'
|
||||||
|
return self.generic_visit(node)
|
||||||
|
|
||||||
|
def visit_ClassDef(self, node):
|
||||||
|
# check if the class is derived from IStrategy
|
||||||
|
if any(isinstance(base, ast.Name) and
|
||||||
|
base.id == 'IStrategy' for base in node.bases):
|
||||||
|
# check if the INTERFACE_VERSION variable exists
|
||||||
|
has_interface_version = any(
|
||||||
|
isinstance(child, ast.Assign) and
|
||||||
|
isinstance(child.targets[0], ast.Name) and
|
||||||
|
child.targets[0].id == 'INTERFACE_VERSION'
|
||||||
|
for child in node.body
|
||||||
|
)
|
||||||
|
|
||||||
|
# if the INTERFACE_VERSION variable does not exist, add it as the first child
|
||||||
|
if not has_interface_version:
|
||||||
|
node.body.insert(0, ast.parse('INTERFACE_VERSION = 3').body[0])
|
||||||
|
# otherwise, update its value to 3
|
||||||
|
else:
|
||||||
|
for child in node.body:
|
||||||
|
if isinstance(child, ast.Assign) and \
|
||||||
|
isinstance(child.targets[0], ast.Name) and \
|
||||||
|
child.targets[0].id == 'INTERFACE_VERSION':
|
||||||
|
child.value = ast.parse('3').body[0].value
|
||||||
|
return self.generic_visit(node)
|
||||||
|
|
||||||
|
def visit_Subscript(self, node):
|
||||||
|
if isinstance(node.slice, ast.Constant):
|
||||||
|
if node.slice.value in StrategyUpdater.rename_dict:
|
||||||
|
# Replace the slice attributes with the values from rename_dict
|
||||||
|
node.slice.value = StrategyUpdater.rename_dict[node.slice.value]
|
||||||
|
if hasattr(node.slice, "elts"):
|
||||||
|
self.visit_slice_elts(node.slice.elts)
|
||||||
|
if hasattr(node.slice.value, "elts"):
|
||||||
|
self.visit_slice_elts(node.slice.value.elts)
|
||||||
|
return node
|
||||||
|
|
||||||
|
# elts can have elts (technically recursively)
|
||||||
|
def visit_slice_elts(self, elts):
|
||||||
|
for elt in elts:
|
||||||
|
if isinstance(elt, ast.Constant) and elt.value in StrategyUpdater.rename_dict:
|
||||||
|
elt.value = StrategyUpdater.rename_dict[elt.value]
|
||||||
|
elif hasattr(elt, "elts"):
|
||||||
|
self.visit_slice_elts(elt.elts)
|
||||||
|
|
||||||
|
def visit_Constant(self, node):
|
||||||
|
# do not update the names in import statements
|
||||||
|
if node.value in \
|
||||||
|
StrategyUpdater.otif_ot_unfilledtimeout:
|
||||||
|
node.value = \
|
||||||
|
StrategyUpdater.otif_ot_unfilledtimeout[node.value]
|
||||||
|
return node
|
Loading…
Reference in New Issue
Block a user