diff --git a/examples/simple/Dockerfile b/examples/simple/Dockerfile index aa35286..f8416b2 100644 --- a/examples/simple/Dockerfile +++ b/examples/simple/Dockerfile @@ -1,26 +1,16 @@ -# Start from the latest golang base image +# Dockerfile FROM golang:1.22 -# Add Maintainer Info -LABEL maintainer="myroslavvivcharyk" - -# Set the Current Working Directory inside the container WORKDIR /app -# Copy go mod and sum files COPY go.mod go.sum ./ - -# Download all dependencies. Dependencies will be cached if the go.mod and go.sum files are not changed RUN go mod download -# Copy the source from the current directory to the Working Directory inside the container COPY . . -# Build the Go app -RUN go build -o main . +RUN go build -o main ./examples/simple +RUN chmod +x main -# Expose port 8080 to the outside world EXPOSE 8080 -# Command to run the executable CMD ["./main"] \ No newline at end of file diff --git a/examples/simple/docker-compose.yml b/examples/simple/docker-compose.yml index 8446d52..f7f34db 100644 --- a/examples/simple/docker-compose.yml +++ b/examples/simple/docker-compose.yml @@ -3,6 +3,64 @@ version: '3.8' name: sarama-health-simple services: + consumer_1: + build: + context: ./../.. + dockerfile: ./examples/simple/Dockerfile + environment: + SARAMA_DEMO_BROKER: kafka:29092 + SARAMA_DEMO_TOPIC: health-check-topic + ports: + - "8083:8080" + depends_on: + - kafka + networks: + - sarama-health-simple-network + healthcheck: + test: [ "CMD", "curl", "-f", "http://localhost:8080/health" ] + interval: 10s + timeout: 5s + retries: 2 + + consumer_2: + build: + context: ./../.. + dockerfile: ./examples/simple/Dockerfile + environment: + SARAMA_DEMO_BROKER: kafka:29092 + SARAMA_DEMO_TOPIC: health-check-topic + ports: + - "8084:8080" + depends_on: + - kafka + networks: + - sarama-health-simple-network + healthcheck: + test: [ "CMD", "curl", "-f", "http://localhost:8080/health" ] + interval: 10s + timeout: 5s + retries: 2 + + consumer_3: + build: + context: ./../.. 
+ dockerfile: ./examples/simple/Dockerfile + environment: + SARAMA_DEMO_BROKER: kafka:29092 + SARAMA_DEMO_TOPIC: health-check-topic + ports: + - "8085:8080" + depends_on: + - kafka + networks: + - sarama-health-simple-network + healthcheck: + test: [ "CMD", "curl", "-f", "http://localhost:8080/health" ] + interval: 10s + timeout: 5s + retries: 2 + restart: on-failure + kafka: image: confluentinc/confluent-local:7.6.0 hostname: broker @@ -16,14 +74,9 @@ services: KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093' KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092' KAFKA_NUM_PARTITIONS: '${KAFKA_NUM_PARTITIONS:-3}' - healthcheck: - test: [ "CMD", "nc", "-vz", "broker", "29092" ] - interval: 3s - timeout: 5s - retries: 3 networks: - - sarame-health-simple-network + - sarama-health-simple-network networks: - sarame-health-simple-network: + sarama-health-simple-network: driver: bridge \ No newline at end of file diff --git a/examples/simple/failure.txt b/examples/simple/failure.txt new file mode 100644 index 0000000..e69de29 diff --git a/examples/simple/main.go b/examples/simple/main.go index f3db05f..de19869 100644 --- a/examples/simple/main.go +++ b/examples/simple/main.go @@ -4,16 +4,21 @@ import ( "context" "fmt" "log" + "log/slog" "net/http" + "os" + "strings" + "time" "github.com/IBM/sarama" saramahealth "github.com/vmyroslav/sarama-health" ) var ( - topics = []string{"my-topic"} + topics = []string{"health-check-topic"} brokers = []string{"localhost:9092"} consumerGroup = "my-consumer-group" + httpPort = 8080 ) func main() { @@ -22,6 +27,22 @@ func main() { config := sarama.NewConfig() config.Version = sarama.V3_0_1_0 config.Consumer.Return.Errors = false + config.Consumer.MaxProcessingTime = 20 * time.Millisecond + + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + AddSource: false, + Level: slog.LevelDebug, + })) + + topic := os.Getenv("SARAMA_DEMO_TOPIC") + if topic != "" { + topics = 
[]string{topic} + } + + broker := os.Getenv("SARAMA_DEMO_BROKER") + if broker != "" { + brokers = []string{broker} + } // Create a new consumer group group, err := sarama.NewConsumerGroup(brokers, consumerGroup, config) @@ -43,8 +64,10 @@ func main() { log.Panicf("Error creating health monitor: %v", err) } + healthMonitor.SetLogger(logger) + // Consumer group handler - consumer := Consumer{healthMonitor: healthMonitor} + consumer := Consumer{healthMonitor: healthMonitor, l: logger} // Consume messages go func() { @@ -62,22 +85,27 @@ func main() { // Start HTTP server http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { + logger.InfoContext(ctx, "Health check request received") isOk, err := healthMonitor.Healthy(context.Background()) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) + logger.ErrorContext(ctx, "Error checking health", "error", err) + return } if !isOk { http.Error(w, "Not OK", http.StatusServiceUnavailable) + + logger.ErrorContext(ctx, "Health check failed") return } - fmt.Fprintln(w, "OK") + _, _ = fmt.Fprintln(w, "OK") }) go func() { - if err := http.ListenAndServe(":8083", nil); err != nil { + if err := http.ListenAndServe(fmt.Sprintf(":%d", httpPort), nil); err != nil { log.Fatalf("Failed to start HTTP server: %v", err) } }() @@ -90,28 +118,42 @@ func main() { type Consumer struct { ready chan bool healthMonitor *saramahealth.HealthChecker + l *slog.Logger } // Setup is run at the beginning of a new session, before ConsumeClaim -func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error { +func (c *Consumer) Setup(sarama.ConsumerGroupSession) error { + c.l.Debug("Consumer group is ready!...") + return nil } // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited -func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error { +func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error { + c.l.Debug("Consumer closing down!...") + return nil } // ConsumeClaim 
must start a consumer loop of ConsumerGroupClaim's Messages(). -func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { +func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { ctx := session.Context() for { select { case <-ctx.Done(): - fmt.Printf("session closed: %v\n", ctx.Err()) - - consumer.healthMonitor.Release(ctx, claim.Topic(), claim.Partition()) + c.l.DebugContext( + ctx, + "Session closed", + "topic", + claim.Topic(), + "partition", + claim.Partition(), + "error", + ctx.Err(), + ) + + c.healthMonitor.Release(ctx, claim.Topic(), claim.Partition()) return nil case message, ok := <-claim.Messages(): @@ -119,7 +161,31 @@ func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai return nil } - consumer.healthMonitor.Track(ctx, message) + c.l.DebugContext( + ctx, + "Message claimed", + "topic", + message.Topic, + "partition", + message.Partition, + "offset", + message.Offset, + ) + + c.healthMonitor.Track(ctx, message) + + // Read the file + data, err := os.ReadFile("./examples/simple/failure.txt") + if err != nil { + log.Println("Error reading file:", err) + continue + } + + // Check if the file contains 'fail' + if strings.TrimSpace(string(data)) == "fail" { + log.Println("Simulating a failure") + time.Sleep(40 * time.Second) + } if string(message.Value) == "fail" { // Simulate a failure return fmt.Errorf("error") diff --git a/monitor.go b/monitor.go index 31c8ddd..36a07ed 100644 --- a/monitor.go +++ b/monitor.go @@ -21,7 +21,7 @@ type State struct { type HealthChecker struct { topics []string - client sarama.Client + client kafkaClient tracker *tracker prevState *State logger *slog.Logger @@ -42,7 +42,7 @@ func NewHealthChecker(cfg Config) (*HealthChecker, error) { }, nil } -func (h *HealthChecker) Healthy(ctx context.Context) (bool, error) { +func (h *HealthChecker) Healthy(_ context.Context) (bool, error) { // get the 
latest offset for each topic latestStateMap := make(map[string]map[int32]int64) for _, topic := range h.topics { @@ -64,16 +64,18 @@ func (h *HealthChecker) Healthy(ctx context.Context) (bool, error) { // check if the current state equals to the latest state // return true only if the current state equals to the latest state // otherwise go to the next check - var topicRes bool + allMatch := true +outer: for topic := range currentState { for partition := range currentState[topic] { - if currentState[topic][partition] == latestStateMap[topic][partition] { - topicRes = true + if currentState[topic][partition] != latestStateMap[topic][partition] { + allMatch = false + break outer } } } - if topicRes { + if allMatch { return true, nil // return true if the current state equals to the latest state } @@ -90,13 +92,17 @@ func (h *HealthChecker) Healthy(ctx context.Context) (bool, error) { } func (h *HealthChecker) Track(_ context.Context, msg *sarama.ConsumerMessage) { - h.tracker.Track(msg) + h.tracker.track(msg) } func (h *HealthChecker) Release(_ context.Context, topic string, partition int32) { h.tracker.drop(topic, partition) } +func (h *HealthChecker) SetLogger(l *slog.Logger) { + h.logger = l +} + func (h *HealthChecker) getLatestOffset(topic string) (map[int32]int64, error) { offsets := make(map[int32]int64) @@ -115,3 +121,8 @@ func (h *HealthChecker) getLatestOffset(topic string) (map[int32]int64, error) { return offsets, nil } + +type kafkaClient interface { + GetOffset(topic string, partition int32, time int64) (int64, error) + Partitions(topic string) ([]int32, error) +} diff --git a/tracker.go b/tracker.go index 550ff58..cce274f 100644 --- a/tracker.go +++ b/tracker.go @@ -15,7 +15,7 @@ func newTracker() *tracker { return &tracker{topicPartitionOffsets: make(map[string]map[int32]int64)} } -func (t *tracker) Track(m *sarama.ConsumerMessage) { +func (t *tracker) track(m *sarama.ConsumerMessage) { t.mu.Lock() defer t.mu.Unlock()