Skip to content

Commit

Permalink
Add --timeout flag to configure the read/write timeout
Browse files Browse the repository at this point in the history
This might be needed for slow Modbus devices (#9)
  • Loading branch information
IngmarStein committed Nov 19, 2024
1 parent 2da2625 commit f6f0735
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 13 deletions.
13 changes: 9 additions & 4 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,19 @@ import (
"os"
"os/signal"
"syscall"
"time"

"github.com/ingmarstein/tcp-multiplexer/pkg/message"
"github.com/ingmarstein/tcp-multiplexer/pkg/multiplexer"
"github.com/spf13/cobra"
)

var port string
var targetServer string
var applicationProtocol string
var (
port string
targetServer string
applicationProtocol string
timeout int
)

// serverCmd represents the server command
var serverCmd = &cobra.Command{
Expand All @@ -56,7 +60,7 @@ var serverCmd = &cobra.Command{
os.Exit(2)
}

mux := multiplexer.New(targetServer, port, msgReader)
mux := multiplexer.New(targetServer, port, msgReader, time.Duration(timeout)*time.Second)
go func() {
err := mux.Start()
if err != nil {
Expand Down Expand Up @@ -87,4 +91,5 @@ func init() {
serverCmd.Flags().StringVarP(&port, "listen", "l", "8000", "multiplexer will listen on")
serverCmd.Flags().StringVarP(&targetServer, "targetServer", "t", "127.0.0.1:1234", "multiplexer will forward message to")
serverCmd.Flags().StringVarP(&applicationProtocol, "applicationProtocol", "p", "echo", "multiplexer will parse to message echo/http/iso8583/modbus")
serverCmd.Flags().IntVar(&timeout, "timeout", 60, "timeout in seconds")
}
20 changes: 11 additions & 9 deletions pkg/multiplexer/multiplexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type (
targetServer string
port string
messageReader message.Reader
timeout time.Duration
l net.Listener
quit chan struct{}
wg *sync.WaitGroup
Expand All @@ -42,19 +43,20 @@ const (
Packet
)

func deadline() time.Time {
return time.Now().Add(60 * time.Second)
}

func New(targetServer, port string, messageReader message.Reader) Multiplexer {
func New(targetServer, port string, messageReader message.Reader, timeout time.Duration) Multiplexer {
return Multiplexer{
targetServer: targetServer,
port: port,
messageReader: messageReader,
quit: make(chan struct{}),
timeout: timeout,
}
}

func (mux *Multiplexer) deadline() time.Time {
return time.Now().Add(mux.timeout)
}

func (mux *Multiplexer) Start() error {
var err error
mux.l, err = net.Listen("tcp", ":"+mux.port)
Expand Down Expand Up @@ -112,7 +114,7 @@ func (mux *Multiplexer) handleConnection(conn net.Conn, sender chan<- *reqContai
callback := make(chan *respContainer)

for {
err := conn.SetReadDeadline(deadline())
err := conn.SetReadDeadline(mux.deadline())
if err != nil {
slog.Error(fmt.Sprintf("error setting read deadline: %v", err))
}
Expand Down Expand Up @@ -143,7 +145,7 @@ func (mux *Multiplexer) handleConnection(conn net.Conn, sender chan<- *reqContai
}

// write back
err = conn.SetWriteDeadline(deadline())
err = conn.SetWriteDeadline(mux.deadline())
if err != nil {
slog.Error(fmt.Sprintf("error setting write deadline: %v", err))
}
Expand Down Expand Up @@ -201,7 +203,7 @@ func (mux *Multiplexer) targetConnLoop(requestQueue <-chan *reqContainer) {
conn = mux.createTargetConn()
}

err := conn.SetWriteDeadline(deadline())
err := conn.SetWriteDeadline(mux.deadline())
if err != nil {
slog.Error(fmt.Sprintf("error setting write deadline: %v", err))
}
Expand All @@ -222,7 +224,7 @@ func (mux *Multiplexer) targetConnLoop(requestQueue <-chan *reqContainer) {
continue
}

err = conn.SetReadDeadline(deadline())
err = conn.SetReadDeadline(mux.deadline())
if err != nil {
slog.Error(fmt.Sprintf("error setting read deadline: %v", err))
}
Expand Down

0 comments on commit f6f0735

Please sign in to comment.