Implement /logs endpoints in telegram and restAPI

This commit is contained in:
Matthias
2020-08-14 15:44:36 +02:00
parent b989ba0f82
commit 5f79caa307
5 changed files with 70 additions and 4 deletions

View File

@@ -186,6 +186,7 @@ class ApiServer(RPC):
self.app.add_url_rule(f'{BASE_URI}/count', 'count', view_func=self._count, methods=['GET'])
self.app.add_url_rule(f'{BASE_URI}/daily', 'daily', view_func=self._daily, methods=['GET'])
self.app.add_url_rule(f'{BASE_URI}/edge', 'edge', view_func=self._edge, methods=['GET'])
self.app.add_url_rule(f'{BASE_URI}/logs', 'log', view_func=self._get_logs, methods=['GET'])
self.app.add_url_rule(f'{BASE_URI}/profit', 'profit',
view_func=self._profit, methods=['GET'])
self.app.add_url_rule(f'{BASE_URI}/performance', 'performance',
@@ -348,6 +349,18 @@ class ApiServer(RPC):
return self.rest_dump(stats)
@require_login
@rpc_catch_errors
def _get_logs(self):
"""
Returns latest logs
get:
param:
limit: Only get a certain number of records
"""
limit = int(request.args.get('limit', 0)) or None
return self.rest_dump(self._rpc_get_logs(limit))
@require_login
@rpc_catch_errors
def _edge(self):

View File

@@ -11,9 +11,9 @@ from typing import Any, Dict, List, Optional, Tuple, Union
import arrow
from numpy import NAN, mean
from freqtrade.exceptions import (ExchangeError,
PricingError)
from freqtrade.exceptions import ExchangeError, PricingError
from freqtrade.exchange import timeframe_to_minutes, timeframe_to_msecs
from freqtrade.loggers import bufferHandler
from freqtrade.misc import shorten_date
from freqtrade.persistence import Trade
from freqtrade.rpc.fiat_convert import CryptoToFiatConverter
@@ -633,6 +633,24 @@ class RPC:
}
return res
def _rpc_get_logs(self, limit: Optional[int]) -> Dict[str, List]:
"""Returns the last X logs"""
if limit:
buffer = bufferHandler.buffer[-limit:]
else:
buffer = bufferHandler.buffer
records = [[r.asctime, r.created, r.name, r.levelname, r.message] for r in buffer]
return {'log_count': len(records), 'logs': records}
def _rpc_get_logs_as_string(self, limit: Optional[int]) -> Dict[str, List]:
"""Returns the last X logs"""
if limit:
buffer = bufferHandler.buffer[-limit:]
else:
buffer = bufferHandler.buffer
return [bufferHandler.format(r) for r in buffer]
def _rpc_edge(self) -> List[Dict[str, Any]]:
""" Returns information related to Edge """
if not self._freqtrade.edge:

View File

@@ -103,6 +103,7 @@ class Telegram(RPC):
CommandHandler('stopbuy', self._stopbuy),
CommandHandler('whitelist', self._whitelist),
CommandHandler('blacklist', self._blacklist),
CommandHandler('logs', self._logs),
CommandHandler('edge', self._edge),
CommandHandler('help', self._help),
CommandHandler('version', self._version),
@@ -637,6 +638,34 @@ class Telegram(RPC):
except RPCException as e:
self._send_msg(str(e))
@authorized_only
def _logs(self, update: Update, context: CallbackContext) -> None:
"""
Handler for /logs
Shows the latest logs
"""
try:
try:
limit = int(context.args[0])
except (TypeError, ValueError, IndexError):
limit = 10
logs = self._rpc_get_logs_as_string(limit)
msg = ''
message_container = "<pre>{}</pre>"
for logrec in logs:
if len(msg + logrec) + 10 >= MAX_TELEGRAM_MESSAGE_LENGTH:
# Send message immediately if it would become too long
self._send_msg(message_container.format(msg), parse_mode=ParseMode.HTML)
msg = logrec + '\n'
else:
# Append message to messages to send
msg += logrec + '\n'
if msg:
self._send_msg(message_container.format(msg), parse_mode=ParseMode.HTML)
except RPCException as e:
self._send_msg(str(e))
@authorized_only
def _edge(self, update: Update, context: CallbackContext) -> None:
"""
@@ -682,6 +711,7 @@ class Telegram(RPC):
"*/stopbuy:* `Stops buying, but handles open trades gracefully` \n"
"*/reload_config:* `Reload configuration file` \n"
"*/show_config:* `Show running configuration` \n"
"*/logs [limit]:* `Show latest logs - defaults to 10` \n"
"*/whitelist:* `Show current whitelist` \n"
"*/blacklist [pair]:* `Show current blacklist, or adds one or more pairs "
"to the blacklist.` \n"