diff --git a/config.json.example b/config.json.example index ffff69012..a9a9cad6e 100644 --- a/config.json.example +++ b/config.json.example @@ -19,12 +19,16 @@ "key": "key", "secret": "secret", "pair_whitelist": [ - "BTC_MLN", + "BTC_RLC", + "BTC_TKN", "BTC_TRST", + "BTC_SWT", + "BTC_PIVX", + "BTC_MLN", + "BTC_XZC", "BTC_TIME", "BTC_NXS", - "BTC_GBYTE", - "BTC_SNGLS" + "BTC_LUN" ] }, "telegram": { diff --git a/exchange.py b/exchange.py index 258123b22..5ac1cca3d 100644 --- a/exchange.py +++ b/exchange.py @@ -1,24 +1,23 @@ import enum -import threading - +import logging from bittrex.bittrex import Bittrex from poloniex import Poloniex +from wrapt import synchronized +logger = logging.getLogger(__name__) -_lock = threading.Condition() _exchange_api = None +@synchronized def get_exchange_api(conf): """ Returns the current exchange api or instantiates a new one :return: exchange.ApiWrapper """ global _exchange_api - _lock.acquire() if not _exchange_api: _exchange_api = ApiWrapper(conf) - _lock.release() return _exchange_api @@ -40,6 +39,8 @@ class ApiWrapper(object): :param config: dict """ self.dry_run = config['dry_run'] + if self.dry_run: + logger.info('Instance is running with dry_run enabled') use_poloniex = config.get('poloniex', {}).get('enabled', False) use_bittrex = config.get('bittrex', {}).get('enabled', False) @@ -65,10 +66,12 @@ class ApiWrapper(object): pass elif self.exchange == Exchange.POLONIEX: self.api.buy(pair, rate, amount) + # TODO: return order id elif self.exchange == Exchange.BITTREX: data = self.api.buy_limit(pair.replace('_', '-'), amount, rate) if not data['success']: raise RuntimeError('BITTREX: {}'.format(data['message'])) + return data['result']['uuid'] def sell(self, pair, rate, amount): """ @@ -82,10 +85,12 @@ class ApiWrapper(object): pass elif self.exchange == Exchange.POLONIEX: self.api.sell(pair, rate, amount) + # TODO: return order id elif self.exchange == Exchange.BITTREX: data = self.api.sell_limit(pair.replace('_', '-'), amount, rate) if not data['success']: raise RuntimeError('BITTREX: {}'.format(data['message'])) + return data['result']['uuid'] def get_balance(self, currency): """ @@ -93,7 +98,9 @@ class ApiWrapper(object): :param currency: currency as str, format: BTC :return: float """ - if self.exchange == Exchange.POLONIEX: + if self.dry_run: + return 999.9 + elif self.exchange == Exchange.POLONIEX: data = self.api.returnBalances() return float(data[currency]) elif self.exchange == Exchange.BITTREX: @@ -125,19 +132,37 @@ class ApiWrapper(object): 'last': float(data['result']['Last']), } + def cancel_order(self, order_id): + """ + Cancel order for given order_id + :param order_id: id as str + :return: None + """ + if self.dry_run: + pass + elif self.exchange == Exchange.POLONIEX: + raise NotImplemented('Not implemented') + elif self.exchange == Exchange.BITTREX: + data = self.api.cancel(order_id) + if not data['success']: + raise RuntimeError('BITTREX: {}'.format(data['message'])) + def get_open_orders(self, pair): """ Get all open orders for given pair. :param pair: Pair as str, format: BTC_ETC :return: list of dicts """ - if self.exchange == Exchange.POLONIEX: + if self.dry_run: + return [] + elif self.exchange == Exchange.POLONIEX: raise NotImplemented('Not implemented') elif self.exchange == Exchange.BITTREX: data = self.api.get_open_orders(pair.replace('_', '-')) if not data['success']: raise RuntimeError('BITTREX: {}'.format(data['message'])) return [{ + 'id': entry['OrderUuid'], 'type': entry['OrderType'], 'opened': entry['Opened'], 'rate': entry['PricePerUnit'], diff --git a/main.py b/main.py index 36ee5dee1..c5cbaba14 100755 --- a/main.py +++ b/main.py @@ -7,6 +7,8 @@ import time import traceback from datetime import datetime +from wrapt import synchronized + logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) @@ -19,44 +21,34 @@ from utils import get_conf __author__ = "gcarq" __copyright__ = "gcarq 2017" __license__ = "custom" -__version__ = "0.4" +__version__ = "0.5.1" conf = get_conf() api_wrapper = get_exchange_api(conf) -_lock = threading.Condition() -_instance = None -_should_stop = False + +@synchronized +def get_instance(recreate=False): + """ + Get the current instance of this thread. This is a singleton. + :param recreate: Must be True if you want to start the instance + :return: TradeThread instance + """ + global _instance, _should_stop + if recreate and not _instance.is_alive(): + logger.debug('Creating TradeThread instance') + _should_stop = False + _instance = TradeThread() + return _instance + + +def stop_instance(): + global _should_stop + _should_stop = True class TradeThread(threading.Thread): - @staticmethod - def get_instance(recreate=False): - """ - Get the current instance of this thread. This is a singleton. - :param recreate: Must be True if you want to start the instance - :return: TradeThread instance - """ - global _instance, _should_stop - _lock.acquire() - if _instance is None or (not _instance.is_alive() and recreate): - _should_stop = False - _instance = TradeThread() - _lock.release() - return _instance - - @staticmethod - def stop(): - """ - Sets stop signal for the current instance - :return: None - """ - global _should_stop - _lock.acquire() - _should_stop = True - _lock.release() - def run(self): """ Threaded main function @@ -64,25 +56,10 @@ class TradeThread(threading.Thread): """ try: TelegramHandler.send_msg('*Status:* `trader started`') + logger.info('Trader started') while not _should_stop: try: - # Query trades from persistence layer - trade = Trade.query.filter(Trade.is_open.is_(True)).first() - if not trade: - # Create entity and execute trade - Session.add(create_trade(float(conf['stake_amount']), api_wrapper.exchange)) - continue - - # Check if there is already an open order for this pair - orders = api_wrapper.get_open_orders(trade.pair) - if orders: - msg = 'There is already an open order for this trade. (total: {}, remaining: {}, type: {})'\ - .format(round(orders[0]['amount'], 8), round(orders[0]['remaining'], 8), orders[0]['type']) - logger.info(msg) - elif close_trade_if_fulfilled(trade): - logger.info('No open orders found and close values are set. Marking trade as closed ...') - else: - handle_trade(trade) + self._process() except ValueError: logger.exception('ValueError') finally: @@ -92,9 +69,47 @@ class TradeThread(threading.Thread): TelegramHandler.send_msg('*Status:* Got RuntimeError: ```\n{}\n```'.format(traceback.format_exc())) logger.exception('RuntimeError. Stopping trader ...') finally: - Session.flush() TelegramHandler.send_msg('*Status:* `Trader has stopped`') + @staticmethod + def _process(): + """ + Queries the persistence layer for new trades and handles them + :return: None + """ + # Query trades from persistence layer + trade = Trade.query.filter(Trade.is_open.is_(True)).first() + if not trade: + # Create entity and execute trade + Session.add(create_trade(float(conf['stake_amount']), api_wrapper.exchange)) + return + + # Check if there is already an open order for this trade + orders = api_wrapper.get_open_orders(trade.pair) + orders = [o for o in orders if o['id'] == trade.open_order_id] + if orders: + msg = 'There exists an open order for this trade: (total: {}, remaining: {}, type: {}, id: {})' \ + .format(round(orders[0]['amount'], 8), + round(orders[0]['remaining'], 8), + orders[0]['type'], + orders[0]['id']) + logger.info(msg) + return + + # Update state + trade.open_order_id = None + # Check if this trade can be marked as closed + if close_trade_if_fulfilled(trade): + logger.info('No open orders found and trade is fulfilled. Marking as closed ...') + return + + # Check if we can sell our current pair + handle_trade(trade) + +# Initial stopped TradeThread instance +_instance = TradeThread() +_should_stop = False + def close_trade_if_fulfilled(trade): """ @@ -104,7 +119,7 @@ def close_trade_if_fulfilled(trade): """ # If we don't have an open order and the close rate is already set, # we can close this trade. - if trade.close_profit and trade.close_date and trade.close_rate: + if trade.close_profit and trade.close_date and trade.close_rate and not trade.open_order_id: trade.is_open = False return True return False @@ -112,9 +127,8 @@ def close_trade_if_fulfilled(trade): def handle_trade(trade): """ - Sells the current pair if the threshold is reached - and updates the trade record. - :return: current instance + Sells the current pair if the threshold is reached and updates the trade record. + :return: None """ try: if not trade.is_open: @@ -122,7 +136,7 @@ def handle_trade(trade): logger.debug('Handling open trade {} ...'.format(trade)) # Get current rate - current_rate = api_wrapper.get_ticker(trade.pair)['last'] + current_rate = api_wrapper.get_ticker(trade.pair)['bid'] current_profit = 100 * ((current_rate - trade.open_rate) / trade.open_rate) # Get available balance @@ -136,10 +150,11 @@ def handle_trade(trade): if time_diff > duration and current_rate > (1 + threshold) * trade.open_rate: # Execute sell and update trade record - api_wrapper.sell(trade.pair, current_rate, balance) + order_id = api_wrapper.sell(trade.pair, current_rate, balance) trade.close_rate = current_rate trade.close_profit = current_profit trade.close_date = datetime.utcnow() + trade.open_order_id = order_id message = '*{}:* Selling {} at rate `{:f} (profit: {}%)`'.format( trade.exchange.name, @@ -168,40 +183,35 @@ def create_trade(stake_amount: float, exchange): raise ValueError('BTC amount is not fulfilled') # Remove latest trade pair from whitelist - latest_trade = Trade.order_by(Trade.id.desc()).first() + latest_trade = Trade.query.order_by(Trade.id.desc()).first() if latest_trade and latest_trade.pair in whitelist: whitelist.remove(latest_trade.pair) - logger.debug('Ignoring {} in pair whitelist') + logger.debug('Ignoring {} in pair whitelist'.format(latest_trade.pair)) if not whitelist: raise ValueError('No pair in whitelist') # Pick random pair and execute trade idx = random.randint(0, len(whitelist) - 1) pair = whitelist[idx] - open_rate = api_wrapper.get_ticker(pair)['last'] + open_rate = api_wrapper.get_ticker(pair)['ask'] amount = stake_amount / open_rate exchange = exchange - api_wrapper.buy(pair, open_rate, amount) + order_id = api_wrapper.buy(pair, open_rate, amount) - trade = Trade( - pair=pair, - btc_amount=stake_amount, - open_rate=open_rate, - amount=amount, - exchange=exchange, - ) - message = '*{}:* Buying {} at rate `{:f}`'.format( - trade.exchange.name, - trade.pair.replace('_', '/'), - trade.open_rate - ) + # Create trade entity and return + message = '*{}:* Buying {} at rate `{:f}`'.format(exchange.name, pair.replace('_', '/'), open_rate) logger.info(message) TelegramHandler.send_msg(message) - return trade + return Trade(pair=pair, + btc_amount=stake_amount, + open_rate=open_rate, + amount=amount, + exchange=exchange, + open_order_id=order_id) if __name__ == '__main__': logger.info('Starting marginbot {}'.format(__version__)) TelegramHandler.listen() while True: - time.sleep(0.1) + time.sleep(0.5) diff --git a/persistence.py b/persistence.py index efb5fec46..5b38aeefa 100644 --- a/persistence.py +++ b/persistence.py @@ -29,6 +29,7 @@ class Trade(Base): amount = Column(Float, nullable=False) open_date = Column(DateTime, nullable=False, default=datetime.utcnow) close_date = Column(DateTime) + open_order_id = Column(String) def __repr__(self): return 'Trade(id={}, pair={}, amount={}, open_rate={}, open_since={})'.format( diff --git a/requirements.txt b/requirements.txt index 8d0faa1a9..6d1231b7b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,5 @@ SQLAlchemy==1.1.9 python-telegram-bot==5.3.1 arrow==0.10.0 requests==2.14.2 -urllib3==1.20 \ No newline at end of file +urllib3==1.20 +wrapt==1.10.10 \ No newline at end of file diff --git a/rpc/telegram.py b/rpc/telegram.py index 96b85e338..c5562d936 100644 --- a/rpc/telegram.py +++ b/rpc/telegram.py @@ -1,106 +1,68 @@ -import threading - -import arrow +import logging from datetime import timedelta -import logging +import arrow +from telegram.error import NetworkError from telegram.ext import CommandHandler, Updater -from telegram import ParseMode +from telegram import ParseMode, Bot, Update +from wrapt import synchronized -from persistence import Trade +from persistence import Trade, Session from exchange import get_exchange_api from utils import get_conf +# Remove noisy log messages +logging.getLogger('requests.packages.urllib3').setLevel(logging.INFO) +logging.getLogger('telegram').setLevel(logging.INFO) logger = logging.getLogger(__name__) -_lock = threading.Condition() _updater = None conf = get_conf() api_wrapper = get_exchange_api(conf) +def authorized_only(command_handler): + """ + Decorator to check if the message comes from the correct chat_id + :param command_handler: Telegram CommandHandler + :return: decorated function + """ + def wrapper(*args, **kwargs): + bot, update = args[0], args[1] + if not isinstance(bot, Bot) or not isinstance(update, Update): + raise ValueError('Received invalid Arguments: {}'.format(*args)) + + chat_id = int(conf['telegram']['chat_id']) + if int(update.message.chat_id) == chat_id: + logger.info('Executing handler: {} for chat_id: {}'.format(command_handler.__name__, chat_id)) + return command_handler(*args, **kwargs) + else: + logger.info('Rejected unauthorized message from: {}'.format(update.message.chat_id)) + return wrapper + + class TelegramHandler(object): @staticmethod - def get_updater(conf): - """ - Returns the current telegram updater instantiates a new one - :param conf: - :return: telegram.ext.Updater - """ - global _updater - _lock.acquire() - if not _updater: - _updater = Updater(token=conf['telegram']['token'], workers=0) - _lock.release() - return _updater - - @staticmethod - def listen(): - """ - Registers all known command handlers and starts polling for message updates - :return: None - """ - # Register command handler and start telegram message polling - handles = [CommandHandler('status', TelegramHandler._status), - CommandHandler('profit', TelegramHandler._profit), - CommandHandler('start', TelegramHandler._start), - CommandHandler('stop', TelegramHandler._stop)] - for handle in handles: - TelegramHandler.get_updater(conf).dispatcher.add_handler(handle) - TelegramHandler.get_updater(conf).start_polling(clean=True, bootstrap_retries=3) - - @staticmethod - def _is_correct_scope(update): - """ - Checks if it is save to process the given update - :param update: - :return: True if valid else False - """ - # Only answer to our chat - return int(update.message.chat_id) == int(conf['telegram']['chat_id']) - - @staticmethod - def send_msg(markdown_message, bot=None): - """ - Send given markdown message - :param markdown_message: message - :param bot: alternative bot - :return: None - """ - if conf['telegram'].get('enabled', False): - bot = bot or TelegramHandler.get_updater(conf).bot - try: - bot.send_message( - chat_id=conf['telegram']['chat_id'], - text=markdown_message, - parse_mode=ParseMode.MARKDOWN, - ) - except Exception: - logger.exception('Exception occurred within telegram api') - - @staticmethod + @authorized_only def _status(bot, update): """ - Handler for /status + Handler for /status. + Returns the current TradeThread status :param bot: telegram bot :param update: message update :return: None """ - if not TelegramHandler._is_correct_scope(update): - return - # Fetch open trade trade = Trade.query.filter(Trade.is_open.is_(True)).first() - - from main import TradeThread - if not TradeThread.get_instance().is_alive(): + from main import get_instance + if not get_instance().is_alive(): message = '*Status:* `trader stopped`' elif not trade: message = '*Status:* `no active order`' else: # calculate profit and send message to user - current_rate = api_wrapper.get_ticker(trade.pair)['last'] + current_rate = api_wrapper.get_ticker(trade.pair)['bid'] current_profit = 100 * ((current_rate - trade.open_rate) / trade.open_rate) open_orders = api_wrapper.get_open_orders(trade.pair) order = open_orders[0] if open_orders else None @@ -123,23 +85,20 @@ class TelegramHandler(object): amount=round(trade.amount, 8), close_profit='{}%'.format(round(trade.close_profit, 2)) if trade.close_profit else None, current_profit=round(current_profit, 2), - open_order='{} ({})'.format( - order['remaining'], - order['type'] - ) if order else None, + open_order='{} ({})'.format(order['remaining'], order['type']) if order else None, ) TelegramHandler.send_msg(message, bot=bot) @staticmethod + @authorized_only def _profit(bot, update): """ - Handler for /profit + Handler for /profit. + Returns a cumulative profit statistics. :param bot: telegram bot :param update: message update :return: None """ - if not TelegramHandler._is_correct_scope(update): - return trades = Trade.query.filter(Trade.is_open.is_(False)).all() trade_count = len(trades) profit_amount = sum((t.close_profit / 100) * t.btc_amount for t in trades) @@ -147,9 +106,7 @@ class TelegramHandler(object): avg_stake_amount = sum(t.btc_amount for t in trades) / float(trade_count) durations_hours = [(t.close_date - t.open_date).total_seconds() / 3600.0 for t in trades] avg_duration = sum(durations_hours) / float(len(durations_hours)) - markdown_msg = """ -*Total Balance:* `{total_amount} BTC` *Total Profit:* `{profit_btc} BTC ({profit}%)` *Trade Count:* `{trade_count}` *First Action:* `{first_trade_date}` @@ -157,7 +114,6 @@ class TelegramHandler(object): *Avg. Stake Amount:* `{avg_open_amount} BTC` *Avg. Duration:* `{avg_duration}` """.format( - total_amount=round(api_wrapper.get_balance('BTC'), 8), profit_btc=round(profit_amount, 8), profit=round(profit, 2), trade_count=trade_count, @@ -169,34 +125,114 @@ class TelegramHandler(object): TelegramHandler.send_msg(markdown_msg, bot=bot) @staticmethod + @authorized_only def _start(bot, update): """ - Handler for /start + Handler for /start. + Starts TradeThread :param bot: telegram bot :param update: message update :return: None """ - if not TelegramHandler._is_correct_scope(update): - return - from main import TradeThread - if TradeThread.get_instance().is_alive(): + from main import get_instance + if get_instance().is_alive(): TelegramHandler.send_msg('*Status:* `already running`', bot=bot) - return else: - TradeThread.get_instance(recreate=True).start() + get_instance(recreate=True).start() @staticmethod + @authorized_only def _stop(bot, update): """ - Handler for /stop + Handler for /stop. + Stops TradeThread :param bot: telegram bot :param update: message update :return: None """ - if not TelegramHandler._is_correct_scope(update): - return - from main import TradeThread - if TradeThread.get_instance().is_alive(): - TradeThread.stop() + from main import get_instance, stop_instance + if get_instance().is_alive(): + TelegramHandler.send_msg('`Stopping trader ...`', bot=bot) + stop_instance() else: TelegramHandler.send_msg('*Status:* `already stopped`', bot=bot) + + @staticmethod + @authorized_only + def _cancel(bot, update): + """ + Handler for /cancel. + Cancels the open order for the current Trade. + :param bot: telegram bot + :param update: message update + :return: None + """ + trade = Trade.query.filter(Trade.is_open.is_(True)).first() + if not trade: + TelegramHandler.send_msg('`There is no open trade`') + return + + order_id = trade.open_order_id + if not order_id: + TelegramHandler.send_msg('`There is no open order`') + return + + api_wrapper.cancel_order(order_id) + trade.open_order_id = None + trade.close_rate = None + trade.close_date = None + trade.close_profit = None + Session.flush() + TelegramHandler.send_msg('*Order cancelled:* `{}`'.format(order_id), bot=bot) + logger.info('Order cancelled: (order_id: {})'.format(order_id)) + + @staticmethod + @synchronized + def get_updater(conf): + """ + Returns the current telegram updater instantiates a new one + :param conf: + :return: telegram.ext.Updater + """ + global _updater + if not _updater: + _updater = Updater(token=conf['telegram']['token'], workers=0) + return _updater + + @staticmethod + def listen(): + """ + Registers all known command handlers and starts polling for message updates + :return: None + """ + # Register command handler and start telegram message polling + handles = [ + CommandHandler('status', TelegramHandler._status), + CommandHandler('profit', TelegramHandler._profit), + CommandHandler('start', TelegramHandler._start), + CommandHandler('stop', TelegramHandler._stop), + CommandHandler('cancel', TelegramHandler._cancel), + ] + for handle in handles: + TelegramHandler.get_updater(conf).dispatcher.add_handler(handle) + TelegramHandler.get_updater(conf).start_polling(clean=True, bootstrap_retries=3) + logger.info('TelegramHandler is listening for following commands: {}' + .format([h.command for h in handles])) + + @staticmethod + def send_msg(msg, bot=None): + """ + Send given markdown message + :param msg: message + :param bot: alternative bot + :return: None + """ + if conf['telegram'].get('enabled', False): + bot = bot or TelegramHandler.get_updater(conf).bot + try: + bot.send_message(conf['telegram']['chat_id'], msg, parse_mode=ParseMode.MARKDOWN) + except NetworkError as e: + logger.warning('Got Telegram NetworkError: {}! trying one more time'.format(e.message)) + bot.send_message(conf['telegram']['chat_id'], msg, parse_mode=ParseMode.MARKDOWN) + except Exception: + logger.exception('Exception occurred within Telegram API') diff --git a/utils.py b/utils.py index c229c22b8..684d16aea 100644 --- a/utils.py +++ b/utils.py @@ -1,22 +1,25 @@ import json import logging +from wrapt import synchronized + logger = logging.getLogger(__name__) -_CUR_CONF = None +_cur_conf = None -def get_conf(): +@synchronized +def get_conf(filename='config.json'): """ Loads the config into memory and returns the instance of it :return: dict """ - global _CUR_CONF - if not _CUR_CONF: - with open('config.json') as fp: - _CUR_CONF = json.load(fp) - validate_conf(_CUR_CONF) - return _CUR_CONF + global _cur_conf + if not _cur_conf: + with open(filename) as fp: + _cur_conf = json.load(fp) + validate_conf(_cur_conf) + return _cur_conf def validate_conf(conf):