Skip to content

Commit

Permalink
Set I/O deadlines
Browse files Browse the repository at this point in the history
  • Loading branch information
IngmarStein committed May 24, 2024
1 parent f33400f commit 2e9078c
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 25 deletions.
4 changes: 2 additions & 2 deletions pkg/message/modbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
)

const (
maxTCPFrameLength int = 260
mbapHeaderLength int = 7
maxTCPFrameLength = 260
mbapHeaderLength = 6
)

type ModbusMessageReader struct {
Expand Down
70 changes: 47 additions & 23 deletions pkg/multiplexer/multiplexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,39 @@ import (
"time"
)

type messageType int
type (
messageType int

reqContainer struct {
typ messageType
message []byte
sender chan<- *respContainer
}

respContainer struct {
message []byte
err error
}

Multiplexer struct {
targetServer string
port string
messageReader message.Reader
l net.Listener
quit chan struct{}
wg *sync.WaitGroup
requestQueue chan *reqContainer
}
)

const (
Connection messageType = iota
Disconnection
Packet
)

type reqContainer struct {
typ messageType
message []byte
sender chan<- *respContainer
}

type respContainer struct {
message []byte
err error
}

type Multiplexer struct {
targetServer string
port string
messageReader message.Reader
l net.Listener
quit chan struct{}
wg *sync.WaitGroup
requestQueue chan *reqContainer
func deadline() time.Time {
return time.Now().Add(60 * time.Second)
}

func New(targetServer, port string, messageReader message.Reader) Multiplexer {
Expand Down Expand Up @@ -107,7 +113,12 @@ func (mux *Multiplexer) handleConnection(conn net.Conn, sender chan<- *reqContai
callback := make(chan *respContainer)

for {
err := conn.SetReadDeadline(deadline())
if err != nil {
logrus.Errorf("error setting read deadline: %v", err)
}
msg, err := mux.messageReader.ReadMessage(conn)
logrus.Debug("Done reading from client...")
if err == io.EOF {
logrus.Infof("closed: %v <-> %v", conn.RemoteAddr(), conn.LocalAddr())
break
Expand Down Expand Up @@ -137,6 +148,10 @@ func (mux *Multiplexer) handleConnection(conn net.Conn, sender chan<- *reqContai
}

// write back
err = conn.SetWriteDeadline(deadline())
if err != nil {
logrus.Errorf("error setting write deadline: %v", err)
}
_, err = conn.Write(resp.message)
if err != nil {
logrus.Errorf("%v", err)
Expand All @@ -148,7 +163,7 @@ func (mux *Multiplexer) handleConnection(conn net.Conn, sender chan<- *reqContai
func (mux *Multiplexer) createTargetConn() net.Conn {
for {
logrus.Info("creating target connection")
conn, err := net.Dial("tcp", mux.targetServer)
conn, err := net.DialTimeout("tcp", mux.targetServer, 30*time.Second)
if err != nil {
logrus.Errorf("failed to connect to target server %s, %v", mux.targetServer, err)
// TODO: make sleep time configurable
Expand Down Expand Up @@ -194,8 +209,12 @@ func (mux *Multiplexer) targetConnLoop(requestQueue <-chan *reqContainer) {
conn = mux.createTargetConn()
}

request := container.message
_, err := conn.Write(request)
err := conn.SetWriteDeadline(deadline())
if err != nil {
logrus.Errorf("error setting write deadline: %v", err)
}

_, err = conn.Write(container.message)
if err != nil {
container.sender <- &respContainer{
err: err,
Expand All @@ -211,6 +230,11 @@ func (mux *Multiplexer) targetConnLoop(requestQueue <-chan *reqContainer) {
continue
}

err = conn.SetReadDeadline(deadline())
if err != nil {
logrus.Errorf("error setting read deadline: %v", err)
}

msg, err := mux.messageReader.ReadMessage(conn)
container.sender <- &respContainer{
message: msg,
Expand Down

0 comments on commit 2e9078c

Please sign in to comment.