stable/freqtrade/aws/service/Queue.py
2018-05-19 15:38:33 -07:00

48 lines
1.3 KiB
Python

import simplejson as json
import os
import boto3
class Queue:
"""
abstraction of the underlaying queuing system to schedule a message to the backend for processing
"""
def submit(self, object, routingKey):
"""
submits the given object to the queue associated with the
routing key.
The routing lambda function will than make sure it will be delivered to the right destination
:param object:
:param routingKey:
:return:
"""
# get topic refrence
client = boto3.client('sns')
# if topic exists, we just reuse it
topic_arn = client.create_topic(Name=os.environ['topic'])['TopicArn']
serialized = json.dumps(object, use_decimal=True)
# submit item to queue for routing to the correct persistence
result = client.publish(
TopicArn=topic_arn,
Message=json.dumps({'default': serialized}),
Subject="route:" + routingKey,
MessageStructure='json',
MessageAttributes={
'route': {
'DataType': 'String',
'StringValue': routingKey
}
},
)
return {
"statusCode": result['ResponseMetadata']['HTTPStatusCode'],
"body": serialized
}