From 13b533820f028fe357e4ff89cdb19f8338b5c900 Mon Sep 17 00:00:00 2001 From: Gert Wohlgemuth Date: Sat, 2 Jun 2018 14:44:09 -0700 Subject: [PATCH] optimizing trade db handling to lower monthly cost --- freqtrade/aws/backtesting_lambda.py | 97 ++++++++++++++-------------- freqtrade/aws/trade.py | 37 +++++++++++ freqtrade/tests/aws/test_backtest.py | 47 +++----------- freqtrade/tests/aws/test_trade.py | 40 ++++++++++++ freqtrade/tests/conftest.py | 3 +- serverless.yml | 27 +++++++- 6 files changed, 161 insertions(+), 90 deletions(-) create mode 100644 freqtrade/aws/trade.py create mode 100644 freqtrade/tests/aws/test_trade.py diff --git a/freqtrade/aws/backtesting_lambda.py b/freqtrade/aws/backtesting_lambda.py index 0537613c6..f6577232c 100644 --- a/freqtrade/aws/backtesting_lambda.py +++ b/freqtrade/aws/backtesting_lambda.py @@ -7,8 +7,9 @@ import boto3 import simplejson as json from boto3.dynamodb.conditions import Key -from freqtrade.aws.tables import get_trade_table, get_strategy_table +from freqtrade.aws.tables import get_strategy_table from freqtrade.optimize.backtesting import Backtesting +from requests import post db = boto3.resource('dynamodb') @@ -25,7 +26,7 @@ def backtest(event, context): { 'strategy' : 'url handle where we can find the strategy' 'stake_currency' : 'our desired stake currency' - 'asset' : '[] asset we are interested in. If empy, we fill use a default list + 'asset' : '[] asset we are interested in. 'username' : user who's strategy should be evaluated 'name' : name of the strategy we want to evaluate 'exchange' : name of the exchange we should be using @@ -48,7 +49,6 @@ def backtest(event, context): name = event['body']['name'] user = event['body']['user'] - trade_table = get_trade_table() table = get_strategy_table() response = table.query( @@ -58,7 +58,7 @@ def backtest(event, context): ) till = datetime.datetime.today() - fromDate = till - datetime.timedelta(days=7) + fromDate = till - datetime.timedelta(days=1) if 'from' in event['body']: fromDate = datetime.datetime.strptime(event['body']['from'], '%Y%m%d') @@ -71,15 +71,14 @@ def backtest(event, context): print("backtesting from {} till {} for {} with {} vs {}".format(fromDate, till, name, event['body'][ 'stake_currency'], - event['body']['asset'])) + event['body']['assets'])) configuration = _generate_configuration(event, fromDate, name, response, till) backtesting = Backtesting(configuration) result = backtesting.start() - result_data = [] for index, row in result.iterrows(): data = { - "id": "{}.{}:{}".format(user, name, row['currency'].upper()), + "id": "{}.{}:{}:test".format(user, name, row['currency'].upper()), "trade": "{} to {}".format(row['entry'].strftime('%Y-%m-%d %H:%M:%S'), row['exit'].strftime('%Y-%m-%d %H:%M:%S')), "pair": row['currency'], @@ -90,18 +89,12 @@ def backtest(event, context): "exit_date": row['exit'].strftime('%Y-%m-%d %H:%M:%S') } - data = json.dumps(data, use_decimal=True) - data = json.loads(data, use_decimal=True) - - # persist data - trade_table.put_item(Item=data) - result_data.append(data) + _submit_result_to_backend(data) # fire request message to aggregate this strategy now return { - "statusCode": 200, - "body": json.dumps(result_data) + "statusCode": 200 } else: return { @@ -120,6 +113,19 @@ def backtest(event, context): raise Exception("not a valid event: {}".format(event)) +def _submit_result_to_backend(data): + """ + submits the given result to the backend system for further processing and analysis + :param data: + :return: + """ + print(data) + try: + print(post("{}/trade".format(os.environ['BASE_URL']), data=data)) + except Exception as e: + print("submission ignored: {}".format(e)) + + def _generate_configuration(event, fromDate, name, response, till): """ generates the configuration for us on the fly @@ -147,11 +153,9 @@ def _generate_configuration(event, fromDate, name, response, till): "enabled": True, "key": "key", "secret": "secret", - "pair_whitelist": [ - "{}/{}".format(event['body']['asset'].upper(), - event['body']['stake_currency']).upper(), - - ] + "pair_whitelist": list( + map(lambda x: "{}/{}".format(x, response['Items'][0]['stake_currency']).upper(), + response['Items'][0]['assets'])) }, "telegram": { "enabled": False, @@ -207,37 +211,34 @@ def cron(event, context): for i in response['Items']: # fire a message to our queue - for x in i['assets']: - # test each asset by it self + message = { + "user": i['user'], + "name": i['name'], + "asset": i['assets'], + "stake_currency": i['stake_currency'] + } - message = { - "user": i['user'], - "name": i['name'], - "asset": x, - "stake_currency": i['stake_currency'] - } + # triggered over html, let's provide + # a date range for the backtesting + if 'pathParameters' in event: + if 'from' in event['pathParameters']: + message['from'] = event['pathParameters']['from'] + else: + message['from'] = datetime.datetime.today().strftime('%Y%m%d') + if 'till' in event['pathParameters']: + message['till'] = event['pathParameters']['till'] + else: + message['till'] = (datetime.datetime.today() - datetime.timedelta(days=1)).strftime('%Y%m%d') - # triggered over html, let's provide - # a date range for the backtesting - if 'pathParameters' in event: - if 'from' in event['pathParameters']: - message['from'] = event['pathParameters']['from'] - else: - message['from'] = datetime.datetime.today().strftime('%Y%m%d') - if 'till' in event['pathParameters']: - message['till'] = event['pathParameters']['till'] - else: - message['till'] = (datetime.datetime.today() - datetime.timedelta(days=1)).strftime('%Y%m%d') + serialized = json.dumps(message, use_decimal=True) + # submit item to queue for routing to the correct persistence - serialized = json.dumps(message, use_decimal=True) - # submit item to queue for routing to the correct persistence - - result = client.publish( - TopicArn=topic_arn, - Message=json.dumps({'default': serialized}), - Subject="schedule backtesting", - MessageStructure='json' - ) + result = client.publish( + TopicArn=topic_arn, + Message=json.dumps({'default': serialized}), + Subject="schedule", + MessageStructure='json' + ) if 'LastEvaluatedKey' in response: return table.scan( diff --git a/freqtrade/aws/trade.py b/freqtrade/aws/trade.py new file mode 100644 index 000000000..cb2581a9c --- /dev/null +++ b/freqtrade/aws/trade.py @@ -0,0 +1,37 @@ +import boto3 +import simplejson as json +import os +from freqtrade.aws.tables import get_trade_table + + +def store(event, context): + """ + stores the received data in the internal database + :param data: + :return: + """ + if 'Records' in event: + for x in event['Records']: + if 'Sns' in x and 'Message' in x['Sns']: + data = json.loads(x['Sns']['Message'], use_decimal=True) + get_trade_table().put_item(Item=data) + + +def submit(event, context): + """ + submits a new trade to be registered in the internal queue system + :param event: + :param context: + :return: + """ + + data = json.loads(event['body']) + client = boto3.client('sns') + topic_arn = client.create_topic(Name=os.environ['tradeTopic'])['TopicArn'] + + result = client.publish( + TopicArn=topic_arn, + Message=json.dumps({'default': data}), + Subject="persist data", + MessageStructure='json' + ) diff --git a/freqtrade/tests/aws/test_backtest.py b/freqtrade/tests/aws/test_backtest.py index 2a643caa2..f192fd4da 100644 --- a/freqtrade/tests/aws/test_backtest.py +++ b/freqtrade/tests/aws/test_backtest.py @@ -4,6 +4,8 @@ from base64 import urlsafe_b64encode import boto3 import pytest import simplejson as json +from mock import Mock + from freqtrade.aws.backtesting_lambda import backtest, cron from freqtrade.aws.strategy import submit, get_trades @@ -73,11 +75,11 @@ class MyFancyTestStrategy(IStrategy): "from": "20180401", "till": "20180501", "stake_currency": "usdt", - "asset": "ltc" + "assets": ["ltc"] } - data = json.loads(backtest({ + assert backtest({ "Records": [ { "Sns": { @@ -85,29 +87,14 @@ class MyFancyTestStrategy(IStrategy): "Message": json.dumps(request) } }] - }, {})['body']) - - # evaluate that we now have trades in the database - # sadly not always a given at this tage - # due to the dynamic nature. Should pick a strategy for testing - # which generates a lot of trades - if len(data) > 0: - data = get_trades({ - 'pathParameters': { - 'user': "GCU4LW2XXZW3A3FM2XZJTEJHNWHTWDKY2DIJLCZJ5ULVZ4K7LZ7D23TG", - "name": "MyFancyTestStrategy", - 'stake': "USDT", - 'asset': "{}".format(data[0]['pair'].split("/")[0]) - } - }, {})['body'] - print(data) - assert len(json.loads(data)) > 0 + }, {})['statusCode'] == 200 def test_backtest(lambda_context): content = """# --- Do not remove these libs --- from freqtrade.strategy.interface import IStrategy from typing import Dict, List + from hyperopt import hp from functools import reduce from pandas import DataFrame @@ -167,10 +154,10 @@ class MyFancyTestStrategy(IStrategy): "user": "GCU4LW2XXZW3A3FM2XZJTEJHNWHTWDKY2DIJLCZJ5ULVZ4K7LZ7D23TG", "name": "MyFancyTestStrategy", "stake_currency": "usdt", - "asset": "ltc" + "assets": ["ltc"] } - data = json.loads(backtest({ + assert backtest({ "Records": [ { "Sns": { @@ -178,23 +165,7 @@ class MyFancyTestStrategy(IStrategy): "Message": json.dumps(request) } }] - }, {})['body']) - - # evaluate that we now have trades in the database - # sadly not always a given at this tage - # due to the dynamic nature. Should pick a strategy for testing - # which generates a lot of trades - if len(data) > 0: - data = get_trades({ - 'pathParameters': { - 'user': "GCU4LW2XXZW3A3FM2XZJTEJHNWHTWDKY2DIJLCZJ5ULVZ4K7LZ7D23TG", - "name": "MyFancyTestStrategy", - "stake": "usdt", - "asset": "ltc" - } - }, {})['body'] - print(data) - assert len(json.loads(data)) > 0 + }, {})['statusCode'] == 200 def test_cron(lambda_context): diff --git a/freqtrade/tests/aws/test_trade.py b/freqtrade/tests/aws/test_trade.py new file mode 100644 index 000000000..52fbbd202 --- /dev/null +++ b/freqtrade/tests/aws/test_trade.py @@ -0,0 +1,40 @@ +from freqtrade.aws.trade import store +from freqtrade.aws.tables import get_trade_table +import simplejson as json +from boto3.dynamodb.conditions import Key, Attr + + +def test_store(lambda_context): + store({ + "Records": [ + { + "Sns": { + "Subject": "trade", + "Message": json.dumps( + { + 'id': 'GCU4LW2XXZW3A3FM2XZJTEJHNWHTWDKY2DIJLCZJ5ULVZ4K7LZ7D23TG.MyFancyTestStrategy:BTC/USDT:test', + 'trade': '2018-05-05 14:15:00 to 2018-05-18 00:40:00', + 'pair': 'BTC/USDT', + 'duration': 625, + 'profit_percent': -0.20453928, + 'profit_stake': -0.20514198, + 'entry_date': '2018-05-05 14:15:00', + 'exit_date': '2018-05-18 00:40:00' + } + ) + } + }] + } + , {}) + + # trade table should not have 1 item in it, with our given key + + table = get_trade_table() + response = table.query( + KeyConditionExpression=Key('id') + .eq('GCU4LW2XXZW3A3FM2XZJTEJHNWHTWDKY2DIJLCZJ5ULVZ4K7LZ7D23TG.MyFancyTestStrategy:BTC/USDT:test') + ) + + print(response) + assert 'Items' in response + assert len(response['Items']) == 1 diff --git a/freqtrade/tests/conftest.py b/freqtrade/tests/conftest.py index 5e8ac29c6..f0f5c85ae 100644 --- a/freqtrade/tests/conftest.py +++ b/freqtrade/tests/conftest.py @@ -631,6 +631,7 @@ def lambda_context(): os.environ["strategyTable"] = "StrategyTable" os.environ["tradeTable"] = "TradeTable" os.environ["topic"] = "UnitTestTopic" + os.environ["BASE_URL"] = "http://127.0.0.1/test" client = session.client('sns') client.create_topic(Name=os.environ["topic"]) @@ -643,7 +644,7 @@ def lambda_context(): responses.add_passthru('https://api.github.com') responses.add_passthru('https://bittrex.com') responses.add_passthru('https://api.binance.com') - # here we will define required tables later + yield sns.stop() dynamo.stop() diff --git a/serverless.yml b/serverless.yml index c61179f26..0aa55c835 100644 --- a/serverless.yml +++ b/serverless.yml @@ -199,7 +199,31 @@ functions: environment: topic: ${self:custom.snsTopic} strategyTable: ${self:custom.strategyTable} + BASE_URL: ${self:custom.customDomain.domainName}/${self:custom.customDomain.stage} + # submits a new trade to the system + trade: + memorySize: 128 + handler: freqtrade/aws/trade.submit + events: + - http: + path: trade + method: post + cors: true + + environment: + tradeTopic: ${self:custom.snsTradeTopic} + + # stores the received message in the trade table + trade-store: + memorySize: 128 + handler: freqtrade/aws/trade.store + + events: + - sns: ${self:custom.snsTradeTopic} + + environment: + tradeTable: ${self:custom.tradeTable} #submits a new strategy to the system submit_github: @@ -232,9 +256,6 @@ functions: tradeTable: ${self:custom.tradeTable} strategyTable: ${self:custom.strategyTable} - # not more than 2 runners at any given time - reservedConcurrency: 2 - # schedules all registered strategies on a daily base schedule: memorySize: 128