Skip to content

Commit

Permalink
allow creating consumer with name
Browse files Browse the repository at this point in the history
Signed-off-by: Amir Malka <[email protected]>
  • Loading branch information
amirmalka committed Oct 1, 2024
1 parent bcb311f commit 1afdb49
Showing 1 changed file with 8 additions and 0 deletions.
8 changes: 8 additions & 0 deletions pulsar/connector/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func (c *consumer) applyReconsumeDurationProperties(msg pulsar.Message) {
}

type createConsumerOptions struct {
consumerName string
Topic TopicName
Topics []TopicName
FullTopics []TopicName
Expand Down Expand Up @@ -248,6 +249,12 @@ func WithMessageChannel(messageChannel chan pulsar.ConsumerMessage) CreateConsum
}
}

func WithName(name string) CreateConsumerOption {
return func(o *createConsumerOptions) {
o.consumerName = name
}
}

func newConsumer(pulsarClient Client, createConsumerOpts ...CreateConsumerOption) (Consumer, error) {
opts := &createConsumerOptions{}
opts.defaults(pulsarClient.GetConfig())
Expand Down Expand Up @@ -294,6 +301,7 @@ func newConsumer(pulsarClient Client, createConsumerOpts ...CreateConsumerOption
}

pulsarConsumer, err := pulsarClient.Subscribe(pulsar.ConsumerOptions{
Name: opts.consumerName,
Topic: topic,
Topics: topics,
SubscriptionName: opts.SubscriptionName,
Expand Down

0 comments on commit 1afdb49

Please sign in to comment.