-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.go
216 lines (177 loc) · 5.75 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
package main
import (
"crypto/tls"
"flag"
"fmt"
"log"
"os"
"strconv"
"strings"
"github.com/streadway/amqp"
)
var (
debug bool = false
sProtocol string = "amqp"
dProtocol string = "amqp"
)
func printDebug(string1, string2 string) {
if debug {
if string2 == "" {
fmt.Printf(" --> DEBUG: %s\n", yellow(string1))
} else {
fmt.Printf(" --> DEBUG: %s --> %s\n", yellow(string1), yellow(string2))
}
}
}
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", red(msg), err)
}
}
func main() {
debugOption := flag.Bool("debug", false, "Enable debug logging")
srcHost := flag.String("srcHost", "", "Source AMQ cluster")
srcVhost := flag.String("srcVhost", "", "Source AMQ VHost")
dstVhost := flag.String("dstVhost", "", "Destination AMQ VHost")
dstHost := flag.String("dstHost", "", "Destination AMQ cluster")
srcPort := flag.String("srcPort", "", "Source Port")
dstPort := flag.String("dstPort", "", "Destination Port")
srcUser := flag.String("srcUser", "", "Source Username")
dstUser := flag.String("dstUser", "", "Destination Username")
srcPass := flag.String("srcPass", "", "Source Password")
dstPass := flag.String("dstPass", "", "Destination Password")
srcQueue := flag.String("srcQueue", "", "Source queue to copy messages from")
dstQueue := flag.String("dstQueue", "", "Destination queue to copy messages to")
srcTls := flag.Bool("srcTls", false, "Enable / Disable TLS")
dstTls := flag.Bool("dstTls", false, "Enable / Disable TLS")
sArgs := flag.String("sArgs", "", "Comma separated list of SRC queue args in form of 'key:value:type'. Example: 'x-message-ttl:3600000:int,x-ha-policy:all:string'")
dArgs := flag.String("dArgs", "", "Comma separated list of DST queue args in form of 'key:value:type'. Example: 'x-message-ttl:10800000:int,x-ha-policy:all:string'")
usage := flag.Bool("usage", false, "Display usage options with examples")
verifyTls := flag.Bool("verifyTls", true, "Verify TLS certificates for AMQP connection")
flag.Parse()
if *usage {
printUsage()
return
}
if len(os.Args) < 2 {
printUsage()
return
}
if *debugOption == true {
debug = true
}
// Parse Queue Arguments
srcQueueArgs := make(map[string]interface{})
dstQueueArgs := make(map[string]interface{})
parseArgs := func(inArgs, inType string) map[string]interface{} {
tempMap := make(map[string]interface{})
list := strings.Split(inArgs, ",")
for _, arg := range list {
args := strings.Split(arg, ":")
fName := args[0]
fValue := args[1]
vType := args[2]
if vType == "int" {
i, _ := strconv.Atoi(fValue)
tempMap[fName] = i
} else {
tempMap[fName] = string(fValue)
}
printDebug("Setting queue arguement --> ", arg)
}
fmt.Printf("%s Queue Arguments: %s \n", inType, blue(tempMap))
return tempMap
}
srcQueueArgs = parseArgs(*sArgs, "SRC")
dstQueueArgs = parseArgs(*dArgs, "DST")
if *srcTls == true {
sProtocol = "amqps"
}
if *dstTls == true {
dProtocol = "amqps"
}
fmt.Printf("TLS Verify: %s \n", blue(*verifyTls))
fmt.Printf("SRC TLS: %s \n", blue(*srcTls))
fmt.Printf("DST TLS: %s \n", blue(*dstTls))
cfg := tls.Config{
InsecureSkipVerify: *verifyTls,
}
// SRC Cluster Connection
var srcconn *amqp.Connection
var serr error
srcConnString := fmt.Sprintf("%s://%s:%s@%s:%s/%%2F%s", sProtocol, *srcUser, *srcPass, *srcHost, *srcPort, *srcVhost)
if *srcTls {
srcconn, serr = amqp.DialTLS(srcConnString, &cfg)
} else {
srcconn, serr = amqp.Dial(srcConnString)
}
failOnError(serr, "Failed to connect to source RabbitMQ ( If connecting to a TLS port, be sure to use the '-srcTls' option to enable TLS )")
defer srcconn.Close()
srcch, err := srcconn.Channel()
failOnError(err, "Failed to open source channel")
defer srcch.Close()
err = srcch.Qos(1, 0, false)
failOnError(err, "Could not configure Qos on source cluster")
srcq, err := srcch.QueueDeclare(
*srcQueue, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
srcQueueArgs, // args
)
failOnError(err, "Failed to delcare source queue")
//READ from SRC QUEUE
msgs, err := srcch.Consume(
srcq.Name, // name
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a Consumer on source cluster")
// DST Cluster Connection
var dstconn *amqp.Connection
var derr error
dstConnString := fmt.Sprintf("%s://%s:%s@%s:%s/%%2F%s", dProtocol, *dstUser, *dstPass, *dstHost, *dstPort, *dstVhost)
if *dstTls {
dstconn, derr = amqp.DialTLS(dstConnString, &cfg)
} else {
dstconn, derr = amqp.Dial(dstConnString)
}
failOnError(derr, "Failed to connect to destination RabbitMQ cluster ( If connecting to a TLS port, be sure to use the '-dstTls' option to enable TLS )")
defer dstconn.Close()
dstch, err := dstconn.Channel()
failOnError(err, "Failed to open a channel on the destination cluster")
defer dstch.Close()
dstq, err := dstch.QueueDeclare(
*dstQueue, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
dstQueueArgs, // args
)
forever := make(chan bool)
// Publish messages to DST cluster
go func() {
for srcmsg := range msgs {
log.Printf("Recieved a message: %s", srcmsg.Body)
err = dstch.Publish(
"", // exchange
dstq.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(srcmsg.Body),
})
log.Printf(" [x] Sent %s", srcmsg.Body)
failOnError(err, "Failed to publish a message on destination cluster")
}
}()
log.Printf(" [*] Waiting for messages. To exit, press CTRL+C")
<-forever
}