diff --git a/go/tasks/pluginmachinery/sensors/plugin.go b/go/tasks/pluginmachinery/sensors/plugin.go new file mode 100644 index 000000000..2b38ab1a2 --- /dev/null +++ b/go/tasks/pluginmachinery/sensors/plugin.go @@ -0,0 +1,88 @@ +// This provides a simplistic API to implement scalable backend Sensor Plugins +// Sensors are defined as tasks that wait for some event to happen, before marking them-selves are ready. +// Sensor Implementations should be consistent in behavior, such that they are either in ``WaitState`` or proceed to +// ``ReadyState``. Wait Indicates the the condition is not met. ReadyState implies that the condition is met. +// Once ReadyState is achieved, subsequent calls should result in ``ReadyState`` only. This can happen because recovery +// from failures may need successive retries. +package sensors + +import ( + "context" + pluginsCore "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" + "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" +) + +//go:generate mockery -all -case=underscore +//go:generate enumer -type=Phase + +type Phase int8 + +const ( + // Indicates that the Sensor condition is not yet met. So it is still waiting for the expected condition to be + // fulfilled + PhaseWait Phase = iota + // Indicates the Sensor condition has been met and the Sensor can be marked as completed + PhaseReady + // Indicates that the Sensor has encountered a permanent failure and should be marked as failed. Sensors + // usually do not have retries, so this will permanent mark the task as failed + PhaseFailure +) + +type PhaseInfo struct { + Phase Phase + Reason string +} + +type Properties struct { + // Maximum Desirable Rate (number of requests per second) that the downstream system can handle + MaxRate float64 + // Maximum Burst rate allowed for this Plugin + BurstRate int +} + +// PluginEntry is a structure that is used to indicate to the system a K8s plugin +type PluginEntry struct { + // ID/Name of the plugin. This will be used to identify this plugin and has to be unique in the entire system + // All functions like enabling and disabling a plugin use this ID + ID pluginsCore.TaskType + // A list of all the task types for which this plugin is applicable. + RegisteredTaskTypes []pluginsCore.TaskType + // Instance of the Sensor Plugin + Plugin Plugin + // Properties desirable for this Sensor Plugin + Properties Properties +} + +// Simplified interface for Sensor Plugins. This context is passed for every Poke invoked +type PluginContext interface { + // Returns a secret manager that can retrieve configured secrets for this plugin + SecretManager() pluginsCore.SecretManager + + // Returns a TaskReader, to retrieve task details + TaskReader() pluginsCore.TaskReader + + // Returns an input reader to retrieve input data + InputReader() io.InputReader + + // Returns a handle to the Task's execution metadata. + TaskExecutionMetadata() pluginsCore.TaskExecutionMetadata +} + +// Simplified interface for Sensor Plugins initialization +type PluginSetupContext interface { + // Returns a secret manager that can retrieve configured secrets for this plugin + SecretManager() pluginsCore.SecretManager +} + +type Plugin interface { + // This is called only once, when the Plugin is being initialized. This can be used to initialize remote clients + // and other such things. Make sure this is not doing extremely expensive operations + // Error in this case will halt the loading of the module and may result in ejection of the plugin + Initialize(ctx context.Context, iCtx PluginSetupContext) error + + // The function will be periodically invoked. It should return a Phase or an error + // Phase indicates a condition in the sensor. For any system error's the ``error`` should be returned. + // System errors will automatically cause system retries and most importantly indicate the reason for failure + // this is a blocking call and should not be used to do very expensive operations. + Poke(ctx context.Context, pluginCtx PluginContext) (PhaseInfo, error) +} diff --git a/go/tasks/pluginmachinery/sensors/sensor_manager.go b/go/tasks/pluginmachinery/sensors/sensor_manager.go new file mode 100644 index 000000000..39b10bca1 --- /dev/null +++ b/go/tasks/pluginmachinery/sensors/sensor_manager.go @@ -0,0 +1,67 @@ +package sensors + +import ( + "context" + pluginsCore "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" + "github.com/lyft/flytestdlib/logger" + "github.com/lyft/flytestdlib/utils" +) + +// Sensor Manager implements the core.Plugin interface and provides the state management for building Sensors using a +// simplified interface +type SensorManager struct { + ID pluginsCore.TaskType + plugin Plugin + rateLimiter utils.RateLimiter +} + +func (s SensorManager) GetID() string { + return s.ID +} + +func (s SensorManager) GetProperties() pluginsCore.PluginProperties { + return pluginsCore.PluginProperties{} +} + +// Handle provides the necessary boilerplate to create simple Sensor Plugins +func (s SensorManager) Handle(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (pluginsCore.Transition, error) { + if err := s.rateLimiter.Wait(ctx); err != nil { + logger.Errorf(ctx, "Failed to wait on rateLimiter for sensor %s, error: %s", s.ID, err) + return pluginsCore.Transition{}, err + } + p, err := s.plugin.Poke(ctx, tCtx) + if err != nil { + logger.Errorf(ctx, "Received error from sensor %s, err: %s", s.ID, err) + return pluginsCore.Transition{}, err + } + if p.Phase == PhaseFailure { + return pluginsCore.DoTransition(pluginsCore.PhaseInfoFailure("SensorFailed", p.Reason, nil)), nil + } + if p.Phase == PhaseReady { + return pluginsCore.DoTransition(pluginsCore.PhaseInfoSuccess(nil)), nil + } + return pluginsCore.DoTransition(pluginsCore.PhaseInfoRunning(0, nil)), nil +} + +func (s SensorManager) Abort(_ context.Context, _ pluginsCore.TaskExecutionContext) error { + return nil +} + +func (s SensorManager) Finalize(_ context.Context, _ pluginsCore.TaskExecutionContext) error { + return nil +} + +// Creates a new SensorManager for the given PluginEntry. The Plugin.Initialize method is also invoked during this +// construction +func NewSensorManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry PluginEntry) (*SensorManager, error) { + if err := entry.Plugin.Initialize(ctx, iCtx); err != nil { + logger.Errorf(ctx, "Failed to initialize plugin %s, err: %s", entry.ID, err) + return nil, err + } + name := entry.ID + return &SensorManager{ + ID: entry.ID, + rateLimiter: utils.NewRateLimiter(name, entry.Properties.MaxRate, entry.Properties.BurstRate), + plugin: entry.Plugin, + }, nil +}