Skip to content

Commit

Permalink
shared key subscription support
Browse files Browse the repository at this point in the history
Signed-off-by: Amir Malka <[email protected]>
  • Loading branch information
amirmalka committed Aug 7, 2024
1 parent 99ad293 commit 66a61ca
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 3 deletions.
3 changes: 2 additions & 1 deletion pulsar/connector/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ func (p *pulsarClient) NewProducer(createProducerOption ...CreateProducerOption)
return newProducer(p, createProducerOption...)
}

// NewConsumer creates a new consumer. When subscription type is not provided, it defaults to Shared
func (p *pulsarClient) NewConsumer(createConsumerOpts ...CreateConsumerOption) (Consumer, error) {
return newSharedConsumer(p, createConsumerOpts...)
return newConsumer(p, createConsumerOpts...)
}

func (p *pulsarClient) ping() error {
Expand Down
20 changes: 18 additions & 2 deletions pulsar/connector/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ type createConsumerOptions struct {
Topics []TopicName
FullTopics []TopicName
SubscriptionName string
subscriptionType *pulsar.SubscriptionType
MaxDeliveryAttempts uint32
dlqNamespace string
RedeliveryDelay time.Duration
Expand Down Expand Up @@ -203,6 +204,12 @@ func WithDefaultBackoffPolicy() CreateConsumerOption {
}
}

func WithSubscriptionType(subscriptionType pulsar.SubscriptionType) CreateConsumerOption {
return func(o *createConsumerOptions) {
o.subscriptionType = &subscriptionType
}
}

// maxDeliveryAttempts before sending to DLQ - 0 means no DLQ
// by default, maxDeliveryAttempts is 5
func WithDLQ(maxDeliveryAttempts uint32) CreateConsumerOption {
Expand Down Expand Up @@ -241,7 +248,7 @@ func WithMessageChannel(messageChannel chan pulsar.ConsumerMessage) CreateConsum
}
}

func newSharedConsumer(pulsarClient Client, createConsumerOpts ...CreateConsumerOption) (Consumer, error) {
func newConsumer(pulsarClient Client, createConsumerOpts ...CreateConsumerOption) (Consumer, error) {
opts := &createConsumerOptions{}
opts.defaults(pulsarClient.GetConfig())
for _, o := range createConsumerOpts {
Expand Down Expand Up @@ -277,11 +284,20 @@ func newSharedConsumer(pulsarClient Client, createConsumerOpts ...CreateConsumer
}
dlq = NewDlq(opts.Tenant, opts.dlqNamespace, topicName, opts.MaxDeliveryAttempts, opts.retryTopic)
}

var subscriptionType pulsar.SubscriptionType
if opts.subscriptionType != nil {
subscriptionType = *opts.subscriptionType
} else {
// default to shared subscription type if not specified
subscriptionType = pulsar.Shared
}

pulsarConsumer, err := pulsarClient.Subscribe(pulsar.ConsumerOptions{
Topic: topic,
Topics: topics,
SubscriptionName: opts.SubscriptionName,
Type: pulsar.Shared,
Type: subscriptionType,
MessageChannel: opts.MessageChannel,
DLQ: dlq,
EnableDefaultNackBackoffPolicy: opts.DefaultBackoffPolicy,
Expand Down
8 changes: 8 additions & 0 deletions pulsar/connector/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type produceMessageOptions struct {
pulsarClient Client
ctx context.Context
properties map[string]string
key string
}

type ProduceMessageOption func(*produceMessageOptions)
Expand All @@ -108,6 +109,12 @@ func WithMessageToSend(msgToSend interface{}) ProduceMessageOption {
}
}

func WithMessageKey(key string) ProduceMessageOption {
return func(o *produceMessageOptions) {
o.key = key
}
}

func WithPulsarClient(pulsarClient Client) ProduceMessageOption {
return func(o *produceMessageOptions) {
o.pulsarClient = pulsarClient
Expand All @@ -133,6 +140,7 @@ func ProduceMessage(producer pulsar.Producer, producerOpts ...ProduceMessageOpti
msg := pulsar.ProducerMessage{
Payload: msgBytes,
Properties: opts.properties,
Key: opts.key,
}

if err != nil {
Expand Down

0 comments on commit 66a61ca

Please sign in to comment.