Skip to content

Commit

Permalink
get subscriptions on each req.
Browse files Browse the repository at this point in the history
  • Loading branch information
hoseinaghaei committed Feb 15, 2024
1 parent b63f9f0 commit 021d2b4
Showing 1 changed file with 2 additions and 6 deletions.
8 changes: 2 additions & 6 deletions kafka_server/broker/file/read.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ def __new__(cls, *args, **kwargs):
return cls._instance

def __init__(self, partition: str, replica: str):
self.subscribers = None
if not hasattr(self, 'initialized'):
self.partition = partition
self.message_in_fly = False
self.message_in_fly_since = datetime.now()
self.segment = Segment(partition, replica)
self.subscribers = self.get_subscribers()
self.initialized = True

self.toggle_thread = threading.Thread(target=self.toggle_message_in_fly)
Expand All @@ -51,9 +51,7 @@ def toggle_message_in_fly(self):
time.sleep(5)

def read_data(self):
if len(self.subscribers) == 0:
self.subscribers = self.get_subscribers()

self.subscribers = self.get_subscribers()
if len(self.subscribers) == 0:
print("No subscribers")
return None, None
Expand Down Expand Up @@ -166,9 +164,7 @@ def choose_subscriber(self):
for i, key in enumerate(self.subscribers.keys()):
id_to_key[i] = key

print("id_to_key", id_to_key)
chosen_key = id_to_key[read_index % subscriber_count]
print("chosen_key", chosen_key)
return chosen_key, self.subscribers[chosen_key]

def load_message_in_fly(self):
Expand Down

0 comments on commit 021d2b4

Please sign in to comment.