This repository has been archived by the owner on Jun 2, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
TBConsumer.py
98 lines (77 loc) · 3.61 KB
/
TBConsumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import pika
from train_lib.clients import Consumer, PHTClient
from train_lib.clients.rabbitmq import LOG_FORMAT
import json
from dotenv import load_dotenv, find_dotenv
import os
import logging
from builder.train_builder import TrainBuilder, BuildStatus
from loguru import logger
from builder.messages import BuilderResponse
# Standard-library logger named after this module.
# NOTE(review): the code visible in this file logs through loguru's `logger`
# and never references LOGGER -- confirm whether it is still needed.
LOGGER = logging.getLogger(__name__)
class TBConsumer(Consumer):
    """
    RabbitMQ consumer that hands incoming messages to a ``TrainBuilder``.

    Every message body is parsed as JSON and passed to
    ``TrainBuilder.process_message``. The resulting ``BuilderResponse`` is
    published as an event (routing key ``ui.tb.event`` by default) and, for a
    finished build that has not been submitted yet, a ``trainBuilt`` message
    is published with routing key ``tr``.
    """

    def __init__(self, amqp_url: str, queue: str = "", routing_key: str = None):
        """
        Set up the consumer, the PHT client and the train builder.

        Reads ``UI_TRAIN_API``, ``VAULT_URL`` and ``VAULT_TOKEN`` from the
        environment.

        :param amqp_url: connection URL of the RabbitMQ broker
        :param queue: queue name forwarded to the base ``Consumer``
        :param routing_key: routing key forwarded to the base ``Consumer``
        :raises ValueError: if ``UI_TRAIN_API`` or ``VAULT_URL`` is unset or empty
        """
        super().__init__(amqp_url, queue, routing_key=routing_key)
        # The "ampq" spelling is kept on purpose: it is an existing attribute
        # name and matches the keyword expected by PHTClient below.
        self.ampq_url = amqp_url

        api_url = self._env_url("UI_TRAIN_API")
        vault_url = self._env_url("VAULT_URL")
        self.pht_client = PHTClient(
            ampq_url=amqp_url,
            api_url=api_url,
            vault_url=vault_url,
            vault_token=os.getenv("VAULT_TOKEN"),
        )
        self.builder = TrainBuilder()

        # Set auto reconnect to true
        self.auto_reconnect = True
        # Configure routing key
        # NOTE(review): this always overrides whatever `routing_key` was passed
        # to the constructor -- confirm that this is intended.
        self.ROUTING_KEY = "tb"

    @staticmethod
    def _env_url(name: str) -> str:
        """
        Read a base URL from the environment and make sure it ends with "/".

        :param name: name of the environment variable holding the URL
        :return: the URL with exactly the trailing slash added when missing
        :raises ValueError: if the variable is unset or empty
        """
        url = os.getenv(name)
        if not url:
            # Fail with a readable message instead of the TypeError/IndexError
            # that indexing a missing or empty value would produce.
            raise ValueError(f"Environment variable {name} must be set to a non-empty URL")
        return url if url.endswith("/") else url + "/"

    def on_message(self, _unused_channel, basic_deliver, properties, body):
        """
        Process one delivered message.

        :param _unused_channel: channel the message arrived on, passed to the base class
        :param basic_deliver: delivery metadata, passed to the base class
        :param properties: message properties, passed to the base class
        :param body: raw message body, expected to contain JSON
        :return:
        """
        try:
            message = json.loads(body)
        except Exception as e:
            # Deliberately broad: any failure to decode the payload (invalid
            # JSON, undecodable bytes, excessive nesting, ...) is reported as a
            # failed build instead of stopping the consumer.
            logger.error(f"Failed to parse message: {e}")
            response = BuilderResponse(
                type=BuildStatus.FAILED.value,
                data={"message": "Failed to parse message"},
            )
            self.publish_events_for_train(response)
            # Base-class handling still runs for unparsable messages
            # (presumably this acknowledges the delivery -- confirm in train_lib).
            super().on_message(_unused_channel, basic_deliver, properties, body)
            return

        logger.info(f"Received message: \n {message}")
        response = self.builder.process_message(message)

        if response:
            # post message to train router to notify that the train has been built
            if response.type == BuildStatus.FINISHED.value:
                # check if the train has been already submitted if not notify the train router via rabbitmq
                if not self.builder.redis_store.train_submitted(response.data["id"]):
                    self.post_message_for_train_router(response.data["id"])
            self.publish_events_for_train(response)
        super().on_message(_unused_channel, basic_deliver, properties, body)

    def publish_events_for_train(self, response: BuilderResponse, exchange: str = "pht",
                                 exchange_type: str = "topic", routing_key: str = "ui.tb.event"):
        """
        Publish a builder response as a JSON event over a short-lived connection.

        :param response: builder response to serialise and publish
        :param exchange: exchange to declare (durable) and publish to
        :param exchange_type: type used when declaring the exchange
        :param routing_key: routing key of the published event
        :return:
        """
        connection = pika.BlockingConnection(pika.URLParameters(self.ampq_url))
        try:
            channel = connection.channel()
            channel.exchange_declare(exchange=exchange, exchange_type=exchange_type, durable=True)
            json_message = response.json().encode("utf-8")
            channel.basic_publish(exchange=exchange, routing_key=routing_key, body=json_message)
            logger.debug(f"Published message: {json_message}")
        finally:
            # Always release the connection, even when declaring or publishing
            # fails. Skip close() on an already closed connection so that the
            # original error is not masked by a second one.
            if connection.is_open:
                connection.close()

    def post_message_for_train_router(self, train_id: str):
        """
        Notifies the train router via RabbitMQ that the train has been built and the route is stored in vault

        The message has type ``trainBuilt`` and is published with routing key ``tr``.

        :param train_id: id of the train that has been built
        :return:
        """
        message = {
            "type": "trainBuilt",
            "data": {
                "id": train_id
            }
        }
        self.pht_client.publish_message_rabbit_mq(message, routing_key="tr")
def main():
    """
    Entry point: load the ``.env`` file, configure logging and run the consumer.

    The broker URL is read from the ``AMQP_URL`` environment variable after the
    ``.env`` file has been loaded; the consumer is created with an empty queue
    name and routing key ``tb``.

    :return:
    """
    dotenv_path = find_dotenv()
    load_dotenv(dotenv_path)
    logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)

    consumer = TBConsumer(amqp_url=os.getenv("AMQP_URL"), queue="", routing_key="tb")
    consumer.run()


if __name__ == "__main__":
    main()