Skip to content

Commit

Permalink
fix.
Browse files Browse the repository at this point in the history
  • Loading branch information
hoseinaghaei committed Feb 15, 2024
1 parent 6168fdb commit 0e3fd29
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
1 change: 1 addition & 0 deletions kafka_server/broker/controller/produce.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ def broker_down():
data = json.loads(request.data.decode("utf-8"))
partition = data['partition']
os.environ['REPLICA_MIRROR_DOWN'] = str(partition)
print(os.environ['REPLICA_MIRROR_DOWN'], "replica mirror down")
return jsonify({'status': 'Data written successfully.'}), 200
except Exception as e:
print(e)
Expand Down
15 changes: 7 additions & 8 deletions kafka_server/broker/file/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@
class Sync:
_instances_lock = threading.Lock()
_sync_lock = threading.Lock()
_instance = None
_instances = None

def __new__(cls, *args, **kwargs):
if cls._instance is None:
def __new__(cls, partition: str, replica: str):
if f"{partition}-{replica}" not in cls._instances:
with cls._instances_lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
cls._instances[f"{partition}-{replica}"] = super().__new__(cls)
return cls._instances[f"{partition}-{replica}"]

def __init__(self, partition: str, replica: str):
if not hasattr(self, 'initialized'):
Expand All @@ -33,6 +32,7 @@ def __init__(self, partition: str, replica: str):
self.initialized = True

def sync_data(self):
print("Syncing data... partition ", self.partition)
if not self.check_data_exist():
return None, None

Expand All @@ -53,8 +53,7 @@ def sync_data(self):

def check_data_exist(self):
if self.segment.get_sync_index() >= self.segment.get_write_index():
print(f"No key found {self.segment.get_sync_index()} "
"in {self.segment.get_write_index()}")
print(f"No key found {self.segment.get_sync_index()} in {self.segment.get_write_index()}")
return False

key, _ = self.segment.read()
Expand Down

0 comments on commit 0e3fd29

Please sign in to comment.