Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: bulksubscribe http #478

Open
wants to merge 29 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions examples/pubsub/pub/pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (

var (
// set the environment as instructions.
pubsubName = os.Getenv("DAPR_PUBSUB_NAME")
topicName = "neworder"
pubsubName = os.Getenv("DAPR_PUBSUB_NAME")
topicName = "neworder"
bulkTopicName = "newbulkorder"
)

func main() {
Expand All @@ -44,7 +45,7 @@ func main() {
}

// Publish multiple events
if res := client.PublishEvents(ctx, pubsubName, topicName, publishEventsData); res.Error != nil {
if res := client.PublishEvents(ctx, pubsubName, bulkTopicName, publishEventsData); res.Error != nil {
panic(err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem entirely right, you're panicking here with an unrelated (potentially) nil error

}

Expand Down
10 changes: 8 additions & 2 deletions examples/pubsub/sub/sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ var defaultSubscription = &common.Subscription{
Route: "/orders",
}

var bulkSubscription = &common.Subscription{
PubsubName: "bulkmessages",
Topic: "newbulkorder",
Route: "/bulkorders",
}

var importantSubscription = &common.Subscription{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to see the importantSubscription re-implemented as a way to validate the route being correct for a message on the same topic alongside match/priority

PubsubName: "messages",
Topic: "neworder",
Expand All @@ -46,11 +52,11 @@ var importantSubscription = &common.Subscription{
func main() {
s := daprd.NewService(":8080")

if err := s.AddTopicEventHandler(defaultSubscription, eventHandler); err != nil {
if err := s.AddBulkTopicEventHandler(defaultSubscription, eventHandler, 10, 100); err != nil {
log.Fatalf("error adding topic subscription: %v", err)
}

if err := s.AddTopicEventHandler(importantSubscription, importantEventHandler); err != nil {
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
if err := s.AddTopicEventHandler(defaultSubscription, eventHandler); err != nil {
log.Fatalf("error adding topic subscription: %v", err)
}

Expand Down
2 changes: 2 additions & 0 deletions service/common/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Service interface {
// AddTopicEventHandler appends provided event handler with its topic and optional metadata to the service.
// Note, retries are only considered when there is an error. Lack of error is considered as a success
AddTopicEventHandler(sub *Subscription, fn TopicEventHandler) error
// AddBulkTopicEventHandler appends provided event handler with its topic along with configuring maxMessagesCount, maxAwaitDurationMs for bulk handling and optional metadata to the service.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: break this comment up so it doesn't wrap/exceed a reasonable character count (~140ish)

AddBulkTopicEventHandler(sub *Subscription, fn TopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error
// AddBindingInvocationHandler appends provided binding invocation handler with its name to the service.
AddBindingInvocationHandler(name string, fn BindingInvocationHandler) error
// RegisterActorImplFactory Register a new actor to actor runtime of go sdk
Expand Down
28 changes: 24 additions & 4 deletions service/grpc/topic.go
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,25 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE
return s.topicRegistrar.AddSubscription(sub, fn)
}

func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error {
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As my previous review - the validation of arguments passed to these parameters should be implemented as per the implementation spec. I do think that this is something we need to address both sdk-side and in the runtime explicitly as part of best practice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what default values you suggest if nil values are given?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error-out if either are X <= 0 this would prevent a negative value, a nil value is not possible for the int type.

if sub == nil {
return errors.New("subscription required")
}

return s.topicRegistrar.AddBulkSubscription(sub, fn, maxMessagesCount, maxAwaitDurationMs)
}

// ListTopicSubscriptions is called by Dapr to get the list of topics in a pubsub component the app wants to subscribe to.
func (s *Server) ListTopicSubscriptions(ctx context.Context, in *empty.Empty) (*runtimev1pb.ListTopicSubscriptionsResponse, error) {
subs := make([]*runtimev1pb.TopicSubscription, 0)
for _, v := range s.topicRegistrar {
s := v.Subscription
sub := &runtimev1pb.TopicSubscription{
PubsubName: s.PubsubName,
Topic: s.Topic,
Metadata: s.Metadata,
Routes: convertRoutes(s.Routes),
PubsubName: s.PubsubName,
Topic: s.Topic,
Metadata: s.Metadata,
Routes: convertRoutes(s.Routes),
BulkSubscribe: convertBulkSubscribe(s.BulkSubscribe),
}
subs = append(subs, sub)
}
Expand All @@ -73,6 +82,17 @@ func convertRoutes(routes *internal.TopicRoutes) *runtimev1pb.TopicRoutes {
}
}

func convertBulkSubscribe(bulkSubscribe *internal.BulkSubscribeOptions) *runtimev1pb.BulkSubscribeConfig {
if bulkSubscribe == nil {
return nil
}
return &runtimev1pb.BulkSubscribeConfig{
Enabled: bulkSubscribe.Enabled,
MaxMessagesCount: bulkSubscribe.MaxMessagesCount,
MaxAwaitDurationMs: bulkSubscribe.MaxAwaitDurationMs,
}
}

// OnTopicEvent fired whenever a message has been published to a topic that has been subscribed.
// Dapr sends published messages in a CloudEvents v1.0 envelope.
func (s *Server) OnTopicEvent(ctx context.Context, in *runtimev1pb.TopicEventRequest) (*runtimev1pb.TopicEventResponse, error) {
Expand Down
Loading