// main.go — forked from beatlabs/patron (158 lines, 138 loc, 4.1 KB).
package main
import (
"context"
"os"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
"github.com/beatlabs/patron"
patrongrpc "github.com/beatlabs/patron/client/grpc"
patronsqs "github.com/beatlabs/patron/component/sqs"
"github.com/beatlabs/patron/encoding/json"
"github.com/beatlabs/patron/examples"
"github.com/beatlabs/patron/log"
"google.golang.org/grpc"
)
// Settings for the AWS session and SQS queue used by this example.
const (
	// awsRegion is the region the AWS session is configured with.
	awsRegion = "eu-west-1"

	// Static credentials handed to the AWS SDK by getAWSSession.
	awsID     = "test"
	awsSecret = "test"
	awsToken  = "token"

	// awsSQSEndpoint is the endpoint the SQS client talks to.
	awsSQSEndpoint = "http://localhost:4566"
	// awsSQSQueue is the queue the SQS component is created for.
	awsSQSQueue = "patron"
)
// init sets the environment variables this example relies on (log level,
// Jaeger sampler parameter and default HTTP port) before main runs.
// Any failure to set a variable is reported through log.Fatalf.
func init() {
	if err := os.Setenv("PATRON_LOG_LEVEL", "debug"); err != nil {
		log.Fatalf("failed to set log level env var: %v", err)
	}

	if err := os.Setenv("PATRON_JAEGER_SAMPLER_PARAM", "1.0"); err != nil {
		log.Fatalf("failed to set sampler env vars: %v", err)
	}

	if err := os.Setenv("PATRON_HTTP_DEFAULT_PORT", "50004"); err != nil {
		log.Fatalf("failed to set default patron port env vars: %v", err)
	}
}
// main creates the patron service, dials the gRPC greeter service, builds the
// SQS component on top of an AWS session and runs the service with that
// component. Every setup failure is reported through log.Fatalf.
func main() {
	service, err := patron.New("sqs", "1.0.0")
	if err != nil {
		log.Fatalf("failed to set up service: %v", err)
	}

	conn, err := patrongrpc.Dial("localhost:50006", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("failed to dial grpc connection: %v", err)
	}
	// Best-effort close of the gRPC connection; the close error is deliberately ignored.
	defer func() { _ = conn.Close() }()

	greeter := examples.NewGreeterClient(conn)

	// Initialise SQS
	component, err := createSQSComponent(sqs.New(getAWSSession(awsSQSEndpoint)), greeter)
	if err != nil {
		log.Fatalf("failed to create sqs component: %v", err)
	}

	if err := service.WithComponents(component.cmp).Run(context.Background()); err != nil {
		log.Fatalf("failed to create and run service: %v", err)
	}
}
// sqsComponent bundles the patron component that consumes the SQS queue with
// the gRPC greeter client that its Process method calls for every message.
type sqsComponent struct {
// cmp is the component built by createSQSComponent and handed to the service in main.
cmp patron.Component
// greeter is the gRPC client Process uses to send SayHello requests.
greeter examples.GreeterClient
}
// createSQSComponent builds an sqsComponent whose patron SQS component is
// named "sqs-cmp", is created for awsSQSQueue with a poll wait of 5 seconds,
// and uses the component's own Process method as the message handler.
// It returns the error from patronsqs.New unchanged when creation fails.
func createSQSComponent(api sqsiface.SQSAPI, greeter examples.GreeterClient) (*sqsComponent, error) {
	result := &sqsComponent{greeter: greeter}

	// result.Process is bound to the same instance that is returned below.
	component, err := patronsqs.New("sqs-cmp", awsSQSQueue, api, result.Process, patronsqs.PollWaitSeconds(5))
	if err != nil {
		return nil, err
	}

	result.cmp = component
	return result, nil
}
// Process handles a batch of SQS messages. Each message body is decoded into
// an examples.User, a SayHello request built from the user's first and last
// name is sent to the gRPC greeter service, and the message is then
// acknowledged individually.
//
// A message that cannot be decoded, or whose gRPC call fails, is NACKed and
// skipped; processing continues with the next message in the batch. The
// context argument is unused: every call uses the message's own context.
func (ac *sqsComponent) Process(_ context.Context, btc patronsqs.Batch) {
	for _, msg := range btc.Messages() {
		logger := log.FromContext(msg.Context())

		var u examples.User
		if err := json.DecodeRaw(msg.Body(), &u); err != nil {
			logger.Errorf("failed to decode message: %v", err)
			msg.NACK()
			continue
		}

		logger.Infof("request processed: %v, sending request to the gRPC service", u.String())
		reply, err := ac.greeter.SayHello(msg.Context(), &examples.HelloRequest{Firstname: u.GetFirstname(), Lastname: u.GetLastname()})
		if err != nil {
			logger.Errorf("failed to send request: %v", err)
			msg.NACK()
			// The message has been NACKed; move on so that it is not also
			// acknowledged below and no reply is logged for a failed call.
			continue
		}

		logger.Infof("reply from the gRPC service: %s", reply.GetMessage())

		// We can either acknowledge the whole batch or each message individually.
		if err := msg.ACK(); err != nil {
			logger.Errorf("failed to acknowledge message with id %s: %v", msg.ID(), err)
		}
	}

	// The commented code below can be used to acknowledge the whole batch of
	// messages instead of each single message. To use it, name the context
	// parameter (ctx) and drop the per-message ACK above. Process has no
	// return value, so a batch ACK error is logged rather than returned.
	//
	// logger := log.FromContext(ctx)
	//
	// failed, err := btc.ACK()
	// if err != nil {
	// 	logger.Errorf("failed to acknowledge batch: %v", err)
	// }
	//
	// for _, msg := range failed {
	// 	logger.Warnf("failed to acknowledge message with id: %s", msg.ID())
	// }
}
// getAWSSession returns an AWS session configured with awsRegion, the static
// example credentials and the given endpoint.
//
// Session creation is attempted up to 15 times, one second apart. The first
// successful session is returned immediately. If every attempt fails, the
// last error is passed to session.Must, which panics.
func getAWSSession(endpoint string) *session.Session {
	const (
		maxAttempts = 15
		retryDelay  = 1 * time.Second
	)

	var (
		s   *session.Session
		err error
	)
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		s, err = session.NewSession(
			&aws.Config{
				Region:      aws.String(awsRegion),
				Credentials: credentials.NewStaticCredentials(awsID, awsSecret, awsToken),
			},
			&aws.Config{Endpoint: aws.String(endpoint)},
		)
		if err == nil {
			return s
		}
		// Only wait when another attempt follows; sleeping after the final
		// failure would merely delay the panic below.
		if attempt < maxAttempts {
			time.Sleep(retryDelay)
		}
	}

	// Every attempt failed, so err is non-nil here and session.Must panics with it.
	return session.Must(s, err)
}