Skip to content

Commit

Permalink
remove register/unregister
Browse files Browse the repository at this point in the history
  • Loading branch information
ettec committed Dec 20, 2024
1 parent 1ce7214 commit 5b0425a
Show file tree
Hide file tree
Showing 8 changed files with 6 additions and 550 deletions.
58 changes: 0 additions & 58 deletions core/capabilities/remote/executable/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
"github.com/smartcontractkit/chainlink-common/pkg/values"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/executable"
Expand All @@ -31,7 +30,6 @@ const (
)

func Test_Client_DonTopologies(t *testing.T) {
tests.SkipFlakey(t, "https://smartcontract-it.atlassian.net/browse/CAPPL-363")
ctx := testutils.Context(t)

transmissionSchedule, err := values.NewMap(map[string]any{
Expand Down Expand Up @@ -59,18 +57,6 @@ func Test_Client_DonTopologies(t *testing.T) {
executeMethod(ctx, caller, transmissionSchedule, executeInputs, responseTest, t)
})

methods = append(methods, func(caller commoncap.ExecutableCapability) {
registerToWorkflowMethod(ctx, caller, transmissionSchedule, func(t *testing.T, responseError error) {
require.NoError(t, responseError)
}, t)
})

methods = append(methods, func(caller commoncap.ExecutableCapability) {
unregisterFromWorkflowMethod(ctx, caller, transmissionSchedule, func(t *testing.T, responseError error) {
require.NoError(t, responseError)
}, t)
})

for _, method := range methods {
testClient(t, 1, responseTimeOut, 1, 0,
capability, method)
Expand All @@ -90,7 +76,6 @@ func Test_Client_DonTopologies(t *testing.T) {
}

func Test_Client_TransmissionSchedules(t *testing.T) {
tests.SkipFlakey(t, "https://smartcontract-it.atlassian.net/browse/CAPPL-363")
ctx := testutils.Context(t)

responseTest := func(t *testing.T, response commoncap.CapabilityResponse, responseError error) {
Expand Down Expand Up @@ -264,34 +249,6 @@ func testClient(t *testing.T, numWorkflowPeers int, workflowNodeResponseTimeout
wg.Wait()
}

func registerToWorkflowMethod(ctx context.Context, caller commoncap.ExecutableCapability, transmissionSchedule *values.Map,
responseTest func(t *testing.T, responseError error), t *testing.T) {
err := caller.RegisterToWorkflow(ctx, commoncap.RegisterToWorkflowRequest{
Metadata: commoncap.RegistrationMetadata{
WorkflowID: workflowID1,
ReferenceID: stepReferenceID1,
WorkflowOwner: workflowOwnerID,
},
Config: transmissionSchedule,
})

responseTest(t, err)
}

func unregisterFromWorkflowMethod(ctx context.Context, caller commoncap.ExecutableCapability, transmissionSchedule *values.Map,
responseTest func(t *testing.T, responseError error), t *testing.T) {
err := caller.UnregisterFromWorkflow(ctx, commoncap.UnregisterFromWorkflowRequest{
Metadata: commoncap.RegistrationMetadata{
WorkflowID: workflowID1,
ReferenceID: stepReferenceID1,
WorkflowOwner: workflowOwnerID,
},
Config: transmissionSchedule,
})

responseTest(t, err)
}

func executeMethod(ctx context.Context, caller commoncap.ExecutableCapability, transmissionSchedule *values.Map,
executeInputs *values.Map, responseTest func(t *testing.T, responseCh commoncap.CapabilityResponse, responseError error), t *testing.T) {
responseCh, err := caller.Execute(ctx,
Expand Down Expand Up @@ -362,21 +319,6 @@ func (t *clientTestServer) Receive(_ context.Context, msg *remotetypes.MessageBo
resp, responseErr := t.executableCapability.Execute(context.Background(), capabilityRequest)
payload, marshalErr := pb.MarshalCapabilityResponse(resp)
t.sendResponse(messageID, responseErr, payload, marshalErr)

case remotetypes.MethodRegisterToWorkflow:
registerRequest, err := pb.UnmarshalRegisterToWorkflowRequest(msg.Payload)
if err != nil {
panic(err)
}
responseErr := t.executableCapability.RegisterToWorkflow(context.Background(), registerRequest)
t.sendResponse(messageID, responseErr, nil, nil)
case remotetypes.MethodUnregisterFromWorkflow:
unregisterRequest, err := pb.UnmarshalUnregisterFromWorkflowRequest(msg.Payload)
if err != nil {
panic(err)
}
responseErr := t.executableCapability.UnregisterFromWorkflow(context.Background(), unregisterRequest)
t.sendResponse(messageID, responseErr, nil, nil)
default:
panic("unknown method")
}
Expand Down
48 changes: 0 additions & 48 deletions core/capabilities/remote/executable/request/client_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,54 +48,6 @@ type ClientRequest struct {
wg *sync.WaitGroup
}

func NewClientRegisterToWorkflowRequest(ctx context.Context, lggr logger.Logger, req commoncap.RegisterToWorkflowRequest,
remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo capabilities.DON, dispatcher types.Dispatcher,
requestTimeout time.Duration) (*ClientRequest, error) {
rawRequest, err := proto.MarshalOptions{Deterministic: true}.Marshal(pb.RegisterToWorkflowRequestToProto(req))
if err != nil {
return nil, fmt.Errorf("failed to marshal register to workflow request: %w", err)
}

workflowID := req.Metadata.WorkflowID
if err := validation.ValidateWorkflowOrExecutionID(workflowID); err != nil {
return nil, fmt.Errorf("workflow ID is invalid: %w", err)
}

requestID := types.MethodRegisterToWorkflow + ":" + workflowID

tc := transmission.TransmissionConfig{
Schedule: transmission.Schedule_AllAtOnce,
DeltaStage: 0,
}

return newClientRequest(ctx, lggr, requestID, remoteCapabilityInfo, localDonInfo, dispatcher, requestTimeout,
tc, types.MethodRegisterToWorkflow, rawRequest)
}

func NewClientUnregisterFromWorkflowRequest(ctx context.Context, lggr logger.Logger, req commoncap.UnregisterFromWorkflowRequest,
remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo capabilities.DON, dispatcher types.Dispatcher,
requestTimeout time.Duration) (*ClientRequest, error) {
rawRequest, err := proto.MarshalOptions{Deterministic: true}.Marshal(pb.UnregisterFromWorkflowRequestToProto(req))
if err != nil {
return nil, fmt.Errorf("failed to marshal unregister from workflow request: %w", err)
}

workflowID := req.Metadata.WorkflowID
if err := validation.ValidateWorkflowOrExecutionID(workflowID); err != nil {
return nil, fmt.Errorf("workflow ID is invalid: %w", err)
}

requestID := types.MethodUnregisterFromWorkflow + ":" + workflowID

tc := transmission.TransmissionConfig{
Schedule: transmission.Schedule_AllAtOnce,
DeltaStage: 0,
}

return newClientRequest(ctx, lggr, requestID, remoteCapabilityInfo, localDonInfo, dispatcher, requestTimeout,
tc, types.MethodUnregisterFromWorkflow, rawRequest)
}

func NewClientExecuteRequest(ctx context.Context, lggr logger.Logger, req commoncap.CapabilityRequest,
remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo capabilities.DON, dispatcher types.Dispatcher,
requestTimeout time.Duration) (*ClientRequest, error) {
Expand Down
163 changes: 0 additions & 163 deletions core/capabilities/remote/executable/request/client_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,6 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {
Config: transmissionSchedule,
}

registerToWorkflowRequest := commoncap.RegisterToWorkflowRequest{
Metadata: commoncap.RegistrationMetadata{
WorkflowID: workflowID1,
WorkflowOwner: "0xaa",
ReferenceID: "refID",
},
Config: transmissionSchedule,
}

m, err := values.NewMap(map[string]any{"response": "response1"})
require.NoError(t, err)
capabilityResponse := commoncap.CapabilityResponse{
Expand Down Expand Up @@ -317,160 +308,6 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {

assert.Equal(t, resp, values.NewString("response1"))
})

t.Run("Register To Workflow Request", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)}
request, err := request.NewClientRegisterToWorkflowRequest(ctx, lggr, registerToWorkflowRequest, capInfo,
workflowDonInfo, dispatcher, 10*time.Minute)
require.NoError(t, err)
defer request.Cancel(errors.New("test end"))

<-dispatcher.msgs
<-dispatcher.msgs
assert.Empty(t, dispatcher.msgs)

msg := &types.MessageBody{
CapabilityId: capInfo.ID,
CapabilityDonId: capDonInfo.ID,
CallerDonId: workflowDonInfo.ID,
Method: types.MethodRegisterToWorkflow,
Payload: nil,
MessageId: []byte("messageID"),
}

msg.Sender = capabilityPeers[0][:]
err = request.OnMessage(ctx, msg)
require.NoError(t, err)

msg.Sender = capabilityPeers[1][:]
err = request.OnMessage(ctx, msg)
require.NoError(t, err)

response := <-request.ResponseChan()
require.Nil(t, response.Result)
require.NoError(t, response.Err)
})

t.Run("Register To Workflow Request with error", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)}
request, err := request.NewClientRegisterToWorkflowRequest(ctx, lggr, registerToWorkflowRequest, capInfo,
workflowDonInfo, dispatcher, 10*time.Minute)
require.NoError(t, err)
defer request.Cancel(errors.New("test end"))

<-dispatcher.msgs
<-dispatcher.msgs
assert.Empty(t, dispatcher.msgs)

msg := &types.MessageBody{
CapabilityId: capInfo.ID,
CapabilityDonId: capDonInfo.ID,
CallerDonId: workflowDonInfo.ID,
Method: types.MethodRegisterToWorkflow,
Payload: nil,
MessageId: []byte("messageID"),
Error: types.Error_INTERNAL_ERROR,
ErrorMsg: "an error",
}

msg.Sender = capabilityPeers[0][:]
err = request.OnMessage(ctx, msg)
require.NoError(t, err)

msg.Sender = capabilityPeers[1][:]
err = request.OnMessage(ctx, msg)
require.NoError(t, err)

response := <-request.ResponseChan()
require.Nil(t, response.Result)
assert.Equal(t, "an error", response.Err.Error())
})

t.Run("Unregister From Workflow Request", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)}
request, err := request.NewClientUnregisterFromWorkflowRequest(ctx, lggr, commoncap.UnregisterFromWorkflowRequest{
Metadata: commoncap.RegistrationMetadata{
WorkflowID: workflowID1,
},
}, capInfo, workflowDonInfo, dispatcher, 10*time.Minute)
require.NoError(t, err)
defer request.Cancel(errors.New("test end"))

<-dispatcher.msgs
<-dispatcher.msgs
assert.Empty(t, dispatcher.msgs)

msg := &types.MessageBody{
CapabilityId: capInfo.ID,
CapabilityDonId: capDonInfo.ID,
CallerDonId: workflowDonInfo.ID,
Method: types.MethodUnregisterFromWorkflow,
Payload: nil,
MessageId: []byte("messageID"),
}

msg.Sender = capabilityPeers[0][:]
err = request.OnMessage(ctx, msg)
require.NoError(t, err)

msg.Sender = capabilityPeers[1][:]
err = request.OnMessage(ctx, msg)
require.NoError(t, err)

response := <-request.ResponseChan()
require.Nil(t, response.Result)
require.NoError(t, response.Err)
})

t.Run("Unregister From Workflow Request with error", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)}
request, err := request.NewClientUnregisterFromWorkflowRequest(ctx, lggr, commoncap.UnregisterFromWorkflowRequest{
Metadata: commoncap.RegistrationMetadata{
WorkflowID: workflowID1,
},
}, capInfo, workflowDonInfo, dispatcher, 10*time.Minute)
require.NoError(t, err)
defer request.Cancel(errors.New("test end"))

<-dispatcher.msgs
<-dispatcher.msgs
assert.Empty(t, dispatcher.msgs)

msg := &types.MessageBody{
CapabilityId: capInfo.ID,
CapabilityDonId: capDonInfo.ID,
CallerDonId: workflowDonInfo.ID,
Method: types.MethodUnregisterFromWorkflow,
Payload: nil,
MessageId: []byte("messageID"),
Error: types.Error_INTERNAL_ERROR,
ErrorMsg: "an error",
}

msg.Sender = capabilityPeers[0][:]
err = request.OnMessage(ctx, msg)
require.NoError(t, err)

msg.Sender = capabilityPeers[1][:]
err = request.OnMessage(ctx, msg)
require.NoError(t, err)

response := <-request.ResponseChan()
require.Nil(t, response.Result)
assert.Equal(t, "an error", response.Err.Error())
})
}

type clientRequestTestDispatcher struct {
Expand Down
34 changes: 0 additions & 34 deletions core/capabilities/remote/executable/request/server_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,6 @@ func (e *ServerRequest) OnMessage(ctx context.Context, msg *types.MessageBody) e
switch e.method {
case types.MethodExecute:
e.executeRequest(ctx, msg.Payload, executeCapabilityRequest)
case types.MethodRegisterToWorkflow:
e.executeRequest(ctx, msg.Payload, registerToWorkflow)
case types.MethodUnregisterFromWorkflow:
e.executeRequest(ctx, msg.Payload, unregisterFromWorkflow)
default:
e.setError(types.Error_INTERNAL_ERROR, "unknown method %s"+e.method)
}
Expand Down Expand Up @@ -252,33 +248,3 @@ func executeCapabilityRequest(ctx context.Context, lggr logger.Logger, capabilit
lggr.Debugw("received execution results", "workflowExecutionID", capabilityRequest.Metadata.WorkflowExecutionID)
return responsePayload, nil
}

func registerToWorkflow(ctx context.Context, _ logger.Logger, capability capabilities.ExecutableCapability,
payload []byte) ([]byte, error) {
registerRequest, err := pb.UnmarshalRegisterToWorkflowRequest(payload)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal register to workflow request: %w", err)
}

err = capability.RegisterToWorkflow(ctx, registerRequest)
if err != nil {
return nil, fmt.Errorf("failed to register to workflow: %w", err)
}

return nil, nil
}

func unregisterFromWorkflow(ctx context.Context, _ logger.Logger, capability capabilities.ExecutableCapability,
payload []byte) ([]byte, error) {
unregisterRequest, err := pb.UnmarshalUnregisterFromWorkflowRequest(payload)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal unregister from workflow request: %w", err)
}

err = capability.UnregisterFromWorkflow(ctx, unregisterRequest)
if err != nil {
return nil, fmt.Errorf("failed to unregister from workflow: %w", err)
}

return nil, nil
}
Loading

0 comments on commit 5b0425a

Please sign in to comment.