Skip to content

Commit

Permalink
Fix another potential panic.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Jun 14, 2023
1 parent d489361 commit 11be53f
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 5 deletions.
2 changes: 1 addition & 1 deletion processes/consumer/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func CommitOffset(ctx context.Context, topic string, partitionsToOffset map[stri
for _, msgs := range partitionsToOffset {
for _, msg := range msgs {
if msg.KafkaMsg != nil {
err = topicToConsumer[topic].CommitMessages(ctx, *msg.KafkaMsg)
err = topicToConsumer.Get(topic).CommitMessages(ctx, *msg.KafkaMsg)
if err != nil {
return err
}
Expand Down
34 changes: 30 additions & 4 deletions processes/consumer/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,37 @@ import (
"github.com/segmentio/kafka-go"
)

var topicToConsumer map[string]kafkalib.Consumer
var topicToConsumer *TopicToConsumer

func NewTopicToConsumer() *TopicToConsumer {
return &TopicToConsumer{
topicToConsumer: make(map[string]kafkalib.Consumer),
}
}

type TopicToConsumer struct {
topicToConsumer map[string]kafkalib.Consumer
sync.RWMutex
}

func (t *TopicToConsumer) Add(topic string, consumer kafkalib.Consumer) {
t.Lock()
defer t.Unlock()
t.topicToConsumer[topic] = consumer
return
}

func (t *TopicToConsumer) Get(topic string) kafkalib.Consumer {
t.RLock()
defer t.RUnlock()
return t.topicToConsumer[topic]
}

// SetKafkaConsumer - This is used for tests.
func SetKafkaConsumer(_topicToConsumer map[string]kafkalib.Consumer) {
topicToConsumer = _topicToConsumer
topicToConsumer = &TopicToConsumer{
topicToConsumer: _topicToConsumer,
}
}

func StartConsumer(ctx context.Context) {
Expand Down Expand Up @@ -60,7 +86,7 @@ func StartConsumer(ctx context.Context) {
}

tcFmtMap := NewTcFmtMap()
topicToConsumer = make(map[string]kafkalib.Consumer)
topicToConsumer = NewTopicToConsumer()
var topics []string
for _, topicConfig := range settings.Config.Kafka.TopicConfigs {
tcFmtMap.Add(topicConfig.Topic, TopicConfigFormatter{
Expand Down Expand Up @@ -88,7 +114,7 @@ func StartConsumer(ctx context.Context) {

kafkaCfg.Brokers = brokers
kafkaConsumer := kafka.NewReader(kafkaCfg)
topicToConsumer[topic] = kafkaConsumer
topicToConsumer.Add(topic, kafkaConsumer)
for {
kafkaMsg, err := kafkaConsumer.FetchMessage(ctx)
msg := artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic)
Expand Down

0 comments on commit 11be53f

Please sign in to comment.