Skip to content

Commit

Permalink
feat(workflows/handler): adds all event handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
MStreet3 committed Nov 25, 2024
1 parent 23b9787 commit 5212eb9
Show file tree
Hide file tree
Showing 3 changed files with 428 additions and 12 deletions.
191 changes: 179 additions & 12 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,82 @@ func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent)
h.lggr.Debugf("workflow 0x%x registered and started", wfID)
return nil
case WorkflowUpdatedEvent:
return h.workflowUpdatedEvent(ctx, event)
payload, ok := event.Data.(WorkflowRegistryWorkflowUpdatedV1)
if !ok {
return fmt.Errorf("invalid data type %T for event", event.Data)
}

newWorkflowID := hex.EncodeToString(payload.NewWorkflowID[:])
cma := h.emitter.With(
platform.KeyWorkflowID, newWorkflowID,
platform.KeyWorkflowName, payload.WorkflowName,
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)

if err := h.workflowUpdatedEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow updated event: %v", err), h.lggr)
return err
}

return nil
case WorkflowPausedEvent:
return h.workflowPausedEvent(ctx, event)
payload, ok := event.Data.(WorkflowRegistryWorkflowPausedV1)
if !ok {
return fmt.Errorf("invalid data type %T for event", event.Data)
}

wfID := hex.EncodeToString(payload.WorkflowID[:])

cma := h.emitter.With(
platform.KeyWorkflowID, wfID,
platform.KeyWorkflowName, payload.WorkflowName,
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)

if err := h.workflowPausedEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow paused event: %v", err), h.lggr)
return err
}
return nil
case WorkflowActivatedEvent:
return h.workflowActivatedEvent(ctx, event)
payload, ok := event.Data.(WorkflowRegistryWorkflowActivatedV1)
if !ok {
return fmt.Errorf("invalid data type %T for event", event.Data)
}

wfID := hex.EncodeToString(payload.WorkflowID[:])

cma := h.emitter.With(
platform.KeyWorkflowID, wfID,
platform.KeyWorkflowName, payload.WorkflowName,
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)
if err := h.workflowActivatedEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow activated event: %v", err), h.lggr)
return err
}

return nil
case WorkflowDeletedEvent:
payload, ok := event.Data.(WorkflowRegistryWorkflowDeletedV1)
if !ok {
return fmt.Errorf("invalid data type %T for event", event.Data)
}

wfID := hex.EncodeToString(payload.WorkflowID[:])

cma := h.emitter.With(
platform.KeyWorkflowID, wfID,
platform.KeyWorkflowName, payload.WorkflowName,
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)

if err := h.workflowDeletedEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow deleted event: %v", err), h.lggr)
return err
}

return nil
default:
return fmt.Errorf("event type unsupported: %v", event.EventType)
}
Expand Down Expand Up @@ -286,26 +357,122 @@ func (h *eventHandler) workflowRegisteredEvent(

// workflowUpdatedEvent handles the WorkflowUpdatedEvent event type.
func (h *eventHandler) workflowUpdatedEvent(
_ context.Context,
_ WorkflowRegistryEvent,
ctx context.Context,
payload WorkflowRegistryWorkflowUpdatedV1,
) error {
return ErrNotImplemented
oldWorkflowID := hex.EncodeToString(payload.OldWorkflowID[:])

// stop the old workflow engine
e, err := h.engineRegistry.Pop(oldWorkflowID)
if err != nil {
return fmt.Errorf("failed to get old workflow engine: %w", err)
}

if err := e.Close(); err != nil {
return fmt.Errorf("failed to close old workflow engine: %w", err)
}

registeredEvent := WorkflowRegistryWorkflowRegisteredV1{
WorkflowID: payload.NewWorkflowID,
WorkflowOwner: payload.WorkflowOwner,
DonID: payload.DonID,
Status: 0,
WorkflowName: payload.WorkflowName,
BinaryURL: payload.BinaryURL,
ConfigURL: payload.ConfigURL,
SecretsURL: payload.SecretsURL,
}

return h.workflowRegisteredEvent(ctx, registeredEvent)
}

// workflowPausedEvent handles the WorkflowPausedEvent event type.
func (h *eventHandler) workflowPausedEvent(
_ context.Context,
_ WorkflowRegistryEvent,
ctx context.Context,
payload WorkflowRegistryWorkflowPausedV1,
) error {
return ErrNotImplemented
wfID := hex.EncodeToString(payload.WorkflowID[:])

// Pop the workflow engine and close it
e, err := h.engineRegistry.Pop(wfID)
if err != nil {
return fmt.Errorf("failed to get workflow engine: %w", err)
}
err = e.Close()
if err != nil {
return fmt.Errorf("failed to close workflow engine: %w", err)
}

// get existing workflow spec from DB
spec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(payload.WorkflowOwner), payload.WorkflowName)
if err != nil {
return fmt.Errorf("failed to get workflow spec: %w", err)
}

// update the status of the workflow spec
spec.Status = job.WorkflowSpecStatusPaused
if _, err := h.orm.UpsertWorkflowSpec(ctx, spec); err != nil {
return fmt.Errorf("failed to update workflow spec: %w", err)
}

return nil
}

// workflowActivatedEvent handles the WorkflowActivatedEvent event type.
func (h *eventHandler) workflowActivatedEvent(
_ context.Context,
_ WorkflowRegistryEvent,
ctx context.Context,
payload WorkflowRegistryWorkflowActivatedV1,
) error {
// fetch the workflow spec from the DB
spec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(payload.WorkflowOwner), payload.WorkflowName)
if err != nil {
return fmt.Errorf("failed to get workflow spec: %w", err)
}

// Do nothing if the workflow is already active
if spec.Status == job.WorkflowSpecStatusActive {
return nil
}

// get the secrets url by the secrets id
secretsURL, err := h.orm.GetSecretsURLByID(ctx, spec.SecretsID.Int64)
if err != nil {
return fmt.Errorf("failed to get secrets URL by ID: %w", err)
}

registeredEvent := WorkflowRegistryWorkflowRegisteredV1{
WorkflowID: payload.WorkflowID,
WorkflowOwner: payload.WorkflowOwner,
DonID: payload.DonID,
Status: 0,
WorkflowName: payload.WorkflowName,
BinaryURL: spec.BinaryURL,
ConfigURL: spec.ConfigURL,
SecretsURL: secretsURL,
}

return h.workflowRegisteredEvent(ctx, registeredEvent)
}

// workflowDeletedEvent handles the WorkflowDeletedEvent event type.
func (h *eventHandler) workflowDeletedEvent(
ctx context.Context,
payload WorkflowRegistryWorkflowDeletedV1,
) error {
return ErrNotImplemented
wfID := hex.EncodeToString(payload.WorkflowID[:])

e, err := h.engineRegistry.Pop(wfID)
if err != nil {
return fmt.Errorf("failed to get workflow engine: %w", err)
}
if err := e.Close(); err != nil {
return fmt.Errorf("failed to close workflow engine: %w", err)
}

if err := h.orm.DeleteWorkflowSpec(ctx, hex.EncodeToString(payload.WorkflowOwner), payload.WorkflowName); err != nil {
return fmt.Errorf("failed to delete workflow spec: %w", err)
}
return nil
}

// forceUpdateSecretsEvent handles the ForceUpdateSecretsEvent event type.
Expand Down
Loading

0 comments on commit 5212eb9

Please sign in to comment.