Skip to content

Commit

Permalink
Support webrpc SSE streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
VojtechVitek committed Dec 10, 2023
1 parent d244bd7 commit 4fe37fb
Show file tree
Hide file tree
Showing 7 changed files with 392 additions and 86 deletions.
190 changes: 150 additions & 40 deletions client.go.tmpl
Original file line number Diff line number Diff line change
@@ -1,41 +1,47 @@
{{define "client"}}
{{- $typeMap := .TypeMap -}}
{{- $typePrefix := .TypePrefix -}}
{{- if .Services -}}
{{- $services := .Services -}}
{{- $opts := .Opts }}

{{- if $services -}}
//
// Client
//

{{range .Services -}}
{{range $services -}}
const {{.Name}}PathPrefix = "/rpc/{{.Name}}/"
{{end}}

{{- range .Services -}}
{{- range $_, $service := $services -}}

{{ $serviceName := (printf "%sClient" (.Name | firstLetterToLower)) }}
type {{$serviceName}} struct {
{{- $serviceNameClient := (printf "%sClient" ($service.Name | firstLetterToLower)) }}
{{- $ServiceNameClient := (printf "%sClient" ($service.Name | firstLetterToUpper)) }}
type {{$serviceNameClient}} struct {
client HTTPClient
urls [{{len .Methods}}]string
urls [{{len $service.Methods}}]string
}

func New{{.Name | firstLetterToUpper }}Client(addr string, client HTTPClient) {{.Name}} {
prefix := urlBase(addr) + {{.Name}}PathPrefix
urls := [{{len .Methods}}]string{
{{- range .Methods}}
prefix + "{{.Name}}",
func New{{$ServiceNameClient}}(addr string, client HTTPClient) {{$ServiceNameClient}} {
prefix := urlBase(addr) + {{$service.Name}}PathPrefix
urls := [{{len $service.Methods}}]string{
{{- range $_, $method := $service.Methods}}
prefix + "{{$method.Name}}",
{{- end}}
}
return &{{$serviceName}}{
return &{{$serviceNameClient}}{
client: client,
urls: urls,
}
}

{{- range $i, $method := .Methods -}}
{{- range $i, $method := $service.Methods -}}
{{- $inputs := $method.Inputs -}}
{{- $outputs := $method.Outputs }}

func (c *{{$serviceName}}) {{.Name}}(ctx context.Context{{range $_, $input := $inputs}}, {{$input.Name}} {{template "field" dict "Name" $input.Name "Type" $input.Type "Optional" $input.Optional "TypeMap" $typeMap "TypePrefix" $typePrefix "TypeMeta" $input.Meta}}{{end}}) {{if len .Outputs}}({{end}}{{range $i, $output := .Outputs}}{{template "field" dict "Name" $output.Name "Type" $output.Type "Optional" $output.Optional "TypeMap" $typeMap "TypePrefix" $typePrefix "TypeMeta" $output.Meta}}{{if lt $i (len $method.Outputs)}}, {{end}}{{end}}error{{if len .Outputs}}){{end}} {
{{ if eq $method.StreamOutput false -}}

func (c *{{$serviceNameClient}}) {{$method.Name}}(ctx context.Context{{range $_, $input := $inputs}}, {{$input.Name}} {{template "field" dict "Name" $input.Name "Type" $input.Type "Optional" $input.Optional "TypeMap" $typeMap "TypePrefix" $typePrefix "TypeMeta" $input.Meta}}{{end}}) {{if len $outputs}}({{end}}{{range $i, $output := $outputs}}{{template "field" dict "Name" $output.Name "Type" $output.Type "Optional" $output.Optional "TypeMap" $typeMap "TypePrefix" $typePrefix "TypeMeta" $output.Meta}}{{if lt $i (len $method.Outputs)}}, {{end}}{{end}}error{{if len $outputs}}){{end}} {
{{- $inputVar := "nil" -}}
{{- $outputVar := "nil" -}}
{{- if $inputs | len}}
Expand All @@ -49,15 +55,129 @@ func (c *{{$serviceName}}) {{.Name}}(ctx context.Context{{range $_, $input := $i
{{- if $outputs | len}}
{{- $outputVar = "&out"}}
out := struct {
{{- range $i, $output := .Outputs}}
{{- range $i, $output := $outputs}}
Ret{{$i}} {{template "field" dict "Name" $output.Name "Type" $output.Type "Optional" $output.Optional "TypeMap" $typeMap "TypePrefix" $typePrefix "TypeMeta" $output.Meta "JsonTags" true}}
{{- end}}
}{}
{{ end }}
err := doJSONRequest(ctx, c.client, c.urls[{{$i}}], {{$inputVar}}, {{$outputVar}})
return {{range $i, $output := .Outputs}}out.Ret{{$i}}, {{end}}err

resp, err := doHTTPRequest(ctx, c.client, c.urls[{{$i}}], {{$inputVar}}, {{$outputVar}})
defer func() {
if resp != nil {
cerr := resp.Body.Close()
if err == nil && cerr != nil {
err = ErrWebrpcRequestFailed.WithCause(fmt.Errorf("failed to close response body: %w", cerr))
}
}
}()

return {{range $i, $output := $outputs}}out.Ret{{$i}}, {{end}}err
}

{{- else -}}

func (c *{{$serviceNameClient}}) {{$method.Name}}(ctx context.Context{{range $_, $input := $inputs}}, {{$input.Name}} {{template "field" dict "Name" $input.Name "Type" $input.Type "Optional" $input.Optional "TypeMap" $typeMap "TypePrefix" $typePrefix "TypeMeta" $input.Meta}}{{end}}) ({{$method.Name}}StreamReader, error) {
{{- $inputVar := "nil" -}}
{{- if $inputs | len}}
{{- $inputVar = "in"}}
in := struct {
{{- range $i, $input := $inputs}}
Arg{{$i}} {{template "field" dict "Name" $input.Name "Type" $input.Type "Optional" $input.Optional "TypeMap" $typeMap "TypePrefix" $typePrefix "TypeMeta" $input.Meta "JsonTags" true}}
{{- end}}
}{ {{- range $i, $input := $inputs}}{{if gt $i 0}}, {{end}}{{$input.Name}}{{end}}}
{{- end}}

resp, err := doHTTPRequest(ctx, c.client, c.urls[{{$i}}], {{$inputVar}}, nil)
if err != nil {
if resp != nil {
resp.Body.Close()
}
return nil, err
}

buf := bufio.NewReader(resp.Body)
return &{{$method.Name | firstLetterToLower}}StreamReader{streamReader{ctx: ctx, c: resp.Body, r: buf, d: json.NewDecoder(buf)}}, nil
}

{{- end -}}

{{- end -}}

{{- range $i, $method := $service.Methods -}}
{{ if eq $method.StreamOutput true }}

type subscribeMessagesStreamReader struct {
streamReader
}

func (r *subscribeMessagesStreamReader) Read() ({{end}}{{range $i, $output := $method.Outputs}}{{template "field" dict "Name" $output.Name "Type" $output.Type "Optional" $output.Optional "TypeMap" $typeMap "TypePrefix" $typePrefix "TypeMeta" $output.Meta}}{{if lt $i (len $method.Outputs)}}, {{end}}error) {
var out struct{
Ret0 *Message `json:"message"`
WebRPCError *WebRPCError `json:"webrpcError"`
}

err := r.streamReader.read(&out)
if err != nil {
return nil, err
}

if out.WebRPCError != nil {
return nil, out.WebRPCError
}

return out.Ret0, nil
}
{{- end }}
{{- end }}
{{- end }}

{{- if $opts.streaming }}

type streamReader struct {
ctx context.Context
c io.Closer
r *bufio.Reader
d *json.Decoder
}

func (r *streamReader) read(v interface {}) error {
for {
// Read newlines (keep-alive pings) and unblock decoder on ctx timeout.
select {
case <-r.ctx.Done():
r.c.Close()
return ErrWebrpcClientDisconnected.WithCause(r.ctx.Err())
default:
}

b, err := r.r.ReadByte()
if err != nil {
return r.handleReadError(err)
}
if b != '\n' {
r.r.UnreadByte()
break
}
}

if err := r.d.Decode(&v); err != nil {
return r.handleReadError(err)
}

return nil
}

func (r *streamReader) handleReadError(err error) error {
defer r.c.Close()
if errors.Is(err, io.EOF) {
return ErrWebrpcStreamFinished.WithCause(err)
}
if errors.Is(err, io.ErrUnexpectedEOF) {
return ErrWebrpcStreamLost.WithCause(err)
}
return ErrWebrpcBadResponse.WithCause(fmt.Errorf("reading stream: %w", err))
}

{{- end }}

// HTTPClient is the interface used by generated clients to send HTTP requests.
Expand Down Expand Up @@ -100,65 +220,55 @@ func newRequest(ctx context.Context, url string, reqBody io.Reader, contentType
return req, nil
}

// doJSONRequest is common code to make a request to the remote service.
func doJSONRequest(ctx context.Context, client HTTPClient, url string, in, out interface{}) error {
// doHTTPRequest is common code to make a request to the remote service.
func doHTTPRequest(ctx context.Context, client HTTPClient, url string, in, out interface{}) (*http.Response, error) {
reqBody, err := json.Marshal(in)
if err != nil {
return ErrWebrpcRequestFailed.WithCause(fmt.Errorf("failed to marshal JSON body: %w", err))
return nil, ErrWebrpcRequestFailed.WithCause(fmt.Errorf("failed to marshal JSON body: %w", err))
}
if err = ctx.Err(); err != nil {
return ErrWebrpcRequestFailed.WithCause(fmt.Errorf("aborted because context was done: %w", err))
return nil, ErrWebrpcRequestFailed.WithCause(fmt.Errorf("aborted because context was done: %w", err))
}

req, err := newRequest(ctx, url, bytes.NewBuffer(reqBody), "application/json")
if err != nil {
return ErrWebrpcRequestFailed.WithCause(fmt.Errorf("could not build request: %w", err))
return nil, ErrWebrpcRequestFailed.WithCause(fmt.Errorf("could not build request: %w", err))
}

resp, err := client.Do(req)
if err != nil {
return ErrWebrpcRequestFailed.WithCause(err)
}

defer func() {
cerr := resp.Body.Close()
if err == nil && cerr != nil {
err = ErrWebrpcRequestFailed.WithCause(fmt.Errorf("failed to close response body: %w", cerr))
}
}()

if err = ctx.Err(); err != nil {
return ErrWebrpcRequestFailed.WithCause(fmt.Errorf("aborted because context was done: %w", err))
return nil, ErrWebrpcRequestFailed.WithCause(err)
}

if resp.StatusCode != 200 {
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return ErrWebrpcBadResponse.WithCause(fmt.Errorf("failed to read server error response body: %w", err))
return nil, ErrWebrpcBadResponse.WithCause(fmt.Errorf("failed to read server error response body: %w", err))
}

var rpcErr WebRPCError
if err := json.Unmarshal(respBody, &rpcErr); err != nil {
return ErrWebrpcBadResponse.WithCause(fmt.Errorf("failed to unmarshal server error: %w", err))
return nil, ErrWebrpcBadResponse.WithCause(fmt.Errorf("failed to unmarshal server error: %w", err))
}
if rpcErr.Cause != "" {
rpcErr.cause = errors.New(rpcErr.Cause)
}
return rpcErr
return nil, rpcErr
}

if out != nil {
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return ErrWebrpcBadResponse.WithCause(fmt.Errorf("failed to read response body: %w", err))
return nil, ErrWebrpcBadResponse.WithCause(fmt.Errorf("failed to read response body: %w", err))
}

err = json.Unmarshal(respBody, &out)
if err != nil {
return ErrWebrpcBadResponse.WithCause(fmt.Errorf("failed to unmarshal JSON response body: %w", err))
return nil, ErrWebrpcBadResponse.WithCause(fmt.Errorf("failed to unmarshal JSON response body: %w", err))
}
}

return nil
return resp, nil
}

func WithHTTPRequestHeaders(ctx context.Context, h http.Header) (context.Context, error) {
Expand Down
6 changes: 3 additions & 3 deletions errors.go.tmpl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{{define "errors"}}
{{- $webrpcErrors := .WebrpcErrors -}}
{{- $schemaErrors := .SchemaErrors -}}
{{- $opts := .Opts }}
{{- $opts := .Opts -}}
//
// Errors
//
Expand Down Expand Up @@ -82,6 +82,6 @@ var (
{{ printf "Err%s = WebRPCError{Code: %v, Name: %q, Message: %q, HTTPStatus: %v}" $error.Name $error.Code $error.Name $error.Message $error.HTTPStatus}}
{{- end}}
)
{{- end}}
{{ end -}}

{{- end }}
{{- end -}}
4 changes: 2 additions & 2 deletions helpers.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,6 @@ func initializeNils(v reflect.Value) {
}
}
}
{{ end }}
{{- end -}}

{{ end -}}
{{- end -}}
11 changes: 10 additions & 1 deletion imports.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@
{{- end -}}
{{- end -}}

{{- if $opts.streaming -}}
{{- if $opts.server }}
{{- set $stdlibImports "sync" "" -}}
{{- end -}}
{{- if $opts.client }}
{{- set $stdlibImports "bufio" "" -}}
{{- end -}}
{{- end -}}

{{- /* Import "time" if there's at least one timestamp. */ -}}
{{ if $opts.types -}}
{{ if eq $opts.importTypesFrom "" -}}
Expand Down Expand Up @@ -85,7 +94,7 @@ import (
{{if ne $rename ""}}{{$rename}} {{end}}"{{$import}}"
{{end}}
{{- end }}
{{- end }}
{{- end -}}
)

{{- if eq $opts.json "jsoniter" }}
Expand Down
17 changes: 13 additions & 4 deletions main.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@
{{- $typePrefix = (printf "%s." $typePrefix) -}}
{{- end -}}

{{- set $opts "" false -}}
{{- range $_, $service := .Services -}}
{{- range $_, $method := $service.Methods -}}
{{ if eq $method.StreamOutput true -}}
{{- set $opts "streaming" true -}}
{{- end -}}
{{- end -}}
{{- end }}

{{- /* Print help on -help. */ -}}
{{- if exists .Opts "help" -}}
{{- template "help" $opts -}}
Expand Down Expand Up @@ -93,20 +102,20 @@ func WebRPCSchemaHash() string {
{{ template "types" dict "Services" .Services "Types" .Types "TypeMap" $typeMap "TypePrefix" $typePrefix "Opts" $opts }}
{{ end -}}

{{- if $opts.server}}
{{- if $opts.server }}
{{ template "server" dict "Services" .Services "TypeMap" $typeMap "TypePrefix" $typePrefix "Opts" $opts }}
{{ end -}}

{{ if $opts.client }}
{{ template "client" dict "Services" .Services "TypeMap" $typeMap "TypePrefix" $typePrefix }}
{{ template "client" dict "Services" .Services "TypeMap" $typeMap "Opts" $opts "TypePrefix" $typePrefix }}
{{ end -}}

{{ template "helpers" dict "Opts" $opts }}

{{- template "errors" dict "WebrpcErrors" .WebrpcErrors "SchemaErrors" .Errors "Opts" $opts "TypePrefix" $typePrefix }}
{{ template "errors" dict "WebrpcErrors" .WebrpcErrors "SchemaErrors" .Errors "Opts" $opts "TypePrefix" $typePrefix }}

{{- if $opts.legacyErrors }}
{{ template "legacyErrors" . }}
{{- end }}

{{ end }}
{{- end }}
Loading

0 comments on commit 4fe37fb

Please sign in to comment.