From f6f0735200a0cef545c5733041d500428ba43428 Mon Sep 17 00:00:00 2001 From: Ingmar Stein Date: Tue, 19 Nov 2024 22:56:51 +0100 Subject: [PATCH] Add `--timeout` flag to configure the read/write timeout This might be needed for slow Modbus devices (#9) --- cmd/server.go | 13 +++++++++---- pkg/multiplexer/multiplexer.go | 20 +++++++++++--------- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/cmd/server.go b/cmd/server.go index f94647c..88f6f30 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -27,15 +27,19 @@ import ( "os" "os/signal" "syscall" + "time" "github.com/ingmarstein/tcp-multiplexer/pkg/message" "github.com/ingmarstein/tcp-multiplexer/pkg/multiplexer" "github.com/spf13/cobra" ) -var port string -var targetServer string -var applicationProtocol string +var ( + port string + targetServer string + applicationProtocol string + timeout int +) // serverCmd represents the server command var serverCmd = &cobra.Command{ @@ -56,7 +60,7 @@ var serverCmd = &cobra.Command{ os.Exit(2) } - mux := multiplexer.New(targetServer, port, msgReader) + mux := multiplexer.New(targetServer, port, msgReader, time.Duration(timeout)*time.Second) go func() { err := mux.Start() if err != nil { @@ -87,4 +91,5 @@ func init() { serverCmd.Flags().StringVarP(&port, "listen", "l", "8000", "multiplexer will listen on") serverCmd.Flags().StringVarP(&targetServer, "targetServer", "t", "127.0.0.1:1234", "multiplexer will forward message to") serverCmd.Flags().StringVarP(&applicationProtocol, "applicationProtocol", "p", "echo", "multiplexer will parse to message echo/http/iso8583/modbus") + serverCmd.Flags().IntVar(&timeout, "timeout", 60, "timeout in seconds") } diff --git a/pkg/multiplexer/multiplexer.go b/pkg/multiplexer/multiplexer.go index 6e1dbe7..b29f6ca 100644 --- a/pkg/multiplexer/multiplexer.go +++ b/pkg/multiplexer/multiplexer.go @@ -29,6 +29,7 @@ type ( targetServer string port string messageReader message.Reader + timeout time.Duration l net.Listener quit chan struct{} wg *sync.WaitGroup @@ -42,19 +43,20 @@ const ( Packet ) -func deadline() time.Time { - return time.Now().Add(60 * time.Second) -} - -func New(targetServer, port string, messageReader message.Reader) Multiplexer { +func New(targetServer, port string, messageReader message.Reader, timeout time.Duration) Multiplexer { return Multiplexer{ targetServer: targetServer, port: port, messageReader: messageReader, quit: make(chan struct{}), + timeout: timeout, } } +func (mux *Multiplexer) deadline() time.Time { + return time.Now().Add(mux.timeout) +} + func (mux *Multiplexer) Start() error { var err error mux.l, err = net.Listen("tcp", ":"+mux.port) @@ -112,7 +114,7 @@ func (mux *Multiplexer) handleConnection(conn net.Conn, sender chan<- *reqContai callback := make(chan *respContainer) for { - err := conn.SetReadDeadline(deadline()) + err := conn.SetReadDeadline(mux.deadline()) if err != nil { slog.Error(fmt.Sprintf("error setting read deadline: %v", err)) } @@ -143,7 +145,7 @@ func (mux *Multiplexer) handleConnection(conn net.Conn, sender chan<- *reqContai } // write back - err = conn.SetWriteDeadline(deadline()) + err = conn.SetWriteDeadline(mux.deadline()) if err != nil { slog.Error(fmt.Sprintf("error setting write deadline: %v", err)) } @@ -201,7 +203,7 @@ func (mux *Multiplexer) targetConnLoop(requestQueue <-chan *reqContainer) { conn = mux.createTargetConn() } - err := conn.SetWriteDeadline(deadline()) + err := conn.SetWriteDeadline(mux.deadline()) if err != nil { slog.Error(fmt.Sprintf("error setting write deadline: %v", err)) } @@ -222,7 +224,7 @@ func (mux *Multiplexer) targetConnLoop(requestQueue <-chan *reqContainer) { continue } - err = conn.SetReadDeadline(deadline()) + err = conn.SetReadDeadline(mux.deadline()) if err != nil { slog.Error(fmt.Sprintf("error setting read deadline: %v", err)) }