generated from vmyroslav/go-template
-
Notifications
You must be signed in to change notification settings - Fork 0
/
monitor.go
128 lines (105 loc) · 3.06 KB
/
monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package saramahealth
import (
"context"
"io"
"log/slog"
"github.com/IBM/sarama"
"github.com/pkg/errors"
)
type HealthMonitor interface {
Track(ctx context.Context, msg *sarama.ConsumerMessage)
Release(ctx context.Context, topic string, partition int32)
Healthy(ctx context.Context) (bool, error)
}
type State struct {
stateMap map[string]map[int32]int64
}
type HealthChecker struct {
topics []string
client kafkaClient
tracker *tracker
prevState *State
logger *slog.Logger
}
func NewHealthChecker(cfg Config) (*HealthChecker, error) {
client, err := sarama.NewClient(cfg.Brokers, cfg.SaramaConfig)
if err != nil {
return nil, errors.Wrap(err, "failed to create sarama client")
}
return &HealthChecker{
client: client,
tracker: newTracker(),
topics: cfg.Topics,
prevState: nil,
logger: slog.New(slog.NewJSONHandler(io.Discard, nil)),
}, nil
}
func (h *HealthChecker) Healthy(_ context.Context) (bool, error) {
// get the latest offset for each topic
latestStateMap := make(map[string]map[int32]int64)
for _, topic := range h.topics {
latestOffset, err := h.getLatestOffset(topic)
if err != nil {
return false, err
}
latestStateMap[topic] = latestOffset
}
currentState := h.tracker.currentOffsets()
if h.prevState == nil {
h.prevState = &State{stateMap: currentState}
return true, nil // return true if this is the first time we check the state
}
// check if the current state equals to the latest state
// return true only if the current state equals to the latest state
// otherwise go to the next check
allMatch := true
outer:
for topic := range currentState {
for partition := range currentState[topic] {
if currentState[topic][partition] != latestStateMap[topic][partition] {
allMatch = false
break outer
}
}
}
if allMatch {
return true, nil // return true if the current state equals to the latest state
}
// check if the current state is greater than the previous state
for topic := range currentState {
for partition := range currentState[topic] {
if currentState[topic][partition] <= h.prevState.stateMap[topic][partition] {
return false, nil
}
}
}
return true, nil
}
func (h *HealthChecker) Track(_ context.Context, msg *sarama.ConsumerMessage) {
h.tracker.track(msg)
}
func (h *HealthChecker) Release(_ context.Context, topic string, partition int32) {
h.tracker.drop(topic, partition)
}
func (h *HealthChecker) SetLogger(l *slog.Logger) {
h.logger = l
}
func (h *HealthChecker) getLatestOffset(topic string) (map[int32]int64, error) {
offsets := make(map[int32]int64)
partitions, err := h.client.Partitions(topic)
if err != nil {
return offsets, err
}
for _, partition := range partitions {
offset, err := h.client.GetOffset(topic, partition, sarama.OffsetNewest)
if err != nil {
return offsets, err
}
offsets[partition] = offset - 1 // subtract 1 to get the latest offset, not the next offset
}
return offsets, nil
}
type kafkaClient interface {
GetOffset(topic string, partition int32, time int64) (int64, error)
Partitions(topic string) ([]int32, error)
}