Skip to content

Commit

Permalink
Rework handler and client internals into streams
Browse files Browse the repository at this point in the history
This is a very large commit that reworks the internals of handlers and
clients to operate on streams. Apart from a few changes to NewHandler
and NewClient, the tests continue to pass as-is. Adding support for
streaming RPCs should now be easy: we're just generating type-safe
wrappers around the generic stream type.

The surface area of the change is relatively small: we add a Stream
interface, Func equivalents for client- and server-side streams, and two
helper functions to pull Interceptors out of Options.

There are some distinct rough edges left:

* I haven't yet added interceptor support for streams.
* The client type should probably be reduced to a single-shot call.
  Though we'll still expose clients from generated code, they're doing
  very little beyond holding options.
* The client-side stream implementation is overly complex.
* Hand-writing unary handlers is now a fair bit of work, so we should
  move the health-checking support into a subpackage so we can use
  generated code without import cycles.
* Once we can generate bidi streaming handlers, we should consider
  moving reflection support into a separate package too.

This begins to address #1.
  • Loading branch information
akshayjshah committed Feb 28, 2022
1 parent 46cab7f commit 27d4be4
Show file tree
Hide file tree
Showing 17 changed files with 1,198 additions and 779 deletions.
26 changes: 17 additions & 9 deletions bad_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,24 @@ import (
// Twirp specification, mount this handler at the root of your API (so that it
// handles any requests for invalid protobuf methods).
func NewBadRouteHandler(opts ...HandlerOption) *Handler {
wrapped := Func(badRouteUnaryImpl)
if ic := ConfiguredHandlerInterceptor(opts...); ic != nil {
wrapped = ic.Wrap(wrapped)
}
return NewHandler(
"", "", "", // protobuf method, service, package names
func() proto.Message { return &emptypb.Empty{} }, // unused req msg
func(ctx context.Context, _ proto.Message) (proto.Message, error) {
path := "???"
if md, ok := HandlerMeta(ctx); ok {
path = md.Spec.Path
}
return nil, Wrap(CodeNotFound, newBadRouteError(path))
"", "", "", // protobuf package, service, method names
func(ctx context.Context, stream Stream) {
defer stream.CloseReceive()
_, err := wrapped(ctx, &emptypb.Empty{})
_ = stream.CloseSend(err)
},
opts...,
)
}

func badRouteUnaryImpl(ctx context.Context, _ proto.Message) (proto.Message, error) {
path := "???"
if md, ok := HandlerMeta(ctx); ok {
path = md.Spec.Path
}
return nil, Wrap(CodeNotFound, newBadRouteError(path))
}
245 changes: 59 additions & 186 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,10 @@
package rerpc

import (
"bytes"
"context"
"errors"
"io"
"io/ioutil"
"fmt"
"net/http"
"net/url"
"strconv"
"time"

"google.golang.org/protobuf/proto"

statuspb "github.com/rerpc/rerpc/internal/status/v1"
)

// Doer is the transport-level interface reRPC expects HTTP clients to
Expand All @@ -23,8 +14,11 @@ type Doer interface {
}

type callCfg struct {
Package string
Service string
Method string
EnableGzipRequest bool
MaxResponseBytes int
MaxResponseBytes int64
Interceptor Interceptor
Hooks *Hooks
}
Expand All @@ -44,72 +38,78 @@ type CallOption interface {
// To see an example of how Client is used in the generated code, see the
// internal/ping/v1test package.
type Client struct {
doer Doer
url string
methodFQN string
serviceFQN string
packageFQN string
newResponse func() proto.Message
opts []CallOption
doer Doer
baseURL string
pkg, service, method string
opts []CallOption
}

// NewClient creates a Client. The supplied URL must be the full,
// method-specific URL, without trailing slashes. The supplied method, service,
// and package must be fully-qualified protobuf identifiers, and the
// newResponse constructor must be safe to call concurrently. Any options
// passed here apply to all calls made with this client.
// NewClient creates a Client. The supplied URL must be the root URL of the
// server's API, without a trailing slash (e.g., https://api.acme.com or
// https://acme.com/grpc). The supplied package, service, and method must be
// protobuf identifiers. Any options passed here apply to all calls made with
// this client.
//
// For example, the URL https://api.acme.com/acme.foo.v1.FooService/Bar
// corresponds to method "acme.foo.v1.FooService.Bar", service
// "acme.foo.v1.FooService", and package "acme.foo.v1". In that case, the
// newResponse constructor would be:
// func() proto.Message {
// return &foopb.BarResponse{}
// }
// For example, to call the URL
// https://api.acme.com/acme.foo.v1.FooService/Bar, you'd pass the URL
// "https://api.acme.com", the package "acme.foo.v1", the service "FooService",
// and the method "Bar".
//
// Remember that NewClient is usually called from generated code - most users
// won't need to deal with long URLs or protobuf identifiers directly.
func NewClient(doer Doer, url, methodFQN, serviceFQN, packageFQN string, newResponse func() proto.Message, opts ...CallOption) *Client {
// won't need to deal with it directly.
//
// TODO: refactor this into a one-shot call. There's virtually no work that
// happens at the client level outside generated code.
func NewClient(doer Doer, baseURL, pkg, service, method string, opts ...CallOption) *Client {
return &Client{
doer: doer,
url: url,
methodFQN: methodFQN,
serviceFQN: serviceFQN,
packageFQN: packageFQN,
newResponse: newResponse,
opts: opts,
doer: doer,
baseURL: baseURL,
pkg: pkg,
service: service,
method: method,
opts: opts,
}
}

// Call the remote procedure. Any options passed apply only to the current
// call.
func (c *Client) Call(ctx context.Context, req proto.Message, opts ...CallOption) (proto.Message, error) {
var cfg callCfg
// Call creates a stream for the remote procedure. Any options passed apply
// only to the current call.
func (c *Client) Call(ctx context.Context, opts ...CallOption) Stream {
md, ok := CallMeta(ctx)
if !ok {
ctx = c.Context(ctx)
md, _ = CallMeta(ctx)
}
spec := md.Spec
methodURL := fmt.Sprintf("%s/%s.%s/%s", c.baseURL, spec.Package, spec.Service, spec.Method)
next := CallStreamFunc(func(ctx context.Context) Stream {
return newClientStream(ctx, c.doer, methodURL, spec.ReadMaxBytes, spec.RequestCompression == CompressionGzip)
})
// TODO: apply interceptors
return next(ctx)
}

func (c *Client) Context(ctx context.Context, opts ...CallOption) context.Context {
cfg := callCfg{
Package: c.pkg,
Service: c.service,
Method: c.method,
}
for _, opt := range c.opts {
opt.applyToCall(&cfg)
}
for _, opt := range opts {
opt.applyToCall(&cfg)
}

next := Func(func(ctx context.Context, req proto.Message) (proto.Message, error) {
// Take care not to return a typed nil from this function.
res, err := c.call(ctx, req, &cfg)
if err != nil {
return nil, err
}
return res, nil
})
if cfg.Interceptor != nil {
next = cfg.Interceptor.Wrap(next)
}
spec := &Specification{
Method: c.methodFQN,
Service: c.serviceFQN,
Package: c.packageFQN,
Package: cfg.Package,
Service: cfg.Service,
Method: cfg.Method,
RequestCompression: CompressionGzip,
ReadMaxBytes: cfg.MaxResponseBytes,
}
if url, err := url.Parse(c.url); err == nil {
methodURL := fmt.Sprintf("%s/%s.%s/%s", c.baseURL, spec.Package, spec.Service, spec.Method)
if url, err := url.Parse(methodURL); err == nil {
spec.Path = url.Path
}
if !cfg.EnableGzipRequest {
Expand All @@ -121,132 +121,5 @@ func (c *Client) Call(ctx context.Context, req proto.Message, opts ...CallOption
reqHeader.Set("Grpc-Encoding", spec.RequestCompression)
reqHeader.Set("Grpc-Accept-Encoding", acceptEncodingValue) // always advertise identity & gzip
reqHeader.Set("Te", "trailers")
ctx = NewCallContext(ctx, *spec, reqHeader, make(http.Header))
return next(ctx, req)
}

func (c *Client) call(ctx context.Context, req proto.Message, cfg *callCfg) (proto.Message, *Error) {
md, hasMD := CallMeta(ctx)
if !hasMD {
return nil, errorf(CodeInternal, "no call metadata available on context")
}

if deadline, ok := ctx.Deadline(); ok {
untilDeadline := time.Until(deadline)
if untilDeadline <= 0 {
return nil, errorf(CodeDeadlineExceeded, "no time to make RPC: timeout is %v", untilDeadline)
}
if enc, err := encodeTimeout(untilDeadline); err == nil {
// Tests verify that the error in encodeTimeout is unreachable, so we
// should be safe without observability for the error case.
md.req.raw.Set("Grpc-Timeout", enc)
}
}

body := &bytes.Buffer{}
if err := marshalLPM(ctx, body, req, md.Spec.RequestCompression, 0 /* maxBytes */, cfg.Hooks); err != nil {
return nil, errorf(CodeInvalidArgument, "can't marshal request as protobuf: %w", err)
}

request, err := http.NewRequestWithContext(ctx, http.MethodPost, c.url, body)
if err != nil {
return nil, errorf(CodeInternal, "can't create HTTP request: %w", err)
}
request.Header = md.req.raw

response, err := c.doer.Do(request)
if err != nil {
if errors.Is(err, context.Canceled) {
return nil, errorf(CodeCanceled, "context canceled")
}
if errors.Is(err, context.DeadlineExceeded) {
return nil, errorf(CodeDeadlineExceeded, "context deadline exceeded")
}
// Error message comes from our networking stack, so it's safe to expose.
return nil, wrap(CodeUnknown, err)
}
defer response.Body.Close()
defer io.Copy(ioutil.Discard, response.Body)
*md.res = NewImmutableHeader(response.Header)

if response.StatusCode != http.StatusOK {
code := CodeUnknown
if c, ok := httpToGRPC[response.StatusCode]; ok {
code = c
}
return nil, errorf(code, "HTTP status %v", response.StatusCode)
}
compression := response.Header.Get("Grpc-Encoding")
if compression == "" {
compression = CompressionIdentity
}
switch compression {
case CompressionIdentity, CompressionGzip:
default:
// Per https://github.com/grpc/grpc/blob/master/doc/compression.md, we
// should return CodeInternal and specify acceptable compression(s) (in
// addition to setting the Grpc-Accept-Encoding header).
return nil, errorf(
CodeInternal,
"unknown compression %q: accepted grpc-encoding values are %v",
compression,
acceptEncodingValue,
)
}

// When there's no body, errors sent from the first-party gRPC servers will
// be in the headers.
if err := extractError(response.Header); err != nil {
return nil, err
}

res := c.newResponse()
// Handling this error is a little complicated - read on.
unmarshalErr := unmarshalLPM(response.Body, res, compression, cfg.MaxResponseBytes)
// To ensure that we've read the trailers, read the body to completion.
io.Copy(io.Discard, response.Body)
serverErr := extractError(response.Trailer)
if serverErr != nil {
// Server sent us an error. In this case, we don't care if the
// length-prefixed message was corrupted and unmarshalErr is non-nil.
return nil, serverErr
} else if unmarshalErr != nil {
// Server thinks response was successful, so unmarshalErr is real.
return nil, errorf(CodeUnknown, "server returned invalid protobuf: %w", unmarshalErr)
}
// Server thinks response was successful and so do we, so we're done.
return res, nil
}

func extractError(h http.Header) *Error {
codeHeader := h.Get("Grpc-Status")
codeIsSuccess := (codeHeader == "" || codeHeader == "0")
if codeIsSuccess {
return nil
}

code, err := strconv.ParseUint(codeHeader, 10 /* base */, 32 /* bitsize */)
if err != nil {
return errorf(CodeUnknown, "gRPC protocol error: got invalid error code %q", codeHeader)
}
message := percentDecode(h.Get("Grpc-Message"))
ret := wrap(Code(code), errors.New(message))

detailsBinaryEncoded := h.Get("Grpc-Status-Details-Bin")
if len(detailsBinaryEncoded) > 0 {
detailsBinary, err := decodeBinaryHeader(detailsBinaryEncoded)
if err != nil {
return errorf(CodeUnknown, "server returned invalid grpc-error-details-bin trailer: %w", err)
}
var status statuspb.Status
if err := proto.Unmarshal(detailsBinary, &status); err != nil {
return errorf(CodeUnknown, "server returned invalid protobuf for error details: %w", err)
}
ret.details = status.Details
// Prefer the protobuf-encoded data to the headers (grpc-go does this too).
ret.code = Code(status.Code)
ret.err = errors.New(status.Message)
}

return ret
return NewCallContext(ctx, *spec, reqHeader, make(http.Header))
}
Loading

0 comments on commit 27d4be4

Please sign in to comment.