Skip to content

Commit

Permalink
kafka topic configuration through env vars (#12)
Browse files Browse the repository at this point in the history
* kafka topic configuration through env vars
  • Loading branch information
dgrechka authored Aug 20, 2022
1 parent 88833e8 commit 9a07868
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions src/kafkajobs/jobqueue/queue.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import json
import base64
import kafka
import asyncio
import os
from kafka.admin import KafkaAdminClient, NewTopic

def strSerializer(jobName):
Expand All @@ -21,7 +20,14 @@ def dictDeserializer(jobBytes):
return json.loads(jobBytes.decode('utf-8'))

class JobQueue:
def __init__(self, kafkaBootstrapUrl,topicName, appName, num_partitions=8, replication_factor=3, retentionHours = 7*24):
def __init__(self, kafkaBootstrapUrl,topicName, appName, num_partitions=None, replication_factor=None, retentionHours = None):
if replication_factor is None:
replication_factor = int(os.environ.get('KAFKA_REPLICATION_FACTOR', '1'))
if num_partitions is None:
num_partitions = int(os.environ.get('KAFKA_NUM_PARTITIONS', '8'))
if retentionHours is None:
retentionHours = int(os.environ.get('KAFKA_RETENTION_HOURS', '168')) # 168 hours = 1 week

self.kafkaBootstrapUrl = kafkaBootstrapUrl
self.topicName = topicName
self.appName = appName
Expand All @@ -32,7 +38,7 @@ def __init__(self, kafkaBootstrapUrl,topicName, appName, num_partitions=8, repli

topic_list = []
topic_configs = {
#'log.retention.hours': str(retentionHours)
'retention.ms': str(retentionHours*60*60*1000),
}
topic_list.append(NewTopic(name=topicName, num_partitions=num_partitions, replication_factor=replication_factor,topic_configs=topic_configs))
topics = admin_client.list_topics()
Expand Down

0 comments on commit 9a07868

Please sign in to comment.