diff --git a/kafka_server/broker/controller/produce.py b/kafka_server/broker/controller/produce.py index e470b06..46f9ebd 100644 --- a/kafka_server/broker/controller/produce.py +++ b/kafka_server/broker/controller/produce.py @@ -158,7 +158,6 @@ def replica_down(): @app.route('/pull', methods=['GET']) def pull(): try: - print("replica url", get_replica_url()) read = Read(get_primary_partition(), get_replica_url()) key, value = read.pull_data() return jsonify({'key': key, 'value': value}), 200 diff --git a/kafka_server/broker/file/segment.py b/kafka_server/broker/file/segment.py index 2a59461..09f6a04 100644 --- a/kafka_server/broker/file/segment.py +++ b/kafka_server/broker/file/segment.py @@ -72,7 +72,6 @@ def approve_sync(self): def read(self): read_index = self.indexer.get_read() - print("read_index", read_index) segment_path = self.read_segment_path() data_file_path = os.path.join(segment_path, f'{read_index}.dat') diff --git a/kafka_server/broker/file/write.py b/kafka_server/broker/file/write.py index e3860ed..b5572e1 100644 --- a/kafka_server/broker/file/write.py +++ b/kafka_server/broker/file/write.py @@ -15,9 +15,9 @@ class Write: _write_lock = threading.Lock() def __new__(cls, partition: str, replica: str = None): - if partition not in cls._instances: - cls._instances[partition] = super(Write, cls).__new__(cls) - return cls._instances[partition] + if f"{partition}-{replica}" not in cls._instances: + cls._instances[f"{partition}-{replica}"] = super(Write, cls).__new__(cls) + return cls._instances[f"{partition}-{replica}"] def __init__(self, partition: str, replica: str): if hasattr(self, 'initialized'): @@ -50,6 +50,10 @@ def replicate_data(self, key: str, value: bytes): return self.segment.approve_appending() def send_to_replica(self, key: str, value: str) -> bool: + if self.replica is None: + print("replica is None, skip it") + return True + url = f'{self.replica}/replica/data' response = requests.post( url,