-
Notifications
You must be signed in to change notification settings - Fork 20
/
automaticOffsetTracking.go
101 lines (88 loc) · 2.68 KB
/
automaticOffsetTracking.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package main
import (
"bufio"
"fmt"
"github.com/google/uuid"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
"os"
"strconv"
"sync/atomic"
"time"
)
// CheckErr is the example's fail-fast error handler.
//
// A nil err is a no-op. For a non-nil err the error text (followed by a
// single space) is written to standard output and the process exits with
// status 1, so deferred functions in the caller will not run.
func CheckErr(err error) {
	if err == nil {
		return
	}
	fmt.Printf("%s ", err)
	os.Exit(1)
}
// main runs the automatic offset tracking example against a RabbitMQ stream
// endpoint at localhost:5552 using the guest/guest credentials.
//
// Steps:
//  1. Declare a stream named with a fresh UUID and publish 220 messages from
//     a background goroutine.
//  2. Start a consumer named "my_consumer" from the first offset with an
//     auto-commit strategy (store after every 50 messages, or on a 20 second
//     flush interval).
//  3. After a 2 second pause, start a second consumer with the same name from
//     the last consumed (stored) offset.
//  4. Wait for a line on stdin, then close the producer, both consumers,
//     delete the stream and close the environment.
//
// Every error is handed to CheckErr, which terminates the process.
func main() {
	const (
		messageCount = 220 // number of messages published to the stream
		reportEvery  = 20  // print a progress line every reportEvery messages
		consumerName = "my_consumer"
	)

	reader := bufio.NewReader(os.Stdin)

	fmt.Println("Automatic Offset tracking example")
	fmt.Println("Connecting to RabbitMQ streaming ...")

	env, err := stream.NewEnvironment(
		stream.NewEnvironmentOptions().
			SetHost("localhost").
			SetPort(5552).
			SetUser("guest").
			SetPassword("guest"))
	CheckErr(err)

	streamName := uuid.New().String()
	err = env.DeclareStream(streamName,
		&stream.StreamOptions{
			MaxLengthBytes: stream.ByteCapacity{}.GB(2),
		},
	)
	CheckErr(err)

	producer, err := env.NewProducer(streamName, nil)
	CheckErr(err)

	// Publish in the background. sendDone is closed once every Send call has
	// returned, so the producer is not closed while sends are still in flight.
	sendDone := make(chan struct{})
	go func() {
		defer close(sendDone)
		for i := 0; i < messageCount; i++ {
			err := producer.Send(amqp.NewMessage([]byte("hello_world_" + strconv.Itoa(i))))
			CheckErr(err)
		}
	}()

	// Each consumer gets its own counter so a late delivery on the first
	// consumer cannot skew the tally reported by the second one.
	var autoCommitCount int32
	handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
		// Print the value returned by AddInt32: re-loading the counter could
		// observe increments made by concurrent callbacks.
		if n := atomic.AddInt32(&autoCommitCount, 1); n%reportEvery == 0 {
			fmt.Printf("messages consumed with auto commit: %d \n ", n)
		}
	}

	consumerOffsetNumber, err := env.NewConsumer(streamName,
		handleMessages,
		stream.NewConsumerOptions().
			// The consumer name is the key under which the offset is tracked.
			SetConsumerName(consumerName).
			// nil is also a valid value; default values will be used.
			SetAutoCommit(stream.NewAutoCommitStrategy().
				SetCountBeforeStorage(50).         // store the offset every 50 messages ...
				SetFlushInterval(20 * time.Second)). // ... or after 20 seconds
			SetOffset(stream.OffsetSpecification{}.First()))
	CheckErr(err)

	// Give the first consumer time to receive the messages and store offsets.
	time.Sleep(2 * time.Second)

	var afterCount int32
	handleMessagesAfter := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
		if n := atomic.AddInt32(&afterCount, 1); n%reportEvery == 0 {
			fmt.Printf("messages consumed after: %d \n ", n)
		}
	}

	// LastConsumed resumes from the last offset stored for this consumer name.
	// With 220 messages and a store every 50, the last count-triggered store
	// is expected at message 200, so roughly the final 20 messages should be
	// delivered here. NOTE(review): this assumes publishing and consumption
	// completed within the sleep above; confirm against the client docs.
	consumerNext, err := env.NewConsumer(streamName,
		handleMessagesAfter,
		stream.NewConsumerOptions().
			SetConsumerName(consumerName). // same name as the first consumer
			SetOffset(stream.OffsetSpecification{}.LastConsumed()))
	CheckErr(err)

	fmt.Println("Press any key to stop ")
	// The read error is deliberately ignored: any input, or EOF, means stop.
	_, _ = reader.ReadString('\n')

	// Make sure the background publisher has finished before closing.
	<-sendDone

	err = producer.Close()
	CheckErr(err)
	err = consumerOffsetNumber.Close()
	CheckErr(err)
	err = consumerNext.Close()
	CheckErr(err)
	err = env.DeleteStream(streamName)
	CheckErr(err)
	// Release the environment's connections now that the stream is gone.
	err = env.Close()
	CheckErr(err)
}