From 7ef05f36d041cd2a9ca63d874566f8e16945e428 Mon Sep 17 00:00:00 2001 From: Frederico Araujo Date: Tue, 23 Apr 2024 18:36:33 -0400 Subject: [PATCH] fix: parsing of kafka configuration Signed-off-by: Frederico Araujo --- Dockerfile | 8 +++--- Makefile | 26 +++++++++---------- core/exporter/commons/kafkaconfig.go | 6 ++--- .../source/otel/contextualizer.go | 3 ++- driver/otel/config.go | 15 ++++++----- driver/otel/kafkadriver.go | 2 +- 6 files changed, 30 insertions(+), 30 deletions(-) diff --git a/Dockerfile b/Dockerfile index a4cebba1..af0de030 100644 --- a/Dockerfile +++ b/Dockerfile @@ -53,10 +53,10 @@ COPY makefile.manifest.inc ${SRC_ROOT} # Build RUN cd ${SRC_ROOT} && \ - make BACKEND_TAG=$BACKEND_TAG \ - SYSFLOW_VERSION=$VERSION \ - SYSFLOW_BUILD_NUMBER=$BUILD_NUMBER \ - install + make BACKEND_TAG=${BACKEND_TAG} \ + SYSFLOW_VERSION=${VERSION} \ + SYSFLOW_BUILD_NUMBER=${BUILD_NUMBER} \ + install #----------------------- # Stage: runtime diff --git a/Makefile b/Makefile index 1ccf3b10..a51b47ab 100644 --- a/Makefile +++ b/Makefile @@ -10,18 +10,18 @@ include ./makefile.manifest.inc # Basic go commands -PATH=$(shell printenv PATH):/usr/local/go/bin -BACKEND_TAG?=flatrecord -GOCMD=go -GOBUILD=$(GOCMD) build -trimpath -tags "exclude_graphdriver_btrfs ${BACKEND_TAG}" -GOCLEAN=$(GOCMD) clean -GOTEST=$(GOCMD) test -tags "exclude_graphdriver_btrfs ${BACKEND_TAG}" -GOGET=$(GOCMD) get -tags "exclude_graphdriver_btrfs ${BACKEND_TAG}" -BIN=sfprocessor -OUTPUT=$(BIN) -SRC=./driver -PACKDIR=./scripts/cpack -INSTALL_PATH=/usr/local/sysflow +PATH = $(shell printenv PATH):/usr/local/go/bin +BACKEND_TAG ?= flatrecord +GOCMD = go +GOBUILD = $(GOCMD) build -trimpath -tags "exclude_graphdriver_btrfs ${BACKEND_TAG}" +GOCLEAN = $(GOCMD) clean +GOTEST = $(GOCMD) test -tags "exclude_graphdriver_btrfs ${BACKEND_TAG}" +GOGET = $(GOCMD) get -tags "exclude_graphdriver_btrfs ${BACKEND_TAG}" +BIN = sfprocessor +OUTPUT = $(BIN) +SRC = ./driver +PACKDIR = ./scripts/cpack +INSTALL_PATH = /usr/local/sysflow .PHONY: build build: version deps @@ -71,7 +71,7 @@ docker-build: docker-plugin-builder .PHONY: docker-plugin-builder docker-plugin-builder: - ( DOCKER_BUILDKIT=1 docker build -t sysflowtelemetry/plugin-builder:${SYSFLOW_VERSION} --build-arg UBI_VER=$(UBI_VERSION) --target=base -f Dockerfile . ) + ( DOCKER_BUILDKIT=1 docker build -t sysflowtelemetry/plugin-builder:${SYSFLOW_VERSION} --build-arg BACKEND_TAG=$(BACKEND_TAG) --build-arg UBI_VER=$(UBI_VERSION) --target=base -f Dockerfile . ) .PHONY: pull pull: diff --git a/core/exporter/commons/kafkaconfig.go b/core/exporter/commons/kafkaconfig.go index 81733cea..2044efe0 100644 --- a/core/exporter/commons/kafkaconfig.go +++ b/core/exporter/commons/kafkaconfig.go @@ -50,14 +50,12 @@ func CreateKafkaConfig(bc Config, conf map[string]interface{}) (c KafkaConfig, e // parse config map if v, ok := conf[KafkaConfigKey].(map[string]interface{}); ok { - cm := kafka.ConfigMap{} for key, value := range v { - cm.SetKey(key, value) + c.ConfigMap.SetKey(key, value) } - if _, ok := cm["bootstrap.servers"]; !ok { + if _, ok := c.ConfigMap["bootstrap.servers"]; !ok { return c, fmt.Errorf("no broker list found to initialize the kafka producer") } - c.ConfigMap = cm } else { return c, fmt.Errorf("no kafka config map defined in configuration") } diff --git a/core/policyengine/source/otel/contextualizer.go b/core/policyengine/source/otel/contextualizer.go index e2b9a4a1..4691031f 100644 --- a/core/policyengine/source/otel/contextualizer.go +++ b/core/policyengine/source/otel/contextualizer.go @@ -18,7 +18,8 @@ func (c *Contextualizer) GetRules(r *ResourceLogs) []policy.Rule[*ResourceLogs] return nil } -func (c *Contextualizer) AddTags(r *ResourceLogs, tags ...string) {} +func (c *Contextualizer) AddTags(r *ResourceLogs, tags ...string) { +} func (c *Contextualizer) GetTags(r *ResourceLogs) []string { return nil diff --git a/driver/otel/config.go b/driver/otel/config.go index 480b020f..47931519 100644 --- a/driver/otel/config.go +++ b/driver/otel/config.go @@ -50,26 +50,27 @@ func CreateKafkaConfig(conf map[string]interface{}) (c KafkaConfig, err error) { // parse config map if v, ok := conf[KafkaConfigKey].(map[string]interface{}); ok { - cm := kafka.ConfigMap{} for key, value := range v { - cm.SetKey(key, value) + c.ConfigMap.SetKey(key, value) } - if _, ok := cm["bootstrap.servers"]; !ok { + if _, ok := c.ConfigMap["bootstrap.servers"]; !ok { return c, fmt.Errorf("no broker list found to initialize the kafka consumer") } - c.ConfigMap = cm } else { return c, fmt.Errorf("no kafka config map defined in configuration") } - if v, ok := conf[KafkaTopicsKey].([]string); ok { - c.Topics = v + if v, ok := conf[KafkaTopicsKey].([]interface{}); ok { + topics := []string{} + for _, value := range v { + topics = append(topics, value.(string)) + } + c.Topics = topics } else { return c, fmt.Errorf("no kafka topics defined in configuration") } if v, ok := conf[KafkaEncodingKey].(string); ok { c.Encoding = parseEncodingConfig(v) } - return } diff --git a/driver/otel/kafkadriver.go b/driver/otel/kafkadriver.go index 1c4b264b..5332502e 100644 --- a/driver/otel/kafkadriver.go +++ b/driver/otel/kafkadriver.go @@ -71,7 +71,7 @@ func (s *KafkaDriver) Init(pipeline plugins.SFPipeline, config map[string]interf return fmt.Errorf("could not create kafka consumer") } - err = s.consumer.SubscribeTopics(conf.Topics, nil) + err = consumer.SubscribeTopics(conf.Topics, nil) if err != nil { return fmt.Errorf("unable to subscribe to kafka topics: %v", conf.Topics) }