From 8fe1c304dae9e452680ef69bd1ebcfd7de03b964 Mon Sep 17 00:00:00 2001 From: Karl Rister Date: Thu, 7 Dec 2023 17:03:40 -0600 Subject: [PATCH] add personal streams for targeted message delivery to improve scalability - this allows user messages to be deliverd to the recipient and the recipient only which prevents participants from having to do needless processing on messages not intended for them --- roadblock.py | 35 ++++++++++++++++++++++++++++++----- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/roadblock.py b/roadblock.py index 3223be8..371a1a3 100644 --- a/roadblock.py +++ b/roadblock.py @@ -574,6 +574,7 @@ def define_msg_schema(self): "global-bus-created", "leader-bus-created", "followers-bus-created", + "personal-bus-created", "timeout-ts", "initialized", "switch-buses", @@ -701,12 +702,16 @@ def send_user_messages(self): self.logger.info("Sending user requested messages") user_msg_counter = 1 for user_msg in self.user_messages: + bus_name = "global" + if user_msg["recipient"]["id"] != "all": + bus_name = user_msg["recipient"]["id"] + if "user-string" in user_msg: self.logger.info("Sending user message %d: 'user-string'", user_msg_counter) - self.message_publish("global", self.message_build(user_msg["recipient"]["type"], user_msg["recipient"]["id"], "user-string", user_msg["user-string"])) + self.message_publish(bus_name, self.message_build(user_msg["recipient"]["type"], user_msg["recipient"]["id"], "user-string", user_msg["user-string"])) elif "user-object" in user_msg: self.logger.info("Sending user message %d: 'user-object'", user_msg_counter) - self.message_publish("global", self.message_build(user_msg["recipient"]["type"], user_msg["recipient"]["id"], "user-object", user_msg["user-object"])) + self.message_publish(bus_name, self.message_build(user_msg["recipient"]["type"], user_msg["recipient"]["id"], "user-object", user_msg["user-object"])) user_msg_counter += 1 @@ -730,7 +735,7 @@ def message_handle (self, message): msg_command = self.message_get_command(message) - if msg_command in ("global-bus-created", "leader-bus-created", "followers-bus-created"): + if msg_command in ("global-bus-created", "leader-bus-created", "followers-bus-created", "personal-bus-created"): self.logger.info("Received '%s' message", msg_command) elif msg_command == "timeout-ts": self.logger.info("Received 'timeout-ts' message") @@ -971,6 +976,9 @@ def message_publish(self, message_bus, message): ret_val = 0 counter = 0 + + self.logger.debug("Attempting to publish message '%s' on bus '%s'", message, message_bus) + while ret_val == 0: if self.rc != 0: self.logger.debug("self.rc != 0 --> breaking") @@ -1126,7 +1134,14 @@ def cleanup(self): self.key_delete(self.roadblock_uuid) self.key_delete(self.roadblock_uuid + "__initialized") + buses_to_clean = [] for bus_name in ( "global", "leader", "followers" ): + buses_to_clean.append(bus_name) + buses_to_clean.append(self.roadblock_leader_id) + for bus_name in self.roadblock_followers: + buses_to_clean.append(bus_name) + + for bus_name in buses_to_clean: msg_count = self.redcon.xlen(self.roadblock_uuid + "__bus__" + bus_name) self.logger.debug("total messages on bus '%s': %d", bus_name, msg_count) @@ -1584,6 +1599,11 @@ def run_it(self): self.logger.info(".") self.logger.info("Roadblock is initialized") + + # create the personal stream/bus + self.logger.info("Creating personal bus") + self.message_publish(self.my_id, self.message_build_custom(self.roadblock_role, "personal-bus-created", self.roadblock_role, self.my_id, "personal-bus-created")) + if self.roadblock_role == "follower": # tell the leader that I am online self.logger.info("Sending 'follower-online' message") @@ -1596,6 +1616,7 @@ def run_it(self): followers_last_msg_id = 0 leader_last_msg_id = 0 global_last_msg_id = 0 + personal_last_msg_id = 0 while self.watch_bus: if self.rc != 0: self.logger.debug("self.rc != 0 --> breaking") @@ -1605,12 +1626,14 @@ def run_it(self): if self.roadblock_role == "follower": msgs = self.redcon.xread(streams = { self.roadblock_uuid + "__bus__global": global_last_msg_id, - self.roadblock_uuid + "__bus__followers": followers_last_msg_id + self.roadblock_uuid + "__bus__followers": followers_last_msg_id, + self.roadblock_uuid + "__bus__" + self.my_id: personal_last_msg_id }, block = 0) elif self.roadblock_role == "leader": msgs = self.redcon.xread(streams = { self.roadblock_uuid + "__bus__global": global_last_msg_id, - self.roadblock_uuid + "__bus__leader": leader_last_msg_id + self.roadblock_uuid + "__bus__leader": leader_last_msg_id, + self.roadblock_uuid + "__bus__" + self.my_id: personal_last_msg_id }, block = 0) except redis.exceptions.ConnectionError as con_error: self.logger.error("%s", con_error) @@ -1634,6 +1657,8 @@ def run_it(self): leader_last_msg_id = msg_id elif bus_name == self.roadblock_uuid + "__bus__followers": followers_last_msg_id = msg_id + elif bus_name == self.roadblock_uuid + "__bus__" + self.my_id: + personal_last_msg_id = msg_id self.logger.debug("received msg=[%s] with msg_id=[%s] from bus '%s'", msg, msg_id, bus_name)