diff --git a/service/common/service.go b/service/common/service.go index c95a919d..122d77d1 100644 --- a/service/common/service.go +++ b/service/common/service.go @@ -27,18 +27,23 @@ const ( ) // Service represents Dapr callback service. +// +//nolint:interfacebloat type Service interface { // AddHealthCheckHandler sets a health check handler, name: http (router) and grpc (invalid). AddHealthCheckHandler(name string, fn HealthCheckHandler) error // AddServiceInvocationHandler appends provided service invocation handler with its name to the service. AddServiceInvocationHandler(name string, fn ServiceInvocationHandler) error // AddTopicEventHandler appends provided event handler with its topic and optional metadata to the service. - // Note, retries are only considered when there is an error. Lack of error is considered as a success + // Note, retries are only considered when there is an error. Lack of error is considered as a success. AddTopicEventHandler(sub *Subscription, fn TopicEventHandler) error + // AddTopicEventSubscriber appends the provided subscriber with its topic and optional metadata to the service. + // Note, retries are only considered when there is an error. Lack of error is considered as a success. + AddTopicEventSubscriber(sub *Subscription, subscriber TopicEventSubscriber) error // AddBindingInvocationHandler appends provided binding invocation handler with its name to the service. AddBindingInvocationHandler(name string, fn BindingInvocationHandler) error // RegisterActorImplFactory Register a new actor to actor runtime of go sdk - // Deprecated: use RegisterActorImplFactoryContext instead + // Deprecated: use RegisterActorImplFactoryContext instead. RegisterActorImplFactory(f actor.Factory, opts ...config.Option) // RegisterActorImplFactoryContext Register a new actor to actor runtime of go sdk RegisterActorImplFactoryContext(f actor.FactoryContext, opts ...config.Option) @@ -49,7 +54,7 @@ type Service interface { Start() error // Stop stops the previously started service. Stop() error - // Gracefully stops the previous started service + // Gracefully stops the previous started service. GracefulStop() error } @@ -60,3 +65,13 @@ type ( JobEventHandler func(ctx context.Context, in *JobEvent) error HealthCheckHandler func(context.Context) error ) + +type TopicEventSubscriber interface { + Handle(ctx context.Context, e *TopicEvent) (retry bool, err error) +} + +// Handle converts TopicEventHandler into an adapter that implements +// TopicEventSubscriber. +func (h TopicEventHandler) Handle(ctx context.Context, e *TopicEvent) (retry bool, err error) { + return h(ctx, e) +} diff --git a/service/grpc/topic.go b/service/grpc/topic.go index 2928b235..14f3ee06 100644 --- a/service/grpc/topic.go +++ b/service/grpc/topic.go @@ -31,11 +31,20 @@ import ( // AddTopicEventHandler appends provided event handler with topic name to the service. func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler) error { + if fn == nil { + return errors.New("topic handler required") + } + + return s.AddTopicEventSubscriber(sub, fn) +} + +// AddTopicEventSubscriber appends the provided subscriber to the service. +func (s *Server) AddTopicEventSubscriber(sub *common.Subscription, subscriber common.TopicEventSubscriber) error { if sub == nil { return errors.New("subscription required") } - return s.topicRegistrar.AddSubscription(sub, fn) + return s.topicRegistrar.AddSubscription(sub, subscriber) } // ListTopicSubscriptions is called by Dapr to get the list of topics in a pubsub component the app wants to subscribe to. @@ -142,7 +151,7 @@ func (s *Server) OnTopicEvent(ctx context.Context, in *runtimev1pb.TopicEventReq in.GetPath(), in.GetPubsubName(), in.GetTopic(), ) } - retry, err := h(ctx, e) + retry, err := h.Handle(ctx, e) if err == nil { return &runtimev1pb.TopicEventResponse{Status: runtimev1pb.TopicEventResponse_SUCCESS}, nil } diff --git a/service/http/topic.go b/service/http/topic.go index 8cb87a69..2d321897 100644 --- a/service/http/topic.go +++ b/service/http/topic.go @@ -241,6 +241,15 @@ func (s *Server) registerBaseHandler() { // AddTopicEventHandler appends provided event handler with it's name to the service. func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler) error { + if fn == nil { + return errors.New("topic handler required") + } + + return s.AddTopicEventSubscriber(sub, fn) +} + +// AddTopicEventSubscriber appends the provided subscriber to the service. +func (s *Server) AddTopicEventSubscriber(sub *common.Subscription, subscriber common.TopicEventSubscriber) error { if sub == nil { return errors.New("subscription required") } @@ -249,7 +258,7 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE if sub.Route == "" { return errors.New("handler route name") } - if err := s.topicRegistrar.AddSubscription(sub, fn); err != nil { + if err := s.topicRegistrar.AddSubscription(sub, subscriber); err != nil { return err } @@ -306,7 +315,7 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE w.WriteHeader(http.StatusOK) // execute user handler - retry, err := fn(r.Context(), &te) + retry, err := subscriber.Handle(r.Context(), &te) if err == nil { writeStatus(w, common.SubscriptionResponseStatusSuccess) return diff --git a/service/internal/topicregistrar.go b/service/internal/topicregistrar.go index a443af0d..1ff82626 100644 --- a/service/internal/topicregistrar.go +++ b/service/internal/topicregistrar.go @@ -14,11 +14,11 @@ type TopicRegistrar map[string]*TopicRegistration // TopicRegistration encapsulates the subscription and handlers. type TopicRegistration struct { Subscription *TopicSubscription - DefaultHandler common.TopicEventHandler - RouteHandlers map[string]common.TopicEventHandler + DefaultHandler common.TopicEventSubscriber + RouteHandlers map[string]common.TopicEventSubscriber } -func (m TopicRegistrar) AddSubscription(sub *common.Subscription, fn common.TopicEventHandler) error { +func (m TopicRegistrar) AddSubscription(sub *common.Subscription, fn common.TopicEventSubscriber) error { if sub.Topic == "" { return errors.New("topic name required") } @@ -40,7 +40,7 @@ func (m TopicRegistrar) AddSubscription(sub *common.Subscription, fn common.Topi if !ok { ts = &TopicRegistration{ Subscription: NewTopicSubscription(sub.PubsubName, sub.Topic), - RouteHandlers: make(map[string]common.TopicEventHandler), + RouteHandlers: make(map[string]common.TopicEventSubscriber), DefaultHandler: nil, } ts.Subscription.SetMetadata(sub.Metadata) diff --git a/service/internal/topicregistrar_test.go b/service/internal/topicregistrar_test.go index cb5d4fb0..769e36dc 100644 --- a/service/internal/topicregistrar_test.go +++ b/service/internal/topicregistrar_test.go @@ -13,12 +13,12 @@ import ( ) func TestTopicRegistrarValidation(t *testing.T) { - fn := func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { + fn := common.TopicEventHandler(func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { return false, nil - } + }) tests := map[string]struct { sub common.Subscription - fn common.TopicEventHandler + fn common.TopicEventSubscriber err string }{ "pubsub required": { @@ -75,9 +75,9 @@ func TestTopicRegistrarValidation(t *testing.T) { } func TestTopicAddSubscriptionMetadata(t *testing.T) { - handler := func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { + handler := common.TopicEventHandler(func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { return false, nil - } + }) topicRegistrar := internal.TopicRegistrar{} sub := &common.Subscription{ PubsubName: "pubsubname",