From d310732ca88b7e447b96a5bb4ce8221eeec48ae7 Mon Sep 17 00:00:00 2001 From: Michael Collins Date: Sun, 8 Dec 2024 20:24:58 -0700 Subject: [PATCH 1/2] Introduce interface for topic event subscriber I introduced the TopicEventSubscriber interface to allow for subscribers to be implemented as structs. This will allow subscribers to be initialized with any settings or dependencies needed to process incoming events. I converted TopicEventHandler to implement TopicEventSubscriber. I revised TopicRegistrar to store TopicEventSubscribers instead of TopicEventHandlers. Resolves #660 Signed-off-by: Michael Collins --- service/common/service.go | 21 ++++++++++++++++++--- service/grpc/topic.go | 13 +++++++++++-- service/http/topic.go | 13 +++++++++++-- service/internal/topicregistrar.go | 8 ++++---- service/internal/topicregistrar_test.go | 10 +++++----- 5 files changed, 49 insertions(+), 16 deletions(-) diff --git a/service/common/service.go b/service/common/service.go index c95a919d..122d77d1 100644 --- a/service/common/service.go +++ b/service/common/service.go @@ -27,18 +27,23 @@ const ( ) // Service represents Dapr callback service. +// +//nolint:interfacebloat type Service interface { // AddHealthCheckHandler sets a health check handler, name: http (router) and grpc (invalid). AddHealthCheckHandler(name string, fn HealthCheckHandler) error // AddServiceInvocationHandler appends provided service invocation handler with its name to the service. AddServiceInvocationHandler(name string, fn ServiceInvocationHandler) error // AddTopicEventHandler appends provided event handler with its topic and optional metadata to the service. - // Note, retries are only considered when there is an error. Lack of error is considered as a success + // Note, retries are only considered when there is an error. Lack of error is considered as a success. AddTopicEventHandler(sub *Subscription, fn TopicEventHandler) error + // AddTopicEventSubscriber appends the provided subscriber with its topic and optional metadata to the service. + // Note, retries are only considered when there is an error. Lack of error is considered as a success. + AddTopicEventSubscriber(sub *Subscription, subscriber TopicEventSubscriber) error // AddBindingInvocationHandler appends provided binding invocation handler with its name to the service. AddBindingInvocationHandler(name string, fn BindingInvocationHandler) error // RegisterActorImplFactory Register a new actor to actor runtime of go sdk - // Deprecated: use RegisterActorImplFactoryContext instead + // Deprecated: use RegisterActorImplFactoryContext instead. RegisterActorImplFactory(f actor.Factory, opts ...config.Option) // RegisterActorImplFactoryContext Register a new actor to actor runtime of go sdk RegisterActorImplFactoryContext(f actor.FactoryContext, opts ...config.Option) @@ -49,7 +54,7 @@ type Service interface { Start() error // Stop stops the previously started service. Stop() error - // Gracefully stops the previous started service + // Gracefully stops the previous started service. GracefulStop() error } @@ -60,3 +65,13 @@ type ( JobEventHandler func(ctx context.Context, in *JobEvent) error HealthCheckHandler func(context.Context) error ) + +type TopicEventSubscriber interface { + Handle(ctx context.Context, e *TopicEvent) (retry bool, err error) +} + +// Handle converts TopicEventHandler into an adapter that implements +// TopicEventSubscriber. +func (h TopicEventHandler) Handle(ctx context.Context, e *TopicEvent) (retry bool, err error) { + return h(ctx, e) +} diff --git a/service/grpc/topic.go b/service/grpc/topic.go index 2928b235..14f3ee06 100644 --- a/service/grpc/topic.go +++ b/service/grpc/topic.go @@ -31,11 +31,20 @@ import ( // AddTopicEventHandler appends provided event handler with topic name to the service. func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler) error { + if fn == nil { + return errors.New("topic handler required") + } + + return s.AddTopicEventSubscriber(sub, fn) +} + +// AddTopicEventSubscriber appends the provided subscriber to the service. +func (s *Server) AddTopicEventSubscriber(sub *common.Subscription, subscriber common.TopicEventSubscriber) error { if sub == nil { return errors.New("subscription required") } - return s.topicRegistrar.AddSubscription(sub, fn) + return s.topicRegistrar.AddSubscription(sub, subscriber) } // ListTopicSubscriptions is called by Dapr to get the list of topics in a pubsub component the app wants to subscribe to. @@ -142,7 +151,7 @@ func (s *Server) OnTopicEvent(ctx context.Context, in *runtimev1pb.TopicEventReq in.GetPath(), in.GetPubsubName(), in.GetTopic(), ) } - retry, err := h(ctx, e) + retry, err := h.Handle(ctx, e) if err == nil { return &runtimev1pb.TopicEventResponse{Status: runtimev1pb.TopicEventResponse_SUCCESS}, nil } diff --git a/service/http/topic.go b/service/http/topic.go index 8cb87a69..2d321897 100644 --- a/service/http/topic.go +++ b/service/http/topic.go @@ -241,6 +241,15 @@ func (s *Server) registerBaseHandler() { // AddTopicEventHandler appends provided event handler with it's name to the service. func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler) error { + if fn == nil { + return errors.New("topic handler required") + } + + return s.AddTopicEventSubscriber(sub, fn) +} + +// AddTopicEventSubscriber appends the provided subscriber to the service. +func (s *Server) AddTopicEventSubscriber(sub *common.Subscription, subscriber common.TopicEventSubscriber) error { if sub == nil { return errors.New("subscription required") } @@ -249,7 +258,7 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE if sub.Route == "" { return errors.New("handler route name") } - if err := s.topicRegistrar.AddSubscription(sub, fn); err != nil { + if err := s.topicRegistrar.AddSubscription(sub, subscriber); err != nil { return err } @@ -306,7 +315,7 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE w.WriteHeader(http.StatusOK) // execute user handler - retry, err := fn(r.Context(), &te) + retry, err := subscriber.Handle(r.Context(), &te) if err == nil { writeStatus(w, common.SubscriptionResponseStatusSuccess) return diff --git a/service/internal/topicregistrar.go b/service/internal/topicregistrar.go index a443af0d..1ff82626 100644 --- a/service/internal/topicregistrar.go +++ b/service/internal/topicregistrar.go @@ -14,11 +14,11 @@ type TopicRegistrar map[string]*TopicRegistration // TopicRegistration encapsulates the subscription and handlers. type TopicRegistration struct { Subscription *TopicSubscription - DefaultHandler common.TopicEventHandler - RouteHandlers map[string]common.TopicEventHandler + DefaultHandler common.TopicEventSubscriber + RouteHandlers map[string]common.TopicEventSubscriber } -func (m TopicRegistrar) AddSubscription(sub *common.Subscription, fn common.TopicEventHandler) error { +func (m TopicRegistrar) AddSubscription(sub *common.Subscription, fn common.TopicEventSubscriber) error { if sub.Topic == "" { return errors.New("topic name required") } @@ -40,7 +40,7 @@ func (m TopicRegistrar) AddSubscription(sub *common.Subscription, fn common.Topi if !ok { ts = &TopicRegistration{ Subscription: NewTopicSubscription(sub.PubsubName, sub.Topic), - RouteHandlers: make(map[string]common.TopicEventHandler), + RouteHandlers: make(map[string]common.TopicEventSubscriber), DefaultHandler: nil, } ts.Subscription.SetMetadata(sub.Metadata) diff --git a/service/internal/topicregistrar_test.go b/service/internal/topicregistrar_test.go index cb5d4fb0..769e36dc 100644 --- a/service/internal/topicregistrar_test.go +++ b/service/internal/topicregistrar_test.go @@ -13,12 +13,12 @@ import ( ) func TestTopicRegistrarValidation(t *testing.T) { - fn := func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { + fn := common.TopicEventHandler(func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { return false, nil - } + }) tests := map[string]struct { sub common.Subscription - fn common.TopicEventHandler + fn common.TopicEventSubscriber err string }{ "pubsub required": { @@ -75,9 +75,9 @@ func TestTopicRegistrarValidation(t *testing.T) { } func TestTopicAddSubscriptionMetadata(t *testing.T) { - handler := func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { + handler := common.TopicEventHandler(func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { return false, nil - } + }) topicRegistrar := internal.TopicRegistrar{} sub := &common.Subscription{ PubsubName: "pubsubname", From 2528b254570c538a79ed077e0e3fec8068933ce1 Mon Sep 17 00:00:00 2001 From: Michael Collins Date: Sun, 8 Dec 2024 20:52:38 -0700 Subject: [PATCH 2/2] Update go-service documentation I added examples of how to use the TopicEventSubscriber interface to create a new subscriber for both HTTP and gRPC services. Signed-off-by: Michael Collins --- .../en/go-sdk-docs/go-service/grpc-service.md | 29 +++++++++++++++++++ .../en/go-sdk-docs/go-service/http-service.md | 29 +++++++++++++++++++ 2 files changed, 58 insertions(+) diff --git a/daprdocs/content/en/go-sdk-docs/go-service/grpc-service.md b/daprdocs/content/en/go-sdk-docs/go-service/grpc-service.md index 4ac37c39..33c7909d 100644 --- a/daprdocs/content/en/go-sdk-docs/go-service/grpc-service.md +++ b/daprdocs/content/en/go-sdk-docs/go-service/grpc-service.md @@ -83,6 +83,35 @@ if err != nil { } ``` +You can also create a custom type that implements the `TopicEventSubscriber` interface to handle your events: + +```go +type EventHandler struct { + // any data or references that your event handler needs. +} + +func (h *EventHandler) Handle(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { + log.Printf("event - PubsubName:%s, Topic:%s, ID:%s, Data: %v", e.PubsubName, e.Topic, e.ID, e.Data) + // do something with the event + return true, nil +} +``` + +The `EventHandler` can then be added using the `AddTopicEventSubscriber` method: + +```go +sub := &common.Subscription{ + PubsubName: "messages", + Topic: "topic1", +} +eventHandler := &EventHandler{ +// initialize any fields +} +if err := s.AddTopicEventSubscriber(sub, eventHandler); err != nil { + log.Fatalf("error adding topic subscription: %v", err) +} +``` + ### Service Invocation Handler To handle service invocations you will need to add at least one service invocation handler before starting the service: diff --git a/daprdocs/content/en/go-sdk-docs/go-service/http-service.md b/daprdocs/content/en/go-sdk-docs/go-service/http-service.md index 643f24f2..c73487e0 100644 --- a/daprdocs/content/en/go-sdk-docs/go-service/http-service.md +++ b/daprdocs/content/en/go-sdk-docs/go-service/http-service.md @@ -78,6 +78,35 @@ if err != nil { } ``` +You can also create a custom type that implements the `TopicEventSubscriber` interface to handle your events: + +```go +type EventHandler struct { + // any data or references that your event handler needs. +} + +func (h *EventHandler) Handle(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { + log.Printf("event - PubsubName:%s, Topic:%s, ID:%s, Data: %v", e.PubsubName, e.Topic, e.ID, e.Data) + // do something with the event + return true, nil +} +``` + +The `EventHandler` can then be added using the `AddTopicEventSubscriber` method: + +```go +sub := &common.Subscription{ + PubsubName: "messages", + Topic: "topic1", +} +eventHandler := &EventHandler{ +// initialize any fields +} +if err := s.AddTopicEventSubscriber(sub, eventHandler); err != nil { + log.Fatalf("error adding topic subscription: %v", err) +} +``` + ### Service Invocation Handler To handle service invocations you will need to add at least one service invocation handler before starting the service: