Skip to content

Commit

Permalink
logs.
Browse files Browse the repository at this point in the history
  • Loading branch information
hoseinaghaei committed Feb 15, 2024
1 parent a21e49a commit f230513
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 13 deletions.
1 change: 1 addition & 0 deletions kafka_server/broker/controller/produce.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ def replica_down():
@app.route('/pull', methods=['GET'])
def pull():
try:
print("replica url", get_replica_url())
read = Read(get_primary_partition(), get_replica_url())
key, value = read.pull_data()
return jsonify({'key': key, 'value': value}), 200
Expand Down
4 changes: 1 addition & 3 deletions kafka_server/broker/file/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ def get_write(self):
def inc_read(self):
with self._read_lock:
self._read += 1
print("save read index\n\n")
self._save_variable(self._read, 'read')
print("send read index\n\n")
self.send_to_replica()

def get_read(self):
Expand Down Expand Up @@ -105,13 +103,13 @@ def __dir_path(self) -> str:
)

def send_to_replica(self):
print("sending to replica started!!!\n\n\n\n")
if self.replica is None:
print("No replica found /n/n/n")
return

url = f'{self.replica}/replica/index'
data = {'partition': self.partition, 'read': self._read, 'sync': self._sync}
print(data, "to Replica")
response = requests.post(url, json=data)
if response.status_code != 200:
raise Exception(f'indexed not yet updated {response}')
21 changes: 12 additions & 9 deletions kafka_server/broker/file/read.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,18 @@ def __new__(cls, partition: str, replica: str):
return cls._instances[f"{partition}-{replica}"]

def __init__(self, partition: str, replica: str):
self.subscribers = None
self.partition = partition
self.message_in_fly = False
self.message_in_fly_since = datetime.now()
self.segment = Segment(partition, replica)
if not hasattr(self, 'initialized'):
self.subscribers = None
self.partition = partition
self.message_in_fly = False
self.message_in_fly_since = datetime.now()
self.segment = Segment(partition, replica)

self.initialized = True

self.toggle_thread = threading.Thread(target=self.toggle_message_in_fly)
self.toggle_thread.daemon = True
self.toggle_thread.start()
self.toggle_thread = threading.Thread(target=self.toggle_message_in_fly)
self.toggle_thread.daemon = True
self.toggle_thread.start()

def toggle_message_in_fly(self):
while True:
Expand Down Expand Up @@ -107,9 +110,9 @@ def pull_data(self):
def ack_message(self):
if self.message_in_fly:
with self._read_lock:
self.segment.approve_reading()
self.message_in_fly = False
self.save_message_in_fly()
self.segment.approve_reading()

def check_data_exist(self):
if self.segment.get_read_index() >= self.segment.get_write_index():
Expand Down
1 change: 0 additions & 1 deletion kafka_server/broker/file/segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ def approve_appending(self):
def approve_reading(self):
try:
with self._read_lock:
print("approve_reading \n\n")
self.indexer.inc_read()
except Exception as e:
print(f"Error inc read index: {e}")
Expand Down

0 comments on commit f230513

Please sign in to comment.