Skip to content

Commit

Permalink
fix none replica.
Browse files Browse the repository at this point in the history
  • Loading branch information
hoseinaghaei committed Feb 15, 2024
1 parent ff8a59b commit d1ca202
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 17 deletions.
13 changes: 5 additions & 8 deletions kafka_server/broker/file/read.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,14 @@
class Read:
_instances_lock = threading.Lock()
_read_lock = threading.Lock()
_instance = None
_instances = {}

def __new__(cls, partition: str, replica: str):
if cls._instance is None:
if f"{partition}-{replica}" not in cls._instances[f"{partition}-{replica}"]:
with cls._instances_lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.replica = replica
if replica is not None and cls._instance.replica is None:
cls._instance.segment = Segment(partition, replica)
return cls._instance
cls._instances[f"{partition}-{replica}"] = super().__new__(cls)

return cls._instances[f"{partition}-{replica}"]

def __init__(self, partition: str, replica: str):
self.subscribers = None
Expand Down
14 changes: 5 additions & 9 deletions kafka_server/broker/file/segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,12 @@ class Segment:

def __new__(cls, partition: str, replica: str):
with cls._instances_lock:
if partition not in cls._instances:
cls._instances[partition] = super(Segment, cls).__new__(cls)
cls._instances[partition].partition = partition
cls._instances[partition].indexer = Indexer(partition, replica)
cls._instances[partition].replica = replica
if f"{partition}-{replica}" not in cls._instances:
cls._instances[f"{partition}-{replica}"] = super(Segment, cls).__new__(cls)
cls._instances[f"{partition}-{replica}"].partition = partition
cls._instances[f"{partition}-{replica}"].indexer = Indexer(partition, replica)

if replica is not None and cls._instances[partition].replica is None:
cls._instances[partition].indexer = Indexer(partition, replica)

return cls._instances[partition]
return cls._instances[f"{partition}-{replica}"]

def append(self, key: str, value: str):
try:
Expand Down

0 comments on commit d1ca202

Please sign in to comment.