From bedec48a3cf0a24f20caada19a5725d21ba53d06 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Wed, 14 Feb 2024 19:26:09 +0330 Subject: [PATCH 01/91] fix bug of subscribe for broker --- kafka_server/coordinator/services/broker/subscribe.py | 4 ++++ kafka_server/coordinator_database/main.py | 7 ++++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/kafka_server/coordinator/services/broker/subscribe.py b/kafka_server/coordinator/services/broker/subscribe.py index d212116..a094041 100644 --- a/kafka_server/coordinator/services/broker/subscribe.py +++ b/kafka_server/coordinator/services/broker/subscribe.py @@ -73,11 +73,15 @@ def update_replica_partition_of_a_down_broker(down_broker_id, down_broker_replic def update_brokers_list(broker_url): + print("===========================") + print(broker_url) response_code, all_brokers = broker_database.list_all_brokers() + print(all_brokers) if response_code != 200: raise Exception("Error during getting list of brokers from database") for broker_id in all_brokers.keys(): data = all_brokers[broker_id] + print(data) if broker_url == data: response = requests.post( "http://127.0.0.1:5001/broker/delete", diff --git a/kafka_server/coordinator_database/main.py b/kafka_server/coordinator_database/main.py index 0d53d2f..0d3806c 100644 --- a/kafka_server/coordinator_database/main.py +++ b/kafka_server/coordinator_database/main.py @@ -48,9 +48,10 @@ def list_brokers(): def delete_broker(): data = json.loads(request.data.decode('utf-8')) broker_id = data['broker_id'] - thread = threading.Thread(target=broker.delete_broker, args=(broker_id,)) - thread.start() - return jsonify({"message": "Broker successfully deleted"}), 200 + with concurrent.futures.ThreadPoolExecutor() as executor: + future = executor.submit(broker.delete_broker, broker_id) + result = future.result() + return jsonify("Successfully deleted") @app.route('/broker/get_replica', methods=['GET']) From e72a788788f1332796bb095bf8bc18a70c79e4a6 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Wed, 14 Feb 2024 19:36:50 +0330 Subject: [PATCH 02/91] fix bug of subscribe for broker --- kafka_server/coordinator/services/broker/subscribe.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/kafka_server/coordinator/services/broker/subscribe.py b/kafka_server/coordinator/services/broker/subscribe.py index a094041..fa3f86c 100644 --- a/kafka_server/coordinator/services/broker/subscribe.py +++ b/kafka_server/coordinator/services/broker/subscribe.py @@ -79,10 +79,13 @@ def update_brokers_list(broker_url): print(all_brokers) if response_code != 200: raise Exception("Error during getting list of brokers from database") + down_broker_id = None for broker_id in all_brokers.keys(): data = all_brokers[broker_id] print(data) if broker_url == data: + down_broker_id = broker_id + print("########################sending request") response = requests.post( "http://127.0.0.1:5001/broker/delete", data=json.dumps({"broker_id": broker_id}), @@ -90,8 +93,8 @@ def update_brokers_list(broker_url): ) if response.status_code != 200: print(f"Error during sending subscription to broker #{broker_url}") - prepare_updating(all_brokers, broker_id, broker_url) - update_brokers_subscriptions() + prepare_updating(all_brokers, down_broker_id, broker_url) + update_brokers_subscriptions() def check_heartbeat(): @@ -103,7 +106,7 @@ def check_heartbeat(): for key in data.keys(): datetime_seconds = float(data[key]) diff_seconds = datetime.now().timestamp() - datetime_seconds - if diff_seconds > 30: + if diff_seconds > 15: requests.post( "http://127.0.0.1:5001/broker/delete_heartbeat", data=json.dumps({"broker_url": key}), From 8fe4d9bca69ac0a11e68020a213400a0b3604526 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Wed, 14 Feb 2024 19:53:54 +0330 Subject: [PATCH 03/91] fix bug of subscribe for broker --- kafka_server/coordinator/services/broker/subscribe.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/kafka_server/coordinator/services/broker/subscribe.py b/kafka_server/coordinator/services/broker/subscribe.py index fa3f86c..aa4d3ba 100644 --- a/kafka_server/coordinator/services/broker/subscribe.py +++ b/kafka_server/coordinator/services/broker/subscribe.py @@ -61,22 +61,23 @@ def prepare_updating(all_brokers, down_broker_id, down_broker_url): def update_replica_partition_of_a_broker_which_is_in_down_broker(broker_url): + print(f"###############sedning request to {broker_url} to aware its replica") r = requests.get(f"{broker_url}/replica/down", timeout=2) + print(r.status_code) if r.status_code != 200: raise Exception("Error in sending request to broker to tell it its replica is down") def update_replica_partition_of_a_down_broker(down_broker_id, down_broker_replica_url): + print(f"###############sedning request to sync replica of down broker# {down_broker_id}:{down_broker_replica_url}") r = requests.post(f"{down_broker_replica_url}/broker/down", data=json.dumps({"partition": down_broker_id})) + print(r.status_code) if r.status_code != 200: raise Exception("Error in sending request to broker which has the replica of a down broker") def update_brokers_list(broker_url): - print("===========================") - print(broker_url) response_code, all_brokers = broker_database.list_all_brokers() - print(all_brokers) if response_code != 200: raise Exception("Error during getting list of brokers from database") down_broker_id = None @@ -85,7 +86,7 @@ def update_brokers_list(broker_url): print(data) if broker_url == data: down_broker_id = broker_id - print("########################sending request") + print(f"########################sending request tp delete broker {broker_id}:{broker_url}") response = requests.post( "http://127.0.0.1:5001/broker/delete", data=json.dumps({"broker_id": broker_id}), From 832770f61a87c5f9c6dfe5ca10e8cee7e652a2d6 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Wed, 14 Feb 2024 20:00:00 +0330 Subject: [PATCH 04/91] fix bug of subscribe for broker --- kafka_server/coordinator/services/broker/subscribe.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kafka_server/coordinator/services/broker/subscribe.py b/kafka_server/coordinator/services/broker/subscribe.py index aa4d3ba..0adc0fe 100644 --- a/kafka_server/coordinator/services/broker/subscribe.py +++ b/kafka_server/coordinator/services/broker/subscribe.py @@ -48,9 +48,11 @@ def prepare_updating(all_brokers, down_broker_id, down_broker_url): raise Exception("Error during getting list of brokers replicas") # find replica of a broker which is on down broker + print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!1") for broker_id in all_brokers_replicas.keys(): if all_brokers_replicas[broker_id] == down_broker_url: update_replica_partition_of_a_broker_which_is_in_down_broker(all_brokers[broker_id]) + print("@@@@@@@@@@@@@@@@@@@@@@") # update all brokers update_brokers_subscriptions() @@ -83,10 +85,9 @@ def update_brokers_list(broker_url): down_broker_id = None for broker_id in all_brokers.keys(): data = all_brokers[broker_id] - print(data) if broker_url == data: down_broker_id = broker_id - print(f"########################sending request tp delete broker {broker_id}:{broker_url}") + print(f"########################sending request to delete broker {broker_id}:{broker_url}") response = requests.post( "http://127.0.0.1:5001/broker/delete", data=json.dumps({"broker_id": broker_id}), From 57d83919e0d6e651373df3bde4af4797c34cafba Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Wed, 14 Feb 2024 20:04:23 +0330 Subject: [PATCH 05/91] fix bug of subscribe for broker --- kafka_server/coordinator/services/broker/subscribe.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kafka_server/coordinator/services/broker/subscribe.py b/kafka_server/coordinator/services/broker/subscribe.py index 0adc0fe..91233a6 100644 --- a/kafka_server/coordinator/services/broker/subscribe.py +++ b/kafka_server/coordinator/services/broker/subscribe.py @@ -50,7 +50,9 @@ def prepare_updating(all_brokers, down_broker_id, down_broker_url): # find replica of a broker which is on down broker print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!1") for broker_id in all_brokers_replicas.keys(): + print("hi") if all_brokers_replicas[broker_id] == down_broker_url: + print("hi2") update_replica_partition_of_a_broker_which_is_in_down_broker(all_brokers[broker_id]) print("@@@@@@@@@@@@@@@@@@@@@@") From 209a6655bb3a3f8724b525b862add1885efd5bcb Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Wed, 14 Feb 2024 20:07:53 +0330 Subject: [PATCH 06/91] fix bug of subscribe for broker --- kafka_server/coordinator_database/services/broker.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka_server/coordinator_database/services/broker.py b/kafka_server/coordinator_database/services/broker.py index 19c1dda..5d025a2 100644 --- a/kafka_server/coordinator_database/services/broker.py +++ b/kafka_server/coordinator_database/services/broker.py @@ -43,9 +43,9 @@ def init_brokers_replicas_file(): with open(path, 'w', encoding='utf8') as f: replicas = { "replica": { - "1": "http://185.226.116.193:5678", - "2": "http://185.226.116.193:5679", - "3": "http://185.226.116.193:5680" + "1": "http://5.34.192.132:5678", + "2": "http://5.34.192.132:5679", + "3": "http://5.34.192.132:5680" } } f.write(json.dumps(replicas)) From c5d25302ff645cd44c2bf3eae15387acc927819a Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Wed, 14 Feb 2024 20:10:55 +0330 Subject: [PATCH 07/91] fix bug of subscribe for broker --- kafka_server/coordinator/services/broker/subscribe.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kafka_server/coordinator/services/broker/subscribe.py b/kafka_server/coordinator/services/broker/subscribe.py index 91233a6..7ea712a 100644 --- a/kafka_server/coordinator/services/broker/subscribe.py +++ b/kafka_server/coordinator/services/broker/subscribe.py @@ -51,6 +51,8 @@ def prepare_updating(all_brokers, down_broker_id, down_broker_url): print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!1") for broker_id in all_brokers_replicas.keys(): print("hi") + print("a ", all_brokers_replicas[broker_id]) + print("b ", down_broker_url) if all_brokers_replicas[broker_id] == down_broker_url: print("hi2") update_replica_partition_of_a_broker_which_is_in_down_broker(all_brokers[broker_id]) From 93d5b60ef29f58caaef60c7ec5b5c531a1bcd616 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Wed, 14 Feb 2024 20:21:34 +0330 Subject: [PATCH 08/91] fix bug of subscribe for broker --- kafka_server/coordinator/services/broker/subscribe.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka_server/coordinator/services/broker/subscribe.py b/kafka_server/coordinator/services/broker/subscribe.py index 7ea712a..85ef836 100644 --- a/kafka_server/coordinator/services/broker/subscribe.py +++ b/kafka_server/coordinator/services/broker/subscribe.py @@ -54,6 +54,7 @@ def prepare_updating(all_brokers, down_broker_id, down_broker_url): print("a ", all_brokers_replicas[broker_id]) print("b ", down_broker_url) if all_brokers_replicas[broker_id] == down_broker_url: + print(broker_id) print("hi2") update_replica_partition_of_a_broker_which_is_in_down_broker(all_brokers[broker_id]) print("@@@@@@@@@@@@@@@@@@@@@@") From f489810fc4268ed000819d3bde946a79d1b6975b Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Wed, 14 Feb 2024 20:24:38 +0330 Subject: [PATCH 09/91] fix bug of subscribe for broker --- kafka_server/coordinator_database/services/broker.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka_server/coordinator_database/services/broker.py b/kafka_server/coordinator_database/services/broker.py index 5d025a2..2bca2be 100644 --- a/kafka_server/coordinator_database/services/broker.py +++ b/kafka_server/coordinator_database/services/broker.py @@ -43,9 +43,9 @@ def init_brokers_replicas_file(): with open(path, 'w', encoding='utf8') as f: replicas = { "replica": { - "1": "http://5.34.192.132:5678", - "2": "http://5.34.192.132:5679", - "3": "http://5.34.192.132:5680" + "1": "http://5.34.192.132:5679", + "2": "http://5.34.192.132:5670", + "3": "http://5.34.192.132:5678" } } f.write(json.dumps(replicas)) From 1e344910932d26b1f7031f39de7f829b331036cb Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Wed, 14 Feb 2024 20:28:41 +0330 Subject: [PATCH 10/91] fix bug of subscribe for broker --- kafka_server/coordinator/services/broker/subscribe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka_server/coordinator/services/broker/subscribe.py b/kafka_server/coordinator/services/broker/subscribe.py index 85ef836..bc02caf 100644 --- a/kafka_server/coordinator/services/broker/subscribe.py +++ b/kafka_server/coordinator/services/broker/subscribe.py @@ -69,7 +69,7 @@ def prepare_updating(all_brokers, down_broker_id, down_broker_url): def update_replica_partition_of_a_broker_which_is_in_down_broker(broker_url): print(f"###############sedning request to {broker_url} to aware its replica") - r = requests.get(f"{broker_url}/replica/down", timeout=2) + r = requests.post(f"{broker_url}/replica/down", timeout=2) print(r.status_code) if r.status_code != 200: raise Exception("Error in sending request to broker to tell it its replica is down") From 79ce64d33d19fb3d3ba1e07167216e70f215c12c Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Wed, 14 Feb 2024 20:31:59 +0330 Subject: [PATCH 11/91] fix bug of subscribe for broker --- kafka_server/coordinator_database/services/broker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka_server/coordinator_database/services/broker.py b/kafka_server/coordinator_database/services/broker.py index 2bca2be..2b777a9 100644 --- a/kafka_server/coordinator_database/services/broker.py +++ b/kafka_server/coordinator_database/services/broker.py @@ -44,7 +44,7 @@ def init_brokers_replicas_file(): replicas = { "replica": { "1": "http://5.34.192.132:5679", - "2": "http://5.34.192.132:5670", + "2": "http://5.34.192.132:5680", "3": "http://5.34.192.132:5678" } } From 4c7f18b9b465f2d17ec4bd3da34e2f9f23900591 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Wed, 14 Feb 2024 23:25:47 +0330 Subject: [PATCH 12/91] fix bug of subscribe for broker --- kafka_server/coordinator/api/broker/api.py | 20 ++++++++++--------- .../coordinator/services/client/subscribe.py | 3 +++ 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/kafka_server/coordinator/api/broker/api.py b/kafka_server/coordinator/api/broker/api.py index 34f5ea7..e5b5be2 100644 --- a/kafka_server/coordinator/api/broker/api.py +++ b/kafka_server/coordinator/api/broker/api.py @@ -2,6 +2,7 @@ import json import os import sys +import random from coordinator.services.broker import subscribe as broker_subscriber_service from coordinator.services.broker import database as broker_database @@ -31,16 +32,17 @@ def init_broker(): if response_code != 200: return jsonify("Error during initializing broker"), response_code - # if len(all_brokers) > 1: - # keys = all_brokers.keys() - # key = random.choice(list(keys)) - # replica_url = all_brokers[key] - # response_code = broker_database.add_replica_for_broker(broker_id, replica_url) - # if response_code != 200: - # return jsonify("Error during initializing broker"), response_code - # return jsonify(replica_url), 200 + replica_url = None + if broker_id not in all_brokers_replicas: + keys = all_brokers.keys() + key = random.choice(list(keys)) + replica_url = all_brokers[key] + response_code = broker_database.add_replica_for_broker(broker_id, replica_url) + if response_code != 200: + return jsonify("Error during initializing broker"), response_code + else: + replica_url = all_brokers_replicas[broker_id] - replica_url = all_brokers_replicas[broker_id] partition_count = len(all_brokers) master_coordinator_url = config.MASTER_COORDINATOR_URL backup_coordinator_url = config.BACKUP_COORDINATOR_URL diff --git a/kafka_server/coordinator/services/client/subscribe.py b/kafka_server/coordinator/services/client/subscribe.py index 05cae14..9145fca 100644 --- a/kafka_server/coordinator/services/client/subscribe.py +++ b/kafka_server/coordinator/services/client/subscribe.py @@ -34,11 +34,14 @@ def check_heartbeat(): datetime_seconds = float(data[key]) diff_seconds = datetime.now().timestamp() - datetime_seconds if diff_seconds > 30: + print("##############delete client heartbeat") requests.post( "http://127.0.0.1:5001/client/delete_heartbeat", data=json.dumps({"client_url": key}), timeout=2, ) + + print("##############delete all subscriptions for client") requests.post( "http://127.0.0.1:5001/subscribe/delete", data=json.dumps({"client_url": key}), From 921a97bf1ba14827a6de2031ea520794b62f3d01 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Wed, 14 Feb 2024 23:33:33 +0330 Subject: [PATCH 13/91] fix bug of subscribe for broker --- kafka_server/coordinator/services/client/subscribe.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kafka_server/coordinator/services/client/subscribe.py b/kafka_server/coordinator/services/client/subscribe.py index 9145fca..fd59ce3 100644 --- a/kafka_server/coordinator/services/client/subscribe.py +++ b/kafka_server/coordinator/services/client/subscribe.py @@ -22,12 +22,13 @@ def update_brokers_subscription_plan(): def check_heartbeat(): + print("##############checing heartbeats") response = requests.get( 'http://127.0.0.1:5001/client/list_all_heartbeats', timeout=2, ) data = response.json() - + print(data) if len(data) == 0: return for key in data.keys(): From 6310565c7872d8c9296736b5b4d1f9baec8831d5 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Wed, 14 Feb 2024 23:41:31 +0330 Subject: [PATCH 14/91] fix bug of subscribe for broker --- kafka_server/coordinator/api/client/api.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kafka_server/coordinator/api/client/api.py b/kafka_server/coordinator/api/client/api.py index addbcaa..a494a9e 100644 --- a/kafka_server/coordinator/api/client/api.py +++ b/kafka_server/coordinator/api/client/api.py @@ -55,10 +55,11 @@ def subscribe(): if response_code != 200: return jsonify("Error during getting list of brokers from database"), response_code - min_length = 10000 + min_length = 1000000 selected_broker_id = None for key in response_data.keys(): if len(response_data[key]) < min_length: + min_length = len(response_data[key]) selected_broker_id = key broker_data = f"{selected_broker_id}:{response_data[selected_broker_id]}" From 7907cfb43f30e45a453386ea98a2669882ba48a2 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Wed, 14 Feb 2024 23:45:22 +0330 Subject: [PATCH 15/91] fix bug of subscribe for broker --- kafka_server/coordinator/api/client/api.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka_server/coordinator/api/client/api.py b/kafka_server/coordinator/api/client/api.py index a494a9e..e66f9c2 100644 --- a/kafka_server/coordinator/api/client/api.py +++ b/kafka_server/coordinator/api/client/api.py @@ -58,8 +58,8 @@ def subscribe(): min_length = 1000000 selected_broker_id = None for key in response_data.keys(): - if len(response_data[key]) < min_length: - min_length = len(response_data[key]) + if len(all_subscriptions[key]) < min_length: + min_length = len(all_subscriptions[key]) selected_broker_id = key broker_data = f"{selected_broker_id}:{response_data[selected_broker_id]}" From 65ef92850708eab0b1acefe373ae6c8ea605a95f Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Wed, 14 Feb 2024 23:50:50 +0330 Subject: [PATCH 16/91] fix bug of subscribe for broker --- kafka_server/coordinator/api/client/api.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/kafka_server/coordinator/api/client/api.py b/kafka_server/coordinator/api/client/api.py index e66f9c2..b69eae6 100644 --- a/kafka_server/coordinator/api/client/api.py +++ b/kafka_server/coordinator/api/client/api.py @@ -51,19 +51,23 @@ def subscribe(): if response_code != 200: return jsonify("Error during getting list of brokers from database"), response_code - response_code, response_data = broker_database.list_all_brokers() + response_code, all_brokers = broker_database.list_all_brokers() if response_code != 200: return jsonify("Error during getting list of brokers from database"), response_code min_length = 1000000 selected_broker_id = None - for key in response_data.keys(): + + for key in all_brokers.keys(): + if key not in all_subscriptions.keys(): + selected_broker_id = key + break if len(all_subscriptions[key]) < min_length: min_length = len(all_subscriptions[key]) selected_broker_id = key - broker_data = f"{selected_broker_id}:{response_data[selected_broker_id]}" - broker_url = response_data[selected_broker_id] + broker_data = f"{selected_broker_id}:{all_brokers[selected_broker_id]}" + broker_url = all_brokers[selected_broker_id] tmp_dict = {} if broker_data in all_subscriptions: From abdad5104b854aee56b16cbb9894d69377356844 Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Wed, 14 Feb 2024 23:54:39 +0330 Subject: [PATCH 17/91] remove cast to str. --- kafka_server/broker/file/write.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka_server/broker/file/write.py b/kafka_server/broker/file/write.py index 7f23fef..4c72e2c 100644 --- a/kafka_server/broker/file/write.py +++ b/kafka_server/broker/file/write.py @@ -32,7 +32,7 @@ def write_data(self, key: str, value: bytes): with self._write_lock: md5 = hash_md5(key) partition_count = get_partition_count() - if int(str(md5), 16) % partition_count != int(self.partition) - 1: + if int(md5, 16) % partition_count != int(self.partition) - 1: raise Exception(f"key is not for this partition, fuck {md5} {key}") appended = self.segment.append(key, value) From bb70a800ce306e7a826e01914d7aebbb5b704bf3 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Wed, 14 Feb 2024 23:54:45 +0330 Subject: [PATCH 18/91] fix bug of subscribe for broker --- kafka_server/coordinator/api/client/api.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/kafka_server/coordinator/api/client/api.py b/kafka_server/coordinator/api/client/api.py index b69eae6..dd80676 100644 --- a/kafka_server/coordinator/api/client/api.py +++ b/kafka_server/coordinator/api/client/api.py @@ -57,8 +57,11 @@ def subscribe(): min_length = 1000000 selected_broker_id = None - + print("#######find broker for subscriber") for key in all_brokers.keys(): + print(key) + print(all_brokers) + print(all_subscriptions) if key not in all_subscriptions.keys(): selected_broker_id = key break From 5d87785788877cf5fb034124b18e98cd6aa4ecb3 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Wed, 14 Feb 2024 23:59:25 +0330 Subject: [PATCH 19/91] fix bug of subscribe for broker --- kafka_server/coordinator/api/client/api.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/kafka_server/coordinator/api/client/api.py b/kafka_server/coordinator/api/client/api.py index dd80676..2cde4d4 100644 --- a/kafka_server/coordinator/api/client/api.py +++ b/kafka_server/coordinator/api/client/api.py @@ -62,11 +62,13 @@ def subscribe(): print(key) print(all_brokers) print(all_subscriptions) - if key not in all_subscriptions.keys(): + if f"{key}:{all_brokers[key]}" not in all_subscriptions.keys(): + print("check by first if") selected_broker_id = key break - if len(all_subscriptions[key]) < min_length: - min_length = len(all_subscriptions[key]) + if len(all_subscriptions[f"{key}:{all_brokers[key]}"]) < min_length: + print("check by second if") + min_length = len(all_subscriptions[f"{key}:{all_brokers[key]}"]) selected_broker_id = key broker_data = f"{selected_broker_id}:{all_brokers[selected_broker_id]}" From 0f23aed8026c40007324f1d9772c54c0bd5c854d Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 02:20:28 +0330 Subject: [PATCH 20/91] fix bug of subscribe for broker --- kafka_server/coordinator/api/client/api.py | 3 - .../coordinator/services/broker/subscribe.py | 54 ++++++++++-------- .../coordinator/services/client/subscribe.py | 57 ++++++++++--------- 3 files changed, 60 insertions(+), 54 deletions(-) diff --git a/kafka_server/coordinator/api/client/api.py b/kafka_server/coordinator/api/client/api.py index 2cde4d4..83c840c 100644 --- a/kafka_server/coordinator/api/client/api.py +++ b/kafka_server/coordinator/api/client/api.py @@ -59,9 +59,6 @@ def subscribe(): selected_broker_id = None print("#######find broker for subscriber") for key in all_brokers.keys(): - print(key) - print(all_brokers) - print(all_subscriptions) if f"{key}:{all_brokers[key]}" not in all_subscriptions.keys(): print("check by first if") selected_broker_id = key diff --git a/kafka_server/coordinator/services/broker/subscribe.py b/kafka_server/coordinator/services/broker/subscribe.py index bc02caf..db4a84a 100644 --- a/kafka_server/coordinator/services/broker/subscribe.py +++ b/kafka_server/coordinator/services/broker/subscribe.py @@ -30,16 +30,19 @@ def update_brokers_subscriptions(): if response_code != 200: raise Exception("Error during getting list of clients from database") - for client_url in all_clients: - requests.post( - f"{client_url}/subscription", - data=json.dumps({"brokers": all_brokers}), - timeout=2, - ) + try: + for client_url in all_clients: + requests.post( + f"{client_url}/subscription", + data=json.dumps({"brokers": all_brokers}), + timeout=2, + ) - for broker_id in all_brokers.keys(): - requests.post(f"{all_brokers[broker_id]}/subscription", data=json.dumps({"brokers": all_brokers}), - headers={"Content-Type": "application/json"}, timeout=2) + for broker_id in all_brokers.keys(): + requests.post(f"{all_brokers[broker_id]}/subscription", data=json.dumps({"brokers": all_brokers}), + headers={"Content-Type": "application/json"}, timeout=2) + except Exception as e: + print(str(e)) def prepare_updating(all_brokers, down_broker_id, down_broker_url): @@ -105,21 +108,24 @@ def update_brokers_list(broker_url): def check_heartbeat(): - response = requests.get('http://127.0.0.1:5001/broker/list_all_heartbeats', timeout=2) - data = response.json() - - if len(data) == 0: - return - for key in data.keys(): - datetime_seconds = float(data[key]) - diff_seconds = datetime.now().timestamp() - datetime_seconds - if diff_seconds > 15: - requests.post( - "http://127.0.0.1:5001/broker/delete_heartbeat", - data=json.dumps({"broker_url": key}), - timeout=2, - ) - update_brokers_list(key) + try: + response = requests.get('http://127.0.0.1:5001/broker/list_all_heartbeats', timeout=2) + data = response.json() + + if len(data) == 0: + return + for key in data.keys(): + datetime_seconds = float(data[key]) + diff_seconds = datetime.now().timestamp() - datetime_seconds + if diff_seconds > 15: + requests.post( + "http://127.0.0.1:5001/broker/delete_heartbeat", + data=json.dumps({"broker_url": key}), + timeout=2, + ) + update_brokers_list(key) + except Exception as e: + print(str(e)) def run_check_heartbeat_job(): diff --git a/kafka_server/coordinator/services/client/subscribe.py b/kafka_server/coordinator/services/client/subscribe.py index fd59ce3..9a79928 100644 --- a/kafka_server/coordinator/services/client/subscribe.py +++ b/kafka_server/coordinator/services/client/subscribe.py @@ -22,33 +22,36 @@ def update_brokers_subscription_plan(): def check_heartbeat(): - print("##############checing heartbeats") - response = requests.get( - 'http://127.0.0.1:5001/client/list_all_heartbeats', - timeout=2, - ) - data = response.json() - print(data) - if len(data) == 0: - return - for key in data.keys(): - datetime_seconds = float(data[key]) - diff_seconds = datetime.now().timestamp() - datetime_seconds - if diff_seconds > 30: - print("##############delete client heartbeat") - requests.post( - "http://127.0.0.1:5001/client/delete_heartbeat", - data=json.dumps({"client_url": key}), - timeout=2, - ) - - print("##############delete all subscriptions for client") - requests.post( - "http://127.0.0.1:5001/subscribe/delete", - data=json.dumps({"client_url": key}), - timeout=2, - ) - # update_brokers_subscription_plan() + try: + print("##############checing heartbeats") + response = requests.get( + 'http://127.0.0.1:5001/client/list_all_heartbeats', + timeout=2, + ) + data = response.json() + print(data) + if len(data) == 0: + return + for key in data.keys(): + datetime_seconds = float(data[key]) + diff_seconds = datetime.now().timestamp() - datetime_seconds + if diff_seconds > 30: + print("##############delete client heartbeat") + requests.post( + "http://127.0.0.1:5001/client/delete_heartbeat", + data=json.dumps({"client_url": key}), + timeout=2, + ) + + print("##############delete all subscriptions for client") + requests.post( + "http://127.0.0.1:5001/subscribe/delete", + data=json.dumps({"client_url": key}), + timeout=2, + ) + # update_brokers_subscription_plan() + except Exception as e: + print(str(e)) def run_check_heartbeat_job(): From fa484aca6dc73fee7c48dd1d180ca60d60ae3db0 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 04:25:53 +0330 Subject: [PATCH 21/91] fix bug of subscribe for broker --- kafka_server/coordinator/api/client/api.py | 101 ++++++++++++------ .../coordinator/services/broker/subscribe.py | 5 + kafka_server/coordinator_database/main.py | 14 +++ .../services/subscribe.py | 9 ++ 4 files changed, 99 insertions(+), 30 deletions(-) diff --git a/kafka_server/coordinator/api/client/api.py b/kafka_server/coordinator/api/client/api.py index 83c840c..0aded68 100644 --- a/kafka_server/coordinator/api/client/api.py +++ b/kafka_server/coordinator/api/client/api.py @@ -55,37 +55,78 @@ def subscribe(): if response_code != 200: return jsonify("Error during getting list of brokers from database"), response_code - min_length = 1000000 - selected_broker_id = None - print("#######find broker for subscriber") - for key in all_brokers.keys(): - if f"{key}:{all_brokers[key]}" not in all_subscriptions.keys(): - print("check by first if") - selected_broker_id = key - break - if len(all_subscriptions[f"{key}:{all_brokers[key]}"]) < min_length: - print("check by second if") - min_length = len(all_subscriptions[f"{key}:{all_brokers[key]}"]) - selected_broker_id = key - - broker_data = f"{selected_broker_id}:{all_brokers[selected_broker_id]}" - broker_url = all_brokers[selected_broker_id] - - tmp_dict = {} - if broker_data in all_subscriptions: - for subs in all_subscriptions[broker_data]: - tmp_dict[subs[1]] = subs[0] - else: - tmp_dict[random_id] = client_addr - response_code = broker_subscribe_service.send_subscribe_to_broker(broker_url, tmp_dict) + all_subscribers = [[client_addr, random_id]] + for broker_id_url in all_subscriptions.keys(): + for sub in all_subscribers[broker_id_url]: + all_subscribers.append(sub) + + tmp_subscriptions = {} + all_subscribers_length = len(all_subscribers) + all_brokers_length = len(all_brokers) + ex = all_subscribers_length // all_brokers_length + j = 0 + for broker_id in all_brokers: + i = ex + while i > 0: + if f"{broker_id}:{all_subscriptions[broker_id]}" not in tmp_subscriptions: + tmp_subscriptions[f"{broker_id}:{all_subscriptions[broker_id]}"] = [] + tmp_subscriptions[f"{broker_id}:{all_subscriptions[broker_id]}"].append(all_subscribers[j]) + j += 1 + i -= 1 + + for index in range(j, len(all_subscribers)): + for broker_id in all_brokers: + if f"{broker_id}:{all_subscriptions[broker_id]}" not in tmp_subscriptions: + tmp_subscriptions[f"{broker_id}:{all_subscriptions[broker_id]}"] = [] + tmp_subscriptions[f"{broker_id}:{all_subscriptions[broker_id]}"].append(all_subscribers[j]) + + all_brokers_for_client = [] + for t_s in tmp_subscriptions.keys(): + for tupl in tmp_subscriptions[t_s]: + if client_addr in tupl: + all_brokers_for_client.append(t_s) + + for broker_id in all_brokers.keys(): + broker_subscribe_service.send_subscribe_to_broker(all_brokers[broker_id], tmp_subscriptions[ + f"{broker_id}:{all_subscriptions[broker_id]}"]) + + response_code = broker_subscribe_service.write_subscriptions(tmp_subscriptions) if response_code != 200: - return jsonify("Error during sending subscription to broker"), response_code - - response_code = client_database.add_subscription_plan(broker_data, client_addr, random_id) - if response_code != 200: - return jsonify("Error during adding subscription to database"), response_code - - return jsonify({"broker_url": broker_data, "id": random_id}), 200 + return jsonify("Error during finding broker for subscribe"), response_code + + return jsonify(all_brokers_for_client), 200 + + # min_length = 1000000 + # selected_broker_id = None + # print("#######find broker for subscriber") + # for key in all_brokers.keys(): + # if f"{key}:{all_brokers[key]}" not in all_subscriptions.keys(): + # print("check by first if") + # selected_broker_id = key + # break + # if len(all_subscriptions[f"{key}:{all_brokers[key]}"]) < min_length: + # print("check by second if") + # min_length = len(all_subscriptions[f"{key}:{all_brokers[key]}"]) + # selected_broker_id = key + + # broker_data = f"{selected_broker_id}:{all_brokers[selected_broker_id]}" + # broker_url = all_brokers[selected_broker_id] + + # tmp_dict = {} + # if broker_data in all_subscriptions: + # for subs in all_subscriptions[broker_data]: + # tmp_dict[subs[1]] = subs[0] + # else: + # tmp_dict[random_id] = client_addr + # response_code = broker_subscribe_service.send_subscribe_to_broker(broker_url, tmp_dict) + # if response_code != 200: + # return jsonify("Error during sending subscription to broker"), response_code + # + # response_code = client_database.add_subscription_plan(broker_data, client_addr, random_id) + # if response_code != 200: + # return jsonify("Error during adding subscription to database"), response_code + # + # return jsonify({"broker_url": broker_data, "id": random_id}), 200 @api_blueprint.route('/heartbeat', methods=['POST']) diff --git a/kafka_server/coordinator/services/broker/subscribe.py b/kafka_server/coordinator/services/broker/subscribe.py index db4a84a..1f66b09 100644 --- a/kafka_server/coordinator/services/broker/subscribe.py +++ b/kafka_server/coordinator/services/broker/subscribe.py @@ -12,6 +12,11 @@ def get_all_subscriptions(): return r.status_code, r.json() +def write_subscriptions(subscriptions): + r = requests.get("http://127.0.0.1:5001//subscribe/write_subscriptions", data=json.dumps(subscriptions), timeout=2) + return r.status_code, r.json() + + def send_subscribe_to_broker(broker_url, data): r = requests.post( f"{broker_url}/subscribers", diff --git a/kafka_server/coordinator_database/main.py b/kafka_server/coordinator_database/main.py index 0d3806c..b0c52c8 100644 --- a/kafka_server/coordinator_database/main.py +++ b/kafka_server/coordinator_database/main.py @@ -195,5 +195,19 @@ def list_of_replicas(): return jsonify(result), 200 +@app.route('/subscribe/write_subscriptions', methods=['POST']) +def write_subscriptions(): + data = json.loads(request.data.decode('utf-8')) + data = data["subscriptions"] + with concurrent.futures.ThreadPoolExecutor() as executor: + try: + future = executor.submit(subscribe.write_subscriptions, data) + result = future.result() + except Exception as e: + logger.error(e) + return jsonify("Error deleting broker heartbeat"), 500 + return jsonify(result), 200 + + if __name__ == '__main__': app.run(port=5001) diff --git a/kafka_server/coordinator_database/services/subscribe.py b/kafka_server/coordinator_database/services/subscribe.py index 27b1155..bf7a074 100644 --- a/kafka_server/coordinator_database/services/subscribe.py +++ b/kafka_server/coordinator_database/services/subscribe.py @@ -55,3 +55,12 @@ def get_all_subscriptions(): json_data = json.load(f) return json_data["subscriptions"] + + +def write_subscriptions(subscriptions): + with lock: + json_data = {} + with open(config.SUBSCRIBER_DATABASE_FILE_PATH, 'w', encoding='utf8') as f: + json_data["subscriptions"] = subscriptions + f.write(json.dumps(json_data)) + f.close() From 21be146a403112135c8f6f59324b1b35bfa8f7e4 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 04:27:52 +0330 Subject: [PATCH 22/91] fix bug of subscribe for broker --- kafka_server/coordinator/api/client/api.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka_server/coordinator/api/client/api.py b/kafka_server/coordinator/api/client/api.py index 0aded68..31a93f3 100644 --- a/kafka_server/coordinator/api/client/api.py +++ b/kafka_server/coordinator/api/client/api.py @@ -68,7 +68,7 @@ def subscribe(): for broker_id in all_brokers: i = ex while i > 0: - if f"{broker_id}:{all_subscriptions[broker_id]}" not in tmp_subscriptions: + if f"{broker_id}:{all_brokers[broker_id]}" not in tmp_subscriptions: tmp_subscriptions[f"{broker_id}:{all_subscriptions[broker_id]}"] = [] tmp_subscriptions[f"{broker_id}:{all_subscriptions[broker_id]}"].append(all_subscribers[j]) j += 1 @@ -76,7 +76,7 @@ def subscribe(): for index in range(j, len(all_subscribers)): for broker_id in all_brokers: - if f"{broker_id}:{all_subscriptions[broker_id]}" not in tmp_subscriptions: + if f"{broker_id}:{all_brokers[broker_id]}" not in tmp_subscriptions: tmp_subscriptions[f"{broker_id}:{all_subscriptions[broker_id]}"] = [] tmp_subscriptions[f"{broker_id}:{all_subscriptions[broker_id]}"].append(all_subscribers[j]) From a4a23c42c025297766a38bf231bfc0b57cd48785 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 04:28:21 +0330 Subject: [PATCH 23/91] fix bug of subscribe for broker --- kafka_server/coordinator/api/client/api.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/kafka_server/coordinator/api/client/api.py b/kafka_server/coordinator/api/client/api.py index 31a93f3..1ada368 100644 --- a/kafka_server/coordinator/api/client/api.py +++ b/kafka_server/coordinator/api/client/api.py @@ -69,16 +69,16 @@ def subscribe(): i = ex while i > 0: if f"{broker_id}:{all_brokers[broker_id]}" not in tmp_subscriptions: - tmp_subscriptions[f"{broker_id}:{all_subscriptions[broker_id]}"] = [] - tmp_subscriptions[f"{broker_id}:{all_subscriptions[broker_id]}"].append(all_subscribers[j]) + tmp_subscriptions[f"{broker_id}:{all_brokers[broker_id]}"] = [] + tmp_subscriptions[f"{broker_id}:{all_brokers[broker_id]}"].append(all_subscribers[j]) j += 1 i -= 1 for index in range(j, len(all_subscribers)): for broker_id in all_brokers: if f"{broker_id}:{all_brokers[broker_id]}" not in tmp_subscriptions: - tmp_subscriptions[f"{broker_id}:{all_subscriptions[broker_id]}"] = [] - tmp_subscriptions[f"{broker_id}:{all_subscriptions[broker_id]}"].append(all_subscribers[j]) + tmp_subscriptions[f"{broker_id}:{all_brokers[broker_id]}"] = [] + tmp_subscriptions[f"{broker_id}:{all_brokers[broker_id]}"].append(all_subscribers[j]) all_brokers_for_client = [] for t_s in tmp_subscriptions.keys(): From 44ec905d7dce20890e85bc4906ae0329c0a4f5b1 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 04:29:41 +0330 Subject: [PATCH 24/91] fix bug of subscribe for broker --- kafka_server/coordinator/api/client/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka_server/coordinator/api/client/api.py b/kafka_server/coordinator/api/client/api.py index 1ada368..9dbeafb 100644 --- a/kafka_server/coordinator/api/client/api.py +++ b/kafka_server/coordinator/api/client/api.py @@ -88,7 +88,7 @@ def subscribe(): for broker_id in all_brokers.keys(): broker_subscribe_service.send_subscribe_to_broker(all_brokers[broker_id], tmp_subscriptions[ - f"{broker_id}:{all_subscriptions[broker_id]}"]) + f"{broker_id}:{all_brokers[broker_id]}"]) response_code = broker_subscribe_service.write_subscriptions(tmp_subscriptions) if response_code != 200: From 957915f16468771c6900416ade47c9677079b0df Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 04:32:23 +0330 Subject: [PATCH 25/91] fix bug of subscribe for broker --- kafka_server/coordinator/services/broker/subscribe.py | 2 +- kafka_server/coordinator_database/main.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka_server/coordinator/services/broker/subscribe.py b/kafka_server/coordinator/services/broker/subscribe.py index 1f66b09..acfae5a 100644 --- a/kafka_server/coordinator/services/broker/subscribe.py +++ b/kafka_server/coordinator/services/broker/subscribe.py @@ -14,7 +14,7 @@ def get_all_subscriptions(): def write_subscriptions(subscriptions): r = requests.get("http://127.0.0.1:5001//subscribe/write_subscriptions", data=json.dumps(subscriptions), timeout=2) - return r.status_code, r.json() + return r.status_code def send_subscribe_to_broker(broker_url, data): diff --git a/kafka_server/coordinator_database/main.py b/kafka_server/coordinator_database/main.py index b0c52c8..738a636 100644 --- a/kafka_server/coordinator_database/main.py +++ b/kafka_server/coordinator_database/main.py @@ -202,11 +202,11 @@ def write_subscriptions(): with concurrent.futures.ThreadPoolExecutor() as executor: try: future = executor.submit(subscribe.write_subscriptions, data) - result = future.result() + _ = future.result() except Exception as e: logger.error(e) return jsonify("Error deleting broker heartbeat"), 500 - return jsonify(result), 200 + return jsonify("Successfully write subscriptions"), 200 if __name__ == '__main__': From d0c8ba28bd85fdcdb35f86ac874830d92f065dcd Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 04:33:37 +0330 Subject: [PATCH 26/91] fix bug of subscribe for broker --- kafka_server/coordinator/services/broker/subscribe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka_server/coordinator/services/broker/subscribe.py b/kafka_server/coordinator/services/broker/subscribe.py index acfae5a..b3151ce 100644 --- a/kafka_server/coordinator/services/broker/subscribe.py +++ b/kafka_server/coordinator/services/broker/subscribe.py @@ -13,7 +13,7 @@ def get_all_subscriptions(): def write_subscriptions(subscriptions): - r = requests.get("http://127.0.0.1:5001//subscribe/write_subscriptions", data=json.dumps(subscriptions), timeout=2) + r = requests.post("http://127.0.0.1:5001//subscribe/write_subscriptions", data=json.dumps(subscriptions), timeout=2) return r.status_code From d7406759460c8feaf95687c967f79ad04d1554e8 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 04:35:05 +0330 Subject: [PATCH 27/91] fix bug of subscribe for broker --- kafka_server/coordinator/services/broker/subscribe.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kafka_server/coordinator/services/broker/subscribe.py b/kafka_server/coordinator/services/broker/subscribe.py index b3151ce..3d7b01a 100644 --- a/kafka_server/coordinator/services/broker/subscribe.py +++ b/kafka_server/coordinator/services/broker/subscribe.py @@ -13,7 +13,8 @@ def get_all_subscriptions(): def write_subscriptions(subscriptions): - r = requests.post("http://127.0.0.1:5001//subscribe/write_subscriptions", data=json.dumps(subscriptions), timeout=2) + r = requests.post("http://127.0.0.1:5001//subscribe/write_subscriptions", + data=json.dumps({"subscriptions": subscriptions}), timeout=2) return r.status_code From fedecd2111e18d5de76731083feb011eb3421830 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 04:37:42 +0330 Subject: [PATCH 28/91] fix bug of subscribe for broker --- kafka_server/coordinator/api/client/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka_server/coordinator/api/client/api.py b/kafka_server/coordinator/api/client/api.py index 9dbeafb..c9e0f8c 100644 --- a/kafka_server/coordinator/api/client/api.py +++ b/kafka_server/coordinator/api/client/api.py @@ -94,7 +94,7 @@ def subscribe(): if response_code != 200: return jsonify("Error during finding broker for subscribe"), response_code - return jsonify(all_brokers_for_client), 200 + return jsonify({"id": random_id}), 200 # min_length = 1000000 # selected_broker_id = None From ad40ba40535f2a00e830bc083f5c96e8ce389570 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 04:51:18 +0330 Subject: [PATCH 29/91] fix bug of subscribe for broker --- kafka_server/coordinator/api/client/api.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/kafka_server/coordinator/api/client/api.py b/kafka_server/coordinator/api/client/api.py index c9e0f8c..83fa9e3 100644 --- a/kafka_server/coordinator/api/client/api.py +++ b/kafka_server/coordinator/api/client/api.py @@ -87,8 +87,12 @@ def subscribe(): all_brokers_for_client.append(t_s) for broker_id in all_brokers.keys(): - broker_subscribe_service.send_subscribe_to_broker(all_brokers[broker_id], tmp_subscriptions[ - f"{broker_id}:{all_brokers[broker_id]}"]) + t = {} + for sub in tmp_subscriptions[f"{broker_id}:{all_brokers[broker_id]}"]: + t[sub[1]] = sub[0] + print("##########senda subscriptions to broker") + print(t) + broker_subscribe_service.send_subscribe_to_broker(all_brokers[broker_id], t) response_code = broker_subscribe_service.write_subscriptions(tmp_subscriptions) if response_code != 200: From bfa9a95dbcd602019a055352492258ca47975609 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 05:08:52 +0330 Subject: [PATCH 30/91] fix bug of subscribe for broker --- kafka_server/coordinator/api/client/api.py | 2 +- kafka_server/coordinator/services/client/subscribe.py | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/kafka_server/coordinator/api/client/api.py b/kafka_server/coordinator/api/client/api.py index 83fa9e3..0296fbf 100644 --- a/kafka_server/coordinator/api/client/api.py +++ b/kafka_server/coordinator/api/client/api.py @@ -57,7 +57,7 @@ def subscribe(): all_subscribers = [[client_addr, random_id]] for broker_id_url in all_subscriptions.keys(): - for sub in all_subscribers[broker_id_url]: + for sub in all_subscriptions[broker_id_url]: all_subscribers.append(sub) tmp_subscriptions = {} diff --git a/kafka_server/coordinator/services/client/subscribe.py b/kafka_server/coordinator/services/client/subscribe.py index 9a79928..b550b09 100644 --- a/kafka_server/coordinator/services/client/subscribe.py +++ b/kafka_server/coordinator/services/client/subscribe.py @@ -12,9 +12,13 @@ def update_brokers_subscription_plan(): raise Exception("Error during getting list of brokers from database") for broker_url in all_subscriptions.keys(): data = all_subscriptions[broker_url] + t = {} + for sub in data: + t[sub[0]] = sub[1] + response = requests.post( - f"http://{broker_url}/subscription-plan", - data=json.dumps({"subscription_plans": data}), + f"http://{broker_url}/subscribers", + data=json.dumps({"subscribers": data}), timeout=2, ) if response.status_code != 200: @@ -49,7 +53,7 @@ def check_heartbeat(): data=json.dumps({"client_url": key}), timeout=2, ) - # update_brokers_subscription_plan() + update_brokers_subscription_plan() except Exception as e: print(str(e)) From 61d0b29fbec194bb3c5469da152e1b081b832a84 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 05:09:20 +0330 Subject: [PATCH 31/91] fix bug of subscribe for broker --- kafka_server/coordinator/services/client/subscribe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka_server/coordinator/services/client/subscribe.py b/kafka_server/coordinator/services/client/subscribe.py index b550b09..d380112 100644 --- a/kafka_server/coordinator/services/client/subscribe.py +++ b/kafka_server/coordinator/services/client/subscribe.py @@ -53,7 +53,7 @@ def check_heartbeat(): data=json.dumps({"client_url": key}), timeout=2, ) - update_brokers_subscription_plan() + update_brokers_subscription_plan() except Exception as e: print(str(e)) From cef980bb4bdbb9f58d6abe0992a5372a4316edcf Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 05:17:32 +0330 Subject: [PATCH 32/91] fix bug of subscribe for broker --- kafka_server/coordinator/api/client/api.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/kafka_server/coordinator/api/client/api.py b/kafka_server/coordinator/api/client/api.py index 0296fbf..958af1f 100644 --- a/kafka_server/coordinator/api/client/api.py +++ b/kafka_server/coordinator/api/client/api.py @@ -64,6 +64,7 @@ def subscribe(): all_subscribers_length = len(all_subscribers) all_brokers_length = len(all_brokers) ex = all_subscribers_length // all_brokers_length + print("###########ex ", ex) j = 0 for broker_id in all_brokers: i = ex @@ -74,12 +75,16 @@ def subscribe(): j += 1 i -= 1 + print("####aghaei\n", tmp_subscriptions) + for index in range(j, len(all_subscribers)): for broker_id in all_brokers: if f"{broker_id}:{all_brokers[broker_id]}" not in tmp_subscriptions: tmp_subscriptions[f"{broker_id}:{all_brokers[broker_id]}"] = [] tmp_subscriptions[f"{broker_id}:{all_brokers[broker_id]}"].append(all_subscribers[j]) + print("####aghaei2\n", tmp_subscriptions) + all_brokers_for_client = [] for t_s in tmp_subscriptions.keys(): for tupl in tmp_subscriptions[t_s]: From 66d916e6f6f660c6fd721dc44fad87375d146b6f Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 05:22:31 +0330 Subject: [PATCH 33/91] fix bug of subscribe for broker --- kafka_server/coordinator/services/client/subscribe.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka_server/coordinator/services/client/subscribe.py b/kafka_server/coordinator/services/client/subscribe.py index d380112..1c740e3 100644 --- a/kafka_server/coordinator/services/client/subscribe.py +++ b/kafka_server/coordinator/services/client/subscribe.py @@ -17,7 +17,7 @@ def update_brokers_subscription_plan(): t[sub[0]] = sub[1] response = requests.post( - f"http://{broker_url}/subscribers", + f"{broker_url[2:]}/subscribers", data=json.dumps({"subscribers": data}), timeout=2, ) @@ -53,7 +53,7 @@ def check_heartbeat(): data=json.dumps({"client_url": key}), timeout=2, ) - update_brokers_subscription_plan() + update_brokers_subscription_plan() except Exception as e: print(str(e)) From 213d9b5be845713e2c1ee98d48564750764a66bb Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 05:23:36 +0330 Subject: [PATCH 34/91] fix bug of subscribe for broker --- kafka_server/coordinator/api/client/api.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka_server/coordinator/api/client/api.py b/kafka_server/coordinator/api/client/api.py index 958af1f..e2ea3c0 100644 --- a/kafka_server/coordinator/api/client/api.py +++ b/kafka_server/coordinator/api/client/api.py @@ -63,6 +63,7 @@ def subscribe(): tmp_subscriptions = {} all_subscribers_length = len(all_subscribers) all_brokers_length = len(all_brokers) + print("######all subscribers\n", all_subscribers) ex = all_subscribers_length // all_brokers_length print("###########ex ", ex) j = 0 From 039b2ed6f0983794a43b4553cebd2382d57d4bdc Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 05:28:12 +0330 Subject: [PATCH 35/91] fix bug of subscribe for broker --- kafka_server/coordinator/api/client/api.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kafka_server/coordinator/api/client/api.py b/kafka_server/coordinator/api/client/api.py index e2ea3c0..cc2223f 100644 --- a/kafka_server/coordinator/api/client/api.py +++ b/kafka_server/coordinator/api/client/api.py @@ -55,10 +55,11 @@ def subscribe(): if response_code != 200: return jsonify("Error during getting list of brokers from database"), response_code - all_subscribers = [[client_addr, random_id]] + all_subscribers = set() + all_subscribers.add([client_addr, random_id]) for broker_id_url in all_subscriptions.keys(): for sub in all_subscriptions[broker_id_url]: - all_subscribers.append(sub) + all_subscribers.add(sub) tmp_subscriptions = {} all_subscribers_length = len(all_subscribers) From 5718929b10c4fa300f12d38539255ff67f864e69 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 05:29:04 +0330 Subject: [PATCH 36/91] fix bug of subscribe for broker --- kafka_server/coordinator/api/client/api.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka_server/coordinator/api/client/api.py b/kafka_server/coordinator/api/client/api.py index cc2223f..1a067eb 100644 --- a/kafka_server/coordinator/api/client/api.py +++ b/kafka_server/coordinator/api/client/api.py @@ -61,6 +61,7 @@ def subscribe(): for sub in all_subscriptions[broker_id_url]: all_subscribers.add(sub) + all_subscribers = list(all_subscribers) tmp_subscriptions = {} all_subscribers_length = len(all_subscribers) all_brokers_length = len(all_brokers) From 8cf7abf662fbc195d174ea7b994967ef2b5606fa Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 05:32:49 +0330 Subject: [PATCH 37/91] fix bug of subscribe for broker --- kafka_server/coordinator/api/client/api.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/kafka_server/coordinator/api/client/api.py b/kafka_server/coordinator/api/client/api.py index 1a067eb..69510ed 100644 --- a/kafka_server/coordinator/api/client/api.py +++ b/kafka_server/coordinator/api/client/api.py @@ -55,11 +55,13 @@ def subscribe(): if response_code != 200: return jsonify("Error during getting list of brokers from database"), response_code - all_subscribers = set() - all_subscribers.add([client_addr, random_id]) + all_subscribers = [[client_addr, random_id]] for broker_id_url in all_subscriptions.keys(): for sub in all_subscriptions[broker_id_url]: - all_subscribers.add(sub) + all_subscribers.append(sub) + + unique_tuples = set(tuple(x) for x in all_subscribers) + all_subscribers = [list(x) for x in unique_tuples] all_subscribers = list(all_subscribers) tmp_subscriptions = {} From f2a52fe9cf38246e6d4353658a780d2af977b544 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 05:39:20 +0330 Subject: [PATCH 38/91] fix bug of subscribe for broker --- kafka_server/coordinator/api/client/api.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka_server/coordinator/api/client/api.py b/kafka_server/coordinator/api/client/api.py index 69510ed..941990f 100644 --- a/kafka_server/coordinator/api/client/api.py +++ b/kafka_server/coordinator/api/client/api.py @@ -60,6 +60,7 @@ def subscribe(): for sub in all_subscriptions[broker_id_url]: all_subscribers.append(sub) + print("######all subscriber before\n", all_subscribers) unique_tuples = set(tuple(x) for x in all_subscribers) all_subscribers = [list(x) for x in unique_tuples] From 8a0b3bc0c804ea9ab2d4d92fc56b389de1ec7ae3 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 05:43:54 +0330 Subject: [PATCH 39/91] fix bug of subscribe for broker --- kafka_server/coordinator/api/client/api.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka_server/coordinator/api/client/api.py b/kafka_server/coordinator/api/client/api.py index 941990f..2f157c9 100644 --- a/kafka_server/coordinator/api/client/api.py +++ b/kafka_server/coordinator/api/client/api.py @@ -48,6 +48,7 @@ def subscribe(): random_id = random.randint(1, 1000000) response_code, all_subscriptions = broker_subscribe_service.get_all_subscriptions() + print("################allllll subscriptions\n", all_subscriptions) if response_code != 200: return jsonify("Error during getting list of brokers from database"), response_code From c1a41c44adf7a0c080797ed720c8daf99e6782ab Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 05:47:32 +0330 Subject: [PATCH 40/91] fix bug of subscribe for broker --- kafka_server/coordinator/api/client/api.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka_server/coordinator/api/client/api.py b/kafka_server/coordinator/api/client/api.py index 2f157c9..390e0bd 100644 --- a/kafka_server/coordinator/api/client/api.py +++ b/kafka_server/coordinator/api/client/api.py @@ -106,6 +106,7 @@ def subscribe(): print(t) broker_subscribe_service.send_subscribe_to_broker(all_brokers[broker_id], t) + print("@@@@@@@@tmp_subscriptions to write\n", tmp_subscriptions) response_code = broker_subscribe_service.write_subscriptions(tmp_subscriptions) if response_code != 200: return jsonify("Error during finding broker for subscribe"), response_code From 550f69fc9f6979e96d8a5475e6087c881d8131a5 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 05:51:17 +0330 Subject: [PATCH 41/91] fix bug of subscribe for broker --- kafka_server/coordinator/api/client/api.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka_server/coordinator/api/client/api.py b/kafka_server/coordinator/api/client/api.py index 390e0bd..af10274 100644 --- a/kafka_server/coordinator/api/client/api.py +++ b/kafka_server/coordinator/api/client/api.py @@ -89,6 +89,7 @@ def subscribe(): if f"{broker_id}:{all_brokers[broker_id]}" not in tmp_subscriptions: tmp_subscriptions[f"{broker_id}:{all_brokers[broker_id]}"] = [] tmp_subscriptions[f"{broker_id}:{all_brokers[broker_id]}"].append(all_subscribers[j]) + j += 1 print("####aghaei2\n", tmp_subscriptions) From 9028e6445eb1076b95632456698e01afab636eed Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 05:51:39 +0330 Subject: [PATCH 42/91] fix bug of subscribe for broker --- kafka_server/coordinator/api/client/api.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/kafka_server/coordinator/api/client/api.py b/kafka_server/coordinator/api/client/api.py index af10274..e8c93f6 100644 --- a/kafka_server/coordinator/api/client/api.py +++ b/kafka_server/coordinator/api/client/api.py @@ -88,8 +88,7 @@ def subscribe(): for broker_id in all_brokers: if f"{broker_id}:{all_brokers[broker_id]}" not in tmp_subscriptions: tmp_subscriptions[f"{broker_id}:{all_brokers[broker_id]}"] = [] - tmp_subscriptions[f"{broker_id}:{all_brokers[broker_id]}"].append(all_subscribers[j]) - j += 1 + tmp_subscriptions[f"{broker_id}:{all_brokers[broker_id]}"].append(all_subscribers[index]) print("####aghaei2\n", tmp_subscriptions) From a2bb95a7eee10d9f25f3ab76760f2c8068d4bb69 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 06:02:20 +0330 Subject: [PATCH 43/91] fix bug of subscribe for broker --- kafka_server/coordinator/services/client/subscribe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka_server/coordinator/services/client/subscribe.py b/kafka_server/coordinator/services/client/subscribe.py index 1c740e3..9e57932 100644 --- a/kafka_server/coordinator/services/client/subscribe.py +++ b/kafka_server/coordinator/services/client/subscribe.py @@ -18,7 +18,7 @@ def update_brokers_subscription_plan(): response = requests.post( f"{broker_url[2:]}/subscribers", - data=json.dumps({"subscribers": data}), + data=json.dumps({"subscribers": t}), timeout=2, ) if response.status_code != 200: From e1d03bb0ea38a7e63b33ddcc01f6e4fde623fcea Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 06:11:58 +0330 Subject: [PATCH 44/91] fix bug of subscribe for broker --- kafka_server/coordinator/services/broker/subscribe.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/kafka_server/coordinator/services/broker/subscribe.py b/kafka_server/coordinator/services/broker/subscribe.py index 3d7b01a..93351f7 100644 --- a/kafka_server/coordinator/services/broker/subscribe.py +++ b/kafka_server/coordinator/services/broker/subscribe.py @@ -28,6 +28,7 @@ def send_subscribe_to_broker(broker_url, data): def update_brokers_subscriptions(): + print("######update brokers subscriptions") response_code, all_brokers = broker_database.list_all_brokers() if response_code != 200: raise Exception("Error during getting list of brokers from database") @@ -37,12 +38,15 @@ def update_brokers_subscriptions(): raise Exception("Error during getting list of clients from database") try: + print("########all brokers\n", all_brokers) for client_url in all_clients: - requests.post( - f"{client_url}/subscription", + r = requests.post( + f"{client_url}/update-brokers", data=json.dumps({"brokers": all_brokers}), timeout=2, ) + if r.status_code != 200: + print("ey vayyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy") for broker_id in all_brokers.keys(): requests.post(f"{all_brokers[broker_id]}/subscription", data=json.dumps({"brokers": all_brokers}), From 38be8c01a397abae5891ebcd7b39b4b46984a5c3 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 07:01:52 +0330 Subject: [PATCH 45/91] fix bug of subscribe for broker --- .../coordinator/services/broker/subscribe.py | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/kafka_server/coordinator/services/broker/subscribe.py b/kafka_server/coordinator/services/broker/subscribe.py index 93351f7..9e2880b 100644 --- a/kafka_server/coordinator/services/broker/subscribe.py +++ b/kafka_server/coordinator/services/broker/subscribe.py @@ -117,6 +117,67 @@ def update_brokers_list(broker_url): update_brokers_subscriptions() +def update_subscribers(): + print("#########3updating subscribers after broker down") + response_code, all_subscriptions = get_all_subscriptions() + print("################allllll subscriptions\n", all_subscriptions) + if response_code != 200: + raise Exception("Error during getting list of brokers from database") + + response_code, all_brokers = broker_database.list_all_brokers() + if response_code != 200: + raise Exception("Error during getting list of brokers from database") + + all_subscribers = [] + for broker_id_url in all_subscriptions.keys(): + for sub in all_subscriptions[broker_id_url]: + all_subscribers.append(sub) + + print("######all subscriber before\n", all_subscribers) + unique_tuples = set(tuple(x) for x in all_subscribers) + all_subscribers = [list(x) for x in unique_tuples] + + all_subscribers = list(all_subscribers) + tmp_subscriptions = {} + all_subscribers_length = len(all_subscribers) + all_brokers_length = len(all_brokers) + print("######all subscribers\n", all_subscribers) + ex = all_subscribers_length // all_brokers_length + print("###########ex ", ex) + j = 0 + for broker_id in all_brokers: + i = ex + while i > 0: + if f"{broker_id}:{all_brokers[broker_id]}" not in tmp_subscriptions: + tmp_subscriptions[f"{broker_id}:{all_brokers[broker_id]}"] = [] + tmp_subscriptions[f"{broker_id}:{all_brokers[broker_id]}"].append(all_subscribers[j]) + j += 1 + i -= 1 + + print("####aghaei\n", tmp_subscriptions) + + for index in range(j, len(all_subscribers)): + for broker_id in all_brokers: + if f"{broker_id}:{all_brokers[broker_id]}" not in tmp_subscriptions: + tmp_subscriptions[f"{broker_id}:{all_brokers[broker_id]}"] = [] + tmp_subscriptions[f"{broker_id}:{all_brokers[broker_id]}"].append(all_subscribers[index]) + + print("####aghaei2\n", tmp_subscriptions) + + for broker_id in all_brokers.keys(): + t = {} + for sub in tmp_subscriptions[f"{broker_id}:{all_brokers[broker_id]}"]: + t[sub[1]] = sub[0] + print("##########senda subscriptions to broker") + print(t) + send_subscribe_to_broker(all_brokers[broker_id], t) + + print("@@@@@@@@tmp_subscriptions to write\n", tmp_subscriptions) + response_code = write_subscriptions(tmp_subscriptions) + if response_code != 200: + raise Exception("Error during finding broker for subscribe") + + def check_heartbeat(): try: response = requests.get('http://127.0.0.1:5001/broker/list_all_heartbeats', timeout=2) @@ -134,6 +195,7 @@ def check_heartbeat(): timeout=2, ) update_brokers_list(key) + update_subscribers() except Exception as e: print(str(e)) From ccce0dd7bdb38bf877946d4d73f6de1b17c5684e Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 07:10:14 +0330 Subject: [PATCH 46/91] fix bug of subscribe for broker --- kafka_server/coordinator/services/broker/subscribe.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kafka_server/coordinator/services/broker/subscribe.py b/kafka_server/coordinator/services/broker/subscribe.py index 9e2880b..946c5d2 100644 --- a/kafka_server/coordinator/services/broker/subscribe.py +++ b/kafka_server/coordinator/services/broker/subscribe.py @@ -75,6 +75,9 @@ def prepare_updating(all_brokers, down_broker_id, down_broker_url): # update all brokers update_brokers_subscriptions() + # update new subscription plans + update_subscribers() + # find replica of down broker down_broker_replica_url = all_brokers_replicas[down_broker_id] update_replica_partition_of_a_down_broker(down_broker_id, down_broker_replica_url) @@ -195,7 +198,6 @@ def check_heartbeat(): timeout=2, ) update_brokers_list(key) - update_subscribers() except Exception as e: print(str(e)) From 0d4ccb37311f8c3007d82689507617a7be54c830 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 08:55:38 +0330 Subject: [PATCH 47/91] fix bug of scale down --- kafka_server/coordinator/services/broker/subscribe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka_server/coordinator/services/broker/subscribe.py b/kafka_server/coordinator/services/broker/subscribe.py index 946c5d2..90386e5 100644 --- a/kafka_server/coordinator/services/broker/subscribe.py +++ b/kafka_server/coordinator/services/broker/subscribe.py @@ -171,7 +171,7 @@ def update_subscribers(): t = {} for sub in tmp_subscriptions[f"{broker_id}:{all_brokers[broker_id]}"]: t[sub[1]] = sub[0] - print("##########senda subscriptions to broker") + print("##########send subscriptions to broker") print(t) send_subscribe_to_broker(all_brokers[broker_id], t) From b3daba8676141ce3d7df564b5aeb443db487d2c2 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 09:00:04 +0330 Subject: [PATCH 48/91] fix bug of scale down --- kafka_server/coordinator/services/broker/subscribe.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kafka_server/coordinator/services/broker/subscribe.py b/kafka_server/coordinator/services/broker/subscribe.py index 90386e5..6fa202d 100644 --- a/kafka_server/coordinator/services/broker/subscribe.py +++ b/kafka_server/coordinator/services/broker/subscribe.py @@ -167,7 +167,9 @@ def update_subscribers(): print("####aghaei2\n", tmp_subscriptions) + print("$$$brokers\n", all_brokers) for broker_id in all_brokers.keys(): + print(broker_id) t = {} for sub in tmp_subscriptions[f"{broker_id}:{all_brokers[broker_id]}"]: t[sub[1]] = sub[0] From 3c0303714da50fba6343d65b0a50ed6e5e955297 Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 09:04:23 +0330 Subject: [PATCH 49/91] fix bug of scale down --- kafka_server/coordinator/services/broker/subscribe.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/kafka_server/coordinator/services/broker/subscribe.py b/kafka_server/coordinator/services/broker/subscribe.py index 6fa202d..6502c28 100644 --- a/kafka_server/coordinator/services/broker/subscribe.py +++ b/kafka_server/coordinator/services/broker/subscribe.py @@ -171,8 +171,10 @@ def update_subscribers(): for broker_id in all_brokers.keys(): print(broker_id) t = {} - for sub in tmp_subscriptions[f"{broker_id}:{all_brokers[broker_id]}"]: - t[sub[1]] = sub[0] + print(tmp_subscriptions) + if f"{broker_id}:{all_brokers[broker_id]}" in tmp_subscriptions.keys(): + for sub in tmp_subscriptions[f"{broker_id}:{all_brokers[broker_id]}"]: + t[sub[1]] = sub[0] print("##########send subscriptions to broker") print(t) send_subscribe_to_broker(all_brokers[broker_id], t) From 59d7cd06e6080efe4a45ecfc707ddd6ec10c2c9e Mon Sep 17 00:00:00 2001 From: amkamir82 Date: Thu, 15 Feb 2024 09:09:43 +0330 Subject: [PATCH 50/91] fix bug of scale down --- kafka_server/coordinator/services/broker/subscribe.py | 1 - 1 file changed, 1 deletion(-) diff --git a/kafka_server/coordinator/services/broker/subscribe.py b/kafka_server/coordinator/services/broker/subscribe.py index 6502c28..d2b81fb 100644 --- a/kafka_server/coordinator/services/broker/subscribe.py +++ b/kafka_server/coordinator/services/broker/subscribe.py @@ -117,7 +117,6 @@ def update_brokers_list(broker_url): if response.status_code != 200: print(f"Error during sending subscription to broker #{broker_url}") prepare_updating(all_brokers, down_broker_id, broker_url) - update_brokers_subscriptions() def update_subscribers(): From 7ef5997d2965aebdde0f0d6a1707e8e591d8a4d8 Mon Sep 17 00:00:00 2001 From: Mahdi Gheidi Date: Wed, 14 Feb 2024 16:18:56 +0330 Subject: [PATCH 51/91] Complete cicd --- .github/workflows/ci.yml | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 63bda7b..168c3d0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -120,18 +120,26 @@ jobs: known_hosts: ${{ secrets.SSH_HOST }} - name: connect and pull run: | - ssh -i ~/.ssh/id_rsa ubuntu@37.152.176.203 "docker compose up -d broker-1 broker-2 broker-3 && exit" + ssh -i ~/.ssh/id_rsa ${{ secrets.SSH_USER }}@${{ secrets.SSH_HOST }} "docker-compose up -d broker-1 broker-2 broker-3 && exit" - name: cleanup run: rm -rf ~/.ssh deploy-prod-coordinator: name: Deploy Coordinator to Production - runs-on: ubuntu-latest + runs-on: self-hosted needs: build-push-coordinator steps: - - name: Deploy to production - run: | - # Here you can add your deployment commands/scripts - # For example: - ssh user@your-production-server 'docker pull ghcr.io/your-github-username/your-repo-name:latest && docker-compose up -d' + - name: checkout repo + uses: actions/checkout@v2 + + - name: set up ssh keys + uses: shimataro/ssh-key-action@v2 + with: + key: ${{ secrets.SSH_PRIVATE_KEY }} + known_hosts: ${{ secrets.SSH_HOST }} + - name: connect and pull + run: | + ssh -i ~/.ssh/id_rsa ${{ secrets.SSH_USER }}@${{ secrets.SSH_HOST }} "docker-compose up -d coordinator-1 coordinator-2 && exit" + - name: cleanup + run: rm -rf ~/.ssh From 1df50b76a8730100c67cb259478dd8f08eac760b Mon Sep 17 00:00:00 2001 From: Mohammad Mahdi Gheidi <59335039+mahdigheidi@users.noreply.github.com> Date: Wed, 14 Feb 2024 16:26:49 +0330 Subject: [PATCH 52/91] Update ci.yml --- .github/workflows/ci.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 168c3d0..8d186e5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -120,6 +120,7 @@ jobs: known_hosts: ${{ secrets.SSH_HOST }} - name: connect and pull run: | + ls ssh -i ~/.ssh/id_rsa ${{ secrets.SSH_USER }}@${{ secrets.SSH_HOST }} "docker-compose up -d broker-1 broker-2 broker-3 && exit" - name: cleanup run: rm -rf ~/.ssh @@ -140,6 +141,7 @@ jobs: known_hosts: ${{ secrets.SSH_HOST }} - name: connect and pull run: | + ls ssh -i ~/.ssh/id_rsa ${{ secrets.SSH_USER }}@${{ secrets.SSH_HOST }} "docker-compose up -d coordinator-1 coordinator-2 && exit" - name: cleanup run: rm -rf ~/.ssh From 59381d8e7a77a8f1a448e59687c70324f1e57496 Mon Sep 17 00:00:00 2001 From: Mahdi Gheidi Date: Wed, 14 Feb 2024 16:32:39 +0330 Subject: [PATCH 53/91] debug --- .github/workflows/ci.yml | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8d186e5..76b9508 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -30,6 +30,7 @@ jobs: uses: docker/build-push-action@v5 with: context: ./kafka_server/broker/ + platforms: linux/amd64,linux/arm64 push: true tags: mahdigheidi/sad-broker:latest @@ -57,6 +58,7 @@ jobs: uses: docker/build-push-action@v5 with: context: ./kafka_server/coordinator/ + platforms: linux/amd64,linux/arm64 push: true tags: mahdigheidi/sad-coordinator:latest @@ -120,10 +122,11 @@ jobs: known_hosts: ${{ secrets.SSH_HOST }} - name: connect and pull run: | + pwd ls - ssh -i ~/.ssh/id_rsa ${{ secrets.SSH_USER }}@${{ secrets.SSH_HOST }} "docker-compose up -d broker-1 broker-2 broker-3 && exit" + ssh -i ~/.ssh/id_rsa -o StrictHostKeyChecking=no ${{ secrets.SSH_USER }}@${{ secrets.SSH_HOST }} "docker-compose up -d broker-1 broker-2 broker-3 && exit" - name: cleanup - run: rm -rf ~/.ssh + run: rm ~/.ssh/id_rsa deploy-prod-coordinator: name: Deploy Coordinator to Production @@ -142,6 +145,6 @@ jobs: - name: connect and pull run: | ls - ssh -i ~/.ssh/id_rsa ${{ secrets.SSH_USER }}@${{ secrets.SSH_HOST }} "docker-compose up -d coordinator-1 coordinator-2 && exit" + ssh -i ~/.ssh/id_rsa -o StrictHostKeyChecking=no ${{ secrets.SSH_USER }}@${{ secrets.SSH_HOST }} "docker-compose up -d coordinator-1 coordinator-2 && exit" - name: cleanup - run: rm -rf ~/.ssh + run: rm ~/.ssh/id_rsa From e489a3771d49417b0b3d972c3737329279ffb230 Mon Sep 17 00:00:00 2001 From: Mahdi Gheidi Date: Wed, 14 Feb 2024 16:48:34 +0330 Subject: [PATCH 54/91] Fix deploy stage pipeline --- .github/workflows/ci.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 76b9508..11173a5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -30,7 +30,6 @@ jobs: uses: docker/build-push-action@v5 with: context: ./kafka_server/broker/ - platforms: linux/amd64,linux/arm64 push: true tags: mahdigheidi/sad-broker:latest @@ -122,8 +121,6 @@ jobs: known_hosts: ${{ secrets.SSH_HOST }} - name: connect and pull run: | - pwd - ls ssh -i ~/.ssh/id_rsa -o StrictHostKeyChecking=no ${{ secrets.SSH_USER }}@${{ secrets.SSH_HOST }} "docker-compose up -d broker-1 broker-2 broker-3 && exit" - name: cleanup run: rm ~/.ssh/id_rsa From b63ab32a760991475c6f42ecdeea5c82dccda4af Mon Sep 17 00:00:00 2001 From: Mahdi Gheidi Date: Wed, 14 Feb 2024 17:07:38 +0330 Subject: [PATCH 55/91] Fix static code issues --- .github/workflows/ci.yml | 2 +- kafka_server/broker/controller/coordinator.py | 4 ++-- kafka_server/broker/controller/produce.py | 3 ++- kafka_server/broker/file/hash.py | 2 +- kafka_server/coordinator/api/client/api.py | 6 +++--- kafka_server/coordinator/main.py | 3 ++- kafka_server/coordinator/services/broker/subscribe.py | 6 +++++- 7 files changed, 16 insertions(+), 10 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 11173a5..d931664 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -102,7 +102,7 @@ jobs: - name: Run bandit run: | - bandit -r . + bandit --skip B104 -r . deploy-prod-broker: diff --git a/kafka_server/broker/controller/coordinator.py b/kafka_server/broker/controller/coordinator.py index b29691e..058e2d1 100644 --- a/kafka_server/broker/controller/coordinator.py +++ b/kafka_server/broker/controller/coordinator.py @@ -42,7 +42,7 @@ def _post(data, url: str): print(f"master coordinator {_master_coordinator_url()} is not alive") try: - response = requests.post(coordinator, json=data, data=json.dumps(data)) + response = requests.post(coordinator, json=data, data=json.dumps(data), timeout=2) return response.status_code == 200 except requests.RequestException as e: print(f"Error on post {e}") @@ -55,7 +55,7 @@ def _get(data, url: str): coordinator = _url(master_not_replica=master_alive, url=url) if not master_alive: print(f"master coordinator {_master_coordinator_url()} is not alive") - response = requests.get(coordinator, json=data, data=json.dumps(data)) + response = requests.get(coordinator, json=data, data=json.dumps(data), timeout=2) if response.status_code == 200: return json.loads(response.content.decode("utf-8")) except requests.RequestException as e: diff --git a/kafka_server/broker/controller/produce.py b/kafka_server/broker/controller/produce.py index ae24cb8..8008823 100644 --- a/kafka_server/broker/controller/produce.py +++ b/kafka_server/broker/controller/produce.py @@ -182,4 +182,5 @@ def ack(): crun.daemon = True crun.start() -app.run('0.0.0.0', port=5003, debug=True) +broker_listening_addr = '0.0.0.0' +app.run(broker_listening_addr, port=5003) diff --git a/kafka_server/broker/file/hash.py b/kafka_server/broker/file/hash.py index f9e5bf9..59fea6a 100644 --- a/kafka_server/broker/file/hash.py +++ b/kafka_server/broker/file/hash.py @@ -2,4 +2,4 @@ def hash_md5(key: str): - return hashlib.md5(key.encode()).hexdigest() + return hashlib.md5(key.encode(), usedforsecurity=False).hexdigest() diff --git a/kafka_server/coordinator/api/client/api.py b/kafka_server/coordinator/api/client/api.py index e8c93f6..340ac0b 100644 --- a/kafka_server/coordinator/api/client/api.py +++ b/kafka_server/coordinator/api/client/api.py @@ -1,7 +1,7 @@ import datetime import json import os -import random +from random import SystemRandom import sys from coordinator.services.broker import subscribe as broker_subscribe_service @@ -11,7 +11,7 @@ COORDINATOR_PROJECT_PATH = os.getenv("COORDINATOR_PROJECT_PATH", "/app/") sys.path.append(os.path.abspath(COORDINATOR_PROJECT_PATH)) - +cryptogen = SystemRandom() api_blueprint = Blueprint('api', __name__) @@ -45,7 +45,7 @@ def subscribe(): data = json.loads(request.data.decode('utf-8')) client_addr = f'http://{data["ip"]}:{data["port"]}' - random_id = random.randint(1, 1000000) + random_id = cryptogen.randint(1, 1000000) response_code, all_subscriptions = broker_subscribe_service.get_all_subscriptions() print("################allllll subscriptions\n", all_subscriptions) diff --git a/kafka_server/coordinator/main.py b/kafka_server/coordinator/main.py index 749496b..992a298 100644 --- a/kafka_server/coordinator/main.py +++ b/kafka_server/coordinator/main.py @@ -27,4 +27,5 @@ def main_route(): client_subscribe_service.run_check_heartbeat_job() broker_subscribe_service.run_check_heartbeat_job() - app.run(host='0.0.0.0', port=5000) + coordinator_listening_addr = '0.0.0.0' + app.run(coordinator_listening_addr, port=5000) diff --git a/kafka_server/coordinator/services/broker/subscribe.py b/kafka_server/coordinator/services/broker/subscribe.py index d2b81fb..8257cb1 100644 --- a/kafka_server/coordinator/services/broker/subscribe.py +++ b/kafka_server/coordinator/services/broker/subscribe.py @@ -93,7 +93,11 @@ def update_replica_partition_of_a_broker_which_is_in_down_broker(broker_url): def update_replica_partition_of_a_down_broker(down_broker_id, down_broker_replica_url): print(f"###############sedning request to sync replica of down broker# {down_broker_id}:{down_broker_replica_url}") - r = requests.post(f"{down_broker_replica_url}/broker/down", data=json.dumps({"partition": down_broker_id})) + r = requests.post( + f"{down_broker_replica_url}/broker/down", + data=json.dumps({"partition": down_broker_id}), + timeout=2, + ) print(r.status_code) if r.status_code != 200: raise Exception("Error in sending request to broker which has the replica of a down broker") From b93cdda7a6afde05fb37d7f94452900500c14802 Mon Sep 17 00:00:00 2001 From: Mahdi Gheidi Date: Wed, 14 Feb 2024 17:56:39 +0330 Subject: [PATCH 56/91] fix broker dockerfile --- kafka_server/broker/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka_server/broker/Dockerfile b/kafka_server/broker/Dockerfile index 09110e6..c79f33d 100644 --- a/kafka_server/broker/Dockerfile +++ b/kafka_server/broker/Dockerfile @@ -3,7 +3,7 @@ WORKDIR /app ARG BROKER_PROJECT_PATH="/app/" RUN echo "The ARG variable value is $BROKER_PROJECT_PATH" COPY requirements.txt /app -COPY . . +COPY kafka_server/broker/ . RUN pip install -r requirements.txt EXPOSE 5003 ENV FLASK_APP=app From dd3acfd75a09e645765e94467e2e213d4513bff2 Mon Sep 17 00:00:00 2001 From: Mahdi Gheidi Date: Wed, 14 Feb 2024 18:32:59 +0330 Subject: [PATCH 57/91] Fix broker run --- .github/workflows/ci.yml | 4 ++-- kafka_server/broker/Dockerfile | 5 +++-- kafka_server/broker/controller/produce.py | 10 ++++++---- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d931664..078c01d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -75,10 +75,10 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install flake8 + pip install flake8 - name: Analysing the code with flake8 run: | - flake8 --ignore E501 kafka_server/ + flake8 --ignore=E401,E402,E501 kafka_server/ # - name: Test with pytest # run: | # pytest diff --git a/kafka_server/broker/Dockerfile b/kafka_server/broker/Dockerfile index c79f33d..7354b5a 100644 --- a/kafka_server/broker/Dockerfile +++ b/kafka_server/broker/Dockerfile @@ -3,8 +3,9 @@ WORKDIR /app ARG BROKER_PROJECT_PATH="/app/" RUN echo "The ARG variable value is $BROKER_PROJECT_PATH" COPY requirements.txt /app -COPY kafka_server/broker/ . +COPY . . RUN pip install -r requirements.txt EXPOSE 5003 ENV FLASK_APP=app -CMD ["python", "-u", "controller/produce.py"] \ No newline at end of file +CMD ["python", "controller/produce.py"] +# CMD ["python", "controller/produce.py"] \ No newline at end of file diff --git a/kafka_server/broker/controller/produce.py b/kafka_server/broker/controller/produce.py index 8008823..2e48e50 100644 --- a/kafka_server/broker/controller/produce.py +++ b/kafka_server/broker/controller/produce.py @@ -2,19 +2,21 @@ import os import sys import threading -from main import init + + +BROKER_PROJECT_PATH = os.getenv("BROKER_PROJECT_PATH", "/app/") +sys.path.append(os.path.abspath(BROKER_PROJECT_PATH)) + from file.indexer import Indexer from file.read import Read from file.write import Write +from main import init from flask import Flask, request, jsonify from manager.env import get_primary_partition, get_replica_url from metrics import coordinator_write_requests, coordinator_replicate_index_requests from prometheus_client import make_wsgi_app from werkzeug.middleware.dispatcher import DispatcherMiddleware -BROKER_PROJECT_PATH = os.getenv("BROKER_PROJECT_PATH", "/app/") -sys.path.append(os.path.abspath(BROKER_PROJECT_PATH)) - app = Flask(__name__) app.wsgi_app = DispatcherMiddleware(app.wsgi_app, { From a4ad460670a4707eefd9ab6701eef15f2e8d5bc5 Mon Sep 17 00:00:00 2001 From: Mahdi Gheidi Date: Wed, 14 Feb 2024 18:51:59 +0330 Subject: [PATCH 58/91] Fix import dependencies --- kafka_server/coordinator/api/broker/api.py | 10 +++++----- kafka_server/coordinator/api/client/api.py | 11 ++++++----- kafka_server/coordinator/main.py | 8 ++++---- kafka_server/coordinator/services/broker/subscribe.py | 10 ++++++++-- kafka_server/coordinator/services/client/subscribe.py | 7 ++++++- 5 files changed, 29 insertions(+), 17 deletions(-) diff --git a/kafka_server/coordinator/api/broker/api.py b/kafka_server/coordinator/api/broker/api.py index e5b5be2..a0d1556 100644 --- a/kafka_server/coordinator/api/broker/api.py +++ b/kafka_server/coordinator/api/broker/api.py @@ -4,13 +4,13 @@ import sys import random -from coordinator.services.broker import subscribe as broker_subscriber_service -from coordinator.services.broker import database as broker_database -from coordinator import config -from flask import Blueprint, jsonify, request - COORDINATOR_PROJECT_PATH = os.getenv("COORDINATOR_PROJECT_PATH", "/app/") sys.path.append(os.path.abspath(COORDINATOR_PROJECT_PATH)) +from services.broker import subscribe as broker_subscriber_service +from services.broker import database as broker_database +import config +from flask import Blueprint, jsonify, request + api_blueprint = Blueprint('api', __name__) diff --git a/kafka_server/coordinator/api/client/api.py b/kafka_server/coordinator/api/client/api.py index 340ac0b..388c660 100644 --- a/kafka_server/coordinator/api/client/api.py +++ b/kafka_server/coordinator/api/client/api.py @@ -4,13 +4,14 @@ from random import SystemRandom import sys -from coordinator.services.broker import subscribe as broker_subscribe_service -from coordinator.services.client import database as client_database -from coordinator.services.broker import database as broker_database -from flask import Blueprint, request, jsonify - COORDINATOR_PROJECT_PATH = os.getenv("COORDINATOR_PROJECT_PATH", "/app/") sys.path.append(os.path.abspath(COORDINATOR_PROJECT_PATH)) + +from services.broker import subscribe as broker_subscribe_service +from services.client import database as client_database +from services.broker import database as broker_database +from flask import Blueprint, request, jsonify + cryptogen = SystemRandom() api_blueprint = Blueprint('api', __name__) diff --git a/kafka_server/coordinator/main.py b/kafka_server/coordinator/main.py index 992a298..0f2c993 100644 --- a/kafka_server/coordinator/main.py +++ b/kafka_server/coordinator/main.py @@ -1,16 +1,16 @@ import os import sys +COORDINATOR_PROJECT_PATH = os.getenv("COORDINATOR_PROJECT_PATH", "/app/") +sys.path.append(os.path.abspath(COORDINATOR_PROJECT_PATH)) from flask import Flask, jsonify from api.client.api import api_blueprint as client_api from api.broker.api import api_blueprint as broker_api -from coordinator.services.client import subscribe as client_subscribe_service -from coordinator.services.broker import subscribe as broker_subscribe_service +from services.client import subscribe as client_subscribe_service +from services.broker import subscribe as broker_subscribe_service -COORDINATOR_PROJECT_PATH = os.getenv("COORDINATOR_PROJECT_PATH", "/app/") -sys.path.append(os.path.abspath(COORDINATOR_PROJECT_PATH)) app = Flask(__name__) diff --git a/kafka_server/coordinator/services/broker/subscribe.py b/kafka_server/coordinator/services/broker/subscribe.py index 8257cb1..4d945e8 100644 --- a/kafka_server/coordinator/services/broker/subscribe.py +++ b/kafka_server/coordinator/services/broker/subscribe.py @@ -1,10 +1,16 @@ from datetime import datetime import json import threading +import os +import sys import requests -from coordinator.services.broker import database as broker_database -from coordinator.services.client import database as client_database +COORDINATOR_PROJECT_PATH = os.getenv("COORDINATOR_PROJECT_PATH", "/app/") +sys.path.append(os.path.abspath(COORDINATOR_PROJECT_PATH)) + + +from services.broker import database as broker_database +from services.client import database as client_database def get_all_subscriptions(): diff --git a/kafka_server/coordinator/services/client/subscribe.py b/kafka_server/coordinator/services/client/subscribe.py index 9e57932..d101671 100644 --- a/kafka_server/coordinator/services/client/subscribe.py +++ b/kafka_server/coordinator/services/client/subscribe.py @@ -1,9 +1,14 @@ from datetime import datetime import json import threading +import os +import sys import requests -from coordinator.services.broker import subscribe as broker_subscribe_service +COORDINATOR_PROJECT_PATH = os.getenv("COORDINATOR_PROJECT_PATH", "/app/") +sys.path.append(os.path.abspath(COORDINATOR_PROJECT_PATH)) + +from services.broker import subscribe as broker_subscribe_service def update_brokers_subscription_plan(): From 30ea29e0f1e9b95740500c6f65cd9232f93f3037 Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 00:47:38 +0330 Subject: [PATCH 59/91] fix data path. --- kafka_server/broker/controller/produce.py | 4 ++-- kafka_server/broker/file/read.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka_server/broker/controller/produce.py b/kafka_server/broker/controller/produce.py index 2e48e50..8b7d773 100644 --- a/kafka_server/broker/controller/produce.py +++ b/kafka_server/broker/controller/produce.py @@ -100,7 +100,7 @@ def subscription(): data = json.loads(request.data.decode("utf-8")) brokers = data['brokers'] - brokers_file_path = os.path.join(os.getcwd(), '../data', 'subscriptions', 'brokers.json') + brokers_file_path = os.path.join(os.getcwd(), 'data', 'subscriptions', 'brokers.json') with open(brokers_file_path, "w") as file: json.dump(brokers, file) @@ -122,7 +122,7 @@ def subscribers(): data = json.loads(request.data.decode("utf-8")) brokers = data['subscribers'] - subscribers_file_path = os.path.join(os.getcwd(), '../data', 'subscriptions', 'subscribers.json') + subscribers_file_path = os.path.join(os.getcwd(), 'data', 'subscriptions', 'subscribers.json') with open(subscribers_file_path, "w+") as file: json.dump(brokers, file) diff --git a/kafka_server/broker/file/read.py b/kafka_server/broker/file/read.py index 75a9146..00e3234 100644 --- a/kafka_server/broker/file/read.py +++ b/kafka_server/broker/file/read.py @@ -125,7 +125,7 @@ def check_data_exist(self): def get_subscribers(): subscriptions_file_path = os.path.join( os.getcwd(), - '../data', + 'data', 'subscriptions', 'subscribers.json' ) From 5d366a112780799e5498cc7332d5e930aea47129 Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 00:57:39 +0330 Subject: [PATCH 60/91] fix path. --- kafka_server/broker/main.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/kafka_server/broker/main.py b/kafka_server/broker/main.py index 92ba2c8..d899fff 100644 --- a/kafka_server/broker/main.py +++ b/kafka_server/broker/main.py @@ -38,10 +38,10 @@ def init_broker(): def init(): - os.makedirs(os.path.join(os.getcwd(), '../data'), exist_ok=True) - os.makedirs(os.path.join(os.getcwd(), '../data', 'subscriptions'), exist_ok=True) - os.makedirs(os.path.join(os.getcwd(), '../data', 'partition_data'), exist_ok=True) - os.makedirs(os.path.join(os.getcwd(), '../data', 'partition_index'), exist_ok=True) + os.makedirs(os.path.join(os.getcwd(), 'data'), exist_ok=True) + os.makedirs(os.path.join(os.getcwd(), 'data', 'subscriptions'), exist_ok=True) + os.makedirs(os.path.join(os.getcwd(), 'data', 'partition_data'), exist_ok=True) + os.makedirs(os.path.join(os.getcwd(), 'data', 'partition_index'), exist_ok=True) init_broker() read_thread = threading.Thread(target=schedule_read_thread) read_thread.daemon = True From 4e10250b757585d37acf8a724d78cf29877975d4 Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 01:11:38 +0330 Subject: [PATCH 61/91] add partition_count to log. --- kafka_server/broker/file/write.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka_server/broker/file/write.py b/kafka_server/broker/file/write.py index 4c72e2c..33b1f53 100644 --- a/kafka_server/broker/file/write.py +++ b/kafka_server/broker/file/write.py @@ -33,7 +33,7 @@ def write_data(self, key: str, value: bytes): md5 = hash_md5(key) partition_count = get_partition_count() if int(md5, 16) % partition_count != int(self.partition) - 1: - raise Exception(f"key is not for this partition, fuck {md5} {key}") + raise Exception(f"key is not for this partition, fuck {md5} {key} {partition_count}") appended = self.segment.append(key, value) if not appended: From 7d454e5a0d7a33e159a0b114990b30fd6ab9a207 Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 01:56:05 +0330 Subject: [PATCH 62/91] remove log. --- kafka_server/broker/controller/coordinator.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/kafka_server/broker/controller/coordinator.py b/kafka_server/broker/controller/coordinator.py index 058e2d1..293e92f 100644 --- a/kafka_server/broker/controller/coordinator.py +++ b/kafka_server/broker/controller/coordinator.py @@ -78,12 +78,7 @@ def heartbeat(): 'ip': os.getenv("IP"), 'port': os.getenv("PORT"), } - - if _post(payload, heartbeat_url): - print(f"Heartbeat {payload}") - else: - print("Heartbeat Failed") - + _post(payload, heartbeat_url) time.sleep(3) # 3 Seconds wait for another heartbeat From 6bb24673355d3c493c8021378bbe930399abdcac Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 02:09:48 +0330 Subject: [PATCH 63/91] fix partition count. --- kafka_server/broker/manager/env.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/kafka_server/broker/manager/env.py b/kafka_server/broker/manager/env.py index 6e03b91..820ef2b 100644 --- a/kafka_server/broker/manager/env.py +++ b/kafka_server/broker/manager/env.py @@ -1,3 +1,4 @@ +import json import os @@ -21,7 +22,12 @@ def get_replica_coordinator() -> str: def get_partition_count() -> int: - return int(os.getenv('PARTITION_COUNT', 3)) + brokers_file_path = os.path.join(os.getcwd(), 'data', 'subscriptions', 'brokers.json') + if os.path.exists(brokers_file_path): + with open(brokers_file_path, 'r', encoding='utf8') as file: + brokers = json.load(file) + return len(brokers) + return 3 def is_replica_mirror_down() -> bool: From 022a1bf0157fb7f063780e79e3fd9f490d55cbd6 Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 03:28:15 +0330 Subject: [PATCH 64/91] remove logs. --- kafka_server/broker/file/indexer.py | 3 --- kafka_server/broker/file/read.py | 1 - 2 files changed, 4 deletions(-) diff --git a/kafka_server/broker/file/indexer.py b/kafka_server/broker/file/indexer.py index 8f4cd2d..8959131 100644 --- a/kafka_server/broker/file/indexer.py +++ b/kafka_server/broker/file/indexer.py @@ -17,9 +17,6 @@ def __new__(cls, partition: str, replica: str = None): cls._instances[partition] = super().__new__(cls) cls._instances[partition].partition = partition cls._instances[partition].replica = replica - cls._instances[partition]._write = 0 - cls._instances[partition]._read = 0 - cls._instances[partition]._sync = 0 cls._instances[partition].load() path = cls._instances[partition].__dir_path() os.makedirs(path, exist_ok=True) diff --git a/kafka_server/broker/file/read.py b/kafka_server/broker/file/read.py index 00e3234..04d29cb 100644 --- a/kafka_server/broker/file/read.py +++ b/kafka_server/broker/file/read.py @@ -165,7 +165,6 @@ def load_message_in_fly(self): if os.path.exists(message_file_path): with open(message_file_path, 'r', encoding='utf8') as f: data = json.load(f) - print(data) self.message_in_fly = data.get('message_in_fly', False) self.message_in_fly_since = datetime.fromisoformat( data.get('message_in_fly_since', datetime.now().isoformat())) From 12c54ccb0977d4f126b6a9bfe3d1c612052d7fee Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 04:42:19 +0330 Subject: [PATCH 65/91] add logs. --- kafka_server/broker/file/read.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kafka_server/broker/file/read.py b/kafka_server/broker/file/read.py index 04d29cb..f8742aa 100644 --- a/kafka_server/broker/file/read.py +++ b/kafka_server/broker/file/read.py @@ -65,6 +65,7 @@ def read_data(self): md5 = hash_md5(key) partition_count = get_partition_count() if int(md5, 16) % partition_count != int(self.partition) - 1: + print("data for other partition", flush=True) self.segment.approve_reading() return self.read_data() @@ -130,6 +131,7 @@ def get_subscribers(): 'subscribers.json' ) if not os.path.exists(subscriptions_file_path): + print("No subscriptions file found") return [] with open(subscriptions_file_path, 'r', encoding='utf8') as f: From 3fbc14e31f3a1e8741e2e1ea77b36b383cca54dd Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 04:44:09 +0330 Subject: [PATCH 66/91] add logs. --- kafka_server/broker/file/read.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/kafka_server/broker/file/read.py b/kafka_server/broker/file/read.py index f8742aa..4386212 100644 --- a/kafka_server/broker/file/read.py +++ b/kafka_server/broker/file/read.py @@ -29,12 +29,12 @@ def __new__(cls, *args, **kwargs): return cls._instance def __init__(self, partition: str, replica: str): + self.subscribers = None if not hasattr(self, 'initialized'): self.partition = partition self.message_in_fly = False self.message_in_fly_since = datetime.now() self.segment = Segment(partition, replica) - self.subscribers = self.get_subscribers() self.initialized = True self.toggle_thread = threading.Thread(target=self.toggle_message_in_fly) @@ -51,6 +51,7 @@ def toggle_message_in_fly(self): time.sleep(5) def read_data(self): + self.subscribers = self.get_subscribers() if len(self.subscribers) == 0: return None, None self.load_message_in_fly() @@ -137,8 +138,6 @@ def get_subscribers(): with open(subscriptions_file_path, 'r', encoding='utf8') as f: subscribers = json.load(f) - if len(subscribers) == 0: - raise Exception('No subscribers found') return subscribers def send_to_subscriber(self, key: str, value: str) -> bool: From 00355493d49f50794801fab2588f1df3bafd2799 Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 04:44:47 +0330 Subject: [PATCH 67/91] add logs. --- kafka_server/broker/file/read.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka_server/broker/file/read.py b/kafka_server/broker/file/read.py index 4386212..84b663a 100644 --- a/kafka_server/broker/file/read.py +++ b/kafka_server/broker/file/read.py @@ -53,6 +53,7 @@ def toggle_message_in_fly(self): def read_data(self): self.subscribers = self.get_subscribers() if len(self.subscribers) == 0: + print("No subscribers") return None, None self.load_message_in_fly() if self.message_in_fly: From 403705a55bbf595f12dd4d5e10e5b84e5bbb8820 Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 04:50:11 +0330 Subject: [PATCH 68/91] use singleton on subscribers. --- kafka_server/broker/file/read.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/kafka_server/broker/file/read.py b/kafka_server/broker/file/read.py index 84b663a..77d9c3c 100644 --- a/kafka_server/broker/file/read.py +++ b/kafka_server/broker/file/read.py @@ -29,12 +29,12 @@ def __new__(cls, *args, **kwargs): return cls._instance def __init__(self, partition: str, replica: str): - self.subscribers = None if not hasattr(self, 'initialized'): self.partition = partition self.message_in_fly = False self.message_in_fly_since = datetime.now() self.segment = Segment(partition, replica) + self.subscribers = self.get_subscribers() self.initialized = True self.toggle_thread = threading.Thread(target=self.toggle_message_in_fly) @@ -51,7 +51,9 @@ def toggle_message_in_fly(self): time.sleep(5) def read_data(self): - self.subscribers = self.get_subscribers() + if len(self.subscribers) == 0: + self.subscribers = self.get_subscribers() + if len(self.subscribers) == 0: print("No subscribers") return None, None From 7097c84a5b969cb74935971cfffd0a1d08e8455a Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 04:59:31 +0330 Subject: [PATCH 69/91] add log on error. --- kafka_server/broker/file/read.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kafka_server/broker/file/read.py b/kafka_server/broker/file/read.py index 77d9c3c..e782e22 100644 --- a/kafka_server/broker/file/read.py +++ b/kafka_server/broker/file/read.py @@ -150,6 +150,9 @@ def send_to_subscriber(self, key: str, value: str) -> bool: try: response = requests.post(url, json={'key': key, 'value': value}, timeout=2) + if response != 200: + print(response.json(), response.content, response.status_code) + return response.status_code == 200 except Exception as e: print(e) From b255056caa9e1fe2e123a06c237b271c06564f7e Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 04:59:45 +0330 Subject: [PATCH 70/91] add log on error. --- kafka_server/broker/file/read.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka_server/broker/file/read.py b/kafka_server/broker/file/read.py index e782e22..f4e8d96 100644 --- a/kafka_server/broker/file/read.py +++ b/kafka_server/broker/file/read.py @@ -150,7 +150,7 @@ def send_to_subscriber(self, key: str, value: str) -> bool: try: response = requests.post(url, json={'key': key, 'value': value}, timeout=2) - if response != 200: + if response.status_code != 200: print(response.json(), response.content, response.status_code) return response.status_code == 200 From cdb401f5f46faf752d47b92845c1f37f3f8759ca Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 05:59:58 +0330 Subject: [PATCH 71/91] add log on error. --- kafka_server/broker/file/read.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/kafka_server/broker/file/read.py b/kafka_server/broker/file/read.py index f4e8d96..b0eb161 100644 --- a/kafka_server/broker/file/read.py +++ b/kafka_server/broker/file/read.py @@ -140,6 +140,7 @@ def get_subscribers(): with open(subscriptions_file_path, 'r', encoding='utf8') as f: subscribers = json.load(f) + print("Found {} subscribers".format(subscribers)) return subscribers @@ -164,7 +165,10 @@ def choose_subscriber(self): id_to_key = {} for i, key in enumerate(self.subscribers.keys()): id_to_key[i] = key + + print("id_to_key", id_to_key) chosen_key = id_to_key[read_index % subscriber_count] + print("chosen_key", chosen_key) return chosen_key, self.subscribers[chosen_key] def load_message_in_fly(self): From f9ad75d084e3acb9945e40f0cbb0c1c15502d3e7 Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 06:02:43 +0330 Subject: [PATCH 72/91] get subscriptions on each req. --- kafka_server/broker/file/read.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/kafka_server/broker/file/read.py b/kafka_server/broker/file/read.py index b0eb161..4bec853 100644 --- a/kafka_server/broker/file/read.py +++ b/kafka_server/broker/file/read.py @@ -29,12 +29,12 @@ def __new__(cls, *args, **kwargs): return cls._instance def __init__(self, partition: str, replica: str): + self.subscribers = None if not hasattr(self, 'initialized'): self.partition = partition self.message_in_fly = False self.message_in_fly_since = datetime.now() self.segment = Segment(partition, replica) - self.subscribers = self.get_subscribers() self.initialized = True self.toggle_thread = threading.Thread(target=self.toggle_message_in_fly) @@ -51,9 +51,7 @@ def toggle_message_in_fly(self): time.sleep(5) def read_data(self): - if len(self.subscribers) == 0: - self.subscribers = self.get_subscribers() - + self.subscribers = self.get_subscribers() if len(self.subscribers) == 0: print("No subscribers") return None, None @@ -166,9 +164,7 @@ def choose_subscriber(self): for i, key in enumerate(self.subscribers.keys()): id_to_key[i] = key - print("id_to_key", id_to_key) chosen_key = id_to_key[read_index % subscriber_count] - print("chosen_key", chosen_key) return chosen_key, self.subscribers[chosen_key] def load_message_in_fly(self): From 173512f34720aacde646078a5f5fda633a8546c4 Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 06:28:34 +0330 Subject: [PATCH 73/91] change write style --- kafka_server/broker/file/segment.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/kafka_server/broker/file/segment.py b/kafka_server/broker/file/segment.py index 192bb99..6e3fdeb 100644 --- a/kafka_server/broker/file/segment.py +++ b/kafka_server/broker/file/segment.py @@ -34,11 +34,11 @@ def append(self, key: str, value: bytes): data_file_path = os.path.join(segment_path, f'{self.indexer.get_write()}.dat') kb = 1024 - with open(data_file_path, 'wb+') as entry_file: - entry_file.write(f'{key}: '.encode('utf-8')) + with open(data_file_path, 'w') as entry_file: + entry_file.write(f'{key}: ') for i in range(0, len(value), kb): chunk = value[i:i + kb] - entry_file.write(chunk) + entry_file.write(chunk.decode('utf-8')) except Exception as e: print(f"Error appending data to segment: {e}") return False @@ -81,10 +81,10 @@ def read(self): if os.path.exists(data_file_path): # TODO: get lock try: - with open(data_file_path, 'rb') as entry_file: + with open(data_file_path, 'r') as entry_file: data = entry_file.read() - key, value = data.decode('utf-8').split(': ', 1) + key, value = data.split(': ', 1) return key, value except Exception as e: print(f"Error reading file {data_file_path}: {e}") From 2dd9e8e49a8eaa2e1f07eb2e96abd78984290011 Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 06:37:02 +0330 Subject: [PATCH 74/91] change to string. --- kafka_server/broker/controller/produce.py | 2 +- kafka_server/broker/file/segment.py | 8 ++------ kafka_server/broker/file/write.py | 7 +++---- 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/kafka_server/broker/controller/produce.py b/kafka_server/broker/controller/produce.py index 8b7d773..46f9ebd 100644 --- a/kafka_server/broker/controller/produce.py +++ b/kafka_server/broker/controller/produce.py @@ -36,7 +36,7 @@ def write(): # Assuming the request body is in JSON format with 'key' and 'value' fields data = request.get_json() key = data.get('key') - value = data.get('value').encode('utf-8') + value = data.get('value') print(key, value) write_instance = Write(get_primary_partition(), get_replica_url()) diff --git a/kafka_server/broker/file/segment.py b/kafka_server/broker/file/segment.py index 6e3fdeb..0e831ae 100644 --- a/kafka_server/broker/file/segment.py +++ b/kafka_server/broker/file/segment.py @@ -24,7 +24,7 @@ def __new__(cls, partition: str, replica: str): cls._instances[partition].indexer = Indexer(partition, replica) return cls._instances[partition] - def append(self, key: str, value: bytes): + def append(self, key: str, value: str): try: with self._append_lock: segment_path = self.write_segment_path() @@ -32,13 +32,9 @@ def append(self, key: str, value: bytes): os.makedirs(segment_path, exist_ok=True) data_file_path = os.path.join(segment_path, f'{self.indexer.get_write()}.dat') - kb = 1024 with open(data_file_path, 'w') as entry_file: - entry_file.write(f'{key}: ') - for i in range(0, len(value), kb): - chunk = value[i:i + kb] - entry_file.write(chunk.decode('utf-8')) + entry_file.write(f'{key}: {value}') except Exception as e: print(f"Error appending data to segment: {e}") return False diff --git a/kafka_server/broker/file/write.py b/kafka_server/broker/file/write.py index 33b1f53..e3860ed 100644 --- a/kafka_server/broker/file/write.py +++ b/kafka_server/broker/file/write.py @@ -28,7 +28,7 @@ def __init__(self, partition: str, replica: str): self.segment = Segment(partition, replica) self.replica = replica - def write_data(self, key: str, value: bytes): + def write_data(self, key: str, value: str): with self._write_lock: md5 = hash_md5(key) partition_count = get_partition_count() @@ -49,12 +49,11 @@ def replicate_data(self, key: str, value: bytes): if self.segment.append(key, value): return self.segment.approve_appending() - def send_to_replica(self, key: str, value: bytes) -> bool: + def send_to_replica(self, key: str, value: str) -> bool: url = f'{self.replica}/replica/data' - value_str = value.decode('utf-8') response = requests.post( url, - json={'key': key, 'value': value_str, 'partition': self.partition}, + json={'key': key, 'value': value, 'partition': self.partition}, timeout=2, ) From 662bd9d215aefea69a1f7bd0880b4d0318c802c0 Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 06:53:11 +0330 Subject: [PATCH 75/91] sync replica up. --- kafka_server/broker/main.py | 4 ++++ kafka_server/broker/read/read_cronjob.py | 5 +++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/kafka_server/broker/main.py b/kafka_server/broker/main.py index d899fff..325783e 100644 --- a/kafka_server/broker/main.py +++ b/kafka_server/broker/main.py @@ -55,4 +55,8 @@ def init(): # sync_thread.daemon = True # sync_thread.start() + sync_replica_thread = threading.Thread(target=sync_replica) + sync_replica_thread.daemon = True + sync_replica_thread.start() + schedule.run_pending() diff --git a/kafka_server/broker/read/read_cronjob.py b/kafka_server/broker/read/read_cronjob.py index 7a201fb..6347408 100644 --- a/kafka_server/broker/read/read_cronjob.py +++ b/kafka_server/broker/read/read_cronjob.py @@ -14,8 +14,9 @@ def read_sample_data(): with fetch_lock: read_instance = Read(get_primary_partition(), get_replica_url()) - print("reading sample data") - print(read_instance.read_data()) + # print("reading sample data") + # print(read_instance.read_data()) + read_instance.read_data() def schedule_read(): From a578bbcf6503a36db5d091b54458820fae1867fc Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 06:54:39 +0330 Subject: [PATCH 76/91] sync replica up. --- kafka_server/broker/main.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka_server/broker/main.py b/kafka_server/broker/main.py index 325783e..465926f 100644 --- a/kafka_server/broker/main.py +++ b/kafka_server/broker/main.py @@ -51,9 +51,9 @@ def init(): heartbeat_thread.daemon = True heartbeat_thread.start() - # sync_thread = threading.Thread(target=schedule_sync_thread) - # sync_thread.daemon = True - # sync_thread.start() + sync_thread = threading.Thread(target=schedule_sync_thread) + sync_thread.daemon = True + sync_thread.start() sync_replica_thread = threading.Thread(target=sync_replica) sync_replica_thread.daemon = True From 334b01fc29418572a619ab4f6216afae73b25400 Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 07:12:44 +0330 Subject: [PATCH 77/91] log. --- kafka_server/broker/file/indexer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka_server/broker/file/indexer.py b/kafka_server/broker/file/indexer.py index 8959131..1867e80 100644 --- a/kafka_server/broker/file/indexer.py +++ b/kafka_server/broker/file/indexer.py @@ -103,6 +103,7 @@ def __dir_path(self) -> str: def send_to_replica(self): if self.replica is None: + print("No replica found /n/n/n") return url = f'{self.replica}/replica/index' data = {'partition': self.partition, 'read': self._read, 'sync': self._sync} From 554c6643fa1a50951bf4ea5229b770e0b739104d Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 07:19:15 +0330 Subject: [PATCH 78/91] log. --- kafka_server/broker/file/indexer.py | 6 +++++- kafka_server/broker/file/segment.py | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/kafka_server/broker/file/indexer.py b/kafka_server/broker/file/indexer.py index 1867e80..9147b36 100644 --- a/kafka_server/broker/file/indexer.py +++ b/kafka_server/broker/file/indexer.py @@ -40,7 +40,9 @@ def get_write(self): def inc_read(self): with self._read_lock: self._read += 1 + print("save read index\n\n") self._save_variable(self._read, 'read') + print("send read index\n\n") self.send_to_replica() def get_read(self): @@ -102,11 +104,13 @@ def __dir_path(self) -> str: ) def send_to_replica(self): + print("sending to replica started!!!\n\n\n\n") if self.replica is None: print("No replica found /n/n/n") return + url = f'{self.replica}/replica/index' data = {'partition': self.partition, 'read': self._read, 'sync': self._sync} - response = requests.post(url, json=data, timeout=2) + response = requests.post(url, json=data) if response.status_code != 200: raise Exception(f'indexed not yet updated {response}') diff --git a/kafka_server/broker/file/segment.py b/kafka_server/broker/file/segment.py index 0e831ae..eeb6b37 100644 --- a/kafka_server/broker/file/segment.py +++ b/kafka_server/broker/file/segment.py @@ -54,6 +54,7 @@ def approve_appending(self): def approve_reading(self): try: with self._read_lock: + print("approve_reading \n\n") self.indexer.inc_read() except Exception as e: print(f"Error inc read index: {e}") From 306427b18c0f0cf06c13d59c5b2f3ce846980c1c Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 07:44:41 +0330 Subject: [PATCH 79/91] fix none replica. --- kafka_server/broker/file/read.py | 23 +++++++++++------------ kafka_server/broker/file/segment.py | 3 ++- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/kafka_server/broker/file/read.py b/kafka_server/broker/file/read.py index 4bec853..fbc87fc 100644 --- a/kafka_server/broker/file/read.py +++ b/kafka_server/broker/file/read.py @@ -21,25 +21,24 @@ class Read: _read_lock = threading.Lock() _instance = None - def __new__(cls, *args, **kwargs): - if cls._instance is None: + def __new__(cls, partition: str, replica: str): + if cls._instance is None or (cls._instance.replica is None and replica is not None): with cls._instances_lock: if cls._instance is None: cls._instance = super().__new__(cls) + cls._instance.replica = replica return cls._instance def __init__(self, partition: str, replica: str): self.subscribers = None - if not hasattr(self, 'initialized'): - self.partition = partition - self.message_in_fly = False - self.message_in_fly_since = datetime.now() - self.segment = Segment(partition, replica) - self.initialized = True - - self.toggle_thread = threading.Thread(target=self.toggle_message_in_fly) - self.toggle_thread.daemon = True - self.toggle_thread.start() + self.partition = partition + self.message_in_fly = False + self.message_in_fly_since = datetime.now() + self.segment = Segment(partition, replica) + + self.toggle_thread = threading.Thread(target=self.toggle_message_in_fly) + self.toggle_thread.daemon = True + self.toggle_thread.start() def toggle_message_in_fly(self): while True: diff --git a/kafka_server/broker/file/segment.py b/kafka_server/broker/file/segment.py index eeb6b37..1b1bec9 100644 --- a/kafka_server/broker/file/segment.py +++ b/kafka_server/broker/file/segment.py @@ -18,10 +18,11 @@ class Segment: def __new__(cls, partition: str, replica: str): with cls._instances_lock: - if partition not in cls._instances: + if partition not in cls._instances or (cls._instances[partition].replica is None and replica is not None): cls._instances[partition] = super(Segment, cls).__new__(cls) cls._instances[partition].partition = partition cls._instances[partition].indexer = Indexer(partition, replica) + cls._instances[partition].replica = replica return cls._instances[partition] def append(self, key: str, value: str): From f60622fd9e34d5257edac99e8e5c4af2c8674739 Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 07:48:20 +0330 Subject: [PATCH 80/91] fix none replica. --- kafka_server/broker/file/read.py | 4 +++- kafka_server/broker/file/segment.py | 6 +++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/kafka_server/broker/file/read.py b/kafka_server/broker/file/read.py index fbc87fc..603369d 100644 --- a/kafka_server/broker/file/read.py +++ b/kafka_server/broker/file/read.py @@ -22,11 +22,13 @@ class Read: _instance = None def __new__(cls, partition: str, replica: str): - if cls._instance is None or (cls._instance.replica is None and replica is not None): + if cls._instance is None: with cls._instances_lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance.replica = replica + if replica is not None and cls._instance.replica is None: + cls._instance.segment = Segment(partition, replica) return cls._instance def __init__(self, partition: str, replica: str): diff --git a/kafka_server/broker/file/segment.py b/kafka_server/broker/file/segment.py index 1b1bec9..65ceb2f 100644 --- a/kafka_server/broker/file/segment.py +++ b/kafka_server/broker/file/segment.py @@ -18,11 +18,15 @@ class Segment: def __new__(cls, partition: str, replica: str): with cls._instances_lock: - if partition not in cls._instances or (cls._instances[partition].replica is None and replica is not None): + if partition not in cls._instances: cls._instances[partition] = super(Segment, cls).__new__(cls) cls._instances[partition].partition = partition cls._instances[partition].indexer = Indexer(partition, replica) cls._instances[partition].replica = replica + + if replica is not None and cls._instances[partition].replica is None: + cls._instances[partition].indexer = Indexer(partition, replica) + return cls._instances[partition] def append(self, key: str, value: str): From 9d31c7d9c767bef06db60935ff14cfce923bad92 Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 07:51:24 +0330 Subject: [PATCH 81/91] fix none replica. --- kafka_server/broker/file/read.py | 13 +++++-------- kafka_server/broker/file/segment.py | 14 +++++--------- 2 files changed, 10 insertions(+), 17 deletions(-) diff --git a/kafka_server/broker/file/read.py b/kafka_server/broker/file/read.py index 603369d..e4bfde4 100644 --- a/kafka_server/broker/file/read.py +++ b/kafka_server/broker/file/read.py @@ -19,17 +19,14 @@ class Read: _instances_lock = threading.Lock() _read_lock = threading.Lock() - _instance = None + _instances = {} def __new__(cls, partition: str, replica: str): - if cls._instance is None: + if f"{partition}-{replica}" not in cls._instances[f"{partition}-{replica}"]: with cls._instances_lock: - if cls._instance is None: - cls._instance = super().__new__(cls) - cls._instance.replica = replica - if replica is not None and cls._instance.replica is None: - cls._instance.segment = Segment(partition, replica) - return cls._instance + cls._instances[f"{partition}-{replica}"] = super().__new__(cls) + + return cls._instances[f"{partition}-{replica}"] def __init__(self, partition: str, replica: str): self.subscribers = None diff --git a/kafka_server/broker/file/segment.py b/kafka_server/broker/file/segment.py index 65ceb2f..075e2cd 100644 --- a/kafka_server/broker/file/segment.py +++ b/kafka_server/broker/file/segment.py @@ -18,16 +18,12 @@ class Segment: def __new__(cls, partition: str, replica: str): with cls._instances_lock: - if partition not in cls._instances: - cls._instances[partition] = super(Segment, cls).__new__(cls) - cls._instances[partition].partition = partition - cls._instances[partition].indexer = Indexer(partition, replica) - cls._instances[partition].replica = replica + if f"{partition}-{replica}" not in cls._instances: + cls._instances[f"{partition}-{replica}"] = super(Segment, cls).__new__(cls) + cls._instances[f"{partition}-{replica}"].partition = partition + cls._instances[f"{partition}-{replica}"].indexer = Indexer(partition, replica) - if replica is not None and cls._instances[partition].replica is None: - cls._instances[partition].indexer = Indexer(partition, replica) - - return cls._instances[partition] + return cls._instances[f"{partition}-{replica}"] def append(self, key: str, value: str): try: From 7009c1abeda8c6004100694f181555fe80999627 Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 07:52:14 +0330 Subject: [PATCH 82/91] fix none replica. --- kafka_server/broker/file/indexer.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/kafka_server/broker/file/indexer.py b/kafka_server/broker/file/indexer.py index 9147b36..69dcaaa 100644 --- a/kafka_server/broker/file/indexer.py +++ b/kafka_server/broker/file/indexer.py @@ -13,14 +13,15 @@ class Indexer: def __new__(cls, partition: str, replica: str = None): with cls._lock: - if partition not in cls._instances: - cls._instances[partition] = super().__new__(cls) - cls._instances[partition].partition = partition - cls._instances[partition].replica = replica - cls._instances[partition].load() - path = cls._instances[partition].__dir_path() + if f"{partition}-{replica}" not in cls._instances: + cls._instances[f"{partition}-{replica}"] = super().__new__(cls) + cls._instances[f"{partition}-{replica}"].partition = partition + cls._instances[f"{partition}-{replica}"].replica = replica + cls._instances[f"{partition}-{replica}"].load() + path = cls._instances[f"{partition}-{replica}"].__dir_path() os.makedirs(path, exist_ok=True) - return cls._instances[partition] + + return cls._instances[f"{partition}-{replica}"] def load(self): self._write = self._load_variable('write') From 8c9a0932ccf3d8caee991e5c0089dd3349a6f430 Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 08:00:13 +0330 Subject: [PATCH 83/91] some logs. --- kafka_server/broker/file/read.py | 2 ++ kafka_server/broker/file/segment.py | 1 + 2 files changed, 3 insertions(+) diff --git a/kafka_server/broker/file/read.py b/kafka_server/broker/file/read.py index e4bfde4..15515cc 100644 --- a/kafka_server/broker/file/read.py +++ b/kafka_server/broker/file/read.py @@ -86,10 +86,12 @@ def pull_data(self): if self.message_in_fly: print("there is message in fly") return None, None + print("there is no message in fly", flush=True) if not self.check_data_exist(): return None, None with self._read_lock: + print("start pulling data", flush=True) key, value = self.segment.read() md5 = hash_md5(key) diff --git a/kafka_server/broker/file/segment.py b/kafka_server/broker/file/segment.py index 075e2cd..ffb0987 100644 --- a/kafka_server/broker/file/segment.py +++ b/kafka_server/broker/file/segment.py @@ -73,6 +73,7 @@ def approve_sync(self): def read(self): read_index = self.indexer.get_read() + print("read_index", read_index) segment_path = self.read_segment_path() data_file_path = os.path.join(segment_path, f'{read_index}.dat') From 8dce07b194fcea680aed49fa03e0e174eafafa36 Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 08:01:59 +0330 Subject: [PATCH 84/91] fix error. --- kafka_server/broker/file/read.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka_server/broker/file/read.py b/kafka_server/broker/file/read.py index 15515cc..18b8ab7 100644 --- a/kafka_server/broker/file/read.py +++ b/kafka_server/broker/file/read.py @@ -22,7 +22,7 @@ class Read: _instances = {} def __new__(cls, partition: str, replica: str): - if f"{partition}-{replica}" not in cls._instances[f"{partition}-{replica}"]: + if f"{partition}-{replica}" not in cls._instances: with cls._instances_lock: cls._instances[f"{partition}-{replica}"] = super().__new__(cls) From 0de02cae68a6a7b038f19372b808999403fb4f9c Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 08:14:36 +0330 Subject: [PATCH 85/91] logs. --- kafka_server/broker/controller/produce.py | 1 + kafka_server/broker/file/indexer.py | 4 +--- kafka_server/broker/file/read.py | 21 ++++++++++++--------- kafka_server/broker/file/segment.py | 1 - 4 files changed, 14 insertions(+), 13 deletions(-) diff --git a/kafka_server/broker/controller/produce.py b/kafka_server/broker/controller/produce.py index 46f9ebd..e470b06 100644 --- a/kafka_server/broker/controller/produce.py +++ b/kafka_server/broker/controller/produce.py @@ -158,6 +158,7 @@ def replica_down(): @app.route('/pull', methods=['GET']) def pull(): try: + print("replica url", get_replica_url()) read = Read(get_primary_partition(), get_replica_url()) key, value = read.pull_data() return jsonify({'key': key, 'value': value}), 200 diff --git a/kafka_server/broker/file/indexer.py b/kafka_server/broker/file/indexer.py index 69dcaaa..7476ff9 100644 --- a/kafka_server/broker/file/indexer.py +++ b/kafka_server/broker/file/indexer.py @@ -41,9 +41,7 @@ def get_write(self): def inc_read(self): with self._read_lock: self._read += 1 - print("save read index\n\n") self._save_variable(self._read, 'read') - print("send read index\n\n") self.send_to_replica() def get_read(self): @@ -105,13 +103,13 @@ def __dir_path(self) -> str: ) def send_to_replica(self): - print("sending to replica started!!!\n\n\n\n") if self.replica is None: print("No replica found /n/n/n") return url = f'{self.replica}/replica/index' data = {'partition': self.partition, 'read': self._read, 'sync': self._sync} + print(data, "to Replica") response = requests.post(url, json=data) if response.status_code != 200: raise Exception(f'indexed not yet updated {response}') diff --git a/kafka_server/broker/file/read.py b/kafka_server/broker/file/read.py index 18b8ab7..0227972 100644 --- a/kafka_server/broker/file/read.py +++ b/kafka_server/broker/file/read.py @@ -29,15 +29,18 @@ def __new__(cls, partition: str, replica: str): return cls._instances[f"{partition}-{replica}"] def __init__(self, partition: str, replica: str): - self.subscribers = None - self.partition = partition - self.message_in_fly = False - self.message_in_fly_since = datetime.now() - self.segment = Segment(partition, replica) + if not hasattr(self, 'initialized'): + self.subscribers = None + self.partition = partition + self.message_in_fly = False + self.message_in_fly_since = datetime.now() + self.segment = Segment(partition, replica) + + self.initialized = True - self.toggle_thread = threading.Thread(target=self.toggle_message_in_fly) - self.toggle_thread.daemon = True - self.toggle_thread.start() + self.toggle_thread = threading.Thread(target=self.toggle_message_in_fly) + self.toggle_thread.daemon = True + self.toggle_thread.start() def toggle_message_in_fly(self): while True: @@ -107,9 +110,9 @@ def pull_data(self): def ack_message(self): if self.message_in_fly: with self._read_lock: - self.segment.approve_reading() self.message_in_fly = False self.save_message_in_fly() + self.segment.approve_reading() def check_data_exist(self): if self.segment.get_read_index() >= self.segment.get_write_index(): diff --git a/kafka_server/broker/file/segment.py b/kafka_server/broker/file/segment.py index ffb0987..2a59461 100644 --- a/kafka_server/broker/file/segment.py +++ b/kafka_server/broker/file/segment.py @@ -55,7 +55,6 @@ def approve_appending(self): def approve_reading(self): try: with self._read_lock: - print("approve_reading \n\n") self.indexer.inc_read() except Exception as e: print(f"Error inc read index: {e}") From c4fc6310dfa98a3e3cd1659e030e9906daebbebf Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 08:26:12 +0330 Subject: [PATCH 86/91] write without replica. --- kafka_server/broker/controller/produce.py | 1 - kafka_server/broker/file/segment.py | 1 - kafka_server/broker/file/write.py | 10 +++++++--- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/kafka_server/broker/controller/produce.py b/kafka_server/broker/controller/produce.py index e470b06..46f9ebd 100644 --- a/kafka_server/broker/controller/produce.py +++ b/kafka_server/broker/controller/produce.py @@ -158,7 +158,6 @@ def replica_down(): @app.route('/pull', methods=['GET']) def pull(): try: - print("replica url", get_replica_url()) read = Read(get_primary_partition(), get_replica_url()) key, value = read.pull_data() return jsonify({'key': key, 'value': value}), 200 diff --git a/kafka_server/broker/file/segment.py b/kafka_server/broker/file/segment.py index 2a59461..09f6a04 100644 --- a/kafka_server/broker/file/segment.py +++ b/kafka_server/broker/file/segment.py @@ -72,7 +72,6 @@ def approve_sync(self): def read(self): read_index = self.indexer.get_read() - print("read_index", read_index) segment_path = self.read_segment_path() data_file_path = os.path.join(segment_path, f'{read_index}.dat') diff --git a/kafka_server/broker/file/write.py b/kafka_server/broker/file/write.py index e3860ed..b5572e1 100644 --- a/kafka_server/broker/file/write.py +++ b/kafka_server/broker/file/write.py @@ -15,9 +15,9 @@ class Write: _write_lock = threading.Lock() def __new__(cls, partition: str, replica: str = None): - if partition not in cls._instances: - cls._instances[partition] = super(Write, cls).__new__(cls) - return cls._instances[partition] + if f"{partition}-{replica}" not in cls._instances: + cls._instances[f"{partition}-{replica}"] = super(Write, cls).__new__(cls) + return cls._instances[f"{partition}-{replica}"] def __init__(self, partition: str, replica: str): if hasattr(self, 'initialized'): @@ -50,6 +50,10 @@ def replicate_data(self, key: str, value: bytes): return self.segment.approve_appending() def send_to_replica(self, key: str, value: str) -> bool: + if self.replica is None: + print("replica is None, skip it") + return True + url = f'{self.replica}/replica/data' response = requests.post( url, From e76ebedb10cebf1465f34c24d74b4f0abafd3070 Mon Sep 17 00:00:00 2001 From: hoseinaghaei Date: Thu, 15 Feb 2024 08:27:49 +0330 Subject: [PATCH 87/91] fix. --- kafka_server/broker/file/read.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/kafka_server/broker/file/read.py b/kafka_server/broker/file/read.py index 0227972..8cb9717 100644 --- a/kafka_server/broker/file/read.py +++ b/kafka_server/broker/file/read.py @@ -11,7 +11,6 @@ from file.segment import Segment from manager.env import get_partition_count - BROKER_PROJECT_PATH = os.getenv("BROKER_PROJECT_PATH", "/app/") sys.path.append(os.path.abspath(BROKER_PROJECT_PATH)) @@ -116,8 +115,7 @@ def ack_message(self): def check_data_exist(self): if self.segment.get_read_index() >= self.segment.get_write_index(): - print(f"No key found {self.segment.get_read_index()} " - f"in {self.segment.get_write_index()}") + print(f"No key found {self.segment.get_read_index()} in {self.segment.get_write_index()}") return False key, _ = self.segment.read() From 59aedac1740dc7393bf083fe5f718c666ee8cc84 Mon Sep 17 00:00:00 2001 From: Mahdi Gheidi Date: Thu, 15 Feb 2024 12:38:42 +0330 Subject: [PATCH 88/91] add timeout to request and fix static analysis --- kafka_server/broker/file/indexer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka_server/broker/file/indexer.py b/kafka_server/broker/file/indexer.py index 7476ff9..5b50d60 100644 --- a/kafka_server/broker/file/indexer.py +++ b/kafka_server/broker/file/indexer.py @@ -110,6 +110,6 @@ def send_to_replica(self): url = f'{self.replica}/replica/index' data = {'partition': self.partition, 'read': self._read, 'sync': self._sync} print(data, "to Replica") - response = requests.post(url, json=data) + response = requests.post(url, json=data, timeout=2) if response.status_code != 200: raise Exception(f'indexed not yet updated {response}') From 1b2bf70fd12396d63862815a455836f469e0ca38 Mon Sep 17 00:00:00 2001 From: Mahdi Gheidi Date: Thu, 15 Feb 2024 12:44:26 +0330 Subject: [PATCH 89/91] fix cicd --- .github/workflows/ci.yml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 078c01d..462b1e0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -122,8 +122,6 @@ jobs: - name: connect and pull run: | ssh -i ~/.ssh/id_rsa -o StrictHostKeyChecking=no ${{ secrets.SSH_USER }}@${{ secrets.SSH_HOST }} "docker-compose up -d broker-1 broker-2 broker-3 && exit" - - name: cleanup - run: rm ~/.ssh/id_rsa deploy-prod-coordinator: name: Deploy Coordinator to Production @@ -143,5 +141,3 @@ jobs: run: | ls ssh -i ~/.ssh/id_rsa -o StrictHostKeyChecking=no ${{ secrets.SSH_USER }}@${{ secrets.SSH_HOST }} "docker-compose up -d coordinator-1 coordinator-2 && exit" - - name: cleanup - run: rm ~/.ssh/id_rsa From f737b869ce9b9b13916db8f95ab672cd84444e3f Mon Sep 17 00:00:00 2001 From: Mahdi Gheidi Date: Thu, 15 Feb 2024 12:55:20 +0330 Subject: [PATCH 90/91] update checkout action --- .github/workflows/ci.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 462b1e0..67bc16e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -67,9 +67,9 @@ jobs: matrix: python-version: ["3.8", "3.9", "3.10"] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v3 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - name: Install dependencies @@ -89,10 +89,10 @@ jobs: steps: - name: Checkout code - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Set up Python - uses: actions/setup-python@v2 + uses: actions/setup-python@v5 with: python-version: 3.x @@ -112,7 +112,7 @@ jobs: steps: - name: checkout repo - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: set up ssh keys uses: shimataro/ssh-key-action@v2 @@ -130,7 +130,7 @@ jobs: steps: - name: checkout repo - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: set up ssh keys uses: shimataro/ssh-key-action@v2 From 8cf5ec531d3e85bc989211c2e5f292e4f7c7cef2 Mon Sep 17 00:00:00 2001 From: Mahdi Gheidi Date: Thu, 15 Feb 2024 13:27:00 +0330 Subject: [PATCH 91/91] fix lint --- kafka_server/coordinator/api/broker/api.py | 5 +++-- kafka_server/coordinator_database/main.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/kafka_server/coordinator/api/broker/api.py b/kafka_server/coordinator/api/broker/api.py index a0d1556..ad1507d 100644 --- a/kafka_server/coordinator/api/broker/api.py +++ b/kafka_server/coordinator/api/broker/api.py @@ -2,7 +2,7 @@ import json import os import sys -import random +from random import SystemRandom COORDINATOR_PROJECT_PATH = os.getenv("COORDINATOR_PROJECT_PATH", "/app/") sys.path.append(os.path.abspath(COORDINATOR_PROJECT_PATH)) @@ -13,6 +13,7 @@ api_blueprint = Blueprint('api', __name__) +cryptogen = SystemRandom() @api_blueprint.route('/init', methods=['GET']) @@ -35,7 +36,7 @@ def init_broker(): replica_url = None if broker_id not in all_brokers_replicas: keys = all_brokers.keys() - key = random.choice(list(keys)) + key = cryptogen.choice(list(keys)) replica_url = all_brokers[key] response_code = broker_database.add_replica_for_broker(broker_id, replica_url) if response_code != 200: diff --git a/kafka_server/coordinator_database/main.py b/kafka_server/coordinator_database/main.py index 738a636..0e00121 100644 --- a/kafka_server/coordinator_database/main.py +++ b/kafka_server/coordinator_database/main.py @@ -50,7 +50,7 @@ def delete_broker(): broker_id = data['broker_id'] with concurrent.futures.ThreadPoolExecutor() as executor: future = executor.submit(broker.delete_broker, broker_id) - result = future.result() + _ = future.result() return jsonify("Successfully deleted")