diff --git a/.ci/versions.json b/.ci/versions.json index 11c0c71d..e2506775 100644 --- a/.ci/versions.json +++ b/.ci/versions.json @@ -1,4 +1,4 @@ { - "erlang": "26.2.3", - "rabbitmq": "3.13.0" + "erlang": "27.2", + "rabbitmq": "4.0.5" } diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index f8f44132..9275882d 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -63,30 +63,31 @@ jobs: verbose: true # optional (default = false) env: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} - test-win32: - runs-on: windows-latest - strategy: - matrix: - go: [ '1.22'] - steps: - - uses: actions/checkout@v4 - - uses: actions/setup-go@v5 - id: setup_go - with: - go-version: ${{ matrix.go }} - check-latest: true - - name: Cache installers - uses: actions/cache@v4 - with: - # Note: the cache path is relative to the workspace directory - # https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#using-the-cache-action - path: ~/installers - key: ${{ runner.os }}-v0-${{ hashFiles('.ci/versions.json') }} - - name: Install and start RabbitMQ - run: ./.ci/install.ps1 - - name: Install GNU make - run: choco install make - - run: make test GO_VERSION=${{ steps.setup_go.outputs.go-version }} + # temporany removed due of https://github.com/actions/checkout/issues/1186 + # test-win32: + # runs-on: windows-latest + # strategy: + # matrix: + # go: [ '1.22'] + # steps: + # - uses: actions/checkout@v4 + # - uses: actions/setup-go@v5 + # id: setup_go + # with: + # go-version: ${{ matrix.go }} + # check-latest: true + # - name: Cache installers + # uses: actions/cache@v4 + # with: + # # Note: the cache path is relative to the workspace directory + # # https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#using-the-cache-action + # path: ~/installers + # key: ${{ runner.os }}-v0-${{ hashFiles('.ci/versions.json') }} + # - name: Install and start RabbitMQ + # run: ./.ci/install.ps1 + # - name: Install GNU make + # run: choco install make + # - run: make test GO_VERSION=${{ steps.setup_go.outputs.go-version }} publish: runs-on: ubuntu-latest needs: [test] diff --git a/README.md b/README.md index ed03c043..c136870c 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,7 @@ Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-serv - [Run server with Docker](#run-server-with-docker) - [Getting started for impatient](#getting-started-for-impatient) - [Examples](#examples) +- [Client best practices](#client-best-practices) - [Usage](#usage) * [Connect](#connect) * [Multi hosts](#multi-hosts) @@ -67,7 +68,7 @@ You may need a server to test locally. Let's start the broker: ```shell docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672\ -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost -rabbit loopback_users "none"' \ - rabbitmq:3-management + rabbitmq:4-management ``` The broker should start in a few seconds. When it’s ready, enable the `stream` plugin and `stream_management`: ```shell @@ -85,6 +86,11 @@ See [getting started](./examples/getting_started.go) example. See [examples](./examples/) directory for more use cases. +### Client best practices + +This client provides a set of best practices to use the client in the best way.
+See [best practices](./best_practices/README.md) for more details. + # Usage ### Connect @@ -251,15 +257,8 @@ To publish a message you need a `*stream.Producer` instance: producer, err := env.NewProducer("my-stream", nil) ``` -With `ProducerOptions` is possible to customize the Producer behaviour: -```golang -type ProducerOptions struct { - Name string // Producer name, it is useful to handle deduplication messages - QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server - BatchSize int // It is the batch-size aggregation, low value reduce the latency, high value increase the throughput - BatchPublishingDelay int // Period to send a batch of messages. -} -``` +With `ProducerOptions` is possible to customize the Producer behaviour. + The client provides two interfaces to send messages. `send`: @@ -277,30 +276,35 @@ for z := 0; z < 10; z++ { err = producer.BatchSend(messages) ``` +### `Send` vs `BatchSend` -`producer.Send`: -- accepts one message as parameter -- automatically aggregates the messages -- automatically splits the messages in case the size is bigger than `requestedMaxFrameSize` -- automatically splits the messages based on batch-size -- sends the messages in case nothing happens in `producer-send-timeout` -- is asynchronous +The `BatchSend` is the primitive to send the messages. It is up to the user to manage the aggregation. +`Send` introduces a smart layer to publish messages and internally uses `BatchSend`. -`producer.BatchSend`: -- accepts an array messages as parameter -- is synchronous +Starting from version 1.5.0, the `Send` uses a dynamic send. +The client sends the message buffer regardless of any timeout.
-Close the producer: -`producer.Close()` the producer is removed from the server. TCP connection is closed if there aren't -other producers +What should you use?
+The `Send` method is the best choice for most use cases:
+- It is asynchronous
+- It aggregates the messages into batches, keeping the latency low
+- It automatically splits the messages when the size exceeds `requestedMaxFrameSize`
+- You can tune the `BatchSize` parameter to increase the throughput

-### `Send` vs `BatchSend`
+The `BatchSend` is useful when you need to manage the aggregation yourself.
+It gives you more control over the aggregation process:
+- It is synchronous
+- It is up to the user to manage the aggregation
+- It is up to the user to split the messages when the size exceeds `requestedMaxFrameSize`
+- It can be faster than `Send` when the aggregation is managed by the user

-The `BatchSend` is the primitive to send the messages, `Send` introduces a smart layer to publish messages and internally uses `BatchSend`.
+#### Throughput vs Latency
+With both methods you can achieve low latency and/or high throughput.
+`Send` is the best choice for low latency when you don't want to deal with aggregation yourself.
+With `BatchSend` you have more control.
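+
+Below is a minimal sketch of the asynchronous `Send` flow, assuming the producer was created as shown
+above and that the `amqp` package (`pkg/amqp`) is imported; the handler body is only a placeholder:
+
+```golang
+// The confirmation channel reports, in batches, which messages were stored by the broker.
+confirmations := producer.NotifyPublishConfirmation()
+go func() {
+	for statuses := range confirmations {
+		for _, status := range statuses {
+			if !status.IsConfirmed() {
+				// the message was not confirmed: log it or schedule a re-send
+			}
+		}
+	}
+}()
+
+// Send returns as soon as the message is enqueued; the client aggregates and
+// flushes the internal buffer on its own (dynamic send since 1.5.0).
+for i := 0; i < 1000; i++ {
+	if err := producer.Send(amqp.NewMessage([]byte("hello"))); err != nil {
+		// handle the error (e.g. the producer is closed)
+	}
+}
+```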
-The `Send` interface works in most of the cases, In some condition is about 15/20 slower than `BatchSend`. See also this [thread](https://groups.google.com/g/rabbitmq-users/c/IO_9-BbCzgQ).
-
-See also "Client performances" example in the [examples](./examples/performances/) directory
+The performance test tool can help you compare `Send` and `BatchSend`.
+See also the [Performance test tool](#performance-test-tool) section. ### Publish Confirmation @@ -350,10 +354,13 @@ the values `messageStatus.GetMessage().GetPublishingId()` and `messageStatus.Get See also "Getting started" example in the [examples](./examples/) directory - - ### Deduplication +The deduplication is a feature that allows to avoid the duplication of messages.
+It is enabled by setting the producer name in the producer options:
+```golang +producer, err := env.NewProducer(streamName, stream.NewProducerOptions().SetName("my_producer")) +``` The stream plugin can handle deduplication data, see this blog post for more details: https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-message-deduplication/
You can find a "Deduplication" example in the [examples](./examples/) directory.
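+
+The sketch below shows how deduplication is usually wired up, mirroring the `SetProducerName` call used
+in `examples/deduplication/deduplication.go` from this changeset; the loop, the id scheme, and the use of
+`SetPublishingId` are only illustrative, and `fmt`/`amqp` are assumed to be imported:
+
+```golang
+producer, err := env.NewProducer(streamName,
+	stream.NewProducerOptions().SetProducerName("my_producer")) // the name enables deduplication
+// handle err
+
+for i := 0; i < 10; i++ {
+	msg := amqp.NewMessage([]byte(fmt.Sprintf("message_%d", i)))
+	// the publishing id must grow monotonically per producer name;
+	// a message re-sent with an id the broker has already seen is discarded
+	msg.SetPublishingId(int64(i))
+	if err := producer.Send(msg); err != nil {
+		// handle the error
+	}
+}
+```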
@@ -596,55 +603,10 @@ Like the standard stream, you should avoid to store the offset for each single m ### Performance test tool Performance test tool it is useful to execute tests. +The performance test tool is in the [perfTest](./perfTest) directory.
See also the [Java Performance](https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#the-performance-tool) tool -To install you can download the version from github: - -Mac: -``` -https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_darwin_amd64.tar.gz -``` - -Linux: -``` -https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_linux_amd64.tar.gz -``` - -Windows -``` -https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_windows_amd64.zip -``` - -execute `stream-perf-test --help` to see the parameters. By default it executes a test with one producer, one consumer. - -here an example: -```shell -stream-perf-test --publishers 3 --consumers 2 --streams my_stream --max-length-bytes 2GB --uris rabbitmq-stream://guest:guest@localhost:5552/ --fixed-body 400 --time 10 -``` - -### Performance test tool Docker -A docker image is available: `pivotalrabbitmq/go-stream-perf-test`, to test it: - -Run the server is host mode: -```shell - docker run -it --rm --name rabbitmq --network host \ - rabbitmq:3-management -``` -enable the plugin: -``` - docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream -``` -then run the docker image: -```shell -docker run -it --network host pivotalrabbitmq/go-stream-perf-test -``` - -To see all the parameters: -```shell -docker run -it --network host pivotalrabbitmq/go-stream-perf-test --help -``` - ### Build form source ```shell diff --git a/best_practices/README.md b/best_practices/README.md new file mode 100644 index 00000000..cb3512f1 --- /dev/null +++ b/best_practices/README.md @@ -0,0 +1,106 @@ +Client best practices +===================== + +The scope of this document is to provide a set of best practices for the client applications that use the Go client library.
+
+
+#### General recommendations
+- Messages are not thread-safe: do not share the same message between different go-routines or between different Send/BatchSend calls.
+- Use the producer name only if you need deduplication.
+- Avoid storing the consumer offset on the server too often.
+- `Send` works well in most cases; use `BatchSend` when you need more control.
+- Connections, producers and consumers are designed to be long-lived. Avoid creating and closing them too often.
+- The library is generally thread-safe, but it is still better to use one producer/consumer per go-routine.
+
+#### Default configuration
+
+The default configuration of the client library is designed for most use cases.
+No particular tuning is required. Just follow the [Getting started](../examples/getting_started.go) example.
+
+#### Multiple producers and consumers
+
+Each connection can support multiple producers and consumers. You can reduce the number of connections by sharing the same connection among multiple producers and consumers.
+With:
+```golang
+	SetMaxProducersPerClient(10).
+	SetMaxConsumersPerClient(10)
+```
+the TCP connection will be shared between the producers and consumers.
+Note about consumers: one slow consumer can block the others, so it is important:
+- To keep a good balance between the number of consumers and the speed of each consumer.
+- To work on the application side to avoid slow consumers, for example by using go-routines/buffers.
+
+#### High throughput
+
+To achieve high throughput, you should use one producer per connection and one consumer per connection.
+This avoids lock contention between the producers when sending messages and between the consumers when receiving messages.
+
+The `Send` method is usually enough to achieve high throughput.
+In some cases you can use the `BatchSend` method. See the `Send` vs `BatchSend` documentation for more details.
+
+#### Low latency
+
+To achieve low latency, you should use one producer per connection and one consumer per connection.
+
+The `Send` method is the best choice to achieve low latency, and the default values are tuned for it.
+You can change the `BatchSize` parameter to increase or reduce the maximum number of messages sent in one batch.
+Note: since the client uses dynamic send, the `BatchSize` parameter is a hint to the client; the client can send fewer messages than `BatchSize`.
+
+#### Store several text-based messages
+
+In case you want to store logs, text-based or big messages, you can use the `Sub Entries Batching` method,
+which stores multiple messages in one entry and can compress the entry with different algorithms.
+It is useful to reduce the disk space and the network bandwidth.
+See the `Sub Entries Batching` documentation for more details.
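+
+A sketch of what this looks like on the producer options, based on the `SetSubEntrySize`/`SetCompression`
+calls referenced elsewhere in this repository; the size and the `Gzip` compression choice are only examples:
+
+```golang
+producer, err := env.NewProducer(streamName,
+	stream.NewProducerOptions().
+		SetSubEntrySize(500).                        // how many messages are packed into one entry
+		SetCompression(stream.Compression{}.Gzip())) // compress each sub-entry
+// handle err
+```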
+
+
+#### Store several small messages
+
+In case you want to store a lot of small messages, you can use the `BatchSend` method,
+which lets you send multiple messages in one call. This avoids creating many small chunks on the server side.
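+
+A minimal sketch of the idea, assuming `producer` already exists and the `amqp`/`message` packages
+(`pkg/amqp`, `pkg/message`) are imported; the batch size of 100 is only an example:
+
+```golang
+var batch []message.StreamMessage
+for i := 0; i < 100; i++ {
+	batch = append(batch, amqp.NewMessage([]byte("small payload")))
+}
+// one synchronous call sends the whole batch and produces fewer, larger chunks
+if err := producer.BatchSend(batch); err != nil {
+	// handle the error
+}
+```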
+ + +#### Avoid duplications + +In case you want to store messages with deduplication, you need to set the producer name and the deduplication id. +See the `Deduplication` documentation for more details.
+
+
+#### Consumer fail-over
+
+In case you want consumer fail-over, you can use the `Single Active Consumer` feature,
+where only one consumer is active at a time and the other consumers are in standby mode.
+
+#### Reliable producer and consumer
+
+The client library provides a reliable producer and a reliable consumer that can recover from a connection failure.
+See the `Reliable` documentation for more details; a short sketch follows.
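+
+The sketch below mirrors the `ha.NewReliableProducer`/`ha.NewReliableConsumer` calls used in
+`examples/reliable/reliable_client.go` from this changeset (package `pkg/ha`); the handler bodies are placeholders:
+
+```golang
+rProducer, err := ha.NewReliableProducer(env, streamName,
+	stream.NewProducerOptions(),
+	func(confirmations []*stream.ConfirmationStatus) {
+		// inspect the confirmations and, if needed, re-send the failed messages
+	})
+// handle err
+
+rConsumer, err := ha.NewReliableConsumer(env, streamName,
+	stream.NewConsumerOptions().SetOffset(stream.OffsetSpecification{}.First()),
+	func(consumerContext stream.ConsumerContext, message *amqp.Message) {
+		// process the message
+	})
+// handle err
+```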
+
+
+#### Scaling the streams
+
+In case you want to scale a stream, you can use the `Super Stream` feature,
+where a logical stream is partitioned into multiple streams and the messages are routed across the partitions.
+See the `Super Stream` documentation for more details.
+
+
+#### Filtering the data when consuming
+
+In case you want to filter the data when consuming, you can use the `Stream Filtering` feature,
+which filters the data server-side based on message metadata.
+See the `Stream Filtering` documentation for more details.
+
+
+#### Using a load balancer
+
+In case you want to use a load balancer in front of the brokers, see the `Using a load balancer` documentation for more details.
+In Kubernetes, you can use the service name as the load-balancer DNS entry.
+A short sketch follows.
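+
+A sketch of the environment setup behind a load balancer, assuming the `AddressResolver`/`SetAddressResolver`
+options described in the `Using a load balancer` documentation; `my-load-balancer` is a placeholder host name:
+
+```golang
+addressResolver := stream.AddressResolver{
+	Host: "my-load-balancer", // the DNS name (or Kubernetes service name) clients connect to
+	Port: 5552,
+}
+env, err := stream.NewEnvironment(
+	stream.NewEnvironmentOptions().
+		SetHost(addressResolver.Host).
+		SetPort(addressResolver.Port).
+		SetAddressResolver(addressResolver))
+// handle err
+```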
+ + + + + + + + + diff --git a/examples/README.md b/examples/README.md index 3abbfd2f..96315143 100644 --- a/examples/README.md +++ b/examples/README.md @@ -16,5 +16,3 @@ Stream examples - [Single Active Consumer](./single_active_consumer) - Single Active Consumer example - [Reliable](./reliable) - Reliable Producer and Reliable Consumer example - [Super Stream](./super_stream) - Super Stream example with Single Active Consumer - - [Client performances](./performances) - Client performances example - diff --git a/examples/deduplication/deduplication.go b/examples/deduplication/deduplication.go index 1ef2568c..686a3801 100644 --- a/examples/deduplication/deduplication.go +++ b/examples/deduplication/deduplication.go @@ -35,7 +35,10 @@ func main() { } producer, err := env.NewProducer(streamName, - stream.NewProducerOptions().SetProducerName("myProducer")) // producer name is mandatory to handle the deduplication + stream.NewProducerOptions(). + // producer name is mandatory to handle the deduplication + // don't use the producer name if you don't need the deduplication + SetProducerName("myProducer")) CheckErr(err) diff --git a/examples/getting_started.go b/examples/getting_started.go index 0d9a7cbd..7fabd0a2 100644 --- a/examples/getting_started.go +++ b/examples/getting_started.go @@ -111,7 +111,9 @@ func main() { channelClose := consumer.NotifyClose() // channelClose receives all the closing events, here you can handle the // client reconnection or just log - defer consumerClose(channelClose) + go func() { + consumerClose(channelClose) + }() fmt.Println("Press any key to stop ") _, _ = reader.ReadString('\n') diff --git a/examples/performances/README.md b/examples/performances/README.md deleted file mode 100644 index 9059fe41..00000000 --- a/examples/performances/README.md +++ /dev/null @@ -1,38 +0,0 @@ -## Client performances - -This document describes how to tune the parameters of the client to: -- Increase the throughput -- And/or reduce the latency -- And/or reduce the disk space used by the messages. - -### Throughput and Latency - -The parameters that can be tuned are: -- `SetBatchSize(batchSize)` and `SetBatchPublishingDelay(100)` when use the `Send()` method -- The number of the messages when use the `BatchSend()` method - -In this example you can play with the parameters to see the impact on the performances. -There is not a magic formula to find the best parameters, you need to test and find the best values for your use case. - -### How to run the example -``` -go run performances.go async 1000000 100; -``` - -### Disk space used by the messages - -The client supports also the batch entry size and the compression: -`SetSubEntrySize(500).SetCompression(stream.Compression{}...` -These parameters can be used to reduce the space used by the messages due of the compression and the batch entry size. - - -### Default values - -The default producer values are meant to be a good trade-off between throughput and latency. -You can tune the parameters to increase the throughput, reduce the latency or reduce the disk space used by the messages. 
- - - -### Load tests -To execute load tests, you can use the official load test tool: -https://github.com/rabbitmq/rabbitmq-stream-perf-test \ No newline at end of file diff --git a/examples/performances/performances.go b/examples/performances/performances.go deleted file mode 100644 index 3bdadfb6..00000000 --- a/examples/performances/performances.go +++ /dev/null @@ -1,133 +0,0 @@ -package main - -import ( - "bufio" - "fmt" - "github.com/google/uuid" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" - "os" - "strconv" - "sync/atomic" - "time" -) - -func CheckErr(err error) { - if err != nil { - fmt.Printf("%s ", err) - os.Exit(1) - } -} - -var messagesConfirmed int32 - -func handlePublishConfirm(confirms stream.ChannelPublishConfirm) { - go func() { - for confirmed := range confirms { - for _, msg := range confirmed { - if msg.IsConfirmed() { - atomic.AddInt32(&messagesConfirmed, 1) - } - } - } - }() -} - -func main() { - - useSyncBatch := os.Args[1] == "sync" - useAsyncSend := os.Args[1] == "async" - - messagesToSend, err := strconv.Atoi(os.Args[2]) - CheckErr(err) - batchSize, err := strconv.Atoi(os.Args[3]) - messagesToSend = messagesToSend / batchSize - - reader := bufio.NewReader(os.Stdin) - fmt.Println("RabbitMQ performance example") - - // Connect to the broker ( or brokers ) - env, err := stream.NewEnvironment( - stream.NewEnvironmentOptions(). - SetHost("localhost"). - SetPort(5552). - SetUser("guest"). - SetPassword("guest")) - CheckErr(err) - fmt.Println("Connected to the RabbitMQ server") - - streamName := uuid.New().String() - err = env.DeclareStream(streamName, - &stream.StreamOptions{ - MaxLengthBytes: stream.ByteCapacity{}.GB(2), - }, - ) - CheckErr(err) - fmt.Printf("Created Stream: %s \n", streamName) - - producer, err := env.NewProducer(streamName, - stream.NewProducerOptions(). - SetBatchSize(batchSize). 
- SetBatchPublishingDelay(100)) - CheckErr(err) - - chPublishConfirm := producer.NotifyPublishConfirmation() - handlePublishConfirm(chPublishConfirm) - fmt.Printf("------------------------------------------\n\n") - fmt.Printf("Start sending %d messages, data size: %d bytes\n", messagesToSend*batchSize, len("hello_world")) - var averageLatency time.Duration - var messagesConsumed int32 - handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) { - atomic.AddInt32(&messagesConsumed, 1) - var latency time.Time - err := latency.UnmarshalBinary(message.Data[0]) - CheckErr(err) - averageLatency += time.Since(latency) - } - _, err = env.NewConsumer(streamName, handleMessages, stream.NewConsumerOptions().SetOffset(stream.OffsetSpecification{}.First())) - CheckErr(err) - - start := time.Now() - - // here the client sends the messages in batch and it is up to the user to aggregate the messages - if useSyncBatch { - var arr []message.StreamMessage - for i := 0; i < messagesToSend; i++ { - for i := 0; i < batchSize; i++ { - latency, err := time.Now().MarshalBinary() - CheckErr(err) - arr = append(arr, amqp.NewMessage(latency)) - } - err := producer.BatchSend(arr) - CheckErr(err) - arr = arr[:0] - } - } - - // here the client aggregates the messages based on the batch size and batch publishing delay - if useAsyncSend { - for i := 0; i < messagesToSend; i++ { - for i := 0; i < batchSize; i++ { - latency, err := time.Now().MarshalBinary() - CheckErr(err) - err = producer.Send(amqp.NewMessage(latency)) - CheckErr(err) - } - } - } - - duration := time.Since(start) - fmt.Println("Press any key to report and stop ") - _, _ = reader.ReadString('\n') - fmt.Printf("------------------------------------------\n\n") - fmt.Printf("Sent %d messages in %s. Confirmed: %d avarage latency: %s \n", messagesToSend*batchSize, duration, messagesConfirmed, averageLatency/time.Duration(messagesConsumed)) - fmt.Printf("------------------------------------------\n\n") - - time.Sleep(200 * time.Millisecond) - CheckErr(err) - err = env.DeleteStream(streamName) - CheckErr(err) - err = env.Close() - CheckErr(err) -} diff --git a/examples/reliable/README.md b/examples/reliable/README.md index 4f70387b..abfe40e9 100644 --- a/examples/reliable/README.md +++ b/examples/reliable/README.md @@ -12,6 +12,10 @@ Note: - The `unConfirmedMessages` are not persisted, so if the application is restarted, the `unConfirmedMessages` will be lost. - The `unConfirmedMessages` order is not guaranteed - The `unConfirmedMessages` can grow indefinitely if the broker is unavailable for a long time. +- The `re-send` in an option that can be enabled by setting `enableResend` to `true`. + +The example enables golang `pprof` you can check the url: localhost:6060/debug/pprof/.
+The scope is to check the resources used by the application in case of reconnection. The `reliable_common.go/retry` function does different checks because during the restart broker can happen different events, please check: diff --git a/examples/reliable/reliable_client.go b/examples/reliable/reliable_client.go index fa154476..365d5ca7 100644 --- a/examples/reliable/reliable_client.go +++ b/examples/reliable/reliable_client.go @@ -4,7 +4,11 @@ import ( "bufio" "errors" "fmt" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" + "log" + "net/http" "os" + "runtime" "sync" "sync/atomic" "time" @@ -13,6 +17,7 @@ import ( "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" + _ "net/http/pprof" ) // The ha producer and consumer provide a way to auto-reconnect in case of connection problems @@ -30,20 +35,27 @@ var consumed int32 = 0 var sent int32 var reSent int32 +const enableResend = false + func main() { + go func() { + log.Println(http.ListenAndServe("localhost:6060", nil)) + }() + // Your application code here + // Tune the parameters to test the reliability - const messagesToSend = 10_000_000 - const numberOfProducers = 4 + const messagesToSend = 50_000_000 + const numberOfProducers = 5 const concurrentProducers = 2 - const numberOfConsumers = 4 + const numberOfConsumers = 1 const sendDelay = 1 * time.Millisecond - const delayEachMessages = 200 - const maxProducersPerClient = 4 + const delayEachMessages = 500 + const maxProducersPerClient = 2 const maxConsumersPerClient = 2 // reader := bufio.NewReader(os.Stdin) - //stream.SetLevelInfo(logs.DEBUG) + stream.SetLevelInfo(logs.DEBUG) fmt.Println("Reliable Producer/Consumer example") fmt.Println("Connecting to RabbitMQ streaming ...") @@ -70,27 +82,52 @@ func main() { } err = env.DeclareStream(streamName, &stream.StreamOptions{ - MaxLengthBytes: stream.ByteCapacity{}.GB(2), + MaxLengthBytes: stream.ByteCapacity{}.GB(1), }, ) CheckErr(err) + var producers []*ha.ReliableProducer + var consumers []*ha.ReliableConsumer isRunning := true go func() { for isRunning { totalConfirmed := atomic.LoadInt32(&confirmed) + atomic.LoadInt32(&fail) - expectedMessages := messagesToSend * numberOfProducers * concurrentProducers - fmt.Printf("%s - ToSend: %d - nProducers: %d - concurrentProducers: %d - nConsumers %d \n", time.Now().Format(time.RFC822), + expectedMessages := messagesToSend * numberOfProducers * concurrentProducers * 2 + fmt.Printf("********************************************\n") + fmt.Printf("%s - ToSend: %d - nProducers: %d - concurrentProducers: %d - nConsumers %d \n", time.Now().Format(time.RFC850), expectedMessages, numberOfProducers, concurrentProducers, numberOfConsumers) fmt.Printf("Sent:%d - ReSent %d - Confirmed:%d - Not confirmed:%d - Fail+Confirmed :%d \n", sent, atomic.LoadInt32(&reSent), atomic.LoadInt32(&confirmed), atomic.LoadInt32(&fail), totalConfirmed) fmt.Printf("Total Consumed: %d - Per consumer: %d \n", atomic.LoadInt32(&consumed), atomic.LoadInt32(&consumed)/numberOfConsumers) + + for _, producer := range producers { + fmt.Printf("%s, status: %s \n", + producer.GetInfo(), producer.GetStatusAsString()) + + } + for _, consumer := range consumers { + fmt.Printf("%s, status: %s \n", + consumer.GetInfo(), consumer.GetStatusAsString()) + } + fmt.Printf("go-routine: %d\n", runtime.NumGoroutine()) fmt.Printf("********************************************\n") time.Sleep(5 * time.Second) } }() - var producers 
[]*ha.ReliableProducer + + for i := 0; i < numberOfConsumers; i++ { + consumer, err := ha.NewReliableConsumer(env, + streamName, + stream.NewConsumerOptions().SetOffset(stream.OffsetSpecification{}.First()), + func(consumerContext stream.ConsumerContext, message *amqp.Message) { + atomic.AddInt32(&consumed, 1) + }) + CheckErr(err) + consumers = append(consumers, consumer) + } + for i := 0; i < numberOfProducers; i++ { var mutex = sync.Mutex{} // Here we store the messages that have not been confirmed @@ -100,7 +137,7 @@ func main() { rProducer, err := ha.NewReliableProducer(env, streamName, stream.NewProducerOptions(). - SetConfirmationTimeOut(5*time.Second). + SetConfirmationTimeOut(2*time.Second). SetClientProvidedName(fmt.Sprintf("producer-%d", i)), func(messageStatus []*stream.ConfirmationStatus) { go func() { @@ -109,9 +146,11 @@ func main() { atomic.AddInt32(&confirmed, 1) } else { atomic.AddInt32(&fail, 1) - mutex.Lock() - unConfirmedMessages = append(unConfirmedMessages, msgStatus.GetMessage()) - mutex.Unlock() + if enableResend { + mutex.Lock() + unConfirmedMessages = append(unConfirmedMessages, msgStatus.GetMessage()) + mutex.Unlock() + } } } }() @@ -122,7 +161,6 @@ func main() { for i := 0; i < concurrentProducers; i++ { go func() { for i := 0; i < messagesToSend; i++ { - msg := amqp.NewMessage([]byte("ha")) mutex.Lock() for _, confirmedMessage := range unConfirmedMessages { err := rProducer.Send(confirmedMessage) @@ -131,6 +169,7 @@ func main() { } unConfirmedMessages = []message.StreamMessage{} mutex.Unlock() + msg := amqp.NewMessage([]byte("ha")) err := rProducer.Send(msg) if i%delayEachMessages == 0 { time.Sleep(sendDelay) @@ -138,25 +177,15 @@ func main() { atomic.AddInt32(&sent, 1) CheckErr(err) + errBatch := rProducer.BatchSend([]message.StreamMessage{msg}) + CheckErr(errBatch) + atomic.AddInt32(&sent, 1) + } }() } }() } - var consumers []*ha.ReliableConsumer - - for i := 0; i < numberOfConsumers; i++ { - go func(name string) { - consumer, err := ha.NewReliableConsumer(env, - streamName, - stream.NewConsumerOptions().SetOffset(stream.OffsetSpecification{}.First()), - func(consumerContext stream.ConsumerContext, message *amqp.Message) { - atomic.AddInt32(&consumed, 1) - }) - CheckErr(err) - consumers = append(consumers, consumer) - }(streamName) - } fmt.Println("Press enter to close the connections.") _, _ = reader.ReadString('\n') diff --git a/go.mod b/go.mod index 723e75f4..c1e73ac1 100644 --- a/go.mod +++ b/go.mod @@ -1,13 +1,13 @@ module github.com/rabbitmq/rabbitmq-stream-go-client -go 1.18 +go 1.22.0 require ( github.com/golang/snappy v0.0.4 github.com/google/uuid v1.6.0 github.com/klauspost/compress v1.17.9 - github.com/onsi/ginkgo/v2 v2.19.0 - github.com/onsi/gomega v1.33.1 + github.com/onsi/ginkgo/v2 v2.22.2 + github.com/onsi/gomega v1.36.2 github.com/pierrec/lz4 v2.6.1+incompatible github.com/pkg/errors v0.9.1 github.com/spaolacci/murmur3 v1.1.0 @@ -16,17 +16,17 @@ require ( require ( github.com/frankban/quicktest v1.14.6 // indirect - github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/logr v1.4.2 // indirect github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/google/go-cmp v0.6.0 // indirect - github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 // indirect + github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/rogpeppe/go-internal v1.11.0 // indirect github.com/spf13/pflag v1.0.5 // indirect - golang.org/x/net v0.25.0 // indirect - golang.org/x/sys 
v0.20.0 // indirect - golang.org/x/text v0.15.0 // indirect - golang.org/x/tools v0.21.0 // indirect + golang.org/x/net v0.33.0 // indirect + golang.org/x/sys v0.28.0 // indirect + golang.org/x/text v0.21.0 // indirect + golang.org/x/tools v0.28.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index a26a28ac..956355f8 100644 --- a/go.sum +++ b/go.sum @@ -5,6 +5,8 @@ github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHk github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= @@ -14,6 +16,11 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQNvHSdIE7iqsQxK1P41mySCvssg= github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw= +github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= +github.com/google/pprof v0.0.0-20241128161848-dc51965c6481 h1:yudKIrXagAOl99WQzrP1gbz5HLB9UjhcOFnPzdd6Qec= +github.com/google/pprof v0.0.0-20241128161848-dc51965c6481/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= +github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad h1:a6HEuzUHeKH6hwfN/ZoQgRgVIWFJljSWa/zetS2WTvg= +github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= @@ -29,8 +36,20 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA= github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= +github.com/onsi/ginkgo/v2 v2.20.2/go.mod h1:K9gyxPIlb+aIvnZ8bd9Ak+YP18w3APlR+5coaZoE2ag= +github.com/onsi/ginkgo/v2 v2.22.0 h1:Yed107/8DjTr0lKCNt7Dn8yQ6ybuDRQoMGrNFKzMfHg= +github.com/onsi/ginkgo/v2 v2.22.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo= +github.com/onsi/ginkgo/v2 v2.22.1 h1:QW7tbJAUDyVDVOM5dFa7qaybo+CRfR7bemlQUN6Z8aM= +github.com/onsi/ginkgo/v2 v2.22.1/go.mod h1:S6aTpoRsSq2cZOd+pssHAlKW/Q/jZt6cPrPlnj4a1xM= +github.com/onsi/ginkgo/v2 v2.22.2 h1:/3X8Panh8/WwhU/3Ssa6rCKqPLuAkVY2I0RoyDLySlU= +github.com/onsi/ginkgo/v2 v2.22.2/go.mod h1:oeMosUL+8LtarXBHu/c0bx2D/K9zyQ6uX3cTyztHwsk= github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk= github.com/onsi/gomega v1.33.1/go.mod 
h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0= +github.com/onsi/gomega v1.34.1/go.mod h1:kU1QgUvBDLXBJq618Xvm2LUX6rSAfRaFRTcdOeDLwwY= +github.com/onsi/gomega v1.34.2 h1:pNCwDkzrsv7MS9kpaQvVb1aVLahQXyJ/Tv5oAZMI3i8= +github.com/onsi/gomega v1.34.2/go.mod h1:v1xfxRgk0KIsG+QOdm7p8UosrOzPYRo60fd3B/1Dukc= +github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8= +github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY= github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= @@ -50,12 +69,32 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= +golang.org/x/net v0.31.0 h1:68CPQngjLL0r2AlUKiSxtQFKvzRVbnzLwMUn5SzcLHo= +golang.org/x/net v0.31.0/go.mod h1:P4fl1q7dY2hnZFxEk4pPSkDHF+QqjitcnDjUQyMM+pM= +golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= +golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= +golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug= +golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/tools v0.21.0 h1:qc0xYgIbsSDt9EyWz05J5wfa7LOVW0YTLOXrqdLAWIw= golang.org/x/tools v0.21.0/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= +golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ= +golang.org/x/tools v0.27.0 h1:qEKojBykQkQ4EynWy4S8Weg69NumxKdn40Fce3uc/8o= +golang.org/x/tools v0.27.0/go.mod h1:sUi0ZgbwW9ZPAq26Ekut+weQPR5eIM6GQLQ1Yjm1H0Q= +golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8= +golang.org/x/tools v0.28.0/go.mod h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw= google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= diff --git a/perfTest/REAMDE.md b/perfTest/REAMDE.md new file mode 100644 index 00000000..03dbe9ce --- /dev/null 
+++ b/perfTest/REAMDE.md @@ -0,0 +1,91 @@ +Go stream performances test +=== + +This test is to measure the performance of the stream package. + +#### Install the performance test tool +To install you can download the version from GitHub: + +Mac: +``` +https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_darwin_amd64.tar.gz +``` + +Linux: +``` +https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_linux_amd64.tar.gz +``` + +Windows +``` +https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_windows_amd64.zip +``` + +execute `stream-perf-test --help` to see the parameters. By default it executes a test with one producer, one consumer in `BatchSend` mode. + +here an example: +```shell +stream-perf-test --publishers 3 --consumers 2 --streams my_stream --max-length-bytes 2GB --uris rabbitmq-stream://guest:guest@localhost:5552/ --fixed-body 400 --time 10 +``` + +### Performance test tool Docker +A docker image is available: `pivotalrabbitmq/go-stream-perf-test`, to test it: + +Run the server is host mode: +```shell + docker run -it --rm --name rabbitmq --network host \ + rabbitmq:4-management +``` +enable the plugin: +``` + docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream +``` +then run the docker image: +```shell +docker run -it --network host pivotalrabbitmq/go-stream-perf-test +``` + +To see all the parameters: +```shell +docker run -it --network host pivotalrabbitmq/go-stream-perf-test --help +``` + +### Examples + +#### 1. Simple test +1 producer, 1 consumer, 1 stream, 1GB max length +```shell +stream-perf-test --streams my_stream --max-length-bytes 1GB +``` + +#### 2. Multiple producers and consumers +3 producers, 2 consumers, 1 stream, 2GB max length +```shell +stream-perf-test --publishers 3 --consumers 2 --streams my_stream --max-length-bytes 2GB +``` + +#### 3. Fixed body size +1 producer, 1 consumer, 1 stream, 1GB max length, 400 bytes body +```shell +stream-perf-test --streams my_stream --max-length-bytes 1GB --fixed-body 400 +``` + +#### 4. Test async-send +By default, the test uses the `BatchSend` mode, to test the `Send` mode: +```shell +stream-perf-test --streams my_stream --max-length-bytes 1GB --async-send +``` + +#### 5. Test fixed rate and async-send +This test is useful to test the latency, the producer sends messages at a fixed rate. +```shell +stream-perf-test --streams my_stream --max-length-bytes 1GB --async-send --rate 100 +``` + +#### 6. Batch Size + +Batch Size is valid only for `Send` mode, it is the max number of messages sent in a batch. 
+```shell +stream-perf-test --streams my_stream --max-length-bytes 1GB --async-send --batch-size 100 +``` + diff --git a/perfTest/cmd/silent.go b/perfTest/cmd/silent.go index 5fa5a709..b6e6d82e 100644 --- a/perfTest/cmd/silent.go +++ b/perfTest/cmd/silent.go @@ -84,10 +84,13 @@ func printStats() { averageLatency := int64(0) CMessagesPerSecond := float64(0) - ConfirmedMessagesPerSecond := float64(0) if atomic.LoadInt32(&consumerMessageCount) > 0 { CMessagesPerSecond = float64(atomic.LoadInt32(&consumerMessageCount)) / float64(v) * 1000 averageLatency = totalLatency / int64(atomic.LoadInt32(&consumerMessageCount)) + } + + ConfirmedMessagesPerSecond := float64(0) + if atomic.LoadInt32(&confirmedMessageCount) > 0 { ConfirmedMessagesPerSecond = float64(atomic.LoadInt32(&confirmedMessageCount)) / float64(v) * 1000 } p := gomsg.NewPrinter(language.English) @@ -277,7 +280,7 @@ func startPublisher(streamName string) error { logInfo("Enable SubEntrySize: %d, compression: %s", subEntrySize, cp) } - producerOptions.SetClientProvidedName(clientProvidedName) + producerOptions.SetClientProvidedName(clientProvidedName).SetBatchSize(batchSize) rPublisher, err := ha.NewReliableProducer(simulEnvironment, streamName, producerOptions, @@ -319,7 +322,6 @@ func startPublisher(streamName string) error { err = prod.BatchSend(messages) checkErr(err) } - atomic.AddInt32(&publisherMessageCount, int32(len(messages))) } @@ -337,8 +339,8 @@ func buildMessages() []message.StreamMessage { body = make([]byte, fixedBody) } else { if variableBody > 0 { - rand.Seed(time.Now().UnixNano()) - body = make([]byte, rand.Intn(variableBody)) + r := rand.New(rand.NewSource(time.Now().Unix())) + body = make([]byte, r.Intn(variableBody)) } } var buff = make([]byte, 8) diff --git a/pkg/amqp/types.go b/pkg/amqp/types.go index 23067f37..be59a157 100644 --- a/pkg/amqp/types.go +++ b/pkg/amqp/types.go @@ -413,6 +413,12 @@ type Message struct { doneSignal chan struct{} } +// AMQP10 is an AMQP 1.0 message with the necessary fields to work with the +// stream package. +// This is a wrapper around the amqp.Message struct. +// AMQP10 is not thread-safe. +// You should avoid to share the same message between multiple go routines and between multi Send/BatchSend calls. +// Each Send/BatchSend call should use a new message. 
type AMQP10 struct { publishingId int64 hasPublishingId bool @@ -453,7 +459,15 @@ func (amqp *AMQP10) MarshalBinary() ([]byte, error) { } func (amqp *AMQP10) UnmarshalBinary(data []byte) error { - return amqp.message.UnmarshalBinary(data) + + e := amqp.message.UnmarshalBinary(data) + if e != nil { + return e + } + amqp.Properties = amqp.message.Properties + amqp.Annotations = amqp.message.Annotations + amqp.ApplicationProperties = amqp.message.ApplicationProperties + return nil } func (amqp *AMQP10) GetData() [][]byte { @@ -461,15 +475,15 @@ func (amqp *AMQP10) GetData() [][]byte { } func (amqp *AMQP10) GetMessageProperties() *MessageProperties { - return amqp.message.Properties + return amqp.Properties } func (amqp *AMQP10) GetMessageAnnotations() Annotations { - return amqp.message.Annotations + return amqp.Annotations } func (amqp *AMQP10) GetApplicationProperties() map[string]interface{} { - return amqp.message.ApplicationProperties + return amqp.ApplicationProperties } func (amqp *AMQP10) GetMessageHeader() *MessageHeader { diff --git a/pkg/ha/ha_consumer.go b/pkg/ha/ha_consumer.go index 27f88ac5..24e8bb9d 100644 --- a/pkg/ha/ha_consumer.go +++ b/pkg/ha/ha_consumer.go @@ -28,27 +28,41 @@ type ReliableConsumer struct { bootstrap bool } +func (c *ReliableConsumer) GetStatusAsString() string { + switch c.GetStatus() { + case StatusOpen: + return "Open" + case StatusClosed: + return "Closed" + case StatusStreamDoesNotExist: + return "StreamDoesNotExist" + case StatusReconnecting: + return "Reconnecting" + default: + return "Unknown" + } +} + func (c *ReliableConsumer) handleNotifyClose(channelClose stream.ChannelClose) { go func() { - for event := range channelClose { - if strings.EqualFold(event.Reason, stream.SocketClosed) || strings.EqualFold(event.Reason, stream.MetaDataUpdate) { - logs.LogWarn("[Reliable] - %s closed unexpectedly.. Reconnecting..", c.getInfo()) - c.bootstrap = false - err, reconnected := retry(0, c) - if err != nil { - logs.LogInfo(""+ - "[Reliable] - %s won't be reconnected. Error: %s", c.getInfo(), err) - } - if reconnected { - c.setStatus(StatusOpen) - } else { - c.setStatus(StatusClosed) - } - + event := <-channelClose + if strings.EqualFold(event.Reason, stream.SocketClosed) || strings.EqualFold(event.Reason, stream.MetaDataUpdate) { + c.setStatus(StatusReconnecting) + logs.LogWarn("[Reliable] - %s closed unexpectedly.. Reconnecting..", c.getInfo()) + c.bootstrap = false + err, reconnected := retry(1, c) + if err != nil { + logs.LogInfo(""+ + "[Reliable] - %s won't be reconnected. Error: %s", c.getInfo(), err) + } + if reconnected { + c.setStatus(StatusOpen) } else { - logs.LogInfo("[Reliable] - %s closed normally. Reason: %s", c.getInfo(), event.Reason) c.setStatus(StatusClosed) } + } else { + logs.LogInfo("[Reliable] - %s closed normally. 
Reason: %s", c.getInfo(), event.Reason) + c.setStatus(StatusClosed) } }() } @@ -71,11 +85,13 @@ func NewReliableConsumer(env *stream.Environment, streamName string, if consumerOptions == nil { return nil, fmt.Errorf("the consumer options is mandatory") } - + logs.LogDebug("[Reliable] - creating %s", res.getInfo()) err := res.newConsumer() if err == nil { + res.setStatus(StatusOpen) } + logs.LogDebug("[Reliable] - created %s", res.getInfo()) return res, err } @@ -123,14 +139,15 @@ func (c *ReliableConsumer) newConsumer() error { c.bootstrap, offset) consumer, err := c.env.NewConsumer(c.streamName, func(consumerContext stream.ConsumerContext, message *amqp.Message) { c.mutexConnection.Lock() - c.currentPosition = consumerContext.Consumer.GetOffset() c.mutexConnection.Unlock() + c.messagesHandler(consumerContext, message) }, c.consumerOptions.SetOffset(offset)) if err != nil { return err } + channelNotifyClose := consumer.NotifyClose() c.handleNotifyClose(channelNotifyClose) c.consumer = consumer @@ -145,3 +162,7 @@ func (c *ReliableConsumer) Close() error { } return nil } + +func (c *ReliableConsumer) GetInfo() string { + return c.getInfo() +} diff --git a/pkg/ha/ha_consumer_test.go b/pkg/ha/ha_consumer_test.go index 0153f70e..66678b8d 100644 --- a/pkg/ha/ha_consumer_test.go +++ b/pkg/ha/ha_consumer_test.go @@ -108,11 +108,13 @@ var _ = Describe("Reliable Consumer", func() { // kill the connection errDrop := test_helper.DropConnection(connectionToDrop, "15672") Expect(errDrop).NotTo(HaveOccurred()) - - time.Sleep(2 * time.Second) // we give some time to the client to reconnect - Expect(consumer.GetStatus()).To(Equal(StatusOpen)) + /// just give some time to raise the event + time.Sleep(1200 * time.Millisecond) + Eventually(func() int { return consumer.GetStatus() }, "15s").WithPolling(300 * time.Millisecond).Should(Equal(StatusOpen)) + Expect(consumer.GetStatusAsString()).To(Equal("Open")) Expect(consumer.Close()).NotTo(HaveOccurred()) Expect(consumer.GetStatus()).To(Equal(StatusClosed)) + Expect(consumer.GetStatusAsString()).To(Equal("Closed")) }) It("Delete the stream should close the consumer", func() { @@ -123,10 +125,11 @@ var _ = Describe("Reliable Consumer", func() { Expect(err).NotTo(HaveOccurred()) Expect(consumer).NotTo(BeNil()) Expect(consumer.GetStatus()).To(Equal(StatusOpen)) + Expect(consumer.GetStatusAsString()).To(Equal("Open")) Expect(envForRConsumer.DeleteStream(streamForRConsumer)).NotTo(HaveOccurred()) Eventually(func() int { return consumer.GetStatus() - }, "15s").WithPolling(300 * time.Millisecond).Should(Equal(StatusClosed)) + }).WithPolling(300 * time.Millisecond).WithTimeout(20 * time.Second).Should(Equal(StatusClosed)) }) }) diff --git a/pkg/ha/ha_publisher.go b/pkg/ha/ha_publisher.go index 7e743d84..e6a0b1db 100644 --- a/pkg/ha/ha_publisher.go +++ b/pkg/ha/ha_publisher.go @@ -24,28 +24,31 @@ func (p *ReliableProducer) handlePublishConfirm(confirms stream.ChannelPublishCo func (p *ReliableProducer) handleNotifyClose(channelClose stream.ChannelClose) { go func() { - for event := range channelClose { - if strings.EqualFold(event.Reason, stream.SocketClosed) || strings.EqualFold(event.Reason, stream.MetaDataUpdate) { - logs.LogWarn("[Reliable] - %s closed unexpectedly.. Reconnecting..", p.getInfo()) - err, reconnected := retry(0, p) - if err != nil { - logs.LogInfo(""+ - "[Reliable] - %s won't be reconnected. 
Error: %s", p.getInfo(), err) - } - if reconnected { - p.setStatus(StatusOpen) - } else { - p.setStatus(StatusClosed) - } + event := <-channelClose + if strings.EqualFold(event.Reason, stream.SocketClosed) || strings.EqualFold(event.Reason, stream.MetaDataUpdate) { + p.setStatus(StatusReconnecting) + waitTime := randomWaitWithBackoff(1) + logs.LogWarn("[Reliable] - %s closed unexpectedly.. Reconnecting in %d milliseconds waiting pending messages", p.getInfo(), waitTime) + time.Sleep(time.Duration(waitTime) * time.Millisecond) + err, reconnected := retry(1, p) + if err != nil { + logs.LogInfo( + "[Reliable] - %s won't be reconnected. Error: %s", p.getInfo(), err) + } + if reconnected { + p.setStatus(StatusOpen) } else { - logs.LogInfo("[Reliable] - %s closed normally. Reason: %s", p.getInfo(), event.Reason) p.setStatus(StatusClosed) } - - p.reconnectionSignal.L.Lock() - p.reconnectionSignal.Broadcast() - p.reconnectionSignal.L.Unlock() + } else { + logs.LogInfo("[Reliable] - %s closed normally. Reason: %s", p.getInfo(), event.Reason) + p.setStatus(StatusClosed) } + + p.reconnectionSignal.L.Lock() + p.reconnectionSignal.Broadcast() + p.reconnectionSignal.L.Unlock() + logs.LogInfo("[Reliable] - %s reconnection signal sent", p.getInfo()) }() } @@ -60,6 +63,7 @@ type ReliableProducer struct { producerOptions *stream.ProducerOptions count int32 confirmMessageHandler ConfirmMessageHandler + channelPublishConfirm stream.ChannelPublishConfirm mutex *sync.Mutex mutexStatus *sync.Mutex status int @@ -97,14 +101,17 @@ func NewReliableProducer(env *stream.Environment, streamName string, } func (p *ReliableProducer) newProducer() error { + p.mutex.Lock() + defer p.mutex.Unlock() producer, err := p.env.NewProducer(p.streamName, p.producerOptions) if err != nil { + return err } - channelPublishConfirm := producer.NotifyPublishConfirmation() + p.channelPublishConfirm = producer.NotifyPublishConfirmation() + p.handlePublishConfirm(p.channelPublishConfirm) channelNotifyClose := producer.NotifyClose() p.handleNotifyClose(channelNotifyClose) - p.handlePublishConfirm(channelPublishConfirm) p.producer = producer return err } @@ -148,7 +155,6 @@ func (p *ReliableProducer) Send(message message.StreamMessage) error { if err := p.isReadyToSend(); err != nil { return err } - p.mutex.Lock() errW := p.producer.Send(message) p.mutex.Unlock() @@ -180,6 +186,21 @@ func (p *ReliableProducer) GetStatus() int { return p.status } +func (p *ReliableProducer) GetStatusAsString() string { + switch p.GetStatus() { + case StatusOpen: + return "Open" + case StatusClosed: + return "Closed" + case StatusStreamDoesNotExist: + return "StreamDoesNotExist" + case StatusReconnecting: + return "Reconnecting" + default: + return "Unknown" + } +} + // IReliable interface func (p *ReliableProducer) setStatus(value int) { p.mutexStatus.Lock() @@ -224,3 +245,7 @@ func (p *ReliableProducer) Close() error { } return nil } + +func (p *ReliableProducer) GetInfo() string { + return p.getInfo() +} diff --git a/pkg/ha/ha_publisher_test.go b/pkg/ha/ha_publisher_test.go index 131aeb2e..86d8aa22 100644 --- a/pkg/ha/ha_publisher_test.go +++ b/pkg/ha/ha_publisher_test.go @@ -1,6 +1,7 @@ package ha import ( + "fmt" "sync/atomic" "time" @@ -82,7 +83,10 @@ var _ = Describe("Reliable Producer", func() { clientProvidedName := uuid.New().String() producer, err := NewReliableProducer(envForRProducer, streamForRProducer, NewProducerOptions().SetClientProvidedName(clientProvidedName), func(messageConfirm []*ConfirmationStatus) { + + defer GinkgoRecover() for _, 
confirm := range messageConfirm { + fmt.Printf("message result: %v \n ", confirm.IsConfirmed()) Expect(confirm.IsConfirmed()).To(BeTrue()) } if atomic.AddInt32(&confirmed, int32(len(messageConfirm))) == 10 { @@ -173,9 +177,15 @@ var _ = Describe("Reliable Producer", func() { errDrop := test_helper.DropConnection(connectionToDrop, "15672") Expect(errDrop).NotTo(HaveOccurred()) + time.Sleep(1 * time.Second) // wait for the producer to be in reconnecting state Eventually(func() bool { return producer.GetStatus() == StatusReconnecting + }).WithPolling(300 * time.Millisecond).WithTimeout(20 * time.Second).Should(BeTrue()) + + // wait for the producer to be in reconnecting state + Eventually(func() bool { + return producer.GetStatusAsString() == "Reconnecting" }, time.Second*5, time.Millisecond). Should(BeTrue()) @@ -195,7 +205,7 @@ var _ = Describe("Reliable Producer", func() { Expect(envForRProducer.DeleteStream(streamForRProducer)).NotTo(HaveOccurred()) Eventually(func() int { return producer.GetStatus() - }, "15s").WithPolling(300 * time.Millisecond).Should(Equal(StatusClosed)) + }).WithPolling(300 * time.Millisecond).WithTimeout(20 * time.Second).Should(Equal(StatusClosed)) }) diff --git a/pkg/ha/reliable_common.go b/pkg/ha/reliable_common.go index 66e1cc9c..2e0fb8b6 100644 --- a/pkg/ha/reliable_common.go +++ b/pkg/ha/reliable_common.go @@ -25,6 +25,7 @@ type IReliable interface { getNewInstance() newEntityInstance getTimeOut() time.Duration getStreamName() string + GetStatusAsString() string } // Retry is a function that retries the IReliable to the stream @@ -41,12 +42,12 @@ type IReliable interface { // In both cases it retries the reconnection func retry(backoff int, reliable IReliable) (error, bool) { - reliable.setStatus(StatusReconnecting) - sleepValue := rand.Intn(int((reliable.getTimeOut().Seconds()-2+1)+2)*1000) + backoff*1000 - logs.LogInfo("[Reliable] - The %s for the stream %s is in reconnection in %d milliseconds", reliable.getInfo(), reliable.getStreamName(), sleepValue) - time.Sleep(time.Duration(sleepValue) * time.Millisecond) + waitTime := randomWaitWithBackoff(backoff) + logs.LogInfo("[Reliable] - The %s for the stream %s is in reconnection in %d milliseconds", reliable.getInfo(), reliable.getStreamName(), waitTime) + time.Sleep(time.Duration(waitTime) * time.Millisecond) streamMetaData, errS := reliable.getEnv().StreamMetaData(reliable.getStreamName()) if errors.Is(errS, stream.StreamDoesNotExist) { + logs.LogInfo("[Reliable] - The stream %s does not exist for %s. 
Stopping it", reliable.getStreamName(), reliable.getInfo()) return errS, false } if errors.Is(errS, stream.StreamNotAvailable) { @@ -76,3 +77,19 @@ func retry(backoff int, reliable IReliable) (error, bool) { return result, true } + +func randomWaitWithBackoff(attempt int) int { + r := rand.New(rand.NewSource(time.Now().Unix())) + baseWait := 3_000 + r.Intn(8_000) + + // Calculate the wait time considering the number of attempts + waitTime := baseWait * (1 << (attempt - 1)) // Exponential back-off + + // Cap the wait time at a maximum of 15 seconds + if waitTime > 15_000 { + waitTime = 15_000 + } + + return waitTime + +} diff --git a/pkg/integration_test/stream_integration_test.go b/pkg/integration_test/stream_integration_test.go index 8a835e1c..aaa37710 100644 --- a/pkg/integration_test/stream_integration_test.go +++ b/pkg/integration_test/stream_integration_test.go @@ -50,6 +50,9 @@ var _ = Describe("StreamIntegration", func() { for { select { case confirmations := <-confirmationCh: + for i := range confirmations { + Expect(confirmations[i].IsConfirmed()).To(BeTrue()) + } for range confirmations { count += 1 if count == totalExpected { @@ -113,6 +116,8 @@ var _ = Describe("StreamIntegration", func() { Expect(err).ToNot(HaveOccurred()) } + // wait a bit. We don't have confirmation here + // We should receive only 100 messages because Next sends the next chunk // in the stream. The previously 100 messages should be in a different chunk By("receiving only new messages") diff --git a/pkg/message/interface.go b/pkg/message/interface.go index 23314d3c..b5b615a3 100644 --- a/pkg/message/interface.go +++ b/pkg/message/interface.go @@ -2,6 +2,10 @@ package message import "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" +// StreamMessage is the interface that wraps the basic methods to interact with a message +// in the context of a stream. +// Currently, the StreamMessage interface is implemented by the amqp.Message struct. +// The implementations are not meant to be thread-safe. type StreamMessage interface { MarshalBinary() ([]byte, error) UnmarshalBinary(data []byte) error diff --git a/pkg/stream/aggregation.go b/pkg/stream/aggregation.go index 868809d5..af358054 100644 --- a/pkg/stream/aggregation.go +++ b/pkg/stream/aggregation.go @@ -10,7 +10,6 @@ import ( "github.com/pierrec/lz4" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" "io" - "io/ioutil" ) const ( @@ -149,7 +148,7 @@ func (es compressGZIP) UnCompress(source *bufio.Reader, dataSize, uncompressedDa /// headers ---> payload --> headers --> payload (compressed) // Read in data. - uncompressedReader, err := ioutil.ReadAll(reader) + uncompressedReader, err := io.ReadAll(reader) if err != nil { logs.LogError("Error during reading buffer %s", err) } @@ -213,7 +212,7 @@ func (es compressSnappy) UnCompress(source *bufio.Reader, dataSize, uncompressed /// headers ---> payload --> headers --> payload (compressed) // Read in data. - uncompressedReader, err := ioutil.ReadAll(reader) + uncompressedReader, err := io.ReadAll(reader) if err != nil { logs.LogError("Error during reading buffer %s", err) } @@ -275,7 +274,7 @@ func (es compressLZ4) UnCompress(source *bufio.Reader, dataSize, uncompressedDat /// headers ---> payload --> headers --> payload (compressed) // Read in data. 
- uncompressedReader, err := ioutil.ReadAll(reader) + uncompressedReader, err := io.ReadAll(reader) if err != nil { logs.LogError("Error during reading buffer %s", err) } @@ -345,7 +344,7 @@ func (es compressZSTD) UnCompress(source *bufio.Reader, dataSize, uncompressedDa /// headers ---> payload --> headers --> payload (compressed) // Read in data. - uncompressedReader, err := ioutil.ReadAll(reader) + uncompressedReader, err := io.ReadAll(reader) if err != nil { logs.LogError("Error during reading buffer %s", err) } diff --git a/pkg/stream/blocking_queue.go b/pkg/stream/blocking_queue.go new file mode 100644 index 00000000..d949b6e0 --- /dev/null +++ b/pkg/stream/blocking_queue.go @@ -0,0 +1,98 @@ +package stream + +import ( + "errors" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" + "sync/atomic" + "time" +) + +var ErrBlockingQueueStopped = errors.New("blocking queue stopped") + +type BlockingQueue[T any] struct { + queue chan T + status int32 +} + +// NewBlockingQueue initializes a new BlockingQueue with the given capacity +func NewBlockingQueue[T any](capacity int) *BlockingQueue[T] { + return &BlockingQueue[T]{ + queue: make(chan T, capacity), + status: 0, + } +} + +// Enqueue adds an item to the queue, blocking if the queue is full +func (bq *BlockingQueue[T]) Enqueue(item T) error { + if bq.IsStopped() { + return ErrBlockingQueueStopped + } + bq.queue <- item + + return nil +} + +// Dequeue removes an item from the queue with a timeout +func (bq *BlockingQueue[T]) Dequeue(timeout time.Duration) T { + if bq.IsStopped() { + var zeroValue T // Zero value of type T + return zeroValue + } + select { + case item, ok := <-bq.queue: + if !ok { + var zeroValue T // Zero value of type T + return zeroValue + } + return item + case <-time.After(timeout): + var zeroValue T // Zero value of type T + return zeroValue + } +} + +func (bq *BlockingQueue[T]) Size() int { + return len(bq.queue) +} + +func (bq *BlockingQueue[T]) IsEmpty() bool { + return len(bq.queue) == 0 +} + +// Stop stops the queue from accepting new items +// but allows some pending items. +// Stop is different from Close in that it allows the +// existing items to be processed. +// Drain the queue to be sure there are not pending messages +func (bq *BlockingQueue[T]) Stop() { + atomic.StoreInt32(&bq.status, 1) + // drain the queue. To be sure there are not pending messages + // in the queue. 
+ // it does not matter if we lose some messages here + // since there is the unConfirmed map to handle the messages + isActive := true + for isActive { + select { + case <-bq.queue: + // do nothing + case <-time.After(10 * time.Millisecond): + isActive = false + return + default: + isActive = false + return + } + } + logs.LogDebug("BlockingQueue stopped") +} + +func (bq *BlockingQueue[T]) Close() { + if bq.IsStopped() { + atomic.StoreInt32(&bq.status, 2) + close(bq.queue) + } +} + +func (bq *BlockingQueue[T]) IsStopped() bool { + return atomic.LoadInt32(&bq.status) == 1 || atomic.LoadInt32(&bq.status) == 2 +} diff --git a/pkg/stream/brokers.go b/pkg/stream/brokers.go index a3e5d943..1d130977 100644 --- a/pkg/stream/brokers.go +++ b/pkg/stream/brokers.go @@ -51,8 +51,8 @@ func newTCPParameterDefault() *TCPParameters { return &TCPParameters{ RequestedHeartbeat: defaultHeartbeat, RequestedMaxFrameSize: 1048576, - WriteBuffer: 8192, - ReadBuffer: 8192, + WriteBuffer: defaultWriteSocketBuffer, + ReadBuffer: defaultReadSocketBuffer, NoDelay: true, tlsConfig: nil, } diff --git a/pkg/stream/client.go b/pkg/stream/client.go index 2c92e717..9e25a992 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -69,6 +69,8 @@ type Client struct { socketCallTimeout time.Duration availableFeatures *availableFeatures serverProperties map[string]string + + doneTimeoutTicker chan struct{} } func newClient(connectionName string, broker *Broker, @@ -103,6 +105,7 @@ func newClient(connectionName string, broker *Broker, }, socketCallTimeout: rpcTimeOut, availableFeatures: newAvailableFeatures(), + doneTimeoutTicker: make(chan struct{}, 1), } c.setConnectionName(connectionName) return c @@ -416,43 +419,54 @@ func (c *Client) heartBeat() { ticker := time.NewTicker(time.Duration(c.tuneState.requestedHeartbeat) * time.Second) tickerHeartbeat := time.NewTicker(time.Duration(c.tuneState.requestedHeartbeat-2) * time.Second) - resp := c.coordinator.NewResponseWitName("heartbeat") var heartBeatMissed int32 - + doneSendingTimeoutTicker := make(chan struct{}, 1) + wg := sync.WaitGroup{} + wg.Add(2) go func() { - for c.socket.isOpen() { - <-tickerHeartbeat.C - if time.Since(c.getLastHeartBeat()) > time.Duration(c.tuneState.requestedHeartbeat)*time.Second { - v := atomic.AddInt32(&heartBeatMissed, 1) - logs.LogWarn("Missing heart beat: %d", v) - if v >= 2 { - logs.LogWarn("Too many heartbeat missing: %d", v) - c.Close() + wg.Done() + select { + case <-c.doneTimeoutTicker: + doneSendingTimeoutTicker <- struct{}{} + ticker.Stop() + tickerHeartbeat.Stop() + return + case <-tickerHeartbeat.C: + for c.socket.isOpen() { + if time.Since(c.getLastHeartBeat()) > time.Duration(c.tuneState.requestedHeartbeat)*time.Second { + v := atomic.AddInt32(&heartBeatMissed, 1) + logs.LogWarn("Missing heart beat: %d", v) + if v >= 2 { + logs.LogWarn("Too many heartbeat missing: %d", v) + c.Close() + } + } else { + atomic.StoreInt32(&heartBeatMissed, 0) } - } else { - atomic.StoreInt32(&heartBeatMissed, 0) } - } - tickerHeartbeat.Stop() + }() go func() { + wg.Done() for { select { - case code := <-resp.code: - if code.id == closeChannel { - _ = c.coordinator.RemoveResponseByName("heartbeat") - } - ticker.Stop() + case <-doneSendingTimeoutTicker: + logs.LogDebug("Stopping sending heartbeat") return - case <-ticker.C: + case _, ok := <-ticker.C: + if !ok { + return + } logs.LogDebug("Sending heart beat: %s", time.Now()) c.sendHeartbeat() } } }() + wg.Wait() + } func (c *Client) sendHeartbeat() { @@ -464,12 +478,8 @@ func (c *Client) 
sendHeartbeat() { func (c *Client) closeHartBeat() { c.destructor.Do(func() { - r, err := c.coordinator.GetResponseByName("heartbeat") - if err != nil { - logs.LogDebug("error removing heartbeat: %s", err) - } else { - r.code <- Code{id: closeChannel} - } + c.doneTimeoutTicker <- struct{}{} + close(c.doneTimeoutTicker) }) } @@ -506,10 +516,9 @@ func (c *Client) Close() error { close(c.metadataListener) c.metadataListener = nil } - + c.closeHartBeat() if c.getSocket().isOpen() { - c.closeHartBeat() res := c.coordinator.NewResponse(CommandClose) length := 2 + 2 + 4 + 2 + 2 + len("OK") var b = bytes.NewBuffer(make([]byte, 0, length+4)) @@ -590,8 +599,8 @@ func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) ( } res := c.internalDeclarePublisher(streamName, producer) if res.Err == nil { - producer.startPublishTask() producer.startUnconfirmedMessagesTimeOutTask() + producer.processPendingSequencesQueue() } return producer, res.Err } @@ -755,8 +764,8 @@ func (c *Client) BrokerForConsumer(stream string) (*Broker, error) { brokers = append(brokers, replica) } - rand.Seed(time.Now().UnixNano()) - n := rand.Intn(len(brokers)) + r := rand.New(rand.NewSource(time.Now().Unix())) + n := r.Intn(len(brokers)) return brokers[n], nil } @@ -988,7 +997,10 @@ func (c *Client) DeclareSubscriber(streamName string, return } - case chunk := <-consumer.response.chunkForConsumer: + case chunk, ok := <-consumer.response.chunkForConsumer: + if !ok { + return + } for _, offMessage := range chunk.offsetMessages { consumer.setCurrentOffset(offMessage.offset) if canDispatch(offMessage) { diff --git a/pkg/stream/client_test.go b/pkg/stream/client_test.go index 7d392376..f7a9f017 100644 --- a/pkg/stream/client_test.go +++ b/pkg/stream/client_test.go @@ -142,7 +142,8 @@ var _ = Describe("Streaming testEnvironment", func() { producer, err := testEnvironment.NewProducer(testStreamName, nil) Expect(err).NotTo(HaveOccurred()) - Expect(producer.BatchSend(CreateArrayMessagesForTesting(1_000))).NotTo(HaveOccurred()) + err = producer.BatchSend(CreateArrayMessagesForTesting(1_000)) + Expect(err).NotTo(HaveOccurred()) time.Sleep(time.Millisecond * 800) Expect(producer.Close()).NotTo(HaveOccurred()) diff --git a/pkg/stream/constants.go b/pkg/stream/constants.go index 26ad5b5e..a19ab723 100644 --- a/pkg/stream/constants.go +++ b/pkg/stream/constants.go @@ -94,8 +94,8 @@ const ( LocalhostUriConnection = "rabbitmq-stream://guest:guest@localhost:5552/%2f" /// - defaultWriteSocketBuffer = 8092 - defaultReadSocketBuffer = 65536 + defaultWriteSocketBuffer = 8192 + defaultReadSocketBuffer = 8192 defaultQueuePublisherSize = 10000 minQueuePublisherSize = 100 maxQueuePublisherSize = 1_000_000 @@ -118,6 +118,7 @@ const ( MetaDataUpdate = "metadata Data update" LeaderLocatorBalanced = "balanced" LeaderLocatorClientLocal = "client-local" + DeletePublisher = "deletePublisher" StreamTcpPort = "5552" @@ -189,6 +190,8 @@ func lookErrorCode(errorCode uint16) error { return InternalError case responseCodeAuthenticationFailureLoopback: return AuthenticationFailureLoopbackError + case timeoutError: + return ConfirmationTimoutError default: { logs.LogWarn("Error not handled %d", errorCode) diff --git a/pkg/stream/consumer.go b/pkg/stream/consumer.go index 0fc20c30..e9ff54bc 100644 --- a/pkg/stream/consumer.go +++ b/pkg/stream/consumer.go @@ -332,7 +332,7 @@ func (consumer *Consumer) Close() error { if err.Err != nil && err.isTimeout { return err.Err } - consumer.response.code <- Code{id: closeChannel} + errC := 
consumer.options.client.coordinator.RemoveConsumerById(consumer.ID, Event{ Command: CommandUnsubscribe, StreamName: consumer.GetStreamName(), diff --git a/pkg/stream/consumer_sac_test.go b/pkg/stream/consumer_sac_test.go index 23422e2c..cb7cda83 100644 --- a/pkg/stream/consumer_sac_test.go +++ b/pkg/stream/consumer_sac_test.go @@ -12,7 +12,9 @@ import ( func SendMessages(testEnvironment *Environment, streamName string) { producer, err := testEnvironment.NewProducer(streamName, nil) Expect(err).NotTo(HaveOccurred()) - Expect(producer.BatchSend(CreateArrayMessagesForTesting(30))).NotTo(HaveOccurred()) + err = producer.BatchSend(CreateArrayMessagesForTesting(30)) + Expect(err).NotTo(HaveOccurred()) + Expect(producer.Close()).NotTo(HaveOccurred()) } @@ -210,7 +212,9 @@ var _ = Describe("Streaming Single Active Consumer", func() { SetAutoCommit(nil)) Expect(err).NotTo(HaveOccurred()) - Expect(producer.BatchSend(CreateArrayMessagesForTesting(10))).NotTo(HaveOccurred()) + err = producer.BatchSend(CreateArrayMessagesForTesting(10)) + Expect(err).NotTo(HaveOccurred()) + Eventually(func() int32 { return atomic.LoadInt32(&messagesReceived) }, 5*time.Second).Should(Equal(int32(10)), diff --git a/pkg/stream/consumer_test.go b/pkg/stream/consumer_test.go index cdab829b..49406ed8 100644 --- a/pkg/stream/consumer_test.go +++ b/pkg/stream/consumer_test.go @@ -157,6 +157,7 @@ var _ = Describe("Streaming Consumers", func() { err = producer.BatchSend(CreateArrayMessagesForTesting(30)) // batch Send Expect(err).NotTo(HaveOccurred()) + Expect(producer.Close()).NotTo(HaveOccurred()) var messagesReceived int32 = 0 consumer, err := env.NewConsumer(streamName, @@ -189,6 +190,7 @@ var _ = Describe("Streaming Consumers", func() { Expect(err).NotTo(HaveOccurred()) err = producer.BatchSend(CreateArrayMessagesForTesting(3)) // batch Send Expect(err).NotTo(HaveOccurred()) + Expect(producer.Close()).NotTo(HaveOccurred()) Eventually(func() int32 { return atomic.LoadInt32(&messagesCount) @@ -205,6 +207,7 @@ var _ = Describe("Streaming Consumers", func() { // Given we have produced 105 messages ... err = producer.BatchSend(CreateArrayMessagesForTesting(105)) // batch Send Expect(err).NotTo(HaveOccurred()) + Expect(producer.Close()).NotTo(HaveOccurred()) }) @@ -342,7 +345,9 @@ var _ = Describe("Streaming Consumers", func() { // same SetPublishingId // even we publish the same array more times for i := 0; i < 10; i++ { - Expect(producer.BatchSend(arr)).NotTo(HaveOccurred()) + err = producer.BatchSend(arr) + Expect(err).NotTo(HaveOccurred()) + } var messagesReceived int32 = 0 @@ -390,8 +395,9 @@ var _ = Describe("Streaming Consumers", func() { _, err = env.QueryOffset("consumer_test", streamName) Expect(err).To(HaveOccurred()) - Expect(producer.BatchSend(CreateArrayMessagesForTesting(107))). - NotTo(HaveOccurred()) + err = producer.BatchSend(CreateArrayMessagesForTesting(107)) + Expect(err).NotTo(HaveOccurred()) + Expect(producer.Close()).NotTo(HaveOccurred()) var messagesReceived int32 = 0 consumer, err := env.NewConsumer(streamName, @@ -448,8 +454,9 @@ var _ = Describe("Streaming Consumers", func() { It("Check already closed", func() { producer, err := env.NewProducer(streamName, nil) Expect(err).NotTo(HaveOccurred()) - Expect(producer.BatchSend(CreateArrayMessagesForTesting(500))). 
- NotTo(HaveOccurred()) + err = producer.BatchSend(CreateArrayMessagesForTesting(500)) + Expect(err).NotTo(HaveOccurred()) + defer func(producer *Producer) { err := producer.Close() Expect(err).NotTo(HaveOccurred()) @@ -678,16 +685,17 @@ var _ = Describe("Streaming Consumers", func() { } totalMessages := 200 for i := 0; i < totalMessages; i++ { - msg := amqp.NewMessage(make([]byte, 50)) - Expect(producer1.Send(msg)).NotTo(HaveOccurred()) - Expect(producer2.Send(msg)).NotTo(HaveOccurred()) - Expect(producer3.Send(msg)).NotTo(HaveOccurred()) - Expect(producer4.Send(msg)).NotTo(HaveOccurred()) - Expect(producer5.Send(msg)).NotTo(HaveOccurred()) + + Expect(producer1.Send(amqp.NewMessage(make([]byte, 50)))).NotTo(HaveOccurred()) + Expect(producer2.Send(amqp.NewMessage(make([]byte, 50)))).NotTo(HaveOccurred()) + Expect(producer3.Send(amqp.NewMessage(make([]byte, 50)))).NotTo(HaveOccurred()) + Expect(producer4.Send(amqp.NewMessage(make([]byte, 50)))).NotTo(HaveOccurred()) + Expect(producer5.Send(amqp.NewMessage(make([]byte, 50)))).NotTo(HaveOccurred()) } for i := 0; i < 50; i++ { - Expect(producer6Batch.BatchSend(batchMessages)).NotTo(HaveOccurred()) + err2 := producer6Batch.BatchSend(batchMessages) + Expect(err2).NotTo(HaveOccurred()) } var messagesReceived int32 @@ -731,7 +739,8 @@ var _ = Describe("Streaming Consumers", func() { // so, even we set the SetPublishingId // it will be ignored for i := 0; i < 10; i++ { - Expect(producer.BatchSend(arr)).NotTo(HaveOccurred()) + err := producer.BatchSend(arr) + Expect(err).NotTo(HaveOccurred()) } var messagesReceived int32 = 0 diff --git a/pkg/stream/coordinator.go b/pkg/stream/coordinator.go index e3015f77..a2d9b198 100644 --- a/pkg/stream/coordinator.go +++ b/pkg/stream/coordinator.go @@ -54,9 +54,11 @@ func (coordinator *Coordinator) NewProducer( parameters *ProducerOptions) (*Producer, error) { coordinator.mutex.Lock() defer coordinator.mutex.Unlock() - size := 10000 + dynSize := 10000 + tickerTime := defaultConfirmationTimeOut if parameters != nil { - size = parameters.QueueSize + dynSize = parameters.BatchSize + tickerTime = parameters.ConfirmationTimeOut } var lastId, err = coordinator.getNextProducerItem() @@ -64,54 +66,46 @@ func (coordinator *Coordinator) NewProducer( return nil, err } var producer = &Producer{id: lastId, - options: parameters, - mutex: &sync.RWMutex{}, - mutexPending: &sync.Mutex{}, - unConfirmedMessages: map[int64]*ConfirmationStatus{}, - status: open, - messageSequenceCh: make(chan messageSequence, size), - pendingMessages: pendingMessagesSequence{ - messages: make([]*messageSequence, 0), - size: initBufferPublishSize, - }} + options: parameters, + mutex: &sync.RWMutex{}, + unConfirmed: newUnConfirmed(), + confirmationTimeoutTicker: time.NewTicker(tickerTime), + doneTimeoutTicker: make(chan struct{}, 1), + status: open, + pendingSequencesQueue: NewBlockingQueue[*messageSequence](dynSize), + confirmMutex: &sync.Mutex{}, + } coordinator.producers[lastId] = producer return producer, err } func (coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event) error { - consumer, err := coordinator.GetConsumerById(id) + consumer, err := coordinator.ExtractConsumerById(id) if err != nil { return err } + consumer.setStatus(closed) reason.StreamName = consumer.GetStreamName() reason.Name = consumer.GetName() if closeHandler := consumer.GetCloseHandler(); closeHandler != nil { closeHandler <- reason close(closeHandler) + closeHandler = nil } - return coordinator.removeById(id, coordinator.consumers) + 
close(consumer.response.chunkForConsumer) + close(consumer.response.code) + + return nil } func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error { - producer, err := coordinator.GetProducerById(id) + producer, err := coordinator.ExtractProducerById(id) if err != nil { return err } - reason.StreamName = producer.GetStreamName() - reason.Name = producer.GetName() - tentatives := 0 - for producer.lenUnConfirmed() > 0 && tentatives < 3 { - time.Sleep(200 * time.Millisecond) - tentatives++ - } - - if producer.closeHandler != nil { - producer.closeHandler <- reason - } - - return coordinator.removeById(id, coordinator.producers) + return producer.close(reason) } func (coordinator *Coordinator) RemoveResponseById(id interface{}) error { @@ -220,6 +214,18 @@ func (coordinator *Coordinator) GetConsumerById(id interface{}) (*Consumer, erro return v.(*Consumer), err } +func (coordinator *Coordinator) ExtractConsumerById(id interface{}) (*Consumer, error) { + coordinator.mutex.Lock() + defer coordinator.mutex.Unlock() + if coordinator.consumers[id] == nil { + return nil, errors.New("item #{id} not found ") + } + consumer := coordinator.consumers[id].(*Consumer) + coordinator.consumers[id] = nil + delete(coordinator.consumers, id) + return consumer, nil +} + func (coordinator *Coordinator) GetResponseById(id uint32) (*Response, error) { v, err := coordinator.getById(fmt.Sprintf("%d", id), coordinator.responses) if err != nil { @@ -240,6 +246,18 @@ func (coordinator *Coordinator) GetProducerById(id interface{}) (*Producer, erro return v.(*Producer), err } +func (coordinator *Coordinator) ExtractProducerById(id interface{}) (*Producer, error) { + coordinator.mutex.Lock() + defer coordinator.mutex.Unlock() + if coordinator.producers[id] == nil { + return nil, errors.New("item #{id} not found ") + } + producer := coordinator.producers[id].(*Producer) + coordinator.producers[id] = nil + delete(coordinator.producers, id) + return producer, nil +} + // general functions func (coordinator *Coordinator) getById(id interface{}, refmap map[interface{}]interface{}) (interface{}, error) { diff --git a/pkg/stream/environment.go b/pkg/stream/environment.go index 3d0b17c0..2a4c52a5 100644 --- a/pkg/stream/environment.go +++ b/pkg/stream/environment.go @@ -125,8 +125,8 @@ func (env *Environment) newReconnectClient() (*Client, error) { logs.LogError("Can't connect the locator client, error:%s, retry in %d milliseconds, broker: ", err, sleepTime, brokerUri) time.Sleep(time.Duration(sleepTime) * time.Millisecond) - rand.Seed(time.Now().UnixNano()) - n := rand.Intn(len(env.options.ConnectionParameters)) + r := rand.New(rand.NewSource(time.Now().Unix())) + n := r.Intn(len(env.options.ConnectionParameters)) client = newClient("stream-locator", env.options.ConnectionParameters[n], env.options.TCPParameters, env.options.SaslConfiguration, env.options.RPCTimeout) tentatives = tentatives + 1 diff --git a/pkg/stream/filtering_test.go b/pkg/stream/filtering_test.go index 9cc2d718..803c9b3b 100644 --- a/pkg/stream/filtering_test.go +++ b/pkg/stream/filtering_test.go @@ -214,6 +214,7 @@ func send(producer *Producer, state string) { msg.ApplicationProperties = map[string]interface{}{"state": state} messages = append(messages, msg) } - Expect(producer.BatchSend(messages)).NotTo(HaveOccurred()) + err := producer.BatchSend(messages) + Expect(err).NotTo(HaveOccurred()) } diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index d4d61f37..a6bdb773 100644 --- a/pkg/stream/producer.go +++ 
b/pkg/stream/producer.go @@ -50,11 +50,6 @@ func (cs *ConfirmationStatus) GetErrorCode() uint16 { return cs.errorCode } -type pendingMessagesSequence struct { - messages []*messageSequence - size int -} - type messageSequence struct { messageBytes []byte unCompressedSize int @@ -64,20 +59,22 @@ type messageSequence struct { } type Producer struct { - id uint8 - options *ProducerOptions - onClose onInternalClose - unConfirmedMessages map[int64]*ConfirmationStatus - sequence int64 - mutex *sync.RWMutex - mutexPending *sync.Mutex - publishConfirm chan []*ConfirmationStatus - closeHandler chan Event - status int - - /// needed for the async publish - messageSequenceCh chan messageSequence - pendingMessages pendingMessagesSequence + id uint8 + options *ProducerOptions + onClose onInternalClose + unConfirmed *unConfirmed + sequence int64 + mutex *sync.RWMutex + + closeHandler chan Event + status int + confirmationTimeoutTicker *time.Ticker + doneTimeoutTicker chan struct{} + + confirmMutex *sync.Mutex + publishConfirmation chan []*ConfirmationStatus + + pendingSequencesQueue *BlockingQueue[*messageSequence] } type FilterValue func(message message.StreamMessage) string @@ -93,66 +90,94 @@ func NewProducerFilter(filterValue FilterValue) *ProducerFilter { } type ProducerOptions struct { - client *Client - streamName string - Name string // Producer name, valid for deduplication - QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server - BatchSize int // It is the batch-unCompressedSize aggregation, low value reduce the latency, high value increase the throughput. Valid only for the method Send() - BatchPublishingDelay int // Timout within the aggregation sent a batch of messages. Valid only for the method Send() - SubEntrySize int // Size of sub Entry, to aggregate more subEntry using one publishing id - Compression Compression // Compression type, it is valid only if SubEntrySize > 1 - ConfirmationTimeOut time.Duration // Time to wait for the confirmation - ClientProvidedName string // Client provider name that will be shown in the management UI - Filter *ProducerFilter // Enable the filter feature, by default is disabled. Pointer nil -} - + client *Client + streamName string + // Producer name. You need to set it to enable the deduplication feature. + // Deduplication is a feature that allows the producer to avoid sending duplicate messages to the stream. + // see: https://www.rabbitmq.com/blog/2021/07/28/rabbitmq-streams-message-deduplication for more information. + // Don't use it if you don't need the deduplication. + Name string + // Deprecated: starting from 1.5.0 the QueueSize is deprecated, and it will be removed in the next releases + // It is not used anymore given the dynamic batching + QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server + BatchSize int // It is the batch-unCompressedSize aggregation, low value reduce the latency, high value increase the throughput. Valid only for the method Send() + // Deprecated: starting from 1.5.0 the BatchPublishingDelay is deprecated, and it will be removed in the next releases + // It is not used anymore given the dynamic batching + BatchPublishingDelay int // Timout within the aggregation sent a batch of messages. 
Valid only for the method Send() + // Size of sub Entry, to aggregate more subEntry using one publishing id + SubEntrySize int + // Compression type, it is valid only if SubEntrySize > 1 + // The messages can be compressed before sending them to the server + Compression Compression + // Time to wait for the confirmation, see the unConfirmed structure + ConfirmationTimeOut time.Duration + // Client provider name that will be shown in the management UI + ClientProvidedName string + // Enable the filter feature, by default is disabled. Pointer nil + Filter *ProducerFilter +} + +// SetProducerName sets the producer name. This name is used to enable the deduplication feature. +// See ProducerOptions.Name for more details. +// Don't use it if you don't need the deduplication. func (po *ProducerOptions) SetProducerName(name string) *ProducerOptions { po.Name = name return po } +// Deprecated: starting from 1.5.0 the SetQueueSize is deprecated, and it will be removed in the next releases +// It is not used anymore given the dynamic batching func (po *ProducerOptions) SetQueueSize(size int) *ProducerOptions { po.QueueSize = size return po } // SetBatchSize sets the batch size for the producer -// Valid only for the method Send() +// The batch size is the number of messages that are aggregated before sending them to the server +// The SendBatch splits the messages in multiple frames if the messages are bigger than the BatchSize func (po *ProducerOptions) SetBatchSize(size int) *ProducerOptions { po.BatchSize = size return po } +// Deprecated: starting from 1.5.0 the SetBatchPublishingDelay is deprecated, and it will be removed in the next releases +// It is not used anymore given the dynamic batching func (po *ProducerOptions) SetBatchPublishingDelay(size int) *ProducerOptions { po.BatchPublishingDelay = size return po } +// SetSubEntrySize See the ProducerOptions.SubEntrySize for more details func (po *ProducerOptions) SetSubEntrySize(size int) *ProducerOptions { po.SubEntrySize = size return po } +// SetCompression sets the compression for the producer. See ProducerOptions.Compression for more details func (po *ProducerOptions) SetCompression(compression Compression) *ProducerOptions { po.Compression = compression return po } +// SetConfirmationTimeOut sets the time to wait for the confirmation. See ProducerOptions.ConfirmationTimeOut for more details func (po *ProducerOptions) SetConfirmationTimeOut(duration time.Duration) *ProducerOptions { po.ConfirmationTimeOut = duration return po } +// SetClientProvidedName sets the client provided name that will be shown in the management UI func (po *ProducerOptions) SetClientProvidedName(name string) *ProducerOptions { po.ClientProvidedName = name return po } +// SetFilter sets the filter for the producer. 
See ProducerOptions.Filter for more details func (po *ProducerOptions) SetFilter(filter *ProducerFilter) *ProducerOptions { po.Filter = filter return po } +// IsFilterEnabled returns true if the filter is enabled func (po *ProducerOptions) IsFilterEnabled() bool { return po.Filter != nil } @@ -171,85 +196,25 @@ func NewProducerOptions() *ProducerOptions { } func (producer *Producer) GetUnConfirmed() map[int64]*ConfirmationStatus { - producer.mutex.RLock() - defer producer.mutex.RUnlock() - return producer.unConfirmedMessages -} - -func (producer *Producer) addUnConfirmedSequences(message []*messageSequence, producerID uint8) { - producer.mutex.Lock() - defer producer.mutex.Unlock() - - for _, msg := range message { - producer.unConfirmedMessages[msg.publishingId] = - &ConfirmationStatus{ - inserted: time.Now(), - message: *msg.refMessage, - producerID: producerID, - publishingId: msg.publishingId, - confirmed: false, - } - } - -} -func (producer *Producer) addUnConfirmed(sequence int64, message message.StreamMessage, producerID uint8) { - producer.mutex.Lock() - defer producer.mutex.Unlock() - producer.unConfirmedMessages[sequence] = &ConfirmationStatus{ - inserted: time.Now(), - message: message, - producerID: producerID, - publishingId: sequence, - confirmed: false, - } + return producer.unConfirmed.getAll() } func (po *ProducerOptions) isSubEntriesBatching() bool { return po.SubEntrySize > 1 } -func (producer *Producer) removeFromConfirmationStatus(status []*ConfirmationStatus) { - producer.mutex.Lock() - defer producer.mutex.Unlock() - - for _, msg := range status { - delete(producer.unConfirmedMessages, msg.publishingId) - for _, linked := range msg.linkedTo { - delete(producer.unConfirmedMessages, linked.publishingId) - } - } -} - -func (producer *Producer) removeUnConfirmed(sequence int64) { - producer.mutex.Lock() - defer producer.mutex.Unlock() - delete(producer.unConfirmedMessages, sequence) -} - func (producer *Producer) lenUnConfirmed() int { - producer.mutex.Lock() - defer producer.mutex.Unlock() - return len(producer.unConfirmedMessages) -} - -func (producer *Producer) lenPendingMessages() int { - producer.mutexPending.Lock() - defer producer.mutexPending.Unlock() - return len(producer.pendingMessages.messages) -} - -func (producer *Producer) getUnConfirmed(sequence int64) *ConfirmationStatus { - producer.mutex.RLock() - defer producer.mutex.RUnlock() - return producer.unConfirmedMessages[sequence] + return producer.unConfirmed.size() } +// NotifyPublishConfirmation returns a channel that receives the confirmation status of the messages sent by the producer. func (producer *Producer) NotifyPublishConfirmation() ChannelPublishConfirm { ch := make(chan []*ConfirmationStatus, 1) - producer.publishConfirm = ch + producer.publishConfirmation = ch return ch } +// NotifyClose returns a channel that receives the close event of the producer. 
func (producer *Producer) NotifyClose() ChannelClose { ch := make(chan Event, 1) producer.closeHandler = ch @@ -275,205 +240,199 @@ func (producer *Producer) getStatus() int { return producer.status } -func (producer *Producer) sendBufferedMessages() { - - if len(producer.pendingMessages.messages) > 0 { - err := producer.internalBatchSend(producer.pendingMessages.messages) - if err != nil { - return - } - producer.pendingMessages.messages = producer.pendingMessages.messages[:0] - producer.pendingMessages.size = initBufferPublishSize - } -} - func (producer *Producer) startUnconfirmedMessagesTimeOutTask() { go func() { - for producer.getStatus() == open { - time.Sleep(2 * time.Second) - toRemove := make([]*ConfirmationStatus, 0) - // check the unconfirmed messages and remove the one that are expired - // use the RLock to avoid blocking the producer - producer.mutex.RLock() - for _, msg := range producer.unConfirmedMessages { - if time.Since(msg.inserted) > producer.options.ConfirmationTimeOut { - msg.err = ConfirmationTimoutError - msg.errorCode = timeoutError - msg.confirmed = false - toRemove = append(toRemove, msg) - } - } - producer.mutex.RUnlock() - - if len(toRemove) > 0 { - producer.removeFromConfirmationStatus(toRemove) - if producer.publishConfirm != nil { - producer.publishConfirm <- toRemove + for { + select { + case <-producer.doneTimeoutTicker: + logs.LogDebug("producer %d timeout thread closed", producer.id) + return + case <-producer.confirmationTimeoutTicker.C: + // check the unconfirmed messages and remove the one that are expired + if producer.getStatus() == open { + toRemove := producer.unConfirmed.extractWithTimeOut(producer.options.ConfirmationTimeOut) + if len(toRemove) > 0 { + producer.sendConfirmationStatus(toRemove) + } + } else { + logs.LogInfo("producer %d confirmationTimeoutTicker closed", producer.id) + return } } } - time.Sleep(5 * time.Second) - producer.flushUnConfirmedMessages(timeoutError, ConfirmationTimoutError) }() } -func (producer *Producer) startPublishTask() { - go func(ch chan messageSequence) { - var ticker = time.NewTicker(time.Duration(producer.options.BatchPublishingDelay) * time.Millisecond) - defer ticker.Stop() - for { - select { +func (producer *Producer) sendConfirmationStatus(status []*ConfirmationStatus) { + producer.confirmMutex.Lock() + defer producer.confirmMutex.Unlock() + if producer.publishConfirmation != nil { + producer.publishConfirmation <- status + } +} - case msg, running := <-ch: - { - if !running { - - producer.flushUnConfirmedMessages(connectionCloseError, ConnectionClosed) - if producer.publishConfirm != nil { - close(producer.publishConfirm) - producer.publishConfirm = nil - } - if producer.closeHandler != nil { - close(producer.closeHandler) - producer.closeHandler = nil - } - return - } - producer.mutexPending.Lock() - if producer.pendingMessages.size+msg.unCompressedSize >= producer.options.client.getTuneState(). 
- requestedMaxFrameSize { - producer.sendBufferedMessages() - } +func (producer *Producer) closeConfirmationStatus() { + producer.confirmMutex.Lock() + defer producer.confirmMutex.Unlock() + if producer.publishConfirmation != nil { + close(producer.publishConfirmation) + producer.publishConfirmation = nil + } +} - producer.pendingMessages.size += msg.unCompressedSize - producer.pendingMessages.messages = append(producer.pendingMessages.messages, &msg) - if len(producer.pendingMessages.messages) >= (producer.options.BatchSize) { - producer.sendBufferedMessages() - } - producer.mutexPending.Unlock() +// processPendingSequencesQueue aggregates the message sequences in the queue and sends them to the server +// messages coming from the Send method through the pendingSequencesQueue +func (producer *Producer) processPendingSequencesQueue() { + + maxFrame := producer.options.client.getTuneState().requestedMaxFrameSize + // the buffer is initialized with the size of the header + sequenceToSend := make([]*messageSequence, 0) + go func() { + totalBufferToSend := initBufferPublishSize + for { + var lastError error + // the dequeue is blocking with a timeout of 500ms + // as soon as a message is available the Dequeue will be unblocked + msg := producer.pendingSequencesQueue.Dequeue(time.Millisecond * 500) + if producer.pendingSequencesQueue.IsStopped() { + break + } + + if msg != nil { + // There is something in the queue. Check that the buffer is still less than the maxFrame + totalBufferToSend += msg.unCompressedSize + if totalBufferToSend > maxFrame { + // if the totalBufferToSend is greater than the requestedMaxFrameSize + // the producer sends the messages and resets the buffer + lastError = producer.internalBatchSend(sequenceToSend) + sequenceToSend = sequenceToSend[:0] + totalBufferToSend = initBufferPublishSize } - case <-ticker.C: - producer.mutexPending.Lock() - producer.sendBufferedMessages() - producer.mutexPending.Unlock() + sequenceToSend = append(sequenceToSend, msg) } - } - }(producer.messageSequenceCh) + // if producer.pendingSequencesQueue.IsEmpty() the queue is empty, so the producer is not receiving + // messages while it checks the buffer.
In this case, or when the BatchSize is reached, the producer sends the aggregated messages + if producer.pendingSequencesQueue.IsEmpty() || len(sequenceToSend) >= producer.options.BatchSize { + if len(sequenceToSend) > 0 { + lastError = producer.internalBatchSend(sequenceToSend) + sequenceToSend = sequenceToSend[:0] + totalBufferToSend += initBufferPublishSize + } + } + if lastError != nil { + logs.LogError("error during sending messages: %s", lastError) + } + } + logs.LogDebug("producer %d processPendingSequencesQueue closed", producer.id) + }() } -func (producer *Producer) sendBytes(streamMessage message.StreamMessage, messageBytes []byte) error { - if len(messageBytes)+initBufferPublishSize > producer.options.client.getTuneState().requestedMaxFrameSize { - return FrameTooLarge +func (producer *Producer) assignPublishingID(message message.StreamMessage) int64 { + sequence := message.GetPublishingId() + // in case of sub entry the deduplication is disabled + if !message.HasPublishingId() || producer.options.isSubEntriesBatching() { + sequence = atomic.AddInt64(&producer.sequence, 1) } + return sequence +} +func (producer *Producer) fromMessageToMessageSequence(streamMessage message.StreamMessage) (*messageSequence, error) { + marshalBinary, err := streamMessage.MarshalBinary() + if err != nil { + return nil, err + } + seq := producer.assignPublishingID(streamMessage) filterValue := "" if producer.options.IsFilterEnabled() { filterValue = producer.options.Filter.FilterValue(streamMessage) } - sequence := producer.assignPublishingID(streamMessage) - producer.addUnConfirmed(sequence, streamMessage, producer.id) - - if producer.getStatus() == open { - producer.messageSequenceCh <- messageSequence{ - messageBytes: messageBytes, - unCompressedSize: len(messageBytes), - publishingId: sequence, - filterValue: filterValue, - } - } else { - // TODO: Change the error message with a typed error - return fmt.Errorf("producer id: %d closed", producer.id) - } + return &messageSequence{ + messageBytes: marshalBinary, + unCompressedSize: len(marshalBinary), + publishingId: seq, + filterValue: filterValue, + refMessage: &streamMessage, + }, nil - return nil } // Send sends a message to the stream and returns an error if the message could not be sent. -// Send is asynchronous. The aggregation of the messages is based on the BatchSize and BatchPublishingDelay -// options. The message is sent when the aggregation is reached or the BatchPublishingDelay is reached. +// The Send is asynchronous. The message is sent to a channel and then other goroutines aggregate and send the messages +// The Send is dynamic, so the number of messages sent is decided internally based on the BatchSize +// and the messages contained in the buffer. The aggregation is handled by the client.
+// returns an error if the message could not be sent for marshal problems or if the buffer is too large func (producer *Producer) Send(streamMessage message.StreamMessage) error { - messageBytes, err := streamMessage.MarshalBinary() + messageSeq, err := producer.fromMessageToMessageSequence(streamMessage) if err != nil { return err } - return producer.sendBytes(streamMessage, messageBytes) -} + producer.unConfirmed.addFromSequence(messageSeq, producer.GetID()) -func (producer *Producer) assignPublishingID(message message.StreamMessage) int64 { - sequence := message.GetPublishingId() - // in case of sub entry the deduplication is disabled - if !message.HasPublishingId() || producer.options.isSubEntriesBatching() { - sequence = atomic.AddInt64(&producer.sequence, 1) + if len(messageSeq.messageBytes) > producer.options.client.getTuneState().requestedMaxFrameSize { + tooLarge := producer.unConfirmed.extractWithError(messageSeq.publishingId, responseCodeFrameTooLarge) + producer.sendConfirmationStatus([]*ConfirmationStatus{tooLarge}) + return FrameTooLarge } - return sequence + + // see the processPendingSequencesQueue function + err = producer.pendingSequencesQueue.Enqueue(messageSeq) + if err != nil { + return fmt.Errorf("error during enqueue message: %s. Message will be in timed. Producer id: %d ", err, producer.id) + } + return nil } // BatchSend sends a batch of messages to the stream and returns an error if the messages could not be sent. -// BatchSend is synchronous. The aggregation is up to the user. The user has to aggregate the messages +// The method is synchronous. The aggregation is up to the user. The user has to aggregate the messages // and send them in a batch. // BatchSend is not affected by the BatchSize and BatchPublishingDelay options. -// BatchSend is the primitive method to send messages to the stream, the method Send prepares the messages and -// calls BatchSend internally.
+// returns an error if the message could not be sent for marshal problems or if the buffer is too large func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error { - var messagesSequence = make([]*messageSequence, len(batchMessages)) + maxFrame := producer.options.client.getTuneState().requestedMaxFrameSize + var messagesSequence = make([]*messageSequence, 0) totalBufferToSend := 0 - for i, batchMessage := range batchMessages { - messageBytes, err := batchMessage.MarshalBinary() + for _, batchMessage := range batchMessages { + messageSeq, err := producer.fromMessageToMessageSequence(batchMessage) if err != nil { return err } - filterValue := "" - if producer.options.IsFilterEnabled() { - filterValue = producer.options.Filter.FilterValue(batchMessage) - } + producer.unConfirmed.addFromSequence(messageSeq, producer.GetID()) - sequence := producer.assignPublishingID(batchMessage) - totalBufferToSend += len(messageBytes) - messagesSequence[i] = &messageSequence{ - messageBytes: messageBytes, - unCompressedSize: len(messageBytes), - publishingId: sequence, - filterValue: filterValue, - refMessage: &batchMessage, - } + totalBufferToSend += len(messageSeq.messageBytes) + messagesSequence = append(messagesSequence, messageSeq) } + // - producer.addUnConfirmedSequences(messagesSequence, producer.GetID()) + if totalBufferToSend+initBufferPublishSize > maxFrame { + // if the totalBufferToSend is greater than the requestedMaxFrameSize + // all the messages are unconfirmed - if totalBufferToSend+initBufferPublishSize > producer.options.client.tuneState.requestedMaxFrameSize { for _, msg := range messagesSequence { - - unConfirmedMessage := producer.getUnConfirmed(msg.publishingId) - - //producer.mutex.Lock() - if producer.publishConfirm != nil { - unConfirmedMessage.err = FrameTooLarge - unConfirmedMessage.errorCode = responseCodeFrameTooLarge - producer.publishConfirm <- []*ConfirmationStatus{unConfirmedMessage} - } - //producer.mutex.Unlock() - producer.removeUnConfirmed(msg.publishingId) + m := producer.unConfirmed.extractWithError(msg.publishingId, responseCodeFrameTooLarge) + producer.sendConfirmationStatus([]*ConfirmationStatus{m}) } - return FrameTooLarge } + // all the messages are unconfirmed return producer.internalBatchSend(messagesSequence) } func (producer *Producer) GetID() uint8 { return producer.id } + func (producer *Producer) internalBatchSend(messagesSequence []*messageSequence) error { return producer.internalBatchSendProdId(messagesSequence, producer.GetID()) } -func (producer *Producer) simpleAggregation(messagesSequence []*messageSequence, b *bufio.Writer) { +func (producer *Producer) simpleAggregation(messagesSequence []*messageSequence, + b *bufio.Writer) { for _, msg := range messagesSequence { r := msg.messageBytes writeBLong(b, msg.publishingId) // publishingId @@ -528,12 +487,7 @@ func (producer *Producer) aggregateEntities(msgs []*messageSequence, size int, c // the message 5 is linked to 6,7,8,9..15 if entry.publishingId != msg.publishingId { - unConfirmed := producer.getUnConfirmed(entry.publishingId) - if unConfirmed != nil { - unConfirmed.linkedTo = - append(unConfirmed.linkedTo, - producer.getUnConfirmed(msg.publishingId)) - } + producer.unConfirmed.link(entry.publishingId, msg.publishingId) } } @@ -542,9 +496,8 @@ func (producer *Producer) aggregateEntities(msgs []*messageSequence, size int, c return subEntries, nil } -/// the producer id is always the producer.GetID(). 
This function is needed only for testing -// some condition, like simulate publish error, see - +// / the producer id is always the producer.GetID(). This function is needed only for testing +// some condition, like simulate publish error. func (producer *Producer) internalBatchSendProdId(messagesSequence []*messageSequence, producerID uint8) error { producer.options.client.socket.mutex.Lock() defer producer.options.client.socket.mutex.Unlock() @@ -601,47 +554,71 @@ func (producer *Producer) internalBatchSendProdId(messagesSequence []*messageSeq err := producer.options.client.socket.writer.Flush() //writeAndFlush(b.Bytes()) if err != nil { logs.LogError("Producer BatchSend error during flush: %s", err) - producer.setStatus(closed) return err } return nil } -func (producer *Producer) flushUnConfirmedMessages(errorCode uint16, err error) { - producer.mutex.Lock() - - for _, msg := range producer.unConfirmedMessages { - msg.confirmed = false - msg.err = err - msg.errorCode = errorCode - if producer.publishConfirm != nil { - producer.publishConfirm <- []*ConfirmationStatus{msg} - } - delete(producer.unConfirmedMessages, msg.publishingId) +func (producer *Producer) flushUnConfirmedMessages() { + timeOut := producer.unConfirmed.extractWithTimeOut(time.Duration(0)) + if len(timeOut) > 0 { + producer.sendConfirmationStatus(timeOut) } - producer.mutex.Unlock() } +// GetLastPublishingId returns the last publishing id sent by the producer given the producer name. +// this function is useful when you need to know the last message sent by the producer in case of +// deduplication. func (producer *Producer) GetLastPublishingId() (int64, error) { return producer.options.client.queryPublisherSequence(producer.GetName(), producer.GetStreamName()) } + +// Close closes the producer and returns an error if the producer could not be closed. 
func (producer *Producer) Close() error { + + return producer.close(Event{ + Command: CommandDeletePublisher, + StreamName: producer.GetStreamName(), + Name: producer.GetName(), + Reason: DeletePublisher, + Err: nil, + }) +} +func (producer *Producer) close(reason Event) error { + if producer.getStatus() == closed { return AlreadyClosed } - producer.waitForInflightMessages() producer.setStatus(closed) + reason.StreamName = producer.GetStreamName() + reason.Name = producer.GetName() + if producer.closeHandler != nil { + producer.closeHandler <- reason + close(producer.closeHandler) + producer.closeHandler = nil + } + + producer.stopAndWaitPendingSequencesQueue() + + producer.closeConfirmationStatus() + + if producer.options == nil { + return nil + } + _ = producer.options.client.coordinator.RemoveProducerById(producer.id, reason) + if !producer.options.client.socket.isOpen() { return fmt.Errorf("tcp connection is closed") } - err := producer.options.client.deletePublisher(producer.id) - if err != nil { - logs.LogError("error delete Publisher on closing: %s", err) + // remove from the server only if the producer exists + if reason.Reason == DeletePublisher { + _ = producer.options.client.deletePublisher(producer.id) } + if producer.options.client.coordinator.ProducersCount() == 0 { err := producer.options.client.Close() if err != nil { @@ -656,27 +633,41 @@ func (producer *Producer) Close() error { close(ch) } - close(producer.messageSequenceCh) return nil } +// stopAndWaitPendingSequencesQueue stops the pendingSequencesQueue and waits for the inflight messages to be sent +func (producer *Producer) stopAndWaitPendingSequencesQueue() { + + // Stop the pendingSequencesQueue, so the producer can't send messages anymore + // but the producer can still handle the inflight messages + producer.pendingSequencesQueue.Stop() + + // Stop the confirmationTimeoutTicker. It will flush the unconfirmed messages + producer.confirmationTimeoutTicker.Stop() + producer.doneTimeoutTicker <- struct{}{} + close(producer.doneTimeoutTicker) + + // Wait for the inflight messages + producer.waitForInflightMessages() + // Close the pendingSequencesQueue. 
It closes the channel + producer.pendingSequencesQueue.Close() + +} + func (producer *Producer) waitForInflightMessages() { // during the close there cloud be pending messages // it waits for producer.options.BatchPublishingDelay // to flush the last messages // see issues/103 - channelLength := len(producer.messageSequenceCh) - pendingMessagesLen := producer.lenPendingMessages() tentatives := 0 - for (channelLength > 0 || pendingMessagesLen > 0 || producer.lenUnConfirmed() > 0) && tentatives < 3 { - logs.LogDebug("waitForInflightMessages, channel: %d - pending messages len: %d - unconfirmed len: %d - retry: %d", - channelLength, pendingMessagesLen, + for (producer.lenUnConfirmed() > 0) && tentatives < 5 { + logs.LogInfo("wait inflight messages - unconfirmed len: %d - retry: %d", producer.lenUnConfirmed(), tentatives) - time.Sleep(time.Duration(2*producer.options.BatchPublishingDelay) * time.Millisecond) - channelLength = len(producer.messageSequenceCh) - pendingMessagesLen = producer.lenPendingMessages() + producer.flushUnConfirmedMessages() + time.Sleep(time.Duration(500) * time.Millisecond) tentatives++ } } @@ -726,7 +717,6 @@ func (producer *Producer) sendWithFilter(messagesSequence []*messageSequence, pr } return producer.options.client.socket.writer.Flush() - } func (c *Client) deletePublisher(publisherId byte) error { @@ -740,14 +730,5 @@ func (c *Client) deletePublisher(publisherId byte) error { writeByte(b, publisherId) errWrite := c.handleWrite(b.Bytes(), resp) - err := c.coordinator.RemoveProducerById(publisherId, Event{ - Command: CommandDeletePublisher, - Reason: "deletePublisher", - Err: nil, - }) - if err != nil { - logs.LogWarn("producer id: %d already removed", publisherId) - } - return errWrite.Err } diff --git a/pkg/stream/producer_test.go b/pkg/stream/producer_test.go index 6216c2cb..d2a0cd62 100644 --- a/pkg/stream/producer_test.go +++ b/pkg/stream/producer_test.go @@ -6,7 +6,6 @@ import ( . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" "sync" "sync/atomic" @@ -43,8 +42,40 @@ var _ = Describe("Streaming Producers", func() { It("NewProducer/Send/Close Publisher", func() { producer, err := testEnvironment.NewProducer(testProducerStream, nil) Expect(err).NotTo(HaveOccurred()) - Expect(producer.BatchSend(CreateArrayMessagesForTesting(5))).NotTo(HaveOccurred()) + err = producer.BatchSend(CreateArrayMessagesForTesting(5)) + Expect(err).NotTo(HaveOccurred()) + Expect(producer.Close()).NotTo(HaveOccurred()) + }) + + It("Close Publishers before send is competed", func() { + // force a high batch size to test the close + // during the close the producer should send the messages in the buffer + //and flush as timeout the messages can't be sent + producer, err := testEnvironment.NewProducer(testProducerStream, NewProducerOptions().SetBatchSize(500)) + ch := producer.NotifyPublishConfirmation() + var confirmedMessages int32 + var failedMessages int32 + + go func(ch ChannelPublishConfirm) { + for confirmed := range ch { + for _, msg := range confirmed { + if msg.IsConfirmed() { + atomic.AddInt32(&confirmedMessages, 1) + } else { + atomic.AddInt32(&failedMessages, 1) + } + } + + } + }(ch) + Expect(err).NotTo(HaveOccurred()) + for i := 0; i < 1_000; i++ { + Expect(producer.Send(amqp.NewMessage([]byte("test")))).NotTo(HaveOccurred()) + } Expect(producer.Close()).NotTo(HaveOccurred()) + Eventually(func() int32 { + return atomic.LoadInt32(&confirmedMessages) + atomic.LoadInt32(&failedMessages) + }).WithPolling(200 * time.Millisecond).WithTimeout(5 * time.Second).Should(Equal(int32(1_000))) }) It("Multi-thread newProducer/Send", func() { @@ -57,7 +88,9 @@ var _ = Describe("Streaming Producers", func() { producer, err := testEnvironment.NewProducer(testProducerStream, nil) Expect(err).NotTo(HaveOccurred()) - Expect(producer.BatchSend(CreateArrayMessagesForTesting(5))).NotTo(HaveOccurred()) + err = producer.BatchSend(CreateArrayMessagesForTesting(5)) + Expect(err).NotTo(HaveOccurred()) + err = producer.Close() Expect(err).NotTo(HaveOccurred()) }(&wg) @@ -215,10 +248,8 @@ var _ = Describe("Streaming Producers", func() { go func(ch ChannelPublishConfirm, p *Producer) { defer GinkgoRecover() for ids := range ch { - for i, msg := range ids { + for _, msg := range ids { atomic.AddInt64(&nRecv, 1) - logs.LogInfo(fmt.Sprintf("Message i: %d confirmed - publishingId %d, iteration %d", - i, msg.GetPublishingId(), atomic.LoadInt64(&nRecv))) Expect(msg.GetError()).NotTo(HaveOccurred()) Expect(msg.GetProducerID()).To(Equal(p.id)) Expect(msg.GetPublishingId()).To(Equal(atomic.LoadInt64(&nRecv))) @@ -234,8 +265,8 @@ var _ = Describe("Streaming Producers", func() { } }(chConfirm, producer) - Expect(producer.BatchSend(CreateArrayMessagesForTesting(14))). 
- NotTo(HaveOccurred()) + err = producer.BatchSend(CreateArrayMessagesForTesting(14)) + Expect(err).NotTo(HaveOccurred()) Eventually(func() int32 { return atomic.LoadInt32(&messagesReceived) @@ -258,8 +289,7 @@ var _ = Describe("Streaming Producers", func() { Expect(producer.Close()).NotTo(HaveOccurred()) Expect(producer.lenUnConfirmed()).To(Equal(0)) - Expect(producer.lenPendingMessages()).To(Equal(0)) - Expect(len(producer.messageSequenceCh)).To(Equal(0)) + Expect(producer.pendingSequencesQueue.IsEmpty()).To(Equal(true)) }) It("Handle close", func() { @@ -273,8 +303,9 @@ var _ = Describe("Streaming Producers", func() { atomic.StoreInt32(&commandIdRecv, int32(event.Command)) }(chClose) - Expect(producer.BatchSend(CreateArrayMessagesForTesting(2))). - NotTo(HaveOccurred()) + err = producer.BatchSend(CreateArrayMessagesForTesting(2)) + Expect(err).NotTo(HaveOccurred()) + time.Sleep(100 * time.Millisecond) Expect(producer.Close()).NotTo(HaveOccurred()) Eventually(func() int32 { @@ -284,39 +315,6 @@ var _ = Describe("Streaming Producers", func() { }) - It("Pre Publisher errors / Frame too large ", func() { - producer, err := testEnvironment.NewProducer(testProducerStream, nil) - var messagesError int32 - - chPublishConfirmation := producer.NotifyPublishConfirmation() - go func(ch ChannelPublishConfirm) { - defer GinkgoRecover() - for msgs := range ch { - for _, msg := range msgs { - if !msg.IsConfirmed() { - Expect(msg.GetError()).To(Equal(FrameTooLarge)) - atomic.AddInt32(&messagesError, 1) - } - } - } - }(chPublishConfirmation) - - Expect(err).NotTo(HaveOccurred()) - var arr []message.StreamMessage - for z := 0; z < 101; z++ { - s := make([]byte, 15000) - arr = append(arr, amqp.NewMessage(s)) - } - Expect(producer.BatchSend(arr)).To(Equal(FrameTooLarge)) - - Eventually(func() int32 { - return atomic.LoadInt32(&messagesError) - }, 5*time.Second).Should(Equal(int32(101)), - "invalidate all the messages sent in the batch") - - Expect(producer.Close()).NotTo(HaveOccurred()) - }) - It("Smart Send/Close", func() { producer, err := testEnvironment.NewProducer(testProducerStream, nil) Expect(err).NotTo(HaveOccurred()) @@ -363,13 +361,9 @@ var _ = Describe("Streaming Producers", func() { Eventually(func() int32 { return atomic.LoadInt32(&messagesReceived) - }, 5*time.Second).Should(Equal(int32(2)), + }, 5*time.Second).WithPolling(200*time.Millisecond).Should(Equal(int32(2)), "confirm should receive same messages Send by producer") - By("Max frame Error") - s := make([]byte, 1148576) - Expect(producer.Send(amqp.NewMessage(s))).To(HaveOccurred()) - Expect(producer.lenUnConfirmed()).To(Equal(0)) Expect(producer.Close()).NotTo(HaveOccurred()) producer, err = testEnvironment.NewProducer(testProducerStream, @@ -431,42 +425,59 @@ var _ = Describe("Streaming Producers", func() { Expect(producer.Close()).NotTo(HaveOccurred()) }) - It("Smart Send Send after BatchPublishingDelay", func() { - // this test is need to test "Send after BatchPublishingDelay" - // and the time check - producer, err := testEnvironment.NewProducer(testProducerStream, - NewProducerOptions().SetBatchPublishingDelay(50)) + It("BatchSend should not a send a big message", func() { + // 1.5 Milestone + // the batch send should not send a big message + // The message should be sed back to the client with an error + // FrameTooLarge and not confirmed + producer, err := testEnvironment.NewProducer(testProducerStream, nil) Expect(err).NotTo(HaveOccurred()) - var messagesReceived int32 + var notConfirmedTooLarge int32 chConfirm := 
producer.NotifyPublishConfirmation() go func(ch ChannelPublishConfirm) { for ids := range ch { - atomic.AddInt32(&messagesReceived, int32(len(ids))) + for _, conf := range ids { + if !conf.IsConfirmed() { + Expect(conf.GetError()).To(Equal(FrameTooLarge)) + atomic.AddInt32(¬ConfirmedTooLarge, 1) + } + } } }(chConfirm) - - for z := 0; z < 5; z++ { - s := make([]byte, 50) - err = producer.Send(amqp.NewMessage(s)) - Expect(err).NotTo(HaveOccurred()) - time.Sleep(60 * time.Millisecond) - } - - for z := 0; z < 5; z++ { - s := make([]byte, 50) - err = producer.Send(amqp.NewMessage(s)) - Expect(err).NotTo(HaveOccurred()) - time.Sleep(20 * time.Millisecond) - } - + err = producer.BatchSend([]message.StreamMessage{amqp.NewMessage(make([]byte, MessageBufferTooBig))}) + Expect(err).To(HaveOccurred()) Eventually(func() int32 { - return atomic.LoadInt32(&messagesReceived) - }, 5*time.Second).Should(Equal(int32(10)), - "confirm should receive same messages Send by producer") + return atomic.LoadInt32(¬ConfirmedTooLarge) + }).Should(Equal(int32(1))) + Expect(producer.Close()).NotTo(HaveOccurred()) + }) - Expect(producer.lenUnConfirmed()).To(Equal(0)) - err = producer.Close() + It("Send should not a send a big message", func() { + // 1.5 Milestone + // the Send() method should not send a big message + // The message should be sed back to the client with an error + // FrameTooLarge and not confirmed + producer, err := testEnvironment.NewProducer(testProducerStream, nil) Expect(err).NotTo(HaveOccurred()) + var notConfirmedTooLarge int32 + chConfirm := producer.NotifyPublishConfirmation() + go func(ch ChannelPublishConfirm) { + defer GinkgoRecover() + for ids := range ch { + for _, conf := range ids { + if !conf.IsConfirmed() { + Expect(conf.GetError()).To(Equal(FrameTooLarge)) + atomic.AddInt32(¬ConfirmedTooLarge, 1) + } + } + } + }(chConfirm) + err = producer.Send(amqp.NewMessage(make([]byte, MessageBufferTooBig))) + Expect(err).To(HaveOccurred()) + Eventually(func() int32 { + return atomic.LoadInt32(¬ConfirmedTooLarge) + }, 5*time.Second).Should(Equal(int32(1))) + Expect(producer.Close()).NotTo(HaveOccurred()) }) It("Already Closed/Limits", func() { @@ -529,9 +540,7 @@ var _ = Describe("Streaming Producers", func() { }) // this test is needed to test publish error. - // In order to simulate the producer id not found I need to - // change manually the producer id. - // It works, but would be better to introduce some mock function + // In order to simulate the producer id not found It("Publish Error", func() { env, err := NewEnvironment(nil) Expect(err).NotTo(HaveOccurred()) @@ -572,23 +581,19 @@ var _ = Describe("Streaming Producers", func() { messageBytes: messageBytes, unCompressedSize: len(messageBytes), } - for _, producerC := range producer.options.client.coordinator.producers { - producerC.(*Producer).id = uint8(200) - } - producer.options.client.coordinator.mutex.Lock() - producer.options.client.coordinator.producers[uint8(200)] = producer - producer.options.client.coordinator.mutex.Unlock() + // 200 producer ID doesn't exist Expect(producer.internalBatchSendProdId(messagesSequence, 200)). 
NotTo(HaveOccurred()) - Expect(env.DeleteStream(prodErrorStream)).NotTo(HaveOccurred()) - Expect(env.Close()).NotTo(HaveOccurred()) - Eventually(func() int32 { return atomic.LoadInt32(&messagesConfirmed) }, 5*time.Second).ShouldNot(Equal(0), "it should receive some message") + + Expect(env.DeleteStream(prodErrorStream)).NotTo(HaveOccurred()) + Expect(env.Close()).NotTo(HaveOccurred()) + }) It("Publish Confirm/Send reuse the same message", func() { @@ -626,7 +631,8 @@ var _ = Describe("Streaming Producers", func() { } atomic.StoreInt32(&messagesConfirmed, 0) for z := 0; z < 12; z++ { - Expect(producer.BatchSend(arr)).NotTo(HaveOccurred()) + err := producer.BatchSend(arr) + Expect(err).NotTo(HaveOccurred()) } Eventually(func() int32 { @@ -757,8 +763,8 @@ var _ = Describe("Streaming Producers", func() { atomic.StoreInt32(&messagesConfirmed, 0) for z := 0; z < 501; z++ { - Expect(producer.BatchSend(CreateArrayMessagesForTesting(5))). - NotTo(HaveOccurred()) + err := producer.BatchSend(CreateArrayMessagesForTesting(5)) + Expect(err).NotTo(HaveOccurred()) } Eventually(func() int32 { @@ -805,6 +811,15 @@ var _ = Describe("Streaming Producers", func() { }) + It("Can't send message if the producer is closed", func() { + + producer, err := testEnvironment.NewProducer(testProducerStream, nil) + Expect(err).NotTo(HaveOccurred()) + Expect(producer.Close()).NotTo(HaveOccurred()) + err = producer.Send(amqp.NewMessage(make([]byte, 50))) + Expect(err).To(HaveOccurred()) + }) + }) func testCompress(producer *Producer) { @@ -830,8 +845,9 @@ func testCompress(producer *Producer) { atomic.StoreInt32(&messagesConfirmed, 0) for z := 0; z < 457; z++ { - Expect(producer.BatchSend(CreateArrayMessagesForTesting(5))). - NotTo(HaveOccurred()) + err := producer.BatchSend(CreateArrayMessagesForTesting(5)) + Expect(err).NotTo(HaveOccurred()) + } Eventually(func() int32 { @@ -874,7 +890,8 @@ func sendConcurrentlyAndSynchronously(producer *Producer, threadCount int, wg *s totalBatchCount := totalMessageCountPerThread / batchSize for batchIndex := 0; batchIndex < totalBatchCount; batchIndex++ { messagePrefix := fmt.Sprintf("test_%d_%d_", goRoutingIndex, batchIndex) - Expect(producer.BatchSend(CreateArrayMessagesForTestingWithPrefix(messagePrefix, batchSize))).NotTo(HaveOccurred()) + err := producer.BatchSend(CreateArrayMessagesForTestingWithPrefix(messagePrefix, batchSize)) + Expect(err).NotTo(HaveOccurred()) } }) } diff --git a/pkg/stream/producer_unconfirmed.go b/pkg/stream/producer_unconfirmed.go new file mode 100644 index 00000000..fa30ed8a --- /dev/null +++ b/pkg/stream/producer_unconfirmed.go @@ -0,0 +1,113 @@ +package stream + +import ( + "sync" + "time" +) + +// unConfirmed is a structure that holds unconfirmed messages +// And unconfirmed message is a message that has been sent to the broker but not yet confirmed, +// and it is added to the unConfirmed structure as soon is possible when +// +// the Send() or BatchSend() method is called +// +// The confirmation status is updated when the confirmation is received from the broker (see server_frame.go) +// or due of timeout. The Timeout is configurable, and it is calculated client side. 
+type unConfirmed struct {
+	messages map[int64]*ConfirmationStatus
+	mutex    sync.RWMutex
+}
+
+const DefaultUnconfirmedSize = 10000
+
+func newUnConfirmed() *unConfirmed {
+
+	r := &unConfirmed{
+		messages: make(map[int64]*ConfirmationStatus, DefaultUnconfirmedSize),
+		mutex:    sync.RWMutex{},
+	}
+
+	return r
+}
+
+func (u *unConfirmed) addFromSequence(message *messageSequence, producerID uint8) {
+
+	u.mutex.Lock()
+	u.messages[message.publishingId] = &ConfirmationStatus{
+		inserted:     time.Now(),
+		message:      *message.refMessage,
+		producerID:   producerID,
+		publishingId: message.publishingId,
+		confirmed:    false,
+	}
+	u.mutex.Unlock()
+}
+
+func (u *unConfirmed) link(from int64, to int64) {
+	u.mutex.Lock()
+	defer u.mutex.Unlock()
+	r := u.messages[from]
+	if r != nil {
+		r.linkedTo = append(r.linkedTo, u.messages[to])
+	}
+}
+
+func (u *unConfirmed) extractWithConfirm(id int64) *ConfirmationStatus {
+	u.mutex.Lock()
+	defer u.mutex.Unlock()
+	return u.extract(id, 0, true)
+}
+
+func (u *unConfirmed) extractWithError(id int64, errorCode uint16) *ConfirmationStatus {
+	u.mutex.Lock()
+	defer u.mutex.Unlock()
+	return u.extract(id, errorCode, false)
+}
+
+func (u *unConfirmed) extract(id int64, errorCode uint16, confirmed bool) *ConfirmationStatus {
+	rootMessage := u.messages[id]
+	if rootMessage != nil {
+		u.updateStatus(rootMessage, errorCode, confirmed)
+
+		for _, linkedMessage := range rootMessage.linkedTo {
+			u.updateStatus(linkedMessage, errorCode, confirmed)
+			delete(u.messages, linkedMessage.publishingId)
+		}
+		delete(u.messages, id)
+	}
+	return rootMessage
+}
+
+func (u *unConfirmed) updateStatus(rootMessage *ConfirmationStatus, errorCode uint16, confirmed bool) {
+	rootMessage.confirmed = confirmed
+	if confirmed {
+		return
+	}
+	rootMessage.errorCode = errorCode
+	rootMessage.err = lookErrorCode(errorCode)
+}
+
+func (u *unConfirmed) extractWithTimeOut(timeout time.Duration) []*ConfirmationStatus {
+	u.mutex.Lock()
+	defer u.mutex.Unlock()
+	var res []*ConfirmationStatus
+	for _, v := range u.messages {
+		if time.Since(v.inserted) > timeout {
+			v := u.extract(v.publishingId, timeoutError, false)
+			res = append(res, v)
+		}
+	}
+	return res
+}
+
+func (u *unConfirmed) size() int {
+	u.mutex.Lock()
+	defer u.mutex.Unlock()
+	return len(u.messages)
+}
+
+func (u *unConfirmed) getAll() map[int64]*ConfirmationStatus {
+	u.mutex.Lock()
+	defer u.mutex.Unlock()
+	return u.messages
+}
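// The sketch below is illustrative only and is not part of the patch: it shows how the
// unConfirmed tracker defined above is typically driven by a producer (package stream context
// assumed). The parameters "seq" and "errorCode" are placeholders; the real call sites are in
// producer.go and server_frame.go. "seq" is assumed to carry a valid publishingId and refMessage.
func exampleUnConfirmedLifecycle(seq *messageSequence, errorCode uint16) {
	tracker := newUnConfirmed()

	// Send()/BatchSend(): register the message before the frame is written to the broker.
	tracker.addFromSequence(seq, 1)

	// On a PublishConfirm frame (handleConfirm) the entry is extracted as confirmed and removed.
	if status := tracker.extractWithConfirm(seq.publishingId); status != nil {
		_ = status.IsConfirmed() // true
	}

	// Alternatively, on a PublishError frame (handlePublishError) the entry is extracted
	// with the broker error code, so the caller can inspect status.GetError().
	if status := tracker.extractWithError(seq.publishingId, errorCode); status != nil {
		_ = status.GetError()
	}

	// Periodic sweep: entries older than the given timeout are flushed with timeoutError.
	for _, timedOut := range tracker.extractWithTimeOut(2 * time.Second) {
		_ = timedOut
	}
}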
Id %d", err, readProtocol.PublishID) } - var unConfirmed []*ConfirmationStatus + + // even the producer is not found we need to read the publishingId + // to empty the buffer. + // The producer here could not exist because the producer is closed before the confirmations are received + var unConfirmedRecv []*ConfirmationStatus for publishingIdCount != 0 { seq := readInt64(r) + if producerFound { + + m := producer.unConfirmed.extractWithConfirm(seq) + if m != nil { + unConfirmedRecv = append(unConfirmedRecv, m) - m := producer.getUnConfirmed(seq) - if m != nil { - m.confirmed = true - unConfirmed = append(unConfirmed, m) - - // in case of sub-batch entry the client receives only - // one publishingId (or sequence) - // so the other messages are confirmed using the linkedTo - for _, message := range m.linkedTo { - message.confirmed = true - unConfirmed = append(unConfirmed, message) + // in case of sub-batch entry the client receives only + // one publishingId (or sequence) + // so the other messages are confirmed using the linkedTo + unConfirmedRecv = append(unConfirmedRecv, m.linkedTo...) } } + publishingIdCount-- } - - producer.removeFromConfirmationStatus(unConfirmed) - - //producer.mutex.Lock() - if producer.publishConfirm != nil { - producer.publishConfirm <- unConfirmed + if producerFound { + producer.sendConfirmationStatus(unConfirmedRecv) } - //producer.mutex.Unlock() return 0 } @@ -474,19 +469,13 @@ func (c *Client) handlePublishError(buffer *bufio.Reader) { producer, err := c.coordinator.GetProducerById(publisherId) if err != nil { logs.LogWarn("producer id %d not found, publish error :%s", publisherId, lookErrorCode(code)) - producer = &Producer{unConfirmedMessages: map[int64]*ConfirmationStatus{}} + producer = &Producer{unConfirmed: newUnConfirmed()} } else { - unConfirmedMessage := producer.getUnConfirmed(publishingId) - - producer.mutex.Lock() + unConfirmedMessage := producer.unConfirmed.extractWithError(publishingId, code) - if producer.publishConfirm != nil && unConfirmedMessage != nil { - unConfirmedMessage.errorCode = code - unConfirmedMessage.err = lookErrorCode(code) - producer.publishConfirm <- []*ConfirmationStatus{unConfirmedMessage} + if unConfirmedMessage != nil { + producer.sendConfirmationStatus([]*ConfirmationStatus{unConfirmedMessage}) } - producer.mutex.Unlock() - producer.removeUnConfirmed(publishingId) } publishingErrorCount-- } diff --git a/pkg/stream/stream_test.go b/pkg/stream/stream_test.go index afd2de07..8754888c 100644 --- a/pkg/stream/stream_test.go +++ b/pkg/stream/stream_test.go @@ -6,6 +6,9 @@ import ( "strconv" ) +const MessageBufferTooBig = 1148001 +const MessageBufferBigButLessTheFrame = 1048400 + func CreateArrayMessagesForTesting(numberOfMessages int) []message.StreamMessage { return CreateArrayMessagesForTestingWithPrefix("test_", numberOfMessages) diff --git a/pkg/stream/super_stream_consumer_test.go b/pkg/stream/super_stream_consumer_test.go index 2340c0f9..b17bd145 100644 --- a/pkg/stream/super_stream_consumer_test.go +++ b/pkg/stream/super_stream_consumer_test.go @@ -45,7 +45,7 @@ func Send(env *Environment, superStream string) { } -var _ = Describe("Super Stream Producer", Label("super-stream-consumer"), func() { +var _ = Describe("Super Stream Consumer", Label("super-stream-consumer"), func() { It("Validate the Super Stream Consumer", func() { env, err := NewEnvironment(nil) @@ -167,7 +167,7 @@ var _ = Describe("Super Stream Producer", Label("super-stream-consumer"), func() env, err := NewEnvironment(nil) 
 		Expect(err).NotTo(HaveOccurred())
-		superStream := fmt.Sprintf("sac-super-stream-the-second-consumer-in-action-%s", id)
+		superStream := fmt.Sprintf("sac-super-stream-the-second-consumer-in-action-%s-%d", id, time.Now().Unix())
 		Expect(env.DeclareSuperStream(superStream, NewPartitionsOptions(3))).NotTo(HaveOccurred())
 		var receivedMessages int32
@@ -484,6 +484,7 @@ var _ = Describe("Super Stream Producer", Label("super-stream-consumer"), func()
 		var consumerItaly int32
 		filter := NewConsumerFilter([]string{"italy"}, false, func(message *amqp.Message) bool {
+
 			return message.ApplicationProperties["county"] == "italy"
 		})
diff --git a/pkg/stream/super_stream_producer.go b/pkg/stream/super_stream_producer.go
index fc7c2504..abbc33a8 100644
--- a/pkg/stream/super_stream_producer.go
+++ b/pkg/stream/super_stream_producer.go
@@ -357,11 +357,6 @@ func (s *SuperStreamProducer) getProducers() []*Producer {
 // Send sends a message to the partitions based on the routing strategy
 func (s *SuperStreamProducer) Send(message message.StreamMessage) error {
-	b, err := message.MarshalBinary()
-	if err != nil {
-		return err
-	}
-
 	ps, err := s.SuperStreamProducerOptions.RoutingStrategy.Route(message, s.GetPartitions())
 	// the routing strategy can return zero partitions
@@ -382,7 +377,7 @@ func (s *SuperStreamProducer) Send(message message.StreamMessage) error {
 			return ErrProducerNotFound
 		}
-		err = producer.sendBytes(message, b)
+		err = producer.Send(message)
 		if err != nil {
 			return err
 		}
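// Illustrative sketch, not part of the patch: with the change above, SuperStreamProducer.Send
// no longer marshals the message itself. It routes the message and delegates to the partition
// producer's Send(), which owns marshalling, frame-size checks (FrameTooLarge) and confirmation
// tracking. "superProducer" is a placeholder for an already-created super stream producer, and
// the amqp import is assumed, as in the tests in this patch.
func exampleSuperStreamSend(superProducer *SuperStreamProducer) error {
	msg := amqp.NewMessage([]byte("hello"))
	// Route(msg) picks the target partitions; the message is then published with producer.Send(msg).
	if err := superProducer.Send(msg); err != nil {
		// Possible errors include routing failures, ErrProducerNotFound and the partition producer's Send error.
		return err
	}
	return nil
}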
diff --git a/pkg/stream/super_stream_producer_test.go b/pkg/stream/super_stream_producer_test.go
index a06de884..dcc98d5a 100644
--- a/pkg/stream/super_stream_producer_test.go
+++ b/pkg/stream/super_stream_producer_test.go
@@ -29,7 +29,7 @@ func NewTestingRandomStrategy() *TestingRandomStrategy {
 	return &TestingRandomStrategy{}
 }
-var _ = Describe("Super Stream Producer", Label("super-stream"), func() {
+var _ = Describe("Super Stream Producer", Label("super-stream-producer"), func() {
 	DescribeTable("Partitioning using Murmur3",
@@ -150,7 +150,7 @@ var _ = Describe("Super Stream Producer", Label("super-stream"), func() {
 		Expect(superProducer.activeProducers).To(HaveLen(3))
 		go func(ch <-chan PartitionPublishConfirm) {
-			defer GinkgoRecover()
+			//defer GinkgoRecover()
 			for superStreamPublishConfirm := range ch {
 				Expect(superStreamPublishConfirm).NotTo(BeNil())
 				for _, status := range superStreamPublishConfirm.ConfirmationStatus {
 					Expect(status).NotTo(BeNil())
 					Expect(status.IsConfirmed()).To(BeTrue())
 				}
 				mutex.Lock()
-				msgReceived[superStreamPublishConfirm.Partition] = len(superStreamPublishConfirm.ConfirmationStatus)
+				msgReceived[superStreamPublishConfirm.Partition] += len(superStreamPublishConfirm.ConfirmationStatus)
 				logs.LogInfo("Partition %s confirmed %d messages, total %d",
 					superStreamPublishConfirm.Partition, len(superStreamPublishConfirm.ConfirmationStatus),
 					msgReceived[superStreamPublishConfirm.Partition])
 				mutex.Unlock()
 			}
-		}(superProducer.NotifyPublishConfirmation(1))
+		}(superProducer.NotifyPublishConfirmation(0))
 		for i := 0; i < 20; i++ {
 			msg := amqp.NewMessage(make([]byte, 0))
@@ -267,7 +267,8 @@ var _ = Describe("Super Stream Producer", Label("super-stream"), func() {
 			for chq := range ch {
 				if chq.Event.Reason == SocketClosed {
 					time.Sleep(2 * time.Second)
-					Expect(chq.Context.ConnectPartition(chq.Partition)).NotTo(HaveOccurred())
+					err := chq.Context.ConnectPartition(chq.Partition)
+					Expect(err).NotTo(HaveOccurred())
 					time.Sleep(1 * time.Second)
 					mutex.Lock()
 					reconnectedMap[chq.Partition] = true
@@ -289,7 +290,7 @@ var _ = Describe("Super Stream Producer", Label("super-stream"), func() {
 		time.Sleep(1 * time.Second)
 		Eventually(func() bool { mutex.Lock(); defer mutex.Unlock(); return len(reconnectedMap) == 1 },
-			300*time.Millisecond).WithTimeout(5 * time.Second).Should(BeTrue())
+			300*time.Millisecond).WithTimeout(20 * time.Second).Should(BeTrue())
 		Eventually(func() bool {
 			return len(superProducer.getProducers()) == 3