Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(workflows/handler): adds all event handlers #15400

Merged
merged 3 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 182 additions & 13 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,82 @@ func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent)
h.lggr.Debugf("workflow 0x%x registered and started", wfID)
return nil
case WorkflowUpdatedEvent:
return h.workflowUpdatedEvent(ctx, event)
payload, ok := event.Data.(WorkflowRegistryWorkflowUpdatedV1)
if !ok {
return fmt.Errorf("invalid data type %T for event", event.Data)
}

newWorkflowID := hex.EncodeToString(payload.NewWorkflowID[:])
cma := h.emitter.With(
platform.KeyWorkflowID, newWorkflowID,
platform.KeyWorkflowName, payload.WorkflowName,
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)

if err := h.workflowUpdatedEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow updated event: %v", err), h.lggr)
return err
}

return nil
case WorkflowPausedEvent:
return h.workflowPausedEvent(ctx, event)
payload, ok := event.Data.(WorkflowRegistryWorkflowPausedV1)
if !ok {
return fmt.Errorf("invalid data type %T for event", event.Data)
}

wfID := hex.EncodeToString(payload.WorkflowID[:])

cma := h.emitter.With(
platform.KeyWorkflowID, wfID,
platform.KeyWorkflowName, payload.WorkflowName,
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)

if err := h.workflowPausedEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow paused event: %v", err), h.lggr)
return err
}
return nil
case WorkflowActivatedEvent:
return h.workflowActivatedEvent(ctx, event)
payload, ok := event.Data.(WorkflowRegistryWorkflowActivatedV1)
if !ok {
return fmt.Errorf("invalid data type %T for event", event.Data)
}

wfID := hex.EncodeToString(payload.WorkflowID[:])

cma := h.emitter.With(
platform.KeyWorkflowID, wfID,
platform.KeyWorkflowName, payload.WorkflowName,
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)
if err := h.workflowActivatedEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow activated event: %v", err), h.lggr)
return err
}

return nil
case WorkflowDeletedEvent:
payload, ok := event.Data.(WorkflowRegistryWorkflowDeletedV1)
if !ok {
return fmt.Errorf("invalid data type %T for event", event.Data)
}

wfID := hex.EncodeToString(payload.WorkflowID[:])

cma := h.emitter.With(
platform.KeyWorkflowID, wfID,
platform.KeyWorkflowName, payload.WorkflowName,
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)

if err := h.workflowDeletedEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow deleted event: %v", err), h.lggr)
return err
}

return nil
default:
return fmt.Errorf("event type unsupported: %v", event.EventType)
}
Expand Down Expand Up @@ -284,28 +355,126 @@ func (h *eventHandler) workflowRegisteredEvent(
return nil
}

// workflowUpdatedEvent handles the WorkflowUpdatedEvent event type.
// workflowUpdatedEvent handles the WorkflowUpdatedEvent event type by first finding the
// current workflow engine, stopping it, and then starting a new workflow engine with the
// updated workflow spec.
func (h *eventHandler) workflowUpdatedEvent(
_ context.Context,
_ WorkflowRegistryEvent,
ctx context.Context,
payload WorkflowRegistryWorkflowUpdatedV1,
) error {
return ErrNotImplemented
oldWorkflowID := hex.EncodeToString(payload.OldWorkflowID[:])

// stop the old workflow engine
e, err := h.engineRegistry.Pop(oldWorkflowID)
MStreet3 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("failed to get old workflow engine: %w", err)
}

if err := e.Close(); err != nil {
return fmt.Errorf("failed to close old workflow engine: %w", err)
}

registeredEvent := WorkflowRegistryWorkflowRegisteredV1{
WorkflowID: payload.NewWorkflowID,
WorkflowOwner: payload.WorkflowOwner,
DonID: payload.DonID,
Status: 0,
WorkflowName: payload.WorkflowName,
BinaryURL: payload.BinaryURL,
ConfigURL: payload.ConfigURL,
SecretsURL: payload.SecretsURL,
}

return h.workflowRegisteredEvent(ctx, registeredEvent)
}

// workflowPausedEvent handles the WorkflowPausedEvent event type.
func (h *eventHandler) workflowPausedEvent(
_ context.Context,
_ WorkflowRegistryEvent,
ctx context.Context,
payload WorkflowRegistryWorkflowPausedV1,
) error {
return ErrNotImplemented
wfID := hex.EncodeToString(payload.WorkflowID[:])

// Pop the workflow engine and close it
e, err := h.engineRegistry.Pop(wfID)
MStreet3 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("failed to get workflow engine: %w", err)
}
err = e.Close()
if err != nil {
return fmt.Errorf("failed to close workflow engine: %w", err)
}

// get existing workflow spec from DB
spec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(payload.WorkflowOwner), payload.WorkflowName)
if err != nil {
return fmt.Errorf("failed to get workflow spec: %w", err)
}

// update the status of the workflow spec
spec.Status = job.WorkflowSpecStatusPaused
if _, err := h.orm.UpsertWorkflowSpec(ctx, spec); err != nil {
return fmt.Errorf("failed to update workflow spec: %w", err)
}

return nil
}

// workflowActivatedEvent handles the WorkflowActivatedEvent event type.
func (h *eventHandler) workflowActivatedEvent(
_ context.Context,
_ WorkflowRegistryEvent,
ctx context.Context,
payload WorkflowRegistryWorkflowActivatedV1,
) error {
// fetch the workflow spec from the DB
spec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(payload.WorkflowOwner), payload.WorkflowName)
if err != nil {
return fmt.Errorf("failed to get workflow spec: %w", err)
}

// Do nothing if the workflow is already active
if spec.Status == job.WorkflowSpecStatusActive {
return nil
}

// get the secrets url by the secrets id
secretsURL, err := h.orm.GetSecretsURLByID(ctx, spec.SecretsID.Int64)
if err != nil {
return fmt.Errorf("failed to get secrets URL by ID: %w", err)
}

registeredEvent := WorkflowRegistryWorkflowRegisteredV1{
WorkflowID: payload.WorkflowID,
WorkflowOwner: payload.WorkflowOwner,
DonID: payload.DonID,
Status: 0,
WorkflowName: payload.WorkflowName,
BinaryURL: spec.BinaryURL,
ConfigURL: spec.ConfigURL,
SecretsURL: secretsURL,
}

return h.workflowRegisteredEvent(ctx, registeredEvent)
}

// workflowDeletedEvent handles the WorkflowDeletedEvent event type.
func (h *eventHandler) workflowDeletedEvent(
ctx context.Context,
payload WorkflowRegistryWorkflowDeletedV1,
) error {
return ErrNotImplemented
wfID := hex.EncodeToString(payload.WorkflowID[:])

e, err := h.engineRegistry.Pop(wfID)
if err != nil {
return fmt.Errorf("failed to get workflow engine: %w", err)
}
if err := e.Close(); err != nil {
return fmt.Errorf("failed to close workflow engine: %w", err)
}

if err := h.orm.DeleteWorkflowSpec(ctx, hex.EncodeToString(payload.WorkflowOwner), payload.WorkflowName); err != nil {
return fmt.Errorf("failed to delete workflow spec: %w", err)
}
return nil
}

// forceUpdateSecretsEvent handles the ForceUpdateSecretsEvent event type.
Expand Down
Loading
Loading