From 3b37f0d0370da54c1b0f69a93372ca3b68a2d373 Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Mon, 12 Feb 2024 10:35:10 +0100 Subject: [PATCH] chore(abciclient): improve error handling --- abci/client/routed_client.go | 113 +++++++++++++++++++++++------------ abci/client/socket_client.go | 8 +++ 2 files changed, 83 insertions(+), 38 deletions(-) diff --git a/abci/client/routed_client.go b/abci/client/routed_client.go index 2d06d710f9..a01edd8801 100644 --- a/abci/client/routed_client.go +++ b/abci/client/routed_client.go @@ -19,13 +19,19 @@ type routedClient struct { service.Service logger log.Logger routing Routing - defaultClient Client + defaultClient ClientInfo } var _ Client = (*routedClient)(nil) type RequestType string -type Routing map[RequestType][]Client +type Routing map[RequestType][]ClientInfo + +type ClientInfo struct { + Client + // ClientID is an unique, human-readable, client identifier + ClientID string +} // NewRoutedClientWithAddr returns a new ABCI client that routes requests to multiple // underlying clients based on the request type. @@ -58,9 +64,9 @@ func NewRoutedClientWithAddr(logger log.Logger, addr string, mustConnect bool) ( if len(parts) != 3 { return nil, fmt.Errorf("invalid routing rule: %s", rule) } - requestType := parts[0] - transport := parts[1] - address := parts[2] + requestType := strings.TrimSpace(parts[0]) + transport := strings.TrimSpace(parts[1]) + address := strings.TrimSpace(parts[2]) // Create a new client if it doesn't exist clientName := fmt.Sprintf("%s:%s", transport, address) @@ -82,7 +88,7 @@ func NewRoutedClientWithAddr(logger log.Logger, addr string, mustConnect bool) ( } client := clients[clientName] - routing[RequestType(requestType)] = append(routing[RequestType(requestType)], client) + routing[RequestType(requestType)] = append(routing[RequestType(requestType)], ClientInfo{client, clientName}) } if defaultClient == nil { @@ -105,13 +111,18 @@ func NewRoutedClientWithAddr(logger log.Logger, addr string, mustConnect bool) ( // See docs of routedClient.delegate() for more details about implemented logic. // // TODO: Implement metrics for routed client and separate metrics for underlying clients. -func NewRoutedClient( - logger log.Logger, defaultClient Client, routing Routing) (Client, error) { +func NewRoutedClient(logger log.Logger, defaultClient Client, routing Routing) (Client, error) { + defaultClientID := "" + if s, ok := defaultClient.(fmt.Stringer); ok { + defaultClientID = fmt.Sprintf("DEFAULT:%s", s.String()) + } else { + defaultClientID = "DEFAULT" + } cli := &routedClient{ logger: logger, routing: routing, - defaultClient: defaultClient, + defaultClient: ClientInfo{defaultClient, defaultClientID}, } cli.Service = service.NewBaseService(logger, "RoutedClient", cli) @@ -140,6 +151,20 @@ func (cli *routedClient) OnStart(ctx context.Context) error { } func (cli *routedClient) OnStop() { + for _, clients := range cli.routing { + for _, client := range clients { + if client.IsRunning() { + switch c := client.Client.(type) { + case *socketClient: + c.Stop() + case *localClient: + c.Stop() + case *grpcClient: + c.Stop() + } + } + } + } } // delegate calls the given function on the appropriate client with the given @@ -153,11 +178,14 @@ func (cli *routedClient) OnStop() { // function on each client in turn, and returns first result where any of returned // values is non-zero. Results of subsequent calls are silently dropped. // -// If all clients return only zero values, it returns response from last client. +// If all clients return only zero values, it returns response from last client, which is effectively +// also a zero value. // // If the function returns only 1 value, it assumes it is of type `error`. // Otherwise it assumes response is `result, error`. -func (cli *routedClient) delegate(args ...interface{}) (res any, err error) { +// +// When a function call returns an error, error is returned and remaining functions are NOT called. +func (cli *routedClient) delegate(args ...interface{}) (firstResult any, err error) { // Get the caller function name; it will be our request type pc, _, _, _ := runtime.Caller(1) funcObj := runtime.FuncForPC(pc) @@ -167,59 +195,51 @@ func (cli *routedClient) delegate(args ...interface{}) (res any, err error) { clients, ok := cli.routing[RequestType(funcName)] if !ok { - clients = []Client{cli.defaultClient} + clients = []ClientInfo{cli.defaultClient} + // TODO change to trace or remove + cli.logger.Debug("no client found for method, falling back to default client", "method", funcName) } - - var firstResult []interface{} + // client that provided first non-zero result winner := "" startAll := time.Now() - var results []interface{} + var ret any for _, client := range clients { - zerosReturned := false start := time.Now() - zerosReturned, results = cli.call(client, funcName, args...) + zerosReturned, results := cli.call(client, funcName, args...) + if ret, err = parseReturned(funcName, results); err != nil { + cli.logger.Error("abci client returned error", "client_id", client.ClientID, "err", err) + return ret, err + } + // return first non-zero result if !zerosReturned && firstResult == nil { - firstResult = results - winner = reflect.TypeOf(client).String() + firstResult = ret + winner = client.ClientID } // TODO change to Trace cli.logger.Debug("routed ABCI request to a client", "method", funcName, - "client", reflect.TypeOf(client).String(), + "client_id", client.ClientID, "nil", zerosReturned, "took", time.Since(start).String()) } // TODO change to Trace - cli.logger.Debug("routed ABCI request execution finished", + cli.logger.Debug("routed ABCI request execution successful", "method", funcName, - "winner", winner, + "client_id", winner, "took", time.Since(startAll).String(), "nil", firstResult == nil) if firstResult == nil { - firstResult = results + firstResult = ret } - switch len(firstResult) { - case 0: - // should never happen - return nil, fmt.Errorf("no result from any client for ABCI method %s", funcName) - case 1: - err, _ := firstResult[0].(error) - return nil, err - - case 2: - err, _ := firstResult[1].(error) - return firstResult[0], err - default: - panic(fmt.Sprintf("unexpected number of return values: %d", len(firstResult))) - } + return firstResult, err } // call calls the given function on the given client with the given arguments. @@ -250,9 +270,26 @@ func (cli *routedClient) call(client Client, funcName string, args ...interface{ return onlyZeros, result } +func parseReturned(funcName string, ret []interface{}) (any, error) { + switch len(ret) { + case 0: + // should never happen + return nil, fmt.Errorf("no result from any client for ABCI method %s", funcName) + case 1: + err, _ := ret[0].(error) + return nil, err + + case 2: + err, _ := ret[1].(error) + return ret[0], err + default: + panic(fmt.Sprintf("unexpected number of return values: %d", len(ret))) + } +} + // Error returns an error if the client was stopped abruptly. func (cli *routedClient) Error() error { - var errs *multierror.Error + var errs error for _, clients := range cli.routing { for _, client := range clients { err := client.Error() diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index 26adc2318f..9dacbc71d5 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -99,6 +99,14 @@ func (cli *socketClient) OnStop() { cli.drainQueue() } +func (cli *socketClient) String() string { + if err := cli.Error(); err != nil { + return fmt.Sprintf("%T(%s):err=%s", cli, cli.addr, err.Error()) + } + + return fmt.Sprintf("%T(%s)", cli, cli.addr) +} + // Error returns an error if the client was stopped abruptly. func (cli *socketClient) Error() error { cli.mtx.Lock()