-
Notifications
You must be signed in to change notification settings - Fork 0
/
exchange-client.go
127 lines (120 loc) · 3.41 KB
/
exchange-client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/* lvcl is a simple program clustering libvirt servers
* Copyright (C) 2020 Adam Prycki (email: [email protected])
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
package main
import "net"
import "encoding/json"
import "fmt"
type eclient struct{
originLocal bool
usock bool
hostname string
outgoing chan message
incoming chan message
conn net.Conn
exch *Exchange
}
func (ec *eclient)listen(){
fmt.Printf("conn listener started for %+v\n", ec.conn)
d := json.NewDecoder(ec.conn)
var m message
var err error
for{
err = d.Decode(&m)
if config.DebugNetwork {
fmt.Printf("conn Listener received %+v\n", m)}
if err == nil{
if ec.conn != nil{
ec.incoming <- m}
}else{
break}}
ec.conn.Close()
if ec.conn != nil{
ec.conn = nil}
//delete itself from the map if running usock
//if ec.usock {
// ec.exch.notifyClusterAboutClientDisconnect(ec.hostname)
// //e.rwmux_ec.Lock()
// delete(ec.exch.outgoing, ec.hostname)
// delete(ec.exch.incoming, ec.hostname)
// //e.rwmux_ec.Unlock()
// //e.rwmuxUSock.Lock()
// delete(ec.exch.cliLogTap, ec.hostname)
// //e.rwmuxUSock.Unlock()
// close(ec.outgoing)
// }
go ec.report_dead_eclient()
ec = nil
lg.err("eclient Decoder ", err)}
func (ec *eclient)forward(){
var data message
var err error
var cleanup_loop bool = true
var chanOpen bool = false
enc := json.NewEncoder(ec.conn)
fmt.Printf("conn Forwarder started for %s\n", ec.hostname)
for{
data, chanOpen = <-ec.outgoing
if ! chanOpen {
lg.msg_debug(2, fmt.Sprintf("%+v outgoing channel closed",ec))
break}
if config.DebugNetwork {
fmt.Printf("conn Forwarder to %s received %+v\n", ec.hostname, data)}
err = enc.Encode(data)
if err != nil{
break}}
if ! ec.usock {
ec.conn.Close()
}
if ec.conn != nil{
ec.conn = nil}
//lock exchange writing mutex
// THIS MUTEX CAUSES ISSUES WITH LOCKUP ON DISCONNECTING CLIENT
//ec.exch.rwmux.Lock()
//if ! ec.usock {
// ec.exch.outgoing[ec.hostname]=nil
//}else{
// ec.exch.outgoing[ec.hostname]=nil
//ec.exch.outgoing[ec.hostname]=nil
//unlock exchange writing mutex
//ec.exch.rwmux.Unlock()
//e.loc_ex <- message{
// SrcMod: msgModExchn,
// DestMod: msgModExchn,
// SrcHost: config.GetMyHostname(),
// DestHost: "__local__",
// RpcFunc: exchangeClientDisconnected,
// Argv: []string{ ec.hostname},
//}
go ec.report_dead_eclient()
//read all queued messages
for cleanup_loop {
data,cleanup_loop = <-ec.outgoing
//fmt.Println("CLEANUP LOOP ",data)
}
ec = nil
lg.err("eclient forwarder serializer ", err)}
func (ec *eclient)report_dead_eclient(){
e.loc_ex <- message{
SrcMod: msgModExchn,
DestMod: msgModExchn,
SrcHost: config.GetMyHostname(),
DestHost: "__local__",
RpcFunc: exchangeClientDisconnected,
Argv: []string{ ec.hostname},
}
}