From 9379e0b15a47db50341149dc16967ae7ba1f0096 Mon Sep 17 00:00:00 2001 From: Gert Wohlgemuth Date: Wed, 6 Jun 2018 08:27:08 -0700 Subject: [PATCH 1/5] updated backtesting to allow for more data aggregation --- freqtrade/aws/aggregate/__init__.py | 0 freqtrade/aws/aggregate/strategy.py | 0 freqtrade/aws/backtesting_lambda.py | 59 +++++++++---- serverless.yml | 129 +++++++++++++++++++--------- 4 files changed, 130 insertions(+), 58 deletions(-) create mode 100644 freqtrade/aws/aggregate/__init__.py create mode 100644 freqtrade/aws/aggregate/strategy.py diff --git a/freqtrade/aws/aggregate/__init__.py b/freqtrade/aws/aggregate/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/freqtrade/aws/aggregate/strategy.py b/freqtrade/aws/aggregate/strategy.py new file mode 100644 index 000000000..e69de29bb diff --git a/freqtrade/aws/backtesting_lambda.py b/freqtrade/aws/backtesting_lambda.py index 15a117564..626ad2193 100644 --- a/freqtrade/aws/backtesting_lambda.py +++ b/freqtrade/aws/backtesting_lambda.py @@ -197,11 +197,19 @@ def run_backtest(configuration, name, user, interval, fromDate, till): def _store_aggregated_data(interval, name, result, timerange, user): + """ + stores aggregated data for ease of access, yay for dynamodb data duplication... + + :param interval: + :param name: + :param result: + :param timerange: + :param user: + :return: + """ for row in result[1][2]: if row[1] > 0: data = { - "id": "aggregate:{}:{}:{}:test".format(row[0].upper(), interval, timerange), - "trade": "{}.{}".format(user, name), "pair": row[0], "trades": row[1], "losses": row[6], @@ -213,19 +221,40 @@ def _store_aggregated_data(interval, name, result, timerange, user): "ticker": interval, "days": timerange } + # aggregate by pair + interval + time range for each strategy + data['id'] = "aggregate:{}:{}:{}:test".format(row[0].upper(), interval, timerange) + data['trade'] = "{}.{}".format(user, name) + _submit_to_remote(data) - print(data) - try: - print( - post("{}/trade".format(os.environ.get('BASE_URL', 'https://freq.isaac.international/dev')), - json=data)) - except Exception as e: - print("submission ignored: {}".format(e)) + # id: aggregate by strategy + user + range + pair + # range: ticker + # allows us to easily see on which ticker the strategy works best + data['id'] = "aggregate:ticker:{}:{}:{}:{}:test".format(user, name, row[0].upper(), timerange), + data['trade'] = "{}".format(interval) + + _submit_to_remote(data) + + # id: aggregate by strategy + user + ticker + pair + # range: timerange + # allows us to easily see on which time range the strategy works best + data['id'] = "aggregate:timerange:{}:{}:{}:{}:test".format(user, name, row[0].upper(), interval), + data['trade'] = "{}".format(timerange) + + _submit_to_remote(data) + + +def _submit_to_remote(data): + try: + print( + post("{}/trade".format(os.environ.get('BASE_URL', 'https://freq.isaac.international/dev')), + json=data)) + except Exception as e: + print("submission ignored: {}".format(e)) def _store_trade_data(interval, name, result, timerange, user): for index, row in result[0].iterrows(): - data = { + _submit_to_remote({ "id": "{}.{}:{}:{}:{}:test".format(user, name, interval, timerange, row['currency'].upper()), "trade": "{} to {}".format(row['entry'].strftime('%Y-%m-%d %H:%M:%S'), row['exit'].strftime('%Y-%m-%d %H:%M:%S')), @@ -238,15 +267,7 @@ def _store_trade_data(interval, name, result, timerange, user): "strategy": name, "user": user - } - - print(data) - try: - print( - post("{}/trade".format(os.environ.get('BASE_URL', 'https://freq.isaac.international/dev')), - json=data)) - except Exception as e: - print("submission ignored: {}".format(e)) + }) def generate_configuration(fromDate, till, name, refresh, user, remote=True): diff --git a/serverless.yml b/serverless.yml index 4185017e0..1bac69e0d 100644 --- a/serverless.yml +++ b/serverless.yml @@ -112,6 +112,7 @@ functions: environment: strategyTable: ${self:custom.strategyTable} + reservedConcurrency: 5 #returns the source code of this given strategy #unless it's private @@ -136,6 +137,7 @@ functions: environment: strategyTable: ${self:custom.strategyTable} + reservedConcurrency: 5 # loads the details of the specific strategy get: @@ -154,6 +156,80 @@ functions: environment: strategyTable: ${self:custom.strategyTable} + reservedConcurrency: 5 + + # loads the aggregation report for the given strategy based on different tickers + get_aggregate_interval: + memorySize: 128 + handler: freqtrade/aws/aggregate/strategy.ticker + events: + - http: + path: strategies/{user}/{name}/aggregate/ticker + method: get + cors: true + request: + parameter: + paths: + user: true + name: true + + environment: + strategyTable: ${self:custom.strategyTable} + tradeTable: ${self:custom.tradeTable} + reservedConcurrency: 5 + + # loads the aggregation report for the given strategy based on different tickers + get_aggregate_timeframe: + memorySize: 128 + handler: freqtrade/aws/aggregate/strategy.timeframe + events: + - http: + path: strategies/{user}/{name}/aggregate/timeframe + method: get + cors: true + request: + parameter: + paths: + user: true + name: true + + environment: + strategyTable: ${self:custom.strategyTable} + tradeTable: ${self:custom.tradeTable} + reservedConcurrency: 5 + + #submits a new strategy to the system + submit: + memorySize: 128 + handler: freqtrade/aws/strategy.submit + events: + - http: + path: strategies/submit + method: post + cors: true + + environment: + topic: ${self:custom.snsTopic} + strategyTable: ${self:custom.strategyTable} + BASE_URL: ${self:custom.customDomain.domainName}/${self:custom.customDomain.stage} + reservedConcurrency: 5 + + #submits a new strategy to the system + submit_github: + memorySize: 128 + handler: freqtrade/aws/strategy.submit_github + events: + - http: + path: strategies/submit/github + method: post + cors: true + + environment: + topic: ${self:custom.snsTopic} + strategyTable: ${self:custom.strategyTable} + reservedConcurrency: 1 + +### TRADE REQUESTS # loads all trades for a strategy and it's associated pairs trades: @@ -175,22 +251,7 @@ functions: environment: strategyTable: ${self:custom.strategyTable} tradeTable: ${self:custom.tradeTable} - - - #submits a new strategy to the system - submit: - memorySize: 128 - handler: freqtrade/aws/strategy.submit - events: - - http: - path: strategies/submit - method: post - cors: true - - environment: - topic: ${self:custom.snsTopic} - strategyTable: ${self:custom.strategyTable} - BASE_URL: ${self:custom.customDomain.domainName}/${self:custom.customDomain.stage} + reservedConcurrency: 5 # submits a new trade to the system trade: @@ -204,19 +265,9 @@ functions: environment: tradeTopic: ${self:custom.snsTradeTopic} + reservedConcurrency: 5 - # stores the received message in the trade table - trade-store: - memorySize: 128 - handler: freqtrade/aws/trade.store - - events: - - sns: ${self:custom.snsTradeTopic} - - environment: - tradeTable: ${self:custom.tradeTable} - - # stores the received message in the trade table + # query aggregates by day and ticker for all strategies trade-aggregate: memorySize: 128 handler: freqtrade/aws/trade.get_aggregated_trades @@ -234,21 +285,21 @@ functions: environment: tradeTable: ${self:custom.tradeTable} - #submits a new strategy to the system - submit_github: + reservedConcurrency: 5 +### SNS TRIGGERED FUNCTIONS + + # stores the received message in the trade table + trade-store: memorySize: 128 - handler: freqtrade/aws/strategy.submit_github + handler: freqtrade/aws/trade.store + events: - - http: - path: strategies/submit/github - method: post - cors: true + - sns: ${self:custom.snsTradeTopic} environment: - topic: ${self:custom.snsTopic} - strategyTable: ${self:custom.strategyTable} - + tradeTable: ${self:custom.tradeTable} + reservedConcurrency: 1 #backtests the strategy #should be switched to utilze aws fargate instead #and running a container @@ -275,7 +326,7 @@ functions: events: - schedule: - rate: rate(580 minutes) + rate: rate(1440 minutes) enabled: true environment: From 9eaf984a2b1fba086e08441544b39511f9891a1e Mon Sep 17 00:00:00 2001 From: Gert Wohlgemuth Date: Wed, 6 Jun 2018 09:17:05 -0700 Subject: [PATCH 2/5] added comments --- freqtrade/aws/backtesting_lambda.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/freqtrade/aws/backtesting_lambda.py b/freqtrade/aws/backtesting_lambda.py index 626ad2193..c910e6617 100644 --- a/freqtrade/aws/backtesting_lambda.py +++ b/freqtrade/aws/backtesting_lambda.py @@ -244,6 +244,11 @@ def _store_aggregated_data(interval, name, result, timerange, user): def _submit_to_remote(data): + """ + submits data to the backend to be persisted in the database + :param data: + :return: + """ try: print( post("{}/trade".format(os.environ.get('BASE_URL', 'https://freq.isaac.international/dev')), @@ -253,6 +258,16 @@ def _submit_to_remote(data): def _store_trade_data(interval, name, result, timerange, user): + """ + stores individual trades on the remote system + + :param interval: + :param name: + :param result: + :param timerange: + :param user: + :return: + """ for index, row in result[0].iterrows(): _submit_to_remote({ "id": "{}.{}:{}:{}:{}:test".format(user, name, interval, timerange, row['currency'].upper()), From 778b9ac6342c8655362798cb9ba7a5fbaf5adffb Mon Sep 17 00:00:00 2001 From: Gert Wohlgemuth Date: Wed, 6 Jun 2018 09:26:46 -0700 Subject: [PATCH 3/5] added more logging --- freqtrade/aws/backtesting_lambda.py | 1 + 1 file changed, 1 insertion(+) diff --git a/freqtrade/aws/backtesting_lambda.py b/freqtrade/aws/backtesting_lambda.py index c910e6617..cd7db6e6f 100644 --- a/freqtrade/aws/backtesting_lambda.py +++ b/freqtrade/aws/backtesting_lambda.py @@ -250,6 +250,7 @@ def _submit_to_remote(data): :return: """ try: + print("submitting data: {}".format(data)) print( post("{}/trade".format(os.environ.get('BASE_URL', 'https://freq.isaac.international/dev')), json=data)) From 4b92c0f848aef659a06181ce72c38c38b3caf30f Mon Sep 17 00:00:00 2001 From: Gert Wohlgemuth Date: Wed, 6 Jun 2018 09:48:09 -0700 Subject: [PATCH 4/5] added more logging --- freqtrade/aws/trade.py | 1 + 1 file changed, 1 insertion(+) diff --git a/freqtrade/aws/trade.py b/freqtrade/aws/trade.py index 6d0c5d5cc..dd1beff70 100644 --- a/freqtrade/aws/trade.py +++ b/freqtrade/aws/trade.py @@ -15,6 +15,7 @@ def store(event, context): for x in event['Records']: if 'Sns' in x and 'Message' in x['Sns']: data = json.loads(x['Sns']['Message'], use_decimal=True) + print("storing data: {}".format(data)) get_trade_table().put_item(Item=data) From f733de1662a500abc23dfcdba245476aba667de5 Mon Sep 17 00:00:00 2001 From: Gert Wohlgemuth Date: Wed, 6 Jun 2018 18:20:26 -0700 Subject: [PATCH 5/5] updated lambda to reduce traffic --- freqtrade/aws/backtesting_lambda.py | 91 +++++++++++------------------ 1 file changed, 34 insertions(+), 57 deletions(-) diff --git a/freqtrade/aws/backtesting_lambda.py b/freqtrade/aws/backtesting_lambda.py index cd7db6e6f..4de73bb8f 100644 --- a/freqtrade/aws/backtesting_lambda.py +++ b/freqtrade/aws/backtesting_lambda.py @@ -46,18 +46,9 @@ def backtest(event, context): name = event['body']['name'] user = event['body']['user'] - till = datetime.datetime.today() - fromDate = till - datetime.timedelta(days=90) - + days = [90] if 'days' in event['body']: - fromDate = till - datetime.timedelta(days=event['body']['days']) - else: - if 'from' in event['body']: - fromDate = datetime.datetime.strptime(event['body']['from'], '%Y%m%d') - if 'till' in event['body']: - till = datetime.datetime.strptime(event['body']['till'], '%Y%m%d') - - timerange = (till - fromDate).days + days = event['body']['days'] # by default we refresh data refresh = True @@ -65,29 +56,24 @@ def backtest(event, context): if 'refresh' in event['body']: refresh = event['body']['refresh'] - print("time range between dates is: {} days".format(timerange)) - try: - print("schedule back testing from {} till {} for {} with {} vs {}".format(fromDate, till, name, - event['body'][ - 'stake_currency'], - event['body'][ - 'assets'])) - if "ticker" in event['body']: ticker = event['body']['ticker'] else: - ticker = '5m' + ticker = ['5m'] if "local" in event['body'] and event['body']['local']: print("running in local mode") - configuration = generate_configuration(fromDate, till, name, refresh, user, False) - - run_backtest(configuration, name, user, ticker, fromDate, till) + for x in days: + for y in ticker: + till = datetime.datetime.today() + fromDate = till - datetime.timedelta(days=x) + configuration = generate_configuration(fromDate, till, name, refresh, user, False) + run_backtest(configuration, name, user, y, fromDate, till) else: print("running in remote mode") - _submit_job(name, user, ticker, fromDate, till) + _submit_job(name, user, ticker, days) return { "statusCode": 200 @@ -102,7 +88,7 @@ def backtest(event, context): raise Exception("not a valid event: {}".format(event)) -def _submit_job(name, user, ticker, fromDate, till): +def _submit_job(name, user, ticker, days): """ submits a new task to the cluster @@ -143,15 +129,11 @@ def _submit_job(name, user, ticker, fromDate, till): }, { "name": "FREQ_TICKER", - "value": "{}".format(ticker) + "value": "{}".format(json.dumps(ticker)) }, { - "name": "FREQ_FROM", - "value": "{}".format(fromDate.strftime('%Y%m%d')) - }, - { - "name": "FREQ_TILL", - "value": "{}".format(till.strftime('%Y%m%d')) + "name": "FREQ_DAYS", + "value": "{}".format(json.dumps(days)) }, { "name": "FREQ_STRATEGY", @@ -407,34 +389,29 @@ def cron(event, context): for i in response['Items']: # fire a message to our queue - # we want to evaluate several time spans for the strategy - for day in [1, 7, 30, 90]: + message = { + "user": i['user'], + "name": i['name'], + "assets": i['assets'], + "stake_currency": i['stake_currency'], + "local": False, + "refresh": True, + "ticker": ['5m', '15m', '30m', '1h', '2h', '4h', '6h', '12h', '1d'], + "days": [1, 2, 3, 4, 5, 6, 7, 14, 30, 90] + } - # we want to evaluate several time intervals for each strategy - for interval in ['5m', '15m', '30m', '1h']: - message = { - "user": i['user'], - "name": i['name'], - "assets": i['assets'], - "stake_currency": i['stake_currency'], - "local": False, - "refresh": True, - "ticker": interval, - "days": day - } + print("submitting: {}".format(message)) + serialized = json.dumps(message, use_decimal=True) + # submit item to queue for routing to the correct persistence - print("submitting: {}".format(message)) - serialized = json.dumps(message, use_decimal=True) - # submit item to queue for routing to the correct persistence + result = client.publish( + TopicArn=topic_arn, + Message=json.dumps({'default': serialized}), + Subject="schedule", + MessageStructure='json' + ) - result = client.publish( - TopicArn=topic_arn, - Message=json.dumps({'default': serialized}), - Subject="schedule", - MessageStructure='json' - ) - - print(result) + print(result) if 'LastEvaluatedKey' in response: return table.scan(