Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce interface for topic event subscriber #661

Merged
merged 2 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions daprdocs/content/en/go-sdk-docs/go-service/grpc-service.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,35 @@ if err != nil {
}
```

You can also create a custom type that implements the `TopicEventSubscriber` interface to handle your events:

```go
type EventHandler struct {
// any data or references that your event handler needs.
}

func (h *EventHandler) Handle(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
log.Printf("event - PubsubName:%s, Topic:%s, ID:%s, Data: %v", e.PubsubName, e.Topic, e.ID, e.Data)
// do something with the event
return true, nil
}
```

The `EventHandler` can then be added using the `AddTopicEventSubscriber` method:

```go
sub := &common.Subscription{
PubsubName: "messages",
Topic: "topic1",
}
eventHandler := &EventHandler{
// initialize any fields
}
if err := s.AddTopicEventSubscriber(sub, eventHandler); err != nil {
log.Fatalf("error adding topic subscription: %v", err)
}
```

### Service Invocation Handler
To handle service invocations you will need to add at least one service invocation handler before starting the service:

Expand Down
29 changes: 29 additions & 0 deletions daprdocs/content/en/go-sdk-docs/go-service/http-service.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,35 @@ if err != nil {
}
```

You can also create a custom type that implements the `TopicEventSubscriber` interface to handle your events:

```go
type EventHandler struct {
// any data or references that your event handler needs.
}

func (h *EventHandler) Handle(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
log.Printf("event - PubsubName:%s, Topic:%s, ID:%s, Data: %v", e.PubsubName, e.Topic, e.ID, e.Data)
// do something with the event
return true, nil
}
```

The `EventHandler` can then be added using the `AddTopicEventSubscriber` method:

```go
sub := &common.Subscription{
PubsubName: "messages",
Topic: "topic1",
}
eventHandler := &EventHandler{
// initialize any fields
}
if err := s.AddTopicEventSubscriber(sub, eventHandler); err != nil {
log.Fatalf("error adding topic subscription: %v", err)
}
```

### Service Invocation Handler
To handle service invocations you will need to add at least one service invocation handler before starting the service:

Expand Down
21 changes: 18 additions & 3 deletions service/common/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,23 @@ const (
)

// Service represents Dapr callback service.
//
//nolint:interfacebloat
type Service interface {
// AddHealthCheckHandler sets a health check handler, name: http (router) and grpc (invalid).
AddHealthCheckHandler(name string, fn HealthCheckHandler) error
// AddServiceInvocationHandler appends provided service invocation handler with its name to the service.
AddServiceInvocationHandler(name string, fn ServiceInvocationHandler) error
// AddTopicEventHandler appends provided event handler with its topic and optional metadata to the service.
// Note, retries are only considered when there is an error. Lack of error is considered as a success
// Note, retries are only considered when there is an error. Lack of error is considered as a success.
AddTopicEventHandler(sub *Subscription, fn TopicEventHandler) error
// AddTopicEventSubscriber appends the provided subscriber with its topic and optional metadata to the service.
// Note, retries are only considered when there is an error. Lack of error is considered as a success.
AddTopicEventSubscriber(sub *Subscription, subscriber TopicEventSubscriber) error
// AddBindingInvocationHandler appends provided binding invocation handler with its name to the service.
AddBindingInvocationHandler(name string, fn BindingInvocationHandler) error
// RegisterActorImplFactory Register a new actor to actor runtime of go sdk
// Deprecated: use RegisterActorImplFactoryContext instead
// Deprecated: use RegisterActorImplFactoryContext instead.
RegisterActorImplFactory(f actor.Factory, opts ...config.Option)
// RegisterActorImplFactoryContext Register a new actor to actor runtime of go sdk
RegisterActorImplFactoryContext(f actor.FactoryContext, opts ...config.Option)
Expand All @@ -49,7 +54,7 @@ type Service interface {
Start() error
// Stop stops the previously started service.
Stop() error
// Gracefully stops the previous started service
// Gracefully stops the previous started service.
GracefulStop() error
}

Expand All @@ -60,3 +65,13 @@ type (
JobEventHandler func(ctx context.Context, in *JobEvent) error
HealthCheckHandler func(context.Context) error
)

type TopicEventSubscriber interface {
Handle(ctx context.Context, e *TopicEvent) (retry bool, err error)
}

// Handle converts TopicEventHandler into an adapter that implements
// TopicEventSubscriber.
func (h TopicEventHandler) Handle(ctx context.Context, e *TopicEvent) (retry bool, err error) {
return h(ctx, e)
}
13 changes: 11 additions & 2 deletions service/grpc/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,20 @@ import (

// AddTopicEventHandler appends provided event handler with topic name to the service.
func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler) error {
if fn == nil {
return errors.New("topic handler required")
}

return s.AddTopicEventSubscriber(sub, fn)
}

// AddTopicEventSubscriber appends the provided subscriber to the service.
func (s *Server) AddTopicEventSubscriber(sub *common.Subscription, subscriber common.TopicEventSubscriber) error {
if sub == nil {
return errors.New("subscription required")
}

return s.topicRegistrar.AddSubscription(sub, fn)
return s.topicRegistrar.AddSubscription(sub, subscriber)
}

// ListTopicSubscriptions is called by Dapr to get the list of topics in a pubsub component the app wants to subscribe to.
Expand Down Expand Up @@ -142,7 +151,7 @@ func (s *Server) OnTopicEvent(ctx context.Context, in *runtimev1pb.TopicEventReq
in.GetPath(), in.GetPubsubName(), in.GetTopic(),
)
}
retry, err := h(ctx, e)
retry, err := h.Handle(ctx, e)
if err == nil {
return &runtimev1pb.TopicEventResponse{Status: runtimev1pb.TopicEventResponse_SUCCESS}, nil
}
Expand Down
13 changes: 11 additions & 2 deletions service/http/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,15 @@ func (s *Server) registerBaseHandler() {

// AddTopicEventHandler appends provided event handler with it's name to the service.
func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler) error {
if fn == nil {
return errors.New("topic handler required")
}

return s.AddTopicEventSubscriber(sub, fn)
}

// AddTopicEventSubscriber appends the provided subscriber to the service.
func (s *Server) AddTopicEventSubscriber(sub *common.Subscription, subscriber common.TopicEventSubscriber) error {
if sub == nil {
return errors.New("subscription required")
}
Expand All @@ -249,7 +258,7 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE
if sub.Route == "" {
return errors.New("handler route name")
}
if err := s.topicRegistrar.AddSubscription(sub, fn); err != nil {
if err := s.topicRegistrar.AddSubscription(sub, subscriber); err != nil {
return err
}

Expand Down Expand Up @@ -306,7 +315,7 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE
w.WriteHeader(http.StatusOK)

// execute user handler
retry, err := fn(r.Context(), &te)
retry, err := subscriber.Handle(r.Context(), &te)
if err == nil {
writeStatus(w, common.SubscriptionResponseStatusSuccess)
return
Expand Down
8 changes: 4 additions & 4 deletions service/internal/topicregistrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ type TopicRegistrar map[string]*TopicRegistration
// TopicRegistration encapsulates the subscription and handlers.
type TopicRegistration struct {
Subscription *TopicSubscription
DefaultHandler common.TopicEventHandler
RouteHandlers map[string]common.TopicEventHandler
DefaultHandler common.TopicEventSubscriber
RouteHandlers map[string]common.TopicEventSubscriber
}

func (m TopicRegistrar) AddSubscription(sub *common.Subscription, fn common.TopicEventHandler) error {
func (m TopicRegistrar) AddSubscription(sub *common.Subscription, fn common.TopicEventSubscriber) error {
if sub.Topic == "" {
return errors.New("topic name required")
}
Expand All @@ -40,7 +40,7 @@ func (m TopicRegistrar) AddSubscription(sub *common.Subscription, fn common.Topi
if !ok {
ts = &TopicRegistration{
Subscription: NewTopicSubscription(sub.PubsubName, sub.Topic),
RouteHandlers: make(map[string]common.TopicEventHandler),
RouteHandlers: make(map[string]common.TopicEventSubscriber),
DefaultHandler: nil,
}
ts.Subscription.SetMetadata(sub.Metadata)
Expand Down
10 changes: 5 additions & 5 deletions service/internal/topicregistrar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ import (
)

func TestTopicRegistrarValidation(t *testing.T) {
fn := func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
fn := common.TopicEventHandler(func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
return false, nil
}
})
tests := map[string]struct {
sub common.Subscription
fn common.TopicEventHandler
fn common.TopicEventSubscriber
err string
}{
"pubsub required": {
Expand Down Expand Up @@ -75,9 +75,9 @@ func TestTopicRegistrarValidation(t *testing.T) {
}

func TestTopicAddSubscriptionMetadata(t *testing.T) {
handler := func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
handler := common.TopicEventHandler(func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
return false, nil
}
})
topicRegistrar := internal.TopicRegistrar{}
sub := &common.Subscription{
PubsubName: "pubsubname",
Expand Down
Loading