-
Notifications
You must be signed in to change notification settings - Fork 0
/
kafka.go
82 lines (70 loc) · 1.97 KB
/
kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package kafkaservice
import (
"context"
"fmt"
"os"
kafka "github.com/segmentio/kafka-go"
)
type KAFKA_TOPIC string
const (
ISSUED_TOKEN KAFKA_TOPIC = "issued-token"
NEW_USER_CREATED KAFKA_TOPIC = "new-user-created"
SIDE_TOPIC_FOR_RETRY KAFKA_TOPIC = "side-topic-for-retry"
)
func getBrokers(count int) []string {
broker := os.Getenv(fmt.Sprintf("KAFKA_HOST_%d", count))
fmt.Print("KAFKA_HOST: ", fmt.Sprintf("KAFKA_HOST_%d", count), "broker:", broker)
if broker == "" {
return []string{}
}
return append([]string{broker}, getBrokers(count+1)...)
}
var tokenWriter *kafka.Writer
var newUserWriter *kafka.Writer
var sideTopicWriter *kafka.Writer
func Produce(ctx context.Context, topic KAFKA_TOPIC, key []byte, value []byte) {
var kafkaWriter *kafka.Writer
switch topic {
case ISSUED_TOKEN:
if tokenWriter == nil {
tokenWriter = kafka.NewWriter(kafka.WriterConfig{
Brokers: getBrokers(1),
Topic: string(ISSUED_TOKEN),
Balancer: &kafka.Hash{},
})
}
kafkaWriter = tokenWriter
case NEW_USER_CREATED:
if newUserWriter == nil {
newUserWriter = kafka.NewWriter(kafka.WriterConfig{
Brokers: getBrokers(1),
Topic: string(NEW_USER_CREATED),
Balancer: &kafka.RoundRobin{},
})
}
kafkaWriter = newUserWriter
case SIDE_TOPIC_FOR_RETRY:
if sideTopicWriter == nil {
sideTopicWriter = kafka.NewWriter(kafka.WriterConfig{
Brokers: getBrokers(1),
Topic: string(SIDE_TOPIC_FOR_RETRY),
Balancer: &kafka.RoundRobin{},
})
}
kafkaWriter = newUserWriter
}
if kafkaWriter == nil {
fmt.Printf("Invalid topic: %s", topic)
return
}
fmt.Print("list of brokers:", getBrokers(1))
err := kafkaWriter.WriteMessages(ctx, kafka.Message{
// create an arbitrary message payload for the value
Value: value,
})
if err != nil {
fmt.Print("could not write message " + err.Error())
}
// log a confirmation once the message is written
fmt.Printf("\n\n::published \ntopic: %s\nkey:%s\nvalue:%s", topic, key, value)
}