Skip to content

Commit

Permalink
Introduce interface for topic event subscriber
Browse files Browse the repository at this point in the history
I introduced the TopicEventSubscriber interface to allow for
subscribers to be implemented as structs. This will allow subscribers
to be initialized with any settings or dependencies needed to process
incoming events.

I converted TopicEventHandler to implement TopicEventSubscriber. I
revised TopicRegistrar to store TopicEventSubscribers instead of
TopicEventHandlers.

Resolves #660

Signed-off-by: Michael Collins <[email protected]>
  • Loading branch information
mfcollins3 committed Dec 9, 2024
1 parent 921a6a7 commit d310732
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 16 deletions.
21 changes: 18 additions & 3 deletions service/common/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,23 @@ const (
)

// Service represents Dapr callback service.
//
//nolint:interfacebloat
type Service interface {
// AddHealthCheckHandler sets a health check handler, name: http (router) and grpc (invalid).
AddHealthCheckHandler(name string, fn HealthCheckHandler) error
// AddServiceInvocationHandler appends provided service invocation handler with its name to the service.
AddServiceInvocationHandler(name string, fn ServiceInvocationHandler) error
// AddTopicEventHandler appends provided event handler with its topic and optional metadata to the service.
// Note, retries are only considered when there is an error. Lack of error is considered as a success
// Note, retries are only considered when there is an error. Lack of error is considered as a success.
AddTopicEventHandler(sub *Subscription, fn TopicEventHandler) error
// AddTopicEventSubscriber appends the provided subscriber with its topic and optional metadata to the service.
// Note, retries are only considered when there is an error. Lack of error is considered as a success.
AddTopicEventSubscriber(sub *Subscription, subscriber TopicEventSubscriber) error
// AddBindingInvocationHandler appends provided binding invocation handler with its name to the service.
AddBindingInvocationHandler(name string, fn BindingInvocationHandler) error
// RegisterActorImplFactory Register a new actor to actor runtime of go sdk
// Deprecated: use RegisterActorImplFactoryContext instead
// Deprecated: use RegisterActorImplFactoryContext instead.
RegisterActorImplFactory(f actor.Factory, opts ...config.Option)
// RegisterActorImplFactoryContext Register a new actor to actor runtime of go sdk
RegisterActorImplFactoryContext(f actor.FactoryContext, opts ...config.Option)
Expand All @@ -49,7 +54,7 @@ type Service interface {
Start() error
// Stop stops the previously started service.
Stop() error
// Gracefully stops the previous started service
// Gracefully stops the previous started service.
GracefulStop() error
}

Expand All @@ -60,3 +65,13 @@ type (
JobEventHandler func(ctx context.Context, in *JobEvent) error
HealthCheckHandler func(context.Context) error
)

type TopicEventSubscriber interface {
Handle(ctx context.Context, e *TopicEvent) (retry bool, err error)
}

// Handle converts TopicEventHandler into an adapter that implements
// TopicEventSubscriber.
func (h TopicEventHandler) Handle(ctx context.Context, e *TopicEvent) (retry bool, err error) {
return h(ctx, e)
}
13 changes: 11 additions & 2 deletions service/grpc/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,20 @@ import (

// AddTopicEventHandler appends provided event handler with topic name to the service.
func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler) error {
if fn == nil {
return errors.New("topic handler required")
}

return s.AddTopicEventSubscriber(sub, fn)
}

// AddTopicEventSubscriber appends the provided subscriber to the service.
func (s *Server) AddTopicEventSubscriber(sub *common.Subscription, subscriber common.TopicEventSubscriber) error {
if sub == nil {
return errors.New("subscription required")
}

return s.topicRegistrar.AddSubscription(sub, fn)
return s.topicRegistrar.AddSubscription(sub, subscriber)
}

// ListTopicSubscriptions is called by Dapr to get the list of topics in a pubsub component the app wants to subscribe to.
Expand Down Expand Up @@ -142,7 +151,7 @@ func (s *Server) OnTopicEvent(ctx context.Context, in *runtimev1pb.TopicEventReq
in.GetPath(), in.GetPubsubName(), in.GetTopic(),
)
}
retry, err := h(ctx, e)
retry, err := h.Handle(ctx, e)
if err == nil {
return &runtimev1pb.TopicEventResponse{Status: runtimev1pb.TopicEventResponse_SUCCESS}, nil
}
Expand Down
13 changes: 11 additions & 2 deletions service/http/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,15 @@ func (s *Server) registerBaseHandler() {

// AddTopicEventHandler appends provided event handler with it's name to the service.
func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler) error {
if fn == nil {
return errors.New("topic handler required")
}

return s.AddTopicEventSubscriber(sub, fn)
}

// AddTopicEventSubscriber appends the provided subscriber to the service.
func (s *Server) AddTopicEventSubscriber(sub *common.Subscription, subscriber common.TopicEventSubscriber) error {
if sub == nil {
return errors.New("subscription required")
}
Expand All @@ -249,7 +258,7 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE
if sub.Route == "" {
return errors.New("handler route name")
}
if err := s.topicRegistrar.AddSubscription(sub, fn); err != nil {
if err := s.topicRegistrar.AddSubscription(sub, subscriber); err != nil {
return err
}

Expand Down Expand Up @@ -306,7 +315,7 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE
w.WriteHeader(http.StatusOK)

// execute user handler
retry, err := fn(r.Context(), &te)
retry, err := subscriber.Handle(r.Context(), &te)
if err == nil {
writeStatus(w, common.SubscriptionResponseStatusSuccess)
return
Expand Down
8 changes: 4 additions & 4 deletions service/internal/topicregistrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ type TopicRegistrar map[string]*TopicRegistration
// TopicRegistration encapsulates the subscription and handlers.
type TopicRegistration struct {
Subscription *TopicSubscription
DefaultHandler common.TopicEventHandler
RouteHandlers map[string]common.TopicEventHandler
DefaultHandler common.TopicEventSubscriber
RouteHandlers map[string]common.TopicEventSubscriber
}

func (m TopicRegistrar) AddSubscription(sub *common.Subscription, fn common.TopicEventHandler) error {
func (m TopicRegistrar) AddSubscription(sub *common.Subscription, fn common.TopicEventSubscriber) error {
if sub.Topic == "" {
return errors.New("topic name required")
}
Expand All @@ -40,7 +40,7 @@ func (m TopicRegistrar) AddSubscription(sub *common.Subscription, fn common.Topi
if !ok {
ts = &TopicRegistration{
Subscription: NewTopicSubscription(sub.PubsubName, sub.Topic),
RouteHandlers: make(map[string]common.TopicEventHandler),
RouteHandlers: make(map[string]common.TopicEventSubscriber),
DefaultHandler: nil,
}
ts.Subscription.SetMetadata(sub.Metadata)
Expand Down
10 changes: 5 additions & 5 deletions service/internal/topicregistrar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ import (
)

func TestTopicRegistrarValidation(t *testing.T) {
fn := func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
fn := common.TopicEventHandler(func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
return false, nil
}
})
tests := map[string]struct {
sub common.Subscription
fn common.TopicEventHandler
fn common.TopicEventSubscriber
err string
}{
"pubsub required": {
Expand Down Expand Up @@ -75,9 +75,9 @@ func TestTopicRegistrarValidation(t *testing.T) {
}

func TestTopicAddSubscriptionMetadata(t *testing.T) {
handler := func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
handler := common.TopicEventHandler(func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
return false, nil
}
})
topicRegistrar := internal.TopicRegistrar{}
sub := &common.Subscription{
PubsubName: "pubsubname",
Expand Down

0 comments on commit d310732

Please sign in to comment.