Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: bulksubscribe http #478

Open
wants to merge 29 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions examples/pubsub/sub/sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,14 @@ var importantSubscription = &common.Subscription{
func main() {
s := daprd.NewService(":8080")

// for single event subscribing
if err := s.AddTopicEventHandler(defaultSubscription, eventHandler); err != nil {
log.Fatalf("error adding topic subscription: %v", err)
}

if err := s.AddTopicEventHandler(importantSubscription, importantEventHandler); err != nil {
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
log.Fatalf("error adding topic subscription: %v", err)
}
// if err := s.AddBulkTopicEventHandler(defaultSubscription, eventHandler,10,100); err != nil {
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
// log.Fatalf("error adding topic subscription: %v", err)
// }
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved

if err := s.Start(); err != nil && err != http.ErrServerClosed {
log.Fatalf("error listenning: %v", err)
Expand All @@ -64,6 +65,13 @@ func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err er
return false, nil
}

func bulkeventHandler(ctx context.Context, e []common.TopicEvent) (retry bool, err error) {
for _, event := range e {
log.Printf("event - PubsubName: %s, Topic: %s, ID: %s, Data: %s", event.PubsubName, event.Topic, event.ID, event.Data)
}
return false, nil
}

sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
func importantEventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
log.Printf("important event - PubsubName: %s, Topic: %s, ID: %s, Data: %s", e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
Expand Down
2 changes: 2 additions & 0 deletions service/common/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Service interface {
// AddTopicEventHandler appends provided event handler with its topic and optional metadata to the service.
// Note, retries are only considered when there is an error. Lack of error is considered as a success
AddTopicEventHandler(sub *Subscription, fn TopicEventHandler) error
// AddBulkTopicEventHandler appends provided event handler with its topic along with configuring maxMessagesCount, maxAwaitDurationMs for bulk handling and optional metadata to the service.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: break this comment up so it doesn't wrap/exceed a reasonable character count (~140ish)

AddBulkTopicEventHandler(sub *Subscription, fn TopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error
// AddBindingInvocationHandler appends provided binding invocation handler with its name to the service.
AddBindingInvocationHandler(name string, fn BindingInvocationHandler) error
// RegisterActorImplFactory Register a new actor to actor runtime of go sdk
Expand Down
28 changes: 24 additions & 4 deletions service/grpc/topic.go
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,25 @@
return s.topicRegistrar.AddSubscription(sub, fn)
}

func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error {
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As my previous review - the validation of arguments passed to these parameters should be implemented as per the implementation spec. I do think that this is something we need to address both sdk-side and in the runtime explicitly as part of best practice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what default values you suggest if nil values are given?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error-out if either are X <= 0 this would prevent a negative value, a nil value is not possible for the int type.

if sub == nil {
return errors.New("subscription required")
}

Check warning on line 43 in service/grpc/topic.go

View check run for this annotation

Codecov / codecov/patch

service/grpc/topic.go#L40-L43

Added lines #L40 - L43 were not covered by tests

return s.topicRegistrar.AddBulkSubscription(sub, fn, maxMessagesCount, maxAwaitDurationMs)

Check warning on line 45 in service/grpc/topic.go

View check run for this annotation

Codecov / codecov/patch

service/grpc/topic.go#L45

Added line #L45 was not covered by tests
}

// ListTopicSubscriptions is called by Dapr to get the list of topics in a pubsub component the app wants to subscribe to.
func (s *Server) ListTopicSubscriptions(ctx context.Context, in *empty.Empty) (*runtimev1pb.ListTopicSubscriptionsResponse, error) {
subs := make([]*runtimev1pb.TopicSubscription, 0)
for _, v := range s.topicRegistrar {
s := v.Subscription
sub := &runtimev1pb.TopicSubscription{
PubsubName: s.PubsubName,
Topic: s.Topic,
Metadata: s.Metadata,
Routes: convertRoutes(s.Routes),
PubsubName: s.PubsubName,
Topic: s.Topic,
Metadata: s.Metadata,
Routes: convertRoutes(s.Routes),
BulkSubscribe: convertBulkSubscribe(s.BulkSubscribe),
}
subs = append(subs, sub)
}
Expand All @@ -73,6 +82,17 @@
}
}

func convertBulkSubscribe(bulkSubscribe *internal.BulkSubscribe) *runtimev1pb.BulkSubscribeConfig {
if bulkSubscribe == nil {
return nil
}
return &runtimev1pb.BulkSubscribeConfig{
Enabled: bulkSubscribe.Enabled,
MaxMessagesCount: bulkSubscribe.MaxMessagesCount,
MaxAwaitDurationMs: bulkSubscribe.MaxAwaitDurationMs,
}

Check warning on line 93 in service/grpc/topic.go

View check run for this annotation

Codecov / codecov/patch

service/grpc/topic.go#L89-L93

Added lines #L89 - L93 were not covered by tests
}

// OnTopicEvent fired whenever a message has been published to a topic that has been subscribed.
// Dapr sends published messages in a CloudEvents v1.0 envelope.
func (s *Server) OnTopicEvent(ctx context.Context, in *runtimev1pb.TopicEventRequest) (*runtimev1pb.TopicEventResponse, error) {
Expand Down
143 changes: 143 additions & 0 deletions service/http/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
// topicEventJSON is identical to `common.TopicEvent`
// except for it treats `data` as a json.RawMessage so it can
// be used as bytes or interface{}.
// Merge itemMap into topicEventJSON
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
type topicEventJSON struct {
// ID identifies the event.
ID string `json:"id"`
Expand Down Expand Up @@ -113,6 +114,29 @@
return data, rawData
}

type AppResponseStatus string

const (
// Success means the message is received and processed correctly.
Success AppResponseStatus = "SUCCESS"
// Retry means the message is received but could not be processed and must be retried.
Retry AppResponseStatus = "RETRY"
// Drop means the message is received but should not be processed.
Drop AppResponseStatus = "DROP"
)
Comment on lines +117 to +126
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please reuse the type provided

const (
// SubscriptionResponseStatusSuccess means message is processed successfully.
SubscriptionResponseStatusSuccess = "SUCCESS"
// SubscriptionResponseStatusRetry means message to be retried by Dapr.
SubscriptionResponseStatusRetry = "RETRY"
// SubscriptionResponseStatusDrop means warning is logged and message is dropped.
SubscriptionResponseStatusDrop = "DROP"
)


type BulkSubscribeResponseEntry struct {
// The id of the bulk subscribe entry
EntryId string `json:"entryId"` //nolint:stylecheck

// The response status of the bulk subscribe entry
Status AppResponseStatus `json:"status"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Status AppResponseStatus `json:"status"`
Status string `json:"status"`

A string value type should be fine

}

type BulkSubscribeResponse struct {
Statuses []BulkSubscribeResponseEntry `json:"statuses"`
}

Comment on lines +128 to +139
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move these to go-sdk/service/common/type.go

func (s *Server) registerBaseHandler() {
// register subscribe handler
f := func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -301,9 +325,128 @@
return nil
}

func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise with the grpc implementation I would like to see validation of the maxMessagesCount and maxAwaitDurationMs params

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This exported function should have a comment quickly outlining the use

if sub == nil {
return errors.New("subscription required")
}
// Route is only required for HTTP but should be specified for the
// app protocol to be interchangeable.
if sub.Route == "" {
return errors.New("handler route name")
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
}
if err := s.topicRegistrar.AddBulkSubscription(sub, fn, maxMessagesCount, maxAwaitDurationMs); err != nil {
return err
}

s.mux.Handle(sub.Route, optionsHandler(http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
// check for post with no data
var (
body []byte
err error
)
if r.Body != nil {
body, err = io.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), PubSubHandlerDropStatusCode)
return
}

Check warning on line 353 in service/http/topic.go

View check run for this annotation

Codecov / codecov/patch

service/http/topic.go#L351-L353

Added lines #L351 - L353 were not covered by tests
}
if len(body) == 0 {
http.Error(w, "nil content", PubSubHandlerDropStatusCode)
return
}

// deserialize the event
var ins internal.BulkSubscribeEnvelope
if err = json.Unmarshal(body, &ins); err != nil {
http.Error(w, err.Error(), PubSubHandlerDropStatusCode)
return
}

statuses := make([]BulkSubscribeResponseEntry, 0, len(ins.Entries))

for _, entry := range ins.Entries {
itemJSON, entryErr := json.Marshal(entry.Event)
if entryErr != nil {
http.Error(w, entryErr.Error(), PubSubHandlerDropStatusCode)
return
}

Check warning on line 374 in service/http/topic.go

View check run for this annotation

Codecov / codecov/patch

service/http/topic.go#L372-L374

Added lines #L372 - L374 were not covered by tests
var in topicEventJSON

if err = json.Unmarshal(itemJSON, &in); err != nil {
http.Error(w, err.Error(), PubSubHandlerDropStatusCode)
return
}

Check warning on line 380 in service/http/topic.go

View check run for this annotation

Codecov / codecov/patch

service/http/topic.go#L378-L380

Added lines #L378 - L380 were not covered by tests
if in.PubsubName == "" {
in.Topic = sub.PubsubName
}
if in.Topic == "" {
in.Topic = sub.Topic
}

Check warning on line 386 in service/http/topic.go

View check run for this annotation

Codecov / codecov/patch

service/http/topic.go#L385-L386

Added lines #L385 - L386 were not covered by tests
data, rawData := in.getData()

te := common.TopicEvent{
ID: in.ID,
SpecVersion: in.SpecVersion,
Type: in.Type,
Source: in.Source,
DataContentType: in.DataContentType,
Data: data,
RawData: rawData,
DataBase64: in.DataBase64,
Subject: in.Subject,
PubsubName: in.PubsubName,
Topic: in.Topic,
}

retry, funcErr := fn(r.Context(), &te)
if funcErr == nil {
statuses = append(statuses, BulkSubscribeResponseEntry{
EntryId: entry.EntryId,
Status: Success,
},
)
} else if retry {
statuses = append(statuses, BulkSubscribeResponseEntry{
EntryId: entry.EntryId,
Status: Retry,
},
)
} else {
statuses = append(statuses, BulkSubscribeResponseEntry{
EntryId: entry.EntryId,
Status: Drop,
},
)
}

Check warning on line 422 in service/http/topic.go

View check run for this annotation

Codecov / codecov/patch

service/http/topic.go#L411-L422

Added lines #L411 - L422 were not covered by tests
}

resp := BulkSubscribeResponse{
Statuses: statuses,
}
if err != nil {
http.Error(w, err.Error(), PubSubHandlerDropStatusCode)
return
}

Check warning on line 431 in service/http/topic.go

View check run for this annotation

Codecov / codecov/patch

service/http/topic.go#L429-L431

Added lines #L429 - L431 were not covered by tests
Comment on lines +429 to +432
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no unhandled error at this point, could you clarify that if a single event is dropped it will be replayed/retried at a later date?

w.Header().Add("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)

writeBulkStatus(w, resp)
})))

return nil
}

func writeStatus(w http.ResponseWriter, s string) {
status := &common.SubscriptionResponse{Status: s}
if err := json.NewEncoder(w).Encode(status); err != nil {
http.Error(w, err.Error(), PubSubHandlerRetryStatusCode)
}
}

func writeBulkStatus(w http.ResponseWriter, s BulkSubscribeResponse) {
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
if err := json.NewEncoder(w).Encode(s); err != nil {
http.Error(w, err.Error(), PubSubHandlerRetryStatusCode)
}

Check warning on line 451 in service/http/topic.go

View check run for this annotation

Codecov / codecov/patch

service/http/topic.go#L450-L451

Added lines #L450 - L451 were not covered by tests
}
Comment on lines +459 to +463
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's fine to create a new function as it is manifestly different or is idiomatic, do you think this is better or would it be better to pass the slice to your function as an argument and wrap it within the function?

Loading
Loading