Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
# Conflicts:
#	.github/workflows/ci.yml
#	kafka_server/broker/file/indexer.py
#	kafka_server/broker/file/write.py
#	kafka_server/coordinator/api/broker/api.py
#	kafka_server/coordinator/services/broker/subscribe.py
  • Loading branch information
hoseinaghaei committed Feb 15, 2024
2 parents 7d3bd6a + 8cf5ec5 commit 9c71625
Show file tree
Hide file tree
Showing 10 changed files with 304 additions and 106 deletions.
16 changes: 6 additions & 10 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ jobs:
matrix:
python-version: ["3.8", "3.9", "3.10"]
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v3
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
Expand All @@ -89,10 +89,10 @@ jobs:

steps:
- name: Checkout code
uses: actions/checkout@v2
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v2
uses: actions/setup-python@v5
with:
python-version: 3.x

Expand All @@ -112,7 +112,7 @@ jobs:

steps:
- name: checkout repo
uses: actions/checkout@v2
uses: actions/checkout@v4

- name: set up ssh keys
uses: shimataro/ssh-key-action@v2
Expand All @@ -122,8 +122,6 @@ jobs:
- name: connect and pull
run: |
ssh -i ~/.ssh/id_rsa -o StrictHostKeyChecking=no ${{ secrets.SSH_USER }}@${{ secrets.SSH_HOST }} "docker-compose up -d broker-1 broker-2 broker-3 && exit"
- name: cleanup
run: rm ~/.ssh/id_rsa
deploy-prod-coordinator:
name: Deploy Coordinator to Production
Expand All @@ -132,7 +130,7 @@ jobs:

steps:
- name: checkout repo
uses: actions/checkout@v2
uses: actions/checkout@v4

- name: set up ssh keys
uses: shimataro/ssh-key-action@v2
Expand All @@ -143,5 +141,3 @@ jobs:
run: |
ls
ssh -i ~/.ssh/id_rsa -o StrictHostKeyChecking=no ${{ secrets.SSH_USER }}@${{ secrets.SSH_HOST }} "docker-compose up -d coordinator-1 coordinator-2 && exit"
- name: cleanup
run: rm ~/.ssh/id_rsa
2 changes: 1 addition & 1 deletion kafka_server/broker/file/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,6 @@ def send_to_replica(self):
url = f'{self.replica}/replica/index'
data = {'partition': self.partition, 'read': self._read, 'sync': self._sync}
print(data, "to Replica")
response = requests.post(url, json=data)
response = requests.post(url, json=data, timeout=2)
if response.status_code != 200:
raise Exception(f'indexed not yet updated {response}')
2 changes: 1 addition & 1 deletion kafka_server/broker/file/write.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def write_data(self, key: str, value: str):
with self._write_lock:
md5 = hash_md5(key)
partition_count = get_partition_count()
if int(md5, 16) % partition_count != (int(self.partition) - 1 % partition_count):
if int(md5, 16) % partition_count != int(self.partition) - 1:
raise Exception(f"key is not for this partition, fuck {md5} {key} {partition_count}")

appended = self.segment.append(key, value)
Expand Down
21 changes: 12 additions & 9 deletions kafka_server/coordinator/api/broker/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import os
import sys
from random import SystemRandom

COORDINATOR_PROJECT_PATH = os.getenv("COORDINATOR_PROJECT_PATH", "/app/")
sys.path.append(os.path.abspath(COORDINATOR_PROJECT_PATH))
Expand All @@ -12,6 +13,7 @@


api_blueprint = Blueprint('api', __name__)
cryptogen = SystemRandom()


@api_blueprint.route('/init', methods=['GET'])
Expand All @@ -31,16 +33,17 @@ def init_broker():
if response_code != 200:
return jsonify("Error during initializing broker"), response_code

# if len(all_brokers) > 1:
# keys = all_brokers.keys()
# key = random.choice(list(keys))
# replica_url = all_brokers[key]
# response_code = broker_database.add_replica_for_broker(broker_id, replica_url)
# if response_code != 200:
# return jsonify("Error during initializing broker"), response_code
# return jsonify(replica_url), 200
replica_url = None
if broker_id not in all_brokers_replicas:
keys = all_brokers.keys()
key = cryptogen.choice(list(keys))
replica_url = all_brokers[key]
response_code = broker_database.add_replica_for_broker(broker_id, replica_url)
if response_code != 200:
return jsonify("Error during initializing broker"), response_code
else:
replica_url = all_brokers_replicas[broker_id]

replica_url = all_brokers_replicas[broker_id]
partition_count = len(all_brokers)
master_coordinator_url = config.MASTER_COORDINATOR_URL
backup_coordinator_url = config.BACKUP_COORDINATOR_URL
Expand Down
113 changes: 89 additions & 24 deletions kafka_server/coordinator/api/client/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,37 +49,102 @@ def subscribe():
random_id = cryptogen.randint(1, 1000000)

response_code, all_subscriptions = broker_subscribe_service.get_all_subscriptions()
print("################allllll subscriptions\n", all_subscriptions)
if response_code != 200:
return jsonify("Error during getting list of brokers from database"), response_code

response_code, response_data = broker_database.list_all_brokers()
response_code, all_brokers = broker_database.list_all_brokers()
if response_code != 200:
return jsonify("Error during getting list of brokers from database"), response_code

min_length = 10000
selected_broker_id = None
for key in response_data.keys():
if len(response_data[key]) < min_length:
selected_broker_id = key

broker_data = f"{selected_broker_id}:{response_data[selected_broker_id]}"
broker_url = response_data[selected_broker_id]

tmp_dict = {}
if broker_data in all_subscriptions:
for subs in all_subscriptions[broker_data]:
tmp_dict[subs[1]] = subs[0]
else:
tmp_dict[random_id] = client_addr
response_code = broker_subscribe_service.send_subscribe_to_broker(broker_url, tmp_dict)
if response_code != 200:
return jsonify("Error during sending subscription to broker"), response_code

response_code = client_database.add_subscription_plan(broker_data, client_addr, random_id)
all_subscribers = [[client_addr, random_id]]
for broker_id_url in all_subscriptions.keys():
for sub in all_subscriptions[broker_id_url]:
all_subscribers.append(sub)

print("######all subscriber before\n", all_subscribers)
unique_tuples = set(tuple(x) for x in all_subscribers)
all_subscribers = [list(x) for x in unique_tuples]

all_subscribers = list(all_subscribers)
tmp_subscriptions = {}
all_subscribers_length = len(all_subscribers)
all_brokers_length = len(all_brokers)
print("######all subscribers\n", all_subscribers)
ex = all_subscribers_length // all_brokers_length
print("###########ex ", ex)
j = 0
for broker_id in all_brokers:
i = ex
while i > 0:
if f"{broker_id}:{all_brokers[broker_id]}" not in tmp_subscriptions:
tmp_subscriptions[f"{broker_id}:{all_brokers[broker_id]}"] = []
tmp_subscriptions[f"{broker_id}:{all_brokers[broker_id]}"].append(all_subscribers[j])
j += 1
i -= 1

print("####aghaei\n", tmp_subscriptions)

for index in range(j, len(all_subscribers)):
for broker_id in all_brokers:
if f"{broker_id}:{all_brokers[broker_id]}" not in tmp_subscriptions:
tmp_subscriptions[f"{broker_id}:{all_brokers[broker_id]}"] = []
tmp_subscriptions[f"{broker_id}:{all_brokers[broker_id]}"].append(all_subscribers[index])

print("####aghaei2\n", tmp_subscriptions)

all_brokers_for_client = []
for t_s in tmp_subscriptions.keys():
for tupl in tmp_subscriptions[t_s]:
if client_addr in tupl:
all_brokers_for_client.append(t_s)

for broker_id in all_brokers.keys():
t = {}
for sub in tmp_subscriptions[f"{broker_id}:{all_brokers[broker_id]}"]:
t[sub[1]] = sub[0]
print("##########senda subscriptions to broker")
print(t)
broker_subscribe_service.send_subscribe_to_broker(all_brokers[broker_id], t)

print("@@@@@@@@tmp_subscriptions to write\n", tmp_subscriptions)
response_code = broker_subscribe_service.write_subscriptions(tmp_subscriptions)
if response_code != 200:
return jsonify("Error during adding subscription to database"), response_code

return jsonify({"broker_url": broker_data, "id": random_id}), 200
return jsonify("Error during finding broker for subscribe"), response_code

return jsonify({"id": random_id}), 200

# min_length = 1000000
# selected_broker_id = None
# print("#######find broker for subscriber")
# for key in all_brokers.keys():
# if f"{key}:{all_brokers[key]}" not in all_subscriptions.keys():
# print("check by first if")
# selected_broker_id = key
# break
# if len(all_subscriptions[f"{key}:{all_brokers[key]}"]) < min_length:
# print("check by second if")
# min_length = len(all_subscriptions[f"{key}:{all_brokers[key]}"])
# selected_broker_id = key

# broker_data = f"{selected_broker_id}:{all_brokers[selected_broker_id]}"
# broker_url = all_brokers[selected_broker_id]

# tmp_dict = {}
# if broker_data in all_subscriptions:
# for subs in all_subscriptions[broker_data]:
# tmp_dict[subs[1]] = subs[0]
# else:
# tmp_dict[random_id] = client_addr
# response_code = broker_subscribe_service.send_subscribe_to_broker(broker_url, tmp_dict)
# if response_code != 200:
# return jsonify("Error during sending subscription to broker"), response_code
#
# response_code = client_database.add_subscription_plan(broker_data, client_addr, random_id)
# if response_code != 200:
# return jsonify("Error during adding subscription to database"), response_code
#
# return jsonify({"broker_url": broker_data, "id": random_id}), 200


@api_blueprint.route('/heartbeat', methods=['POST'])
Expand Down
Loading

0 comments on commit 9c71625

Please sign in to comment.