Add some type hints

This commit is contained in:
hroff-1902 2020-02-02 07:00:40 +03:00
parent 2396f35586
commit f3d500085c
28 changed files with 114 additions and 100 deletions

View File

@ -1,6 +1,6 @@
import logging import logging
import sys import sys
from typing import Any, Dict, List from typing import Any, Dict, List, cast
import arrow import arrow
@ -43,16 +43,18 @@ def start_download_data(args: Dict[str, Any]) -> None:
if config.get('download_trades'): if config.get('download_trades'):
pairs_not_available = refresh_backtest_trades_data( pairs_not_available = refresh_backtest_trades_data(
exchange, pairs=config["pairs"], datadir=config['datadir'], exchange, pairs=config["pairs"], datadir=config['datadir'],
timerange=timerange, erase=config.get("erase")) timerange=timerange, erase=cast(bool, config.get("erase")))
# Convert downloaded trade data to different timeframes # Convert downloaded trade data to different timeframes
convert_trades_to_ohlcv( convert_trades_to_ohlcv(
pairs=config["pairs"], timeframes=config["timeframes"], pairs=config["pairs"], timeframes=config["timeframes"],
datadir=config['datadir'], timerange=timerange, erase=config.get("erase")) datadir=config['datadir'], timerange=timerange,
erase=cast(bool, config.get("erase")))
else: else:
pairs_not_available = refresh_backtest_ohlcv_data( pairs_not_available = refresh_backtest_ohlcv_data(
exchange, pairs=config["pairs"], timeframes=config["timeframes"], exchange, pairs=config["pairs"], timeframes=config["timeframes"],
datadir=config['datadir'], timerange=timerange, erase=config.get("erase")) datadir=config['datadir'], timerange=timerange,
erase=cast(bool, config.get("erase")))
except KeyboardInterrupt: except KeyboardInterrupt:
sys.exit("SIGINT received, aborting ...") sys.exit("SIGINT received, aborting ...")

View File

@ -28,7 +28,7 @@ def start_create_userdir(args: Dict[str, Any]) -> None:
sys.exit(1) sys.exit(1)
def deploy_new_strategy(strategy_name, strategy_path: Path, subtemplate: str): def deploy_new_strategy(strategy_name: str, strategy_path: Path, subtemplate: str) -> None:
""" """
Deploy new strategy from template to strategy_path Deploy new strategy from template to strategy_path
""" """
@ -69,7 +69,7 @@ def start_new_strategy(args: Dict[str, Any]) -> None:
raise OperationalException("`new-strategy` requires --strategy to be set.") raise OperationalException("`new-strategy` requires --strategy to be set.")
def deploy_new_hyperopt(hyperopt_name, hyperopt_path: Path, subtemplate: str): def deploy_new_hyperopt(hyperopt_name: str, hyperopt_path: Path, subtemplate: str) -> None:
""" """
Deploys a new hyperopt template to hyperopt_path Deploys a new hyperopt template to hyperopt_path
""" """

View File

@ -5,7 +5,7 @@ from freqtrade.exceptions import OperationalException
from freqtrade.state import RunMode from freqtrade.state import RunMode
def validate_plot_args(args: Dict[str, Any]): def validate_plot_args(args: Dict[str, Any]) -> None:
if not args.get('datadir') and not args.get('config'): if not args.get('datadir') and not args.get('config'):
raise OperationalException( raise OperationalException(
"You need to specify either `--datadir` or `--config` " "You need to specify either `--datadir` or `--config` "

View File

@ -10,7 +10,7 @@ from freqtrade.state import RunMode
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def remove_credentials(config: Dict[str, Any]): def remove_credentials(config: Dict[str, Any]) -> None:
""" """
Removes exchange keys from the configuration and specifies dry-run Removes exchange keys from the configuration and specifies dry-run
Used for backtesting / hyperopt / edge and utils. Used for backtesting / hyperopt / edge and utils.

View File

@ -13,7 +13,7 @@ logger = logging.getLogger(__name__)
def check_conflicting_settings(config: Dict[str, Any], def check_conflicting_settings(config: Dict[str, Any],
section1: str, name1: str, section1: str, name1: str,
section2: str, name2: str): section2: str, name2: str) -> None:
section1_config = config.get(section1, {}) section1_config = config.get(section1, {})
section2_config = config.get(section2, {}) section2_config = config.get(section2, {})
if name1 in section1_config and name2 in section2_config: if name1 in section1_config and name2 in section2_config:
@ -28,7 +28,7 @@ def check_conflicting_settings(config: Dict[str, Any],
def process_deprecated_setting(config: Dict[str, Any], def process_deprecated_setting(config: Dict[str, Any],
section1: str, name1: str, section1: str, name1: str,
section2: str, name2: str): section2: str, name2: str) -> None:
section2_config = config.get(section2, {}) section2_config = config.get(section2, {})
if name2 in section2_config: if name2 in section2_config:

View File

@ -23,7 +23,7 @@ def create_datadir(config: Dict[str, Any], datadir: Optional[str] = None) -> Pat
return folder return folder
def create_userdata_dir(directory: str, create_dir=False) -> Path: def create_userdata_dir(directory: str, create_dir: bool = False) -> Path:
""" """
Create userdata directory structure. Create userdata directory structure.
if create_dir is True, then the parent-directory will be created if it does not exist. if create_dir is True, then the parent-directory will be created if it does not exist.

View File

@ -7,6 +7,7 @@ from typing import Optional
import arrow import arrow
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -30,7 +31,7 @@ class TimeRange:
return (self.starttype == other.starttype and self.stoptype == other.stoptype return (self.starttype == other.starttype and self.stoptype == other.stoptype
and self.startts == other.startts and self.stopts == other.stopts) and self.startts == other.startts and self.stopts == other.stopts)
def subtract_start(self, seconds) -> None: def subtract_start(self, seconds: int) -> None:
""" """
Subtracts <seconds> from startts if startts is set. Subtracts <seconds> from startts if startts is set.
:param seconds: Seconds to subtract from starttime :param seconds: Seconds to subtract from starttime
@ -59,7 +60,7 @@ class TimeRange:
self.starttype = 'date' self.starttype = 'date'
@staticmethod @staticmethod
def parse_timerange(text: Optional[str]): def parse_timerange(text: Optional[str]) -> 'TimeRange':
""" """
Parse the value of the argument --timerange to determine what is the range desired Parse the value of the argument --timerange to determine what is the range desired
:param text: value from --timerange :param text: value from --timerange

View File

@ -3,7 +3,7 @@ Helpers when analyzing backtest data
""" """
import logging import logging
from pathlib import Path from pathlib import Path
from typing import Dict from typing import Dict, Union
import numpy as np import numpy as np
import pandas as pd import pandas as pd
@ -20,7 +20,7 @@ BT_DATA_COLUMNS = ["pair", "profitperc", "open_time", "close_time", "index", "du
"open_rate", "close_rate", "open_at_end", "sell_reason"] "open_rate", "close_rate", "open_at_end", "sell_reason"]
def load_backtest_data(filename) -> pd.DataFrame: def load_backtest_data(filename: Union[Path, str]) -> pd.DataFrame:
""" """
Load backtest data file. Load backtest data file.
:param filename: pathlib.Path object, or string pointing to the file. :param filename: pathlib.Path object, or string pointing to the file.
@ -151,7 +151,8 @@ def extract_trades_of_period(dataframe: pd.DataFrame, trades: pd.DataFrame) -> p
return trades return trades
def combine_tickers_with_mean(tickers: Dict[str, pd.DataFrame], column: str = "close"): def combine_tickers_with_mean(tickers: Dict[str, pd.DataFrame],
column: str = "close") -> pd.DataFrame:
""" """
Combine multiple dataframes "column" Combine multiple dataframes "column"
:param tickers: Dict of Dataframes, dict key should be pair. :param tickers: Dict of Dataframes, dict key should be pair.

View File

@ -86,7 +86,7 @@ def load_tickerdata_file(datadir: Path, pair: str, timeframe: str,
def store_tickerdata_file(datadir: Path, pair: str, def store_tickerdata_file(datadir: Path, pair: str,
timeframe: str, data: list, is_zip: bool = False): timeframe: str, data: list, is_zip: bool = False) -> None:
""" """
Stores tickerdata to file Stores tickerdata to file
""" """
@ -109,7 +109,7 @@ def load_trades_file(datadir: Path, pair: str,
def store_trades_file(datadir: Path, pair: str, def store_trades_file(datadir: Path, pair: str,
data: list, is_zip: bool = True): data: list, is_zip: bool = True) -> None:
""" """
Stores tickerdata to file Stores tickerdata to file
""" """
@ -117,7 +117,7 @@ def store_trades_file(datadir: Path, pair: str,
misc.file_dump_json(filename, data, is_zip=is_zip) misc.file_dump_json(filename, data, is_zip=is_zip)
def _validate_pairdata(pair, pairdata, timerange: TimeRange): def _validate_pairdata(pair: str, pairdata: List[Dict], timerange: TimeRange) -> None:
if timerange.starttype == 'date' and pairdata[0][0] > timerange.startts * 1000: if timerange.starttype == 'date' and pairdata[0][0] > timerange.startts * 1000:
logger.warning('Missing data at start for pair %s, data starts at %s', logger.warning('Missing data at start for pair %s, data starts at %s',
pair, arrow.get(pairdata[0][0] // 1000).strftime('%Y-%m-%d %H:%M:%S')) pair, arrow.get(pairdata[0][0] // 1000).strftime('%Y-%m-%d %H:%M:%S'))
@ -331,7 +331,7 @@ def _download_pair_history(datadir: Path,
def refresh_backtest_ohlcv_data(exchange: Exchange, pairs: List[str], timeframes: List[str], def refresh_backtest_ohlcv_data(exchange: Exchange, pairs: List[str], timeframes: List[str],
datadir: Path, timerange: Optional[TimeRange] = None, datadir: Path, timerange: Optional[TimeRange] = None,
erase=False) -> List[str]: erase: bool = False) -> List[str]:
""" """
Refresh stored ohlcv data for backtesting and hyperopt operations. Refresh stored ohlcv data for backtesting and hyperopt operations.
Used by freqtrade download-data subcommand. Used by freqtrade download-data subcommand.
@ -401,7 +401,7 @@ def _download_trades_history(datadir: Path,
def refresh_backtest_trades_data(exchange: Exchange, pairs: List[str], datadir: Path, def refresh_backtest_trades_data(exchange: Exchange, pairs: List[str], datadir: Path,
timerange: TimeRange, erase=False) -> List[str]: timerange: TimeRange, erase: bool = False) -> List[str]:
""" """
Refresh stored trades data for backtesting and hyperopt operations. Refresh stored trades data for backtesting and hyperopt operations.
Used by freqtrade download-data subcommand. Used by freqtrade download-data subcommand.
@ -428,7 +428,7 @@ def refresh_backtest_trades_data(exchange: Exchange, pairs: List[str], datadir:
def convert_trades_to_ohlcv(pairs: List[str], timeframes: List[str], def convert_trades_to_ohlcv(pairs: List[str], timeframes: List[str],
datadir: Path, timerange: TimeRange, erase=False) -> None: datadir: Path, timerange: TimeRange, erase: bool = False) -> None:
""" """
Convert stored trades data to ohlcv data Convert stored trades data to ohlcv data
""" """

View File

@ -1,7 +1,7 @@
# pragma pylint: disable=W0603 # pragma pylint: disable=W0603
""" Edge positioning package """ """ Edge positioning package """
import logging import logging
from typing import Any, Dict, NamedTuple from typing import Any, Dict, List, NamedTuple
import arrow import arrow
import numpy as np import numpy as np
@ -181,7 +181,7 @@ class Edge:
'strategy stoploss is returned instead.') 'strategy stoploss is returned instead.')
return self.strategy.stoploss return self.strategy.stoploss
def adjust(self, pairs) -> list: def adjust(self, pairs: List[str]) -> list:
""" """
Filters out and sorts "pairs" according to Edge calculated pairs Filters out and sorts "pairs" according to Edge calculated pairs
""" """

View File

@ -24,6 +24,12 @@ from freqtrade.exceptions import (DependencyException, InvalidOrderException,
from freqtrade.exchange.common import BAD_EXCHANGES, retrier, retrier_async from freqtrade.exchange.common import BAD_EXCHANGES, retrier, retrier_async
from freqtrade.misc import deep_merge_dicts from freqtrade.misc import deep_merge_dicts
# Should probably use typing.Literal when we switch to python 3.8+
# CcxtModuleType = Literal[ccxt, ccxt_async]
CcxtModuleType = Any
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -51,7 +57,7 @@ class Exchange:
} }
_ft_has: Dict = {} _ft_has: Dict = {}
def __init__(self, config: dict, validate: bool = True) -> None: def __init__(self, config: Dict[str, Any], validate: bool = True) -> None:
""" """
Initializes this module with the given config, Initializes this module with the given config,
it does basic validation whether the specified exchange and pairs are valid. it does basic validation whether the specified exchange and pairs are valid.
@ -135,7 +141,7 @@ class Exchange:
if self._api_async and inspect.iscoroutinefunction(self._api_async.close): if self._api_async and inspect.iscoroutinefunction(self._api_async.close):
asyncio.get_event_loop().run_until_complete(self._api_async.close()) asyncio.get_event_loop().run_until_complete(self._api_async.close())
def _init_ccxt(self, exchange_config: dict, ccxt_module=ccxt, def _init_ccxt(self, exchange_config: Dict[str, Any], ccxt_module: CcxtModuleType = ccxt,
ccxt_kwargs: dict = None) -> ccxt.Exchange: ccxt_kwargs: dict = None) -> ccxt.Exchange:
""" """
Initialize ccxt with given config and return valid Initialize ccxt with given config and return valid
@ -224,13 +230,13 @@ class Exchange:
markets = self.markets markets = self.markets
return sorted(set([x['quote'] for _, x in markets.items()])) return sorted(set([x['quote'] for _, x in markets.items()]))
def klines(self, pair_interval: Tuple[str, str], copy=True) -> DataFrame: def klines(self, pair_interval: Tuple[str, str], copy: bool = True) -> DataFrame:
if pair_interval in self._klines: if pair_interval in self._klines:
return self._klines[pair_interval].copy() if copy else self._klines[pair_interval] return self._klines[pair_interval].copy() if copy else self._klines[pair_interval]
else: else:
return DataFrame() return DataFrame()
def set_sandbox(self, api, exchange_config: dict, name: str): def set_sandbox(self, api: ccxt.Exchange, exchange_config: dict, name: str) -> None:
if exchange_config.get('sandbox'): if exchange_config.get('sandbox'):
if api.urls.get('test'): if api.urls.get('test'):
api.urls['api'] = api.urls['test'] api.urls['api'] = api.urls['test']
@ -240,7 +246,7 @@ class Exchange:
"Please check your config.json") "Please check your config.json")
raise OperationalException(f'Exchange {name} does not provide a sandbox api') raise OperationalException(f'Exchange {name} does not provide a sandbox api')
def _load_async_markets(self, reload=False) -> None: def _load_async_markets(self, reload: bool = False) -> None:
try: try:
if self._api_async: if self._api_async:
asyncio.get_event_loop().run_until_complete( asyncio.get_event_loop().run_until_complete(
@ -273,7 +279,7 @@ class Exchange:
except ccxt.BaseError: except ccxt.BaseError:
logger.exception("Could not reload markets.") logger.exception("Could not reload markets.")
def validate_stakecurrency(self, stake_currency) -> None: def validate_stakecurrency(self, stake_currency: str) -> None:
""" """
Checks stake-currency against available currencies on the exchange. Checks stake-currency against available currencies on the exchange.
:param stake_currency: Stake-currency to validate :param stake_currency: Stake-currency to validate
@ -319,7 +325,7 @@ class Exchange:
f"Please check if you are impacted by this restriction " f"Please check if you are impacted by this restriction "
f"on the exchange and eventually remove {pair} from your whitelist.") f"on the exchange and eventually remove {pair} from your whitelist.")
def get_valid_pair_combination(self, curr_1, curr_2) -> str: def get_valid_pair_combination(self, curr_1: str, curr_2: str) -> str:
""" """
Get valid pair combination of curr_1 and curr_2 by trying both combinations. Get valid pair combination of curr_1 and curr_2 by trying both combinations.
""" """
@ -373,7 +379,7 @@ class Exchange:
raise OperationalException( raise OperationalException(
f'Time in force policies are not supported for {self.name} yet.') f'Time in force policies are not supported for {self.name} yet.')
def validate_required_startup_candles(self, startup_candles) -> None: def validate_required_startup_candles(self, startup_candles: int) -> None:
""" """
Checks if required startup_candles is more than ohlcv_candle_limit. Checks if required startup_candles is more than ohlcv_candle_limit.
Requires a grace-period of 5 candles - so a startup-period up to 494 is allowed by default. Requires a grace-period of 5 candles - so a startup-period up to 494 is allowed by default.
@ -392,7 +398,7 @@ class Exchange:
""" """
return endpoint in self._api.has and self._api.has[endpoint] return endpoint in self._api.has and self._api.has[endpoint]
def amount_to_precision(self, pair, amount: float) -> float: def amount_to_precision(self, pair: str, amount: float) -> float:
''' '''
Returns the amount to buy or sell to a precision the Exchange accepts Returns the amount to buy or sell to a precision the Exchange accepts
Reimplementation of ccxt internal methods - ensuring we can test the result is correct Reimplementation of ccxt internal methods - ensuring we can test the result is correct
@ -406,7 +412,7 @@ class Exchange:
return amount return amount
def price_to_precision(self, pair, price: float) -> float: def price_to_precision(self, pair: str, price: float) -> float:
''' '''
Returns the price rounded up to the precision the Exchange accepts. Returns the price rounded up to the precision the Exchange accepts.
Partial Reimplementation of ccxt internal method decimal_to_precision(), Partial Reimplementation of ccxt internal method decimal_to_precision(),
@ -494,7 +500,7 @@ class Exchange:
raise OperationalException(e) from e raise OperationalException(e) from e
def buy(self, pair: str, ordertype: str, amount: float, def buy(self, pair: str, ordertype: str, amount: float,
rate: float, time_in_force) -> Dict: rate: float, time_in_force: str) -> Dict:
if self._config['dry_run']: if self._config['dry_run']:
dry_order = self.dry_run_order(pair, ordertype, "buy", amount, rate) dry_order = self.dry_run_order(pair, ordertype, "buy", amount, rate)
@ -507,7 +513,7 @@ class Exchange:
return self.create_order(pair, ordertype, 'buy', amount, rate, params) return self.create_order(pair, ordertype, 'buy', amount, rate, params)
def sell(self, pair: str, ordertype: str, amount: float, def sell(self, pair: str, ordertype: str, amount: float,
rate: float, time_in_force='gtc') -> Dict: rate: float, time_in_force: str = 'gtc') -> Dict:
if self._config['dry_run']: if self._config['dry_run']:
dry_order = self.dry_run_order(pair, ordertype, "sell", amount, rate) dry_order = self.dry_run_order(pair, ordertype, "sell", amount, rate)
@ -976,8 +982,8 @@ class Exchange:
raise OperationalException(e) from e raise OperationalException(e) from e
@retrier @retrier
def get_fee(self, symbol, type='', side='', amount=1, def get_fee(self, symbol: str, type: str = '', side: str = '', amount: float = 1,
price=1, taker_or_maker='maker') -> float: price: float = 1, taker_or_maker: str = 'maker') -> float:
try: try:
# validate that markets are loaded before trying to get fee # validate that markets are loaded before trying to get fee
if self._api.markets is None or len(self._api.markets) == 0: if self._api.markets is None or len(self._api.markets) == 0:
@ -1000,7 +1006,7 @@ def get_exchange_bad_reason(exchange_name: str) -> str:
return BAD_EXCHANGES.get(exchange_name, "") return BAD_EXCHANGES.get(exchange_name, "")
def is_exchange_known_ccxt(exchange_name: str, ccxt_module=None) -> bool: def is_exchange_known_ccxt(exchange_name: str, ccxt_module: CcxtModuleType = None) -> bool:
return exchange_name in ccxt_exchanges(ccxt_module) return exchange_name in ccxt_exchanges(ccxt_module)
@ -1008,14 +1014,14 @@ def is_exchange_officially_supported(exchange_name: str) -> bool:
return exchange_name in ['bittrex', 'binance'] return exchange_name in ['bittrex', 'binance']
def ccxt_exchanges(ccxt_module=None) -> List[str]: def ccxt_exchanges(ccxt_module: CcxtModuleType = None) -> List[str]:
""" """
Return the list of all exchanges known to ccxt Return the list of all exchanges known to ccxt
""" """
return ccxt_module.exchanges if ccxt_module is not None else ccxt.exchanges return ccxt_module.exchanges if ccxt_module is not None else ccxt.exchanges
def available_exchanges(ccxt_module=None) -> List[str]: def available_exchanges(ccxt_module: CcxtModuleType = None) -> List[str]:
""" """
Return exchanges available to the bot, i.e. non-bad exchanges in the ccxt list Return exchanges available to the bot, i.e. non-bad exchanges in the ccxt list
""" """
@ -1075,7 +1081,8 @@ def timeframe_to_next_date(timeframe: str, date: datetime = None) -> datetime:
return datetime.fromtimestamp(new_timestamp, tz=timezone.utc) return datetime.fromtimestamp(new_timestamp, tz=timezone.utc)
def symbol_is_pair(market_symbol: str, base_currency: str = None, quote_currency: str = None): def symbol_is_pair(market_symbol: str, base_currency: str = None,
quote_currency: str = None) -> bool:
""" """
Check if the market symbol is a pair, i.e. that its symbol consists of the base currency and the Check if the market symbol is a pair, i.e. that its symbol consists of the base currency and the
quote currency separated by '/' character. If base_currency and/or quote_currency is passed, quote currency separated by '/' character. If base_currency and/or quote_currency is passed,
@ -1088,7 +1095,7 @@ def symbol_is_pair(market_symbol: str, base_currency: str = None, quote_currency
(symbol_parts[1] == quote_currency if quote_currency else len(symbol_parts[1]) > 0)) (symbol_parts[1] == quote_currency if quote_currency else len(symbol_parts[1]) > 0))
def market_is_active(market): def market_is_active(market: Dict) -> bool:
""" """
Return True if the market is active. Return True if the market is active.
""" """

View File

@ -265,7 +265,7 @@ class FreqtradeBot:
return used_rate return used_rate
def get_trade_stake_amount(self, pair) -> float: def get_trade_stake_amount(self, pair: str) -> float:
""" """
Calculate stake amount for the trade Calculate stake amount for the trade
:return: float: Stake amount :return: float: Stake amount
@ -539,7 +539,7 @@ class FreqtradeBot:
return True return True
def _notify_buy(self, trade: Trade, order_type: str): def _notify_buy(self, trade: Trade, order_type: str) -> None:
""" """
Sends rpc notification when a buy occured. Sends rpc notification when a buy occured.
""" """
@ -735,7 +735,7 @@ class FreqtradeBot:
return False return False
def handle_trailing_stoploss_on_exchange(self, trade: Trade, order): def handle_trailing_stoploss_on_exchange(self, trade: Trade, order: dict) -> None:
""" """
Check to see if stoploss on exchange should be updated Check to see if stoploss on exchange should be updated
in case of trailing stoploss on exchange in case of trailing stoploss on exchange
@ -758,10 +758,8 @@ class FreqtradeBot:
f"for pair {trade.pair}") f"for pair {trade.pair}")
# Create new stoploss order # Create new stoploss order
if self.create_stoploss_order(trade=trade, stop_price=trade.stop_loss, if not self.create_stoploss_order(trade=trade, stop_price=trade.stop_loss,
rate=trade.stop_loss): rate=trade.stop_loss):
return False
else:
logger.warning(f"Could not create trailing stoploss order " logger.warning(f"Could not create trailing stoploss order "
f"for pair {trade.pair}.") f"for pair {trade.pair}.")
@ -990,7 +988,7 @@ class FreqtradeBot:
self._notify_sell(trade, order_type) self._notify_sell(trade, order_type)
def _notify_sell(self, trade: Trade, order_type: str): def _notify_sell(self, trade: Trade, order_type: str) -> None:
""" """
Sends rpc notification when a sell occured. Sends rpc notification when a sell occured.
""" """
@ -1031,7 +1029,7 @@ class FreqtradeBot:
# Common update trade state methods # Common update trade state methods
# #
def update_trade_state(self, trade, action_order: dict = None): def update_trade_state(self, trade: Trade, action_order: dict = None) -> None:
""" """
Checks trades with open orders and updates the amount if necessary Checks trades with open orders and updates the amount if necessary
""" """

View File

@ -6,6 +6,7 @@ import logging
import re import re
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Any
from typing.io import IO from typing.io import IO
import numpy as np import numpy as np
@ -40,7 +41,7 @@ def datesarray_to_datetimearray(dates: np.ndarray) -> np.ndarray:
return dates.dt.to_pydatetime() return dates.dt.to_pydatetime()
def file_dump_json(filename: Path, data, is_zip=False) -> None: def file_dump_json(filename: Path, data: Any, is_zip: bool = False) -> None:
""" """
Dump JSON data into a file Dump JSON data into a file
:param filename: file to create :param filename: file to create
@ -61,7 +62,7 @@ def file_dump_json(filename: Path, data, is_zip=False) -> None:
logger.debug(f'done json to "{filename}"') logger.debug(f'done json to "{filename}"')
def json_load(datafile: IO): def json_load(datafile: IO) -> Any:
""" """
load data with rapidjson load data with rapidjson
Use this to have a consistent experience, Use this to have a consistent experience,
@ -125,11 +126,11 @@ def round_dict(d, n):
return {k: (round(v, n) if isinstance(v, float) else v) for k, v in d.items()} return {k: (round(v, n) if isinstance(v, float) else v) for k, v in d.items()}
def plural(num, singular: str, plural: str = None) -> str: def plural(num: float, singular: str, plural: str = None) -> str:
return singular if (num == 1 or num == -1) else plural or singular + 's' return singular if (num == 1 or num == -1) else plural or singular + 's'
def render_template(templatefile: str, arguments: dict = {}): def render_template(templatefile: str, arguments: dict = {}) -> str:
from jinja2 import Environment, PackageLoader, select_autoescape from jinja2 import Environment, PackageLoader, select_autoescape

View File

@ -9,6 +9,7 @@ from datetime import datetime, timedelta
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, NamedTuple, Optional from typing import Any, Dict, List, NamedTuple, Optional
import arrow
from pandas import DataFrame from pandas import DataFrame
from freqtrade.configuration import (TimeRange, remove_credentials, from freqtrade.configuration import (TimeRange, remove_credentials,
@ -24,7 +25,7 @@ from freqtrade.optimize.optimize_reports import (
from freqtrade.persistence import Trade from freqtrade.persistence import Trade
from freqtrade.resolvers import ExchangeResolver, StrategyResolver from freqtrade.resolvers import ExchangeResolver, StrategyResolver
from freqtrade.state import RunMode from freqtrade.state import RunMode
from freqtrade.strategy.interface import IStrategy, SellType from freqtrade.strategy.interface import IStrategy, SellCheckTuple, SellType
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -148,7 +149,7 @@ class Backtesting:
logger.info(f'Dumping backtest results to {recordfilename}') logger.info(f'Dumping backtest results to {recordfilename}')
file_dump_json(recordfilename, records) file_dump_json(recordfilename, records)
def _get_ticker_list(self, processed) -> Dict[str, DataFrame]: def _get_ticker_list(self, processed: Dict) -> Dict[str, DataFrame]:
""" """
Helper function to convert a processed tickerlist into a list for performance reasons. Helper function to convert a processed tickerlist into a list for performance reasons.
@ -175,7 +176,8 @@ class Backtesting:
ticker[pair] = [x for x in ticker_data.itertuples()] ticker[pair] = [x for x in ticker_data.itertuples()]
return ticker return ticker
def _get_close_rate(self, sell_row, trade: Trade, sell, trade_dur) -> float: def _get_close_rate(self, sell_row, trade: Trade, sell: SellCheckTuple,
trade_dur: int) -> float:
""" """
Get close rate for backtesting result Get close rate for backtesting result
""" """
@ -280,7 +282,7 @@ class Backtesting:
return None return None
def backtest(self, processed: Dict, stake_amount: float, def backtest(self, processed: Dict, stake_amount: float,
start_date, end_date, start_date: arrow.Arrow, end_date: arrow.Arrow,
max_open_trades: int = 0, position_stacking: bool = False) -> DataFrame: max_open_trades: int = 0, position_stacking: bool = False) -> DataFrame:
""" """
Implement backtesting functionality Implement backtesting functionality

View File

@ -117,11 +117,11 @@ class Hyperopt:
self.print_json = self.config.get('print_json', False) self.print_json = self.config.get('print_json', False)
@staticmethod @staticmethod
def get_lock_filename(config) -> str: def get_lock_filename(config: Dict[str, Any]) -> str:
return str(config['user_data_dir'] / 'hyperopt.lock') return str(config['user_data_dir'] / 'hyperopt.lock')
def clean_hyperopt(self): def clean_hyperopt(self) -> None:
""" """
Remove hyperopt pickle files to restart hyperopt. Remove hyperopt pickle files to restart hyperopt.
""" """
@ -158,7 +158,7 @@ class Hyperopt:
f"saved to '{self.trials_file}'.") f"saved to '{self.trials_file}'.")
@staticmethod @staticmethod
def _read_trials(trials_file) -> List: def _read_trials(trials_file: Path) -> List:
""" """
Read hyperopt trials file Read hyperopt trials file
""" """
@ -189,7 +189,7 @@ class Hyperopt:
return result return result
@staticmethod @staticmethod
def print_epoch_details(results, total_epochs, print_json: bool, def print_epoch_details(results, total_epochs: int, print_json: bool,
no_header: bool = False, header_str: str = None) -> None: no_header: bool = False, header_str: str = None) -> None:
""" """
Display details of the hyperopt result Display details of the hyperopt result
@ -218,7 +218,7 @@ class Hyperopt:
Hyperopt._params_pretty_print(params, 'trailing', "Trailing stop:") Hyperopt._params_pretty_print(params, 'trailing', "Trailing stop:")
@staticmethod @staticmethod
def _params_update_for_json(result_dict, params, space: str): def _params_update_for_json(result_dict, params, space: str) -> None:
if space in params: if space in params:
space_params = Hyperopt._space_params(params, space) space_params = Hyperopt._space_params(params, space)
if space in ['buy', 'sell']: if space in ['buy', 'sell']:
@ -235,7 +235,7 @@ class Hyperopt:
result_dict.update(space_params) result_dict.update(space_params)
@staticmethod @staticmethod
def _params_pretty_print(params, space: str, header: str): def _params_pretty_print(params, space: str, header: str) -> None:
if space in params: if space in params:
space_params = Hyperopt._space_params(params, space, 5) space_params = Hyperopt._space_params(params, space, 5)
if space == 'stoploss': if space == 'stoploss':
@ -251,7 +251,7 @@ class Hyperopt:
return round_dict(d, r) if r else d return round_dict(d, r) if r else d
@staticmethod @staticmethod
def is_best_loss(results, current_best_loss) -> bool: def is_best_loss(results, current_best_loss: float) -> bool:
return results['loss'] < current_best_loss return results['loss'] < current_best_loss
def print_results(self, results) -> None: def print_results(self, results) -> None:
@ -438,7 +438,7 @@ class Hyperopt:
random_state=self.random_state, random_state=self.random_state,
) )
def fix_optimizer_models_list(self): def fix_optimizer_models_list(self) -> None:
""" """
WORKAROUND: Since skopt is not actively supported, this resolves problems with skopt WORKAROUND: Since skopt is not actively supported, this resolves problems with skopt
memory usage, see also: https://github.com/scikit-optimize/scikit-optimize/pull/746 memory usage, see also: https://github.com/scikit-optimize/scikit-optimize/pull/746
@ -460,7 +460,7 @@ class Hyperopt:
wrap_non_picklable_objects(self.generate_optimizer))(v, i) for v in asked) wrap_non_picklable_objects(self.generate_optimizer))(v, i) for v in asked)
@staticmethod @staticmethod
def load_previous_results(trials_file) -> List: def load_previous_results(trials_file: Path) -> List:
""" """
Load data for epochs from the file if we have one Load data for epochs from the file if we have one
""" """

View File

@ -7,7 +7,7 @@ Provides lists as configured in config.json
import logging import logging
from abc import ABC, abstractmethod, abstractproperty from abc import ABC, abstractmethod, abstractproperty
from copy import deepcopy from copy import deepcopy
from typing import Dict, List from typing import Any, Dict, List
from freqtrade.exchange import market_is_active from freqtrade.exchange import market_is_active
@ -16,7 +16,8 @@ logger = logging.getLogger(__name__)
class IPairList(ABC): class IPairList(ABC):
def __init__(self, exchange, pairlistmanager, config, pairlistconfig: dict, def __init__(self, exchange, pairlistmanager,
config: Dict[str, Any], pairlistconfig: Dict[str, Any],
pairlist_pos: int) -> None: pairlist_pos: int) -> None:
""" """
:param exchange: Exchange instance :param exchange: Exchange instance

View File

@ -48,10 +48,10 @@ class PrecisionFilter(IPairList):
""" """
Filters and sorts pairlists and assigns and returns them again. Filters and sorts pairlists and assigns and returns them again.
""" """
stoploss = None stoploss = self._config.get('stoploss')
if self._config.get('stoploss') is not None: if stoploss is not None:
# Precalculate sanitized stoploss value to avoid recalculation for every pair # Precalculate sanitized stoploss value to avoid recalculation for every pair
stoploss = 1 - abs(self._config.get('stoploss')) stoploss = 1 - abs(stoploss)
# Copy list since we're modifying this list # Copy list since we're modifying this list
for p in deepcopy(pairlist): for p in deepcopy(pairlist):
ticker = tickers.get(p) ticker = tickers.get(p)

View File

@ -1,6 +1,6 @@
import logging import logging
from copy import deepcopy from copy import deepcopy
from typing import Dict, List from typing import Any, Dict, List
from freqtrade.pairlist.IPairList import IPairList from freqtrade.pairlist.IPairList import IPairList
@ -9,7 +9,8 @@ logger = logging.getLogger(__name__)
class PriceFilter(IPairList): class PriceFilter(IPairList):
def __init__(self, exchange, pairlistmanager, config, pairlistconfig: dict, def __init__(self, exchange, pairlistmanager,
config: Dict[str, Any], pairlistconfig: Dict[str, Any],
pairlist_pos: int) -> None: pairlist_pos: int) -> None:
super().__init__(exchange, pairlistmanager, config, pairlistconfig, pairlist_pos) super().__init__(exchange, pairlistmanager, config, pairlistconfig, pairlist_pos)

View File

@ -6,7 +6,7 @@ Provides lists as configured in config.json
""" """
import logging import logging
from datetime import datetime from datetime import datetime
from typing import Dict, List from typing import Any, Dict, List
from freqtrade.exceptions import OperationalException from freqtrade.exceptions import OperationalException
from freqtrade.pairlist.IPairList import IPairList from freqtrade.pairlist.IPairList import IPairList
@ -18,7 +18,7 @@ SORT_VALUES = ['askVolume', 'bidVolume', 'quoteVolume']
class VolumePairList(IPairList): class VolumePairList(IPairList):
def __init__(self, exchange, pairlistmanager, config, pairlistconfig: dict, def __init__(self, exchange, pairlistmanager, config: Dict[str, Any], pairlistconfig: dict,
pairlist_pos: int) -> None: pairlist_pos: int) -> None:
super().__init__(exchange, pairlistmanager, config, pairlistconfig, pairlist_pos) super().__init__(exchange, pairlistmanager, config, pairlistconfig, pairlist_pos)
@ -77,7 +77,8 @@ class VolumePairList(IPairList):
else: else:
return pairlist return pairlist
def _gen_pair_whitelist(self, pairlist, tickers, base_currency: str, key: str) -> List[str]: def _gen_pair_whitelist(self, pairlist: List[str], tickers: Dict,
base_currency: str, key: str) -> List[str]:
""" """
Updates the whitelist with with a dynamically generated list Updates the whitelist with with a dynamically generated list
:param base_currency: base currency as str :param base_currency: base currency as str

View File

@ -64,11 +64,11 @@ def init(db_url: str, clean_open_orders: bool = False) -> None:
clean_dry_run_db() clean_dry_run_db()
def has_column(columns, searchname: str) -> bool: def has_column(columns: List, searchname: str) -> bool:
return len(list(filter(lambda x: x["name"] == searchname, columns))) == 1 return len(list(filter(lambda x: x["name"] == searchname, columns))) == 1
def get_column_def(columns, column: str, default: str) -> str: def get_column_def(columns: List, column: str, default: str) -> str:
return default if not has_column(columns, column) else column return default if not has_column(columns, column) else column
@ -246,14 +246,15 @@ class Trade(_DECL_BASE):
if self.initial_stop_loss_pct else None), if self.initial_stop_loss_pct else None),
} }
def adjust_min_max_rates(self, current_price: float): def adjust_min_max_rates(self, current_price: float) -> None:
""" """
Adjust the max_rate and min_rate. Adjust the max_rate and min_rate.
""" """
self.max_rate = max(current_price, self.max_rate or self.open_rate) self.max_rate = max(current_price, self.max_rate or self.open_rate)
self.min_rate = min(current_price, self.min_rate or self.open_rate) self.min_rate = min(current_price, self.min_rate or self.open_rate)
def adjust_stop_loss(self, current_price: float, stoploss: float, initial: bool = False): def adjust_stop_loss(self, current_price: float, stoploss: float,
initial: bool = False) -> None:
""" """
This adjusts the stop loss to it's most recently observed setting This adjusts the stop loss to it's most recently observed setting
:param current_price: Current rate the asset is traded :param current_price: Current rate the asset is traded

View File

@ -370,7 +370,7 @@ def generate_profit_graph(pairs: str, tickers: Dict[str, pd.DataFrame],
return fig return fig
def generate_plot_filename(pair, timeframe) -> str: def generate_plot_filename(pair: str, timeframe: str) -> str:
""" """
Generate filenames per pair/timeframe to be used for storing plots Generate filenames per pair/timeframe to be used for storing plots
""" """

View File

@ -25,7 +25,7 @@ class IResolver:
initial_search_path: Path initial_search_path: Path
@classmethod @classmethod
def build_search_paths(cls, config, user_subdir: Optional[str] = None, def build_search_paths(cls, config: Dict[str, Any], user_subdir: Optional[str] = None,
extra_dir: Optional[str] = None) -> List[Path]: extra_dir: Optional[str] = None) -> List[Path]:
abs_paths: List[Path] = [cls.initial_search_path] abs_paths: List[Path] = [cls.initial_search_path]

View File

@ -9,7 +9,7 @@ from base64 import urlsafe_b64decode
from collections import OrderedDict from collections import OrderedDict
from inspect import getfullargspec from inspect import getfullargspec
from pathlib import Path from pathlib import Path
from typing import Dict, Optional from typing import Any, Dict, Optional
from freqtrade.constants import (REQUIRED_ORDERTIF, REQUIRED_ORDERTYPES, from freqtrade.constants import (REQUIRED_ORDERTIF, REQUIRED_ORDERTYPES,
USERPATH_STRATEGY) USERPATH_STRATEGY)
@ -30,7 +30,7 @@ class StrategyResolver(IResolver):
initial_search_path = Path(__file__).parent.parent.joinpath('strategy').resolve() initial_search_path = Path(__file__).parent.parent.joinpath('strategy').resolve()
@staticmethod @staticmethod
def load_strategy(config: Optional[Dict] = None) -> IStrategy: def load_strategy(config: Dict[str, Any] = None) -> IStrategy:
""" """
Load the custom class from config parameter Load the custom class from config parameter
:param config: configuration dictionary or None :param config: configuration dictionary or None
@ -96,7 +96,8 @@ class StrategyResolver(IResolver):
return strategy return strategy
@staticmethod @staticmethod
def _override_attribute_helper(strategy, config, attribute: str, default): def _override_attribute_helper(strategy, config: Dict[str, Any],
attribute: str, default: Any):
""" """
Override attributes in the strategy. Override attributes in the strategy.
Prevalence: Prevalence:

View File

@ -139,7 +139,8 @@ class RPC:
results.append(trade_dict) results.append(trade_dict)
return results return results
def _rpc_status_table(self, stake_currency, fiat_display_currency: str) -> Tuple[List, List]: def _rpc_status_table(self, stake_currency: str,
fiat_display_currency: str) -> Tuple[List, List]:
trades = Trade.get_open_trades() trades = Trade.get_open_trades()
if not trades: if not trades:
raise RPCException('no active trade') raise RPCException('no active trade')
@ -385,7 +386,7 @@ class RPC:
return {'status': 'No more buy will occur from now. Run /reload_conf to reset.'} return {'status': 'No more buy will occur from now. Run /reload_conf to reset.'}
def _rpc_forcesell(self, trade_id) -> Dict[str, str]: def _rpc_forcesell(self, trade_id: str) -> Dict[str, str]:
""" """
Handler for forcesell <id>. Handler for forcesell <id>.
Sells the given trade at current price Sells the given trade at current price

View File

@ -61,7 +61,7 @@ class RPCManager:
except NotImplementedError: except NotImplementedError:
logger.error(f"Message type {msg['type']} not implemented by handler {mod.name}.") logger.error(f"Message type {msg['type']} not implemented by handler {mod.name}.")
def startup_messages(self, config, pairlist) -> None: def startup_messages(self, config: Dict[str, Any], pairlist) -> None:
if config['dry_run']: if config['dry_run']:
self.send_msg({ self.send_msg({
'type': RPCMessageType.WARNING_NOTIFICATION, 'type': RPCMessageType.WARNING_NOTIFICATION,

View File

@ -180,7 +180,7 @@ class IStrategy(ABC):
if pair not in self._pair_locked_until or self._pair_locked_until[pair] < until: if pair not in self._pair_locked_until or self._pair_locked_until[pair] < until:
self._pair_locked_until[pair] = until self._pair_locked_until[pair] = until
def unlock_pair(self, pair) -> None: def unlock_pair(self, pair: str) -> None:
""" """
Unlocks a pair previously locked using lock_pair. Unlocks a pair previously locked using lock_pair.
Not used by freqtrade itself, but intended to be used if users lock pairs Not used by freqtrade itself, but intended to be used if users lock pairs

View File

@ -30,24 +30,21 @@ class Wallets:
self._last_wallet_refresh = 0 self._last_wallet_refresh = 0
self.update() self.update()
def get_free(self, currency) -> float: def get_free(self, currency: str) -> float:
balance = self._wallets.get(currency) balance = self._wallets.get(currency)
if balance and balance.free: if balance and balance.free:
return balance.free return balance.free
else: else:
return 0 return 0
def get_used(self, currency) -> float: def get_used(self, currency: str) -> float:
balance = self._wallets.get(currency) balance = self._wallets.get(currency)
if balance and balance.used: if balance and balance.used:
return balance.used return balance.used
else: else:
return 0 return 0
def get_total(self, currency) -> float: def get_total(self, currency: str) -> float:
balance = self._wallets.get(currency) balance = self._wallets.get(currency)
if balance and balance.total: if balance and balance.total:
return balance.total return balance.total
@ -87,7 +84,6 @@ class Wallets:
self._wallets = _wallets self._wallets = _wallets
def _update_live(self) -> None: def _update_live(self) -> None:
balances = self._exchange.get_balances() balances = self._exchange.get_balances()
for currency in balances: for currency in balances:

View File

@ -22,7 +22,7 @@ class Worker:
Freqtradebot worker class Freqtradebot worker class
""" """
def __init__(self, args: Dict[str, Any], config=None) -> None: def __init__(self, args: Dict[str, Any], config: Dict[str, Any] = None) -> None:
""" """
Init all variables and objects the bot needs to work Init all variables and objects the bot needs to work
""" """