Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic send and batch send splits for milestone 1.5 #367

Open
wants to merge 19 commits into
base: main
Choose a base branch
from

Conversation

Gsantomaggio
Copy link
Member

@Gsantomaggio Gsantomaggio commented Dec 1, 2024

1.5 Milestone

News:

  • Golang 22

  • The PR introduces a new way to send messages.
    We removed the aggregation via timeout and implemented the dynamic send. The goal is to reduce the latency, especially in low-rate use cases.
    Low-rate use cases are the most used.
    The dynamic send is a wrapper around the BatchSend() that will remain public for the users who want more control during the send.

Test results

PerTest with a limit rate of 50 Messages per second

Given:

 ./perftest --streams stream --publishers 1 --consumers 1 --max-length-bytes 10GB --async-send --rate 50;

With this PR the latency is around 5 ms

Published     40.0 msg/s | Confirmed     40.0 msg/s |  Consumed     40.0 msg/s |  Rate Fx: 50 | Body sz: 8 | latency: 5 ms
Published     41.6 msg/s | Confirmed     41.6 msg/s |  Consumed     41.6 msg/s |  Rate Fx: 50 | Body sz: 8 | latency: 4 ms
Published     42.9 msg/s | Confirmed     42.9 msg/s |  Consumed     42.9 msg/s |  Rate Fx: 50 | Body sz: 8 | latency: 4 ms
Published     43.8 msg/s | Confirmed     43.8 msg/s |  Consumed     43.8 msg/s |  Rate Fx: 50 | Body sz: 8 | latency: 4 ms

Main branch (1.4.x), the latency is around 90 ms ( The timeout for the aggregation )

Published     44.4 msg/s | Confirmed     44.4 msg/s |  Consumed     44.4 msg/s |  Rate Fx: 50 | Body sz: 8 | latency: 108 ms
Published     45.0 msg/s | Confirmed     45.0 msg/s |  Consumed     45.0 msg/s |  Rate Fx: 50 | Body sz: 8 | latency: 106 ms
Published     45.5 msg/s | Confirmed     45.5 msg/s |  Consumed     45.5 msg/s |  Rate Fx: 50 | Body sz: 8 | latency: 103 ms

With the dynamic send, the latency is reduced, especially with a low rate, which is the most common use case.

Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
- Add Dynamic send
- Fix super stream test

Signed-off-by: Gabriele Santomaggio <[email protected]>
@Gsantomaggio Gsantomaggio added this to the 1.5.0 milestone Dec 3, 2024
automatically splits the frames.
Removes the messages too big for the frame.
Add the batch result to give more control to the user

Signed-off-by: Gabriele Santomaggio <[email protected]>
automatically splits the frames.
Removes the messages too big for the frame.
Add the batch result to give more control to the user

Signed-off-by: Gabriele Santomaggio <[email protected]>
@Gsantomaggio Gsantomaggio changed the title Dynamic send Dynamic send and batch send splits for milestone 1.5 Dec 3, 2024
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
add unConfirmed struct to balance the unconfirm messages.
refactor code

Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
pkg/stream/producer.go Outdated Show resolved Hide resolved
pkg/stream/producer_unconfirmed.go Outdated Show resolved Hide resolved
pkg/stream/producer.go Outdated Show resolved Hide resolved
pkg/stream/coordinator.go Outdated Show resolved Hide resolved
pkg/stream/producer.go Outdated Show resolved Hide resolved
pkg/stream/producer_unconfirmed.go Show resolved Hide resolved
README.md Outdated Show resolved Hide resolved
Gsantomaggio and others added 4 commits December 18, 2024 15:28
Co-authored-by: Alberto Moretti <[email protected]>
Co-authored-by: Alberto Moretti <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
@Gsantomaggio Gsantomaggio marked this pull request as ready for review December 27, 2024 14:06
@@ -267,133 +244,80 @@ func (producer *Producer) getStatus() int {
func (producer *Producer) startUnconfirmedMessagesTimeOutTask() {

go func() {
for {
isActive := true
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this isActive is not needed, just a for without condition and a return where needed should be enough.

}

func (u *unConfirmed) clear() {
func (u *unConfirmed) getAll() map[int64]*ConfirmationStatus {
u.mutex.Lock()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getAll and size could use Rlock.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants