From 66a61ca082277cedf5dd7c2a54f64e6ae21d952e Mon Sep 17 00:00:00 2001 From: Amir Malka Date: Wed, 7 Aug 2024 11:22:14 +0300 Subject: [PATCH] shared key subscription support Signed-off-by: Amir Malka --- pulsar/connector/client.go | 3 ++- pulsar/connector/consumer.go | 20 ++++++++++++++++++-- pulsar/connector/producer.go | 8 ++++++++ 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/pulsar/connector/client.go b/pulsar/connector/client.go index fec5bda..e0cce90 100644 --- a/pulsar/connector/client.go +++ b/pulsar/connector/client.go @@ -51,8 +51,9 @@ func (p *pulsarClient) NewProducer(createProducerOption ...CreateProducerOption) return newProducer(p, createProducerOption...) } +// NewConsumer creates a new consumer. When subscription type is not provided, it defaults to Shared func (p *pulsarClient) NewConsumer(createConsumerOpts ...CreateConsumerOption) (Consumer, error) { - return newSharedConsumer(p, createConsumerOpts...) + return newConsumer(p, createConsumerOpts...) } func (p *pulsarClient) ping() error { diff --git a/pulsar/connector/consumer.go b/pulsar/connector/consumer.go index f53f609..43e07ca 100644 --- a/pulsar/connector/consumer.go +++ b/pulsar/connector/consumer.go @@ -110,6 +110,7 @@ type createConsumerOptions struct { Topics []TopicName FullTopics []TopicName SubscriptionName string + subscriptionType *pulsar.SubscriptionType MaxDeliveryAttempts uint32 dlqNamespace string RedeliveryDelay time.Duration @@ -203,6 +204,12 @@ func WithDefaultBackoffPolicy() CreateConsumerOption { } } +func WithSubscriptionType(subscriptionType pulsar.SubscriptionType) CreateConsumerOption { + return func(o *createConsumerOptions) { + o.subscriptionType = &subscriptionType + } +} + // maxDeliveryAttempts before sending to DLQ - 0 means no DLQ // by default, maxDeliveryAttempts is 5 func WithDLQ(maxDeliveryAttempts uint32) CreateConsumerOption { @@ -241,7 +248,7 @@ func WithMessageChannel(messageChannel chan pulsar.ConsumerMessage) CreateConsum } } -func newSharedConsumer(pulsarClient Client, createConsumerOpts ...CreateConsumerOption) (Consumer, error) { +func newConsumer(pulsarClient Client, createConsumerOpts ...CreateConsumerOption) (Consumer, error) { opts := &createConsumerOptions{} opts.defaults(pulsarClient.GetConfig()) for _, o := range createConsumerOpts { @@ -277,11 +284,20 @@ func newSharedConsumer(pulsarClient Client, createConsumerOpts ...CreateConsumer } dlq = NewDlq(opts.Tenant, opts.dlqNamespace, topicName, opts.MaxDeliveryAttempts, opts.retryTopic) } + + var subscriptionType pulsar.SubscriptionType + if opts.subscriptionType != nil { + subscriptionType = *opts.subscriptionType + } else { + // default to shared subscription type if not specified + subscriptionType = pulsar.Shared + } + pulsarConsumer, err := pulsarClient.Subscribe(pulsar.ConsumerOptions{ Topic: topic, Topics: topics, SubscriptionName: opts.SubscriptionName, - Type: pulsar.Shared, + Type: subscriptionType, MessageChannel: opts.MessageChannel, DLQ: dlq, EnableDefaultNackBackoffPolicy: opts.DefaultBackoffPolicy, diff --git a/pulsar/connector/producer.go b/pulsar/connector/producer.go index 54aecb0..5ca3783 100644 --- a/pulsar/connector/producer.go +++ b/pulsar/connector/producer.go @@ -92,6 +92,7 @@ type produceMessageOptions struct { pulsarClient Client ctx context.Context properties map[string]string + key string } type ProduceMessageOption func(*produceMessageOptions) @@ -108,6 +109,12 @@ func WithMessageToSend(msgToSend interface{}) ProduceMessageOption { } } +func WithMessageKey(key string) ProduceMessageOption { + return func(o *produceMessageOptions) { + o.key = key + } +} + func WithPulsarClient(pulsarClient Client) ProduceMessageOption { return func(o *produceMessageOptions) { o.pulsarClient = pulsarClient @@ -133,6 +140,7 @@ func ProduceMessage(producer pulsar.Producer, producerOpts ...ProduceMessageOpti msg := pulsar.ProducerMessage{ Payload: msgBytes, Properties: opts.properties, + Key: opts.key, } if err != nil {