-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathmessage_broker.py
More file actions
110 lines (98 loc) · 4.6 KB
/
message_broker.py
File metadata and controls
110 lines (98 loc) · 4.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
'''
Written by Debojit Kaushik (Timestamp)
'''
import os
import sys
import traceback
import pika
import json
from flow import Speech2TextRequest, Text2CodeRequest, HEADERS, PARAMS
from actions import Action
class MessageBroker:
@staticmethod
def send_message(queue_ID, json_data={"Message": None}):
"""Method to send/publish messages on the queue."""
try:
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue_ID)
channel.basic_publish(body = json.dumps(json_data), routing_key=queue_ID, exchange = "")
connection.close()
except Exception:
print(traceback.format_exc())
@staticmethod
def receive_callback(ch, method, properties, body):
try:
print("Received a message.")
#Need to initialise Python engine.
action_dic = json.loads(body)
if action_dic["action"] == "init":
MessageBroker.send_message("py_to_ele", json_data = json.dumps({'status': 'Listening'}))
res = Speech2TextRequest._create_to_text_request()
res = Text2CodeRequest._create_to_code_request(res, HEADERS, PARAMS)
action_data = Action.get_action(res)
print("sending message..", action_data)
if action_data != None:
MessageBroker.send_message("py_to_ele", json.dumps(action_data))
MessageBroker.send_message("py_to_ext", json.dumps(action_data))
else:
MessageBroker.send_message("py_to_ele", json.dumps({"status":"Invalid query"}))
elif action_dic["action"] == "init_freeflow":
MessageBroker.send_message("py_to_ele", json_data = json.dumps({'status': 'Listening'}))
res = Speech2TextRequest._create_to_text_request()
print(res)
if res != None:
if "return" in res:
action_data = {
"status": "Free flow",
"action": "return",
"data": {
"args": res.split("return ")[1:]
}
}
elif "print" in res:
action_data = {
"status": "Free flow",
"action": "print",
"data": {
"args": res.split("print ")[1:]
}
}
else:
ops = {"plus": "+", "minus": "-", "multiply": "*", "divide": "/", "less than": "<", "less than or equal to": "<=", "greater than": ">", "greater than or equal to": ">=","< or equal to":"<=","> or equal to":">="}
for i in ops.keys():
res = res.replace(i, ops[i])
action_data = {
"status": "Free flow",
"action": "arithmetic",
"data": {
"args": res
}
}
print("sending message...", action_data)
if action_data != None:
MessageBroker.send_message("py_to_ele", json.dumps(action_data))
MessageBroker.send_message("py_to_ext", json.dumps(action_data))
else:
MessageBroker.send_message("py_to_ele", json.dumps({"status": "Invalid query"}))
except Exception:
print(traceback.format_exc())
@staticmethod
def receive_message(queue_ID):
try:
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue_ID)
channel.basic_consume(MessageBroker.receive_callback, queue = queue_ID, no_ack = True)
print("Starting to listen...")
channel.start_consuming()
except Exception:
print(traceback.format_exc())
if __name__ == '__main__':
try:
# print("Hey")
# message = {"first_name": "Debojit", "last_name": "Kaushik", "Age": 27}
# MessageBroker.send_message("py_to_js", json_data = json.dumps(message))
MessageBroker.receive_message("ele_to_py")
except Exception:
print(traceback.format_exc())