Skip to content

Commit

Permalink
Map HTTP/2 RST_STREAM codes back to RPC codes (#321)
Browse files Browse the repository at this point in the history
HTTP/2 includes its own set of error codes, which are sent in RST_STREAM
frames. When servers send one of these codes to clients, we should map
the HTTP/2 code back to one of our status codes.
  • Loading branch information
akshayjshah authored Jul 15, 2022
1 parent 8e89a92 commit 4da0450
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 10 deletions.
8 changes: 5 additions & 3 deletions duplex_http_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ func (d *duplexHTTPCall) Read(data []byte) (int, error) {
if d.response == nil {
return 0, fmt.Errorf("nil response from %v", d.request.URL)
}
return d.response.Body.Read(data)
n, err := d.response.Body.Read(data)
return n, wrapIfRSTError(err)
}

func (d *duplexHTTPCall) CloseRead() error {
Expand All @@ -162,9 +163,9 @@ func (d *duplexHTTPCall) CloseRead() error {
return nil
}
if err := discard(d.response.Body); err != nil {
return err
return wrapIfRSTError(err)
}
return d.response.Body.Close()
return wrapIfRSTError(d.response.Body.Close())
}

// ResponseStatusCode is the response's HTTP status code.
Expand Down Expand Up @@ -245,6 +246,7 @@ func (d *duplexHTTPCall) makeRequest() {
err = wrapIfContextError(err)
err = wrapIfLikelyH2CNotConfiguredError(d.request, err)
err = wrapIfLikelyWithGRPCNotUsedError(err)
err = wrapIfRSTError(err)
if _, ok := asError(err); !ok {
err = NewError(CodeUnavailable, err)
}
Expand Down
57 changes: 57 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"net/http"
"net/url"
"strings"

"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -247,3 +248,59 @@ func wrapIfLikelyWithGRPCNotUsedError(err error) error {
}
return err
}

// HTTP/2 has its own set of error codes, which it sends in RST_STREAM frames.
// When the server sends one of these errors, we should map it back into our
// RPC error codes following
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#http2-transport-mapping.
//
// This would be vastly simpler if we were using x/net/http2 directly, since
// the StreamError type is exported. When x/net/http2 gets vendored into
// net/http, though, all these types become unexported...so we're left with
// string munging.
func wrapIfRSTError(err error) error {
const (
streamErrPrefix = "stream error: "
fromPeerSuffix = "; received from peer"
)
if err == nil {
return nil
}
if _, ok := asError(err); ok {
return err
}
if urlErr := new(url.Error); errors.As(err, &urlErr) {
// If we get an RST_STREAM error from http.Client.Do, it's wrapped in a
// *url.Error.
err = urlErr.Unwrap()
}
msg := err.Error()
if !strings.HasPrefix(msg, streamErrPrefix) {
return err
}
if !strings.HasSuffix(msg, fromPeerSuffix) {
return err
}
msg = strings.TrimSuffix(msg, fromPeerSuffix)
i := strings.LastIndex(msg, ";")
if i < 0 || i >= len(msg)-1 {
return err
}
msg = msg[i+1:]
msg = strings.TrimSpace(msg)
switch msg {
case "NO_ERROR", "PROTOCOL_ERROR", "INTERNAL_ERROR", "FLOW_CONTROL_ERROR",
"SETTINGS_TIMEOUT", "FRAME_SIZE_ERROR", "COMPRESSION_ERROR", "CONNECT_ERROR":
return NewError(CodeInternal, err)
case "REFUSED_STREAM":
return NewError(CodeUnavailable, err)
case "CANCEL":
return NewError(CodeCanceled, err)
case "ENHANCE_YOUR_CALM":
return NewError(CodeResourceExhausted, fmt.Errorf("bandwidth exhausted: %w", err))
case "INADEQUATE_SECURITY":
return NewError(CodePermissionDenied, fmt.Errorf("transport protocol insecure: %w", err))
default:
return err
}
}
18 changes: 11 additions & 7 deletions recover_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,13 @@ func (s *panicPingServer) Ping(
}

func (s *panicPingServer) CountUp(
context.Context,
*connect.Request[pingv1.CountUpRequest],
*connect.ServerStream[pingv1.CountUpResponse],
_ context.Context,
_ *connect.Request[pingv1.CountUpRequest],
stream *connect.ServerStream[pingv1.CountUpResponse],
) error {
if err := stream.Send(&pingv1.CountUpResponse{}); err != nil {
return err
}
panic(s.panicWith) // nolint:forbidigo
}

Expand All @@ -60,14 +63,15 @@ func TestWithRecover(t *testing.T) {
}
assertNotHandled := func(err error) {
t.Helper()
if err != nil {
assert.NotEqual(t, connect.CodeOf(err), connect.CodeFailedPrecondition)
}
// When HTTP/2 handlers panic, net/http sends an RST_STREAM frame with code
// INTERNAL_ERROR. We should be mapping this back to CodeInternal.
assert.Equal(t, connect.CodeOf(err), connect.CodeInternal)
}
drainStream := func(stream *connect.ServerStreamForClient[pingv1.CountUpResponse]) error {
t.Helper()
defer stream.Close()
assert.False(t, stream.Receive())
assert.True(t, stream.Receive()) // expect one response msg
assert.False(t, stream.Receive()) // expect panic before second response msg
return stream.Err()
}
pinger := &panicPingServer{}
Expand Down

0 comments on commit 4da0450

Please sign in to comment.