Skip to content

Commit

Permalink
Merge pull request #22 from kubescape/av-options
Browse files Browse the repository at this point in the history
Handle re-consume  later
  • Loading branch information
avrahams authored Feb 13, 2024
2 parents 7c8f6da + d3c3f54 commit 9a05f40
Show file tree
Hide file tree
Showing 7 changed files with 726 additions and 121 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
go.opentelemetry.io/otel v1.13.0
go.opentelemetry.io/otel/trace v1.13.0
go.uber.org/zap v1.24.0
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
)

require (
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
113 changes: 7 additions & 106 deletions pulsar/connector/basic_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
_ "embed"
"fmt"
"sync"
"time"

"github.com/apache/pulsar-client-go/pulsar"
Expand Down Expand Up @@ -78,43 +77,6 @@ func (suite *MainTestSuite) TestConsumerAndProducer() {
suite.Equal(2, len(actualPayloads), "expected 2 messages")
}

// TestReConsumerLater sends a single message, receives it, hands it back with
// ReconsumeLater and checks that a second Receive on the same consumer
// succeeds before the shared receive deadline expires.
func (suite *MainTestSuite) TestReConsumerLater() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	producer, err := CreateTestProducer(ctx, suite.pulsarClient)
	if err != nil {
		suite.FailNow(err.Error(), "create producer")
	}
	if producer == nil {
		suite.FailNow("producer is nil")
	}
	defer producer.Close()
	// Create the consumer that receives the payload and, later, its redelivery.
	consumer, err := CreateTestConsumer(ctx, suite.pulsarClient)
	if err != nil {
		suite.FailNow(err.Error())
	}
	defer consumer.Close()

	// Produce.
	if _, err := producer.Send(ctx, &pulsar.ProducerMessage{Payload: []byte("hello workld")}); err != nil {
		suite.FailNow(err.Error(), "send payload")
	}
	// Consume. Both receives share a 2 second deadline. The previous expression
	// multiplied two durations (time.Second * time.Duration(2*time.Second)),
	// which yields a deadline decades away and lets a lost message hang the test.
	testConsumerCtx, consumerCancel := context.WithTimeout(ctx, 2*time.Second)
	defer consumerCancel()
	msg, err := consumer.Receive(testConsumerCtx)
	if err != nil {
		suite.FailNow(err.Error(), "receive payload")
	}
	// Reconsume: only the success of the second Receive is asserted.
	consumer.ReconsumeLater(msg, time.Millisecond*5)
	if _, err = consumer.Receive(testConsumerCtx); err != nil {
		suite.FailNow(err.Error(), "reconsume payload")
	}
}

// CreateTestProducer creates a producer
func CreateTestProducer(ctx context.Context, client Client) (pulsar.Producer, error) {

Expand All @@ -131,88 +93,27 @@ func CreateTestProducer(ctx context.Context, client Client) (pulsar.Producer, er
return producer, err
}

func CreateTestConsumer(ctx context.Context, client Client) (pulsar.Consumer, error) {
return client.NewConsumer(WithTopic(TestTopicName),
func CreateTestConsumer(ctx context.Context, client Client, createConsumerOpts ...CreateConsumerOption) (Consumer, error) {
createConsumerOpts = append(createConsumerOpts, WithTopic(TestTopicName),
WithSubscriptionName(TestSubscriptionName),
WithRedeliveryDelay(time.Duration(client.GetConfig().RedeliveryDelaySeconds)*time.Second),
WithDLQ(uint32(client.GetConfig().MaxDeliveryAttempts)),
WithDefaultBackoffPolicy(),
WithRetryEnable(true),
WithDefaultBackoffPolicy())
return client.NewConsumer(
createConsumerOpts...,
)

}

func CreateTestDlqConsumer(client Client) (pulsar.Consumer, error) {
return client.NewConsumer(WithTopic(TestTopicName),
return client.NewConsumer(WithTopic(TestTopicName+"-dlq"),
WithSubscriptionName(TestSubscriptionName+"-dlq"),
WithRedeliveryDelay(0),
WithNamespace("ca-messaging", "test-namespace-dlqs"),
WithDLQ(0),
)
}

// TestDLQ produces a two-entry JSON payload and asserts that exactly one
// message (ID "1") is collected from the regular consumer and exactly one
// message (ID "2") is collected from the dead-letter consumer.
func (suite *MainTestSuite) TestDLQ() {
// Root context for the whole test; cancelled when the test returns.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create the producer used to publish the test payloads.
pubsubCtx := utils.NewContextWithValues(ctx, "testConsumer")
producer, err := CreateTestProducer(pubsubCtx, suite.pulsarClient)
if err != nil {
suite.FailNow(err.Error(), "create producer")
}
if producer == nil {
suite.FailNow("producer is nil")
}
defer producer.Close()
// Create the regular consumer and the dead-letter consumer.
consumer, err := CreateTestConsumer(pubsubCtx, suite.pulsarClient)
if err != nil {
suite.FailNow(err.Error())
}
defer consumer.Close()
dlqConsumer, err := CreateTestDlqConsumer(suite.pulsarClient)
if err != nil {
suite.FailNow(err.Error())
}
defer dlqConsumer.Close()

// The first entry has a string id, the second a numeric id.
// NOTE(review): presumably the numeric id is what makes the second entry
// fail processing and end up in the DLQ — confirm against TestPayloadImpl.
testPayload := []byte("[{\"id\":\"1\",\"data\":\"Hello World\"},{\"id\":2,\"data\":\"Hello from the other World\"}]")

// Send the test payloads.
produceMessages(suite, ctx, producer, loadJson[[]TestPayloadImplInterface](testPayload))
// Sleep to allow redelivery.
time.Sleep(time.Second * 5)
// Drain both subscriptions concurrently; wg waits for both collectors.
wg := sync.WaitGroup{}
wg.Add(2)
var actualPayloads map[string]TestPayloadImpl
go func() {
defer wg.Done()
// Collect from the regular consumer. NOTE(review): the meaning/unit of
// the trailing 20 is defined by consumeMessages, not visible here — confirm.
actualPayloads = consumeMessages[TestPayloadImpl](suite, pubsubCtx, consumer, "consumer", 20)
}()
var dlqPayloads map[string]TestInvalidPayloadImpl
go func() {
defer wg.Done()
// Wait a further 10 seconds before reading from the dead-letter consumer.
time.Sleep(time.Second * 10)
// Collect from the dead-letter consumer (same trailing argument as above).
dlqPayloads = consumeMessages[TestInvalidPayloadImpl](suite, pubsubCtx, dlqConsumer, "dlqConsumer", 20)
}()
wg.Wait()

suite.Equal(1, len(actualPayloads), "expected 1 msg in successful consumer")
suite.Contains(actualPayloads, "1", "expected msg with ID 1 in successful consumer")

suite.Equal(1, len(dlqPayloads), "expected 1 msg in dlq consumer")
suite.Contains(dlqPayloads, "2", "expected msg with ID 2 in dlq consumer")

// TODO: bad payload test
// suite.badPayloadTest(ctx, producer, dlqConsumer)
}

func (suite *MainTestSuite) badPayloadTest(ctx context.Context, producer pulsar.Producer, dlqConsumer pulsar.Consumer) {
actualPayloads := []string{}
if _, err := producer.Send(ctx, &pulsar.ProducerMessage{Payload: []byte("some bad payload")}); err != nil {
Expand Down
8 changes: 7 additions & 1 deletion pulsar/connector/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ const (
tenantsPath = adminPath + "/tenants"
namespacesPath = adminPath + "/namespaces"

dlqNamespaceSuffix = "-dlqs"
dlqNamespaceSuffix = "-dlqs"
retryNamespaceSuffix = "-retry"
)

type PulsarClientOptions struct {
Expand Down Expand Up @@ -131,6 +132,11 @@ func NewClient(options ...func(*PulsarClientOptions)) (Client, error) {
if initErr = pulsarAdminRequest(http.MethodPut, dlqNamespacePath, nil); initErr != nil {
return nil, fmt.Errorf("failed to create dlq namespace: %w", initErr)
}
retryNamespacePath := namespacePath + retryNamespaceSuffix
log.Printf("creating retry namespace %s\n", retryNamespacePath)
if initErr = pulsarAdminRequest(http.MethodPut, retryNamespacePath, nil); initErr != nil {
return nil, fmt.Errorf("failed to create retry namespace: %w", initErr)
}
}
return pulsarClient, nil
}
Expand Down
118 changes: 111 additions & 7 deletions pulsar/connector/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package connector

import (
"fmt"
"strconv"
"time"

"github.com/apache/pulsar-client-go/pulsar"
Expand All @@ -10,11 +11,98 @@ import (

// Consumer extends pulsar.Consumer with helpers for re-consuming a message
// without unintentionally routing it to the dead-letter queue.
type Consumer interface {
pulsar.Consumer
// ReconsumeLaterDLQSafe schedules msg for re-consumption after delay and
// returns true. It returns false, without scheduling anything, when the
// message is no longer reconsumable (out of redeliveries or out of retry
// time), i.e. when the next attempt would go to the DLQ.
ReconsumeLaterDLQSafe(msg pulsar.Message, delay time.Duration) bool
// IsReconsumable reports whether msg can still be re-consumed; false means
// the next attempt would go to the DLQ.
IsReconsumable(msg pulsar.Message) bool
}

// consumer is the package's Consumer implementation: an embedded
// pulsar.Consumer plus the options the consumer was created with.
type consumer struct {
pulsar.Consumer
//TODO override Receive Ack Nack for OTL
// options is consulted by the retry methods (retryEnabled, retryDuration,
// forceDLQSafeRetry, MaxDeliveryAttempts).
options createConsumerOptions
}

// Message property keys used for the duration-based retry bookkeeping.
const (
// propertyRetryStartTime holds the RFC3339 timestamp written the first time
// the retry bookkeeping sees a message.
propertyRetryStartTime = "RETRY_START_TIME"
// propertySavedReconsumeAttempts holds the accumulated attempt count that is
// saved whenever the reconsume-times counter is rewritten. The spelling of
// the value is part of the key written to messages and must not be changed.
propertySavedReconsumeAttempts = "SAVED_RECONSUME_ATTEMPS"
)

// ReconsumeLater updates the retry-duration properties of msg and then hands
// it to the embedded pulsar consumer for redelivery after delay.
//
// It panics when the consumer was created without retry enabled, or when the
// forceDLQSafeRetry option is set (ReconsumeLaterDLQSafe must be used then).
func (c consumer) ReconsumeLater(msg pulsar.Message, delay time.Duration) {
	switch {
	case !c.options.retryEnabled:
		panic("reconsumeLater called on consumer without retry enabled option set to true")
	case c.options.forceDLQSafeRetry:
		panic("reconsumeLater: when forceDLQSafeRetry option is true ReconsumeLaterDLQSafe must be used")
	}

	c.applyReconsumeDurationProperties(msg)
	c.Consumer.ReconsumeLater(msg, delay)
}

// IsReconsumable reports whether the next delivery attempt of msg still fits
// within MaxDeliveryAttempts. It always returns false when retry is disabled.
//
// As a side effect it refreshes the retry-duration properties on msg before
// reading the reconsume counter.
func (c consumer) IsReconsumable(msg pulsar.Message) bool {
	if !c.options.retryEnabled {
		return false
	}
	c.applyReconsumeDurationProperties(msg)

	// Without a counter property this is the first attempt; indexing a nil
	// property map is safe and simply reports "not found".
	nextAttempt := 1
	if raw, found := msg.Properties()[pulsar.SysPropertyReconsumeTimes]; found {
		// An unparsable counter is treated as zero.
		done, _ := strconv.Atoi(raw)
		nextAttempt = done + 1
	}
	return nextAttempt <= int(c.options.MaxDeliveryAttempts)
}

// ReconsumeLaterDLQSafe re-queues msg for delivery after delay only when
// IsReconsumable allows it. The return value tells the caller whether the
// message was actually sent for re-consumption.
func (c consumer) ReconsumeLaterDLQSafe(msg pulsar.Message, delay time.Duration) bool {
	reconsumable := c.IsReconsumable(msg)
	if reconsumable {
		// Call the embedded consumer directly: IsReconsumable has already
		// refreshed the retry properties.
		c.Consumer.ReconsumeLater(msg, delay)
	}
	return reconsumable
}

// applyReconsumeDurationProperties rewrites the reconsume bookkeeping
// properties of msg so that the configured retryDuration, rather than
// MaxDeliveryAttempts alone, decides when retries stop.
//
// It does nothing when retry is disabled, when no retryDuration is set, or
// when msg has no property map to write to.
func (c *consumer) applyReconsumeDurationProperties(msg pulsar.Message) {
	if !c.options.retryEnabled || c.options.retryDuration <= 0 {
		return
	}
	props := msg.Properties()
	if props == nil {
		return
	}

	// Read the retry start time, stamping the message on first sight. A stamp
	// that fails to parse leaves the zero time.
	var startedAt time.Time
	if stamp, found := props[propertyRetryStartTime]; found {
		startedAt, _ = time.Parse(time.RFC3339, stamp)
	} else {
		startedAt = time.Now()
		props[propertyRetryStartTime] = startedAt.Format(time.RFC3339)
	}

	// Number of the upcoming delivery attempt (1 when no counter is present).
	nextAttempt := 1
	if raw, found := props[pulsar.SysPropertyReconsumeTimes]; found {
		done, _ := strconv.Atoi(raw)
		nextAttempt = done + 1
	}

	// Attempts accumulated by earlier counter rewrites.
	savedAttempts := 0
	if raw, found := props[propertySavedReconsumeAttempts]; found {
		savedAttempts, _ = strconv.Atoi(raw)
	}

	maxAttempts := int(c.options.MaxDeliveryAttempts)
	withinDuration := time.Since(startedAt) < c.options.retryDuration

	switch {
	case nextAttempt > maxAttempts && withinDuration:
		// Attempts are exhausted but the retry window is still open: bank the
		// attempts so far (one less for the saved value, one less because the
		// upcoming attempt has not happened yet) and restart the counter at 1.
		props[propertySavedReconsumeAttempts] = strconv.Itoa(savedAttempts + nextAttempt - 2)
		props[pulsar.SysPropertyReconsumeTimes] = "1"
	case nextAttempt <= maxAttempts && !withinDuration:
		// The retry window has closed: bank the attempts and set the counter
		// to MaxDeliveryAttempts.
		props[propertySavedReconsumeAttempts] = strconv.Itoa(savedAttempts + nextAttempt)
		props[pulsar.SysPropertyReconsumeTimes] = strconv.Itoa(maxAttempts)
	}
}

type createConsumerOptions struct {
Expand All @@ -29,7 +117,14 @@ type createConsumerOptions struct {
BackoffPolicy pulsar.NackBackoffPolicy
Tenant string
Namespace string
RetryEnable bool
//retry options
retryEnabled bool
//duration of retry (overrides the max delivery attempts)
retryDuration time.Duration
//safe retry that never sends to the DLQ; when set, ReconsumeLaterDLQSafe must be used instead of ReconsumeLater
forceDLQSafeRetry bool
//automatically set to <namespace>-retry/<topic>-retry
retryTopic string
}

func (opt *createConsumerOptions) defaults(config config.PulsarConfig) {
Expand Down Expand Up @@ -63,6 +158,9 @@ func (opt *createConsumerOptions) validate() error {
if opt.DefaultBackoffPolicy && opt.BackoffPolicy != nil {
return fmt.Errorf("cannot specify both default backoff policy and backoff policy")
}
if opt.MaxDeliveryAttempts == 0 && opt.retryEnabled {
return fmt.Errorf("cannot enable retry without setting max delivery attempts")
}
return nil
}

Expand All @@ -75,9 +173,11 @@ func WithNamespace(tenant, namespace string) CreateConsumerOption {

// CreateConsumerOption is a functional option that mutates the
// createConsumerOptions used when a consumer is created.
type CreateConsumerOption func(*createConsumerOptions)

func WithRetryEnable(retryEnable bool) CreateConsumerOption {
func WithRetryEnable(enable, forceDLQSafeRetry bool, retryDuration time.Duration) CreateConsumerOption {
return func(o *createConsumerOptions) {
o.RetryEnable = retryEnable
o.retryEnabled = enable
o.retryDuration = retryDuration
o.forceDLQSafeRetry = forceDLQSafeRetry
}
}

Expand Down Expand Up @@ -157,7 +257,11 @@ func newSharedConsumer(pulsarClient Client, createConsumerOpts ...CreateConsumer
if topicName == "" && len(opts.Topics) > 0 {
topicName = opts.Topics[0]
}
dlq = NewDlq(opts.Tenant, opts.dlqNamespace, topicName, opts.MaxDeliveryAttempts)

if opts.retryEnabled {
opts.retryTopic = BuildPersistentTopic(opts.Tenant, opts.Namespace+retryNamespaceSuffix, topicName+"-retry")
}
dlq = NewDlq(opts.Tenant, opts.dlqNamespace, topicName, opts.MaxDeliveryAttempts, opts.retryTopic)
}
pulsarConsumer, err := pulsarClient.Subscribe(pulsar.ConsumerOptions{
Topic: topic,
Expand All @@ -167,14 +271,14 @@ func newSharedConsumer(pulsarClient Client, createConsumerOpts ...CreateConsumer
MessageChannel: opts.MessageChannel,
DLQ: dlq,
EnableDefaultNackBackoffPolicy: opts.DefaultBackoffPolicy,
RetryEnable: opts.RetryEnable,
RetryEnable: opts.retryEnabled,
// Interceptors: tracer.NewConsumerInterceptors(ctx),
NackRedeliveryDelay: opts.RedeliveryDelay,
NackBackoffPolicy: opts.BackoffPolicy,
})
if err != nil {
return nil, err
}
return consumer{pulsarConsumer}, nil
return consumer{pulsarConsumer, *opts}, nil

}
7 changes: 4 additions & 3 deletions pulsar/connector/dlq.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (
"github.com/apache/pulsar-client-go/pulsar"
)

func NewDlq(tenant, namespace string, topic TopicName, maxDeliveryAttempts uint32) *pulsar.DLQPolicy {
func NewDlq(tenant, namespace string, topic TopicName, maxDeliveryAttempts uint32, retryTopic string) *pulsar.DLQPolicy {
return &pulsar.DLQPolicy{
MaxDeliveries: maxDeliveryAttempts,
DeadLetterTopic: BuildPersistentTopic(tenant, namespace, topic+"-dlq"),
MaxDeliveries: maxDeliveryAttempts,
RetryLetterTopic: retryTopic,
DeadLetterTopic: BuildPersistentTopic(tenant, namespace, topic+"-dlq"),
ProducerOptions: pulsar.ProducerOptions{
//TODO: OTL
// Interceptors: tracer.NewProducerInterceptors(ctx),
Expand Down
Loading

0 comments on commit 9a05f40

Please sign in to comment.