Skip to content
This repository has been archived by the owner on Oct 4, 2019. It is now read-only.

Commit

Permalink
Merge pull request #578 from ethereumproject/rpc/pending-req
Browse files Browse the repository at this point in the history
rpc: honour pending requests before tearing conn down
  • Loading branch information
whilei authored May 1, 2018
2 parents b18a737 + a6b88b8 commit 456053c
Showing 1 changed file with 28 additions and 14 deletions.
42 changes: 28 additions & 14 deletions rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"reflect"
"runtime"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -138,14 +139,15 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error {
// requests until the codec returns an error when reading a request (in most cases
// an EOF). It executes requests in parallel when singleShot is false.
func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecOption) error {
var pend sync.WaitGroup

defer func() {
if err := recover(); err != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
glog.Errorln(string(buf))
}

s.codecsMu.Lock()
s.codecs.Remove(codec)
s.codecsMu.Unlock()
Expand Down Expand Up @@ -174,8 +176,13 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO
for atomic.LoadInt32(&s.run) == 1 {
reqs, batch, err := s.readRequest(codec)
if err != nil {
glog.V(logger.Debug).Infof("%v\n", err)
codec.Write(codec.CreateErrorResponse(nil, err))
// If a parsing error occurred, send an error
if err.Error() != "EOF" {
glog.V(logger.Debug).Infof("%v", err)
codec.Write(codec.CreateErrorResponse(nil, err))
}
// Error or end of stream, wait for requests and tear down
pend.Wait()
return nil
}

Expand All @@ -194,20 +201,27 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO
}
return nil
}

if singleShot && batch {
s.execBatch(ctx, codec, reqs)
return nil
} else if singleShot && !batch {
s.exec(ctx, codec, reqs[0])
// If a single shot request is executing, run and return immediately
if singleShot {
if batch {
s.execBatch(ctx, codec, reqs)
} else {
s.exec(ctx, codec, reqs[0])
}
return nil
} else if !singleShot && batch {
go s.execBatch(ctx, codec, reqs)
} else {
go s.exec(ctx, codec, reqs[0])
}
}
// For multi-shot connections, start a goroutine to serve and loop back
pend.Add(1)

go func(reqs []*serverRequest, batch bool) {
defer pend.Done()
if batch {
s.execBatch(ctx, codec, reqs)
} else {
s.exec(ctx, codec, reqs[0])
}
}(reqs, batch)
}
return nil
}

Expand Down

0 comments on commit 456053c

Please sign in to comment.