From 2e9078c74e09e0580f5c3f3044b0fa645b3395fd Mon Sep 17 00:00:00 2001 From: Ingmar Stein Date: Fri, 24 May 2024 21:28:53 +0200 Subject: [PATCH] Set I/O deadlines --- pkg/message/modbus.go | 4 +- pkg/multiplexer/multiplexer.go | 70 +++++++++++++++++++++++----------- 2 files changed, 49 insertions(+), 25 deletions(-) diff --git a/pkg/message/modbus.go b/pkg/message/modbus.go index 4de5581..c9502c1 100644 --- a/pkg/message/modbus.go +++ b/pkg/message/modbus.go @@ -7,8 +7,8 @@ import ( ) const ( - maxTCPFrameLength int = 260 - mbapHeaderLength int = 7 + maxTCPFrameLength = 260 + mbapHeaderLength = 6 ) type ModbusMessageReader struct { diff --git a/pkg/multiplexer/multiplexer.go b/pkg/multiplexer/multiplexer.go index 423fd71..b08fbac 100644 --- a/pkg/multiplexer/multiplexer.go +++ b/pkg/multiplexer/multiplexer.go @@ -10,7 +10,30 @@ import ( "time" ) -type messageType int +type ( + messageType int + + reqContainer struct { + typ messageType + message []byte + sender chan<- *respContainer + } + + respContainer struct { + message []byte + err error + } + + Multiplexer struct { + targetServer string + port string + messageReader message.Reader + l net.Listener + quit chan struct{} + wg *sync.WaitGroup + requestQueue chan *reqContainer + } +) const ( Connection messageType = iota @@ -18,25 +41,8 @@ const ( Packet ) -type reqContainer struct { - typ messageType - message []byte - sender chan<- *respContainer -} - -type respContainer struct { - message []byte - err error -} - -type Multiplexer struct { - targetServer string - port string - messageReader message.Reader - l net.Listener - quit chan struct{} - wg *sync.WaitGroup - requestQueue chan *reqContainer +func deadline() time.Time { + return time.Now().Add(60 * time.Second) } func New(targetServer, port string, messageReader message.Reader) Multiplexer { @@ -107,7 +113,12 @@ func (mux *Multiplexer) handleConnection(conn net.Conn, sender chan<- *reqContai callback := make(chan *respContainer) for { + err := conn.SetReadDeadline(deadline()) + if err != nil { + logrus.Errorf("error setting read deadline: %v", err) + } msg, err := mux.messageReader.ReadMessage(conn) + logrus.Debug("Done reading from client...") if err == io.EOF { logrus.Infof("closed: %v <-> %v", conn.RemoteAddr(), conn.LocalAddr()) break @@ -137,6 +148,10 @@ func (mux *Multiplexer) handleConnection(conn net.Conn, sender chan<- *reqContai } // write back + err = conn.SetWriteDeadline(deadline()) + if err != nil { + logrus.Errorf("error setting write deadline: %v", err) + } _, err = conn.Write(resp.message) if err != nil { logrus.Errorf("%v", err) @@ -148,7 +163,7 @@ func (mux *Multiplexer) handleConnection(conn net.Conn, sender chan<- *reqContai func (mux *Multiplexer) createTargetConn() net.Conn { for { logrus.Info("creating target connection") - conn, err := net.Dial("tcp", mux.targetServer) + conn, err := net.DialTimeout("tcp", mux.targetServer, 30*time.Second) if err != nil { logrus.Errorf("failed to connect to target server %s, %v", mux.targetServer, err) // TODO: make sleep time configurable @@ -194,8 +209,12 @@ func (mux *Multiplexer) targetConnLoop(requestQueue <-chan *reqContainer) { conn = mux.createTargetConn() } - request := container.message - _, err := conn.Write(request) + err := conn.SetWriteDeadline(deadline()) + if err != nil { + logrus.Errorf("error setting write deadline: %v", err) + } + + _, err = conn.Write(container.message) if err != nil { container.sender <- &respContainer{ err: err, @@ -211,6 +230,11 @@ func (mux *Multiplexer) targetConnLoop(requestQueue <-chan *reqContainer) { continue } + err = conn.SetReadDeadline(deadline()) + if err != nil { + logrus.Errorf("error setting read deadline: %v", err) + } + msg, err := mux.messageReader.ReadMessage(conn) container.sender <- &respContainer{ message: msg,