From da7d76426a7a491682c2c788510360c0b8931c12 Mon Sep 17 00:00:00 2001 From: Ran Mishael <106548467+ranlavanet@users.noreply.github.com> Date: Sun, 1 Jan 2023 16:50:28 +0100 Subject: [PATCH] PRT-220 hot fix v0.4.2-rc (#216) --- relayer/chainproxy/connector.go | 6 ++++-- relayer/chainproxy/jsonRPC.go | 1 + relayer/chainproxy/rest.go | 2 ++ relayer/chainproxy/tendermintRPC.go | 2 ++ relayer/lavasession/common.go | 4 ++-- relayer/lavasession/consumer_session_manager.go | 6 ++++-- relayer/lavasession/consumer_types.go | 8 +++----- 7 files changed, 18 insertions(+), 11 deletions(-) diff --git a/relayer/chainproxy/connector.go b/relayer/chainproxy/connector.go index 94bf08674f..76c35b004d 100644 --- a/relayer/chainproxy/connector.go +++ b/relayer/chainproxy/connector.go @@ -24,6 +24,8 @@ type Connector struct { usedClients int } +const DialTimeout = 500 * time.Millisecond + func NewConnector(ctx context.Context, nConns uint, addr string) *Connector { connector := &Connector{ freeClients: make([]*rpcclient.Client, 0, nConns), @@ -37,7 +39,7 @@ func NewConnector(ctx context.Context, nConns uint, addr string) *Connector { connector.Close() return nil } - nctx, cancel := context.WithTimeout(ctx, 500*time.Millisecond) + nctx, cancel := context.WithTimeout(ctx, DialTimeout) rpcClient, err = rpcclient.DialContext(nctx, addr) if err != nil { utils.LavaFormatError("Could not connect to the client, retrying", err, nil) @@ -133,7 +135,7 @@ func NewGRPCConnector(ctx context.Context, nConns uint, addr string) *GRPCConnec connector.Close() return nil } - nctx, cancel := context.WithTimeout(ctx, 500*time.Millisecond) + nctx, cancel := context.WithTimeout(ctx, DialTimeout) grpcClient, err = grpc.DialContext(nctx, addr, grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { utils.LavaFormatError("Could not connect to the client, retrying", err, nil) diff --git a/relayer/chainproxy/jsonRPC.go b/relayer/chainproxy/jsonRPC.go index 005d35b1d1..f10bc1e93d 100644 --- a/relayer/chainproxy/jsonRPC.go +++ b/relayer/chainproxy/jsonRPC.go @@ -363,6 +363,7 @@ func (cp *JrpcChainProxy) PortalStart(ctx context.Context, privKey *btcec.Privat if err != nil { errMasking := cp.portalLogs.GetUniqueGuidResponseForError(err, msgSeed) cp.portalLogs.LogRequestAndResponse("jsonrpc http", true, "POST", c.Request().URI().String(), string(c.Body()), errMasking, msgSeed, err) + c.Status(fiber.StatusInternalServerError) return c.SendString(fmt.Sprintf(`{"error": {"code":-32000,"message":"%s"}}`, errMasking)) } cp.portalLogs.LogRequestAndResponse("jsonrpc http", false, "POST", c.Request().URI().String(), string(c.Body()), string(reply.Data), msgSeed, nil) diff --git a/relayer/chainproxy/rest.go b/relayer/chainproxy/rest.go index d9bbc99278..bd174a8675 100644 --- a/relayer/chainproxy/rest.go +++ b/relayer/chainproxy/rest.go @@ -249,6 +249,7 @@ func (cp *RestChainProxy) PortalStart(ctx context.Context, privKey *btcec.Privat if err != nil { errMasking := cp.portalLogs.GetUniqueGuidResponseForError(err, msgSeed) cp.portalLogs.LogRequestAndResponse("http in/out", true, http.MethodPost, path, requestBody, errMasking, msgSeed, err) + c.Status(fiber.StatusInternalServerError) return c.SendString(fmt.Sprintf(`{"error": "unsupported api","more_information:" %s}`, errMasking)) } responseBody := string(reply.Data) @@ -274,6 +275,7 @@ func (cp *RestChainProxy) PortalStart(ctx context.Context, privKey *btcec.Privat if err != nil { errMasking := cp.portalLogs.GetUniqueGuidResponseForError(err, msgSeed) cp.portalLogs.LogRequestAndResponse("http in/out", true, http.MethodGet, path, "", errMasking, msgSeed, err) + c.Status(fiber.StatusInternalServerError) return c.SendString(fmt.Sprintf(`{"error": "unsupported api","more_information": %s}`, errMasking)) } responseBody := string(reply.Data) diff --git a/relayer/chainproxy/tendermintRPC.go b/relayer/chainproxy/tendermintRPC.go index e990a7dfd5..c9c2595e31 100644 --- a/relayer/chainproxy/tendermintRPC.go +++ b/relayer/chainproxy/tendermintRPC.go @@ -330,6 +330,7 @@ func (cp *tendermintRpcChainProxy) PortalStart(ctx context.Context, privKey *btc if err != nil { errMasking := cp.portalLogs.GetUniqueGuidResponseForError(err, msgSeed) cp.portalLogs.LogRequestAndResponse("tendermint http in/out", true, "POST", c.Request().URI().String(), string(c.Body()), errMasking, msgSeed, err) + c.Status(fiber.StatusInternalServerError) return c.SendString(fmt.Sprintf(`{"error": "unsupported api","more_information": %s}`, errMasking)) } cp.portalLogs.LogRequestAndResponse("tendermint http in/out", false, "POST", c.Request().URI().String(), string(c.Body()), string(reply.Data), msgSeed, nil) @@ -352,6 +353,7 @@ func (cp *tendermintRpcChainProxy) PortalStart(ctx context.Context, privKey *btc if err != nil { errMasking := cp.portalLogs.GetUniqueGuidResponseForError(err, msgSeed) cp.portalLogs.LogRequestAndResponse("tendermint http in/out", true, "GET", c.Request().URI().String(), "", errMasking, msgSeed, err) + c.Status(fiber.StatusInternalServerError) if string(c.Body()) != "" { return c.SendString(fmt.Sprintf(`{"error": "unsupported api", "recommendation": "For jsonRPC use POST", "more_information": "%s"}`, errMasking)) } diff --git a/relayer/lavasession/common.go b/relayer/lavasession/common.go index eb77fb35d5..c82e74ea16 100644 --- a/relayer/lavasession/common.go +++ b/relayer/lavasession/common.go @@ -7,8 +7,8 @@ import ( ) const ( - MaxConsecutiveConnectionAttempts = 3 - TimeoutForEstablishingAConnection = 300 * time.Millisecond + MaxConsecutiveConnectionAttempts = 10 + TimeoutForEstablishingAConnection = 1 * time.Second MaxSessionsAllowedPerProvider = 1000 // Max number of sessions allowed per provider MaxAllowedBlockListedSessionPerProvider = 3 MaximumNumberOfFailuresAllowedPerConsumerSession = 3 diff --git a/relayer/lavasession/consumer_session_manager.go b/relayer/lavasession/consumer_session_manager.go index cccba14c97..f07579ad47 100644 --- a/relayer/lavasession/consumer_session_manager.go +++ b/relayer/lavasession/consumer_session_manager.go @@ -119,7 +119,9 @@ func (csm *ConsumerSessionManager) GetSession(ctx context.Context, cuNeededForSe utils.LavaFormatFatal("Unsupported Error", err, nil) } } else if !connected { - // If failed to connect we try again getting a random provider to pick from + // If failed to connect we ignore this provider for this get session request only + // and try again getting a random provider to pick from + tempIgnoredProviders.providers[providerAddress] = struct{}{} continue } @@ -203,7 +205,7 @@ func (csm *ConsumerSessionManager) getValidConsumerSessionsWithProvider(ignoredP currentEpoch = csm.atomicReadCurrentEpoch() // reading the epoch here while locked, to get the epoch of the pairing. if ignoredProviders.currentEpoch < currentEpoch { utils.LavaFormatDebug("ignoredProviders epoch is not the current epoch, resetting ignoredProviders", &map[string]string{"ignoredProvidersEpoch": strconv.FormatUint(ignoredProviders.currentEpoch, 10), "currentEpoch": strconv.FormatUint(currentEpoch, 10)}) - ignoredProviders.providers = nil // reset the old providers as epochs changed so we have a new pairing list. + ignoredProviders.providers = make(map[string]struct{}) // reset the old providers as epochs changed so we have a new pairing list. ignoredProviders.currentEpoch = currentEpoch } diff --git a/relayer/lavasession/consumer_types.go b/relayer/lavasession/consumer_types.go index db9e82c85f..861400843f 100644 --- a/relayer/lavasession/consumer_types.go +++ b/relayer/lavasession/consumer_types.go @@ -135,8 +135,8 @@ func (cswp *ConsumerSessionsWithProvider) decreaseUsedComputeUnits(cu uint64) er return nil } -func (cswp *ConsumerSessionsWithProvider) connectRawClient(ctx context.Context, addr string) (*pairingtypes.RelayerClient, error) { - connectCtx, cancel := context.WithTimeout(ctx, 3*time.Second) +func (cswp *ConsumerSessionsWithProvider) connectRawClientWithTimeout(ctx context.Context, addr string) (*pairingtypes.RelayerClient, error) { + connectCtx, cancel := context.WithTimeout(ctx, TimeoutForEstablishingAConnection) defer cancel() conn, err := grpc.DialContext(connectCtx, addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) @@ -212,9 +212,7 @@ func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSes continue } if endpoint.Client == nil { - connectCtx, cancel := context.WithTimeout(ctx, TimeoutForEstablishingAConnection) - conn, err := cswp.connectRawClient(connectCtx, endpoint.Addr) - cancel() + conn, err := cswp.connectRawClientWithTimeout(ctx, endpoint.Addr) if err != nil { endpoint.ConnectionRefusals++ utils.LavaFormatError("error connecting to provider", err, &map[string]string{"provider endpoint": endpoint.Addr, "provider address": cswp.Acc, "endpoint": fmt.Sprintf("%+v", endpoint)})