Merge pull request #2923 from hroff-1902/status-strategies

Add printing statuses for enlisted strategies and hyperopts
This commit is contained in:
Matthias
2020-02-15 19:43:02 +01:00
committed by GitHub
6 changed files with 88 additions and 28 deletions

View File

@@ -30,9 +30,9 @@ ARGS_HYPEROPT = ARGS_COMMON_OPTIMIZE + ["hyperopt", "hyperopt_path",
ARGS_EDGE = ARGS_COMMON_OPTIMIZE + ["stoploss_range"]
ARGS_LIST_STRATEGIES = ["strategy_path", "print_one_column"]
ARGS_LIST_STRATEGIES = ["strategy_path", "print_one_column", "print_colorized"]
ARGS_LIST_HYPEROPTS = ["hyperopt_path", "print_one_column"]
ARGS_LIST_HYPEROPTS = ["hyperopt_path", "print_one_column", "print_colorized"]
ARGS_LIST_EXCHANGES = ["print_one_column", "list_exchanges_all"]

View File

@@ -3,8 +3,10 @@ import logging
import sys
from collections import OrderedDict
from pathlib import Path
from typing import Any, Dict
from typing import Any, Dict, List
from colorama import init as colorama_init
from colorama import Fore, Style
import rapidjson
from tabulate import tabulate
@@ -36,6 +38,29 @@ def start_list_exchanges(args: Dict[str, Any]) -> None:
print(f"Exchanges available for Freqtrade: {', '.join(exchanges)}")
def _print_objs_tabular(objs: List, print_colorized: bool) -> None:
if print_colorized:
colorama_init(autoreset=True)
red = Fore.RED
yellow = Fore.YELLOW
reset = Style.RESET_ALL
else:
red = ''
yellow = ''
reset = ''
names = [s['name'] for s in objs]
objss_to_print = [{
'name': s['name'] if s['name'] else "--",
'location': s['location'].name,
'status': (red + "LOAD FAILED" + reset if s['class'] is None
else "OK" if names.count(s['name']) == 1
else yellow + "DUPLICATE NAME" + reset)
} for s in objs]
print(tabulate(objss_to_print, headers='keys', tablefmt='pipe'))
def start_list_strategies(args: Dict[str, Any]) -> None:
"""
Print files with Strategy custom classes available in the directory
@@ -43,15 +68,14 @@ def start_list_strategies(args: Dict[str, Any]) -> None:
config = setup_utils_configuration(args, RunMode.UTIL_NO_EXCHANGE)
directory = Path(config.get('strategy_path', config['user_data_dir'] / USERPATH_STRATEGIES))
strategies = StrategyResolver.search_all_objects(directory)
strategy_objs = StrategyResolver.search_all_objects(directory, not args['print_one_column'])
# Sort alphabetically
strategies = sorted(strategies, key=lambda x: x['name'])
strats_to_print = [{'name': s['name'], 'location': s['location'].name} for s in strategies]
strategy_objs = sorted(strategy_objs, key=lambda x: x['name'])
if args['print_one_column']:
print('\n'.join([s['name'] for s in strategies]))
print('\n'.join([s['name'] for s in strategy_objs]))
else:
print(tabulate(strats_to_print, headers='keys', tablefmt='pipe'))
_print_objs_tabular(strategy_objs, config.get('print_colorized', False))
def start_list_hyperopts(args: Dict[str, Any]) -> None:
@@ -63,15 +87,14 @@ def start_list_hyperopts(args: Dict[str, Any]) -> None:
config = setup_utils_configuration(args, RunMode.UTIL_NO_EXCHANGE)
directory = Path(config.get('hyperopt_path', config['user_data_dir'] / USERPATH_HYPEROPTS))
hyperopts = HyperOptResolver.search_all_objects(directory)
hyperopt_objs = HyperOptResolver.search_all_objects(directory, not args['print_one_column'])
# Sort alphabetically
hyperopts = sorted(hyperopts, key=lambda x: x['name'])
hyperopts_to_print = [{'name': s['name'], 'location': s['location'].name} for s in hyperopts]
hyperopt_objs = sorted(hyperopt_objs, key=lambda x: x['name'])
if args['print_one_column']:
print('\n'.join([s['name'] for s in hyperopts]))
print('\n'.join([s['name'] for s in hyperopt_objs]))
else:
print(tabulate(hyperopts_to_print, headers='keys', tablefmt='pipe'))
_print_objs_tabular(hyperopt_objs, config.get('print_colorized', False))
def start_list_timeframes(args: Dict[str, Any]) -> None:

View File

@@ -7,7 +7,7 @@ import importlib.util
import inspect
import logging
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Tuple, Type, Union
from typing import Any, Dict, Iterator, List, Optional, Tuple, Type, Union
from freqtrade.exceptions import OperationalException
@@ -40,12 +40,14 @@ class IResolver:
return abs_paths
@classmethod
def _get_valid_object(cls, module_path: Path,
object_name: Optional[str]) -> Generator[Any, None, None]:
def _get_valid_object(cls, module_path: Path, object_name: Optional[str],
enum_failed: bool = False) -> Iterator[Any]:
"""
Generator returning objects with matching object_type and object_name in the path given.
:param module_path: absolute path to the module
:param object_name: Class name of the object
:param enum_failed: If True, will return None for modules which fail.
Otherwise, failing modules are skipped.
:return: generator containing matching objects
"""
@@ -58,6 +60,8 @@ class IResolver:
except (ModuleNotFoundError, SyntaxError) as err:
# Catch errors in case a specific module is not installed
logger.warning(f"Could not import {module_path} due to '{err}'")
if enum_failed:
return iter([None])
valid_objects_gen = (
obj for name, obj in inspect.getmembers(module, inspect.isclass)
@@ -136,10 +140,13 @@ class IResolver:
)
@classmethod
def search_all_objects(cls, directory: Path) -> List[Dict[str, Any]]:
def search_all_objects(cls, directory: Path,
enum_failed: bool) -> List[Dict[str, Any]]:
"""
Searches a directory for valid objects
:param directory: Path to search
:param enum_failed: If True, will return None for modules which fail.
Otherwise, failing modules are skipped.
:return: List of dicts containing 'name', 'class' and 'location' entires
"""
logger.debug(f"Searching for {cls.object_type.__name__} '{directory}'")
@@ -151,9 +158,10 @@ class IResolver:
continue
module_path = entry.resolve()
logger.debug(f"Path {module_path}")
for obj in cls._get_valid_object(module_path, object_name=None):
for obj in cls._get_valid_object(module_path, object_name=None,
enum_failed=enum_failed):
objects.append(
{'name': obj.__name__,
{'name': obj.__name__ if obj is not None else '',
'class': obj,
'location': entry,
})