Skip to content

Commit

Permalink
PRT-220 hot fix v0.4.2-rc (#216)
Browse files Browse the repository at this point in the history
  • Loading branch information
ranlavanet authored Jan 1, 2023
1 parent ca9031d commit da7d764
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 11 deletions.
6 changes: 4 additions & 2 deletions relayer/chainproxy/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type Connector struct {
usedClients int
}

const DialTimeout = 500 * time.Millisecond

func NewConnector(ctx context.Context, nConns uint, addr string) *Connector {
connector := &Connector{
freeClients: make([]*rpcclient.Client, 0, nConns),
Expand All @@ -37,7 +39,7 @@ func NewConnector(ctx context.Context, nConns uint, addr string) *Connector {
connector.Close()
return nil
}
nctx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
nctx, cancel := context.WithTimeout(ctx, DialTimeout)
rpcClient, err = rpcclient.DialContext(nctx, addr)
if err != nil {
utils.LavaFormatError("Could not connect to the client, retrying", err, nil)
Expand Down Expand Up @@ -133,7 +135,7 @@ func NewGRPCConnector(ctx context.Context, nConns uint, addr string) *GRPCConnec
connector.Close()
return nil
}
nctx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
nctx, cancel := context.WithTimeout(ctx, DialTimeout)
grpcClient, err = grpc.DialContext(nctx, addr, grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
utils.LavaFormatError("Could not connect to the client, retrying", err, nil)
Expand Down
1 change: 1 addition & 0 deletions relayer/chainproxy/jsonRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ func (cp *JrpcChainProxy) PortalStart(ctx context.Context, privKey *btcec.Privat
if err != nil {
errMasking := cp.portalLogs.GetUniqueGuidResponseForError(err, msgSeed)
cp.portalLogs.LogRequestAndResponse("jsonrpc http", true, "POST", c.Request().URI().String(), string(c.Body()), errMasking, msgSeed, err)
c.Status(fiber.StatusInternalServerError)
return c.SendString(fmt.Sprintf(`{"error": {"code":-32000,"message":"%s"}}`, errMasking))
}
cp.portalLogs.LogRequestAndResponse("jsonrpc http", false, "POST", c.Request().URI().String(), string(c.Body()), string(reply.Data), msgSeed, nil)
Expand Down
2 changes: 2 additions & 0 deletions relayer/chainproxy/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ func (cp *RestChainProxy) PortalStart(ctx context.Context, privKey *btcec.Privat
if err != nil {
errMasking := cp.portalLogs.GetUniqueGuidResponseForError(err, msgSeed)
cp.portalLogs.LogRequestAndResponse("http in/out", true, http.MethodPost, path, requestBody, errMasking, msgSeed, err)
c.Status(fiber.StatusInternalServerError)
return c.SendString(fmt.Sprintf(`{"error": "unsupported api","more_information:" %s}`, errMasking))
}
responseBody := string(reply.Data)
Expand All @@ -274,6 +275,7 @@ func (cp *RestChainProxy) PortalStart(ctx context.Context, privKey *btcec.Privat
if err != nil {
errMasking := cp.portalLogs.GetUniqueGuidResponseForError(err, msgSeed)
cp.portalLogs.LogRequestAndResponse("http in/out", true, http.MethodGet, path, "", errMasking, msgSeed, err)
c.Status(fiber.StatusInternalServerError)
return c.SendString(fmt.Sprintf(`{"error": "unsupported api","more_information": %s}`, errMasking))
}
responseBody := string(reply.Data)
Expand Down
2 changes: 2 additions & 0 deletions relayer/chainproxy/tendermintRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ func (cp *tendermintRpcChainProxy) PortalStart(ctx context.Context, privKey *btc
if err != nil {
errMasking := cp.portalLogs.GetUniqueGuidResponseForError(err, msgSeed)
cp.portalLogs.LogRequestAndResponse("tendermint http in/out", true, "POST", c.Request().URI().String(), string(c.Body()), errMasking, msgSeed, err)
c.Status(fiber.StatusInternalServerError)
return c.SendString(fmt.Sprintf(`{"error": "unsupported api","more_information": %s}`, errMasking))
}
cp.portalLogs.LogRequestAndResponse("tendermint http in/out", false, "POST", c.Request().URI().String(), string(c.Body()), string(reply.Data), msgSeed, nil)
Expand All @@ -352,6 +353,7 @@ func (cp *tendermintRpcChainProxy) PortalStart(ctx context.Context, privKey *btc
if err != nil {
errMasking := cp.portalLogs.GetUniqueGuidResponseForError(err, msgSeed)
cp.portalLogs.LogRequestAndResponse("tendermint http in/out", true, "GET", c.Request().URI().String(), "", errMasking, msgSeed, err)
c.Status(fiber.StatusInternalServerError)
if string(c.Body()) != "" {
return c.SendString(fmt.Sprintf(`{"error": "unsupported api", "recommendation": "For jsonRPC use POST", "more_information": "%s"}`, errMasking))
}
Expand Down
4 changes: 2 additions & 2 deletions relayer/lavasession/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
)

const (
MaxConsecutiveConnectionAttempts = 3
TimeoutForEstablishingAConnection = 300 * time.Millisecond
MaxConsecutiveConnectionAttempts = 10
TimeoutForEstablishingAConnection = 1 * time.Second
MaxSessionsAllowedPerProvider = 1000 // Max number of sessions allowed per provider
MaxAllowedBlockListedSessionPerProvider = 3
MaximumNumberOfFailuresAllowedPerConsumerSession = 3
Expand Down
6 changes: 4 additions & 2 deletions relayer/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ func (csm *ConsumerSessionManager) GetSession(ctx context.Context, cuNeededForSe
utils.LavaFormatFatal("Unsupported Error", err, nil)
}
} else if !connected {
// If failed to connect we try again getting a random provider to pick from
// If failed to connect we ignore this provider for this get session request only
// and try again getting a random provider to pick from
tempIgnoredProviders.providers[providerAddress] = struct{}{}
continue
}

Expand Down Expand Up @@ -203,7 +205,7 @@ func (csm *ConsumerSessionManager) getValidConsumerSessionsWithProvider(ignoredP
currentEpoch = csm.atomicReadCurrentEpoch() // reading the epoch here while locked, to get the epoch of the pairing.
if ignoredProviders.currentEpoch < currentEpoch {
utils.LavaFormatDebug("ignoredProviders epoch is not the current epoch, resetting ignoredProviders", &map[string]string{"ignoredProvidersEpoch": strconv.FormatUint(ignoredProviders.currentEpoch, 10), "currentEpoch": strconv.FormatUint(currentEpoch, 10)})
ignoredProviders.providers = nil // reset the old providers as epochs changed so we have a new pairing list.
ignoredProviders.providers = make(map[string]struct{}) // reset the old providers as epochs changed so we have a new pairing list.
ignoredProviders.currentEpoch = currentEpoch
}

Expand Down
8 changes: 3 additions & 5 deletions relayer/lavasession/consumer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ func (cswp *ConsumerSessionsWithProvider) decreaseUsedComputeUnits(cu uint64) er
return nil
}

func (cswp *ConsumerSessionsWithProvider) connectRawClient(ctx context.Context, addr string) (*pairingtypes.RelayerClient, error) {
connectCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
func (cswp *ConsumerSessionsWithProvider) connectRawClientWithTimeout(ctx context.Context, addr string) (*pairingtypes.RelayerClient, error) {
connectCtx, cancel := context.WithTimeout(ctx, TimeoutForEstablishingAConnection)
defer cancel()

conn, err := grpc.DialContext(connectCtx, addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
Expand Down Expand Up @@ -212,9 +212,7 @@ func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSes
continue
}
if endpoint.Client == nil {
connectCtx, cancel := context.WithTimeout(ctx, TimeoutForEstablishingAConnection)
conn, err := cswp.connectRawClient(connectCtx, endpoint.Addr)
cancel()
conn, err := cswp.connectRawClientWithTimeout(ctx, endpoint.Addr)
if err != nil {
endpoint.ConnectionRefusals++
utils.LavaFormatError("error connecting to provider", err, &map[string]string{"provider endpoint": endpoint.Addr, "provider address": cswp.Acc, "endpoint": fmt.Sprintf("%+v", endpoint)})
Expand Down

0 comments on commit da7d764

Please sign in to comment.