Skip to content

Commit

Permalink
Updated code base.
Browse files Browse the repository at this point in the history
  • Loading branch information
vmyroslav committed May 3, 2024
1 parent 54061b8 commit 3243be9
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 72 deletions.
47 changes: 25 additions & 22 deletions examples/simple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,28 @@ package main
import (
"context"
"fmt"
"github.com/IBM/sarama"
saramahealth "github.com/vmyroslav/sarama-health"
"log"
"net/http"

"github.com/IBM/sarama"
saramahealth "github.com/vmyroslav/sarama-health"
)

var (
topics = []string{"my-topic"}
brokers = []string{"localhost:9092"}
consumerGroup = "my-consumer-group"
)

func main() {
ctx := context.Background()

config := sarama.NewConfig()
config.Version = sarama.V3_0_1_0
config.Consumer.Return.Errors = true
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Return.Errors = false

// Create a new consumer group
group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-consumer-group", config)
group, err := sarama.NewConsumerGroup(brokers, consumerGroup, config)
if err != nil {
log.Panicf("Error creating consumer group: %v", err)
}
Expand All @@ -26,26 +34,22 @@ func main() {
}
}()

healhMonitor, err := saramahealth.NewHealthChecker(saramahealth.Config{
Brokers: []string{"localhost:9092"},
Topics: []string{"my-topic"},
healthMonitor, err := saramahealth.NewHealthChecker(saramahealth.Config{
Brokers: brokers,
Topics: topics,
SaramaConfig: config,
})

if err != nil {
log.Panicf("Error creating health monitor: %v", err)
}

// Consumer group handler
ctx := context.Background()
consumer := Consumer{
healthMonitor: healhMonitor,
}
consumer := Consumer{healthMonitor: healthMonitor}

// Consume messages
go func() {
for {
err := group.Consume(ctx, []string{"my-topic"}, &consumer)
err := group.Consume(ctx, topics, &consumer)
if err != nil {
log.Printf("Error from consumer: %v", err)
}
Expand All @@ -58,11 +62,12 @@ func main() {

// Start HTTP server
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
isOk, err := healhMonitor.Healthy(context.Background())
isOk, err := healthMonitor.Healthy(context.Background())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

if !isOk {
http.Error(w, "Not OK", http.StatusServiceUnavailable)
return
Expand All @@ -84,7 +89,7 @@ func main() {
// Consumer represents a Sarama consumer group consumer
type Consumer struct {
ready chan bool
healthMonitor *saramahealth.HealthCheckerImpl
healthMonitor *saramahealth.HealthChecker
}

// Setup is run at the beginning of a new session, before ConsumeClaim
Expand All @@ -104,21 +109,19 @@ func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai
for {
select {
case <-ctx.Done():
println("done")
fmt.Printf("session closed: %v\n", ctx.Err())

consumer.healthMonitor.Release(ctx, claim.Topic(), claim.Partition())

return nil
case message, ok := <-claim.Messages():
if !ok {
return nil
}

err := consumer.healthMonitor.Track(ctx, message)
if err != nil {
println(err.Error())
}
consumer.healthMonitor.Track(ctx, message)

if string(message.Value) == "fail" {
if string(message.Value) == "fail" { // Simulate a failure
return fmt.Errorf("error")
}

Expand Down
62 changes: 28 additions & 34 deletions check.go → monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,84 +2,82 @@ package saramahealth

import (
"context"
"io"
"log/slog"

"github.com/IBM/sarama"
"github.com/pkg/errors"
"log/slog"
)

type HealthMonitor interface {
Track(ctx context.Context, msg *sarama.ConsumerMessage) error
Release(ctx context.Context, topic string, partition int32) error
Track(ctx context.Context, msg *sarama.ConsumerMessage)
Release(ctx context.Context, topic string, partition int32)
Healthy(ctx context.Context) (bool, error)
}

type State struct {
stateMap map[string]map[int32]int64
}

type HealthCheckerImpl struct {
topics []string
client sarama.Client
tracker *Tracker
latestState *State
prevState *State
logger *slog.Logger
type HealthChecker struct {

Check failure on line 22 in monitor.go

View workflow job for this annotation

GitHub Actions / lint

fieldalignment: struct with 64 pointer bytes could be 48 (govet)
topics []string
client sarama.Client
tracker *tracker
prevState *State
logger *slog.Logger
}

func NewHealthChecker(cfg Config) (*HealthCheckerImpl, error) {
func NewHealthChecker(cfg Config) (*HealthChecker, error) {
client, err := sarama.NewClient(cfg.Brokers, cfg.SaramaConfig)
if err != nil {
return nil, errors.Wrap(err, "failed to create sarama client")
}

return &HealthCheckerImpl{
return &HealthChecker{
client: client,
tracker: NewTracker(),
tracker: newTracker(),
topics: cfg.Topics,
prevState: nil,
logger: slog.New(slog.NewJSONHandler(io.Discard, nil)),
}, nil
}

func (h *HealthCheckerImpl) Healthy(ctx context.Context) (bool, error) {
latestState := &State{stateMap: make(map[string]map[int32]int64)}

func (h *HealthChecker) Healthy(ctx context.Context) (bool, error) {

Check warning on line 45 in monitor.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
// get the latest offset for each topic
latestStateMap := make(map[string]map[int32]int64)
for _, topic := range h.topics {

Check failure on line 48 in monitor.go

View workflow job for this annotation

GitHub Actions / lint

ranges should only be cuddled with assignments used in the iteration (wsl)
latestOffset, err := h.getLatestOffset(topic)
if err != nil {
return false, err
}

latestState.stateMap[topic] = latestOffset
latestStateMap[topic] = latestOffset
}

currentState := h.tracker.CurrentOffsets()
currentState := h.tracker.currentOffsets()
if h.prevState == nil {
h.prevState = &State{stateMap: currentState}

return true, nil
return true, nil // return true if this is the first time we check the state
}

// check if the current state equals to the latest state
// return true only if the current state equals to the latest state
// otherwise go to the next check

var topicRes bool
for topic := range currentState {

Check failure on line 68 in monitor.go

View workflow job for this annotation

GitHub Actions / lint

ranges should only be cuddled with assignments used in the iteration (wsl)
for partition := range currentState[topic] {
if currentState[topic][partition] == latestState.stateMap[topic][partition] {
if currentState[topic][partition] == latestStateMap[topic][partition] {
topicRes = true
}
}
}

if topicRes {
return true, nil
return true, nil // return true if the current state equals to the latest state
}

// check if the current state is greater than the previous state
// return true only if the current state is greater than the previous state for all topics and partitions
// otherwise return false

for topic := range currentState {
for partition := range currentState[topic] {
if currentState[topic][partition] <= h.prevState.stateMap[topic][partition] {
Expand All @@ -91,20 +89,16 @@ func (h *HealthCheckerImpl) Healthy(ctx context.Context) (bool, error) {
return true, nil
}

func (h *HealthCheckerImpl) Track(ctx context.Context, msg *sarama.ConsumerMessage) error {
func (h *HealthChecker) Track(_ context.Context, msg *sarama.ConsumerMessage) {
h.tracker.Track(msg)

return nil
}

func (h *HealthCheckerImpl) Release(ctx context.Context, topic string, partition int32) error {
h.tracker.Reset(topic, partition)

return nil
func (h *HealthChecker) Release(_ context.Context, topic string, partition int32) {
h.tracker.drop(topic, partition)
}

func (h *HealthCheckerImpl) getLatestOffset(topic string) (map[int32]int64, error) {
var offsets = make(map[int32]int64)
func (h *HealthChecker) getLatestOffset(topic string) (map[int32]int64, error) {
offsets := make(map[int32]int64)

partitions, err := h.client.Partitions(topic)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion state.go

This file was deleted.

25 changes: 10 additions & 15 deletions tracker.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
package saramahealth

import (
"github.com/IBM/sarama"
"sync"

"github.com/IBM/sarama"
)

type Tracker struct {
topicPartitionOffsets map[string]map[int32]int64 //todo: change to sync.Map
type tracker struct {
topicPartitionOffsets map[string]map[int32]int64
mu sync.RWMutex
}

func NewTracker() *Tracker {
top := make(map[string]map[int32]int64)

return &Tracker{topicPartitionOffsets: top}
func newTracker() *tracker {
return &tracker{topicPartitionOffsets: make(map[string]map[int32]int64)}
}

func (t *Tracker) Track(m *sarama.ConsumerMessage) {
func (t *tracker) Track(m *sarama.ConsumerMessage) {
t.mu.Lock()
defer t.mu.Unlock()

Expand All @@ -27,20 +26,16 @@ func (t *Tracker) Track(m *sarama.ConsumerMessage) {
t.topicPartitionOffsets[m.Topic][m.Partition] = m.Offset
}

func (t *Tracker) CurrentOffsets() map[string]map[int32]int64 {
func (t *tracker) currentOffsets() map[string]map[int32]int64 {
t.mu.RLock()
defer t.mu.RUnlock()

return t.topicPartitionOffsets
}

func (t *Tracker) Reset(topic string, partition int32) {
func (t *tracker) drop(topic string, partition int32) {
t.mu.Lock()
defer t.mu.Unlock()

if _, ok := t.topicPartitionOffsets[topic]; !ok {
t.topicPartitionOffsets[topic] = make(map[int32]int64)
}

t.topicPartitionOffsets[topic][partition] = 0
delete(t.topicPartitionOffsets[topic], partition)
}

0 comments on commit 3243be9

Please sign in to comment.