diff --git a/kafka_server/broker/file/read.py b/kafka_server/broker/file/read.py index b0eb161..4bec853 100644 --- a/kafka_server/broker/file/read.py +++ b/kafka_server/broker/file/read.py @@ -29,12 +29,12 @@ def __new__(cls, *args, **kwargs): return cls._instance def __init__(self, partition: str, replica: str): + self.subscribers = None if not hasattr(self, 'initialized'): self.partition = partition self.message_in_fly = False self.message_in_fly_since = datetime.now() self.segment = Segment(partition, replica) - self.subscribers = self.get_subscribers() self.initialized = True self.toggle_thread = threading.Thread(target=self.toggle_message_in_fly) @@ -51,9 +51,7 @@ def toggle_message_in_fly(self): time.sleep(5) def read_data(self): - if len(self.subscribers) == 0: - self.subscribers = self.get_subscribers() - + self.subscribers = self.get_subscribers() if len(self.subscribers) == 0: print("No subscribers") return None, None @@ -166,9 +164,7 @@ def choose_subscriber(self): for i, key in enumerate(self.subscribers.keys()): id_to_key[i] = key - print("id_to_key", id_to_key) chosen_key = id_to_key[read_index % subscriber_count] - print("chosen_key", chosen_key) return chosen_key, self.subscribers[chosen_key] def load_message_in_fly(self):