stable/freqtrade/rpc/api_server/deps.py

52 lines
1.3 KiB
Python
Raw Normal View History

from typing import Any, Dict, Iterator, Optional
2021-01-02 11:54:40 +00:00
2022-01-22 06:11:59 +00:00
from fastapi import Depends
from freqtrade.enums import RunMode
from freqtrade.persistence import Trade
2021-01-02 12:12:49 +00:00
from freqtrade.rpc.rpc import RPC, RPCException
2021-01-02 11:54:40 +00:00
from .webserver import ApiServer
2021-01-02 12:12:49 +00:00
def get_rpc_optional() -> Optional[RPC]:
if ApiServer._has_rpc:
return ApiServer._rpc
return None
def get_rpc() -> Optional[Iterator[RPC]]:
2021-01-02 12:12:49 +00:00
_rpc = get_rpc_optional()
if _rpc:
Trade.rollback()
yield _rpc
Trade.rollback()
2021-01-02 12:12:49 +00:00
else:
raise RPCException('Bot is not in the correct state')
2021-01-02 11:54:40 +00:00
def get_config() -> Dict[str, Any]:
return ApiServer._config
2020-12-27 14:24:49 +00:00
2021-01-02 11:54:40 +00:00
def get_api_config() -> Dict[str, Any]:
2020-12-27 14:24:49 +00:00
return ApiServer._config['api_server']
2022-01-22 06:11:59 +00:00
def get_exchange(config=Depends(get_config)):
if not ApiServer._exchange:
from freqtrade.resolvers import ExchangeResolver
ApiServer._exchange = ExchangeResolver.load_exchange(
config['exchange']['name'], config, load_leverage_tiers=False)
2022-01-22 06:11:59 +00:00
return ApiServer._exchange
2022-11-15 03:27:45 +00:00
def get_message_stream():
return ApiServer._message_stream
def is_webserver_mode(config=Depends(get_config)):
if config['runmode'] != RunMode.WEBSERVER:
raise RPCException('Bot is not in the correct state')
return None