Merge pull request #4682 from freqtrade/sqlalchemy_14

SQLAlchemy 1.4 preparations
Commit 377352fced, authored by Matthias on 2021-05-27 14:19:25 +01:00, committed by GitHub.
6 changed files with 113 additions and 94 deletions

View File

@@ -187,7 +187,7 @@ class FreqtradeBot(LoggingMixin):
         if self.get_free_open_trades():
             self.enter_positions()

-        Trade.query.session.flush()
+        Trade.commit()

     def process_stopped(self) -> None:
         """
@@ -620,7 +620,7 @@ class FreqtradeBot(LoggingMixin):
             self.update_trade_state(trade, order_id, order)

         Trade.query.session.add(trade)
-        Trade.query.session.flush()
+        Trade.commit()

         # Updating wallets
         self.wallets.update()
@@ -706,6 +706,7 @@ class FreqtradeBot(LoggingMixin):
                 if (self.strategy.order_types.get('stoploss_on_exchange') and
                         self.handle_stoploss_on_exchange(trade)):
                     trades_closed += 1
+                    Trade.commit()
                     continue
                 # Check if we can sell our current pair
                 if trade.open_order_id is None and trade.is_open and self.handle_trade(trade):
@@ -1036,6 +1037,7 @@ class FreqtradeBot(LoggingMixin):
             elif order['side'] == 'sell':
                 self.handle_cancel_sell(trade, order, constants.CANCEL_REASON['ALL_CANCELLED'])
+        Trade.commit()

     def handle_cancel_buy(self, trade: Trade, order: Dict, reason: str) -> bool:
         """
@@ -1233,7 +1235,7 @@ class FreqtradeBot(LoggingMixin):
         # In case of market sell orders the order can be closed immediately
         if order.get('status', 'unknown') == 'closed':
            self.update_trade_state(trade, trade.open_order_id, order)
-        Trade.query.session.flush()
+        Trade.commit()

        # Lock pair for one candle to prevent immediate re-buys
        self.strategy.lock_pair(trade.pair, datetime.now(timezone.utc),
@@ -1374,6 +1376,7 @@ class FreqtradeBot(LoggingMixin):
            # Handling of this will happen in check_handle_timeout.
            return True
        trade.update(order)
+        Trade.commit()

        # Updating wallets when order is closed
        if not trade.is_open:

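The flush() calls in this file become Trade.commit(): with autocommit removed from the sessionmaker (see the init_db() changes below), flush() only emits the pending SQL inside the session's still-open transaction, so the bot now has to commit explicitly at the end of each processing step. A standalone sketch of that distinction, using plain SQLAlchemy rather than freqtrade's own models:

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Trade(Base):
    # Illustrative stand-in for the real Trade model.
    __tablename__ = 'trades'
    id = Column(Integer, primary_key=True)
    pair = Column(String)

engine = create_engine('sqlite://', future=True)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine, autoflush=True)

session = Session()
session.add(Trade(pair='BTC/USDT'))
session.flush()    # SQL is sent, but only inside the session's open transaction
session.commit()   # the transaction ends here and the row is actually persisted
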
View File

@@ -1,7 +1,7 @@
 import logging
 from typing import List

-from sqlalchemy import inspect
+from sqlalchemy import inspect, text


 logger = logging.getLogger(__name__)
@@ -62,15 +62,17 @@ def migrate_trades_table(decl_base, inspector, engine, table_back_name: str, col
     amount_requested = get_column_def(cols, 'amount_requested', 'amount')

     # Schema migration necessary
-    engine.execute(f"alter table trades rename to {table_back_name}")
-    # drop indexes on backup table
-    for index in inspector.get_indexes(table_back_name):
-        engine.execute(f"drop index {index['name']}")
+    with engine.begin() as connection:
+        connection.execute(text(f"alter table trades rename to {table_back_name}"))
+        # drop indexes on backup table
+        for index in inspector.get_indexes(table_back_name):
+            connection.execute(text(f"drop index {index['name']}"))

     # let SQLAlchemy create the schema as required
     decl_base.metadata.create_all(engine)

     # Copy data back - following the correct schema
-    engine.execute(f"""insert into trades
+    with engine.begin() as connection:
+        connection.execute(text(f"""insert into trades
         (id, exchange, pair, is_open,
         fee_open, fee_open_cost, fee_open_currency,
         fee_close, fee_close_cost, fee_open_currency, open_rate,
@@ -104,11 +106,12 @@ def migrate_trades_table(decl_base, inspector, engine, table_back_name: str, col
         {strategy} strategy, {timeframe} timeframe,
         {open_trade_value} open_trade_value, {close_profit_abs} close_profit_abs
         from {table_back_name}
-        """)
+        """))


 def migrate_open_orders_to_trades(engine):
-    engine.execute("""
+    with engine.begin() as connection:
+        connection.execute(text("""
         insert into orders (ft_trade_id, ft_pair, order_id, ft_order_side, ft_is_open)
         select id ft_trade_id, pair ft_pair, open_order_id,
             case when close_rate_requested is null then 'buy'
@@ -120,28 +123,30 @@ def migrate_open_orders_to_trades(engine):
             'stoploss' ft_order_side, 1 ft_is_open
         from trades
         where stoploss_order_id is not null
-        """)
+        """))


 def migrate_orders_table(decl_base, inspector, engine, table_back_name: str, cols: List):
     # Schema migration necessary
-    engine.execute(f"alter table orders rename to {table_back_name}")
-    # drop indexes on backup table
-    for index in inspector.get_indexes(table_back_name):
-        engine.execute(f"drop index {index['name']}")
+    with engine.begin() as connection:
+        connection.execute(text(f"alter table orders rename to {table_back_name}"))
+        # drop indexes on backup table
+        for index in inspector.get_indexes(table_back_name):
+            connection.execute(text(f"drop index {index['name']}"))

     # let SQLAlchemy create the schema as required
     decl_base.metadata.create_all(engine)
-    engine.execute(f"""
-        insert into orders ( id, ft_trade_id, ft_order_side, ft_pair, ft_is_open, order_id, status,
-        symbol, order_type, side, price, amount, filled, average, remaining, cost, order_date,
-        order_filled_date, order_update_date)
-        select id, ft_trade_id, ft_order_side, ft_pair, ft_is_open, order_id, status,
-        symbol, order_type, side, price, amount, filled, null average, remaining, cost, order_date,
-        order_filled_date, order_update_date
+    with engine.begin() as connection:
+        connection.execute(text(f"""
+        insert into orders ( id, ft_trade_id, ft_order_side, ft_pair, ft_is_open, order_id,
+        status, symbol, order_type, side, price, amount, filled, average, remaining, cost,
+        order_date, order_filled_date, order_update_date)
+        select id, ft_trade_id, ft_order_side, ft_pair, ft_is_open, order_id,
+        status, symbol, order_type, side, price, amount, filled, null average, remaining, cost,
+        order_date, order_filled_date, order_update_date
         from {table_back_name}
-        """)
+        """))


 def check_migrate(engine, decl_base, previous_tables) -> None:

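The migration helpers move off the legacy engine.execute(str) API (deprecated in SQLAlchemy 1.4, removed in 2.0) onto explicit connections with text() constructs. A minimal sketch of the pattern, using an illustrative table rather than freqtrade's trades table:

from sqlalchemy import create_engine, text

engine = create_engine('sqlite://', future=True)

# engine.begin() opens a connection plus a transaction and commits on
# successful exit (or rolls back on error), so no explicit commit is needed.
with engine.begin() as connection:
    connection.execute(text("create table example (id integer primary key, name varchar)"))
    connection.execute(text("insert into example (id, name) values (1, 'demo')"))
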
View File

@@ -9,10 +9,7 @@ from typing import Any, Dict, List, Optional
 from sqlalchemy import (Boolean, Column, DateTime, Float, ForeignKey, Integer, String,
                         create_engine, desc, func, inspect)
 from sqlalchemy.exc import NoSuchModuleError
-from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy.orm import Query, relationship
-from sqlalchemy.orm.scoping import scoped_session
-from sqlalchemy.orm.session import sessionmaker
+from sqlalchemy.orm import Query, declarative_base, relationship, scoped_session, sessionmaker
 from sqlalchemy.pool import StaticPool
 from sqlalchemy.sql.schema import UniqueConstraint
@@ -41,16 +38,18 @@ def init_db(db_url: str, clean_open_orders: bool = False) -> None:
     """
     kwargs = {}

-    # Take care of thread ownership if in-memory db
     if db_url == 'sqlite://':
         kwargs.update({
-            'connect_args': {'check_same_thread': False},
             'poolclass': StaticPool,
-            'echo': False,
+        })
+    # Take care of thread ownership
+    if db_url.startswith('sqlite://'):
+        kwargs.update({
+            'connect_args': {'check_same_thread': False},
         })

     try:
-        engine = create_engine(db_url, **kwargs)
+        engine = create_engine(db_url, future=True, **kwargs)
     except NoSuchModuleError:
         raise OperationalException(f"Given value for db_url: '{db_url}' "
                                    f"is no valid database URL! (See {_SQL_DOCS_URL})")
@@ -58,7 +57,7 @@ def init_db(db_url: str, clean_open_orders: bool = False) -> None:
     # https://docs.sqlalchemy.org/en/13/orm/contextual.html#thread-local-scope
     # Scoped sessions proxy requests to the appropriate thread-local session.
     # We should use the scoped_session object - not a seperately initialized version
-    Trade._session = scoped_session(sessionmaker(bind=engine, autoflush=True, autocommit=True))
+    Trade._session = scoped_session(sessionmaker(bind=engine, autoflush=True))
     Trade.query = Trade._session.query_property()
     Order.query = Trade._session.query_property()
     PairLock.query = Trade._session.query_property()
@@ -77,7 +76,7 @@ def cleanup_db() -> None:
     Flushes all pending operations to disk.
     :return: None
     """
-    Trade.query.session.flush()
+    Trade.commit()


 def clean_dry_run_db() -> None:
@@ -89,6 +88,7 @@ def clean_dry_run_db() -> None:
         # Check we are updating only a dry_run order not a prod one
         if 'dry_run' in trade.open_order_id:
             trade.open_order_id = None
+    Trade.commit()


 class Order(_DECL_BASE):
@@ -177,6 +177,7 @@ class Order(_DECL_BASE):
         if filtered_orders:
             oobj = filtered_orders[0]
             oobj.update_from_ccxt_object(order)
+            Order.query.session.commit()
         else:
             logger.warning(f"Did not find order for {order}.")
@@ -712,7 +713,11 @@ class Trade(_DECL_BASE, LocalTrade):
             Order.query.session.delete(order)

         Trade.query.session.delete(self)
-        Trade.query.session.flush()
+        Trade.commit()
+
+    @staticmethod
+    def commit():
+        Trade.query.session.commit()

     @staticmethod
     def get_trades_proxy(*, pair: str = None, is_open: bool = None,

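A condensed sketch of what init_db() wires up after this change and how the new Trade.commit() helper fits in. The model definition and the connect string are illustrative stand-ins; the real function also handles connect_args, StaticPool and URL error handling as shown in the hunks above:

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, scoped_session, sessionmaker

_DECL_BASE = declarative_base()

class Trade(_DECL_BASE):
    __tablename__ = 'trades'
    id = Column(Integer, primary_key=True)
    pair = Column(String)

    @staticmethod
    def commit():
        # The new helper: one place that commits the shared session.
        Trade.query.session.commit()

# future=True opts into 1.4's 2.0-style engine behaviour; autocommit=True is
# gone from the sessionmaker, so commits are now always explicit.
engine = create_engine('sqlite://', future=True)
_DECL_BASE.metadata.create_all(engine)
Trade._session = scoped_session(sessionmaker(bind=engine, autoflush=True))
Trade.query = Trade._session.query_property()

Trade.query.session.add(Trade(pair='ETH/BTC'))
Trade.commit()
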
View File

@@ -49,7 +49,7 @@ class PairLocks():
         )
         if PairLocks.use_db:
             PairLock.query.session.add(lock)
-            PairLock.query.session.flush()
+            PairLock.query.session.commit()
         else:
             PairLocks.locks.append(lock)
@@ -99,7 +99,7 @@ class PairLocks():
         for lock in locks:
             lock.active = False
         if PairLocks.use_db:
-            PairLock.query.session.flush()
+            PairLock.query.session.commit()

     @staticmethod
     def is_global_lock(now: Optional[datetime] = None) -> bool:

View File

@@ -569,7 +569,7 @@ class RPC:
             # Execute sell for all open orders
             for trade in Trade.get_open_trades():
                 _exec_forcesell(trade)
-            Trade.query.session.flush()
+            Trade.commit()
             self._freqtrade.wallets.update()
             return {'result': 'Created sell orders for all open trades.'}
@@ -582,7 +582,7 @@ class RPC:
                 raise RPCException('invalid argument')

             _exec_forcesell(trade)
-            Trade.query.session.flush()
+            Trade.commit()
             self._freqtrade.wallets.update()
             return {'result': f'Created sell order for trade {trade_id}.'}
@@ -615,6 +615,7 @@ class RPC:
         # execute buy
         if self._freqtrade.execute_buy(pair, stakeamount, price, forcebuy=True):
+            Trade.commit()
             trade = Trade.get_trades([Trade.is_open.is_(True), Trade.pair == pair]).first()
             return trade
         else:
@@ -705,8 +706,7 @@ class RPC:
             lock.active = False
             lock.lock_end_time = datetime.now(timezone.utc)

-        # session is always the same
-        PairLock.query.session.flush()
+        PairLock.query.session.commit()

         return self._rpc_locks()

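The dropped "# session is always the same" comment refers to the fact that every model's query_property() is backed by the single scoped_session created in init_db(), so PairLock.query.session.commit() and Trade.commit() commit the same session. A small self-contained illustration of that wiring, with placeholder models rather than freqtrade's:

from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import declarative_base, scoped_session, sessionmaker

Base = declarative_base()

class Trade(Base):
    __tablename__ = 'trades'
    id = Column(Integer, primary_key=True)

class PairLock(Base):
    __tablename__ = 'pairlocks'
    id = Column(Integer, primary_key=True)

engine = create_engine('sqlite://', future=True)
Base.metadata.create_all(engine)
session = scoped_session(sessionmaker(bind=engine, autoflush=True))
Trade.query = session.query_property()
PairLock.query = session.query_property()

# Both query properties hand back the same thread-local Session instance.
assert Trade.query.session is PairLock.query.session
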
View File

@@ -7,7 +7,7 @@ from unittest.mock import MagicMock

 import arrow
 import pytest
-from sqlalchemy import create_engine, inspect
+from sqlalchemy import create_engine, inspect, text

 from freqtrade import constants
 from freqtrade.exceptions import DependencyException, OperationalException
@@ -486,9 +486,10 @@ def test_migrate_old(mocker, default_conf, fee):
     mocker.patch('freqtrade.persistence.models.create_engine', lambda *args, **kwargs: engine)

     # Create table using the old format
-    engine.execute(create_table_old)
-    engine.execute(insert_table_old)
-    engine.execute(insert_table_old2)
+    with engine.begin() as connection:
+        connection.execute(text(create_table_old))
+        connection.execute(text(insert_table_old))
+        connection.execute(text(insert_table_old2))

     # Run init to test migration
     init_db(default_conf['db_url'], default_conf['dry_run'])
@@ -579,15 +580,16 @@ def test_migrate_new(mocker, default_conf, fee, caplog):
     mocker.patch('freqtrade.persistence.models.create_engine', lambda *args, **kwargs: engine)

     # Create table using the old format
-    engine.execute(create_table_old)
-    engine.execute("create index ix_trades_is_open on trades(is_open)")
-    engine.execute("create index ix_trades_pair on trades(pair)")
-    engine.execute(insert_table_old)
+    with engine.begin() as connection:
+        connection.execute(text(create_table_old))
+        connection.execute(text("create index ix_trades_is_open on trades(is_open)"))
+        connection.execute(text("create index ix_trades_pair on trades(pair)"))
+        connection.execute(text(insert_table_old))

-    # fake previous backup
-    engine.execute("create table trades_bak as select * from trades")
-    engine.execute("create table trades_bak1 as select * from trades")
+        # fake previous backup
+        connection.execute(text("create table trades_bak as select * from trades"))
+        connection.execute(text("create table trades_bak1 as select * from trades"))

     # Run init to test migration
     init_db(default_conf['db_url'], default_conf['dry_run'])
@@ -629,47 +631,49 @@ def test_migrate_new(mocker, default_conf, fee, caplog):
     caplog.clear()

     # Drop latest column
-    engine.execute("alter table orders rename to orders_bak")
+    with engine.begin() as connection:
+        connection.execute(text("alter table orders rename to orders_bak"))
     inspector = inspect(engine)
-    for index in inspector.get_indexes('orders_bak'):
-        engine.execute(f"drop index {index['name']}")
-    # Recreate table
-    engine.execute("""
-        CREATE TABLE orders (
-            id INTEGER NOT NULL,
-            ft_trade_id INTEGER,
-            ft_order_side VARCHAR NOT NULL,
-            ft_pair VARCHAR NOT NULL,
-            ft_is_open BOOLEAN NOT NULL,
-            order_id VARCHAR NOT NULL,
-            status VARCHAR,
-            symbol VARCHAR,
-            order_type VARCHAR,
-            side VARCHAR,
-            price FLOAT,
-            amount FLOAT,
-            filled FLOAT,
-            remaining FLOAT,
-            cost FLOAT,
-            order_date DATETIME,
-            order_filled_date DATETIME,
-            order_update_date DATETIME,
-            PRIMARY KEY (id),
-            CONSTRAINT _order_pair_order_id UNIQUE (ft_pair, order_id),
-            FOREIGN KEY(ft_trade_id) REFERENCES trades (id)
-        )
-        """)
+    with engine.begin() as connection:
+        for index in inspector.get_indexes('orders_bak'):
+            connection.execute(text(f"drop index {index['name']}"))
+        # Recreate table
+        connection.execute(text("""
+            CREATE TABLE orders (
+                id INTEGER NOT NULL,
+                ft_trade_id INTEGER,
+                ft_order_side VARCHAR NOT NULL,
+                ft_pair VARCHAR NOT NULL,
+                ft_is_open BOOLEAN NOT NULL,
+                order_id VARCHAR NOT NULL,
+                status VARCHAR,
+                symbol VARCHAR,
+                order_type VARCHAR,
+                side VARCHAR,
+                price FLOAT,
+                amount FLOAT,
+                filled FLOAT,
+                remaining FLOAT,
+                cost FLOAT,
+                order_date DATETIME,
+                order_filled_date DATETIME,
+                order_update_date DATETIME,
+                PRIMARY KEY (id),
+                CONSTRAINT _order_pair_order_id UNIQUE (ft_pair, order_id),
+                FOREIGN KEY(ft_trade_id) REFERENCES trades (id)
+            )
+            """))

-    engine.execute("""
+        connection.execute(text("""
         insert into orders ( id, ft_trade_id, ft_order_side, ft_pair, ft_is_open, order_id, status,
         symbol, order_type, side, price, amount, filled, remaining, cost, order_date,
         order_filled_date, order_update_date)
         select id, ft_trade_id, ft_order_side, ft_pair, ft_is_open, order_id, status,
         symbol, order_type, side, price, amount, filled, remaining, cost, order_date,
         order_filled_date, order_update_date
         from orders_bak
-        """)
+        """))

     # Run init to test migration
     init_db(default_conf['db_url'], default_conf['dry_run'])
@@ -722,8 +726,9 @@ def test_migrate_mid_state(mocker, default_conf, fee, caplog):
     mocker.patch('freqtrade.persistence.models.create_engine', lambda *args, **kwargs: engine)

     # Create table using the old format
-    engine.execute(create_table_old)
-    engine.execute(insert_table_old)
+    with engine.begin() as connection:
+        connection.execute(text(create_table_old))
+        connection.execute(text(insert_table_old))

     # Run init to test migration
     init_db(default_conf['db_url'], default_conf['dry_run'])
@@ -1288,6 +1293,7 @@ def test_Trade_object_idem():
     excludes = (
         'delete',
         'session',
+        'commit',
         'query',
         'open_date',
         'get_best_pair',
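
The same execution pattern carries into the tests: raw SQL goes through an explicit connection and text(), and index names are discovered via the inspector before being dropped. A reduced, self-contained sketch of that inspect-and-drop step using a throwaway table:

from sqlalchemy import create_engine, inspect, text

engine = create_engine('sqlite://', future=True)
with engine.begin() as connection:
    connection.execute(text("create table orders_bak (pair varchar)"))
    connection.execute(text("create index ix_bak_pair on orders_bak (pair)"))

# inspect() reads the current schema; each index dict carries its 'name'.
inspector = inspect(engine)
with engine.begin() as connection:
    for index in inspector.get_indexes('orders_bak'):
        connection.execute(text(f"drop index {index['name']}"))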