diff --git a/kafka_server/broker/controller/produce.py b/kafka_server/broker/controller/produce.py index 46f9ebd..e470b06 100644 --- a/kafka_server/broker/controller/produce.py +++ b/kafka_server/broker/controller/produce.py @@ -158,6 +158,7 @@ def replica_down(): @app.route('/pull', methods=['GET']) def pull(): try: + print("replica url", get_replica_url()) read = Read(get_primary_partition(), get_replica_url()) key, value = read.pull_data() return jsonify({'key': key, 'value': value}), 200 diff --git a/kafka_server/broker/file/indexer.py b/kafka_server/broker/file/indexer.py index 69dcaaa..7476ff9 100644 --- a/kafka_server/broker/file/indexer.py +++ b/kafka_server/broker/file/indexer.py @@ -41,9 +41,7 @@ def get_write(self): def inc_read(self): with self._read_lock: self._read += 1 - print("save read index\n\n") self._save_variable(self._read, 'read') - print("send read index\n\n") self.send_to_replica() def get_read(self): @@ -105,13 +103,13 @@ def __dir_path(self) -> str: ) def send_to_replica(self): - print("sending to replica started!!!\n\n\n\n") if self.replica is None: print("No replica found /n/n/n") return url = f'{self.replica}/replica/index' data = {'partition': self.partition, 'read': self._read, 'sync': self._sync} + print(data, "to Replica") response = requests.post(url, json=data) if response.status_code != 200: raise Exception(f'indexed not yet updated {response}') diff --git a/kafka_server/broker/file/read.py b/kafka_server/broker/file/read.py index 18b8ab7..0227972 100644 --- a/kafka_server/broker/file/read.py +++ b/kafka_server/broker/file/read.py @@ -29,15 +29,18 @@ def __new__(cls, partition: str, replica: str): return cls._instances[f"{partition}-{replica}"] def __init__(self, partition: str, replica: str): - self.subscribers = None - self.partition = partition - self.message_in_fly = False - self.message_in_fly_since = datetime.now() - self.segment = Segment(partition, replica) + if not hasattr(self, 'initialized'): + self.subscribers = None + self.partition = partition + self.message_in_fly = False + self.message_in_fly_since = datetime.now() + self.segment = Segment(partition, replica) + + self.initialized = True - self.toggle_thread = threading.Thread(target=self.toggle_message_in_fly) - self.toggle_thread.daemon = True - self.toggle_thread.start() + self.toggle_thread = threading.Thread(target=self.toggle_message_in_fly) + self.toggle_thread.daemon = True + self.toggle_thread.start() def toggle_message_in_fly(self): while True: @@ -107,9 +110,9 @@ def pull_data(self): def ack_message(self): if self.message_in_fly: with self._read_lock: - self.segment.approve_reading() self.message_in_fly = False self.save_message_in_fly() + self.segment.approve_reading() def check_data_exist(self): if self.segment.get_read_index() >= self.segment.get_write_index(): diff --git a/kafka_server/broker/file/segment.py b/kafka_server/broker/file/segment.py index ffb0987..2a59461 100644 --- a/kafka_server/broker/file/segment.py +++ b/kafka_server/broker/file/segment.py @@ -55,7 +55,6 @@ def approve_appending(self): def approve_reading(self): try: with self._read_lock: - print("approve_reading \n\n") self.indexer.inc_read() except Exception as e: print(f"Error inc read index: {e}")