-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
85 lines (72 loc) · 2.15 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package main
import (
"fmt"
"os"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/spf13/cobra"
)
// Flag-backed settings. init binds each of these to a persistent flag on
// rootCmd, and rootCmd's RunE reads them.
var (
	// AWS shared-config profile name, followed by the URLs of the queue to
	// read from and the queue to write to.
	profile, sourceQueue, targetQueue string

	// Number of receive iterations to run, and the visibility timeout (in
	// seconds) passed with every receive call.
	loopCount, visibilityTimeout int32
)
// main executes rootCmd and terminates the process with exit status 1 when
// the command returns an error.
func main() {
	if err := rootCmd.Execute(); err != nil {
		// Diagnostics belong on stderr so that stdout stays clean for callers
		// that pipe or capture the program's regular output.
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
// init wires the command-line flags of rootCmd to the package-level
// variables that hold their parsed values.
func init() {
	flags := rootCmd.PersistentFlags()

	// Credentials and queue selection; each has a one-letter shorthand.
	flags.StringVarP(&profile, "profile", "p", "", "aws profile")
	flags.StringVarP(&sourceQueue, "source-queue", "s", "", "source queue url")
	flags.StringVarP(&targetQueue, "target-queue", "t", "", "target queue url")

	// Tuning knobs; long form only.
	flags.Int32Var(&loopCount, "loop-count", 1000, "number of loops for receive")
	flags.Int32Var(&visibilityTimeout, "visibility-timeout", 60, "visibility timeout in seconds")
}
// rootCmd moves messages from the source SQS queue to the target queue.
//
// Each iteration receives up to 10 messages from sourceQueue (waiting up to
// 10 seconds for messages to arrive), sends every message body together with
// its message attributes to targetQueue, and deletes the message from the
// source only after the send has succeeded. The loop runs loopCount times,
// whether or not a receive returned any messages, and stops at the first
// error.
//
// RunE returns an error when a required queue URL is missing, when the AWS
// session cannot be created, or when any receive, send or delete call fails.
var rootCmd = &cobra.Command{
	Use: "sqs-message-mover",
	RunE: func(cmd *cobra.Command, args []string) error {
		// Fail fast on missing queue URLs instead of surfacing an opaque API
		// error after a session has already been set up.
		if sourceQueue == "" {
			return fmt.Errorf("required flag %q not set", "source-queue")
		}
		if targetQueue == "" {
			return fmt.Errorf("required flag %q not set", "target-queue")
		}

		sess, err := session.NewSessionWithOptions(session.Options{
			SharedConfigState: session.SharedConfigEnable,
			Profile:           profile,
		})
		if err != nil {
			return fmt.Errorf("creating aws session for profile %q: %w", profile, err)
		}
		svc := sqs.New(sess)

		for i := 1; i <= int(loopCount); i++ {
			receiveResult, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{
				QueueUrl:            aws.String(sourceQueue),
				MaxNumberOfMessages: aws.Int64(10),
				// Message attributes are requested so they can be copied to
				// the target below.
				// NOTE(review): this reuses a queue-attribute constant for its
				// string value; confirm it selects all message attributes.
				MessageAttributeNames: []*string{
					aws.String(sqs.QueueAttributeNameAll),
				},
				WaitTimeSeconds:   aws.Int64(10),
				VisibilityTimeout: aws.Int64(int64(visibilityTimeout)),
			})
			if err != nil {
				return fmt.Errorf("receiving from %q (iteration %d of %d): %w", sourceQueue, i, loopCount, err)
			}

			for _, message := range receiveResult.Messages {
				id := aws.StringValue(message.MessageId)

				if _, err := svc.SendMessage(&sqs.SendMessageInput{
					QueueUrl:          aws.String(targetQueue),
					MessageBody:       message.Body,
					MessageAttributes: message.MessageAttributes,
				}); err != nil {
					return fmt.Errorf("sending message %q to %q: %w", id, targetQueue, err)
				}

				// Delete only after a successful send so a message is never
				// lost; if the delete fails, the message exists in both queues.
				if _, err := svc.DeleteMessage(&sqs.DeleteMessageInput{
					QueueUrl:      aws.String(sourceQueue),
					ReceiptHandle: message.ReceiptHandle,
				}); err != nil {
					return fmt.Errorf("deleting message %q from %q after it was sent to %q: %w", id, sourceQueue, targetQueue, err)
				}
			}
		}
		return nil
	},
}