Skip to content

Commit

Permalink
write without replica.
Browse files Browse the repository at this point in the history
  • Loading branch information
hoseinaghaei committed Feb 15, 2024
1 parent f230513 commit 0cb9c7a
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 5 deletions.
1 change: 0 additions & 1 deletion kafka_server/broker/controller/produce.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ def replica_down():
@app.route('/pull', methods=['GET'])
def pull():
try:
print("replica url", get_replica_url())
read = Read(get_primary_partition(), get_replica_url())
key, value = read.pull_data()
return jsonify({'key': key, 'value': value}), 200
Expand Down
1 change: 0 additions & 1 deletion kafka_server/broker/file/segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ def approve_sync(self):

def read(self):
read_index = self.indexer.get_read()
print("read_index", read_index)
segment_path = self.read_segment_path()

data_file_path = os.path.join(segment_path, f'{read_index}.dat')
Expand Down
10 changes: 7 additions & 3 deletions kafka_server/broker/file/write.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ class Write:
_write_lock = threading.Lock()

def __new__(cls, partition: str, replica: str = None):
if partition not in cls._instances:
cls._instances[partition] = super(Write, cls).__new__(cls)
return cls._instances[partition]
if f"{partition}-{replica}" not in cls._instances:
cls._instances[f"{partition}-{replica}"] = super(Write, cls).__new__(cls)
return cls._instances[f"{partition}-{replica}"]

def __init__(self, partition: str, replica: str):
if hasattr(self, 'initialized'):
Expand Down Expand Up @@ -50,6 +50,10 @@ def replicate_data(self, key: str, value: bytes):
return self.segment.approve_appending()

def send_to_replica(self, key: str, value: str) -> bool:
if self.replica is None:
print("replica is None, skip it")
return True

url = f'{self.replica}/replica/data'
response = requests.post(
url,
Expand Down

0 comments on commit 0cb9c7a

Please sign in to comment.