pylint fixes

This commit is contained in:
gcarq 2017-09-08 23:10:22 +02:00
parent 689cd11a6c
commit 996beae770
9 changed files with 133 additions and 121 deletions

View File

@ -145,6 +145,7 @@ def plot_dataframe(dataframe: DataFrame, pair: str) -> None:
if __name__ == '__main__':
# Install PYQT5==5.9 manually if you want to test this helper function
while True:
pair = 'BTC_ANT'
#for pair in ['BTC_ANT', 'BTC_ETH', 'BTC_GNT', 'BTC_ETC']:

View File

@ -7,10 +7,10 @@ from poloniex import Poloniex
logger = logging.getLogger(__name__)
cur_exchange = None
_api = None
_conf = {}
# Current selected exchange
EXCHANGE = None
_API = None
_CONF = {}
class Exchange(enum.Enum):
@ -26,9 +26,9 @@ def init(config: dict) -> None:
:param config: config to use
:return: None
"""
global _api, cur_exchange
global _API, EXCHANGE
_conf.update(config)
_CONF.update(config)
if config['dry_run']:
logger.info('Instance is running with dry_run enabled')
@ -37,17 +37,17 @@ def init(config: dict) -> None:
use_bittrex = config.get('bittrex', {}).get('enabled', False)
if use_poloniex:
cur_exchange = Exchange.POLONIEX
_api = Poloniex(key=config['poloniex']['key'], secret=config['poloniex']['secret'])
EXCHANGE = Exchange.POLONIEX
_API = Poloniex(key=config['poloniex']['key'], secret=config['poloniex']['secret'])
elif use_bittrex:
cur_exchange = Exchange.BITTREX
_api = Bittrex(api_key=config['bittrex']['key'], api_secret=config['bittrex']['secret'])
EXCHANGE = Exchange.BITTREX
_API = Bittrex(api_key=config['bittrex']['key'], api_secret=config['bittrex']['secret'])
else:
raise RuntimeError('No exchange specified. Aborting!')
# Check if all pairs are available
markets = get_markets()
for pair in config[cur_exchange.name.lower()]['pair_whitelist']:
for pair in config[EXCHANGE.name.lower()]['pair_whitelist']:
if pair not in markets:
raise RuntimeError('Pair {} is not available at Poloniex'.format(pair))
@ -60,13 +60,13 @@ def buy(pair: str, rate: float, amount: float) -> str:
:param amount: The amount to purchase
:return: order_id of the placed buy order
"""
if _conf['dry_run']:
if _CONF['dry_run']:
return 'dry_run'
elif cur_exchange == Exchange.POLONIEX:
_api.buy(pair, rate, amount)
elif EXCHANGE == Exchange.POLONIEX:
_API.buy(pair, rate, amount)
# TODO: return order id
elif cur_exchange == Exchange.BITTREX:
data = _api.buy_limit(pair.replace('_', '-'), amount, rate)
elif EXCHANGE == Exchange.BITTREX:
data = _API.buy_limit(pair.replace('_', '-'), amount, rate)
if not data['success']:
raise RuntimeError('BITTREX: {}'.format(data['message']))
return data['result']['uuid']
@ -80,13 +80,13 @@ def sell(pair: str, rate: float, amount: float) -> str:
:param amount: The amount to sell
:return: None
"""
if _conf['dry_run']:
if _CONF['dry_run']:
return 'dry_run'
elif cur_exchange == Exchange.POLONIEX:
_api.sell(pair, rate, amount)
elif EXCHANGE == Exchange.POLONIEX:
_API.sell(pair, rate, amount)
# TODO: return order id
elif cur_exchange == Exchange.BITTREX:
data = _api.sell_limit(pair.replace('_', '-'), amount, rate)
elif EXCHANGE == Exchange.BITTREX:
data = _API.sell_limit(pair.replace('_', '-'), amount, rate)
if not data['success']:
raise RuntimeError('BITTREX: {}'.format(data['message']))
return data['result']['uuid']
@ -98,13 +98,13 @@ def get_balance(currency: str) -> float:
:param currency: currency as str, format: BTC
:return: float
"""
if _conf['dry_run']:
if _CONF['dry_run']:
return 999.9
elif cur_exchange == Exchange.POLONIEX:
data = _api.returnBalances()
elif EXCHANGE == Exchange.POLONIEX:
data = _API.returnBalances()
return float(data[currency])
elif cur_exchange == Exchange.BITTREX:
data = _api.get_balance(currency)
elif EXCHANGE == Exchange.BITTREX:
data = _API.get_balance(currency)
if not data['success']:
raise RuntimeError('BITTREX: {}'.format(data['message']))
return float(data['result']['Balance'] or 0.0)
@ -116,15 +116,15 @@ def get_ticker(pair: str) -> dict:
:param pair: Pair as str, format: BTC_ETC
:return: dict
"""
if cur_exchange == Exchange.POLONIEX:
data = _api.returnTicker()
if EXCHANGE == Exchange.POLONIEX:
data = _API.returnTicker()
return {
'bid': float(data[pair]['highestBid']),
'ask': float(data[pair]['lowestAsk']),
'last': float(data[pair]['last'])
}
elif cur_exchange == Exchange.BITTREX:
data = _api.get_ticker(pair.replace('_', '-'))
elif EXCHANGE == Exchange.BITTREX:
data = _API.get_ticker(pair.replace('_', '-'))
if not data['success']:
raise RuntimeError('BITTREX: {}'.format(data['message']))
return {
@ -140,12 +140,12 @@ def cancel_order(order_id: str) -> None:
:param order_id: id as str
:return: None
"""
if _conf['dry_run']:
if _CONF['dry_run']:
pass
elif cur_exchange == Exchange.POLONIEX:
elif EXCHANGE == Exchange.POLONIEX:
raise NotImplemented('Not implemented')
elif cur_exchange == Exchange.BITTREX:
data = _api.cancel(order_id)
elif EXCHANGE == Exchange.BITTREX:
data = _API.cancel(order_id)
if not data['success']:
raise RuntimeError('BITTREX: {}'.format(data['message']))
@ -156,12 +156,12 @@ def get_open_orders(pair: str) -> List[dict]:
:param pair: Pair as str, format: BTC_ETC
:return: list of dicts
"""
if _conf['dry_run']:
if _CONF['dry_run']:
return []
elif cur_exchange == Exchange.POLONIEX:
elif EXCHANGE == Exchange.POLONIEX:
raise NotImplemented('Not implemented')
elif cur_exchange == Exchange.BITTREX:
data = _api.get_open_orders(pair.replace('_', '-'))
elif EXCHANGE == Exchange.BITTREX:
data = _API.get_open_orders(pair.replace('_', '-'))
if not data['success']:
raise RuntimeError('BITTREX: {}'.format(data['message']))
return [{
@ -180,9 +180,9 @@ def get_pair_detail_url(pair: str) -> str:
:param pair: pair as str, format: BTC_ANT
:return: url as str
"""
if cur_exchange == Exchange.POLONIEX:
if EXCHANGE == Exchange.POLONIEX:
raise NotImplemented('Not implemented')
elif cur_exchange == Exchange.BITTREX:
elif EXCHANGE == Exchange.BITTREX:
return 'https://bittrex.com/Market/Index?MarketName={}'.format(pair.replace('_', '-'))
@ -191,11 +191,11 @@ def get_markets() -> List[str]:
Returns all available markets
:return: list of all available pairs
"""
if cur_exchange == Exchange.POLONIEX:
if EXCHANGE == Exchange.POLONIEX:
# TODO: implement
raise NotImplemented('Not implemented')
elif cur_exchange == Exchange. BITTREX:
data = _api.get_markets()
elif EXCHANGE == Exchange. BITTREX:
data = _API.get_markets()
if not data['success']:
raise RuntimeError('BITTREX: {}'.format(data['message']))
return [m['MarketName'].replace('-', '_') for m in data['result']]

69
main.py
View File

@ -5,19 +5,17 @@ import logging
import time
import traceback
from datetime import datetime
from json import JSONDecodeError
from typing import Optional
from jsonschema import validate
from requests import ConnectionError
from wrapt import synchronized
import exchange
import persistence
from rpc import telegram
from analyze import get_buy_signal
from persistence import Trade
from misc import conf_schema
from analyze import get_buy_signal
from misc import CONF_SCHEMA
from rpc import telegram
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
@ -35,8 +33,10 @@ class State(enum.Enum):
TERMINATE = 2
_conf = {}
_cur_state = State.RUNNING
_CONF = {}
# Current application state
_STATE = State.RUNNING
@synchronized
@ -46,8 +46,8 @@ def update_state(state: State) -> None:
:param state: new state
:return: None
"""
global _cur_state
_cur_state = state
global _STATE
_STATE = state
@synchronized
@ -56,7 +56,7 @@ def get_state() -> State:
Gets the current application state
:return:
"""
return _cur_state
return _STATE
def _process() -> None:
@ -67,10 +67,10 @@ def _process() -> None:
"""
# Query trades from persistence layer
trades = Trade.query.filter(Trade.is_open.is_(True)).all()
if len(trades) < _conf['max_open_trades']:
if len(trades) < _CONF['max_open_trades']:
try:
# Create entity and execute trade
trade = create_trade(float(_conf['stake_amount']), exchange.cur_exchange)
trade = create_trade(float(_CONF['stake_amount']), exchange.EXCHANGE)
if trade:
Trade.session.add(trade)
else:
@ -80,14 +80,17 @@ def _process() -> None:
for trade in trades:
if close_trade_if_fulfilled(trade):
logger.info('No open orders found and trade is fulfilled. Marking %s as closed ...', trade)
logger.info(
'No open orders found and trade is fulfilled. Marking %s as closed ...',
trade
)
for trade in filter(lambda t: t.is_open, trades):
# Check if there is already an open order for this trade
orders = exchange.get_open_orders(trade.pair)
orders = [o for o in orders if o['id'] == trade.open_order_id]
if orders:
msg = 'There exists an open order for {}: Order(total={}, remaining={}, type={}, id={})' \
msg = 'There is an open order for {}: Order(total={}, remaining={}, type={}, id={})' \
.format(
trade,
round(orders[0]['amount'], 8),
@ -156,20 +159,20 @@ def handle_trade(trade: Trade) -> None:
current_rate = exchange.get_ticker(trade.pair)['bid']
current_profit = 100.0 * ((current_rate - trade.open_rate) / trade.open_rate)
if 'stoploss' in _conf and current_profit < float(_conf['stoploss']) * 100.0:
if 'stoploss' in _CONF and current_profit < float(_CONF['stoploss']) * 100.0:
logger.debug('Stop loss hit.')
execute_sell(trade, current_rate)
return
for duration, threshold in sorted(_conf['minimal_roi'].items()):
for duration, threshold in sorted(_CONF['minimal_roi'].items()):
duration, threshold = float(duration), float(threshold)
# Check if time matches and current rate is above threshold
time_diff = (datetime.utcnow() - trade.open_date).total_seconds() / 60
if time_diff > duration and current_rate > (1 + threshold) * trade.open_rate:
execute_sell(trade, current_rate)
return
else:
logger.debug('Threshold not reached. (cur_profit: %1.2f%%)', current_profit)
logger.debug('Threshold not reached. (cur_profit: %1.2f%%)', current_profit)
except ValueError:
logger.exception('Unable to handle open order')
@ -182,10 +185,12 @@ def create_trade(stake_amount: float, _exchange: exchange.Exchange) -> Optional[
:param _exchange: exchange to use
"""
logger.info('Creating new trade with stake_amount: %f ...', stake_amount)
whitelist = _conf[_exchange.name.lower()]['pair_whitelist']
whitelist = _CONF[_exchange.name.lower()]['pair_whitelist']
# Check if btc_amount is fulfilled
if exchange.get_balance(_conf['stake_currency']) < stake_amount:
raise ValueError('stake amount is not fulfilled (currency={}'.format(_conf['stake_currency']))
if exchange.get_balance(_CONF['stake_currency']) < stake_amount:
raise ValueError(
'stake amount is not fulfilled (currency={}'.format(_CONF['stake_currency'])
)
# Remove currently opened and latest pairs from whitelist
trades = Trade.query.filter(Trade.is_open.is_(True)).all()
@ -200,9 +205,9 @@ def create_trade(stake_amount: float, _exchange: exchange.Exchange) -> Optional[
raise ValueError('No pair in whitelist')
# Pick pair based on StochRSI buy signals
for p in whitelist:
if get_buy_signal(p):
pair = p
for _pair in whitelist:
if get_buy_signal(_pair):
pair = _pair
break
else:
return None
@ -230,20 +235,17 @@ def create_trade(stake_amount: float, _exchange: exchange.Exchange) -> Optional[
is_open=True)
def init(config: dict, db_url: Optional[str]=None) -> None:
def init(config: dict, db_url: Optional[str] = None) -> None:
"""
Initializes all modules and updates the config
:param config: config as dict
:param db_url: database connector string for sqlalchemy (Optional)
:return: None
"""
global _conf
# Initialize all modules
telegram.init(config)
persistence.init(config, db_url)
exchange.init(config)
_conf.update(config)
def app(config: dict) -> None:
@ -264,12 +266,12 @@ def app(config: dict) -> None:
try:
_process()
Trade.session.flush()
except (ConnectionError, JSONDecodeError, ValueError) as error:
except (ConnectionError, json.JSONDecodeError, ValueError) as error:
msg = 'Got {} during _process()'.format(error.__class__.__name__)
logger.exception(msg)
finally:
time.sleep(25)
except (RuntimeError, JSONDecodeError):
except (RuntimeError, json.JSONDecodeError):
telegram.send_msg(
'*Status:* Got RuntimeError: ```\n{}\n```'.format(traceback.format_exc())
)
@ -280,7 +282,6 @@ def app(config: dict) -> None:
if __name__ == '__main__':
with open('config.json') as file:
conf = json.load(file)
validate(conf, conf_schema)
app(conf)
_CONF = json.load(file)
validate(_CONF, CONF_SCHEMA)
app(_CONF)

View File

@ -1,6 +1,6 @@
# Required json-schema for user specified config
conf_schema = {
CONF_SCHEMA = {
'type': 'object',
'properties': {
'max_open_trades': {'type': 'integer', 'minimum': 1},

View File

@ -11,12 +11,12 @@ from sqlalchemy.types import Enum
import exchange
_conf = {}
_CONF = {}
Base = declarative_base()
def init(config: dict, db_url: Optional[str]=None) -> None:
def init(config: dict, db_url: Optional[str] = None) -> None:
"""
Initializes this module with the given config,
registers all known command handlers
@ -25,9 +25,9 @@ def init(config: dict, db_url: Optional[str]=None) -> None:
:param db_url: database connector string for sqlalchemy (Optional)
:return: None
"""
_conf.update(config)
_CONF.update(config)
if not db_url:
if _conf.get('dry_run', False):
if _CONF.get('dry_run', False):
db_url = 'sqlite:///tradesv2.dry_run.sqlite'
else:
db_url = 'sqlite:///tradesv2.sqlite'
@ -56,12 +56,16 @@ class Trade(Base):
open_order_id = Column(String)
def __repr__(self):
if self.is_open:
open_since = 'closed'
else:
open_since = round((datetime.utcnow() - self.open_date).total_seconds() / 60, 2)
return 'Trade(id={}, pair={}, amount={}, open_rate={}, open_since={})'.format(
self.id,
self.pair,
self.amount,
self.open_rate,
'closed' if not self.is_open else round((datetime.utcnow() - self.open_date).total_seconds() / 60, 2)
open_since
)
def exec_sell_order(self, rate: float, amount: float) -> float:
@ -83,4 +87,3 @@ class Trade(Base):
# Flush changes
Trade.session.flush()
return profit

View File

@ -8,8 +8,8 @@ urllib3==1.22
wrapt==1.10.11
pandas==0.20.3
matplotlib==2.0.2
PYQT5==5.9
scikit-learn==0.19.0
scipy==0.19.1
jsonschema==2.6.0
TA-Lib==0.4.10
TA-Lib==0.4.10
#PYQT5==5.9

View File

@ -18,7 +18,7 @@ logging.getLogger('telegram').setLevel(logging.INFO)
logger = logging.getLogger(__name__)
_updater = None
_conf = {}
_CONF = {}
def init(config: dict) -> None:
@ -32,7 +32,7 @@ def init(config: dict) -> None:
global _updater
_updater = Updater(token=config['telegram']['token'], workers=0)
_conf.update(config)
_CONF.update(config)
# Register command handler and start telegram message polling
handles = [
@ -51,8 +51,10 @@ def init(config: dict) -> None:
timeout=30,
read_latency=60,
)
logger.info('rpc.telegram is listening for following commands: {}'
.format([h.command for h in handles]))
logger.info(
'rpc.telegram is listening for following commands: %s',
[h.command for h in handles]
)
def authorized_only(command_handler: Callable[[Bot, Update], None]) -> Callable[..., Any]:
@ -62,12 +64,12 @@ def authorized_only(command_handler: Callable[[Bot, Update], None]) -> Callable[
:return: decorated function
"""
def wrapper(*args, **kwargs):
bot, update = (args[0], args[1]) if args else (kwargs['bot'], kwargs['update'])
bot, update = kwargs.get('bot') or args[0], kwargs.get('update') or args[1]
if not isinstance(bot, Bot) or not isinstance(update, Update):
raise ValueError('Received invalid Arguments: {}'.format(*args))
chat_id = int(_conf['telegram']['chat_id'])
chat_id = int(_CONF['telegram']['chat_id'])
if int(update.message.chat_id) == chat_id:
logger.info('Executing handler: %s for chat_id: %s', command_handler.__name__, chat_id)
return command_handler(*args, **kwargs)
@ -88,7 +90,7 @@ def _status(bot: Bot, update: Update) -> None:
# Fetch open trade
trades = Trade.query.filter(Trade.is_open.is_(True)).all()
from main import get_state, State
if not get_state() == State.RUNNING:
if get_state() != State.RUNNING:
send_msg('*Status:* `trader is not running`', bot=bot)
elif not trades:
send_msg('*Status:* `no active order`', bot=bot)
@ -100,6 +102,10 @@ def _status(bot: Bot, update: Update) -> None:
orders = exchange.get_open_orders(trade.pair)
orders = [o for o in orders if o['id'] == trade.open_order_id]
order = orders[0] if orders else None
fmt_close_profit = '{:.2f}%'.format(
round(trade.close_profit, 2)
) if trade.close_profit else None
message = """
*Trade ID:* `{trade_id}`
*Current Pair:* [{pair}]({market_url})
@ -120,7 +126,7 @@ def _status(bot: Bot, update: Update) -> None:
close_rate=trade.close_rate,
current_rate=current_rate,
amount=round(trade.amount, 8),
close_profit='{}%'.format(round(trade.close_profit, 2)) if trade.close_profit else None,
close_profit=fmt_close_profit,
current_profit=round(current_profit, 2),
open_order='{} ({})'.format(order['remaining'], order['type']) if order else None,
)
@ -297,7 +303,7 @@ def _performance(bot: Bot, update: Update) -> None:
send_msg(message, parse_mode=ParseMode.HTML)
def send_msg(msg: str, bot: Bot=None, parse_mode: ParseMode=ParseMode.MARKDOWN) -> None:
def send_msg(msg: str, bot: Bot = None, parse_mode: ParseMode = ParseMode.MARKDOWN) -> None:
"""
Send given markdown message
:param msg: message
@ -305,15 +311,18 @@ def send_msg(msg: str, bot: Bot=None, parse_mode: ParseMode=ParseMode.MARKDOWN)
:param parse_mode: telegram parse mode
:return: None
"""
if _conf['telegram'].get('enabled', False):
if _CONF['telegram'].get('enabled', False):
try:
bot = bot or _updater.bot
try:
bot.send_message(_conf['telegram']['chat_id'], msg, parse_mode=parse_mode)
bot.send_message(_CONF['telegram']['chat_id'], msg, parse_mode=parse_mode)
except NetworkError as error:
# Sometimes the telegram server resets the current connection,
# if this is the case we send the message again.
logger.warning('Got Telegram NetworkError: %s! Trying one more time.', error.message)
bot.send_message(_conf['telegram']['chat_id'], msg, parse_mode=parse_mode)
logger.warning(
'Got Telegram NetworkError: %s! Trying one more time.',
error.message
)
bot.send_message(_CONF['telegram']['chat_id'], msg, parse_mode=parse_mode)
except Exception:
logger.exception('Exception occurred within Telegram API')

View File

@ -1,12 +1,11 @@
import unittest
from unittest.mock import patch, MagicMock
import os
from jsonschema import validate
import exchange
from main import create_trade, handle_trade, close_trade_if_fulfilled, init
from misc import conf_schema
from misc import CONF_SCHEMA
from persistence import Trade
@ -43,7 +42,7 @@ class TestMain(unittest.TestCase):
}
def test_1_create_trade(self):
with patch.dict('main._conf', self.conf):
with patch.dict('main._CONF', self.conf):
with patch('main.get_buy_signal', side_effect=lambda _: True) as buy_signal:
with patch.multiple('main.telegram', init=MagicMock(), send_msg=MagicMock()):
with patch.multiple('main.exchange',
@ -68,7 +67,7 @@ class TestMain(unittest.TestCase):
buy_signal.assert_called_once_with('BTC_ETH')
def test_2_handle_trade(self):
with patch.dict('main._conf', self.conf):
with patch.dict('main._CONF', self.conf):
with patch.multiple('main.telegram', init=MagicMock(), send_msg=MagicMock()):
with patch.multiple('main.exchange',
get_ticker=MagicMock(return_value={
@ -86,7 +85,7 @@ class TestMain(unittest.TestCase):
self.assertEqual(trade.open_order_id, 'dry_run')
def test_3_close_trade(self):
with patch.dict('main._conf', self.conf):
with patch.dict('main._CONF', self.conf):
trade = Trade.query.filter(Trade.is_open.is_(True)).first()
self.assertTrue(trade)
@ -99,7 +98,7 @@ class TestMain(unittest.TestCase):
@classmethod
def setUpClass(cls):
validate(cls.conf, conf_schema)
validate(cls.conf, CONF_SCHEMA)
if __name__ == '__main__':

View File

@ -1,14 +1,13 @@
import unittest
from datetime import datetime
from unittest.mock import patch, MagicMock
from datetime import datetime
import os
from jsonschema import validate
from telegram import Bot, Update, Message, Chat
import exchange
from main import init, create_trade, update_state, State, get_state
from misc import conf_schema
from misc import CONF_SCHEMA
from persistence import Trade
from rpc.telegram import _status, _profit, _forcesell, _performance, _start, _stop
@ -51,10 +50,10 @@ class TestTelegram(unittest.TestCase):
}
def test_1_status_handle(self):
with patch.dict('main._conf', self.conf):
with patch.dict('main._CONF', self.conf):
with patch('main.get_buy_signal', side_effect=lambda _: True):
msg_mock = MagicMock()
with patch.multiple('main.telegram', _conf=self.conf, init=MagicMock(), send_msg=msg_mock):
with patch.multiple('main.telegram', _CONF=self.conf, init=MagicMock(), send_msg=msg_mock):
with patch.multiple('main.exchange',
get_ticker=MagicMock(return_value={
'bid': 0.07256061,
@ -75,10 +74,10 @@ class TestTelegram(unittest.TestCase):
self.assertIn('[BTC_ETH]', msg_mock.call_args_list[-1][0][0])
def test_2_profit_handle(self):
with patch.dict('main._conf', self.conf):
with patch.dict('main._CONF', self.conf):
with patch('main.get_buy_signal', side_effect=lambda _: True):
msg_mock = MagicMock()
with patch.multiple('main.telegram', _conf=self.conf, init=MagicMock(), send_msg=msg_mock):
with patch.multiple('main.telegram', _CONF=self.conf, init=MagicMock(), send_msg=msg_mock):
with patch.multiple('main.exchange',
get_ticker=MagicMock(return_value={
'bid': 0.07256061,
@ -104,10 +103,10 @@ class TestTelegram(unittest.TestCase):
self.assertIn('(100.00%)', msg_mock.call_args_list[-1][0][0])
def test_3_forcesell_handle(self):
with patch.dict('main._conf', self.conf):
with patch.dict('main._CONF', self.conf):
with patch('main.get_buy_signal', side_effect=lambda _: True):
msg_mock = MagicMock()
with patch.multiple('main.telegram', _conf=self.conf, init=MagicMock(), send_msg=msg_mock):
with patch.multiple('main.telegram', _CONF=self.conf, init=MagicMock(), send_msg=msg_mock):
with patch.multiple('main.exchange',
get_ticker=MagicMock(return_value={
'bid': 0.07256061,
@ -131,10 +130,10 @@ class TestTelegram(unittest.TestCase):
self.assertIn('0.072561', msg_mock.call_args_list[-1][0][0])
def test_4_performance_handle(self):
with patch.dict('main._conf', self.conf):
with patch.dict('main._CONF', self.conf):
with patch('main.get_buy_signal', side_effect=lambda _: True):
msg_mock = MagicMock()
with patch.multiple('main.telegram', _conf=self.conf, init=MagicMock(), send_msg=msg_mock):
with patch.multiple('main.telegram', _CONF=self.conf, init=MagicMock(), send_msg=msg_mock):
with patch.multiple('main.exchange',
get_ticker=MagicMock(return_value={
'bid': 0.07256061,
@ -161,9 +160,9 @@ class TestTelegram(unittest.TestCase):
self.assertIn('BTC_ETH 100.00%', msg_mock.call_args_list[-1][0][0])
def test_5_start_handle(self):
with patch.dict('main._conf', self.conf):
with patch.dict('main._CONF', self.conf):
msg_mock = MagicMock()
with patch.multiple('main.telegram', _conf=self.conf, init=MagicMock(), send_msg=msg_mock):
with patch.multiple('main.telegram', _CONF=self.conf, init=MagicMock(), send_msg=msg_mock):
init(self.conf, 'sqlite://')
update_state(State.PAUSED)
@ -173,9 +172,9 @@ class TestTelegram(unittest.TestCase):
self.assertEqual(msg_mock.call_count, 0)
def test_6_stop_handle(self):
with patch.dict('main._conf', self.conf):
with patch.dict('main._CONF', self.conf):
msg_mock = MagicMock()
with patch.multiple('main.telegram', _conf=self.conf, init=MagicMock(), send_msg=msg_mock):
with patch.multiple('main.telegram', _CONF=self.conf, init=MagicMock(), send_msg=msg_mock):
init(self.conf, 'sqlite://')
update_state(State.RUNNING)
@ -191,7 +190,7 @@ class TestTelegram(unittest.TestCase):
@classmethod
def setUpClass(cls):
validate(cls.conf, conf_schema)
validate(cls.conf, CONF_SCHEMA)
if __name__ == '__main__':