diff --git a/examples/pubsub/README.md b/examples/pubsub/README.md index d3e92315..cfa522cb 100644 --- a/examples/pubsub/README.md +++ b/examples/pubsub/README.md @@ -19,10 +19,14 @@ name: Run Subscriber Server output_match_mode: substring expected_stdout_lines: - 'event - PubsubName: messages, Topic: neworder' + - 'event - PubsubName: messages, Topic: newbulkorder' + - 'event - PubsubName: messages, Topic: newbulkorder' background: true sleep: 15 --> +#### Note: sub/sub.go contains both AddTopicEventHandler (used for subscribe of messages) and AddBulkTopicEventHandler (used for bulksubscribe of messages) + ```bash dapr run --app-id sub \ --app-protocol http \ @@ -44,10 +48,9 @@ expected_stdout_lines: background: true sleep: 15 --> +#### Note: pub/pub.go contains both PublishEvents (used for publish of messages) and PublishEvent (used for bulkPublish of messages) ```bash -export DAPR_PUBSUB_NAME=messages - dapr run --app-id pub \ --log-level debug \ --resources-path ./config \ @@ -76,6 +79,6 @@ dapr stop --app-id sub ```shell == APP == 2023/03/29 21:36:07 event - PubsubName: messages, Topic: neworder, ID: 82427280-1c18-4fab-b901-c7e68d295d31, Data: ping -== APP == 2023/03/29 21:36:07 event - PubsubName: messages, Topic: neworder, ID: cc13829c-af77-4303-a4d7-55cdc0b0fa7d, Data: multi-pong -== APP == 2023/03/29 21:36:07 event - PubsubName: messages, Topic: neworder, ID: 0147f10a-d6c3-4b16-ad5a-6776956757dd, Data: multi-ping +== APP == 2023/03/29 21:36:07 event - PubsubName: messages, Topic: newbulkorder, ID: cc13829c-af77-4303-a4d7-55cdc0b0fa7d, Data: multi-pong +== APP == 2023/03/29 21:36:07 event - PubsubName: messages, Topic: newbulkorder, ID: 0147f10a-d6c3-4b16-ad5a-6776956757dd, Data: multi-ping ``` diff --git a/examples/pubsub/pub/pub.go b/examples/pubsub/pub/pub.go index 60bce409..e4928f45 100644 --- a/examples/pubsub/pub/pub.go +++ b/examples/pubsub/pub/pub.go @@ -16,15 +16,15 @@ package main import ( "context" "fmt" - "os" dapr "github.com/dapr/go-sdk/client" ) var ( // set the environment as instructions. - pubsubName = os.Getenv("DAPR_PUBSUB_NAME") - topicName = "neworder" + pubsubName = "messages" + topicName = "neworder" + bulkTopicName = "newbulkorder" ) func main() { @@ -44,7 +44,7 @@ func main() { } // Publish multiple events - if res := client.PublishEvents(ctx, pubsubName, topicName, publishEventsData); res.Error != nil { + if res := client.PublishEvents(ctx, pubsubName, bulkTopicName, publishEventsData); res.Error != nil { panic(err) } diff --git a/examples/pubsub/sub/sub.go b/examples/pubsub/sub/sub.go index c9089d61..43e973d2 100644 --- a/examples/pubsub/sub/sub.go +++ b/examples/pubsub/sub/sub.go @@ -35,12 +35,10 @@ var defaultSubscription = &common.Subscription{ Route: "/orders", } -var importantSubscription = &common.Subscription{ +var bulkSubscription = &common.Subscription{ PubsubName: "messages", - Topic: "neworder", - Route: "/important", - Match: `event.type == "important"`, - Priority: 1, + Topic: "newbulkorder", + Route: "/bulkorders", } func main() { @@ -50,7 +48,7 @@ func main() { log.Fatalf("error adding topic subscription: %v", err) } - if err := s.AddTopicEventHandler(importantSubscription, importantEventHandler); err != nil { + if err := s.AddBulkTopicEventHandler(bulkSubscription, eventHandler, 10, 100); err != nil { log.Fatalf("error adding topic subscription: %v", err) } @@ -63,8 +61,3 @@ func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err er log.Printf("event - PubsubName: %s, Topic: %s, ID: %s, Data: %s", e.PubsubName, e.Topic, e.ID, e.Data) return false, nil } - -func importantEventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { - log.Printf("important event - PubsubName: %s, Topic: %s, ID: %s, Data: %s", e.PubsubName, e.Topic, e.ID, e.Data) - return false, nil -} diff --git a/service/common/service.go b/service/common/service.go index 7f306e4d..20194501 100644 --- a/service/common/service.go +++ b/service/common/service.go @@ -35,6 +35,8 @@ type Service interface { // AddTopicEventHandler appends provided event handler with its topic and optional metadata to the service. // Note, retries are only considered when there is an error. Lack of error is considered as a success AddTopicEventHandler(sub *Subscription, fn TopicEventHandler) error + // AddBulkTopicEventHandler appends provided event handler with its topic along with configuring maxMessagesCount, maxAwaitDurationMs for bulk handling and optional metadata to the service. + AddBulkTopicEventHandler(sub *Subscription, fn TopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error // AddBindingInvocationHandler appends provided binding invocation handler with its name to the service. AddBindingInvocationHandler(name string, fn BindingInvocationHandler) error // RegisterActorImplFactory Register a new actor to actor runtime of go sdk diff --git a/service/grpc/topic.go b/service/grpc/topic.go index b773b569..cd501094 100644 --- a/service/grpc/topic.go +++ b/service/grpc/topic.go @@ -38,16 +38,25 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE return s.topicRegistrar.AddSubscription(sub, fn) } +func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error { + if sub == nil { + return errors.New("subscription required") + } + + return s.topicRegistrar.AddBulkSubscription(sub, fn, maxMessagesCount, maxAwaitDurationMs) +} + // ListTopicSubscriptions is called by Dapr to get the list of topics in a pubsub component the app wants to subscribe to. func (s *Server) ListTopicSubscriptions(ctx context.Context, in *emptypb.Empty) (*runtimev1pb.ListTopicSubscriptionsResponse, error) { subs := make([]*runtimev1pb.TopicSubscription, 0) for _, v := range s.topicRegistrar { s := v.Subscription sub := &runtimev1pb.TopicSubscription{ - PubsubName: s.PubsubName, - Topic: s.Topic, - Metadata: s.Metadata, - Routes: convertRoutes(s.Routes), + PubsubName: s.PubsubName, + Topic: s.Topic, + Metadata: s.Metadata, + Routes: convertRoutes(s.Routes), + BulkSubscribe: convertBulkSubscribe(s.BulkSubscribe), } subs = append(subs, sub) } @@ -74,6 +83,17 @@ func convertRoutes(routes *internal.TopicRoutes) *runtimev1pb.TopicRoutes { } } +func convertBulkSubscribe(bulkSubscribe *internal.BulkSubscribeOptions) *runtimev1pb.BulkSubscribeConfig { + if bulkSubscribe == nil { + return nil + } + return &runtimev1pb.BulkSubscribeConfig{ + Enabled: bulkSubscribe.Enabled, + MaxMessagesCount: bulkSubscribe.MaxMessagesCount, + MaxAwaitDurationMs: bulkSubscribe.MaxAwaitDurationMs, + } +} + // OnTopicEvent fired whenever a message has been published to a topic that has been subscribed. // Dapr sends published messages in a CloudEvents v1.0 envelope. func (s *Server) OnTopicEvent(ctx context.Context, in *runtimev1pb.TopicEventRequest) (*runtimev1pb.TopicEventResponse, error) { diff --git a/service/grpc/topic_test.go b/service/grpc/topic_test.go index 854362ec..95bbdec2 100644 --- a/service/grpc/topic_test.go +++ b/service/grpc/topic_test.go @@ -18,10 +18,9 @@ import ( "errors" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc/metadata" - - "github.com/stretchr/testify/assert" "google.golang.org/protobuf/types/known/emptypb" "github.com/dapr/dapr/pkg/proto/runtime/v1" @@ -31,98 +30,259 @@ import ( func TestTopicErrors(t *testing.T) { server := getTestServer() err := server.AddTopicEventHandler(nil, nil) - require.Errorf(t, err, "expected error on nil sub") + require.Error(t, err, "expected error on nil sub with AddTopicEventHandler") + + err = server.AddBulkTopicEventHandler(nil, nil, 0, 0) + require.Error(t, err, "expected error on nil sub with AddBulkTopicEventHandler") sub := &common.Subscription{} err = server.AddTopicEventHandler(sub, nil) - require.Errorf(t, err, "expected error on invalid sub") + require.Error(t, err, "expected error on invalid sub with AddTopicEventHandler") + err = server.AddBulkTopicEventHandler(sub, nil, 0, 0) + require.Error(t, err, "expected error on invalid sub with AddBulkTopicEventHandler") sub.PubsubName = "messages" err = server.AddTopicEventHandler(sub, nil) - require.Errorf(t, err, "expected error on sub without topic") + require.Error(t, err, "expected error on sub without topic with AddTopicEventHandler") + sub.PubsubName = "messages" + err = server.AddBulkTopicEventHandler(sub, nil, 0, 0) + require.Error(t, err, "expected error on sub without topic with AddBulkTopicEventHandler") sub.Topic = "test" err = server.AddTopicEventHandler(sub, nil) - require.Errorf(t, err, "expected error on sub without handler") + require.Error(t, err, "expected error on sub without handler") + err = server.AddBulkTopicEventHandler(sub, nil, 0, 0) + require.Error(t, err, "expected error on sub without handler") } func TestTopicSubscriptionList(t *testing.T) { - server := getTestServer() + t.Run("With single event handling", func(t *testing.T) { + server := getTestServer() - // Add default route. - sub1 := &common.Subscription{ - PubsubName: "messages", - Topic: "test", - Route: "/test", - } - err := server.AddTopicEventHandler(sub1, eventHandler) - require.NoError(t, err) - resp, err := server.ListTopicSubscriptions(context.Background(), &emptypb.Empty{}) - require.NoError(t, err) - assert.NotNil(t, resp) - if assert.Lenf(t, resp.GetSubscriptions(), 1, "expected 1 handlers") { - sub := resp.GetSubscriptions()[0] - assert.Equal(t, "messages", sub.GetPubsubName()) - assert.Equal(t, "test", sub.GetTopic()) - assert.Nil(t, sub.GetRoutes()) - } + sub1 := &common.Subscription{ + PubsubName: "messages", + Topic: "test", + Route: "/test", + } + err := server.AddTopicEventHandler(sub1, eventHandler) + require.NoError(t, err) + resp, err := server.ListTopicSubscriptions(context.Background(), &emptypb.Empty{}) + require.NoError(t, err) + assert.NotNil(t, resp) + if assert.Len(t, resp.GetSubscriptions(), 1, "expected 1 handlers") { + sub := resp.GetSubscriptions()[0] + assert.Equal(t, "messages", sub.GetPubsubName()) + assert.Equal(t, "test", sub.GetTopic()) + assert.Nil(t, sub.GetRoutes()) + } - // Add routing rule. - sub2 := &common.Subscription{ - PubsubName: "messages", - Topic: "test", - Route: "/other", - Match: `event.type == "other"`, - } - err = server.AddTopicEventHandler(sub2, eventHandler) - require.NoError(t, err) - resp, err = server.ListTopicSubscriptions(context.Background(), &emptypb.Empty{}) - require.NoError(t, err) - assert.NotNil(t, resp) - if assert.Lenf(t, resp.GetSubscriptions(), 1, "expected 1 handlers") { - sub := resp.GetSubscriptions()[0] - assert.Equal(t, "messages", sub.GetPubsubName()) - assert.Equal(t, "test", sub.GetTopic()) - if assert.NotNil(t, sub.GetRoutes()) { - assert.Equal(t, "/test", sub.GetRoutes().GetDefault()) - if assert.Len(t, sub.GetRoutes().GetRules(), 1) { - rule := sub.GetRoutes().GetRules()[0] - assert.Equal(t, "/other", rule.GetPath()) - assert.Equal(t, `event.type == "other"`, rule.GetMatch()) + // Add routing rule. + sub2 := &common.Subscription{ + PubsubName: "messages", + Topic: "test", + Route: "/other", + Match: `event.type == "other"`, + } + err = server.AddTopicEventHandler(sub2, eventHandler) + require.NoError(t, err) + resp, err = server.ListTopicSubscriptions(context.Background(), &emptypb.Empty{}) + require.NoError(t, err) + assert.NotNil(t, resp) + if assert.Len(t, resp.GetSubscriptions(), 1, "expected 1 handlers") { + sub := resp.GetSubscriptions()[0] + assert.Equal(t, "messages", sub.GetPubsubName()) + assert.Equal(t, "test", sub.GetTopic()) + if assert.NotNil(t, sub.GetRoutes()) { + assert.Equal(t, "/test", sub.GetRoutes().GetDefault()) + if assert.Len(t, sub.GetRoutes().GetRules(), 1) { + rule := sub.GetRoutes().GetRules()[0] + assert.Equal(t, "/other", rule.GetPath()) + assert.Equal(t, `event.type == "other"`, rule.GetMatch()) + } } } - } + }) + t.Run("With bulk event handling", func(t *testing.T) { + server := getTestServer() + sub1 := &common.Subscription{ + PubsubName: "messages", + Topic: "test", + Route: "/test", + } + err := server.AddBulkTopicEventHandler(sub1, eventHandler, 10, 1000) + require.NoError(t, err) + resp, err := server.ListTopicSubscriptions(context.Background(), &emptypb.Empty{}) + require.NoError(t, err) + assert.NotNil(t, resp) + if assert.Len(t, resp.GetSubscriptions(), 1, "expected 1 handlers") { + sub := resp.GetSubscriptions()[0] + assert.Equal(t, "messages", sub.GetPubsubName()) + assert.Equal(t, "test", sub.GetTopic()) + assert.Nil(t, sub.GetRoutes()) + } + + // Add routing rule. + sub2 := &common.Subscription{ + PubsubName: "messages", + Topic: "test", + Route: "/other", + Match: `event.type == "other"`, + } + err = server.AddBulkTopicEventHandler(sub2, eventHandler, 10, 1000) + require.NoError(t, err) + resp, err = server.ListTopicSubscriptions(context.Background(), &emptypb.Empty{}) + require.NoError(t, err) + assert.NotNil(t, resp) + if assert.Len(t, resp.GetSubscriptions(), 1, "expected 1 handlers") { + sub := resp.GetSubscriptions()[0] + assert.Equal(t, "messages", sub.GetPubsubName()) + assert.Equal(t, "test", sub.GetTopic()) + if assert.NotNil(t, sub.GetRoutes()) { + assert.Equal(t, "/test", sub.GetRoutes().GetDefault()) + if assert.Len(t, sub.GetRoutes().GetRules(), 1) { + rule := sub.GetRoutes().GetRules()[0] + assert.Equal(t, "/other", rule.GetPath()) + assert.Equal(t, `event.type == "other"`, rule.GetMatch()) + } + } + } + }) } // go test -timeout 30s ./service/grpc -count 1 -run ^TestTopic$ func TestTopic(t *testing.T) { - ctx := context.Background() + t.Run("With single event handling", func(t *testing.T) { + ctx := context.Background() - sub := &common.Subscription{ - PubsubName: "messages", - Topic: "test", - } - server := getTestServer() + sub := &common.Subscription{ + PubsubName: "messages", + Topic: "test", + } + server := getTestServer() + + err := server.AddTopicEventHandler(sub, eventHandler) + require.NoError(t, err) - err := server.AddTopicEventHandler(sub, eventHandler) - require.NoError(t, err) + startTestServer(server) - startTestServer(server) + t.Run("topic event without request", func(t *testing.T) { + _, err := server.OnTopicEvent(ctx, nil) + require.Error(t, err) + }) - t.Run("topic event without request", func(t *testing.T) { - _, err := server.OnTopicEvent(ctx, nil) - require.Error(t, err) + t.Run("topic event for wrong topic", func(t *testing.T) { + in := &runtime.TopicEventRequest{ + Topic: "invalid", + } + _, err := server.OnTopicEvent(ctx, in) + require.Error(t, err) + }) + + t.Run("topic event for valid topic", func(t *testing.T) { + in := &runtime.TopicEventRequest{ + Id: "a123", + Source: "test", + Type: "test", + SpecVersion: "v1.0", + DataContentType: "text/plain", + Data: []byte("test"), + Topic: sub.Topic, + PubsubName: sub.PubsubName, + } + _, err := server.OnTopicEvent(ctx, in) + require.NoError(t, err) + }) + + stopTestServer(t, server) }) + t.Run("With bulk event handling", func(t *testing.T) { + ctx := context.Background() - t.Run("topic event for wrong topic", func(t *testing.T) { - in := &runtime.TopicEventRequest{ - Topic: "invalid", + sub := &common.Subscription{ + PubsubName: "messages", + Topic: "test", } - _, err := server.OnTopicEvent(ctx, in) - require.Error(t, err) + server := getTestServer() + + err := server.AddBulkTopicEventHandler(sub, eventHandler, 10, 1000) + require.NoError(t, err) + + startTestServer(server) + + t.Run("topic event without request", func(t *testing.T) { + _, err := server.OnTopicEvent(ctx, nil) + require.Error(t, err) + }) + + t.Run("topic event for wrong topic", func(t *testing.T) { + in := &runtime.TopicEventRequest{ + Topic: "invalid", + } + _, err := server.OnTopicEvent(ctx, in) + require.Error(t, err) + }) + + t.Run("topic event for valid topic", func(t *testing.T) { + in := &runtime.TopicEventRequest{ + Id: "a123", + Source: "test", + Type: "test", + SpecVersion: "v1.0", + DataContentType: "text/plain", + Data: []byte("test"), + Topic: sub.Topic, + PubsubName: sub.PubsubName, + } + _, err := server.OnTopicEvent(ctx, in) + require.NoError(t, err) + }) + + t.Run("topic event for valid topic with metadata", func(t *testing.T) { + sub2 := &common.Subscription{ + PubsubName: "messages", + Topic: "test2", + } + err := server.AddTopicEventHandler(sub2, func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { + assert.Equal(t, "value1", e.Metadata["key1"]) + return false, nil + }) + require.NoError(t, err) + + in := &runtime.TopicEventRequest{ + Id: "a123", + Source: "test", + Type: "test", + SpecVersion: "v1.0", + DataContentType: "text/plain", + Data: []byte("test"), + Topic: sub2.Topic, + PubsubName: sub2.PubsubName, + } + ctx := metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{"Metadata.key1": "value1"})) + _, err = server.OnTopicEvent(ctx, in) + require.NoError(t, err) + }) + + stopTestServer(t, server) }) +} + +func TestTopicWithValidationDisabled(t *testing.T) { + t.Run("With single event handling", func(t *testing.T) { + ctx := context.Background() + + sub := &common.Subscription{ + PubsubName: "messages", + Topic: "*", + DisableTopicValidation: true, + } + server := getTestServer() + + err := server.AddTopicEventHandler(sub, eventHandler) + require.NoError(t, err) + + startTestServer(server) - t.Run("topic event for valid topic", func(t *testing.T) { in := &runtime.TopicEventRequest{ Id: "a123", Source: "test", @@ -130,24 +290,28 @@ func TestTopic(t *testing.T) { SpecVersion: "v1.0", DataContentType: "text/plain", Data: []byte("test"), - Topic: sub.Topic, + Topic: "test", PubsubName: sub.PubsubName, } - _, err := server.OnTopicEvent(ctx, in) + + _, err = server.OnTopicEvent(ctx, in) require.NoError(t, err) }) + t.Run("With bulk event handling", func(t *testing.T) { + ctx := context.Background() - t.Run("topic event for valid topic with metadata", func(t *testing.T) { - sub2 := &common.Subscription{ - PubsubName: "messages", - Topic: "test2", + sub := &common.Subscription{ + PubsubName: "messages", + Topic: "*", + DisableTopicValidation: true, } - err := server.AddTopicEventHandler(sub2, func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { - assert.Equal(t, "value1", e.Metadata["key1"]) - return false, nil - }) + server := getTestServer() + + err := server.AddBulkTopicEventHandler(sub, eventHandler, 10, 1000) require.NoError(t, err) + startTestServer(server) + in := &runtime.TopicEventRequest{ Id: "a123", Source: "test", @@ -155,102 +319,128 @@ func TestTopic(t *testing.T) { SpecVersion: "v1.0", DataContentType: "text/plain", Data: []byte("test"), - Topic: sub2.Topic, - PubsubName: sub2.PubsubName, + Topic: "test", + PubsubName: sub.PubsubName, } - ctx := metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{"Metadata.key1": "value1"})) + _, err = server.OnTopicEvent(ctx, in) require.NoError(t, err) }) - - stopTestServer(t, server) } -func TestTopicWithValidationDisabled(t *testing.T) { - ctx := context.Background() - - sub := &common.Subscription{ - PubsubName: "messages", - Topic: "*", - DisableTopicValidation: true, - } - server := getTestServer() - - err := server.AddTopicEventHandler(sub, eventHandler) - require.NoError(t, err) - - startTestServer(server) +func TestTopicWithErrors(t *testing.T) { + t.Run("With single event handling", func(t *testing.T) { + ctx := context.Background() - in := &runtime.TopicEventRequest{ - Id: "a123", - Source: "test", - Type: "test", - SpecVersion: "v1.0", - DataContentType: "text/plain", - Data: []byte("test"), - Topic: "test", - PubsubName: sub.PubsubName, - } + sub1 := &common.Subscription{ + PubsubName: "messages", + Topic: "test1", + } - _, err = server.OnTopicEvent(ctx, in) - require.NoError(t, err) -} + sub2 := &common.Subscription{ + PubsubName: "messages", + Topic: "test2", + } + server := getTestServer() -func TestTopicWithErrors(t *testing.T) { - ctx := context.Background() + err := server.AddTopicEventHandler(sub1, eventHandlerWithRetryError) + require.NoError(t, err) - sub1 := &common.Subscription{ - PubsubName: "messages", - Topic: "test1", - } + err = server.AddTopicEventHandler(sub2, eventHandlerWithError) + require.NoError(t, err) - sub2 := &common.Subscription{ - PubsubName: "messages", - Topic: "test2", - } - server := getTestServer() + startTestServer(server) - err := server.AddTopicEventHandler(sub1, eventHandlerWithRetryError) - require.NoError(t, err) + t.Run("topic event for retry error", func(t *testing.T) { + in := &runtime.TopicEventRequest{ + Id: "a123", + Source: "test", + Type: "test", + SpecVersion: "v1.0", + DataContentType: "text/plain", + Data: []byte("test"), + Topic: sub1.Topic, + PubsubName: sub1.PubsubName, + } + resp, err := server.OnTopicEvent(ctx, in) + require.Error(t, err) + assert.Equal(t, runtime.TopicEventResponse_RETRY, resp.GetStatus()) + }) - err = server.AddTopicEventHandler(sub2, eventHandlerWithError) - require.NoError(t, err) + t.Run("topic event for error", func(t *testing.T) { + in := &runtime.TopicEventRequest{ + Id: "a123", + Source: "test", + Type: "test", + SpecVersion: "v1.0", + DataContentType: "text/plain", + Data: []byte("test"), + Topic: sub2.Topic, + PubsubName: sub2.PubsubName, + } + resp, err := server.OnTopicEvent(ctx, in) + require.NoError(t, err) + assert.Equal(t, runtime.TopicEventResponse_DROP, resp.GetStatus()) + }) - startTestServer(server) + stopTestServer(t, server) + }) + t.Run("With bulk event handling", func(t *testing.T) { + ctx := context.Background() - t.Run("topic event for retry error", func(t *testing.T) { - in := &runtime.TopicEventRequest{ - Id: "a123", - Source: "test", - Type: "test", - SpecVersion: "v1.0", - DataContentType: "text/plain", - Data: []byte("test"), - Topic: sub1.Topic, - PubsubName: sub1.PubsubName, + sub1 := &common.Subscription{ + PubsubName: "messages", + Topic: "test1", } - resp, err := server.OnTopicEvent(ctx, in) - require.Error(t, err) - assert.Equal(t, runtime.TopicEventResponse_RETRY, resp.GetStatus()) - }) - t.Run("topic event for error", func(t *testing.T) { - in := &runtime.TopicEventRequest{ - Id: "a123", - Source: "test", - Type: "test", - SpecVersion: "v1.0", - DataContentType: "text/plain", - Data: []byte("test"), - Topic: sub2.Topic, - PubsubName: sub2.PubsubName, + sub2 := &common.Subscription{ + PubsubName: "messages", + Topic: "test2", } - resp, err := server.OnTopicEvent(ctx, in) + server := getTestServer() + + err := server.AddBulkTopicEventHandler(sub1, eventHandlerWithRetryError, 10, 1000) require.NoError(t, err) - assert.Equal(t, runtime.TopicEventResponse_DROP, resp.GetStatus()) - }) - stopTestServer(t, server) + err = server.AddBulkTopicEventHandler(sub2, eventHandlerWithError, 10, 1000) + require.NoError(t, err) + + startTestServer(server) + + t.Run("topic event for retry error", func(t *testing.T) { + in := &runtime.TopicEventRequest{ + Id: "a123", + Source: "test", + Type: "test", + SpecVersion: "v1.0", + DataContentType: "text/plain", + Data: []byte("test"), + Topic: sub1.Topic, + PubsubName: sub1.PubsubName, + } + resp, err := server.OnTopicEvent(ctx, in) + require.Error(t, err) + assert.Equal(t, runtime.TopicEventResponse_RETRY, resp.GetStatus()) + }) + + t.Run("topic event for error", func(t *testing.T) { + in := &runtime.TopicEventRequest{ + Id: "a123", + Source: "test", + Type: "test", + SpecVersion: "v1.0", + DataContentType: "text/plain", + Data: []byte("test"), + Topic: sub2.Topic, + PubsubName: sub2.PubsubName, + } + resp, err := server.OnTopicEvent(ctx, in) + require.NoError(t, err) + assert.Equal(t, runtime.TopicEventResponse_DROP, resp.GetStatus()) + }) + + stopTestServer(t, server) + }) } func eventHandler(ctx context.Context, event *common.TopicEvent) (retry bool, err error) { @@ -269,77 +459,152 @@ func eventHandlerWithError(ctx context.Context, event *common.TopicEvent) (retry } func TestEventDataHandling(t *testing.T) { - ctx := context.Background() - - tests := map[string]struct { - contentType string - data string - value interface{} - }{ - "JSON bytes": { - contentType: "application/json", - data: `{"message":"hello"}`, - value: map[string]interface{}{ - "message": "hello", + t.Run("With single event handling", func(t *testing.T) { + ctx := context.Background() + tests := map[string]struct { + contentType string + data string + value interface{} + }{ + "JSON bytes": { + contentType: "application/json", + data: `{"message":"hello"}`, + value: map[string]interface{}{ + "message": "hello", + }, }, - }, - "JSON entension media type bytes": { - contentType: "application/extension+json", - data: `{"message":"hello"}`, - value: map[string]interface{}{ - "message": "hello", + "JSON entension media type bytes": { + contentType: "application/extension+json", + data: `{"message":"hello"}`, + value: map[string]interface{}{ + "message": "hello", + }, }, - }, - "Test": { - contentType: "text/plain", - data: `message = hello`, - value: `message = hello`, - }, - "Other": { - contentType: "application/octet-stream", - data: `message = hello`, - value: []byte(`message = hello`), - }, - } + "Test": { + contentType: "text/plain", + data: `message = hello`, + value: `message = hello`, + }, + "Other": { + contentType: "application/octet-stream", + data: `message = hello`, + value: []byte(`message = hello`), + }, + } - s := getTestServer() + s := getTestServer() - sub := &common.Subscription{ - PubsubName: "messages", - Topic: "test", - Route: "/test", - Metadata: map[string]string{}, - } + sub := &common.Subscription{ + PubsubName: "messages", + Topic: "test", + Route: "/test", + Metadata: map[string]string{}, + } - recv := make(chan struct{}, 1) - var topicEvent *common.TopicEvent - handler := func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { - topicEvent = e - recv <- struct{}{} + recv := make(chan struct{}, 1) + var topicEvent *common.TopicEvent + handler := func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { + topicEvent = e + recv <- struct{}{} - return false, nil - } - err := s.AddTopicEventHandler(sub, handler) - require.NoErrorf(t, err, "error adding event handler") + return false, nil + } + err := s.AddTopicEventHandler(sub, handler) + require.NoError(t, err, "error adding event handler") + + startTestServer(s) + + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + in := runtime.TopicEventRequest{ + Id: "a123", + Source: "test", + Type: "test", + SpecVersion: "v1.0", + DataContentType: tt.contentType, + Data: []byte(tt.data), + Topic: sub.Topic, + PubsubName: sub.PubsubName, + } + + s.OnTopicEvent(ctx, &in) + <-recv + assert.Equal(t, tt.value, topicEvent.Data) + }) + } + }) + t.Run("With bulk event handling", func(t *testing.T) { + ctx := context.Background() + tests := map[string]struct { + contentType string + data string + value interface{} + }{ + "JSON bytes": { + contentType: "application/json", + data: `{"message":"hello"}`, + value: map[string]interface{}{ + "message": "hello", + }, + }, + "JSON entension media type bytes": { + contentType: "application/extension+json", + data: `{"message":"hello"}`, + value: map[string]interface{}{ + "message": "hello", + }, + }, + "Test": { + contentType: "text/plain", + data: `message = hello`, + value: `message = hello`, + }, + "Other": { + contentType: "application/octet-stream", + data: `message = hello`, + value: []byte(`message = hello`), + }, + } - startTestServer(s) + s := getTestServer() - for name, tt := range tests { - t.Run(name, func(t *testing.T) { - in := runtime.TopicEventRequest{ - Id: "a123", - Source: "test", - Type: "test", - SpecVersion: "v1.0", - DataContentType: tt.contentType, - Data: []byte(tt.data), - Topic: sub.Topic, - PubsubName: sub.PubsubName, - } + sub := &common.Subscription{ + PubsubName: "messages", + Topic: "test", + Route: "/test", + Metadata: map[string]string{}, + } - s.OnTopicEvent(ctx, &in) - <-recv - assert.Equal(t, tt.value, topicEvent.Data) - }) - } + recv := make(chan struct{}, 1) + var topicEvent *common.TopicEvent + handler := func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { + topicEvent = e + recv <- struct{}{} + + return false, nil + } + err := s.AddBulkTopicEventHandler(sub, handler, 10, 1000) + require.NoError(t, err, "error adding event handler") + + startTestServer(s) + + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + in := runtime.TopicEventRequest{ + Id: "a123", + Source: "test", + Type: "test", + SpecVersion: "v1.0", + DataContentType: tt.contentType, + Data: []byte(tt.data), + Topic: sub.Topic, + PubsubName: sub.PubsubName, + } + + s.OnTopicEvent(ctx, &in) + <-recv + assert.Equal(t, tt.value, topicEvent.Data) + }) + } + }) } diff --git a/service/http/topic.go b/service/http/topic.go index fa6a85e5..64bee949 100644 --- a/service/http/topic.go +++ b/service/http/topic.go @@ -114,6 +114,29 @@ func (in topicEventJSON) getData() (data any, rawData []byte) { return data, rawData } +type AppResponseStatus string + +const ( + // Success means the message is received and processed correctly. + Success AppResponseStatus = "SUCCESS" + // Retry means the message is received but could not be processed and must be retried. + Retry AppResponseStatus = "RETRY" + // Drop means the message is received but should not be processed. + Drop AppResponseStatus = "DROP" +) + +type BulkSubscribeResponseEntry struct { + // The id of the bulk subscribe entry + EntryId string `json:"entryId"` //nolint:stylecheck + + // The response status of the bulk subscribe entry + Status AppResponseStatus `json:"status"` +} + +type BulkSubscribeResponse struct { + Statuses []BulkSubscribeResponseEntry `json:"statuses"` +} + func (s *Server) registerBaseHandler() { // register subscribe handler f := func(w http.ResponseWriter, r *http.Request) { @@ -227,7 +250,7 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE // Route is only required for HTTP but should be specified for the // app protocol to be interchangeable. if sub.Route == "" { - return errors.New("handler route name") + return errors.New("missing route for this subscription") } if err := s.topicRegistrar.AddSubscription(sub, fn); err != nil { return err @@ -303,6 +326,119 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE return nil } +func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error { + if sub == nil { + return errors.New("subscription required") + } + // Route is only required for HTTP but should be specified for the + // app protocol to be interchangeable. + if sub.Route == "" { + return errors.New("missing route for bulk subscription") + } + if err := s.topicRegistrar.AddBulkSubscription(sub, fn, maxMessagesCount, maxAwaitDurationMs); err != nil { + return err + } + + s.mux.Handle(sub.Route, optionsHandler(http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + // check for post with no data + var ( + body []byte + err error + ) + if r.Body != nil { + body, err = io.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), PubSubHandlerDropStatusCode) + return + } + } + if len(body) == 0 { + http.Error(w, "nil content", PubSubHandlerDropStatusCode) + return + } + + // deserialize the event + var ins internal.BulkSubscribeEnvelope + if err = json.Unmarshal(body, &ins); err != nil { + http.Error(w, err.Error(), PubSubHandlerDropStatusCode) + return + } + + statuses := make([]BulkSubscribeResponseEntry, 0, len(ins.Entries)) + + for _, entry := range ins.Entries { + itemJSON, entryErr := json.Marshal(entry.Event) + if entryErr != nil { + http.Error(w, entryErr.Error(), PubSubHandlerDropStatusCode) + return + } + var in topicEventJSON + + if err = json.Unmarshal(itemJSON, &in); err != nil { + http.Error(w, err.Error(), PubSubHandlerDropStatusCode) + return + } + if in.PubsubName == "" { + in.Topic = sub.PubsubName + } + if in.Topic == "" { + in.Topic = sub.Topic + } + data, rawData := in.getData() + + te := common.TopicEvent{ + ID: in.ID, + SpecVersion: in.SpecVersion, + Type: in.Type, + Source: in.Source, + DataContentType: in.DataContentType, + Data: data, + RawData: rawData, + DataBase64: in.DataBase64, + Subject: in.Subject, + PubsubName: in.PubsubName, + Topic: in.Topic, + } + + retry, funcErr := fn(r.Context(), &te) + if funcErr == nil { + statuses = append(statuses, BulkSubscribeResponseEntry{ + EntryId: entry.EntryId, + Status: Success, + }, + ) + } else if retry { + statuses = append(statuses, BulkSubscribeResponseEntry{ + EntryId: entry.EntryId, + Status: Retry, + }, + ) + } else { + statuses = append(statuses, BulkSubscribeResponseEntry{ + EntryId: entry.EntryId, + Status: Drop, + }, + ) + } + } + + resp := BulkSubscribeResponse{ + Statuses: statuses, + } + if err != nil { + http.Error(w, err.Error(), PubSubHandlerDropStatusCode) + return + } + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + + writeBulkStatus(w, resp) + }))) + + return nil +} + func getCustomMetdataFromHeaders(r *http.Request) map[string]string { md := make(map[string]string) for k, v := range r.Header { @@ -319,3 +455,9 @@ func writeStatus(w http.ResponseWriter, s string) { http.Error(w, err.Error(), PubSubHandlerRetryStatusCode) } } + +func writeBulkStatus(w http.ResponseWriter, s BulkSubscribeResponse) { + if err := json.NewEncoder(w).Encode(s); err != nil { + http.Error(w, err.Error(), PubSubHandlerRetryStatusCode) + } +} diff --git a/service/http/topic_test.go b/service/http/topic_test.go index 596b67fc..64136ccf 100644 --- a/service/http/topic_test.go +++ b/service/http/topic_test.go @@ -49,18 +49,117 @@ func testErrorTopicFunc(ctx context.Context, e *common.TopicEvent) (retry bool, } func TestEventNilHandler(t *testing.T) { + t.Run("With single event handling", func(t *testing.T) { + s := newServer("", nil) + sub := &common.Subscription{ + PubsubName: "messages", + Topic: "test", + Route: "/", + Metadata: map[string]string{}, + } + err := s.AddTopicEventHandler(sub, nil) + require.Error(t, err, "expected error adding event handler") + }) + t.Run("With bulk event handling", func(t *testing.T) { + s := newServer("", nil) + sub := &common.Subscription{ + PubsubName: "messages", + Topic: "test", + Route: "/", + Metadata: map[string]string{}, + } + err := s.AddBulkTopicEventHandler(sub, nil, 10, 1000) + require.Error(t, err, "expected error adding event handler") + }) +} + +func TestEventHandler(t *testing.T) { + data := `{ + "specversion" : "1.0", + "type" : "com.github.pull.create", + "source" : "https://github.com/cloudevents/spec/pull", + "subject" : "123", + "id" : "A234-1234-1234", + "time" : "2018-04-05T17:31:00Z", + "comexampleextension1" : "value", + "comexampleothervalue" : 5, + "datacontenttype" : "application/json", + "data" : "eyJtZXNzYWdlIjoiaGVsbG8ifQ==" + }` + s := newServer("", nil) + sub := &common.Subscription{ PubsubName: "messages", Topic: "test", Route: "/", Metadata: map[string]string{}, } - err := s.AddTopicEventHandler(sub, nil) - require.Errorf(t, err, "expected error adding event handler") + err := s.AddTopicEventHandler(sub, testTopicFunc) + require.NoError(t, err, "error adding event handler") + + sub2 := &common.Subscription{ + PubsubName: "messages", + Topic: "errors", + Route: "/errors", + Metadata: map[string]string{}, + } + err = s.AddTopicEventHandler(sub2, testErrorTopicFunc) + require.NoError(t, err, "error adding error event handler") + + sub3 := &common.Subscription{ + PubsubName: "messages", + Topic: "test", + Route: "/other", + Match: `event.type == "other"`, + Priority: 1, + } + err = s.AddTopicEventHandler(sub3, testTopicFunc) + require.NoError(t, err, "error adding error event handler") + + s.registerBaseHandler() + + req, err := http.NewRequest(http.MethodGet, "/dapr/subscribe", nil) + require.NoErrorf(t, err, "error creating request: %s", data) + req.Header.Set("Accept", "application/json") + rr := httptest.NewRecorder() + s.mux.ServeHTTP(rr, req) + resp := rr.Result() + defer resp.Body.Close() + payload, err := io.ReadAll(resp.Body) + require.NoError(t, err, "error reading response") + var subs []internal.TopicSubscription + require.NoError(t, json.Unmarshal(payload, &subs), "could not decode subscribe response") + + sort.Slice(subs, func(i, j int) bool { + less := strings.Compare(subs[i].PubsubName, subs[j].PubsubName) + if less != 0 { + return less < 0 + } + return strings.Compare(subs[i].Topic, subs[j].Topic) <= 0 + }) + + if assert.Len(t, subs, 2, "unexpected subscription count") { + assert.Equal(t, "messages", subs[0].PubsubName) + assert.Equal(t, "errors", subs[0].Topic) + + assert.Equal(t, "messages", subs[1].PubsubName) + assert.Equal(t, "test", subs[1].Topic) + assert.Equal(t, "", subs[1].Route) + assert.Equal(t, "/", subs[1].Routes.Default) + if assert.Len(t, subs[1].Routes.Rules, 1, "unexpected rules count") { + assert.Equal(t, `event.type == "other"`, subs[1].Routes.Rules[0].Match) + assert.Equal(t, "/other", subs[1].Routes.Rules[0].Path) + } + } + + makeEventRequest(t, s, "/", data, http.StatusOK) + makeEventRequest(t, s, "/", "", http.StatusSeeOther) + makeEventRequest(t, s, "/", "not JSON", http.StatusSeeOther) + makeEventRequest(t, s, "/errors", data, http.StatusOK) } -func TestEventHandler(t *testing.T) { +func TestBulkEventHandler(t *testing.T) { data := `{ "specversion" : "1.0", "type" : "com.github.pull.create", @@ -82,8 +181,8 @@ func TestEventHandler(t *testing.T) { Route: "/", Metadata: map[string]string{}, } - err := s.AddTopicEventHandler(sub, testTopicFunc) - require.NoErrorf(t, err, "error adding event handler") + err := s.AddBulkTopicEventHandler(sub, testTopicFunc, 10, 1000) + require.NoError(t, err, "error adding event handler") sub2 := &common.Subscription{ PubsubName: "messages", @@ -91,8 +190,8 @@ func TestEventHandler(t *testing.T) { Route: "/errors", Metadata: map[string]string{}, } - err = s.AddTopicEventHandler(sub2, testErrorTopicFunc) - require.NoErrorf(t, err, "error adding error event handler") + err = s.AddBulkTopicEventHandler(sub2, testErrorTopicFunc, 10, 1000) + require.NoError(t, err, "error adding error event handler") sub3 := &common.Subscription{ PubsubName: "messages", @@ -101,8 +200,8 @@ func TestEventHandler(t *testing.T) { Match: `event.type == "other"`, Priority: 1, } - err = s.AddTopicEventHandler(sub3, testTopicFunc) - require.NoErrorf(t, err, "error adding error event handler") + err = s.AddBulkTopicEventHandler(sub3, testTopicFunc, 10, 1000) + require.NoError(t, err, "error adding error event handler") s.registerBaseHandler() @@ -114,9 +213,9 @@ func TestEventHandler(t *testing.T) { resp := rr.Result() defer resp.Body.Close() payload, err := io.ReadAll(resp.Body) - require.NoErrorf(t, err, "error reading response") + require.NoError(t, err, "error reading response") var subs []internal.TopicSubscription - require.NoErrorf(t, json.Unmarshal(payload, &subs), "could not decode subscribe response") + require.NoError(t, json.Unmarshal(payload, &subs), "could not decode subscribe response") sort.Slice(subs, func(i, j int) bool { less := strings.Compare(subs[i].PubsubName, subs[j].PubsubName) @@ -126,7 +225,7 @@ func TestEventHandler(t *testing.T) { return strings.Compare(subs[i].Topic, subs[j].Topic) <= 0 }) - if assert.Lenf(t, subs, 2, "unexpected subscription count") { + if assert.Len(t, subs, 2, "unexpected subscription count") { assert.Equal(t, "messages", subs[0].PubsubName) assert.Equal(t, "errors", subs[0].Topic) @@ -134,7 +233,7 @@ func TestEventHandler(t *testing.T) { assert.Equal(t, "test", subs[1].Topic) assert.Equal(t, "", subs[1].Route) assert.Equal(t, "/", subs[1].Routes.Default) - if assert.Lenf(t, subs[1].Routes.Rules, 1, "unexpected rules count") { + if assert.Len(t, subs[1].Routes.Rules, 1, "unexpected rules count") { assert.Equal(t, `event.type == "other"`, subs[1].Routes.Rules[0].Match) assert.Equal(t, "/other", subs[1].Routes.Rules[0].Path) } @@ -256,7 +355,7 @@ func TestEventDataHandling(t *testing.T) { return false, nil } err := s.AddTopicEventHandler(sub, handler) - require.NoErrorf(t, err, "error adding event handler") + require.NoError(t, err, "error adding event handler") s.registerBaseHandler() @@ -347,6 +446,176 @@ func TestEventMetadataHandling(t *testing.T) { } } +func TestBulkEventDataHandling(t *testing.T) { + tests := map[string]struct { + data string + results []interface{} + }{ + "JSON Values": { + data: `{ + "id": "unique_id", + "entries": [ + { + "entryId": "entry_id_1", + "event": { + "specversion" : "1.0", + "type" : "com.github.pull.create", + "source" : "https://github.com/cloudevents/spec/pull", + "subject" : "123", + "id" : "A234-1234-1234", + "time" : "2018-04-05T17:31:00Z", + "comexampleextension1" : "value", + "comexampleothervalue" : 5, + "datacontenttype" : "application/json", + "data" : { + "message":"hello" + } + }, + "metadata": { + "meta_key_1": "meta_value_1", + "meta_key_2": "meta_value_2" + }, + "contentType": "application/json" + }, + { + "entryId": "entry_id_2", + "event": { + "specversion" : "1.0", + "type" : "com.github.pull.create", + "source" : "https://github.com/cloudevents/spec/pull", + "subject" : "123", + "id" : "A234-1234-1234", + "time" : "2018-04-05T17:31:00Z", + "comexampleextension1" : "value", + "comexampleothervalue" : 5, + "datacontenttype" : "application/json", + "data" : "eyJtZXNzYWdlIjoiaGVsbG8ifQ==" + }, + "metadata": { + "meta_key_3": "meta_value_3", + "meta_key_4": "meta_value_4" + }, + "contentType": "application/json" + }, + { + "entryId": "entry_id_2", + "event": { + "specversion" : "1.0", + "type" : "com.github.pull.create", + "source" : "https://github.com/cloudevents/spec/pull", + "subject" : "123", + "id" : "A234-1234-1234", + "time" : "2018-04-05T17:31:00Z", + "comexampleextension1" : "value", + "comexampleothervalue" : 5, + "datacontenttype" : "application/json", + "data_base64" : "eyJtZXNzYWdlIjoiaGVsbG8ifQ==" + }, + "metadata": { + "meta_key_3": "meta_value_3", + "meta_key_4": "meta_value_4" + }, + "contentType": "application/json" + }, + { + "entryId": "entry_id_3", + "event": { + "specversion" : "1.0", + "type" : "com.github.pull.create", + "source" : "https://github.com/cloudevents/spec/pull", + "subject" : "123", + "id" : "A234-1234-1234", + "time" : "2018-04-05T17:31:00Z", + "comexampleextension1" : "value", + "comexampleothervalue" : 5, + "datacontenttype" : "application/octet-stream", + "data_base64" : "eyJtZXNzYWdlIjoiaGVsbG8ifQ==" + }, + "metadata": { + "meta_key_3": "meta_value_3", + "meta_key_4": "meta_value_4" + }, + "contentType": "application/json" + }, + { + "entryId": "entry_id_4", + "event": { + "specversion" : "1.0", + "type" : "com.github.pull.create", + "source" : "https://github.com/cloudevents/spec/pull", + "subject" : "123", + "id" : "A234-1234-1234", + "time" : "2018-04-05T17:31:00Z", + "comexampleextension1" : "value", + "comexampleothervalue" : 5, + "datacontenttype" : "application/json", + "data" : "{\"message\":\"hello\"}" + }, + "metadata": { + "meta_key_3": "meta_value_3", + "meta_key_4": "meta_value_4" + }, + "contentType": "application/json" + } + ], + "metadata": { + "envelope_key_1": "envelope_value_1", + "envelope_key_2": "envelope_value_2" + }, + "topic": "overall_topic", + "pubsub": "overall_pubsub", + "eventType": "overall_eventType" + }`, + results: []interface{}{ + map[string]interface{}{ + "message": "hello", + }, + map[string]interface{}{ + "message": "hello", + }, + map[string]interface{}{ + "message": "hello", + }, + []byte(`{"message":"hello"}`), + map[string]interface{}{ + "message": "hello", + }, + }, + }, + } + + s := newServer("", nil) + + sub := &common.Subscription{ + PubsubName: "messages", + Topic: "test", + Route: "/test", + Metadata: map[string]string{}, + } + + recv := make(chan struct{}, 5) + var topicEvents []common.TopicEvent + handler := func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { + topicEvents = append(topicEvents, *e) + recv <- struct{}{} + return false, nil + } + err := s.AddBulkTopicEventHandler(sub, handler, 5, 1000) + require.NoError(t, err, "error adding event handler") + + s.registerBaseHandler() + + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + makeEventRequest(t, s, "/test", tt.data, http.StatusOK) + <-recv + for i, tdata := range topicEvents { + assert.Equal(t, tt.results[i], tdata.Data) + } + }) + } +} + func TestHealthCheck(t *testing.T) { s := newServer("", nil) s.registerBaseHandler() @@ -450,19 +719,37 @@ func makeEventRequestWithMetadata(t *testing.T, s *Server, route, data string, e func TestAddingInvalidEventHandlers(t *testing.T) { s := newServer("", nil) err := s.AddTopicEventHandler(nil, testTopicFunc) - require.Errorf(t, err, "expected error adding no sub event handler") + require.Error(t, err, "expected error adding no sub event handler") sub := &common.Subscription{Metadata: map[string]string{}} err = s.AddTopicEventHandler(sub, testTopicFunc) - require.Errorf(t, err, "expected error adding empty sub event handler") + require.Error(t, err, "expected error adding empty sub event handler") sub.Topic = "test" err = s.AddTopicEventHandler(sub, testTopicFunc) - require.Errorf(t, err, "expected error adding sub without component event handler") + require.Error(t, err, "expected error adding sub without component event handler") sub.PubsubName = "messages" err = s.AddTopicEventHandler(sub, testTopicFunc) - require.Errorf(t, err, "expected error adding sub without route event handler") + require.Error(t, err, "expected error adding sub without route event handler") +} + +func TestAddingInvalidBulkEventHandlers(t *testing.T) { + s := newServer("", nil) + err := s.AddBulkTopicEventHandler(nil, testTopicFunc, 10, 1000) + require.Error(t, err, "expected error adding no sub event handler") + + sub := &common.Subscription{Metadata: map[string]string{}} + err = s.AddBulkTopicEventHandler(sub, testTopicFunc, 10, 1000) + require.Error(t, err, "expected error adding empty sub event handler") + + sub.Topic = "test" + err = s.AddBulkTopicEventHandler(sub, testTopicFunc, 10, 1000) + require.Error(t, err, "expected error adding sub without component event handler") + + sub.PubsubName = "messages" + err = s.AddBulkTopicEventHandler(sub, testTopicFunc, 10, 1000) + require.Error(t, err, "expected error adding sub without route event handler") } func TestRawPayloadDecode(t *testing.T) { @@ -474,7 +761,7 @@ func TestRawPayloadDecode(t *testing.T) { err = errors.New("error decode data_base64") } if err != nil { - require.NoErrorf(t, err, "error rawPayload decode") + require.NoError(t, err, "error rawPayload decode") } return } @@ -484,19 +771,38 @@ func TestRawPayloadDecode(t *testing.T) { "data_base64" : "eyJtZXNzYWdlIjoiaGVsbG8ifQ==" }` - s := newServer("", nil) + t.Run("With single event handling", func(t *testing.T) { + s := newServer("", nil) - sub3 := &common.Subscription{ - PubsubName: "messages", - Topic: "testRaw", - Route: "/raw", - Metadata: map[string]string{ - "rawPayload": "true", - }, - } - err := s.AddTopicEventHandler(sub3, testRawTopicFunc) - require.NoErrorf(t, err, "error adding raw event handler") + sub3 := &common.Subscription{ + PubsubName: "messages", + Topic: "testRaw", + Route: "/raw", + Metadata: map[string]string{ + "rawPayload": "true", + }, + } + err := s.AddTopicEventHandler(sub3, testRawTopicFunc) + require.NoError(t, err, "error adding raw event handler") - s.registerBaseHandler() - makeEventRequest(t, s, "/raw", rawData, http.StatusOK) + s.registerBaseHandler() + makeEventRequest(t, s, "/raw", rawData, http.StatusOK) + }) + t.Run("With bulk event handling", func(t *testing.T) { + s := newServer("", nil) + + sub3 := &common.Subscription{ + PubsubName: "messages", + Topic: "testRaw", + Route: "/raw", + Metadata: map[string]string{ + "rawPayload": "true", + }, + } + err := s.AddBulkTopicEventHandler(sub3, testRawTopicFunc, 10, 1000) + require.NoError(t, err, "error adding raw event handler") + + s.registerBaseHandler() + makeEventRequest(t, s, "/raw", rawData, http.StatusOK) + }) } diff --git a/service/internal/topicregistrar.go b/service/internal/topicregistrar.go index ec4efb9e..8294c6b2 100644 --- a/service/internal/topicregistrar.go +++ b/service/internal/topicregistrar.go @@ -62,3 +62,51 @@ func (m TopicRegistrar) AddSubscription(sub *common.Subscription, fn common.Topi return nil } + +func (m TopicRegistrar) AddBulkSubscription(sub *common.Subscription, fn common.TopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error { + if sub.Topic == "" { + return errors.New("topic name required") + } + if sub.PubsubName == "" { + return errors.New("pub/sub name required") + } + if fn == nil { + return fmt.Errorf("topic handler required") + } + + var key string + if !sub.DisableTopicValidation { + key = sub.PubsubName + "-" + sub.Topic + } else { + key = sub.PubsubName + } + + ts, ok := m[key] + if !ok { + ts = &TopicRegistration{ + Subscription: NewTopicSubscription(sub.PubsubName, sub.Topic), + RouteHandlers: make(map[string]common.TopicEventHandler), + DefaultHandler: nil, + } + ts.Subscription.SetMetadata(sub.Metadata) + m[key] = ts + } + + ts.Subscription.SetBulkSubscribe(maxMessagesCount, maxAwaitDurationMs) + + if sub.Match != "" { + if err := ts.Subscription.AddRoutingRule(sub.Route, sub.Match, sub.Priority); err != nil { + return err + } + } else { + if err := ts.Subscription.SetDefaultRoute(sub.Route); err != nil { + return err + } + + ts.DefaultHandler = fn + } + + ts.RouteHandlers[sub.Route] = fn + + return nil +} diff --git a/service/internal/topicregistrar_test.go b/service/internal/topicregistrar_test.go index 64be563c..a2a3e517 100644 --- a/service/internal/topicregistrar_test.go +++ b/service/internal/topicregistrar_test.go @@ -4,9 +4,8 @@ import ( "context" "testing" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/dapr/go-sdk/service/common" "github.com/dapr/go-sdk/service/internal" @@ -62,37 +61,71 @@ func TestTopicRegistrarValidation(t *testing.T) { }, fn, "", }, } - for name, tt := range tests { - tt := tt // dereference loop var - t.Run(name, func(t *testing.T) { - m := internal.TopicRegistrar{} - if tt.err != "" { - require.EqualError(t, m.AddSubscription(&tt.sub, tests[name].fn), tt.err) - } else { - require.NoError(t, m.AddSubscription(&tt.sub, tt.fn)) - } - }) - } + t.Run("with subscription", func(t *testing.T) { + for name, tt := range tests { + tt := tt // dereference loop var + t.Run(name, func(t *testing.T) { + m := internal.TopicRegistrar{} + if tt.err != "" { + require.EqualError(t, m.AddSubscription(&tt.sub, tests[name].fn), tt.err) + } else { + require.NoError(t, m.AddSubscription(&tt.sub, tt.fn)) + } + }) + } + }) + t.Run("with bulk subscription", func(t *testing.T) { + for name, tt := range tests { + tt := tt // dereference loop var + t.Run(name, func(t *testing.T) { + m := internal.TopicRegistrar{} + if tt.err != "" { + require.EqualError(t, m.AddBulkSubscription(&tt.sub, tests[name].fn, 10, 1000), tt.err) + } else { + require.NoError(t, m.AddBulkSubscription(&tt.sub, tt.fn, 10, 1000)) + } + }) + } + }) } func TestTopicAddSubscriptionMetadata(t *testing.T) { handler := func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { return false, nil } - topicRegistrar := internal.TopicRegistrar{} sub := &common.Subscription{ PubsubName: "pubsubname", Topic: "topic", Metadata: map[string]string{"key": "value"}, } - require.NoError(t, topicRegistrar.AddSubscription(sub, handler)) + t.Run("with subscription", func(t *testing.T) { + topicRegistrar := internal.TopicRegistrar{} + require.NoError(t, topicRegistrar.AddSubscription(sub, handler)) - actual := topicRegistrar["pubsubname-topic"].Subscription - expected := &internal.TopicSubscription{ - PubsubName: sub.PubsubName, - Topic: sub.Topic, - Metadata: sub.Metadata, - } - assert.Equal(t, expected, actual) + actual := topicRegistrar["pubsubname-topic"].Subscription + expected := &internal.TopicSubscription{ + PubsubName: sub.PubsubName, + Topic: sub.Topic, + Metadata: sub.Metadata, + } + assert.Equal(t, expected, actual) + }) + t.Run("with bulk subscription", func(t *testing.T) { + topicRegistrar := internal.TopicRegistrar{} + require.NoError(t, topicRegistrar.AddBulkSubscription(sub, handler, 10, 1000)) + + actual := topicRegistrar["pubsubname-topic"].Subscription + expected := &internal.TopicSubscription{ + PubsubName: sub.PubsubName, + Topic: sub.Topic, + Metadata: sub.Metadata, + BulkSubscribe: &internal.BulkSubscribeOptions{ + Enabled: true, + MaxMessagesCount: 10, + MaxAwaitDurationMs: 1000, + }, + } + assert.Equal(t, expected, actual) + }) } diff --git a/service/internal/topicsubscription.go b/service/internal/topicsubscription.go index ca628993..c40a368b 100644 --- a/service/internal/topicsubscription.go +++ b/service/internal/topicsubscription.go @@ -18,6 +18,30 @@ type TopicSubscription struct { Routes *TopicRoutes `json:"routes,omitempty"` // Metadata is the subscription metadata. Metadata map[string]string `json:"metadata,omitempty"` + // options for customizing the bulksubscribe behaviour + BulkSubscribe *BulkSubscribeOptions `json:"bulkSubscribe,omitempty"` +} + +type BulkSubscribeMessageItem struct { + EntryId string `json:"entryId"` //nolint:stylecheck + Event interface{} `json:"event"` + Metadata map[string]string `json:"metadata"` + ContentType string `json:"contentType,omitempty"` +} + +type BulkSubscribeEnvelope struct { + ID string `json:"id"` + Entries []BulkSubscribeMessageItem `json:"entries"` + Metadata map[string]string `json:"metadata"` + Topic string `json:"topic"` + Pubsub string `json:"pubsub"` + EventType string `json:"eventType"` +} + +type BulkSubscribeOptions struct { + Enabled bool `json:"enabled"` + MaxMessagesCount int32 `json:"maxMessagesCount,omitempty"` + MaxAwaitDurationMs int32 `json:"maxAwaitDurationMs,omitempty"` } // TopicRoutes encapsulates the default route and multiple routing rules. @@ -60,6 +84,19 @@ func (s *TopicSubscription) SetMetadata(metadata map[string]string) error { return nil } +func (s *TopicSubscription) SetBulkSubscribe(maxMessagesCount, maxAwaitDurationMs int32) error { + if s.BulkSubscribe != nil { + return fmt.Errorf("subscription for topic %s on pubsub %s already has bulkSubscribe set", s.Topic, s.PubsubName) + } + s.BulkSubscribe = &BulkSubscribeOptions{ + Enabled: true, + MaxMessagesCount: maxMessagesCount, + MaxAwaitDurationMs: maxAwaitDurationMs, + } + + return nil +} + // SetDefaultRoute sets the default route if not already set. // An error is returned if it is already set. func (s *TopicSubscription) SetDefaultRoute(path string) error { diff --git a/service/internal/topicsubscription_test.go b/service/internal/topicsubscription_test.go index 3762b36a..45f17118 100644 --- a/service/internal/topicsubscription_test.go +++ b/service/internal/topicsubscription_test.go @@ -71,4 +71,12 @@ func TestTopicSubscripiton(t *testing.T) { assert.Equal(t, `event.type == "100"`, sub.Routes.Rules[2].Match) } }) + + t.Run("enabling bulk subscription", func(t *testing.T) { + sub := internal.NewTopicSubscription("test", "mytopic") + require.NoError(t, sub.SetBulkSubscribe(10, 1000)) + assert.True(t, sub.BulkSubscribe.Enabled) + assert.Equal(t, int32(10), sub.BulkSubscribe.MaxMessagesCount) + assert.Equal(t, int32(1000), sub.BulkSubscribe.MaxAwaitDurationMs) + }) }