optimizing trade db handling to lower monthly cost

This commit is contained in:
Gert Wohlgemuth 2018-06-02 14:44:09 -07:00
parent cd462db993
commit 13b533820f
6 changed files with 161 additions and 90 deletions

View File

@ -7,8 +7,9 @@ import boto3
import simplejson as json import simplejson as json
from boto3.dynamodb.conditions import Key from boto3.dynamodb.conditions import Key
from freqtrade.aws.tables import get_trade_table, get_strategy_table from freqtrade.aws.tables import get_strategy_table
from freqtrade.optimize.backtesting import Backtesting from freqtrade.optimize.backtesting import Backtesting
from requests import post
db = boto3.resource('dynamodb') db = boto3.resource('dynamodb')
@ -25,7 +26,7 @@ def backtest(event, context):
{ {
'strategy' : 'url handle where we can find the strategy' 'strategy' : 'url handle where we can find the strategy'
'stake_currency' : 'our desired stake currency' 'stake_currency' : 'our desired stake currency'
'asset' : '[] asset we are interested in. If empy, we fill use a default list 'asset' : '[] asset we are interested in.
'username' : user who's strategy should be evaluated 'username' : user who's strategy should be evaluated
'name' : name of the strategy we want to evaluate 'name' : name of the strategy we want to evaluate
'exchange' : name of the exchange we should be using 'exchange' : name of the exchange we should be using
@ -48,7 +49,6 @@ def backtest(event, context):
name = event['body']['name'] name = event['body']['name']
user = event['body']['user'] user = event['body']['user']
trade_table = get_trade_table()
table = get_strategy_table() table = get_strategy_table()
response = table.query( response = table.query(
@ -58,7 +58,7 @@ def backtest(event, context):
) )
till = datetime.datetime.today() till = datetime.datetime.today()
fromDate = till - datetime.timedelta(days=7) fromDate = till - datetime.timedelta(days=1)
if 'from' in event['body']: if 'from' in event['body']:
fromDate = datetime.datetime.strptime(event['body']['from'], '%Y%m%d') fromDate = datetime.datetime.strptime(event['body']['from'], '%Y%m%d')
@ -71,15 +71,14 @@ def backtest(event, context):
print("backtesting from {} till {} for {} with {} vs {}".format(fromDate, till, name, print("backtesting from {} till {} for {} with {} vs {}".format(fromDate, till, name,
event['body'][ event['body'][
'stake_currency'], 'stake_currency'],
event['body']['asset'])) event['body']['assets']))
configuration = _generate_configuration(event, fromDate, name, response, till) configuration = _generate_configuration(event, fromDate, name, response, till)
backtesting = Backtesting(configuration) backtesting = Backtesting(configuration)
result = backtesting.start() result = backtesting.start()
result_data = []
for index, row in result.iterrows(): for index, row in result.iterrows():
data = { data = {
"id": "{}.{}:{}".format(user, name, row['currency'].upper()), "id": "{}.{}:{}:test".format(user, name, row['currency'].upper()),
"trade": "{} to {}".format(row['entry'].strftime('%Y-%m-%d %H:%M:%S'), "trade": "{} to {}".format(row['entry'].strftime('%Y-%m-%d %H:%M:%S'),
row['exit'].strftime('%Y-%m-%d %H:%M:%S')), row['exit'].strftime('%Y-%m-%d %H:%M:%S')),
"pair": row['currency'], "pair": row['currency'],
@ -90,18 +89,12 @@ def backtest(event, context):
"exit_date": row['exit'].strftime('%Y-%m-%d %H:%M:%S') "exit_date": row['exit'].strftime('%Y-%m-%d %H:%M:%S')
} }
data = json.dumps(data, use_decimal=True) _submit_result_to_backend(data)
data = json.loads(data, use_decimal=True)
# persist data
trade_table.put_item(Item=data)
result_data.append(data)
# fire request message to aggregate this strategy now # fire request message to aggregate this strategy now
return { return {
"statusCode": 200, "statusCode": 200
"body": json.dumps(result_data)
} }
else: else:
return { return {
@ -120,6 +113,19 @@ def backtest(event, context):
raise Exception("not a valid event: {}".format(event)) raise Exception("not a valid event: {}".format(event))
def _submit_result_to_backend(data):
"""
submits the given result to the backend system for further processing and analysis
:param data:
:return:
"""
print(data)
try:
print(post("{}/trade".format(os.environ['BASE_URL']), data=data))
except Exception as e:
print("submission ignored: {}".format(e))
def _generate_configuration(event, fromDate, name, response, till): def _generate_configuration(event, fromDate, name, response, till):
""" """
generates the configuration for us on the fly generates the configuration for us on the fly
@ -147,11 +153,9 @@ def _generate_configuration(event, fromDate, name, response, till):
"enabled": True, "enabled": True,
"key": "key", "key": "key",
"secret": "secret", "secret": "secret",
"pair_whitelist": [ "pair_whitelist": list(
"{}/{}".format(event['body']['asset'].upper(), map(lambda x: "{}/{}".format(x, response['Items'][0]['stake_currency']).upper(),
event['body']['stake_currency']).upper(), response['Items'][0]['assets']))
]
}, },
"telegram": { "telegram": {
"enabled": False, "enabled": False,
@ -207,37 +211,34 @@ def cron(event, context):
for i in response['Items']: for i in response['Items']:
# fire a message to our queue # fire a message to our queue
for x in i['assets']: message = {
# test each asset by it self "user": i['user'],
"name": i['name'],
"asset": i['assets'],
"stake_currency": i['stake_currency']
}
message = { # triggered over html, let's provide
"user": i['user'], # a date range for the backtesting
"name": i['name'], if 'pathParameters' in event:
"asset": x, if 'from' in event['pathParameters']:
"stake_currency": i['stake_currency'] message['from'] = event['pathParameters']['from']
} else:
message['from'] = datetime.datetime.today().strftime('%Y%m%d')
if 'till' in event['pathParameters']:
message['till'] = event['pathParameters']['till']
else:
message['till'] = (datetime.datetime.today() - datetime.timedelta(days=1)).strftime('%Y%m%d')
# triggered over html, let's provide serialized = json.dumps(message, use_decimal=True)
# a date range for the backtesting # submit item to queue for routing to the correct persistence
if 'pathParameters' in event:
if 'from' in event['pathParameters']:
message['from'] = event['pathParameters']['from']
else:
message['from'] = datetime.datetime.today().strftime('%Y%m%d')
if 'till' in event['pathParameters']:
message['till'] = event['pathParameters']['till']
else:
message['till'] = (datetime.datetime.today() - datetime.timedelta(days=1)).strftime('%Y%m%d')
serialized = json.dumps(message, use_decimal=True) result = client.publish(
# submit item to queue for routing to the correct persistence TopicArn=topic_arn,
Message=json.dumps({'default': serialized}),
result = client.publish( Subject="schedule",
TopicArn=topic_arn, MessageStructure='json'
Message=json.dumps({'default': serialized}), )
Subject="schedule backtesting",
MessageStructure='json'
)
if 'LastEvaluatedKey' in response: if 'LastEvaluatedKey' in response:
return table.scan( return table.scan(

37
freqtrade/aws/trade.py Normal file
View File

@ -0,0 +1,37 @@
import boto3
import simplejson as json
import os
from freqtrade.aws.tables import get_trade_table
def store(event, context):
"""
stores the received data in the internal database
:param data:
:return:
"""
if 'Records' in event:
for x in event['Records']:
if 'Sns' in x and 'Message' in x['Sns']:
data = json.loads(x['Sns']['Message'], use_decimal=True)
get_trade_table().put_item(Item=data)
def submit(event, context):
"""
submits a new trade to be registered in the internal queue system
:param event:
:param context:
:return:
"""
data = json.loads(event['body'])
client = boto3.client('sns')
topic_arn = client.create_topic(Name=os.environ['tradeTopic'])['TopicArn']
result = client.publish(
TopicArn=topic_arn,
Message=json.dumps({'default': data}),
Subject="persist data",
MessageStructure='json'
)

View File

@ -4,6 +4,8 @@ from base64 import urlsafe_b64encode
import boto3 import boto3
import pytest import pytest
import simplejson as json import simplejson as json
from mock import Mock
from freqtrade.aws.backtesting_lambda import backtest, cron from freqtrade.aws.backtesting_lambda import backtest, cron
from freqtrade.aws.strategy import submit, get_trades from freqtrade.aws.strategy import submit, get_trades
@ -73,11 +75,11 @@ class MyFancyTestStrategy(IStrategy):
"from": "20180401", "from": "20180401",
"till": "20180501", "till": "20180501",
"stake_currency": "usdt", "stake_currency": "usdt",
"asset": "ltc" "assets": ["ltc"]
} }
data = json.loads(backtest({ assert backtest({
"Records": [ "Records": [
{ {
"Sns": { "Sns": {
@ -85,29 +87,14 @@ class MyFancyTestStrategy(IStrategy):
"Message": json.dumps(request) "Message": json.dumps(request)
} }
}] }]
}, {})['body']) }, {})['statusCode'] == 200
# evaluate that we now have trades in the database
# sadly not always a given at this tage
# due to the dynamic nature. Should pick a strategy for testing
# which generates a lot of trades
if len(data) > 0:
data = get_trades({
'pathParameters': {
'user': "GCU4LW2XXZW3A3FM2XZJTEJHNWHTWDKY2DIJLCZJ5ULVZ4K7LZ7D23TG",
"name": "MyFancyTestStrategy",
'stake': "USDT",
'asset': "{}".format(data[0]['pair'].split("/")[0])
}
}, {})['body']
print(data)
assert len(json.loads(data)) > 0
def test_backtest(lambda_context): def test_backtest(lambda_context):
content = """# --- Do not remove these libs --- content = """# --- Do not remove these libs ---
from freqtrade.strategy.interface import IStrategy from freqtrade.strategy.interface import IStrategy
from typing import Dict, List from typing import Dict, List
from hyperopt import hp from hyperopt import hp
from functools import reduce from functools import reduce
from pandas import DataFrame from pandas import DataFrame
@ -167,10 +154,10 @@ class MyFancyTestStrategy(IStrategy):
"user": "GCU4LW2XXZW3A3FM2XZJTEJHNWHTWDKY2DIJLCZJ5ULVZ4K7LZ7D23TG", "user": "GCU4LW2XXZW3A3FM2XZJTEJHNWHTWDKY2DIJLCZJ5ULVZ4K7LZ7D23TG",
"name": "MyFancyTestStrategy", "name": "MyFancyTestStrategy",
"stake_currency": "usdt", "stake_currency": "usdt",
"asset": "ltc" "assets": ["ltc"]
} }
data = json.loads(backtest({ assert backtest({
"Records": [ "Records": [
{ {
"Sns": { "Sns": {
@ -178,23 +165,7 @@ class MyFancyTestStrategy(IStrategy):
"Message": json.dumps(request) "Message": json.dumps(request)
} }
}] }]
}, {})['body']) }, {})['statusCode'] == 200
# evaluate that we now have trades in the database
# sadly not always a given at this tage
# due to the dynamic nature. Should pick a strategy for testing
# which generates a lot of trades
if len(data) > 0:
data = get_trades({
'pathParameters': {
'user': "GCU4LW2XXZW3A3FM2XZJTEJHNWHTWDKY2DIJLCZJ5ULVZ4K7LZ7D23TG",
"name": "MyFancyTestStrategy",
"stake": "usdt",
"asset": "ltc"
}
}, {})['body']
print(data)
assert len(json.loads(data)) > 0
def test_cron(lambda_context): def test_cron(lambda_context):

View File

@ -0,0 +1,40 @@
from freqtrade.aws.trade import store
from freqtrade.aws.tables import get_trade_table
import simplejson as json
from boto3.dynamodb.conditions import Key, Attr
def test_store(lambda_context):
store({
"Records": [
{
"Sns": {
"Subject": "trade",
"Message": json.dumps(
{
'id': 'GCU4LW2XXZW3A3FM2XZJTEJHNWHTWDKY2DIJLCZJ5ULVZ4K7LZ7D23TG.MyFancyTestStrategy:BTC/USDT:test',
'trade': '2018-05-05 14:15:00 to 2018-05-18 00:40:00',
'pair': 'BTC/USDT',
'duration': 625,
'profit_percent': -0.20453928,
'profit_stake': -0.20514198,
'entry_date': '2018-05-05 14:15:00',
'exit_date': '2018-05-18 00:40:00'
}
)
}
}]
}
, {})
# trade table should not have 1 item in it, with our given key
table = get_trade_table()
response = table.query(
KeyConditionExpression=Key('id')
.eq('GCU4LW2XXZW3A3FM2XZJTEJHNWHTWDKY2DIJLCZJ5ULVZ4K7LZ7D23TG.MyFancyTestStrategy:BTC/USDT:test')
)
print(response)
assert 'Items' in response
assert len(response['Items']) == 1

View File

@ -631,6 +631,7 @@ def lambda_context():
os.environ["strategyTable"] = "StrategyTable" os.environ["strategyTable"] = "StrategyTable"
os.environ["tradeTable"] = "TradeTable" os.environ["tradeTable"] = "TradeTable"
os.environ["topic"] = "UnitTestTopic" os.environ["topic"] = "UnitTestTopic"
os.environ["BASE_URL"] = "http://127.0.0.1/test"
client = session.client('sns') client = session.client('sns')
client.create_topic(Name=os.environ["topic"]) client.create_topic(Name=os.environ["topic"])
@ -643,7 +644,7 @@ def lambda_context():
responses.add_passthru('https://api.github.com') responses.add_passthru('https://api.github.com')
responses.add_passthru('https://bittrex.com') responses.add_passthru('https://bittrex.com')
responses.add_passthru('https://api.binance.com') responses.add_passthru('https://api.binance.com')
# here we will define required tables later
yield yield
sns.stop() sns.stop()
dynamo.stop() dynamo.stop()

View File

@ -199,7 +199,31 @@ functions:
environment: environment:
topic: ${self:custom.snsTopic} topic: ${self:custom.snsTopic}
strategyTable: ${self:custom.strategyTable} strategyTable: ${self:custom.strategyTable}
BASE_URL: ${self:custom.customDomain.domainName}/${self:custom.customDomain.stage}
# submits a new trade to the system
trade:
memorySize: 128
handler: freqtrade/aws/trade.submit
events:
- http:
path: trade
method: post
cors: true
environment:
tradeTopic: ${self:custom.snsTradeTopic}
# stores the received message in the trade table
trade-store:
memorySize: 128
handler: freqtrade/aws/trade.store
events:
- sns: ${self:custom.snsTradeTopic}
environment:
tradeTable: ${self:custom.tradeTable}
#submits a new strategy to the system #submits a new strategy to the system
submit_github: submit_github:
@ -232,9 +256,6 @@ functions:
tradeTable: ${self:custom.tradeTable} tradeTable: ${self:custom.tradeTable}
strategyTable: ${self:custom.strategyTable} strategyTable: ${self:custom.strategyTable}
# not more than 2 runners at any given time
reservedConcurrency: 2
# schedules all registered strategies on a daily base # schedules all registered strategies on a daily base
schedule: schedule:
memorySize: 128 memorySize: 128