Skip to content

Commit

Permalink
fix: parsing of kafka configuration
Browse files Browse the repository at this point in the history
Signed-off-by: Frederico Araujo <[email protected]>
  • Loading branch information
araujof committed Apr 24, 2024
1 parent dac8a89 commit 7ef05f3
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 30 deletions.
8 changes: 4 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ COPY makefile.manifest.inc ${SRC_ROOT}

# Build
RUN cd ${SRC_ROOT} && \
make BACKEND_TAG=$BACKEND_TAG \
SYSFLOW_VERSION=$VERSION \
SYSFLOW_BUILD_NUMBER=$BUILD_NUMBER \
install
make BACKEND_TAG=${BACKEND_TAG} \
SYSFLOW_VERSION=${VERSION} \
SYSFLOW_BUILD_NUMBER=${BUILD_NUMBER} \
install

#-----------------------
# Stage: runtime
Expand Down
26 changes: 13 additions & 13 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@
include ./makefile.manifest.inc

# Basic go commands
PATH=$(shell printenv PATH):/usr/local/go/bin
BACKEND_TAG?=flatrecord
GOCMD=go
GOBUILD=$(GOCMD) build -trimpath -tags "exclude_graphdriver_btrfs ${BACKEND_TAG}"
GOCLEAN=$(GOCMD) clean
GOTEST=$(GOCMD) test -tags "exclude_graphdriver_btrfs ${BACKEND_TAG}"
GOGET=$(GOCMD) get -tags "exclude_graphdriver_btrfs ${BACKEND_TAG}"
BIN=sfprocessor
OUTPUT=$(BIN)
SRC=./driver
PACKDIR=./scripts/cpack
INSTALL_PATH=/usr/local/sysflow
PATH = $(shell printenv PATH):/usr/local/go/bin
BACKEND_TAG ?= flatrecord
GOCMD = go
GOBUILD = $(GOCMD) build -trimpath -tags "exclude_graphdriver_btrfs ${BACKEND_TAG}"
GOCLEAN = $(GOCMD) clean
GOTEST = $(GOCMD) test -tags "exclude_graphdriver_btrfs ${BACKEND_TAG}"
GOGET = $(GOCMD) get -tags "exclude_graphdriver_btrfs ${BACKEND_TAG}"
BIN = sfprocessor
OUTPUT = $(BIN)
SRC = ./driver
PACKDIR = ./scripts/cpack
INSTALL_PATH = /usr/local/sysflow

.PHONY: build
build: version deps
Expand Down Expand Up @@ -71,7 +71,7 @@ docker-build: docker-plugin-builder

.PHONY: docker-plugin-builder
docker-plugin-builder:
( DOCKER_BUILDKIT=1 docker build -t sysflowtelemetry/plugin-builder:${SYSFLOW_VERSION} --build-arg UBI_VER=$(UBI_VERSION) --target=base -f Dockerfile . )
( DOCKER_BUILDKIT=1 docker build -t sysflowtelemetry/plugin-builder:${SYSFLOW_VERSION} --build-arg BACKEND_TAG=$(BACKEND_TAG) --build-arg UBI_VER=$(UBI_VERSION) --target=base -f Dockerfile . )

.PHONY: pull
pull:
Expand Down
6 changes: 2 additions & 4 deletions core/exporter/commons/kafkaconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,12 @@ func CreateKafkaConfig(bc Config, conf map[string]interface{}) (c KafkaConfig, e

// parse config map
if v, ok := conf[KafkaConfigKey].(map[string]interface{}); ok {
cm := kafka.ConfigMap{}
for key, value := range v {
cm.SetKey(key, value)
c.ConfigMap.SetKey(key, value)
}
if _, ok := cm["bootstrap.servers"]; !ok {
if _, ok := c.ConfigMap["bootstrap.servers"]; !ok {
return c, fmt.Errorf("no broker list found to initialize the kafka producer")
}
c.ConfigMap = cm
} else {
return c, fmt.Errorf("no kafka config map defined in configuration")
}
Expand Down
3 changes: 2 additions & 1 deletion core/policyengine/source/otel/contextualizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ func (c *Contextualizer) GetRules(r *ResourceLogs) []policy.Rule[*ResourceLogs]
return nil
}

func (c *Contextualizer) AddTags(r *ResourceLogs, tags ...string) {}
func (c *Contextualizer) AddTags(r *ResourceLogs, tags ...string) {
}

func (c *Contextualizer) GetTags(r *ResourceLogs) []string {
return nil
Expand Down
15 changes: 8 additions & 7 deletions driver/otel/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,26 +50,27 @@ func CreateKafkaConfig(conf map[string]interface{}) (c KafkaConfig, err error) {

// parse config map
if v, ok := conf[KafkaConfigKey].(map[string]interface{}); ok {
cm := kafka.ConfigMap{}
for key, value := range v {
cm.SetKey(key, value)
c.ConfigMap.SetKey(key, value)
}
if _, ok := cm["bootstrap.servers"]; !ok {
if _, ok := c.ConfigMap["bootstrap.servers"]; !ok {
return c, fmt.Errorf("no broker list found to initialize the kafka consumer")
}
c.ConfigMap = cm
} else {
return c, fmt.Errorf("no kafka config map defined in configuration")
}
if v, ok := conf[KafkaTopicsKey].([]string); ok {
c.Topics = v
if v, ok := conf[KafkaTopicsKey].([]interface{}); ok {
topics := []string{}
for _, value := range v {
topics = append(topics, value.(string))
}
c.Topics = topics
} else {
return c, fmt.Errorf("no kafka topics defined in configuration")
}
if v, ok := conf[KafkaEncodingKey].(string); ok {
c.Encoding = parseEncodingConfig(v)
}

return
}

Expand Down
2 changes: 1 addition & 1 deletion driver/otel/kafkadriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (s *KafkaDriver) Init(pipeline plugins.SFPipeline, config map[string]interf
return fmt.Errorf("could not create kafka consumer")
}

err = s.consumer.SubscribeTopics(conf.Topics, nil)
err = consumer.SubscribeTopics(conf.Topics, nil)
if err != nil {
return fmt.Errorf("unable to subscribe to kafka topics: %v", conf.Topics)
}
Expand Down

0 comments on commit 7ef05f3

Please sign in to comment.