Skip to content

Commit

Permalink
[KS-602] Fix for remote exectable client not respecting context (#15721)
Browse files Browse the repository at this point in the history
  • Loading branch information
bolekk authored Dec 17, 2024
1 parent 631cd8f commit 30e3a16
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 6 deletions.
25 changes: 20 additions & 5 deletions core/capabilities/remote/executable/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ var _ services.Service = &client{}

const expiryCheckInterval = 30 * time.Second

var (
ErrRequestExpired = errors.New("request expired by executable client")
ErrContextDoneBeforeResponseQuorum = errors.New("context done before remote client received a quorum of responses")
)

func NewClient(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, dispatcher types.Dispatcher,
requestTimeout time.Duration, lggr logger.Logger) *client {
return &client{
Expand Down Expand Up @@ -122,7 +127,7 @@ func (c *client) expireRequests() {

for messageID, req := range c.requestIDToCallerRequest {
if req.Expired() {
req.Cancel(errors.New("request expired by executable client"))
req.Cancel(ErrRequestExpired)
delete(c.requestIDToCallerRequest, messageID)
}

Expand Down Expand Up @@ -164,12 +169,22 @@ func (c *client) Execute(ctx context.Context, capReq commoncap.CapabilityRequest
return commoncap.CapabilityResponse{}, fmt.Errorf("failed to send request: %w", err)
}

resp := <-req.ResponseChan()
if resp.Err != nil {
return commoncap.CapabilityResponse{}, fmt.Errorf("error executing request: %w", resp.Err)
var respResult []byte
var respErr error
select {
case resp := <-req.ResponseChan():
respResult = resp.Result
respErr = resp.Err
case <-ctx.Done():
// NOTE: ClientRequest will not block on sending to ResponseChan() because that channel is buffered (with size 1)
return commoncap.CapabilityResponse{}, errors.Join(ErrContextDoneBeforeResponseQuorum, ctx.Err())
}

if respErr != nil {
return commoncap.CapabilityResponse{}, fmt.Errorf("error executing request: %w", respErr)
}

capabilityResponse, err := pb.UnmarshalCapabilityResponse(resp.Result)
capabilityResponse, err := pb.UnmarshalCapabilityResponse(respResult)
if err != nil {
return commoncap.CapabilityResponse{}, fmt.Errorf("failed to unmarshal capability response: %w", err)
}
Expand Down
28 changes: 27 additions & 1 deletion core/capabilities/remote/executable/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ func Test_Client_TimesOutIfInsufficientCapabilityPeerResponses(t *testing.T) {
ctx := testutils.Context(t)

responseTest := func(t *testing.T, response commoncap.CapabilityResponse, responseError error) {
assert.Error(t, responseError)
require.Error(t, responseError)
require.ErrorIs(t, responseError, executable.ErrRequestExpired)
}

capability := &TestCapability{}
Expand All @@ -169,6 +170,31 @@ func Test_Client_TimesOutIfInsufficientCapabilityPeerResponses(t *testing.T) {
})
}

func Test_Client_ContextCanceledBeforeQuorumReached(t *testing.T) {
ctx, cancel := context.WithCancel(testutils.Context(t))

responseTest := func(t *testing.T, response commoncap.CapabilityResponse, responseError error) {
require.Error(t, responseError)
require.ErrorIs(t, responseError, executable.ErrContextDoneBeforeResponseQuorum)
}

capability := &TestCapability{}
transmissionSchedule, err := values.NewMap(map[string]any{
"schedule": transmission.Schedule_AllAtOnce,
"deltaStage": "20s",
})
require.NoError(t, err)

cancel()
testClient(t, 2, 20*time.Second, 2, 2,
capability,
func(caller commoncap.ExecutableCapability) {
executeInputs, err := values.NewMap(map[string]any{"executeValue1": "aValue1"})
require.NoError(t, err)
executeMethod(ctx, caller, transmissionSchedule, executeInputs, responseTest, t)
})
}

func testClient(t *testing.T, numWorkflowPeers int, workflowNodeResponseTimeout time.Duration,
numCapabilityPeers int, capabilityDonF uint8, underlying commoncap.ExecutableCapability,
method func(caller commoncap.ExecutableCapability)) {
Expand Down

0 comments on commit 30e3a16

Please sign in to comment.