diff --git a/go.mod b/go.mod index 8851d38..208d9e9 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( go.opentelemetry.io/otel v1.13.0 go.opentelemetry.io/otel/trace v1.13.0 go.uber.org/zap v1.24.0 + golang.org/x/sync v0.0.0-20201207232520-09787c993a3a ) require ( diff --git a/go.sum b/go.sum index 78e54f3..47394b0 100644 --- a/go.sum +++ b/go.sum @@ -380,6 +380,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/pulsar/connector/basic_conn_test.go b/pulsar/connector/basic_conn_test.go index 2aea18f..357b05b 100644 --- a/pulsar/connector/basic_conn_test.go +++ b/pulsar/connector/basic_conn_test.go @@ -4,7 +4,6 @@ import ( "context" _ "embed" "fmt" - "sync" "time" "github.com/apache/pulsar-client-go/pulsar" @@ -78,43 +77,6 @@ func (suite *MainTestSuite) TestConsumerAndProducer() { suite.Equal(2, len(actualPayloads), "expected 2 messages") } -func (suite *MainTestSuite) TestReConsumerLater() { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - producer, err := CreateTestProducer(ctx, suite.pulsarClient) - if err != nil { - suite.FailNow(err.Error(), "create producer") - } - if producer == nil { - suite.FailNow("producer is nil") - } - defer producer.Close() - //create consumer to get actual payloads - consumer, err := CreateTestConsumer(ctx, suite.pulsarClient) - if err != nil { - suite.FailNow(err.Error()) - } - defer consumer.Close() - - //produce - if _, err := producer.Send(ctx, &pulsar.ProducerMessage{Payload: []byte("hello workld")}); err != nil { - suite.FailNow(err.Error(), "send payload") - } - //consume - testConsumerCtx, consumerCancel := context.WithTimeout(ctx, time.Second*time.Duration(time.Second*2)) - defer consumerCancel() - msg, err := consumer.Receive(testConsumerCtx) - if err != nil { - suite.FailNow(err.Error(), "receive payload") - } - //reconsume - consumer.ReconsumeLater(msg, time.Millisecond*5) - msg, err = consumer.Receive(testConsumerCtx) - if err != nil { - suite.FailNow(err.Error(), "reconsume payload") - } -} - // CreateTestProducer creates a producer func CreateTestProducer(ctx context.Context, client Client) (pulsar.Producer, error) { @@ -131,88 +93,27 @@ func CreateTestProducer(ctx context.Context, client Client) (pulsar.Producer, er return producer, err } -func CreateTestConsumer(ctx context.Context, client Client) (pulsar.Consumer, error) { - return client.NewConsumer(WithTopic(TestTopicName), +func CreateTestConsumer(ctx context.Context, client Client, createConsumerOpts ...CreateConsumerOption) (Consumer, error) { + createConsumerOpts = append(createConsumerOpts, WithTopic(TestTopicName), WithSubscriptionName(TestSubscriptionName), WithRedeliveryDelay(time.Duration(client.GetConfig().RedeliveryDelaySeconds)*time.Second), WithDLQ(uint32(client.GetConfig().MaxDeliveryAttempts)), - WithDefaultBackoffPolicy(), - WithRetryEnable(true), + WithDefaultBackoffPolicy()) + return client.NewConsumer( + createConsumerOpts..., ) } func CreateTestDlqConsumer(client Client) (pulsar.Consumer, error) { - return client.NewConsumer(WithTopic(TestTopicName), + return client.NewConsumer(WithTopic(TestTopicName+"-dlq"), WithSubscriptionName(TestSubscriptionName+"-dlq"), WithRedeliveryDelay(0), + WithNamespace("ca-messaging", "test-namespace-dlqs"), WithDLQ(0), ) } -func (suite *MainTestSuite) TestDLQ() { - //start tenant check - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - //create producer to input test payloads - pubsubCtx := utils.NewContextWithValues(ctx, "testConsumer") - producer, err := CreateTestProducer(pubsubCtx, suite.pulsarClient) - if err != nil { - suite.FailNow(err.Error(), "create producer") - } - if producer == nil { - suite.FailNow("producer is nil") - } - defer producer.Close() - //create consumer to get actual payloads - consumer, err := CreateTestConsumer(pubsubCtx, suite.pulsarClient) - if err != nil { - suite.FailNow(err.Error()) - } - defer consumer.Close() - dlqConsumer, err := CreateTestDlqConsumer(suite.pulsarClient) - if err != nil { - suite.FailNow(err.Error()) - } - defer dlqConsumer.Close() - - testPayload := []byte("[{\"id\":\"1\",\"data\":\"Hello World\"},{\"id\":2,\"data\":\"Hello from the other World\"}]") - - //send test payloads - produceMessages(suite, ctx, producer, loadJson[[]TestPayloadImplInterface](testPayload)) - //sleep to allow redelivery - time.Sleep(time.Second * 5) - //create next stage consumer and dlq consumer - wg := sync.WaitGroup{} - wg.Add(2) - var actualPayloads map[string]TestPayloadImpl - go func() { - defer wg.Done() - // consume payloads for one second - actualPayloads = consumeMessages[TestPayloadImpl](suite, pubsubCtx, consumer, "consumer", 20) - //sleep to allow redelivery - // - }() - var dlqPayloads map[string]TestInvalidPayloadImpl - go func() { - defer wg.Done() - time.Sleep(time.Second * 10) - // consume payloads for one second - dlqPayloads = consumeMessages[TestInvalidPayloadImpl](suite, pubsubCtx, dlqConsumer, "dlqConsumer", 20) - }() - wg.Wait() - - suite.Equal(1, len(actualPayloads), "expected 1 msg in successful consumer") - suite.Contains(actualPayloads, "1", "expected msg with ID 1 in successful consumer") - - suite.Equal(1, len(dlqPayloads), "expected 1 msg in dlq consumer") - suite.Contains(dlqPayloads, "2", "expected msg with ID 2 in dlq consumer") - - // TODO: bad payload test - // suite.badPayloadTest(ctx, producer, dlqConsumer) -} - func (suite *MainTestSuite) badPayloadTest(ctx context.Context, producer pulsar.Producer, dlqConsumer pulsar.Consumer) { actualPayloads := []string{} if _, err := producer.Send(ctx, &pulsar.ProducerMessage{Payload: []byte("some bad payload")}); err != nil { diff --git a/pulsar/connector/client.go b/pulsar/connector/client.go index d9c874d..fec5bda 100644 --- a/pulsar/connector/client.go +++ b/pulsar/connector/client.go @@ -20,7 +20,8 @@ const ( tenantsPath = adminPath + "/tenants" namespacesPath = adminPath + "/namespaces" - dlqNamespaceSuffix = "-dlqs" + dlqNamespaceSuffix = "-dlqs" + retryNamespaceSuffix = "-retry" ) type PulsarClientOptions struct { @@ -131,6 +132,11 @@ func NewClient(options ...func(*PulsarClientOptions)) (Client, error) { if initErr = pulsarAdminRequest(http.MethodPut, dlqNamespacePath, nil); initErr != nil { return nil, fmt.Errorf("failed to create dlq namespace: %w", initErr) } + retryNamespacePath := namespacePath + retryNamespaceSuffix + log.Printf("creating retry namespace %s\n", retryNamespacePath) + if initErr = pulsarAdminRequest(http.MethodPut, retryNamespacePath, nil); initErr != nil { + return nil, fmt.Errorf("failed to create retry namespace: %w", initErr) + } } return pulsarClient, nil } diff --git a/pulsar/connector/consumer.go b/pulsar/connector/consumer.go index 80771b4..bc3c9bd 100644 --- a/pulsar/connector/consumer.go +++ b/pulsar/connector/consumer.go @@ -2,6 +2,7 @@ package connector import ( "fmt" + "strconv" "time" "github.com/apache/pulsar-client-go/pulsar" @@ -10,11 +11,98 @@ import ( type Consumer interface { pulsar.Consumer + //ReconsumeLaterDLQSafe returns false if the message is not Reconsumable (i.e. out of redeliveries or out of time. next attempt will go to dlq) + //If the message was sent for reconsuming the return value is true + ReconsumeLaterDLQSafe(msg pulsar.Message, delay time.Duration) bool + //IsReconsumable returns true if the message can be reconsumed or false if next attempt will go to dlq + IsReconsumable(msg pulsar.Message) bool } type consumer struct { pulsar.Consumer - //TODO override Receive Ack Nack for OTL + options createConsumerOptions +} + +const ( + propertyRetryStartTime = "RETRY_START_TIME" + propertySavedReconsumeAttempts = "SAVED_RECONSUME_ATTEMPS" +) + +func (c consumer) ReconsumeLater(msg pulsar.Message, delay time.Duration) { + if !c.options.retryEnabled { + panic("reconsumeLater called on consumer without retry enabled option set to true") + } + if c.options.forceDLQSafeRetry { + panic("reconsumeLater: when forceDLQSafeRetry option is true ReconsumeLaterDLQSafe must be used") + } + c.applyReconsumeDurationProperties(msg) + c.Consumer.ReconsumeLater(msg, delay) +} + +func (c consumer) IsReconsumable(msg pulsar.Message) bool { + if !c.options.retryEnabled { + return false + } + c.applyReconsumeDurationProperties(msg) + reconsumeTimes := 1 + if msg.Properties() != nil { + if s, ok := msg.Properties()[pulsar.SysPropertyReconsumeTimes]; ok { + reconsumeTimes, _ = strconv.Atoi(s) + reconsumeTimes++ + } + } + return reconsumeTimes <= int(c.options.MaxDeliveryAttempts) +} + +func (c consumer) ReconsumeLaterDLQSafe(msg pulsar.Message, delay time.Duration) bool { + if !c.IsReconsumable(msg) { + return false + } + c.Consumer.ReconsumeLater(msg, delay) + return true +} + +func (c *consumer) applyReconsumeDurationProperties(msg pulsar.Message) { + if !c.options.retryEnabled { + return + } + if c.options.retryDuration > 0 && msg.Properties() != nil { + //get/set startTime + startTime := time.Now() + if _, ok := msg.Properties()[propertyRetryStartTime]; !ok { + msg.Properties()[propertyRetryStartTime] = startTime.Format(time.RFC3339) + } else { + startTime, _ = time.Parse(time.RFC3339, msg.Properties()[propertyRetryStartTime]) + } + //get reconsume Times + reconsumeTimes := 1 + if s, ok := msg.Properties()[pulsar.SysPropertyReconsumeTimes]; ok { + reconsumeTimes, _ = strconv.Atoi(s) + reconsumeTimes++ + + } + //get actual retries + actualRetries := 0 + if s, ok := msg.Properties()[propertySavedReconsumeAttempts]; ok { + actualRetries, _ = strconv.Atoi(s) + } + //if next delivery is about to exceed max deliveries (and nack) and the duration has not passed yet + if reconsumeTimes > int(c.options.MaxDeliveryAttempts) && time.Since(startTime) < c.options.retryDuration { + //reset the reconsume times to 1 + msg.Properties()[pulsar.SysPropertyReconsumeTimes] = "1" + //reduce the actual retries by 1 + actualRetries-- + //add the retry count the saved attempts property + msg.Properties()[propertySavedReconsumeAttempts] = strconv.Itoa(actualRetries + reconsumeTimes - 1) //reduce one becuase it was not reconsumed yet + //reset the reconsume times to 1 + msg.Properties()[pulsar.SysPropertyReconsumeTimes] = "1" + + } else if reconsumeTimes <= int(c.options.MaxDeliveryAttempts) && time.Since(startTime) >= c.options.retryDuration { + //duration passed set the retry to MaxDeliveryAttempts + msg.Properties()[propertySavedReconsumeAttempts] = strconv.Itoa(actualRetries + reconsumeTimes) + msg.Properties()[pulsar.SysPropertyReconsumeTimes] = strconv.Itoa(int(c.options.MaxDeliveryAttempts)) + } + } } type createConsumerOptions struct { @@ -29,7 +117,14 @@ type createConsumerOptions struct { BackoffPolicy pulsar.NackBackoffPolicy Tenant string Namespace string - RetryEnable bool + //retry options + retryEnabled bool + //duration of retry (overides the max delivery attemps) + retryDuration time.Duration + //safe retry with no sending to DLQ when set must use ReconsumeLaterDLQSafe and not ReconsumeLater + forceDLQSafeRetry bool + //automatically set to -retry/-retry + retryTopic string } func (opt *createConsumerOptions) defaults(config config.PulsarConfig) { @@ -63,6 +158,9 @@ func (opt *createConsumerOptions) validate() error { if opt.DefaultBackoffPolicy && opt.BackoffPolicy != nil { return fmt.Errorf("cannot specify both default backoff policy and backoff policy") } + if opt.MaxDeliveryAttempts == 0 && opt.retryEnabled { + return fmt.Errorf("cannot enable retry without setting max delivery attempts") + } return nil } @@ -75,9 +173,11 @@ func WithNamespace(tenant, namespace string) CreateConsumerOption { type CreateConsumerOption func(*createConsumerOptions) -func WithRetryEnable(retryEnable bool) CreateConsumerOption { +func WithRetryEnable(enable, forceDLQSafeRetry bool, retryDuration time.Duration) CreateConsumerOption { return func(o *createConsumerOptions) { - o.RetryEnable = retryEnable + o.retryEnabled = enable + o.retryDuration = retryDuration + o.forceDLQSafeRetry = forceDLQSafeRetry } } @@ -157,7 +257,11 @@ func newSharedConsumer(pulsarClient Client, createConsumerOpts ...CreateConsumer if topicName == "" && len(opts.Topics) > 0 { topicName = opts.Topics[0] } - dlq = NewDlq(opts.Tenant, opts.dlqNamespace, topicName, opts.MaxDeliveryAttempts) + + if opts.retryEnabled { + opts.retryTopic = BuildPersistentTopic(opts.Tenant, opts.Namespace+retryNamespaceSuffix, topicName+"-retry") + } + dlq = NewDlq(opts.Tenant, opts.dlqNamespace, topicName, opts.MaxDeliveryAttempts, opts.retryTopic) } pulsarConsumer, err := pulsarClient.Subscribe(pulsar.ConsumerOptions{ Topic: topic, @@ -167,7 +271,7 @@ func newSharedConsumer(pulsarClient Client, createConsumerOpts ...CreateConsumer MessageChannel: opts.MessageChannel, DLQ: dlq, EnableDefaultNackBackoffPolicy: opts.DefaultBackoffPolicy, - RetryEnable: opts.RetryEnable, + RetryEnable: opts.retryEnabled, // Interceptors: tracer.NewConsumerInterceptors(ctx), NackRedeliveryDelay: opts.RedeliveryDelay, NackBackoffPolicy: opts.BackoffPolicy, @@ -175,6 +279,6 @@ func newSharedConsumer(pulsarClient Client, createConsumerOpts ...CreateConsumer if err != nil { return nil, err } - return consumer{pulsarConsumer}, nil + return consumer{pulsarConsumer, *opts}, nil } diff --git a/pulsar/connector/dlq.go b/pulsar/connector/dlq.go index 6ca6784..d58b26a 100644 --- a/pulsar/connector/dlq.go +++ b/pulsar/connector/dlq.go @@ -7,10 +7,11 @@ import ( "github.com/apache/pulsar-client-go/pulsar" ) -func NewDlq(tenant, namespace string, topic TopicName, maxDeliveryAttempts uint32) *pulsar.DLQPolicy { +func NewDlq(tenant, namespace string, topic TopicName, maxDeliveryAttempts uint32, retryTopic string) *pulsar.DLQPolicy { return &pulsar.DLQPolicy{ - MaxDeliveries: maxDeliveryAttempts, - DeadLetterTopic: BuildPersistentTopic(tenant, namespace, topic+"-dlq"), + MaxDeliveries: maxDeliveryAttempts, + RetryLetterTopic: retryTopic, + DeadLetterTopic: BuildPersistentTopic(tenant, namespace, topic+"-dlq"), ProducerOptions: pulsar.ProducerOptions{ //TODO: OTL // Interceptors: tracer.NewProducerInterceptors(ctx), diff --git a/pulsar/connector/suite_test.go b/pulsar/connector/suite_test.go index c82a263..b55c2f4 100644 --- a/pulsar/connector/suite_test.go +++ b/pulsar/connector/suite_test.go @@ -5,6 +5,8 @@ import ( _ "embed" "fmt" "os/exec" + "strconv" + "sync" "net/http" "testing" @@ -12,7 +14,9 @@ import ( "encoding/json" + "github.com/kubescape/messaging/pulsar/common/utils" "github.com/kubescape/messaging/pulsar/config" + "golang.org/x/sync/errgroup" "github.com/apache/pulsar-client-go/pulsar" @@ -37,9 +41,10 @@ func TestBasicConnection(t *testing.T) { type MainTestSuite struct { suite.Suite - defaultTestConfig config.PulsarConfig - pulsarClient Client - shutdownFunc func() + failOnUnconsummedMessages bool + defaultTestConfig config.PulsarConfig + pulsarClient Client + shutdownFunc func() } func (suite *MainTestSuite) SetupSuite() { @@ -85,6 +90,67 @@ func (suite *MainTestSuite) SetupTest() { // suite.T().Log("setup test") } +func (suite *MainTestSuite) TearDownTest() { + //cleanup unconsumed messages + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*200) + defer cancel() + reconsumeLaterOptions := []CreateConsumerOption{ + WithRetryEnable(true, false, 0), + } + consumer, err := CreateTestConsumer(ctx, suite.pulsarClient, reconsumeLaterOptions...) + if err != nil { + suite.FailNow(err.Error()) + } + defer consumer.Close() + + dlqConsumer, err := CreateTestDlqConsumer(suite.pulsarClient) + if err != nil { + suite.FailNow(err.Error()) + } + defer dlqConsumer.Close() + errg := errgroup.Group{} + errg.Go(func() error { + for { + msg, err := consumer.Receive(ctx) + if err != nil { + if ctx.Err() != nil { + return nil + } else { + return err + } + } + if err := consumer.Ack(msg); err != nil { + suite.FailNow(err.Error()) + } + if suite.failOnUnconsummedMessages { + return fmt.Errorf("unconsumed message topic:%s \npayload:%s \nproperites:%v", msg.Topic(), msg.Payload(), msg.Properties()) + } + } + }) + errg.Go(func() error { + for { + msg, err := dlqConsumer.Receive(ctx) + if err != nil { + if ctx.Err() != nil { + return nil + } else { + return err + } + } + if err := dlqConsumer.Ack(msg); err != nil { + suite.FailNow(err.Error()) + } + if suite.failOnUnconsummedMessages { + return fmt.Errorf("unconsumed message topic:%s \npayload:%s \nproperites:%v", msg.Topic(), msg.Payload(), msg.Properties()) + } + } + }) + if err := errg.Wait(); err != nil { + suite.FailNow(err.Error()) + } + suite.failOnUnconsummedMessages = false +} + func (suite *MainTestSuite) TestCreateConsumer() { //create consumer chan1 := make(chan pulsar.ConsumerMessage) @@ -227,7 +293,532 @@ func consumeMessages[P TestPayload](suite *MainTestSuite, ctx context.Context, c } actualPayloads[payload.GetId()] = payload fmt.Printf("%s: Ack() - ID %s", consumerId, payload.GetId()) - consumer.Ack(msg) + if err := consumer.Ack(msg); err != nil { + suite.FailNow(err.Error()) + } } return actualPayloads } + +func (suite *MainTestSuite) TestDLQ() { + //start tenant check + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + //create producer to input test payloads + pubsubCtx := utils.NewContextWithValues(ctx, "testConsumer") + producer, err := CreateTestProducer(pubsubCtx, suite.pulsarClient) + if err != nil { + suite.FailNow(err.Error(), "create producer") + } + if producer == nil { + suite.FailNow("producer is nil") + } + defer producer.Close() + //create consumer to get actual payloads + consumer, err := CreateTestConsumer(pubsubCtx, suite.pulsarClient) + if err != nil { + suite.FailNow(err.Error()) + } + defer consumer.Close() + dlqConsumer, err := CreateTestDlqConsumer(suite.pulsarClient) + if err != nil { + suite.FailNow(err.Error()) + } + defer dlqConsumer.Close() + + testPayload := []byte("[{\"id\":\"1\",\"data\":\"Hello World\"},{\"id\":2,\"data\":\"Hello from the other World\"}]") + + //send test payloads + produceMessages(suite, ctx, producer, loadJson[[]TestPayloadImplInterface](testPayload)) + //sleep to allow redelivery + time.Sleep(time.Second * 5) + //create next stage consumer and dlq consumer + wg := sync.WaitGroup{} + wg.Add(2) + var actualPayloads map[string]TestPayloadImpl + go func() { + defer wg.Done() + // consume payloads for one second + actualPayloads = consumeMessages[TestPayloadImpl](suite, pubsubCtx, consumer, "consumer", 20) + //sleep to allow redelivery + // + }() + var dlqPayloads map[string]TestInvalidPayloadImpl + go func() { + defer wg.Done() + time.Sleep(time.Second * 10) + // consume payloads for one second + dlqPayloads = consumeMessages[TestInvalidPayloadImpl](suite, pubsubCtx, dlqConsumer, "dlqConsumer", 20) + }() + wg.Wait() + + suite.Equal(1, len(actualPayloads), "expected 1 msg in successful consumer") + suite.Contains(actualPayloads, "1", "expected msg with ID 1 in successful consumer") + + suite.Equal(1, len(dlqPayloads), "expected 1 msg in dlq consumer") + suite.Contains(dlqPayloads, "2", "expected msg with ID 2 in dlq consumer") +} + +func (suite *MainTestSuite) TestReconsumeLaterWithNacks() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + producer, err := CreateTestProducer(ctx, suite.pulsarClient) + if err != nil { + suite.FailNow(err.Error(), "create producer") + } + if producer == nil { + suite.FailNow("producer is nil") + } + defer producer.Close() + reconsumeLaterOptions := []CreateConsumerOption{ + WithRetryEnable(true, false, 0), + } + //create consumer to get actual payloads + consumer, err := CreateTestConsumer(ctx, suite.pulsarClient, reconsumeLaterOptions...) + if err != nil { + suite.FailNow(err.Error()) + } + defer consumer.Close() + + dlqConsumer, err := CreateTestDlqConsumer(suite.pulsarClient) + if err != nil { + suite.FailNow(err.Error()) + } + defer dlqConsumer.Close() + //produce + if _, err := producer.Send(ctx, &pulsar.ProducerMessage{Payload: []byte(suite.T().Name())}); err != nil { + suite.FailNow(err.Error(), "send payload") + } + testMsg := func(msg pulsar.Message) { + if msg == nil { + suite.FailNow("msg is nil") + } + if string(msg.Payload()) != suite.T().Name() { + suite.FailNow("unexpected payload") + } + } + //consume + testConsumerCtx, consumerCancel := context.WithTimeout(ctx, time.Second*time.Duration(time.Second*2)) + defer consumerCancel() + msg, err := consumer.Receive(testConsumerCtx) + if err != nil { + suite.FailNow(err.Error(), "receive payload") + } + testMsg(msg) + //reconsume + consumer.ReconsumeLater(msg, time.Millisecond*5) + msg, err = consumer.Receive(testConsumerCtx) + if err != nil { + suite.FailNow(err.Error(), "reconsume payload") + } + testMsg(msg) + //reconsume again + consumer.ReconsumeLater(msg, time.Millisecond*5) + msg, err = consumer.Receive(testConsumerCtx) + if err != nil { + suite.FailNow(err.Error(), "reconsume payload") + } + testMsg(msg) + suite.False(consumer.IsReconsumable(msg), "expect message not to be reconsumable") + consumer.Nack(msg) + msg, err = consumer.Receive(testConsumerCtx) + if err != nil { + suite.FailNow(err.Error(), "reconsume payload") + } + testMsg(msg) + //nack again - 2nd redelivery + consumer.Nack(msg) + //expect dlq + msg, err = dlqConsumer.Receive(testConsumerCtx) + if err != nil { + suite.FailNow(err.Error(), "reconsume payload") + } + if err := dlqConsumer.Ack(msg); err != nil { + suite.FailNow(err.Error()) + } + testMsg(msg) + //fail on test teardown if there are unconsumed messages + suite.failOnUnconsummedMessages = true + +} + +func (suite *MainTestSuite) TestReconsumeLaterMaxAttemps() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + producer, err := CreateTestProducer(ctx, suite.pulsarClient) + if err != nil { + suite.FailNow(err.Error(), "create producer") + } + if producer == nil { + suite.FailNow("producer is nil") + } + defer producer.Close() + //create consumer to get actual payloads + reconsumeLaterOptions := []CreateConsumerOption{ + WithRetryEnable(true, false, 0), + } + consumer, err := CreateTestConsumer(ctx, suite.pulsarClient, reconsumeLaterOptions...) + if err != nil { + suite.FailNow(err.Error()) + } + defer consumer.Close() + + dlqConsumer, err := CreateTestDlqConsumer(suite.pulsarClient) + if err != nil { + suite.FailNow(err.Error()) + } + defer dlqConsumer.Close() + //produce + if _, err := producer.Send(ctx, &pulsar.ProducerMessage{Payload: []byte(suite.T().Name())}); err != nil { + suite.FailNow(err.Error(), "send payload") + } + testMsg := func(msg pulsar.Message) { + if msg == nil { + suite.FailNow("msg is nil") + } + if string(msg.Payload()) != suite.T().Name() { + suite.FailNow("unexpected payload") + } + } + //consume + testConsumerCtx, consumerCancel := context.WithTimeout(ctx, time.Second*time.Duration(time.Second*2)) + defer consumerCancel() + msg, err := consumer.Receive(testConsumerCtx) + if err != nil { + suite.FailNow(err.Error(), "receive payload") + } + testMsg(msg) + //reconsume + consumer.ReconsumeLater(msg, time.Millisecond*5) + msg, err = consumer.Receive(testConsumerCtx) + if err != nil { + suite.FailNow(err.Error(), "reconsume payload") + } + testMsg(msg) + //reconsume again + consumer.ReconsumeLater(msg, time.Millisecond*5) + msg, err = consumer.Receive(testConsumerCtx) + if err != nil { + suite.FailNow(err.Error(), "reconsume payload") + } + testMsg(msg) + suite.False(consumer.IsReconsumable(msg), "expect message not to be reconsumable") + //reconsume again - this time it should go to dlq + consumer.ReconsumeLater(msg, time.Millisecond*5) + //expect dlq + msg, err = dlqConsumer.Receive(testConsumerCtx) + if err != nil { + suite.FailNow(err.Error(), "reconsume payload") + } + if err := dlqConsumer.Ack(msg); err != nil { + suite.FailNow(err.Error()) + } + testMsg(msg) + //fail on test teardown if there are unconsumed messages + suite.failOnUnconsummedMessages = true + +} + +func (suite *MainTestSuite) TestReconsumeLater() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + producer, err := CreateTestProducer(ctx, suite.pulsarClient) + if err != nil { + suite.FailNow(err.Error(), "create producer") + } + if producer == nil { + suite.FailNow("producer is nil") + } + defer producer.Close() + //create consumer to get actual payloads + reconsumeLaterOptions := []CreateConsumerOption{ + WithRetryEnable(true, false, 0), + } + consumer, err := CreateTestConsumer(ctx, suite.pulsarClient, reconsumeLaterOptions...) + if err != nil { + suite.FailNow(err.Error()) + } + defer consumer.Close() + + //produce + if _, err := producer.Send(ctx, &pulsar.ProducerMessage{Payload: []byte(suite.T().Name())}); err != nil { + suite.FailNow(err.Error(), "send payload") + } + testMsg := func(msg pulsar.Message) { + if msg == nil { + suite.FailNow("msg is nil") + } + if string(msg.Payload()) != suite.T().Name() { + suite.FailNow("unexpected payload") + } + } + //consume + testConsumerCtx, consumerCancel := context.WithTimeout(ctx, time.Second*time.Duration(time.Second*2)) + defer consumerCancel() + msg, err := consumer.Receive(testConsumerCtx) + if err != nil { + suite.FailNow(err.Error(), "receive payload") + } + testMsg(msg) + //reconsume + consumer.ReconsumeLater(msg, time.Millisecond*5) + msg, err = consumer.Receive(testConsumerCtx) + if err != nil { + suite.FailNow(err.Error(), "reconsume payload") + } + testMsg(msg) + //reconsume again + consumer.ReconsumeLater(msg, time.Millisecond*5) + msg, err = consumer.Receive(testConsumerCtx) + if err != nil { + suite.FailNow(err.Error(), "reconsume payload") + } + testMsg(msg) + suite.False(consumer.IsReconsumable(msg), "expect message not to be reconsumable") + if err := consumer.Ack(msg); err != nil { + suite.FailNow(err.Error()) + } + //fail on test teardown if there are unconsumed messages + suite.failOnUnconsummedMessages = true + +} + +func (suite *MainTestSuite) TestReconsumeLaterWithDuration() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + producer, err := CreateTestProducer(ctx, suite.pulsarClient) + if err != nil { + suite.FailNow(err.Error(), "create producer") + } + if producer == nil { + suite.FailNow("producer is nil") + } + defer producer.Close() + //create consumer to get actual payloads + reconsumeLaterOptions := []CreateConsumerOption{ + WithRetryEnable(true, false, time.Second*2), + } + consumer, err := CreateTestConsumer(ctx, suite.pulsarClient, reconsumeLaterOptions...) + if err != nil { + suite.FailNow(err.Error()) + } + defer consumer.Close() + + dlqConsumer, err := CreateTestDlqConsumer(suite.pulsarClient) + if err != nil { + suite.FailNow(err.Error()) + } + defer dlqConsumer.Close() + + //produce + if _, err := producer.Send(ctx, &pulsar.ProducerMessage{Payload: []byte(suite.T().Name())}); err != nil { + suite.FailNow(err.Error(), "send payload") + } + testMsg := func(msg pulsar.Message) { + if msg == nil { + suite.FailNow("msg is nil") + } + if string(msg.Payload()) != suite.T().Name() { + suite.FailNow("unexpected payload") + } + } + //consume + testConsumerCtx, consumerCancel := context.WithTimeout(ctx, time.Second*time.Duration(time.Second*2)) + defer consumerCancel() + msg, err := consumer.Receive(testConsumerCtx) + if err != nil { + suite.FailNow(err.Error(), "receive payload") + } + testMsg(msg) + //reconsume 20 times + for i := 0; i < 20; i++ { + consumer.ReconsumeLater(msg, time.Millisecond*5) + msg, err = consumer.Receive(testConsumerCtx) + if err != nil { + suite.FailNow(err.Error(), "reconsume payload") + } + testMsg(msg) + } + time.Sleep(time.Millisecond * 2300) + suite.False(consumer.IsReconsumable(msg), "expect message not to be reconsumable") + //reconsume anyway + consumer.ReconsumeLater(msg, time.Millisecond*5) + //expect dlq + msg, err = dlqConsumer.Receive(testConsumerCtx) + if err != nil { + suite.FailNow(err.Error(), "reconsume payload") + } + if err := dlqConsumer.Ack(msg); err != nil { + suite.FailNow(err.Error()) + } + testMsg(msg) + + //fail on test teardown if there are unconsumed messages + suite.failOnUnconsummedMessages = true +} + +func (suite *MainTestSuite) TestSafeReconsumeLaterWithDuration() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + producer, err := CreateTestProducer(ctx, suite.pulsarClient) + if err != nil { + suite.FailNow(err.Error(), "create producer") + } + if producer == nil { + suite.FailNow("producer is nil") + } + defer producer.Close() + //create consumer to get actual payloads + reconsumeLaterOptions := []CreateConsumerOption{ + WithRetryEnable(true, false, time.Second*2), + } + consumer, err := CreateTestConsumer(ctx, suite.pulsarClient, reconsumeLaterOptions...) + if err != nil { + suite.FailNow(err.Error()) + } + defer consumer.Close() + //produce + if _, err := producer.Send(ctx, &pulsar.ProducerMessage{Payload: []byte(suite.T().Name())}); err != nil { + suite.FailNow(err.Error(), "send payload") + } + testMsg := func(msg pulsar.Message) { + if msg == nil { + suite.FailNow("msg is nil") + } + if string(msg.Payload()) != suite.T().Name() { + suite.FailNow("unexpected payload") + } + } + //consume + testConsumerCtx, consumerCancel := context.WithTimeout(ctx, time.Second*time.Duration(2)) + defer consumerCancel() + msg, err := consumer.Receive(testConsumerCtx) + if err != nil { + suite.FailNow(err.Error(), "receive payload") + } + testMsg(msg) + //reconsume 20 times + for i := 0; i < 20; i++ { + sent := consumer.ReconsumeLaterDLQSafe(msg, time.Millisecond) + if !sent { + suite.FailNow("expected to message to be reconsumed got false in meesage num ", i) + } + msg, err = consumer.Receive(testConsumerCtx) + if err != nil { + suite.FailNow(err.Error(), "reconsume payload") + } + testMsg(msg) + } + time.Sleep(time.Millisecond * 2300) + suite.False(consumer.IsReconsumable(msg), "expect message not to be reconsumable") + //reconsume anyway + sent := consumer.ReconsumeLaterDLQSafe(msg, time.Millisecond*5) + suite.False(sent, "expect reconsume to send message") + savedRetries, _ := strconv.Atoi(msg.Properties()[propertySavedReconsumeAttempts]) + retries, _ := strconv.Atoi(msg.Properties()[pulsar.SysPropertyReconsumeTimes]) + suite.Equal(20, savedRetries+retries, "expected 20 retries") + if err := consumer.Ack(msg); err != nil { + suite.FailNow(err.Error()) + } + //fail on test teardown if there are unconsumed messages + suite.failOnUnconsummedMessages = true +} + +func (suite *MainTestSuite) TestReconsumeLaterPanicOnRetryDisabled() { + defer func() { + if r := recover(); r == nil { + suite.T().Errorf("The code did not panic") + } else { + suite.T().Logf("Recovered in TestReconsumeLaterPanics: %v", r) + } + }() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + producer, err := CreateTestProducer(ctx, suite.pulsarClient) + if err != nil { + suite.FailNow(err.Error(), "create producer") + } + if producer == nil { + suite.FailNow("producer is nil") + } + defer producer.Close() + + noRetrayEnabledConsumer, err := CreateTestConsumer(ctx, suite.pulsarClient) + if err != nil { + suite.FailNow(err.Error()) + } + defer noRetrayEnabledConsumer.Close() + //produce + if _, err := producer.Send(ctx, &pulsar.ProducerMessage{Payload: []byte(suite.T().Name())}); err != nil { + suite.FailNow(err.Error(), "send payload") + } + testMsg := func(msg pulsar.Message) { + if msg == nil { + suite.FailNow("msg is nil") + } + if string(msg.Payload()) != suite.T().Name() { + suite.FailNow("unexpected payload") + } + } + //consume + testConsumerCtx, consumerCancel := context.WithTimeout(ctx, time.Second*time.Duration(2)) + defer consumerCancel() + msg, err := noRetrayEnabledConsumer.Receive(testConsumerCtx) + if err != nil { + suite.FailNow(err.Error(), "receive payload") + } + testMsg(msg) + noRetrayEnabledConsumer.ReconsumeLater(msg, time.Millisecond) + suite.FailNow("should panic on call reconsume when retry option is not enabled") +} + +func (suite *MainTestSuite) TestReconsumeLaterPanicOnUnSafeReconsume() { + defer func() { + if r := recover(); r == nil { + suite.T().Errorf("The code did not panic") + } else { + suite.T().Logf("Recovered in TestReconsumeLaterPanicOnUnSafeReconsume: %v", r) + } + }() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + producer, err := CreateTestProducer(ctx, suite.pulsarClient) + if err != nil { + suite.FailNow(err.Error(), "create producer") + } + if producer == nil { + suite.FailNow("producer is nil") + } + defer producer.Close() + reconsumeLaterOptions := []CreateConsumerOption{ + WithRetryEnable(true, true, time.Second*2), + } + safeOnlyEnabledConsumer, err := CreateTestConsumer(ctx, suite.pulsarClient, reconsumeLaterOptions...) + if err != nil { + suite.FailNow(err.Error()) + } + defer safeOnlyEnabledConsumer.Close() + //produce + if _, err := producer.Send(ctx, &pulsar.ProducerMessage{Payload: []byte(suite.T().Name())}); err != nil { + suite.FailNow(err.Error(), "send payload") + } + testMsg := func(msg pulsar.Message) { + if msg == nil { + suite.FailNow("msg is nil") + } + if string(msg.Payload()) != suite.T().Name() { + suite.FailNow("unexpected payload") + } + } + //consume + testConsumerCtx, consumerCancel := context.WithTimeout(ctx, time.Second*time.Duration(2)) + defer consumerCancel() + msg, err := safeOnlyEnabledConsumer.Receive(testConsumerCtx) + if err != nil { + suite.FailNow(err.Error(), "receive payload") + } + testMsg(msg) + safeOnlyEnabledConsumer.ReconsumeLater(msg, time.Millisecond) + suite.FailNow("should panic on call ReconsumeLater when safe only option is set") +}