Skip to content

Commit

Permalink
fix none replica.
Browse files Browse the repository at this point in the history
  • Loading branch information
hoseinaghaei committed Feb 15, 2024
1 parent 836eb4a commit ff8a59b
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 2 deletions.
4 changes: 3 additions & 1 deletion kafka_server/broker/file/read.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ class Read:
_instance = None

def __new__(cls, partition: str, replica: str):
if cls._instance is None or (cls._instance.replica is None and replica is not None):
if cls._instance is None:
with cls._instances_lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.replica = replica
if replica is not None and cls._instance.replica is None:
cls._instance.segment = Segment(partition, replica)
return cls._instance

def __init__(self, partition: str, replica: str):
Expand Down
6 changes: 5 additions & 1 deletion kafka_server/broker/file/segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@ class Segment:

def __new__(cls, partition: str, replica: str):
with cls._instances_lock:
if partition not in cls._instances or (cls._instances[partition].replica is None and replica is not None):
if partition not in cls._instances:
cls._instances[partition] = super(Segment, cls).__new__(cls)
cls._instances[partition].partition = partition
cls._instances[partition].indexer = Indexer(partition, replica)
cls._instances[partition].replica = replica

if replica is not None and cls._instances[partition].replica is None:
cls._instances[partition].indexer = Indexer(partition, replica)

return cls._instances[partition]

def append(self, key: str, value: str):
Expand Down

0 comments on commit ff8a59b

Please sign in to comment.