forked from myzhan/boomer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client_gomq.go
114 lines (97 loc) · 2.6 KB
/
client_gomq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// +build !goczmq
package boomer
import (
"fmt"
"log"
"net"
"strings"
"github.com/zeromq/gomq"
"github.com/zeromq/gomq/zmtp"
)
type gomqSocketClient struct {
pushSocket *gomq.Socket
pullSocket *gomq.Socket
}
func newClient() client {
log.Println("Boomer is built with gomq support.")
var message string
var client client
if rpc == "zeromq" {
client = newZmqClient(masterHost, masterPort)
message = fmt.Sprintf("Boomer is connected to master(%s:%d|%d) press Ctrl+c to quit.", masterHost, masterPort, masterPort+1)
} else if rpc == "socket" {
client = newSocketClient(masterHost, masterPort)
message = fmt.Sprintf("Boomer is connected to master(%s:%d) press Ctrl+c to quit.", masterHost, masterPort)
} else {
log.Fatal("Unknown rpc type:", rpc)
}
log.Println(message)
return client
}
func newGomqSocket(socketType zmtp.SocketType) *gomq.Socket {
socket := gomq.NewSocket(false, socketType, nil, zmtp.NewSecurityNull())
return socket
}
func getNetConn(addr string) net.Conn {
parts := strings.Split(addr, "://")
netConn, err := net.Dial(parts[0], parts[1])
if err != nil {
log.Fatal(err)
}
return netConn
}
func connectSock(socket *gomq.Socket, addr string) {
netConn := getNetConn(addr)
zmtpConn := zmtp.NewConnection(netConn)
_, err := zmtpConn.Prepare(socket.SecurityMechanism(), socket.SocketType(), nil, false, nil)
if err != nil {
log.Fatal(err)
}
conn := gomq.NewConnection(netConn, zmtpConn)
socket.AddConnection(conn)
zmtpConn.Recv(socket.RecvChannel())
}
func newZmqClient(masterHost string, masterPort int) *gomqSocketClient {
pushAddr := fmt.Sprintf("tcp://%s:%d", masterHost, masterPort)
pullAddr := fmt.Sprintf("tcp://%s:%d", masterHost, masterPort+1)
pushSocket := newGomqSocket(zmtp.PushSocketType)
connectSock(pushSocket, pushAddr)
pullSocket := newGomqSocket(zmtp.PullSocketType)
connectSock(pullSocket, pullAddr)
log.Println("ZMQ sockets connected")
newClient := &gomqSocketClient{
pushSocket: pushSocket,
pullSocket: pullSocket,
}
go newClient.recv()
go newClient.send()
return newClient
}
func (c *gomqSocketClient) recv() {
for {
msg, err := c.pullSocket.Recv()
if err != nil {
log.Printf("Error reading: %v\n", err)
} else {
msgFromMaster := newMessageFromBytes(msg)
fromMaster <- msgFromMaster
}
}
}
func (c *gomqSocketClient) send() {
for {
select {
case msg := <-toMaster:
c.sendMessage(msg)
if msg.Type == "quit" {
disconnectedFromMaster <- true
}
}
}
}
func (c *gomqSocketClient) sendMessage(msg *message) {
err := c.pushSocket.Send(msg.serialize())
if err != nil {
log.Printf("Error sending: %v\n", err)
}
}