From 3473fd3c904503975f7343627ce693af4d2b8813 Mon Sep 17 00:00:00 2001 From: gcarq Date: Fri, 8 Sep 2017 15:51:00 +0200 Subject: [PATCH] major refactoring to allow proper testing This commit includes: * Reducing complexity of modules * Remove unneeded wrapper classes * Implement init() for each module which initializes everything based on the config * Implement some basic tests --- exchange.py | 354 ++++++++++++++------------- main.py | 263 +++++++++++--------- utils.py => misc.py | 27 +-- persistence.py | 49 ++-- rpc/__init__.py | 1 + rpc/telegram.py | 503 ++++++++++++++++++++------------------- test/__init__.py | 0 test/test_main.py | 112 +++++++++ test/test_persistence.py | 28 +++ 9 files changed, 747 insertions(+), 590 deletions(-) rename utils.py => misc.py (75%) create mode 100644 test/__init__.py create mode 100644 test/test_main.py create mode 100644 test/test_persistence.py diff --git a/exchange.py b/exchange.py index 4385073a1..f81a9fd7e 100644 --- a/exchange.py +++ b/exchange.py @@ -4,11 +4,13 @@ from typing import List from bittrex.bittrex import Bittrex from poloniex import Poloniex -from wrapt import synchronized logger = logging.getLogger(__name__) -_exchange_api = None + +cur_exchange = None +_api = None +_conf = {} class Exchange(enum.Enum): @@ -16,192 +18,184 @@ class Exchange(enum.Enum): BITTREX = 1 -class ApiWrapper(object): +def init(config: dict) -> None: """ - Wrapper for exchanges. - Currently implemented: - * Bittrex - * Poloniex (partly) + Initializes this module with the given config, + it does basic validation whether the specified + exchange and pairs are valid. + :param config: config to use + :return: None """ - def __init__(self, config: dict): - """ - Initializes the ApiWrapper with the given config, - it does basic validation whether the specified - exchange and pairs are valid. - :param config: dict - """ - self.dry_run = config['dry_run'] - if self.dry_run: - logger.info('Instance is running with dry_run enabled') + global _api, cur_exchange - use_poloniex = config.get('poloniex', {}).get('enabled', False) - use_bittrex = config.get('bittrex', {}).get('enabled', False) + _conf.update(config) - if use_poloniex: - self.exchange = Exchange.POLONIEX - self.api = Poloniex(key=config['poloniex']['key'], secret=config['poloniex']['secret']) - elif use_bittrex: - self.exchange = Exchange.BITTREX - self.api = Bittrex(api_key=config['bittrex']['key'], api_secret=config['bittrex']['secret']) - else: - self.api = None - raise RuntimeError('No exchange specified. Aborting!') + if config['dry_run']: + logger.info('Instance is running with dry_run enabled') - # Check if all pairs are available - markets = self.get_markets() - for pair in config[self.exchange.name.lower()]['pair_whitelist']: - if pair not in markets: - raise RuntimeError('Pair {} is not available at Poloniex'.format(pair)) + use_poloniex = config.get('poloniex', {}).get('enabled', False) + use_bittrex = config.get('bittrex', {}).get('enabled', False) - def buy(self, pair: str, rate: float, amount: float) -> str: - """ - Places a limit buy order. - :param pair: Pair as str, format: BTC_ETH - :param rate: Rate limit for order - :param amount: The amount to purchase - :return: order_id of the placed buy order - """ - if self.dry_run: - pass - elif self.exchange == Exchange.POLONIEX: - self.api.buy(pair, rate, amount) - # TODO: return order id - elif self.exchange == Exchange.BITTREX: - data = self.api.buy_limit(pair.replace('_', '-'), amount, rate) - if not data['success']: - raise RuntimeError('BITTREX: {}'.format(data['message'])) - return data['result']['uuid'] + if use_poloniex: + cur_exchange = Exchange.POLONIEX + _api = Poloniex(key=config['poloniex']['key'], secret=config['poloniex']['secret']) + elif use_bittrex: + cur_exchange = Exchange.BITTREX + _api = Bittrex(api_key=config['bittrex']['key'], api_secret=config['bittrex']['secret']) + else: + raise RuntimeError('No exchange specified. Aborting!') - def sell(self, pair: str, rate: float, amount: float) -> str: - """ - Places a limit sell order. - :param pair: Pair as str, format: BTC_ETH - :param rate: Rate limit for order - :param amount: The amount to sell - :return: None - """ - if self.dry_run: - pass - elif self.exchange == Exchange.POLONIEX: - self.api.sell(pair, rate, amount) - # TODO: return order id - elif self.exchange == Exchange.BITTREX: - data = self.api.sell_limit(pair.replace('_', '-'), amount, rate) - if not data['success']: - raise RuntimeError('BITTREX: {}'.format(data['message'])) - return data['result']['uuid'] - - def get_balance(self, currency: str) -> float: - """ - Get account balance. - :param currency: currency as str, format: BTC - :return: float - """ - if self.dry_run: - return 999.9 - elif self.exchange == Exchange.POLONIEX: - data = self.api.returnBalances() - return float(data[currency]) - elif self.exchange == Exchange.BITTREX: - data = self.api.get_balance(currency) - if not data['success']: - raise RuntimeError('BITTREX: {}'.format(data['message'])) - return float(data['result']['Balance'] or 0.0) - - def get_ticker(self, pair: str) -> dict: - """ - Get Ticker for given pair. - :param pair: Pair as str, format: BTC_ETC - :return: dict - """ - if self.exchange == Exchange.POLONIEX: - data = self.api.returnTicker() - return { - 'bid': float(data[pair]['highestBid']), - 'ask': float(data[pair]['lowestAsk']), - 'last': float(data[pair]['last']) - } - elif self.exchange == Exchange.BITTREX: - data = self.api.get_ticker(pair.replace('_', '-')) - if not data['success']: - raise RuntimeError('BITTREX: {}'.format(data['message'])) - return { - 'bid': float(data['result']['Bid']), - 'ask': float(data['result']['Ask']), - 'last': float(data['result']['Last']), - } - - def cancel_order(self, order_id: str) -> None: - """ - Cancel order for given order_id - :param order_id: id as str - :return: None - """ - if self.dry_run: - pass - elif self.exchange == Exchange.POLONIEX: - raise NotImplemented('Not implemented') - elif self.exchange == Exchange.BITTREX: - data = self.api.cancel(order_id) - if not data['success']: - raise RuntimeError('BITTREX: {}'.format(data['message'])) - - def get_open_orders(self, pair: str) -> List[dict]: - """ - Get all open orders for given pair. - :param pair: Pair as str, format: BTC_ETC - :return: list of dicts - """ - if self.dry_run: - return [] - elif self.exchange == Exchange.POLONIEX: - raise NotImplemented('Not implemented') - elif self.exchange == Exchange.BITTREX: - data = self.api.get_open_orders(pair.replace('_', '-')) - if not data['success']: - raise RuntimeError('BITTREX: {}'.format(data['message'])) - return [{ - 'id': entry['OrderUuid'], - 'type': entry['OrderType'], - 'opened': entry['Opened'], - 'rate': entry['PricePerUnit'], - 'amount': entry['Quantity'], - 'remaining': entry['QuantityRemaining'], - } for entry in data['result']] - - def get_pair_detail_url(self, pair: str) -> str: - """ - Returns the market detail url for the given pair - :param pair: pair as str, format: BTC_ANT - :return: url as str - """ - if self.exchange == Exchange.POLONIEX: - raise NotImplemented('Not implemented') - elif self.exchange == Exchange.BITTREX: - return 'https://bittrex.com/Market/Index?MarketName={}'.format(pair.replace('_', '-')) - - def get_markets(self) -> List[str]: - """ - Returns all available markets - :return: list of all available pairs - """ - if self.exchange == Exchange.POLONIEX: - # TODO: implement - raise NotImplemented('Not implemented') - elif self.exchange == Exchange. BITTREX: - data = self.api.get_markets() - if not data['success']: - raise RuntimeError('BITTREX: {}'.format(data['message'])) - return [m['MarketName'].replace('-', '_') for m in data['result']] + # Check if all pairs are available + markets = get_markets() + for pair in config[cur_exchange.name.lower()]['pair_whitelist']: + if pair not in markets: + raise RuntimeError('Pair {} is not available at Poloniex'.format(pair)) -@synchronized -def get_exchange_api(conf: dict) -> ApiWrapper: +def buy(pair: str, rate: float, amount: float) -> str: """ - Returns the current exchange api or instantiates a new one - :return: exchange.ApiWrapper + Places a limit buy order. + :param pair: Pair as str, format: BTC_ETH + :param rate: Rate limit for order + :param amount: The amount to purchase + :return: order_id of the placed buy order """ - global _exchange_api - if not _exchange_api: - _exchange_api = ApiWrapper(conf) - return _exchange_api + if _conf['dry_run']: + return 'dry_run' + elif cur_exchange == Exchange.POLONIEX: + _api.buy(pair, rate, amount) + # TODO: return order id + elif cur_exchange == Exchange.BITTREX: + data = _api.buy_limit(pair.replace('_', '-'), amount, rate) + if not data['success']: + raise RuntimeError('BITTREX: {}'.format(data['message'])) + return data['result']['uuid'] + + +def sell(pair: str, rate: float, amount: float) -> str: + """ + Places a limit sell order. + :param pair: Pair as str, format: BTC_ETH + :param rate: Rate limit for order + :param amount: The amount to sell + :return: None + """ + if _conf['dry_run']: + return 'dry_run' + elif cur_exchange == Exchange.POLONIEX: + _api.sell(pair, rate, amount) + # TODO: return order id + elif cur_exchange == Exchange.BITTREX: + data = _api.sell_limit(pair.replace('_', '-'), amount, rate) + if not data['success']: + raise RuntimeError('BITTREX: {}'.format(data['message'])) + return data['result']['uuid'] + + +def get_balance(currency: str) -> float: + """ + Get account balance. + :param currency: currency as str, format: BTC + :return: float + """ + if _conf['dry_run']: + return 999.9 + elif cur_exchange == Exchange.POLONIEX: + data = _api.returnBalances() + return float(data[currency]) + elif cur_exchange == Exchange.BITTREX: + data = _api.get_balance(currency) + if not data['success']: + raise RuntimeError('BITTREX: {}'.format(data['message'])) + return float(data['result']['Balance'] or 0.0) + + +def get_ticker(pair: str) -> dict: + """ + Get Ticker for given pair. + :param pair: Pair as str, format: BTC_ETC + :return: dict + """ + if cur_exchange == Exchange.POLONIEX: + data = _api.returnTicker() + return { + 'bid': float(data[pair]['highestBid']), + 'ask': float(data[pair]['lowestAsk']), + 'last': float(data[pair]['last']) + } + elif cur_exchange == Exchange.BITTREX: + data = _api.get_ticker(pair.replace('_', '-')) + if not data['success']: + raise RuntimeError('BITTREX: {}'.format(data['message'])) + return { + 'bid': float(data['result']['Bid']), + 'ask': float(data['result']['Ask']), + 'last': float(data['result']['Last']), + } + + +def cancel_order(order_id: str) -> None: + """ + Cancel order for given order_id + :param order_id: id as str + :return: None + """ + if _conf['dry_run']: + pass + elif cur_exchange == Exchange.POLONIEX: + raise NotImplemented('Not implemented') + elif cur_exchange == Exchange.BITTREX: + data = _api.cancel(order_id) + if not data['success']: + raise RuntimeError('BITTREX: {}'.format(data['message'])) + + +def get_open_orders(pair: str) -> List[dict]: + """ + Get all open orders for given pair. + :param pair: Pair as str, format: BTC_ETC + :return: list of dicts + """ + if _conf['dry_run']: + return [] + elif cur_exchange == Exchange.POLONIEX: + raise NotImplemented('Not implemented') + elif cur_exchange == Exchange.BITTREX: + data = _api.get_open_orders(pair.replace('_', '-')) + if not data['success']: + raise RuntimeError('BITTREX: {}'.format(data['message'])) + return [{ + 'id': entry['OrderUuid'], + 'type': entry['OrderType'], + 'opened': entry['Opened'], + 'rate': entry['PricePerUnit'], + 'amount': entry['Quantity'], + 'remaining': entry['QuantityRemaining'], + } for entry in data['result']] + + +def get_pair_detail_url(pair: str) -> str: + """ + Returns the market detail url for the given pair + :param pair: pair as str, format: BTC_ANT + :return: url as str + """ + if cur_exchange == Exchange.POLONIEX: + raise NotImplemented('Not implemented') + elif cur_exchange == Exchange.BITTREX: + return 'https://bittrex.com/Market/Index?MarketName={}'.format(pair.replace('_', '-')) + + +def get_markets() -> List[str]: + """ + Returns all available markets + :return: list of all available pairs + """ + if cur_exchange == Exchange.POLONIEX: + # TODO: implement + raise NotImplemented('Not implemented') + elif cur_exchange == Exchange. BITTREX: + data = _api.get_markets() + if not data['success']: + raise RuntimeError('BITTREX: {}'.format(data['message'])) + return [m['MarketName'].replace('-', '_') for m in data['result']] diff --git a/main.py b/main.py index a97419c94..d0c9287d6 100755 --- a/main.py +++ b/main.py @@ -1,19 +1,23 @@ #!/usr/bin/env python +import enum +import json import logging -import threading import time import traceback from datetime import datetime from json import JSONDecodeError from typing import Optional +from jsonschema import validate from requests import ConnectionError from wrapt import synchronized + +import exchange +import persistence +from rpc import telegram from analyze import get_buy_signal -from persistence import Trade, Session -from exchange import get_exchange_api, Exchange -from rpc.telegram import TelegramHandler -from utils import get_conf +from persistence import Trade +from misc import conf_schema logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') @@ -25,103 +29,79 @@ __license__ = "GPLv3" __version__ = "0.8.0" -CONFIG = get_conf() -api_wrapper = get_exchange_api(CONFIG) +class State(enum.Enum): + RUNNING = 0 + PAUSED = 1 + TERMINATE = 2 -class TradeThread(threading.Thread): - def __init__(self): - super().__init__() - self._should_stop = False - - def stop(self) -> None: - """ stops the trader thread """ - self._should_stop = True - - def run(self) -> None: - """ - Threaded main function - :return: None - """ - try: - TelegramHandler.send_msg('*Status:* `trader started`') - logger.info('Trader started') - while not self._should_stop: - try: - self._process() - except (ConnectionError, JSONDecodeError, ValueError) as error: - msg = 'Got {} during _process()'.format(error.__class__.__name__) - logger.exception(msg) - finally: - Session.flush() - time.sleep(25) - except (RuntimeError, JSONDecodeError): - TelegramHandler.send_msg('*Status:* Got RuntimeError: ```\n{}\n```'.format(traceback.format_exc())) - logger.exception('RuntimeError. Stopping trader ...') - finally: - TelegramHandler.send_msg('*Status:* `Trader has stopped`') - - @staticmethod - def _process() -> None: - """ - Queries the persistence layer for open trades and handles them, - otherwise a new trade is created. - :return: None - """ - # Query trades from persistence layer - trades = Trade.query.filter(Trade.is_open.is_(True)).all() - if len(trades) < CONFIG['max_open_trades']: - try: - # Create entity and execute trade - trade = create_trade(float(CONFIG['stake_amount']), api_wrapper.exchange) - if trade: - Session.add(trade) - else: - logging.info('Got no buy signal...') - except ValueError: - logger.exception('Unable to create trade') - - for trade in trades: - # Check if there is already an open order for this trade - orders = api_wrapper.get_open_orders(trade.pair) - orders = [o for o in orders if o['id'] == trade.open_order_id] - if orders: - msg = 'There exists an open order for {}: Order(total={}, remaining={}, type={}, id={})' \ - .format( - trade, - round(orders[0]['amount'], 8), - round(orders[0]['remaining'], 8), - orders[0]['type'], - orders[0]['id']) - logger.info(msg) - continue - - # Update state - trade.open_order_id = None - # Check if this trade can be marked as closed - if close_trade_if_fulfilled(trade): - logger.info('No open orders found and trade is fulfilled. Marking %s as closed ...', trade) - continue - - # Check if we can sell our current pair - handle_trade(trade) - -# Initial stopped TradeThread instance -_instance = TradeThread() +_conf = {} +_cur_state = State.RUNNING @synchronized -def get_instance(recreate: bool=False) -> TradeThread: +def update_state(state: State) -> None: """ - Get the current instance of this thread. This is a singleton. - :param recreate: Must be True if you want to start the instance - :return: TradeThread instance + Updates the application state + :param state: new state + :return: None """ - global _instance - if recreate and not _instance.is_alive(): - logger.debug('Creating thread instance...') - _instance = TradeThread() - return _instance + global _cur_state + _cur_state = state + + +@synchronized +def get_state() -> State: + """ + Gets the current application state + :return: + """ + return _cur_state + + +def _process() -> None: + """ + Queries the persistence layer for open trades and handles them, + otherwise a new trade is created. + :return: None + """ + # Query trades from persistence layer + trades = Trade.query.filter(Trade.is_open.is_(True)).all() + if len(trades) < _conf['max_open_trades']: + try: + # Create entity and execute trade + trade = create_trade(float(_conf['stake_amount']), exchange.cur_exchange) + if trade: + Trade.session.add(trade) + else: + logging.info('Got no buy signal...') + except ValueError: + logger.exception('Unable to create trade') + + for trade in trades: + # Check if there is already an open order for this trade + orders = exchange.get_open_orders(trade.pair) + orders = [o for o in orders if o['id'] == trade.open_order_id] + if orders: + msg = 'There exists an open order for {}: Order(total={}, remaining={}, type={}, id={})' \ + .format( + trade, + round(orders[0]['amount'], 8), + round(orders[0]['remaining'], 8), + orders[0]['type'], + orders[0]['id']) + logger.info(msg) + continue + + # Update state + trade.open_order_id = None + # Check if this trade can be marked as closed + if close_trade_if_fulfilled(trade): + logger.info('No open orders found and trade is fulfilled. Marking %s as closed ...', trade) + continue + + # Check if we can sell our current pair + handle_trade(trade) def close_trade_if_fulfilled(trade: Trade) -> bool: @@ -134,7 +114,6 @@ def close_trade_if_fulfilled(trade: Trade) -> bool: # we can close this trade. if trade.close_profit and trade.close_date and trade.close_rate and not trade.open_order_id: trade.is_open = False - Session.flush() return True return False @@ -150,14 +129,14 @@ def handle_trade(trade: Trade) -> None: logger.debug('Handling open trade %s ...', trade) # Get current rate - current_rate = api_wrapper.get_ticker(trade.pair)['bid'] + current_rate = exchange.get_ticker(trade.pair)['bid'] current_profit = 100 * ((current_rate - trade.open_rate) / trade.open_rate) # Get available balance currency = trade.pair.split('_')[1] - balance = api_wrapper.get_balance(currency) + balance = exchange.get_balance(currency) - for duration, threshold in sorted(CONFIG['minimal_roi'].items()): + for duration, threshold in sorted(_conf['minimal_roi'].items()): duration, threshold = float(duration), float(threshold) # Check if time matches and current rate is above threshold time_diff = (datetime.utcnow() - trade.open_date).total_seconds() / 60 @@ -167,12 +146,12 @@ def handle_trade(trade: Trade) -> None: message = '*{}:* Selling [{}]({}) at rate `{:f} (profit: {}%)`'.format( trade.exchange.name, trade.pair.replace('_', '/'), - api_wrapper.get_pair_detail_url(trade.pair), + exchange.get_pair_detail_url(trade.pair), trade.close_rate, round(profit, 2) ) logger.info(message) - TelegramHandler.send_msg(message) + telegram.send_msg(message) return else: logger.debug('Threshold not reached. (cur_profit: %1.2f%%)', current_profit) @@ -180,18 +159,18 @@ def handle_trade(trade: Trade) -> None: logger.exception('Unable to handle open order') -def create_trade(stake_amount: float, exchange: Exchange) -> Optional[Trade]: +def create_trade(stake_amount: float, _exchange: exchange.Exchange) -> Optional[Trade]: """ Checks the implemented trading indicator(s) for a randomly picked pair, if one pair triggers the buy_signal a new trade record gets created :param stake_amount: amount of btc to spend - :param exchange: exchange to use + :param _exchange: exchange to use """ logger.info('Creating new trade with stake_amount: %f ...', stake_amount) - whitelist = CONFIG[exchange.name.lower()]['pair_whitelist'] + whitelist = _conf[_exchange.name.lower()]['pair_whitelist'] # Check if btc_amount is fulfilled - if api_wrapper.get_balance(CONFIG['stake_currency']) < stake_amount: - raise ValueError('stake amount is not fulfilled (currency={}'.format(CONFIG['stake_currency'])) + if exchange.get_balance(_conf['stake_currency']) < stake_amount: + raise ValueError('stake amount is not fulfilled (currency={}'.format(_conf['stake_currency'])) # Remove currently opened and latest pairs from whitelist trades = Trade.query.filter(Trade.is_open.is_(True)).all() @@ -213,30 +192,78 @@ def create_trade(stake_amount: float, exchange: Exchange) -> Optional[Trade]: else: return None - open_rate = api_wrapper.get_ticker(pair)['ask'] + open_rate = exchange.get_ticker(pair)['ask'] amount = stake_amount / open_rate - exchange = exchange - order_id = api_wrapper.buy(pair, open_rate, amount) + order_id = exchange.buy(pair, open_rate, amount) # Create trade entity and return message = '*{}:* Buying [{}]({}) at rate `{:f}`'.format( - exchange.name, + _exchange.name, pair.replace('_', '/'), - api_wrapper.get_pair_detail_url(pair), + exchange.get_pair_detail_url(pair), open_rate ) logger.info(message) - TelegramHandler.send_msg(message) + telegram.send_msg(message) return Trade(pair=pair, btc_amount=stake_amount, open_rate=open_rate, + open_date=datetime.utcnow(), amount=amount, - exchange=exchange, - open_order_id=order_id) + exchange=_exchange, + open_order_id=order_id, + is_open=True) + + +def init(config: dict) -> None: + """ + Initializes all modules and updates the config + :param config: config as dict + :return: None + """ + global _conf + + # Initialize all modules + telegram.init(config) + persistence.init(config) + exchange.init(config) + _conf.update(config) + + +def app(config: dict) -> None: + + logger.info('Starting freqtrade %s', __version__) + init(config) + + try: + telegram.send_msg('*Status:* `trader started`') + logger.info('Trader started') + while True: + state = get_state() + if state == State.TERMINATE: + return + elif state == State.PAUSED: + time.sleep(1) + elif state == State.RUNNING: + try: + _process() + except (ConnectionError, JSONDecodeError, ValueError) as error: + msg = 'Got {} during _process()'.format(error.__class__.__name__) + logger.exception(msg) + finally: + time.sleep(25) + except (RuntimeError, JSONDecodeError): + telegram.send_msg( + '*Status:* Got RuntimeError: ```\n{}\n```'.format(traceback.format_exc()) + ) + logger.exception('RuntimeError. Stopping trader ...') + finally: + telegram.send_msg('*Status:* `Trader has stopped`') if __name__ == '__main__': - logger.info('Starting freqtrade %s', __version__) - TelegramHandler.listen() - while True: - time.sleep(0.5) + with open('config.json') as file: + conf = json.load(file) + validate(conf, conf_schema) + app(conf) + diff --git a/utils.py b/misc.py similarity index 75% rename from utils.py rename to misc.py index e7bb006fd..6b67d0a64 100644 --- a/utils.py +++ b/misc.py @@ -1,16 +1,6 @@ -import json -import logging - -from jsonschema import validate -from wrapt import synchronized - -logger = logging.getLogger(__name__) - -_cur_conf = None - # Required json-schema for user specified config -_conf_schema = { +conf_schema = { 'type': 'object', 'properties': { 'max_open_trades': {'type': 'integer'}, @@ -65,18 +55,3 @@ _conf_schema = { 'telegram' ] } - - -@synchronized -def get_conf(filename: str='config.json') -> dict: - """ - Loads the config into memory validates it - and returns the singleton instance - :return: dict - """ - global _cur_conf - if not _cur_conf: - with open(filename) as file: - _cur_conf = json.load(file) - validate(_cur_conf, _conf_schema) - return _cur_conf diff --git a/persistence.py b/persistence.py index f8c954983..a0a85ed3d 100644 --- a/persistence.py +++ b/persistence.py @@ -5,27 +5,48 @@ from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import scoped_session, sessionmaker from sqlalchemy.types import Enum -from exchange import Exchange, get_exchange_api -from utils import get_conf +import exchange + + +_db_handle = None +_session = None +_conf = {} -conf = get_conf() -if conf.get('dry_run', False): - db_handle = 'sqlite:///tradesv2.dry_run.sqlite' -else: - db_handle = 'sqlite:///tradesv2.sqlite' -engine = create_engine(db_handle, echo=False) -Session = scoped_session(sessionmaker(bind=engine, autoflush=True, autocommit=True)) Base = declarative_base() +def init(config: dict) -> None: + """ + Initializes this module with the given config, + registers all known command handlers + and starts polling for message updates + :param config: config to use + :return: None + """ + global _db_handle, _session + _conf.update(config) + if _conf.get('dry_run', False): + _db_handle = 'sqlite:///tradesv2.dry_run.sqlite' + else: + _db_handle = 'sqlite:///tradesv2.sqlite' + + engine = create_engine(_db_handle, echo=False) + _session = scoped_session(sessionmaker(bind=engine, autoflush=True, autocommit=True)) + Trade.session = _session + Trade.query = _session.query_property() + Base.metadata.create_all(engine) + + +def get_session(): + return _session + + class Trade(Base): __tablename__ = 'trades' - query = Session.query_property() - id = Column(Integer, primary_key=True) - exchange = Column(Enum(Exchange), nullable=False) + exchange = Column(Enum(exchange.Exchange), nullable=False) pair = Column(String, nullable=False) is_open = Column(Boolean, nullable=False, default=True) open_rate = Column(Float, nullable=False) @@ -56,12 +77,10 @@ class Trade(Base): profit = 100 * ((rate - self.open_rate) / self.open_rate) # Execute sell and update trade record - order_id = get_exchange_api(conf).sell(self.pair, rate, amount) + order_id = exchange.sell(str(self.pair), rate, amount) self.close_rate = rate self.close_profit = profit self.close_date = datetime.utcnow() self.open_order_id = order_id - Session.flush() return profit -Base.metadata.create_all(engine) diff --git a/rpc/__init__.py b/rpc/__init__.py index e69de29bb..35fa001c3 100644 --- a/rpc/__init__.py +++ b/rpc/__init__.py @@ -0,0 +1 @@ +from . import telegram diff --git a/rpc/telegram.py b/rpc/telegram.py index efb8746fa..d6abeba61 100644 --- a/rpc/telegram.py +++ b/rpc/telegram.py @@ -9,9 +9,9 @@ from telegram.ext import CommandHandler, Updater from telegram import ParseMode, Bot, Update from wrapt import synchronized -from persistence import Trade, Session -from exchange import get_exchange_api -from utils import get_conf +from persistence import Trade + +import exchange # Remove noisy log messages logging.getLogger('requests.packages.urllib3').setLevel(logging.INFO) @@ -19,9 +19,38 @@ logging.getLogger('telegram').setLevel(logging.INFO) logger = logging.getLogger(__name__) _updater = None +_conf = {} -conf = get_conf() -api_wrapper = get_exchange_api(conf) + +def init(config: dict) -> None: + """ + Initializes this module with the given config, + registers all known command handlers + and starts polling for message updates + :param config: config to use + :return: None + """ + _conf.update(config) + + # Register command handler and start telegram message polling + handles = [ + CommandHandler('status', _status), + CommandHandler('profit', _profit), + CommandHandler('start', _start), + CommandHandler('stop', _stop), + CommandHandler('forcesell', _forcesell), + CommandHandler('performance', _performance), + ] + for handle in handles: + get_updater(_conf).dispatcher.add_handler(handle) + get_updater(_conf).start_polling( + clean=True, + bootstrap_retries=3, + timeout=30, + read_latency=60, + ) + logger.info('rpc.telegram is listening for following commands: {}' + .format([h.command for h in handles])) def authorized_only(command_handler: Callable[[Bot, Update], None]) -> Callable[..., Any]: @@ -35,7 +64,7 @@ def authorized_only(command_handler: Callable[[Bot, Update], None]) -> Callable[ if not isinstance(bot, Bot) or not isinstance(update, Update): raise ValueError('Received invalid Arguments: {}'.format(*args)) - chat_id = int(conf['telegram']['chat_id']) + chat_id = int(_conf['telegram']['chat_id']) if int(update.message.chat_id) == chat_id: logger.info('Executing handler: %s for chat_id: %s', command_handler.__name__, chat_id) return command_handler(*args, **kwargs) @@ -44,33 +73,31 @@ def authorized_only(command_handler: Callable[[Bot, Update], None]) -> Callable[ return wrapper -class TelegramHandler(object): - @staticmethod - @authorized_only - def _status(bot: Bot, update: Update) -> None: - """ - Handler for /status. - Returns the current TradeThread status - :param bot: telegram bot - :param update: message update - :return: None - """ - # Fetch open trade - trades = Trade.query.filter(Trade.is_open.is_(True)).all() - from main import get_instance - if not get_instance().is_alive(): - TelegramHandler.send_msg('*Status:* `trader is not running`', bot=bot) - elif not trades: - TelegramHandler.send_msg('*Status:* `no active order`', bot=bot) - else: - for trade in trades: - # calculate profit and send message to user - current_rate = api_wrapper.get_ticker(trade.pair)['bid'] - current_profit = 100 * ((current_rate - trade.open_rate) / trade.open_rate) - orders = api_wrapper.get_open_orders(trade.pair) - orders = [o for o in orders if o['id'] == trade.open_order_id] - order = orders[0] if orders else None - message = """ +@authorized_only +def _status(bot: Bot, update: Update) -> None: + """ + Handler for /status. + Returns the current TradeThread status + :param bot: telegram bot + :param update: message update + :return: None + """ + # Fetch open trade + trades = Trade.query.filter(Trade.is_open.is_(True)).all() + from main import get_state, State + if not get_state() == State.RUNNING: + send_msg('*Status:* `trader is not running`', bot=bot) + elif not trades: + send_msg('*Status:* `no active order`', bot=bot) + else: + for trade in trades: + # calculate profit and send message to user + current_rate = exchange.get_ticker(trade.pair)['bid'] + current_profit = 100 * ((current_rate - trade.open_rate) / trade.open_rate) + orders = exchange.get_open_orders(trade.pair) + orders = [o for o in orders if o['id'] == trade.open_order_id] + order = orders[0] if orders else None + message = """ *Trade ID:* `{trade_id}` *Current Pair:* [{pair}]({market_url}) *Open Since:* `{date}` @@ -81,239 +108,213 @@ class TelegramHandler(object): *Close Profit:* `{close_profit}` *Current Profit:* `{current_profit}%` *Open Order:* `{open_order}` - """.format( - trade_id=trade.id, - pair=trade.pair, - market_url=api_wrapper.get_pair_detail_url(trade.pair), - date=arrow.get(trade.open_date).humanize(), - open_rate=trade.open_rate, - close_rate=trade.close_rate, - current_rate=current_rate, - amount=round(trade.amount, 8), - close_profit='{}%'.format(round(trade.close_profit, 2)) if trade.close_profit else None, - current_profit=round(current_profit, 2), - open_order='{} ({})'.format(order['remaining'], order['type']) if order else None, - ) - TelegramHandler.send_msg(message, bot=bot) + """.format( + trade_id=trade.id, + pair=trade.pair, + market_url=exchange.get_pair_detail_url(trade.pair), + date=arrow.get(trade.open_date).humanize(), + open_rate=trade.open_rate, + close_rate=trade.close_rate, + current_rate=current_rate, + amount=round(trade.amount, 8), + close_profit='{}%'.format(round(trade.close_profit, 2)) if trade.close_profit else None, + current_profit=round(current_profit, 2), + open_order='{} ({})'.format(order['remaining'], order['type']) if order else None, + ) + send_msg(message, bot=bot) - @staticmethod - @authorized_only - def _profit(bot: Bot, update: Update) -> None: - """ - Handler for /profit. - Returns a cumulative profit statistics. - :param bot: telegram bot - :param update: message update - :return: None - """ - trades = Trade.query.order_by(Trade.id).all() - profit_amounts = [] - profits = [] - durations = [] - for trade in trades: - if trade.close_date: - durations.append((trade.close_date - trade.open_date).total_seconds()) - if trade.close_profit: - profit = trade.close_profit - else: - # Get current rate - current_rate = api_wrapper.get_ticker(trade.pair)['bid'] - profit = 100 * ((current_rate - trade.open_rate) / trade.open_rate) +@authorized_only +def _profit(bot: Bot, update: Update) -> None: + """ + Handler for /profit. + Returns a cumulative profit statistics. + :param bot: telegram bot + :param update: message update + :return: None + """ + trades = Trade.query.order_by(Trade.id).all() - profit_amounts.append((profit / 100) * trade.btc_amount) - profits.append(profit) + profit_amounts = [] + profits = [] + durations = [] + for trade in trades: + if trade.close_date: + durations.append((trade.close_date - trade.open_date).total_seconds()) + if trade.close_profit: + profit = trade.close_profit + else: + # Get current rate + current_rate = exchange.get_ticker(trade.pair)['bid'] + profit = 100 * ((current_rate - trade.open_rate) / trade.open_rate) - bp_pair, bp_rate = Session.query(Trade.pair, func.sum(Trade.close_profit).label('profit_sum')) \ - .filter(Trade.is_open.is_(False)) \ - .group_by(Trade.pair) \ - .order_by('profit_sum DESC') \ - .first() + profit_amounts.append((profit / 100) * trade.btc_amount) + profits.append(profit) - markdown_msg = """ + bp_pair, bp_rate = Trade.session.query(Trade.pair, func.sum(Trade.close_profit).label('profit_sum')) \ + .filter(Trade.is_open.is_(False)) \ + .group_by(Trade.pair) \ + .order_by('profit_sum DESC') \ + .first() + + markdown_msg = """ *ROI:* `{profit_btc} ({profit}%)` *Trade Count:* `{trade_count}` *First Trade opened:* `{first_trade_date}` *Latest Trade opened:* `{latest_trade_date}` *Avg. Duration:* `{avg_duration}` *Best Performing:* `{best_pair}: {best_rate}%` - """.format( - profit_btc=round(sum(profit_amounts), 8), - profit=round(sum(profits), 2), - trade_count=len(trades), - first_trade_date=arrow.get(trades[0].open_date).humanize(), - latest_trade_date=arrow.get(trades[-1].open_date).humanize(), - avg_duration=str(timedelta(seconds=sum(durations) / float(len(durations)))).split('.')[0], - best_pair=bp_pair, - best_rate=round(bp_rate, 2), - ) - TelegramHandler.send_msg(markdown_msg, bot=bot) + """.format( + profit_btc=round(sum(profit_amounts), 8), + profit=round(sum(profits), 2), + trade_count=len(trades), + first_trade_date=arrow.get(trades[0].open_date).humanize(), + latest_trade_date=arrow.get(trades[-1].open_date).humanize(), + avg_duration=str(timedelta(seconds=sum(durations) / float(len(durations)))).split('.')[0], + best_pair=bp_pair, + best_rate=round(bp_rate, 2), + ) + send_msg(markdown_msg, bot=bot) - @staticmethod - @authorized_only - def _start(bot: Bot, update: Update) -> None: - """ - Handler for /start. - Starts TradeThread - :param bot: telegram bot - :param update: message update - :return: None - """ - from main import get_instance - if get_instance().is_alive(): - TelegramHandler.send_msg('*Status:* `already running`', bot=bot) - else: - get_instance(recreate=True).start() - @staticmethod - @authorized_only - def _stop(bot: Bot, update: Update) -> None: - """ - Handler for /stop. - Stops TradeThread - :param bot: telegram bot - :param update: message update - :return: None - """ - from main import get_instance - if get_instance().is_alive(): - TelegramHandler.send_msg('`Stopping trader ...`', bot=bot) - get_instance().stop() - else: - TelegramHandler.send_msg('*Status:* `already stopped`', bot=bot) +@authorized_only +def _start(bot: Bot, update: Update) -> None: + """ + Handler for /start. + Starts TradeThread + :param bot: telegram bot + :param update: message update + :return: None + """ + from main import get_state, State, update_state + if get_state() == State.RUNNING: + send_msg('*Status:* `already running`', bot=bot) + else: + update_state(State.RUNNING) - @staticmethod - @authorized_only - def _forcesell(bot: Bot, update: Update) -> None: - """ - Handler for /forcesell . - Sells the given trade at current price - :param bot: telegram bot - :param update: message update - :return: None - """ - from main import get_instance - if not get_instance().is_alive(): - TelegramHandler.send_msg('`trader is not running`', bot=bot) + +@authorized_only +def _stop(bot: Bot, update: Update) -> None: + """ + Handler for /stop. + Stops TradeThread + :param bot: telegram bot + :param update: message update + :return: None + """ + from main import get_state, State, update_state + if get_state() == State.RUNNING: + send_msg('`Stopping trader ...`', bot=bot) + update_state(State.PAUSED) + else: + send_msg('*Status:* `already stopped`', bot=bot) + + +@authorized_only +def _forcesell(bot: Bot, update: Update) -> None: + """ + Handler for /forcesell . + Sells the given trade at current price + :param bot: telegram bot + :param update: message update + :return: None + """ + from main import get_state, State + if get_state() != State.RUNNING: + send_msg('`trader is not running`', bot=bot) + return + + try: + trade_id = int(update.message.text + .replace('/forcesell', '') + .strip()) + # Query for trade + trade = Trade.query.filter(and_( + Trade.id == trade_id, + Trade.is_open.is_(True) + )).first() + if not trade: + send_msg('There is no open trade with ID: `{}`'.format(trade_id)) return + # Get current rate + current_rate = exchange.get_ticker(trade.pair)['bid'] + # Get available balance + currency = trade.pair.split('_')[1] + balance = exchange.get_balance(currency) + # Execute sell + profit = trade.exec_sell_order(current_rate, balance) + message = '*{}:* Selling [{}]({}) at rate `{:f} (profit: {}%)`'.format( + trade.exchange.name, + trade.pair.replace('_', '/'), + exchange.get_pair_detail_url(trade.pair), + trade.close_rate, + round(profit, 2) + ) + logger.info(message) + send_msg(message) + except ValueError: + send_msg('Invalid argument. Usage: `/forcesell `') + logger.warning('/forcesell: Invalid argument received') + + +@authorized_only +def _performance(bot: Bot, update: Update) -> None: + """ + Handler for /performance. + Shows a performance statistic from finished trades + :param bot: telegram bot + :param update: message update + :return: None + """ + from main import get_state, State + if get_state() != State.RUNNING: + send_msg('`trader is not running`', bot=bot) + return + + pair_rates = Trade.session.query(Trade.pair, func.sum(Trade.close_profit).label('profit_sum')) \ + .filter(Trade.is_open.is_(False)) \ + .group_by(Trade.pair) \ + .order_by('profit_sum DESC') \ + .all() + + stats = '\n'.join('{}. {}\t{}%'.format(i + 1, pair, round(rate, 2)) for i, (pair, rate) in enumerate(pair_rates)) + + message = 'Performance:\n{}\n'.format(stats) + logger.debug(message) + send_msg(message, parse_mode=ParseMode.HTML) + + +@synchronized +def get_updater(config: dict) -> Updater: + """ + Returns the current telegram updater or instantiates a new one + :param config: dict + :return: telegram.ext.Updater + """ + global _updater + if not _updater: + _updater = Updater(token=config['telegram']['token'], workers=0) + return _updater + + +def send_msg(msg: str, bot: Bot=None, parse_mode: ParseMode=ParseMode.MARKDOWN) -> None: + """ + Send given markdown message + :param msg: message + :param bot: alternative bot + :param parse_mode: telegram parse mode + :return: None + """ + if _conf['telegram'].get('enabled', False): try: - trade_id = int(update.message.text - .replace('/forcesell', '') - .strip()) - # Query for trade - trade = Trade.query.filter(and_( - Trade.id == trade_id, - Trade.is_open.is_(True) - )).first() - if not trade: - TelegramHandler.send_msg('There is no open trade with ID: `{}`'.format(trade_id)) - return - # Get current rate - current_rate = api_wrapper.get_ticker(trade.pair)['bid'] - # Get available balance - currency = trade.pair.split('_')[1] - balance = api_wrapper.get_balance(currency) - # Execute sell - profit = trade.exec_sell_order(current_rate, balance) - message = '*{}:* Selling [{}]({}) at rate `{:f} (profit: {}%)`'.format( - trade.exchange.name, - trade.pair.replace('_', '/'), - api_wrapper.get_pair_detail_url(trade.pair), - trade.close_rate, - round(profit, 2) - ) - logger.info(message) - TelegramHandler.send_msg(message) - - except ValueError: - TelegramHandler.send_msg('Invalid argument. Usage: `/forcesell `') - logger.warning('/forcesell: Invalid argument received') - - @staticmethod - @authorized_only - def _performance(bot: Bot, update: Update) -> None: - """ - Handler for /performance. - Shows a performance statistic from finished trades - :param bot: telegram bot - :param update: message update - :return: None - """ - from main import get_instance - if not get_instance().is_alive(): - TelegramHandler.send_msg('`trader is not running`', bot=bot) - return - - pair_rates = Session.query(Trade.pair, func.sum(Trade.close_profit).label('profit_sum')) \ - .filter(Trade.is_open.is_(False)) \ - .group_by(Trade.pair) \ - .order_by('profit_sum DESC') \ - .all() - - stats = '\n'.join('{}. {}\t{}%'.format(i + 1, pair, round(rate, 2)) for i, (pair, rate) in enumerate(pair_rates)) - - message = 'Performance:\n{}\n'.format(stats) - logger.debug(message) - TelegramHandler.send_msg(message, parse_mode=ParseMode.HTML) - - @staticmethod - @synchronized - def get_updater(config: dict) -> Updater: - """ - Returns the current telegram updater or instantiates a new one - :param config: dict - :return: telegram.ext.Updater - """ - global _updater - if not _updater: - _updater = Updater(token=config['telegram']['token'], workers=0) - return _updater - - @staticmethod - def listen() -> None: - """ - Registers all known command handlers and starts polling for message updates - :return: None - """ - # Register command handler and start telegram message polling - handles = [ - CommandHandler('status', TelegramHandler._status), - CommandHandler('profit', TelegramHandler._profit), - CommandHandler('start', TelegramHandler._start), - CommandHandler('stop', TelegramHandler._stop), - CommandHandler('forcesell', TelegramHandler._forcesell), - CommandHandler('performance', TelegramHandler._performance), - ] - for handle in handles: - TelegramHandler.get_updater(conf).dispatcher.add_handler(handle) - TelegramHandler.get_updater(conf).start_polling( - clean=True, - bootstrap_retries=3, - timeout=30, - read_latency=60, - ) - logger.info('TelegramHandler is listening for following commands: {}' - .format([h.command for h in handles])) - - @staticmethod - def send_msg(msg: str, bot: Bot=None, parse_mode: ParseMode=ParseMode.MARKDOWN) -> None: - """ - Send given markdown message - :param msg: message - :param bot: alternative bot - :param parse_mode: telegram parse mode - :return: None - """ - if conf['telegram'].get('enabled', False): + bot = bot or get_updater(_conf).bot try: - bot = bot or TelegramHandler.get_updater(conf).bot - try: - bot.send_message(conf['telegram']['chat_id'], msg, parse_mode=parse_mode) - except NetworkError as error: - # Sometimes the telegram server resets the current connection, - # if this is the case we send the message again. - logger.warning('Got Telegram NetworkError: %s! Trying one more time.', error.message) - bot.send_message(conf['telegram']['chat_id'], msg, parse_mode=parse_mode) - except Exception: - logger.exception('Exception occurred within Telegram API') + bot.send_message(_conf['telegram']['chat_id'], msg, parse_mode=parse_mode) + except NetworkError as error: + # Sometimes the telegram server resets the current connection, + # if this is the case we send the message again. + logger.warning('Got Telegram NetworkError: %s! Trying one more time.', error.message) + bot.send_message(_conf['telegram']['chat_id'], msg, parse_mode=parse_mode) + except Exception: + logger.exception('Exception occurred within Telegram API') diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/test/test_main.py b/test/test_main.py new file mode 100644 index 000000000..99094f72b --- /dev/null +++ b/test/test_main.py @@ -0,0 +1,112 @@ +import unittest +from unittest.mock import patch, MagicMock + +import os +from jsonschema import validate + +import exchange +from main import create_trade, handle_trade, close_trade_if_fulfilled, init +from misc import conf_schema +from persistence import Trade + + +class TestMain(unittest.TestCase): + conf = { + "max_open_trades": 3, + "stake_currency": "BTC", + "stake_amount": 0.05, + "dry_run": True, + "minimal_roi": { + "2880": 0.005, + "720": 0.01, + "0": 0.02 + }, + "poloniex": { + "enabled": False, + "key": "key", + "secret": "secret", + "pair_whitelist": [] + }, + "bittrex": { + "enabled": True, + "key": "key", + "secret": "secret", + "pair_whitelist": [ + "BTC_ETH" + ] + }, + "telegram": { + "enabled": True, + "token": "token", + "chat_id": "chat_id" + } + } + + def test_1_create_trade(self): + with patch.dict('main._conf', self.conf): + with patch('main.get_buy_signal', side_effect=lambda _: True) as buy_signal: + with patch.multiple('main.telegram', init=MagicMock(), send_msg=MagicMock()): + with patch.multiple('main.exchange', + get_ticker=MagicMock(return_value={ + 'bid': 0.07256061, + 'ask': 0.072661, + 'last': 0.07256061 + }), + buy=MagicMock(return_value='mocked_order_id')): + init(self.conf) + trade = create_trade(15.0, exchange.Exchange.BITTREX) + Trade.session.add(trade) + Trade.session.flush() + self.assertIsNotNone(trade) + self.assertEqual(trade.open_rate, 0.072661) + self.assertEqual(trade.pair, 'BTC_ETH') + self.assertEqual(trade.exchange, exchange.Exchange.BITTREX) + self.assertEqual(trade.amount, 206.43811673387373) + self.assertEqual(trade.btc_amount, 15.0) + self.assertEqual(trade.is_open, True) + self.assertIsNotNone(trade.open_date) + buy_signal.assert_called_once_with('BTC_ETH') + + def test_2_handle_trade(self): + with patch.dict('main._conf', self.conf): + with patch.multiple('main.telegram', init=MagicMock(), send_msg=MagicMock()): + with patch.multiple('main.exchange', + get_ticker=MagicMock(return_value={ + 'bid': 0.17256061, + 'ask': 0.172661, + 'last': 0.17256061 + }), + buy=MagicMock(return_value='mocked_order_id')): + trade = Trade.query.filter(Trade.is_open.is_(True)).first() + self.assertTrue(trade) + handle_trade(trade) + self.assertEqual(trade.close_rate, 0.17256061) + self.assertEqual(trade.close_profit, 137.4872490056564) + self.assertIsNotNone(trade.close_date) + self.assertEqual(trade.open_order_id, 'dry_run') + + def test_3_close_trade(self): + with patch.dict('main._conf', self.conf): + trade = Trade.query.filter(Trade.is_open.is_(True)).first() + self.assertTrue(trade) + + # Simulate that there is no open order + trade.open_order_id = None + + closed = close_trade_if_fulfilled(trade) + self.assertTrue(closed) + self.assertEqual(trade.is_open, False) + + @classmethod + def setUpClass(cls): + validate(cls.conf, conf_schema) + + @classmethod + def tearDownClass(cls): + try: + os.remove('./tradesv2.dry_run.sqlite') + except FileNotFoundError: + pass + +if __name__ == '__main__': + unittest.main() diff --git a/test/test_persistence.py b/test/test_persistence.py new file mode 100644 index 000000000..25bdfd4e1 --- /dev/null +++ b/test/test_persistence.py @@ -0,0 +1,28 @@ +import unittest +from unittest.mock import patch, Mock + +from exchange import Exchange +from persistence import Trade + + +class TestTrade(unittest.TestCase): + def test_1_exec_sell_order(self): + with patch('main.exchange.sell', side_effect='mocked_order_id') as api_mock: + trade = Trade( + pair='BTC_ETH', + btc_amount=1.00, + open_rate=0.50, + amount=10.00, + exchange=Exchange.BITTREX, + open_order_id='mocked' + ) + profit = trade.exec_sell_order(1.00, 10.00) + api_mock.assert_called_once_with('BTC_ETH', 1.0, 10.0) + self.assertEqual(profit, 100.0) + self.assertEqual(trade.close_rate, 1.0) + self.assertEqual(trade.close_profit, profit) + self.assertIsNotNone(trade.close_date) + + +if __name__ == '__main__': + unittest.main()