Skip to content

Commit

Permalink
Use rwlock to protect the bufConn map
Browse files Browse the repository at this point in the history
Map in golang do not support concurrent read and write, use
rwlock to protect it.
  • Loading branch information
chenglch committed Dec 14, 2017
1 parent 8b00b7f commit 278122f
Showing 1 changed file with 17 additions and 7 deletions.
24 changes: 17 additions & 7 deletions console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Console struct {
session plugins.ConsoleSession // interface for plugin
running chan bool
node *Node
mutex *sync.Mutex
mutex *sync.RWMutex
}

func NewConsole(baseSession *plugins.BaseSession, node *Node) *Console {
Expand All @@ -44,13 +44,15 @@ func NewConsole(baseSession *plugins.BaseSession, node *Node) *Console {
network: new(common.Network),
bufConn: make(map[net.Conn]chan []byte),
running: make(chan bool, 0),
mutex: new(sync.Mutex)}
mutex: new(sync.RWMutex)}
}

// Accept connection from client
func (c *Console) Accept(conn net.Conn) {
plog.DebugNode(c.node.StorageNode.Name, "Accept connection from client")
c.mutex.Lock()
c.bufConn[conn] = make(chan []byte)
c.mutex.Unlock()
go c.writeTarget(conn)
go c.writeClient(conn)
}
Expand Down Expand Up @@ -79,10 +81,13 @@ func (c *Console) writeTarget(conn net.Conn) {
c.Disconnect(conn)
}()
for {
c.mutex.RLock()
if _, ok := c.bufConn[conn]; !ok {
c.mutex.RUnlock()
plog.ErrorNode(c.node.StorageNode.Name, fmt.Sprintf("Failed to find the connection from bufConn, the connection may be closed."))
return
}
c.mutex.RUnlock()
n, err := c.network.ReceiveInt(conn)
if err != nil {
plog.WarningNode(c.node.StorageNode.Name, fmt.Sprintf("Failed to receive message head from client. Error:%s.", err.Error()))
Expand Down Expand Up @@ -132,10 +137,13 @@ func (c *Console) writeClient(conn net.Conn) {
plog.WarningNode(c.node.StorageNode.Name, fmt.Sprintf("Failed to log message to %s. Error:%s", logFile, err.Error()))
}
for {
c.mutex.RLock()
if bufChan, ok = c.bufConn[conn]; !ok {
c.mutex.RUnlock()
plog.ErrorNode(c.node.StorageNode.Name, fmt.Sprintf("Failed to find the connection from bufConn, the connection may be closed."))
return
}
c.mutex.RUnlock()
b := <-bufChan
err = c.network.SendByteWithLengthTimeout(conn, b, clientTimeout)
if err != nil {
Expand Down Expand Up @@ -192,9 +200,11 @@ func (c *Console) readTarget() {
func (c *Console) writeClientChan(buf []byte) {
b := make([]byte, len(buf))
copy(b, buf)
c.mutex.RLock()
for _, v := range c.bufConn {
v <- b
}
c.mutex.RUnlock()
}

func (c *Console) Start() {
Expand All @@ -217,25 +227,25 @@ func (c *Console) Stop() {
func (c *Console) ListSessionUser() []string {
ret := make([]string, len(c.bufConn))
i := 0
c.mutex.RLock()
for c, _ := range c.bufConn {
ret[i] = c.RemoteAddr().String()
i++
}
c.mutex.RUnlock()
return ret
}

// close session from rest api
func (c *Console) Close() {
plog.DebugNode(c.node.StorageNode.Name, "Close console session.")
c.mutex.Lock()
for k, v := range c.bufConn {
c.mutex.Lock()
if _, ok := c.bufConn[k]; ok {
close(v)
}
close(v)
k.Close()
delete(c.bufConn, k)
c.mutex.Unlock()
}
c.mutex.Unlock()
if c.running != nil {
c.mutex.Lock()
if c.running != nil {
Expand Down

0 comments on commit 278122f

Please sign in to comment.