This module contains the class to persist trades into SQLite
import logging
import threading
from contextvars import ContextVar
from typing import Any, Dict, Final, Optional
from sqlalchemy import create_engine, inspect
from sqlalchemy.exc import NoSuchModuleError
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.pool import StaticPool
from freqtrade.exceptions import OperationalException
from freqtrade.persistence.base import ModelBase
from freqtrade.persistence.key_value_store import _KeyValueStoreModel
from freqtrade.persistence.migrations import check_migrate
from freqtrade.persistence.pairlock import PairLock
from freqtrade.persistence.trade_model import Order, Trade
logger = logging.getLogger(__name__)
REQUEST_ID_CTX_KEY: Final[str] = 'request_id'
_request_id_ctx_var: ContextVar[Optional[str]] = ContextVar(REQUEST_ID_CTX_KEY, default=None)
def get_request_or_thread_id() -> Optional[str]:
Helper method to get either async context (for fastapi requests), or thread id
id = _request_id_ctx_var.get()
if id is None:
# when not in request context - use thread id
id = str(threading.current_thread().ident)
return id
def init_db(db_url: str) -> None:
Initializes this module with the given config,
registers all known command handlers
and starts polling for message updates
:param db_url: Database to use
:return: None
kwargs: Dict[str, Any] = {}
if db_url == 'sqlite:///':
raise OperationalException(
f'Bad db-url {db_url}. For in-memory database, please use `sqlite://`.')
if db_url == 'sqlite://':
'poolclass': StaticPool,
# Take care of thread ownership
if db_url.startswith('sqlite://'):
'connect_args': {'check_same_thread': False},
engine = create_engine(db_url, future=True, **kwargs)
except NoSuchModuleError:
raise OperationalException(f"Given value for db_url: '{db_url}' "
f"is no valid database URL! (See {_SQL_DOCS_URL})")
# Scoped sessions proxy requests to the appropriate thread-local session.
# Since we also use fastAPI, we need to make it aware of the request id, too
Trade.session = scoped_session(sessionmaker(
bind=engine, autoflush=False), scopefunc=get_request_or_thread_id)
Order.session = Trade.session
PairLock.session = Trade.session
_KeyValueStoreModel.session = Trade.session
previous_tables = inspect(engine).get_table_names()
check_migrate(engine, decl_base=ModelBase, previous_tables=previous_tables)