Skip to content

Commit

Permalink
chore(abciclient): improve error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
lklimek committed Feb 12, 2024
1 parent f547e7b commit 3b37f0d
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 38 deletions.
113 changes: 75 additions & 38 deletions abci/client/routed_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,19 @@ type routedClient struct {
service.Service
logger log.Logger
routing Routing
defaultClient Client
defaultClient ClientInfo
}

var _ Client = (*routedClient)(nil)

type RequestType string
type Routing map[RequestType][]Client
type Routing map[RequestType][]ClientInfo

type ClientInfo struct {
Client
// ClientID is an unique, human-readable, client identifier
ClientID string
}

// NewRoutedClientWithAddr returns a new ABCI client that routes requests to multiple
// underlying clients based on the request type.
Expand Down Expand Up @@ -58,9 +64,9 @@ func NewRoutedClientWithAddr(logger log.Logger, addr string, mustConnect bool) (
if len(parts) != 3 {
return nil, fmt.Errorf("invalid routing rule: %s", rule)
}
requestType := parts[0]
transport := parts[1]
address := parts[2]
requestType := strings.TrimSpace(parts[0])
transport := strings.TrimSpace(parts[1])
address := strings.TrimSpace(parts[2])

// Create a new client if it doesn't exist
clientName := fmt.Sprintf("%s:%s", transport, address)
Expand All @@ -82,7 +88,7 @@ func NewRoutedClientWithAddr(logger log.Logger, addr string, mustConnect bool) (
}

client := clients[clientName]
routing[RequestType(requestType)] = append(routing[RequestType(requestType)], client)
routing[RequestType(requestType)] = append(routing[RequestType(requestType)], ClientInfo{client, clientName})
}

if defaultClient == nil {
Expand All @@ -105,13 +111,18 @@ func NewRoutedClientWithAddr(logger log.Logger, addr string, mustConnect bool) (
// See docs of routedClient.delegate() for more details about implemented logic.
//
// TODO: Implement metrics for routed client and separate metrics for underlying clients.
func NewRoutedClient(
logger log.Logger, defaultClient Client, routing Routing) (Client, error) {
func NewRoutedClient(logger log.Logger, defaultClient Client, routing Routing) (Client, error) {
defaultClientID := ""
if s, ok := defaultClient.(fmt.Stringer); ok {
defaultClientID = fmt.Sprintf("DEFAULT:%s", s.String())
} else {
defaultClientID = "DEFAULT"
}

cli := &routedClient{
logger: logger,
routing: routing,
defaultClient: defaultClient,
defaultClient: ClientInfo{defaultClient, defaultClientID},
}

cli.Service = service.NewBaseService(logger, "RoutedClient", cli)
Expand Down Expand Up @@ -140,6 +151,20 @@ func (cli *routedClient) OnStart(ctx context.Context) error {
}

func (cli *routedClient) OnStop() {
for _, clients := range cli.routing {
for _, client := range clients {
if client.IsRunning() {
switch c := client.Client.(type) {
case *socketClient:
c.Stop()
case *localClient:
c.Stop()
case *grpcClient:
c.Stop()
}
}
}
}
}

// delegate calls the given function on the appropriate client with the given
Expand All @@ -153,11 +178,14 @@ func (cli *routedClient) OnStop() {
// function on each client in turn, and returns first result where any of returned
// values is non-zero. Results of subsequent calls are silently dropped.
//
// If all clients return only zero values, it returns response from last client.
// If all clients return only zero values, it returns response from last client, which is effectively
// also a zero value.
//
// If the function returns only 1 value, it assumes it is of type `error`.
// Otherwise it assumes response is `result, error`.
func (cli *routedClient) delegate(args ...interface{}) (res any, err error) {
//
// When a function call returns an error, error is returned and remaining functions are NOT called.
func (cli *routedClient) delegate(args ...interface{}) (firstResult any, err error) {
// Get the caller function name; it will be our request type
pc, _, _, _ := runtime.Caller(1)
funcObj := runtime.FuncForPC(pc)
Expand All @@ -167,59 +195,51 @@ func (cli *routedClient) delegate(args ...interface{}) (res any, err error) {

clients, ok := cli.routing[RequestType(funcName)]
if !ok {
clients = []Client{cli.defaultClient}
clients = []ClientInfo{cli.defaultClient}
// TODO change to trace or remove
cli.logger.Debug("no client found for method, falling back to default client", "method", funcName)
}

var firstResult []interface{}
// client that provided first non-zero result
winner := ""

startAll := time.Now()

var results []interface{}
var ret any
for _, client := range clients {
zerosReturned := false
start := time.Now()

zerosReturned, results = cli.call(client, funcName, args...)
zerosReturned, results := cli.call(client, funcName, args...)
if ret, err = parseReturned(funcName, results); err != nil {
cli.logger.Error("abci client returned error", "client_id", client.ClientID, "err", err)
return ret, err
}

// return first non-zero result
if !zerosReturned && firstResult == nil {
firstResult = results
winner = reflect.TypeOf(client).String()
firstResult = ret
winner = client.ClientID
}

// TODO change to Trace
cli.logger.Debug("routed ABCI request to a client",
"method", funcName,
"client", reflect.TypeOf(client).String(),
"client_id", client.ClientID,
"nil", zerosReturned,
"took", time.Since(start).String())
}

// TODO change to Trace
cli.logger.Debug("routed ABCI request execution finished",
cli.logger.Debug("routed ABCI request execution successful",
"method", funcName,
"winner", winner,
"client_id", winner,
"took", time.Since(startAll).String(),
"nil", firstResult == nil)

if firstResult == nil {
firstResult = results
firstResult = ret
}

switch len(firstResult) {
case 0:
// should never happen
return nil, fmt.Errorf("no result from any client for ABCI method %s", funcName)
case 1:
err, _ := firstResult[0].(error)
return nil, err

case 2:
err, _ := firstResult[1].(error)
return firstResult[0], err
default:
panic(fmt.Sprintf("unexpected number of return values: %d", len(firstResult)))
}
return firstResult, err
}

// call calls the given function on the given client with the given arguments.
Expand Down Expand Up @@ -250,9 +270,26 @@ func (cli *routedClient) call(client Client, funcName string, args ...interface{
return onlyZeros, result
}

func parseReturned(funcName string, ret []interface{}) (any, error) {
switch len(ret) {
case 0:
// should never happen
return nil, fmt.Errorf("no result from any client for ABCI method %s", funcName)
case 1:
err, _ := ret[0].(error)
return nil, err

case 2:
err, _ := ret[1].(error)
return ret[0], err
default:
panic(fmt.Sprintf("unexpected number of return values: %d", len(ret)))
}
}

// Error returns an error if the client was stopped abruptly.
func (cli *routedClient) Error() error {
var errs *multierror.Error
var errs error
for _, clients := range cli.routing {
for _, client := range clients {
err := client.Error()
Expand Down
8 changes: 8 additions & 0 deletions abci/client/socket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ func (cli *socketClient) OnStop() {
cli.drainQueue()
}

func (cli *socketClient) String() string {
if err := cli.Error(); err != nil {
return fmt.Sprintf("%T(%s):err=%s", cli, cli.addr, err.Error())
}

return fmt.Sprintf("%T(%s)", cli, cli.addr)
}

// Error returns an error if the client was stopped abruptly.
func (cli *socketClient) Error() error {
cli.mtx.Lock()
Expand Down

0 comments on commit 3b37f0d

Please sign in to comment.