From 569698a444e0ff09a3bc6442c6e28dc5815015f6 Mon Sep 17 00:00:00 2001 From: Diego Date: Sat, 22 Apr 2023 09:27:15 -0400 Subject: [PATCH] feat: add kafka auth --- settings.py | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/settings.py b/settings.py index 4016668..5fb6b81 100644 --- a/settings.py +++ b/settings.py @@ -23,8 +23,8 @@ "PARAMS": { "bootstrap.servers": os.environ["CONSUMER_SERVER"], "group.id": os.environ["CONSUMER_GROUP_ID"], - "auto.offset.reset":"beginning", - "max.poll.interval.ms": 3600000, + "auto.offset.reset": "beginning", + "max.poll.interval.ms": 3600000, "enable.partition.eof": os.getenv("ENABLE_PARTITION_EOF", False), }, "consume.timeout": int(os.getenv("CONSUME_TIMEOUT", 10)), @@ -36,7 +36,7 @@ "PARAMS": { "bootstrap.servers": os.environ["PRODUCER_SERVER"], }, - "SCHEMA": SCHEMA + "SCHEMA": SCHEMA, } METRICS_CONFIG = { @@ -45,12 +45,13 @@ {"key": "candid", "format": lambda x: str(x)}, "oid", {"key": "detections", "format": lambda x: len(x), "alias": "n_det"}, - {"key": "non_detections", "format": lambda x: len(x), "alias": "n_non_det"} + {"key": "non_detections", "format": lambda x: len(x), "alias": "n_non_det"}, ], "PARAMS": { "PARAMS": { "bootstrap.servers": os.environ["METRICS_HOST"], - "auto.offset.reset":"smallest"}, + "auto.offset.reset": "smallest", + }, "TOPIC": os.environ["METRICS_TOPIC"], "SCHEMA": { "$schema": "http://json-schema.org/draft-07/schema", @@ -86,12 +87,26 @@ }, } +if os.getenv("KAFKA_USERNAME") and os.getenv("KAFKA_PASSWORD"): + CONSUMER_CONFIG["PARAMS"]["security.protocol"] = "SASL_SSL" + CONSUMER_CONFIG["PARAMS"]["sasl.mechanism"] = "SCRAM-SHA-512" + CONSUMER_CONFIG["PARAMS"]["sasl.username"] = os.getenv("KAFKA_USERNAME") + CONSUMER_CONFIG["PARAMS"]["sasl.password"] = os.getenv("KAFKA_PASSWORD") + PRODUCER_CONFIG["PARAMS"]["security.protocol"] = "SASL_SSL" + PRODUCER_CONFIG["PARAMS"]["sasl.mechanism"] = "SCRAM-SHA-512" + PRODUCER_CONFIG["PARAMS"]["sasl.username"] = os.getenv("KAFKA_USERNAME") + PRODUCER_CONFIG["PARAMS"]["sasl.password"] = os.getenv("KAFKA_PASSWORD") + METRICS_CONFIG["PARAMS"]["PARAMS"]["security.protocol"] = "SASL_SSL" + METRICS_CONFIG["PARAMS"]["PARAMS"]["sasl.mechanism"] = "SCRAM-SHA-512" + METRICS_CONFIG["PARAMS"]["PARAMS"]["sasl.username"] = os.getenv("KAFKA_USERNAME") + METRICS_CONFIG["PARAMS"]["PARAMS"]["sasl.password"] = os.getenv("KAFKA_PASSWORD") + STEP_METADATA = { "STEP_VERSION": os.getenv("STEP_VERSION", "dev"), "STEP_ID": os.getenv("STEP_ID", "features"), "STEP_NAME": os.getenv("STEP_NAME", "features"), "STEP_COMMENTS": os.getenv("STEP_COMMENTS", ""), - "FEATURE_VERSION": os.getenv("FEATURE_VERSION", "dev") + "FEATURE_VERSION": os.getenv("FEATURE_VERSION", "dev"), } STEP_CONFIG = {