diff --git a/rhp/v3/rhp.go b/rhp/v3/rhp.go index a305d6bc..abc897a7 100644 --- a/rhp/v3/rhp.go +++ b/rhp/v3/rhp.go @@ -173,14 +173,16 @@ var ( // handleHostStream handles streams routed to the "host" subscriber func (sh *SessionHandler) handleHostStream(remoteAddr string, s *rhpv3.Stream) { - s.SetDeadline(time.Now().Add(30 * time.Second)) + defer s.Close() // close the stream when the RPC has completed + + s.SetDeadline(time.Now().Add(30 * time.Second)) // set an initial timeout rpcID, err := s.ReadID() - s.SetDeadline(time.Time{}) if err != nil { sh.log.Debug("failed to read RPC ID", zap.Error(err)) return } + s.SetDeadline(time.Now().Add(30 * time.Second)) start := time.Now() switch rpcID { case rhpv3.RPCAccountBalanceID: @@ -188,6 +190,7 @@ func (sh *SessionHandler) handleHostStream(remoteAddr string, s *rhpv3.Stream) { case rhpv3.RPCUpdatePriceTableID: err = sh.handleRPCPriceTable(s) case rhpv3.RPCExecuteProgramID: + s.SetDeadline(time.Now().Add(time.Minute)) err = sh.handleRPCExecute(s) case rhpv3.RPCFundAccountID: err = sh.handleRPCFundAccount(s) @@ -228,7 +231,9 @@ func (sh *SessionHandler) Serve() error { ingress, egress := sh.settings.BandwidthLimiters() t, err := rhpv3.NewHostTransport(rhp.NewConn(conn, ingress, egress), sh.privateKey) if err != nil { - return fmt.Errorf("failed to upgrade conn: %w", err) + conn.Close() + sh.log.Debug("failed to upgrade conn", zap.Error(err), zap.String("remoteAddress", conn.RemoteAddr().String())) + continue } go func() {