Skip to content

Commit

Permalink
Added basic structure.
Browse files Browse the repository at this point in the history
  • Loading branch information
vmyroslav committed May 3, 2024
1 parent 899172d commit 54061b8
Show file tree
Hide file tree
Showing 10 changed files with 484 additions and 13 deletions.
123 changes: 123 additions & 0 deletions check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package saramahealth

import (
"context"

Check failure on line 4 in check.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gci`-ed with --skip-generated -s standard -s default (gci)
"github.com/IBM/sarama"
"github.com/pkg/errors"
"log/slog"
)

type HealthMonitor interface {
Track(ctx context.Context, msg *sarama.ConsumerMessage) error
Release(ctx context.Context, topic string, partition int32) error
Healthy(ctx context.Context) (bool, error)
}

type State struct {
stateMap map[string]map[int32]int64
}

type HealthCheckerImpl struct {

Check failure on line 20 in check.go

View workflow job for this annotation

GitHub Actions / lint

fieldalignment: struct with 72 pointer bytes could be 56 (govet)
topics []string
client sarama.Client
tracker *Tracker
latestState *State

Check failure on line 24 in check.go

View workflow job for this annotation

GitHub Actions / lint

field `latestState` is unused (unused)
prevState *State
logger *slog.Logger

Check failure on line 26 in check.go

View workflow job for this annotation

GitHub Actions / lint

field `logger` is unused (unused)
}

func NewHealthChecker(cfg Config) (*HealthCheckerImpl, error) {
client, err := sarama.NewClient(cfg.Brokers, cfg.SaramaConfig)
if err != nil {
return nil, errors.Wrap(err, "failed to create sarama client")
}

return &HealthCheckerImpl{
client: client,
tracker: NewTracker(),
topics: cfg.Topics,
prevState: nil,
}, nil
}

func (h *HealthCheckerImpl) Healthy(ctx context.Context) (bool, error) {

Check warning on line 43 in check.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
latestState := &State{stateMap: make(map[string]map[int32]int64)}

for _, topic := range h.topics {
latestOffset, err := h.getLatestOffset(topic)
if err != nil {
return false, err
}

latestState.stateMap[topic] = latestOffset
}

currentState := h.tracker.CurrentOffsets()
if h.prevState == nil {
h.prevState = &State{stateMap: currentState}

return true, nil
}

// check if the current state equals to the latest state
// return true only if the current state equals to the latest state
// otherwise go to the next check

var topicRes bool
for topic := range currentState {

Check failure on line 67 in check.go

View workflow job for this annotation

GitHub Actions / lint

ranges should only be cuddled with assignments used in the iteration (wsl)
for partition := range currentState[topic] {
if currentState[topic][partition] == latestState.stateMap[topic][partition] {
topicRes = true
}
}
}

if topicRes {
return true, nil
}

// check if the current state is greater than the previous state
// return true only if the current state is greater than the previous state for all topics and partitions
// otherwise return false

for topic := range currentState {
for partition := range currentState[topic] {
if currentState[topic][partition] <= h.prevState.stateMap[topic][partition] {
return false, nil
}
}
}

return true, nil
}

func (h *HealthCheckerImpl) Track(ctx context.Context, msg *sarama.ConsumerMessage) error {

Check warning on line 94 in check.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
h.tracker.Track(msg)

return nil
}

func (h *HealthCheckerImpl) Release(ctx context.Context, topic string, partition int32) error {

Check warning on line 100 in check.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
h.tracker.Reset(topic, partition)

return nil
}

func (h *HealthCheckerImpl) getLatestOffset(topic string) (map[int32]int64, error) {
var offsets = make(map[int32]int64)

partitions, err := h.client.Partitions(topic)
if err != nil {
return offsets, err

Check failure on line 111 in check.go

View workflow job for this annotation

GitHub Actions / lint

error returned from interface method should be wrapped: sig: func (github.com/IBM/sarama.Client).Partitions(topic string) ([]int32, error) (wrapcheck)
}

for _, partition := range partitions {
offset, err := h.client.GetOffset(topic, partition, sarama.OffsetNewest)
if err != nil {
return offsets, err

Check failure on line 117 in check.go

View workflow job for this annotation

GitHub Actions / lint

error returned from interface method should be wrapped: sig: func (github.com/IBM/sarama.Client).GetOffset(topic string, partitionID int32, time int64) (int64, error) (wrapcheck)
}
offsets[partition] = offset - 1 // subtract 1 to get the latest offset, not the next offset
}

return offsets, nil
}
9 changes: 9 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package saramahealth

import "github.com/IBM/sarama"

type Config struct {

Check failure on line 5 in config.go

View workflow job for this annotation

GitHub Actions / lint

fieldalignment: struct with 56 pointer bytes could be 40 (govet)
Brokers []string
Topics []string
SaramaConfig *sarama.Config
}
26 changes: 26 additions & 0 deletions examples/simple/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Start from the latest golang base image
FROM golang:1.22

# Add Maintainer Info
LABEL maintainer="myroslavvivcharyk"

# Set the Current Working Directory inside the container
WORKDIR /app

# Copy go mod and sum files
COPY go.mod go.sum ./

# Download all dependencies. Dependencies will be cached if the go.mod and go.sum files are not changed
RUN go mod download

# Copy the source from the current directory to the Working Directory inside the container
COPY . .

# Build the Go app
RUN go build -o main .

# Expose port 8080 to the outside world
EXPOSE 8080

# Command to run the executable
CMD ["./main"]
29 changes: 29 additions & 0 deletions examples/simple/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
version: '3.8'

name: sarama-health-simple

services:
kafka:
image: confluentinc/confluent-local:7.6.0
hostname: broker
container_name: broker
ports:
- "8082:8082"
- "9092:9092"
- "9101:9101"
environment:
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
KAFKA_NUM_PARTITIONS: '${KAFKA_NUM_PARTITIONS:-3}'
healthcheck:
test: [ "CMD", "nc", "-vz", "broker", "29092" ]
interval: 3s
timeout: 5s
retries: 3
networks:
- sarame-health-simple-network

networks:
sarame-health-simple-network:
driver: bridge
128 changes: 128 additions & 0 deletions examples/simple/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package main

import (
"context"
"fmt"
"github.com/IBM/sarama"
saramahealth "github.com/vmyroslav/sarama-health"
"log"
"net/http"
)

func main() {
config := sarama.NewConfig()
config.Version = sarama.V3_0_1_0
config.Consumer.Return.Errors = true
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin

// Create a new consumer group
group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-consumer-group", config)
if err != nil {
log.Panicf("Error creating consumer group: %v", err)
}
defer func() {
if err := group.Close(); err != nil {
log.Panicf("Error closing consumer group: %v", err)
}
}()

healhMonitor, err := saramahealth.NewHealthChecker(saramahealth.Config{
Brokers: []string{"localhost:9092"},
Topics: []string{"my-topic"},
SaramaConfig: config,
})

if err != nil {
log.Panicf("Error creating health monitor: %v", err)
}

// Consumer group handler
ctx := context.Background()
consumer := Consumer{
healthMonitor: healhMonitor,
}

// Consume messages
go func() {
for {
err := group.Consume(ctx, []string{"my-topic"}, &consumer)
if err != nil {
log.Printf("Error from consumer: %v", err)
}
if ctx.Err() != nil {
return
}
consumer.ready = make(chan bool)
}
}()

// Start HTTP server
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
isOk, err := healhMonitor.Healthy(context.Background())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if !isOk {
http.Error(w, "Not OK", http.StatusServiceUnavailable)
return
}

fmt.Fprintln(w, "OK")
})

go func() {
if err := http.ListenAndServe(":8083", nil); err != nil {
log.Fatalf("Failed to start HTTP server: %v", err)
}
}()

<-consumer.ready // Await till the consumer has been set up
log.Println("Sarama consumer up and running!...")
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
ready chan bool
healthMonitor *saramahealth.HealthCheckerImpl
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
ctx := session.Context()

for {
select {
case <-ctx.Done():
println("done")

consumer.healthMonitor.Release(ctx, claim.Topic(), claim.Partition())
return nil
case message, ok := <-claim.Messages():
if !ok {
return nil
}

err := consumer.healthMonitor.Track(ctx, message)
if err != nil {
println(err.Error())
}

if string(message.Value) == "fail" {
return fmt.Errorf("error")
}

session.MarkMessage(message, "")
}
}
}
28 changes: 26 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,29 @@
module home-bartender-back
module github.com/vmyroslav/sarama-health

go 1.22

require github.com/vmyroslav/home-lib v0.1.0 // indirect
require (
github.com/IBM/sarama v1.43.2
github.com/pkg/errors v0.9.1
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.6.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/net v0.24.0 // indirect
)
Loading

0 comments on commit 54061b8

Please sign in to comment.