-
-
Notifications
You must be signed in to change notification settings - Fork 26
/
outboxer_example_test.go
86 lines (75 loc) · 2.04 KB
/
outboxer_example_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package outboxer_test
import (
"context"
"database/sql"
"fmt"
"os"
"time"
"github.com/italolelis/outboxer"
amqpOut "github.com/italolelis/outboxer/es/amqp"
"github.com/italolelis/outboxer/storage/postgres"
amqp "github.com/rabbitmq/amqp091-go"
)
// ExampleNew shows how to wire an outboxer instance together: it opens a
// database handle and an AMQP connection from the DS_DSN and ES_DSN
// environment variables, builds the postgres data store and the AMQP event
// stream on top of them, starts the outboxer, sends a single message and then
// waits until that message is reported on the ok channel.
//
// Every failure is printed and ends the example early; resources acquired up
// to that point are released by the deferred calls.
//
// nolint
func ExampleNew() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	db, err := sql.Open("postgres", os.Getenv("DS_DSN"))
	if err != nil {
		// This branch is about the database, not the message broker.
		fmt.Printf("could not connect to the database: %s", err)
		return
	}
	// The data store below only receives this handle, so the example releases
	// it itself.
	defer db.Close()

	conn, err := amqp.Dial(os.Getenv("ES_DSN"))
	if err != nil {
		fmt.Printf("could not connect to amqp: %s", err)
		return
	}
	// The event stream is handed the connection; closing it stays with the
	// code that dialed it.
	defer conn.Close()

	// we need to create a data store instance first
	ds, err := postgres.WithInstance(ctx, db)
	if err != nil {
		fmt.Printf("could not setup the data store: %s", err)
		return
	}
	defer ds.Close()

	// we create an event stream passing the amqp connection
	es := amqpOut.NewAMQP(conn)

	// now we create an outboxer instance passing the data store and event stream
	o, err := outboxer.New(
		outboxer.WithDataStore(ds),
		outboxer.WithEventStream(es),
		outboxer.WithCheckInterval(1*time.Second),
		outboxer.WithCleanupInterval(5*time.Second),
		outboxer.WithCleanUpBefore(time.Now().AddDate(0, 0, -5)),
		outboxer.WithCleanUpBatchSize(10),
		outboxer.WithMessageBatchSize(10),
	)
	if err != nil {
		fmt.Printf("could not create an outboxer instance: %s", err)
		return
	}

	// here we initialize the outboxer checks and cleanup goroutines
	o.Start(ctx)
	defer o.Stop()

	// finally we are ready to send messages
	if err = o.Send(ctx, &outboxer.OutboxMessage{
		Payload: []byte("test payload"),
		Options: map[string]interface{}{
			amqpOut.ExchangeNameOption: "test",
			amqpOut.ExchangeTypeOption: "topic",
			amqpOut.RoutingKeyOption:   "test.send",
		},
	}); err != nil {
		fmt.Printf("could not send message: %s", err)
		return
	}

	// we can also listen for errors and ok messages that were sent; errors
	// are printed and the loop keeps waiting, the first ok message ends the
	// example
	for {
		select {
		case err := <-o.ErrChan():
			fmt.Printf("could not send message: %s", err)
		case <-o.OkChan():
			fmt.Printf("message received")
			return
		}
	}
}