Added basic example
vmyroslav committed May 5, 2024
1 parent 3243be9 commit ef31070
Showing 6 changed files with 159 additions and 39 deletions.
16 changes: 3 additions & 13 deletions examples/simple/Dockerfile
@@ -1,26 +1,16 @@
-# Start from the latest golang base image
+# Dockerfile
 FROM golang:1.22
 
-# Add Maintainer Info
-LABEL maintainer="myroslavvivcharyk"
-
-# Set the Current Working Directory inside the container
 WORKDIR /app
 
-# Copy go mod and sum files
 COPY go.mod go.sum ./
 
-# Download all dependencies. Dependencies will be cached if the go.mod and go.sum files are not changed
 RUN go mod download
 
-# Copy the source from the current directory to the Working Directory inside the container
 COPY . .
 
-# Build the Go app
-RUN go build -o main .
+RUN go build -o main ./examples/simple
+RUN chmod +x main
 
-# Expose port 8080 to the outside world
 EXPOSE 8080
-
-# Command to run the executable
 CMD ["./main"]
67 changes: 60 additions & 7 deletions examples/simple/docker-compose.yml
@@ -3,6 +3,64 @@ version: '3.8'
 name: sarama-health-simple
 
 services:
+  consumer_1:
+    build:
+      context: ./../..
+      dockerfile: ./examples/simple/Dockerfile
+    environment:
+      SARAMA_DEMO_BROKER: kafka:29092
+      SARAMA_DEMO_TOPIC: health-check-topic
+    ports:
+      - "8083:8080"
+    depends_on:
+      - kafka
+    networks:
+      - sarama-health-simple-network
+    healthcheck:
+      test: [ "CMD", "curl", "-f", "http://localhost:8080/health" ]
+      interval: 10s
+      timeout: 5s
+      retries: 2
+
+  consumer_2:
+    build:
+      context: ./../..
+      dockerfile: ./examples/simple/Dockerfile
+    environment:
+      SARAMA_DEMO_BROKER: kafka:29092
+      SARAMA_DEMO_TOPIC: health-check-topic
+    ports:
+      - "8084:8080"
+    depends_on:
+      - kafka
+    networks:
+      - sarama-health-simple-network
+    healthcheck:
+      test: [ "CMD", "curl", "-f", "http://localhost:8080/health" ]
+      interval: 10s
+      timeout: 5s
+      retries: 2
+
+  consumer_3:
+    build:
+      context: ./../..
+      dockerfile: ./examples/simple/Dockerfile
+    environment:
+      SARAMA_DEMO_BROKER: kafka:29092
+      SARAMA_DEMO_TOPIC: health-check-topic
+    ports:
+      - "8085:8080"
+    depends_on:
+      - kafka
+    networks:
+      - sarama-health-simple-network
+    healthcheck:
+      test: [ "CMD", "curl", "-f", "http://localhost:8080/health" ]
+      interval: 10s
+      timeout: 5s
+      retries: 2
+    restart: on-failure
+
   kafka:
     image: confluentinc/confluent-local:7.6.0
     hostname: broker
@@ -16,14 +74,9 @@ services:
       KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
       KAFKA_NUM_PARTITIONS: '${KAFKA_NUM_PARTITIONS:-3}'
-    healthcheck:
-      test: [ "CMD", "nc", "-vz", "broker", "29092" ]
-      interval: 3s
-      timeout: 5s
-      retries: 3
     networks:
-      - sarame-health-simple-network
+      - sarama-health-simple-network
 
 networks:
-  sarame-health-simple-network:
+  sarama-health-simple-network:
     driver: bridge
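Each consumer maps its in-container /health endpoint (port 8080) to a distinct host port (8083 through 8085), so an individual instance can be probed directly. A minimal sketch of such a probe in Go, assuming consumer_1's 8083 mapping above and a running stack (the helper file name is hypothetical, not part of this commit):

// probe.go (hypothetical helper): checks one consumer's health endpoint.
// Assumes the compose stack is up and consumer_1 publishes host port 8083.
package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
)

func main() {
	resp, err := http.Get("http://localhost:8083/health")
	if err != nil {
		log.Fatalf("health probe failed: %v", err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Printf("status=%d body=%s", resp.StatusCode, body)
}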
Empty file added examples/simple/failure.txt
88 changes: 77 additions & 11 deletions examples/simple/main.go
@@ -4,16 +4,21 @@ import (
 	"context"
 	"fmt"
 	"log"
+	"log/slog"
 	"net/http"
+	"os"
+	"strings"
+	"time"
 
 	"github.com/IBM/sarama"
 	saramahealth "github.com/vmyroslav/sarama-health"
 )
 
 var (
-	topics        = []string{"my-topic"}
+	topics        = []string{"health-check-topic"}
 	brokers       = []string{"localhost:9092"}
 	consumerGroup = "my-consumer-group"
+	httpPort      = 8080
 )
 
 func main() {
@@ -22,6 +27,22 @@ func main() {
 	config := sarama.NewConfig()
 	config.Version = sarama.V3_0_1_0
 	config.Consumer.Return.Errors = false
+	config.Consumer.MaxProcessingTime = 20 * time.Millisecond
+
+	logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
+		AddSource: false,
+		Level:     slog.LevelDebug,
+	}))
+
+	topic := os.Getenv("SARAMA_DEMO_TOPIC")
+	if topic != "" {
+		topics = []string{topic}
+	}
+
+	broker := os.Getenv("SARAMA_DEMO_BROKER")
+	if broker != "" {
+		brokers = []string{broker}
+	}
 
 	// Create a new consumer group
 	group, err := sarama.NewConsumerGroup(brokers, consumerGroup, config)
@@ -43,8 +64,10 @@ func main() {
 		log.Panicf("Error creating health monitor: %v", err)
 	}
 
+	healthMonitor.SetLogger(logger)
+
 	// Consumer group handler
-	consumer := Consumer{healthMonitor: healthMonitor}
+	consumer := Consumer{healthMonitor: healthMonitor, l: logger}
 
 	// Consume messages
 	go func() {
@@ -62,22 +85,27 @@ func main() {
 
 	// Start HTTP server
 	http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
+		logger.InfoContext(ctx, "Health check request received")
 		isOk, err := healthMonitor.Healthy(context.Background())
 		if err != nil {
 			http.Error(w, err.Error(), http.StatusInternalServerError)
+			logger.ErrorContext(ctx, "Error checking health", err)
+
 			return
 		}
 
 		if !isOk {
 			http.Error(w, "Not OK", http.StatusServiceUnavailable)
+
+			logger.ErrorContext(ctx, "Health check failed")
 			return
 		}
 
-		fmt.Fprintln(w, "OK")
+		_, _ = fmt.Fprintln(w, "OK")
 	})
 
 	go func() {
-		if err := http.ListenAndServe(":8083", nil); err != nil {
+		if err := http.ListenAndServe(fmt.Sprintf(":%d", httpPort), nil); err != nil {
 			log.Fatalf("Failed to start HTTP server: %v", err)
 		}
 	}()
@@ -90,36 +118,74 @@ func main() {
 type Consumer struct {
 	ready         chan bool
 	healthMonitor *saramahealth.HealthChecker
+	l             *slog.Logger
 }
 
 // Setup is run at the beginning of a new session, before ConsumeClaim
-func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
+func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
+	c.l.Debug("Consumer group is ready!...")
+
 	return nil
 }
 
 // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
-func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
+func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
+	c.l.Debug("Consumer closing down!...")
+
 	return nil
 }
 
 // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
-func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
 	ctx := session.Context()
 
 	for {
 		select {
 		case <-ctx.Done():
-			fmt.Printf("session closed: %v\n", ctx.Err())
-
-			consumer.healthMonitor.Release(ctx, claim.Topic(), claim.Partition())
+			c.l.DebugContext(
+				ctx,
+				"Session closed",
+				"topic",
+				claim.Topic(),
+				"partition",
+				claim.Partition(),
+				"error",
+				ctx.Err(),
+			)
+
+			c.healthMonitor.Release(ctx, claim.Topic(), claim.Partition())
 
 			return nil
 		case message, ok := <-claim.Messages():
 			if !ok {
 				return nil
 			}
 
-			consumer.healthMonitor.Track(ctx, message)
+			c.l.DebugContext(
+				ctx,
+				"Message claimed",
+				"topic",
+				message.Topic,
+				"partition",
+				message.Partition,
+				"offset",
+				message.Offset,
+			)
+
+			c.healthMonitor.Track(ctx, message)
+
+			// Read the file
+			data, err := os.ReadFile("./examples/simple/failure.txt")
+			if err != nil {
+				log.Println("Error reading file:", err)
+				continue
+			}
+
+			// Check if the file contains 'fail'
+			if strings.TrimSpace(string(data)) == "fail" {
+				log.Println("Simulating a failure")
+				time.Sleep(40 * time.Second)
+			}
 
 			if string(message.Value) == "fail" { // Simulate a failure
 				return fmt.Errorf("error")
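The new ConsumeClaim body polls examples/simple/failure.txt on every message: when the file contains "fail", the handler sleeps for 40 seconds, far beyond the 20ms Consumer.MaxProcessingTime configured above, which should drive the /health endpoint unhealthy. A minimal sketch of flipping that switch from another process (hypothetical helper, assumed to run from the repository root, not part of this commit):

// toggle.go (hypothetical helper): writes the sentinel value the consumer
// polls for. Writing anything other than "fail" restores normal behaviour.
package main

import (
	"log"
	"os"
)

func main() {
	if err := os.WriteFile("./examples/simple/failure.txt", []byte("fail"), 0o644); err != nil {
		log.Fatalf("could not write sentinel file: %v", err)
	}
}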
25 changes: 18 additions & 7 deletions monitor.go
@@ -21,7 +21,7 @@ type State struct {
 
 type HealthChecker struct {
 	topics    []string
-	client    sarama.Client
+	client    kafkaClient
 	tracker   *tracker
 	prevState *State
 	logger    *slog.Logger

Check failure on line 22 in monitor.go (GitHub Actions / lint): fieldalignment: struct with 64 pointer bytes could be 48 (govet)

@@ -42,7 +42,7 @@ func NewHealthChecker(cfg Config) (*HealthChecker, error) {
 	}, nil
 }
 
-func (h *HealthChecker) Healthy(ctx context.Context) (bool, error) {
+func (h *HealthChecker) Healthy(_ context.Context) (bool, error) {
 	// get the latest offset for each topic
 	latestStateMap := make(map[string]map[int32]int64)
 	for _, topic := range h.topics {

Check failure on line 45 in monitor.go (GitHub Actions / lint): calculated cyclomatic complexity for function Healthy is 11, max is 10 (cyclop)
Check failure on line 48 in monitor.go (GitHub Actions / lint): ranges should only be cuddled with assignments used in the iteration (wsl)

@@ -64,16 +64,18 @@ func (h *HealthChecker) Healthy(ctx context.Context) (bool, error) {
 	// check if the current state equals to the latest state
 	// return true only if the current state equals to the latest state
 	// otherwise go to the next check
-	var topicRes bool
+	allMatch := true
+outer:
 	for topic := range currentState {
 		for partition := range currentState[topic] {
-			if currentState[topic][partition] == latestStateMap[topic][partition] {
-				topicRes = true
+			if currentState[topic][partition] != latestStateMap[topic][partition] {
+				allMatch = false
+				break outer
 			}
 		}
 	}
 
-	if topicRes {
+	if allMatch {
 		return true, nil // return true if the current state equals to the latest state
 	}
 
@@ -90,13 +92,17 @@ func (h *HealthChecker) Healthy(ctx context.Context) (bool, error) {
 }
 
 func (h *HealthChecker) Track(_ context.Context, msg *sarama.ConsumerMessage) {
-	h.tracker.Track(msg)
+	h.tracker.track(msg)
 }
 
 func (h *HealthChecker) Release(_ context.Context, topic string, partition int32) {
 	h.tracker.drop(topic, partition)
 }
 
+func (h *HealthChecker) SetLogger(l *slog.Logger) {
+	h.logger = l
+}
+
 func (h *HealthChecker) getLatestOffset(topic string) (map[int32]int64, error) {
 	offsets := make(map[int32]int64)
 
@@ -115,3 +121,8 @@ func (h *HealthChecker) getLatestOffset(topic string) (map[int32]int64, error) {
 
 	return offsets, nil
 }
+
+type kafkaClient interface {
+	GetOffset(topic string, partition int32, time int64) (int64, error)
+	Partitions(topic string) ([]int32, error)
+}
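Narrowing the client field from sarama.Client to the two-method kafkaClient interface added above makes HealthChecker testable without a live broker. A minimal sketch of a test stub satisfying that interface (the stub name and canned values are assumptions, not part of this commit):

// stubClient (hypothetical) satisfies kafkaClient with canned data for tests.
type stubClient struct {
	partitions []int32
	offsets    map[int32]int64 // latest offset per partition
}

func (s *stubClient) GetOffset(topic string, partition int32, time int64) (int64, error) {
	return s.offsets[partition], nil
}

func (s *stubClient) Partitions(topic string) ([]int32, error) {
	return s.partitions, nil
}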
2 changes: 1 addition & 1 deletion tracker.go
@@ -15,7 +15,7 @@ func newTracker() *tracker {
 	return &tracker{topicPartitionOffsets: make(map[string]map[int32]int64)}
 }
 
-func (t *tracker) Track(m *sarama.ConsumerMessage) {
+func (t *tracker) track(m *sarama.ConsumerMessage) {
 	t.mu.Lock()
 	defer t.mu.Unlock()
