This commit is contained in:
EC2 Default User 2018-06-07 02:06:27 +00:00
commit e08be2742e
5 changed files with 181 additions and 115 deletions

View File

View File

View File

@ -46,18 +46,9 @@ def backtest(event, context):
name = event['body']['name']
user = event['body']['user']
till = datetime.datetime.today()
fromDate = till - datetime.timedelta(days=90)
days = [90]
if 'days' in event['body']:
fromDate = till - datetime.timedelta(days=event['body']['days'])
else:
if 'from' in event['body']:
fromDate = datetime.datetime.strptime(event['body']['from'], '%Y%m%d')
if 'till' in event['body']:
till = datetime.datetime.strptime(event['body']['till'], '%Y%m%d')
timerange = (till - fromDate).days
days = event['body']['days']
# by default we refresh data
refresh = True
@ -65,29 +56,24 @@ def backtest(event, context):
if 'refresh' in event['body']:
refresh = event['body']['refresh']
print("time range between dates is: {} days".format(timerange))
try:
print("schedule back testing from {} till {} for {} with {} vs {}".format(fromDate, till, name,
event['body'][
'stake_currency'],
event['body'][
'assets']))
if "ticker" in event['body']:
ticker = event['body']['ticker']
else:
ticker = '5m'
ticker = ['5m']
if "local" in event['body'] and event['body']['local']:
print("running in local mode")
configuration = generate_configuration(fromDate, till, name, refresh, user, False)
run_backtest(configuration, name, user, ticker, fromDate, till)
for x in days:
for y in ticker:
till = datetime.datetime.today()
fromDate = till - datetime.timedelta(days=x)
configuration = generate_configuration(fromDate, till, name, refresh, user, False)
run_backtest(configuration, name, user, y, fromDate, till)
else:
print("running in remote mode")
_submit_job(name, user, ticker, fromDate, till)
_submit_job(name, user, ticker, days)
return {
"statusCode": 200
@ -102,7 +88,7 @@ def backtest(event, context):
raise Exception("not a valid event: {}".format(event))
def _submit_job(name, user, ticker, fromDate, till):
def _submit_job(name, user, ticker, days):
"""
submits a new task to the cluster
@ -143,15 +129,11 @@ def _submit_job(name, user, ticker, fromDate, till):
},
{
"name": "FREQ_TICKER",
"value": "{}".format(ticker)
"value": "{}".format(json.dumps(ticker))
},
{
"name": "FREQ_FROM",
"value": "{}".format(fromDate.strftime('%Y%m%d'))
},
{
"name": "FREQ_TILL",
"value": "{}".format(till.strftime('%Y%m%d'))
"name": "FREQ_DAYS",
"value": "{}".format(json.dumps(days))
},
{
"name": "FREQ_STRATEGY",
@ -197,11 +179,19 @@ def run_backtest(configuration, name, user, interval, fromDate, till):
def _store_aggregated_data(interval, name, result, timerange, user):
"""
stores aggregated data for ease of access, yay for dynamodb data duplication...
:param interval:
:param name:
:param result:
:param timerange:
:param user:
:return:
"""
for row in result[1][2]:
if row[1] > 0:
data = {
"id": "aggregate:{}:{}:{}:test".format(row[0].upper(), interval, timerange),
"trade": "{}.{}".format(user, name),
"pair": row[0],
"trades": row[1],
"losses": row[6],
@ -213,19 +203,56 @@ def _store_aggregated_data(interval, name, result, timerange, user):
"ticker": interval,
"days": timerange
}
# aggregate by pair + interval + time range for each strategy
data['id'] = "aggregate:{}:{}:{}:test".format(row[0].upper(), interval, timerange)
data['trade'] = "{}.{}".format(user, name)
_submit_to_remote(data)
print(data)
try:
print(
post("{}/trade".format(os.environ.get('BASE_URL', 'https://freq.isaac.international/dev')),
json=data))
except Exception as e:
print("submission ignored: {}".format(e))
# id: aggregate by strategy + user + range + pair
# range: ticker
# allows us to easily see on which ticker the strategy works best
data['id'] = "aggregate:ticker:{}:{}:{}:{}:test".format(user, name, row[0].upper(), timerange),
data['trade'] = "{}".format(interval)
_submit_to_remote(data)
# id: aggregate by strategy + user + ticker + pair
# range: timerange
# allows us to easily see on which time range the strategy works best
data['id'] = "aggregate:timerange:{}:{}:{}:{}:test".format(user, name, row[0].upper(), interval),
data['trade'] = "{}".format(timerange)
_submit_to_remote(data)
def _submit_to_remote(data):
"""
submits data to the backend to be persisted in the database
:param data:
:return:
"""
try:
print("submitting data: {}".format(data))
print(
post("{}/trade".format(os.environ.get('BASE_URL', 'https://freq.isaac.international/dev')),
json=data))
except Exception as e:
print("submission ignored: {}".format(e))
def _store_trade_data(interval, name, result, timerange, user):
"""
stores individual trades on the remote system
:param interval:
:param name:
:param result:
:param timerange:
:param user:
:return:
"""
for index, row in result[0].iterrows():
data = {
_submit_to_remote({
"id": "{}.{}:{}:{}:{}:test".format(user, name, interval, timerange, row['currency'].upper()),
"trade": "{} to {}".format(row['entry'].strftime('%Y-%m-%d %H:%M:%S'),
row['exit'].strftime('%Y-%m-%d %H:%M:%S')),
@ -238,15 +265,7 @@ def _store_trade_data(interval, name, result, timerange, user):
"strategy": name,
"user": user
}
print(data)
try:
print(
post("{}/trade".format(os.environ.get('BASE_URL', 'https://freq.isaac.international/dev')),
json=data))
except Exception as e:
print("submission ignored: {}".format(e))
})
def generate_configuration(fromDate, till, name, refresh, user, remote=True):
@ -370,34 +389,29 @@ def cron(event, context):
for i in response['Items']:
# fire a message to our queue
# we want to evaluate several time spans for the strategy
for day in [1, 7, 30, 90]:
message = {
"user": i['user'],
"name": i['name'],
"assets": i['assets'],
"stake_currency": i['stake_currency'],
"local": False,
"refresh": True,
"ticker": ['5m', '15m', '30m', '1h', '2h', '4h', '6h', '12h', '1d'],
"days": [1, 2, 3, 4, 5, 6, 7, 14, 30, 90]
}
# we want to evaluate several time intervals for each strategy
for interval in ['5m', '15m', '30m', '1h']:
message = {
"user": i['user'],
"name": i['name'],
"assets": i['assets'],
"stake_currency": i['stake_currency'],
"local": False,
"refresh": True,
"ticker": interval,
"days": day
}
print("submitting: {}".format(message))
serialized = json.dumps(message, use_decimal=True)
# submit item to queue for routing to the correct persistence
print("submitting: {}".format(message))
serialized = json.dumps(message, use_decimal=True)
# submit item to queue for routing to the correct persistence
result = client.publish(
TopicArn=topic_arn,
Message=json.dumps({'default': serialized}),
Subject="schedule",
MessageStructure='json'
)
result = client.publish(
TopicArn=topic_arn,
Message=json.dumps({'default': serialized}),
Subject="schedule",
MessageStructure='json'
)
print(result)
print(result)
if 'LastEvaluatedKey' in response:
return table.scan(

View File

@ -15,6 +15,7 @@ def store(event, context):
for x in event['Records']:
if 'Sns' in x and 'Message' in x['Sns']:
data = json.loads(x['Sns']['Message'], use_decimal=True)
print("storing data: {}".format(data))
get_trade_table().put_item(Item=data)

View File

@ -112,6 +112,7 @@ functions:
environment:
strategyTable: ${self:custom.strategyTable}
reservedConcurrency: 5
#returns the source code of this given strategy
#unless it's private
@ -136,6 +137,7 @@ functions:
environment:
strategyTable: ${self:custom.strategyTable}
reservedConcurrency: 5
# loads the details of the specific strategy
get:
@ -154,6 +156,80 @@ functions:
environment:
strategyTable: ${self:custom.strategyTable}
reservedConcurrency: 5
# loads the aggregation report for the given strategy based on different tickers
get_aggregate_interval:
memorySize: 128
handler: freqtrade/aws/aggregate/strategy.ticker
events:
- http:
path: strategies/{user}/{name}/aggregate/ticker
method: get
cors: true
request:
parameter:
paths:
user: true
name: true
environment:
strategyTable: ${self:custom.strategyTable}
tradeTable: ${self:custom.tradeTable}
reservedConcurrency: 5
# loads the aggregation report for the given strategy based on different tickers
get_aggregate_timeframe:
memorySize: 128
handler: freqtrade/aws/aggregate/strategy.timeframe
events:
- http:
path: strategies/{user}/{name}/aggregate/timeframe
method: get
cors: true
request:
parameter:
paths:
user: true
name: true
environment:
strategyTable: ${self:custom.strategyTable}
tradeTable: ${self:custom.tradeTable}
reservedConcurrency: 5
#submits a new strategy to the system
submit:
memorySize: 128
handler: freqtrade/aws/strategy.submit
events:
- http:
path: strategies/submit
method: post
cors: true
environment:
topic: ${self:custom.snsTopic}
strategyTable: ${self:custom.strategyTable}
BASE_URL: ${self:custom.customDomain.domainName}/${self:custom.customDomain.stage}
reservedConcurrency: 5
#submits a new strategy to the system
submit_github:
memorySize: 128
handler: freqtrade/aws/strategy.submit_github
events:
- http:
path: strategies/submit/github
method: post
cors: true
environment:
topic: ${self:custom.snsTopic}
strategyTable: ${self:custom.strategyTable}
reservedConcurrency: 1
### TRADE REQUESTS
# loads all trades for a strategy and it's associated pairs
trades:
@ -175,22 +251,7 @@ functions:
environment:
strategyTable: ${self:custom.strategyTable}
tradeTable: ${self:custom.tradeTable}
#submits a new strategy to the system
submit:
memorySize: 128
handler: freqtrade/aws/strategy.submit
events:
- http:
path: strategies/submit
method: post
cors: true
environment:
topic: ${self:custom.snsTopic}
strategyTable: ${self:custom.strategyTable}
BASE_URL: ${self:custom.customDomain.domainName}/${self:custom.customDomain.stage}
reservedConcurrency: 5
# submits a new trade to the system
trade:
@ -204,19 +265,9 @@ functions:
environment:
tradeTopic: ${self:custom.snsTradeTopic}
reservedConcurrency: 5
# stores the received message in the trade table
trade-store:
memorySize: 128
handler: freqtrade/aws/trade.store
events:
- sns: ${self:custom.snsTradeTopic}
environment:
tradeTable: ${self:custom.tradeTable}
# stores the received message in the trade table
# query aggregates by day and ticker for all strategies
trade-aggregate:
memorySize: 128
handler: freqtrade/aws/trade.get_aggregated_trades
@ -234,21 +285,21 @@ functions:
environment:
tradeTable: ${self:custom.tradeTable}
#submits a new strategy to the system
submit_github:
reservedConcurrency: 5
### SNS TRIGGERED FUNCTIONS
# stores the received message in the trade table
trade-store:
memorySize: 128
handler: freqtrade/aws/strategy.submit_github
handler: freqtrade/aws/trade.store
events:
- http:
path: strategies/submit/github
method: post
cors: true
- sns: ${self:custom.snsTradeTopic}
environment:
topic: ${self:custom.snsTopic}
strategyTable: ${self:custom.strategyTable}
tradeTable: ${self:custom.tradeTable}
reservedConcurrency: 1
#backtests the strategy
#should be switched to utilze aws fargate instead
#and running a container
@ -275,7 +326,7 @@ functions:
events:
- schedule:
rate: rate(580 minutes)
rate: rate(1440 minutes)
enabled: true
environment: