-
Notifications
You must be signed in to change notification settings - Fork 173
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: bulksubscribe http #478
base: main
Are you sure you want to change the base?
Changes from 15 commits
3f5bcd8
1c45f4e
2193f47
5d4e498
f19a505
fdd425a
cb26d03
597c099
10d2c67
e218130
5131387
81b72ef
203d31c
068ef1c
36b78f5
38bc154
35c37bc
fb4138a
0b41a28
d05856f
92e7eef
f315c9e
0f2d9bb
f4a3b65
3577081
5e1f64d
de720d9
05f3ad7
300e5ac
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,6 +35,12 @@ var defaultSubscription = &common.Subscription{ | |
Route: "/orders", | ||
} | ||
|
||
var bulkSubscription = &common.Subscription{ | ||
PubsubName: "bulkmessages", | ||
Topic: "newbulkorder", | ||
Route: "/bulkorders", | ||
} | ||
|
||
var importantSubscription = &common.Subscription{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd like to see the importantSubscription re-implemented as a way to validate the |
||
PubsubName: "messages", | ||
Topic: "neworder", | ||
|
@@ -46,11 +52,11 @@ var importantSubscription = &common.Subscription{ | |
func main() { | ||
s := daprd.NewService(":8080") | ||
|
||
if err := s.AddTopicEventHandler(defaultSubscription, eventHandler); err != nil { | ||
if err := s.AddBulkTopicEventHandler(defaultSubscription, eventHandler, 10, 100); err != nil { | ||
log.Fatalf("error adding topic subscription: %v", err) | ||
} | ||
|
||
if err := s.AddTopicEventHandler(importantSubscription, importantEventHandler); err != nil { | ||
sadath-12 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if err := s.AddTopicEventHandler(defaultSubscription, eventHandler); err != nil { | ||
log.Fatalf("error adding topic subscription: %v", err) | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,6 +35,8 @@ type Service interface { | |
// AddTopicEventHandler appends provided event handler with its topic and optional metadata to the service. | ||
// Note, retries are only considered when there is an error. Lack of error is considered as a success | ||
AddTopicEventHandler(sub *Subscription, fn TopicEventHandler) error | ||
// AddBulkTopicEventHandler appends provided event handler with its topic along with configuring maxMessagesCount, maxAwaitDurationMs for bulk handling and optional metadata to the service. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: break this comment up so it doesn't wrap/exceed a reasonable character count (~140ish) |
||
AddBulkTopicEventHandler(sub *Subscription, fn TopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error | ||
// AddBindingInvocationHandler appends provided binding invocation handler with its name to the service. | ||
AddBindingInvocationHandler(name string, fn BindingInvocationHandler) error | ||
// RegisterActorImplFactory Register a new actor to actor runtime of go sdk | ||
|
sadath-12 marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,16 +37,25 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE | |
return s.topicRegistrar.AddSubscription(sub, fn) | ||
} | ||
|
||
func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error { | ||
sadath-12 marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As my previous review - the validation of arguments passed to these parameters should be implemented as per the implementation spec. I do think that this is something we need to address both sdk-side and in the runtime explicitly as part of best practice. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what default values you suggest if nil values are given? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Error-out if either are |
||
if sub == nil { | ||
return errors.New("subscription required") | ||
} | ||
|
||
return s.topicRegistrar.AddBulkSubscription(sub, fn, maxMessagesCount, maxAwaitDurationMs) | ||
} | ||
|
||
// ListTopicSubscriptions is called by Dapr to get the list of topics in a pubsub component the app wants to subscribe to. | ||
func (s *Server) ListTopicSubscriptions(ctx context.Context, in *empty.Empty) (*runtimev1pb.ListTopicSubscriptionsResponse, error) { | ||
subs := make([]*runtimev1pb.TopicSubscription, 0) | ||
for _, v := range s.topicRegistrar { | ||
s := v.Subscription | ||
sub := &runtimev1pb.TopicSubscription{ | ||
PubsubName: s.PubsubName, | ||
Topic: s.Topic, | ||
Metadata: s.Metadata, | ||
Routes: convertRoutes(s.Routes), | ||
PubsubName: s.PubsubName, | ||
Topic: s.Topic, | ||
Metadata: s.Metadata, | ||
Routes: convertRoutes(s.Routes), | ||
BulkSubscribe: convertBulkSubscribe(s.BulkSubscribe), | ||
} | ||
subs = append(subs, sub) | ||
} | ||
|
@@ -73,6 +82,17 @@ func convertRoutes(routes *internal.TopicRoutes) *runtimev1pb.TopicRoutes { | |
} | ||
} | ||
|
||
func convertBulkSubscribe(bulkSubscribe *internal.BulkSubscribeOptions) *runtimev1pb.BulkSubscribeConfig { | ||
if bulkSubscribe == nil { | ||
return nil | ||
} | ||
return &runtimev1pb.BulkSubscribeConfig{ | ||
Enabled: bulkSubscribe.Enabled, | ||
MaxMessagesCount: bulkSubscribe.MaxMessagesCount, | ||
MaxAwaitDurationMs: bulkSubscribe.MaxAwaitDurationMs, | ||
} | ||
} | ||
|
||
// OnTopicEvent fired whenever a message has been published to a topic that has been subscribed. | ||
// Dapr sends published messages in a CloudEvents v1.0 envelope. | ||
func (s *Server) OnTopicEvent(ctx context.Context, in *runtimev1pb.TopicEventRequest) (*runtimev1pb.TopicEventResponse, error) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't seem entirely right, you're panicking here with an unrelated (potentially) nil error