Skip to content

Commit

Permalink
Updating dependencies (#14)
Browse files Browse the repository at this point in the history
* Updating dependencies
  • Loading branch information
dgrechka authored Oct 28, 2022
1 parent 9a07868 commit e68dd34
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ authors = [
{ name="Dmitry Grechka", email="[email protected]" },
]
dependencies = [
'kafka-python == 2.0.2',
'imageio == 2.21.1'
'kafka-python3 == 3.0.0',
'imageio == 2.22.2'
]

[tool.hatch.build.targets.wheel]
Expand Down
12 changes: 6 additions & 6 deletions src/kafkajobs/jobqueue/queue.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import kafka
import kafka3
import os
from kafka.admin import KafkaAdminClient, NewTopic
from kafka3.admin import KafkaAdminClient, NewTopic

def strSerializer(jobName):
return jobName.encode('utf-8')
Expand Down Expand Up @@ -46,7 +46,7 @@ def __init__(self, kafkaBootstrapUrl,topicName, appName, num_partitions=None, re
try:
admin_client.create_topics(new_topics=topic_list, validate_only=False)
print("Topic {0} is created".format(topicName))
except kafka.errors.TopicAlreadyExistsError:
except kafka3.errors.TopicAlreadyExistsError:
print("Topic {0} already exists".format(topicName))
else:
print("Topic {0} already exists".format(topicName))
Expand All @@ -57,7 +57,7 @@ class JobQueueProducer(JobQueue):
def __init__(self, *args, **kwargs):
super(JobQueueProducer, self).__init__(*args, **kwargs)

self.producer = kafka.KafkaProducer( \
self.producer = kafka3.KafkaProducer( \
bootstrap_servers = self.kafkaBootstrapUrl, \
client_id = self.appName,
key_serializer = strSerializer,
Expand All @@ -75,7 +75,7 @@ def Enqueue(self, jobName, jobBody):
self.producer.send(self.topicName, value=jobBody, key= jobName)
self.producer.flush()
success = True
except kafka.errors.KafkaTimeoutError as err:
except kafka3.errors.KafkaTimeoutError as err:
attempt += 1
print(f"Error during kafka job message enqueue: {err}. Attempt {attempt}")
if success:
Expand All @@ -89,7 +89,7 @@ def __init__(self, group_id, max_permited_work_time_sec=300, *args, **kwargs):
super(JobQueueWorker, self).__init__(*args, **kwargs)

self.teardown = False
self.consumer = kafka.KafkaConsumer(self.topicName, \
self.consumer = kafka3.KafkaConsumer(self.topicName, \
bootstrap_servers = self.kafkaBootstrapUrl, \
client_id = self.appName,
group_id = group_id,
Expand Down

0 comments on commit e68dd34

Please sign in to comment.