From 0e3fd29a3cce0ddf7557a578caf2d21833ee6cbe Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 14:25:10 +0330 Subject: [PATCH] fix. --- kafka_server/broker/controller/produce.py | 1 + kafka_server/broker/file/sync.py | 15 +++++++-------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/kafka_server/broker/controller/produce.py b/kafka_server/broker/controller/produce.py index 46f9ebd..56c4d79 100644 --- a/kafka_server/broker/controller/produce.py +++ b/kafka_server/broker/controller/produce.py @@ -139,6 +139,7 @@ def broker_down(): data = json.loads(request.data.decode("utf-8")) partition = data['partition'] os.environ['REPLICA_MIRROR_DOWN'] = str(partition) + print(os.environ['REPLICA_MIRROR_DOWN'], "replica mirror down") return jsonify({'status': 'Data written successfully.'}), 200 except Exception as e: print(e) diff --git a/kafka_server/broker/file/sync.py b/kafka_server/broker/file/sync.py index e80e611..1e5b048 100644 --- a/kafka_server/broker/file/sync.py +++ b/kafka_server/broker/file/sync.py @@ -16,14 +16,13 @@ class Sync: _instances_lock = threading.Lock() _sync_lock = threading.Lock() - _instance = None + _instances = None - def __new__(cls, *args, **kwargs): - if cls._instance is None: + def __new__(cls, partition: str, replica: str): + if f"{partition}-{replica}" not in cls._instances: with cls._instances_lock: - if cls._instance is None: - cls._instance = super().__new__(cls) - return cls._instance + cls._instances[f"{partition}-{replica}"] = super().__new__(cls) + return cls._instances[f"{partition}-{replica}"] def __init__(self, partition: str, replica: str): if not hasattr(self, 'initialized'): @@ -33,6 +32,7 @@ def __init__(self, partition: str, replica: str): self.initialized = True def sync_data(self): + print("Syncing data... partition ", self.partition) if not self.check_data_exist(): return None, None @@ -53,8 +53,7 @@ def sync_data(self): def check_data_exist(self): if self.segment.get_sync_index() >= self.segment.get_write_index(): - print(f"No key found {self.segment.get_sync_index()} " - "in {self.segment.get_write_index()}") + print(f"No key found {self.segment.get_sync_index()} in {self.segment.get_write_index()}") return False key, _ = self.segment.read()