Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic send and batch send splits for milestone 1.5 #367

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 38 additions & 76 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-serv
- [Run server with Docker](#run-server-with-docker)
- [Getting started for impatient](#getting-started-for-impatient)
- [Examples](#examples)
- [Client best practices](#client-best-practices)
- [Usage](#usage)
* [Connect](#connect)
* [Multi hosts](#multi-hosts)
Expand Down Expand Up @@ -67,7 +68,7 @@ You may need a server to test locally. Let's start the broker:
```shell
docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672\
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost -rabbit loopback_users "none"' \
rabbitmq:3-management
rabbitmq:4-management
```
The broker should start in a few seconds. When it’s ready, enable the `stream` plugin and `stream_management`:
```shell
Expand All @@ -85,6 +86,11 @@ See [getting started](./examples/getting_started.go) example.

See [examples](./examples/) directory for more use cases.

### Client best practices

This client provides a set of best practices to use the client in the best way. </br>
See [best practices](./best_practices/README.md) for more details.

# Usage

### Connect
Expand Down Expand Up @@ -251,15 +257,8 @@ To publish a message you need a `*stream.Producer` instance:
producer, err := env.NewProducer("my-stream", nil)
```

With `ProducerOptions` is possible to customize the Producer behaviour:
```golang
type ProducerOptions struct {
Name string // Producer name, it is useful to handle deduplication messages
QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server
BatchSize int // It is the batch-size aggregation, low value reduce the latency, high value increase the throughput
BatchPublishingDelay int // Period to send a batch of messages.
}
```
With `ProducerOptions` is possible to customize the Producer behaviour.


The client provides two interfaces to send messages.
`send`:
Expand All @@ -277,30 +276,35 @@ for z := 0; z < 10; z++ {
err = producer.BatchSend(messages)
```

### `Send` vs `BatchSend`

`producer.Send`:
- accepts one message as parameter
- automatically aggregates the messages
- automatically splits the messages in case the size is bigger than `requestedMaxFrameSize`
- automatically splits the messages based on batch-size
- sends the messages in case nothing happens in `producer-send-timeout`
- is asynchronous
The `BatchSend` is the primitive to send the messages. It is up to the user to manage the aggregation.
`Send` introduces a smart layer to publish messages and internally uses `BatchSend`.

`producer.BatchSend`:
- accepts an array messages as parameter
- is synchronous
Starting from version 1.5.0, the `Send` uses a dynamic send.
The client sends the message buffer regardless of any timeout.</br>

Close the producer:
`producer.Close()` the producer is removed from the server. TCP connection is closed if there aren't </b>
other producers
What should you use? </br>
The `Send` method is the best choice for most of the cases:</br>
- It is asynchronous
- It is smart to aggregate the messages in a batch with a low-latency
- It is smart to split the messages in case the size is bigger than `requestedMaxFrameSize`
- You can play with `BatchSize` parameter to increase the throughput

### `Send` vs `BatchSend`
The `BatchSend` is useful in case you need to manage the aggregation by yourself. </br>
It gives you more control over the aggregation process: </br>
- It is synchronous
- It is up to the user to manage the aggregation
- It is up to the user to split the messages in case the size is bigger than `requestedMaxFrameSize`
- It can be faster than `Send` in case the aggregation is managed by the user.

The `BatchSend` is the primitive to send the messages, `Send` introduces a smart layer to publish messages and internally uses `BatchSend`.
#### Throughput vs Latency</br>
With both methods you can have low-latency and/or high-throughput. </br>
The `Send` is the best choice for low-latency without care about aggregation.
With `BatchSend` you have more control.</br>

The `Send` interface works in most of the cases, In some condition is about 15/20 slower than `BatchSend`. See also this [thread](https://groups.google.com/g/rabbitmq-users/c/IO_9-BbCzgQ).

See also "Client performances" example in the [examples](./examples/performances/) directory
Performance test tool can help you to test `Send` and `BatchSend` </br>
See also the [Performance test tool](#performance-test-tool) section.

### Publish Confirmation

Expand Down Expand Up @@ -350,10 +354,13 @@ the values `messageStatus.GetMessage().GetPublishingId()` and `messageStatus.Get

See also "Getting started" example in the [examples](./examples/) directory



### Deduplication

The deduplication is a feature that allows to avoid the duplication of messages. </br>
It is enabled by the user by setting the producer name with the options: </br>
```golang
producer, err := env.NewProducer(streamName, stream.NewProducerOptions().SetName("my_producer"))
```
The stream plugin can handle deduplication data, see this blog post for more details:
https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-message-deduplication/ </br>
You can find a "Deduplication" example in the [examples](./examples/) directory. </br>
Expand Down Expand Up @@ -596,55 +603,10 @@ Like the standard stream, you should avoid to store the offset for each single m
### Performance test tool

Performance test tool it is useful to execute tests.
The performance test tool is in the [perfTest](./perfTest) directory. </br>
See also the [Java Performance](https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#the-performance-tool) tool


To install you can download the version from github:

Mac:
```
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_darwin_amd64.tar.gz
```

Linux:
```
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_linux_amd64.tar.gz
```

Windows
```
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_windows_amd64.zip
```

execute `stream-perf-test --help` to see the parameters. By default it executes a test with one producer, one consumer.

here an example:
```shell
stream-perf-test --publishers 3 --consumers 2 --streams my_stream --max-length-bytes 2GB --uris rabbitmq-stream://guest:guest@localhost:5552/ --fixed-body 400 --time 10
```

### Performance test tool Docker
A docker image is available: `pivotalrabbitmq/go-stream-perf-test`, to test it:

Run the server is host mode:
```shell
docker run -it --rm --name rabbitmq --network host \
rabbitmq:3-management
```
enable the plugin:
```
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream
```
then run the docker image:
```shell
docker run -it --network host pivotalrabbitmq/go-stream-perf-test
```

To see all the parameters:
```shell
docker run -it --network host pivotalrabbitmq/go-stream-perf-test --help
```

### Build form source

```shell
Expand Down
106 changes: 106 additions & 0 deletions best_practices/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
Client best practices
=====================

The scope of this document is to provide a set of best practices for the client applications that use the Go client library.</br>


#### General recommendations
- Messages are not thread-safe, you should not share the same message between different go-routines or different Send/BatchSend calls.
- Use the producer name only if you need deduplication.
- Avoid to store the consumer offset to the server too often.
- `Send` works well in most of the cases, use `BatchSend` when you need more control.
- Connections/producers/consumers are designed to be long-lived. You should avoid creating and closing them too often.
- The library is generally thread-safe,even it is better to use one producer/consumer per go-routine.

#### Default configuration

The default configuration of the client library is designed to be used in most of the cases.
No particular tuning is required. Just follow the [Getting started](../examples/getting_started.go) example.

#### Multiple producers and consumers

Each connection can support multiple producers and consumers, you can reduce the number of connections by using the same connection for multiple producers and consumers.</br>
With:
```golang
SetMaxConsumersPerClient(10).
SetMaxConsumersPerClient(10)
```
The TCP connection will be shared between the producers and consumers.
Note about consumers: One slow consumer can block the others, so it is important:
- To have a good balance between the number of consumers and the speed of the consumers.
- work application side to avoid slow consumers, for example, by using a go-routines/buffers.

#### High throughput

To achieve high throughput, you should use one producer per connection, and one consumer per connection.
This will avoid lock contention between the producers when sending messages and between the consumers when receiving messages.

The method `Send` is usually enough to achieve high throughput.
In some case you can use the `BatchSend` method. See the `Send` vs `BatchSend` documentation for more details.

#### Low latency

To achieve Low latency, you should use one producer per connection, and one consumer per connection.

The method `Send` is the best choice to achieve low latency. Default values are tuned for low latency.
You can change the `BatchSize` parameter to increase or reduce the max number of messages sent in one batch.
Note: Since the client uses dynamic send, the `BatchSize` parameter is a hint to the client, the client can send less than the `BatchSize`.

#### Store several text based messages

In case you want to store logs, text-based or big messages, you can use the `Sub Entries Batching` method.
Where it is possible to store multiple messages in one entry and compress the entry with different algorithms.
It is useful to reduce the disk space and the network bandwidth.
See the `Sub Entries Batching` documentation for more details.</br>

#### Store several small messages

In case you want to store a lot of small messages, you can use the `BatchSend` method.
Where it is possible to store multiple messages in one entry. This will avoid creating small chunks on the server side.</br>


#### Avoid duplications

In case you want to store messages with deduplication, you need to set the producer name and the deduplication id.
See the `Deduplication` documentation for more details.</br>


#### Consumer fail over

In case you want to have a consumer fail over, you can use the `Single Active Consumer` method.
Where only one consumer is active at a time, and the other consumers are in standby mode.

#### Reliable producer and consumer

The client library provides a reliable producer and consumer, where the producer and consumer can recover from a connection failure.
See the `Reliable` documentation for more details.</br>


#### Scaling the streams

In case you want to scale the streams, you can use the `Super Stream` method.
Where you can have multiple streams and only one stream is active at a time.
See the `Super Stream` documentation for more details.</br>


#### Filtering the data when consuming

In case you want to filter the data when consuming, you can use the `Stream Filtering` method.
Where you can filter the data based on the metadata.
See the `Stream Filtering` documentation for more details.</br>


#### Using a load balancer

In case you want to use a load balancer, you can use the `Using a load balancer` method.
In Kubernetes, you can use the service name as load balancer dns.
See the `Using a load balancer` documentation for more details.</br>









2 changes: 0 additions & 2 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,3 @@ Stream examples
- [Single Active Consumer](./single_active_consumer) - Single Active Consumer example
- [Reliable](./reliable) - Reliable Producer and Reliable Consumer example
- [Super Stream](./super_stream) - Super Stream example with Single Active Consumer
- [Client performances](./performances) - Client performances example

5 changes: 4 additions & 1 deletion examples/deduplication/deduplication.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ func main() {
}

producer, err := env.NewProducer(streamName,
stream.NewProducerOptions().SetProducerName("myProducer")) // producer name is mandatory to handle the deduplication
stream.NewProducerOptions().
// producer name is mandatory to handle the deduplication
// don't use the producer name if you don't need the deduplication
SetProducerName("myProducer"))

CheckErr(err)

Expand Down
38 changes: 0 additions & 38 deletions examples/performances/README.md

This file was deleted.

Loading
Loading