diff --git a/examples/simple/main.go b/examples/simple/main.go index 5553295..f3db05f 100644 --- a/examples/simple/main.go +++ b/examples/simple/main.go @@ -3,20 +3,28 @@ package main import ( "context" "fmt" - "github.com/IBM/sarama" - saramahealth "github.com/vmyroslav/sarama-health" "log" "net/http" + + "github.com/IBM/sarama" + saramahealth "github.com/vmyroslav/sarama-health" +) + +var ( + topics = []string{"my-topic"} + brokers = []string{"localhost:9092"} + consumerGroup = "my-consumer-group" ) func main() { + ctx := context.Background() + config := sarama.NewConfig() config.Version = sarama.V3_0_1_0 - config.Consumer.Return.Errors = true - config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin + config.Consumer.Return.Errors = false // Create a new consumer group - group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-consumer-group", config) + group, err := sarama.NewConsumerGroup(brokers, consumerGroup, config) if err != nil { log.Panicf("Error creating consumer group: %v", err) } @@ -26,26 +34,22 @@ func main() { } }() - healhMonitor, err := saramahealth.NewHealthChecker(saramahealth.Config{ - Brokers: []string{"localhost:9092"}, - Topics: []string{"my-topic"}, + healthMonitor, err := saramahealth.NewHealthChecker(saramahealth.Config{ + Brokers: brokers, + Topics: topics, SaramaConfig: config, }) - if err != nil { log.Panicf("Error creating health monitor: %v", err) } // Consumer group handler - ctx := context.Background() - consumer := Consumer{ - healthMonitor: healhMonitor, - } + consumer := Consumer{healthMonitor: healthMonitor} // Consume messages go func() { for { - err := group.Consume(ctx, []string{"my-topic"}, &consumer) + err := group.Consume(ctx, topics, &consumer) if err != nil { log.Printf("Error from consumer: %v", err) } @@ -58,11 +62,12 @@ func main() { // Start HTTP server http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { - isOk, err := healhMonitor.Healthy(context.Background()) + isOk, err := healthMonitor.Healthy(context.Background()) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } + if !isOk { http.Error(w, "Not OK", http.StatusServiceUnavailable) return @@ -84,7 +89,7 @@ func main() { // Consumer represents a Sarama consumer group consumer type Consumer struct { ready chan bool - healthMonitor *saramahealth.HealthCheckerImpl + healthMonitor *saramahealth.HealthChecker } // Setup is run at the beginning of a new session, before ConsumeClaim @@ -104,21 +109,19 @@ func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai for { select { case <-ctx.Done(): - println("done") + fmt.Printf("session closed: %v\n", ctx.Err()) consumer.healthMonitor.Release(ctx, claim.Topic(), claim.Partition()) + return nil case message, ok := <-claim.Messages(): if !ok { return nil } - err := consumer.healthMonitor.Track(ctx, message) - if err != nil { - println(err.Error()) - } + consumer.healthMonitor.Track(ctx, message) - if string(message.Value) == "fail" { + if string(message.Value) == "fail" { // Simulate a failure return fmt.Errorf("error") } diff --git a/check.go b/monitor.go similarity index 56% rename from check.go rename to monitor.go index abc6656..31c8ddd 100644 --- a/check.go +++ b/monitor.go @@ -2,14 +2,16 @@ package saramahealth import ( "context" + "io" + "log/slog" + "github.com/IBM/sarama" "github.com/pkg/errors" - "log/slog" ) type HealthMonitor interface { - Track(ctx context.Context, msg *sarama.ConsumerMessage) error - Release(ctx context.Context, topic string, partition int32) error + Track(ctx context.Context, msg *sarama.ConsumerMessage) + Release(ctx context.Context, topic string, partition int32) Healthy(ctx context.Context) (bool, error) } @@ -17,69 +19,65 @@ type State struct { stateMap map[string]map[int32]int64 } -type HealthCheckerImpl struct { - topics []string - client sarama.Client - tracker *Tracker - latestState *State - prevState *State - logger *slog.Logger +type HealthChecker struct { + topics []string + client sarama.Client + tracker *tracker + prevState *State + logger *slog.Logger } -func NewHealthChecker(cfg Config) (*HealthCheckerImpl, error) { +func NewHealthChecker(cfg Config) (*HealthChecker, error) { client, err := sarama.NewClient(cfg.Brokers, cfg.SaramaConfig) if err != nil { return nil, errors.Wrap(err, "failed to create sarama client") } - return &HealthCheckerImpl{ + return &HealthChecker{ client: client, - tracker: NewTracker(), + tracker: newTracker(), topics: cfg.Topics, prevState: nil, + logger: slog.New(slog.NewJSONHandler(io.Discard, nil)), }, nil } -func (h *HealthCheckerImpl) Healthy(ctx context.Context) (bool, error) { - latestState := &State{stateMap: make(map[string]map[int32]int64)} - +func (h *HealthChecker) Healthy(ctx context.Context) (bool, error) { + // get the latest offset for each topic + latestStateMap := make(map[string]map[int32]int64) for _, topic := range h.topics { latestOffset, err := h.getLatestOffset(topic) if err != nil { return false, err } - latestState.stateMap[topic] = latestOffset + latestStateMap[topic] = latestOffset } - currentState := h.tracker.CurrentOffsets() + currentState := h.tracker.currentOffsets() if h.prevState == nil { h.prevState = &State{stateMap: currentState} - return true, nil + return true, nil // return true if this is the first time we check the state } // check if the current state equals to the latest state // return true only if the current state equals to the latest state // otherwise go to the next check - var topicRes bool for topic := range currentState { for partition := range currentState[topic] { - if currentState[topic][partition] == latestState.stateMap[topic][partition] { + if currentState[topic][partition] == latestStateMap[topic][partition] { topicRes = true } } } if topicRes { - return true, nil + return true, nil // return true if the current state equals to the latest state } // check if the current state is greater than the previous state - // return true only if the current state is greater than the previous state for all topics and partitions - // otherwise return false - for topic := range currentState { for partition := range currentState[topic] { if currentState[topic][partition] <= h.prevState.stateMap[topic][partition] { @@ -91,20 +89,16 @@ func (h *HealthCheckerImpl) Healthy(ctx context.Context) (bool, error) { return true, nil } -func (h *HealthCheckerImpl) Track(ctx context.Context, msg *sarama.ConsumerMessage) error { +func (h *HealthChecker) Track(_ context.Context, msg *sarama.ConsumerMessage) { h.tracker.Track(msg) - - return nil } -func (h *HealthCheckerImpl) Release(ctx context.Context, topic string, partition int32) error { - h.tracker.Reset(topic, partition) - - return nil +func (h *HealthChecker) Release(_ context.Context, topic string, partition int32) { + h.tracker.drop(topic, partition) } -func (h *HealthCheckerImpl) getLatestOffset(topic string) (map[int32]int64, error) { - var offsets = make(map[int32]int64) +func (h *HealthChecker) getLatestOffset(topic string) (map[int32]int64, error) { + offsets := make(map[int32]int64) partitions, err := h.client.Partitions(topic) if err != nil { diff --git a/state.go b/state.go deleted file mode 100644 index 7f5cacc..0000000 --- a/state.go +++ /dev/null @@ -1 +0,0 @@ -package saramahealth diff --git a/tracker.go b/tracker.go index c07b7fe..550ff58 100644 --- a/tracker.go +++ b/tracker.go @@ -1,22 +1,21 @@ package saramahealth import ( - "github.com/IBM/sarama" "sync" + + "github.com/IBM/sarama" ) -type Tracker struct { - topicPartitionOffsets map[string]map[int32]int64 //todo: change to sync.Map +type tracker struct { + topicPartitionOffsets map[string]map[int32]int64 mu sync.RWMutex } -func NewTracker() *Tracker { - top := make(map[string]map[int32]int64) - - return &Tracker{topicPartitionOffsets: top} +func newTracker() *tracker { + return &tracker{topicPartitionOffsets: make(map[string]map[int32]int64)} } -func (t *Tracker) Track(m *sarama.ConsumerMessage) { +func (t *tracker) Track(m *sarama.ConsumerMessage) { t.mu.Lock() defer t.mu.Unlock() @@ -27,20 +26,16 @@ func (t *Tracker) Track(m *sarama.ConsumerMessage) { t.topicPartitionOffsets[m.Topic][m.Partition] = m.Offset } -func (t *Tracker) CurrentOffsets() map[string]map[int32]int64 { +func (t *tracker) currentOffsets() map[string]map[int32]int64 { t.mu.RLock() defer t.mu.RUnlock() return t.topicPartitionOffsets } -func (t *Tracker) Reset(topic string, partition int32) { +func (t *tracker) drop(topic string, partition int32) { t.mu.Lock() defer t.mu.Unlock() - if _, ok := t.topicPartitionOffsets[topic]; !ok { - t.topicPartitionOffsets[topic] = make(map[int32]int64) - } - - t.topicPartitionOffsets[topic][partition] = 0 + delete(t.topicPartitionOffsets[topic], partition) }