telegram: Adding forcebuy inline keyboard

This commit is contained in:
Gonzalo Matheu
2021-04-10 20:13:04 -03:00
parent e2f28991e6
commit 50bdae8eb2
2 changed files with 78 additions and 12 deletions

View File

@@ -12,9 +12,9 @@ from typing import Any, Callable, Dict, List, Union
import arrow
from tabulate import tabulate
from telegram import KeyboardButton, ParseMode, ReplyKeyboardMarkup, Update
from telegram import KeyboardButton, ParseMode, ReplyKeyboardMarkup, Update, BotCommand, InlineKeyboardMarkup, InlineKeyboardButton
from telegram.error import NetworkError, TelegramError
from telegram.ext import CallbackContext, CommandHandler, Updater
from telegram.ext import CallbackContext, CommandHandler, Updater, CallbackQueryHandler
from telegram.utils.helpers import escape_markdown
from freqtrade.__init__ import __version__
@@ -169,6 +169,11 @@ class Telegram(RPCHandler):
[h.command for h in handles]
)
self._current_callback_query_handler = None
self._callback_query_handlers = {
'forcebuy': CallbackQueryHandler(self._forcebuy_inline)
}
def cleanup(self) -> None:
"""
Stops all running telegram threads.
@@ -610,6 +615,24 @@ class Telegram(RPCHandler):
except RPCException as e:
self._send_msg(str(e))
def _forcebuy_action(self, pair, price = None):
try:
self._rpc._rpc_forcebuy(pair, price)
except RPCException as e:
self._send_msg(str(e))
def _forcebuy_inline(self, update: Update, _: CallbackContext) -> None:
query = update.callback_query
pair = query.data
query.answer()
query.edit_message_text(text=f"Force Buying: {pair}")
self._forcebuy_action(pair)
@staticmethod
def _layout_inline_keyboard(buttons: List[InlineKeyboardButton],
cols=3) -> List[List[InlineKeyboardButton]]:
return [buttons[i:i + cols] for i in range(0, len(buttons), cols)]
@authorized_only
def _forcebuy(self, update: Update, context: CallbackContext) -> None:
"""
@@ -622,16 +645,13 @@ class Telegram(RPCHandler):
if context.args:
pair = context.args[0]
price = float(context.args[1]) if len(context.args) > 1 else None
try:
self._rpc._rpc_forcebuy(pair, price)
except RPCException as e:
self._send_msg(str(e))
self._forcebuy_action(pair, price)
else:
whitelist = self._rpc._rpc_whitelist()['whitelist']
pairs_keyboard: List[List[Union[str, KeyboardButton]]] = [
[f'/forcebuy {pair}' for pair in whitelist]
]
self._send_msg("Which pair?", keyboard=pairs_keyboard)
pairs = [InlineKeyboardButton(pair, callback_data=pair) for pair in whitelist]
self._send_inline_msg("Which pair?",
keyboard=self._layout_inline_keyboard(pairs),
callback_query_handler='forcebuy')
@authorized_only
def _trades(self, update: Update, context: CallbackContext) -> None:
@@ -947,9 +967,27 @@ class Telegram(RPCHandler):
f"*Current state:* `{val['state']}`"
)
def _send_inline_msg(self, msg: str, callback_query_handler,
parse_mode: str = ParseMode.MARKDOWN, disable_notification: bool = False,
keyboard: List[List[InlineKeyboardButton]] = None, ) -> None:
"""
Send given markdown message
:param msg: message
:param bot: alternative bot
:param parse_mode: telegram parse mode
:return: None
"""
if self._current_callback_query_handler:
self._updater.dispatcher.remove_handler(self._current_callback_query_handler)
self._current_callback_query_handler = self._callback_query_handlers[callback_query_handler]
self._updater.dispatcher.add_handler(self._current_callback_query_handler)
self._send_msg(msg, parse_mode, disable_notification, keyboard, reply_markup=InlineKeyboardMarkup)
def _send_msg(self, msg: str, parse_mode: str = ParseMode.MARKDOWN,
disable_notification: bool = False,
keyboard: List[List[Union[str, KeyboardButton]]] = None) -> None:
keyboard: List[List[Union[str, KeyboardButton]]] = None,
reply_markup=ReplyKeyboardMarkup) -> None:
"""
Send given markdown message
:param msg: message
@@ -959,7 +997,7 @@ class Telegram(RPCHandler):
"""
if keyboard is None:
keyboard = self._keyboard
reply_markup = ReplyKeyboardMarkup(keyboard, resize_keyboard=True)
reply_markup = reply_markup(keyboard, resize_keyboard=True)
try:
try:
self._updater.bot.send_message(