From da6403e4e266ecbe2cf6102d96f90a914f07d30b Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Wed, 31 Jan 2024 21:16:08 -0300 Subject: [PATCH] Revert "health: Create API to wait for a stream to become active (#180)" (#181) This reverts commit 5ab219c35912e7708526a8735be41ab31b44794f. --- api/handler.go | 27 ++---------------- health/core.go | 24 ---------------- health/record.go | 50 +++++---------------------------- health/reducer.go | 2 -- health/reducers/stream_state.go | 9 +++--- 5 files changed, 15 insertions(+), 97 deletions(-) diff --git a/api/handler.go b/api/handler.go index 4f5d1b77..318ed3c1 100644 --- a/api/handler.go +++ b/api/handler.go @@ -98,19 +98,14 @@ func (h *apiHandler) streamHealthHandler() chi.Router { if opts.AuthURL != "" { router.Use(authorization(opts.AuthURL)) } - - regionalMiddlewares := []middleware{ + router.Use( streamStatus(healthcore), - regionProxy(opts.RegionalHostFormat, opts.OwnRegion), - } + regionProxy(opts.RegionalHostFormat, opts.OwnRegion)) + h.withMetrics(router, "get_stream_health"). - With(regionalMiddlewares...). MethodFunc("GET", "/health", h.getStreamHealth) h.withMetrics(router, "stream_health_events"). - With(regionalMiddlewares...). MethodFunc("GET", "/events", h.subscribeEvents) - h.withMetrics(router, "wait_stream_active"). - MethodFunc("GET", "/wait-active", h.waitStreamActive) return router } @@ -563,22 +558,6 @@ func (h *apiHandler) subscribeEvents(rw http.ResponseWriter, r *http.Request) { } } -func (h *apiHandler) waitStreamActive(rw http.ResponseWriter, r *http.Request) { - if h.core == nil { - respondError(rw, http.StatusNotImplemented, errors.New("stream healthcore is unavailable")) - return - } - - streamID := apiParam(r, streamIDParam) - err := h.core.WaitActive(r.Context(), streamID) - if err != nil { - respondError(rw, http.StatusInternalServerError, err) - return - } - - rw.WriteHeader(http.StatusNoContent) -} - func makeSSEEventChan(ctx context.Context, pastEvents []data.Event, subscription <-chan data.Event) <-chan jsse.Event { if subscription == nil { events := make(chan jsse.Event, len(pastEvents)) diff --git a/health/core.go b/health/core.go index 422aac74..80b02056 100644 --- a/health/core.go +++ b/health/core.go @@ -191,20 +191,6 @@ func (c *Core) handleSingleEvent(evt data.Event) (err error) { glog.Warningf("Buffer full for health event subscription, skipping message. streamId=%q, eventTs=%q", streamID, ts) } } - - for _, cond := range record.LastStatus.Conditions { - if cond.Type != ConditionActive { - continue - } - // We flag the record as initialized unless, from the received events, - // we know for sure that the stream is inactive. - isInactive := cond.Status != nil && !*cond.Status - if !isInactive { - record.FlagInitialized() - } - break - } - return nil } @@ -261,16 +247,6 @@ func (c *Core) SubscribeEvents(ctx context.Context, manifestID string, lastEvtID return pastEvents, subs, nil } -func (c *Core) WaitActive(ctx context.Context, manifestID string) error { - // We actually create the record here if it doesn't exist, so that we can - // wait for it to be initialized. - record := c.storage.GetOrCreate(manifestID, c.conditionTypes) - if err := record.WaitInitialized(ctx); err != nil { - return err - } - return nil -} - func getPastEventsLocked(record *Record, lastEvtID *uuid.UUID, from, to *time.Time) ([]data.Event, error) { fromIdx, toIdx := 0, len(record.PastEvents) if lastEvtID != nil { diff --git a/health/record.go b/health/record.go index 4e09ab48..58b6624e 100644 --- a/health/record.go +++ b/health/record.go @@ -16,8 +16,7 @@ type Record struct { Conditions []data.ConditionType sync.RWMutex - initialized chan struct{} - disposed chan struct{} + disposed chan struct{} PastEvents []data.Event EventsByID map[uuid.UUID]data.Event @@ -33,43 +32,11 @@ func NewRecord(id string, conditionTypes []data.ConditionType) *Record { conditions[i] = data.NewCondition(cond, time.Time{}, nil, nil) } return &Record{ - ID: id, - Conditions: conditionTypes, - initialized: make(chan struct{}), - disposed: make(chan struct{}), - EventsByID: map[uuid.UUID]data.Event{}, - LastStatus: data.NewHealthStatus(id, conditions), - } -} - -// FlagInitialized will flag the record as initialized. It is meant to be called -// after the first event is processed, meaning the record is not empty anymore. -// -// This is used to allow waiting until a stream is started by creating its -// record in an uninitialized state first and calling `WaitInitialized`. The -// initialization flag is simply a channel that is closed, which will unblock -// all goroutines waiting to receive from it (`WaitInitialized`). -func (r *Record) FlagInitialized() { - if !r.IsInitialized() { - close(r.initialized) - } -} - -func (r *Record) IsInitialized() bool { - select { - case <-r.initialized: - return true - default: - return false - } -} - -func (r *Record) WaitInitialized(ctx context.Context) error { - select { - case <-r.initialized: - return nil - case <-ctx.Done(): - return ctx.Err() + ID: id, + Conditions: conditionTypes, + disposed: make(chan struct{}), + EventsByID: map[uuid.UUID]data.Event{}, + LastStatus: data.NewHealthStatus(id, conditions), } } @@ -135,10 +102,7 @@ func (s *RecordStorage) StartCleanupLoop(ctx context.Context, ttl time.Duration) func (s *RecordStorage) Get(id string) (*Record, bool) { if saved, ok := s.records.Load(id); ok { - // Until Initialize is called, the record is considered inexistent - if record := saved.(*Record); record.IsInitialized() { - return record, true - } + return saved.(*Record), true } return nil, false } diff --git a/health/reducer.go b/health/reducer.go index 4ced2df5..62d9f302 100644 --- a/health/reducer.go +++ b/health/reducer.go @@ -5,8 +5,6 @@ import ( "github.com/livepeer/livepeer-data/pkg/event" ) -const ConditionActive data.ConditionType = "Active" - type Reducer interface { Bindings() []event.BindingArgs Conditions() []data.ConditionType diff --git a/health/reducers/stream_state.go b/health/reducers/stream_state.go index fa8384ed..a2878ae2 100644 --- a/health/reducers/stream_state.go +++ b/health/reducers/stream_state.go @@ -4,13 +4,14 @@ import ( "time" "github.com/golang/glog" - "github.com/livepeer/livepeer-data/health" "github.com/livepeer/livepeer-data/pkg/data" "github.com/livepeer/livepeer-data/pkg/event" ) const ( streamStateBindingKey = "stream.state.#" + + ConditionActive data.ConditionType = "Active" ) type ActiveConditionExtraData struct { @@ -27,7 +28,7 @@ func (t StreamStateReducer) Bindings() []event.BindingArgs { } func (t StreamStateReducer) Conditions() []data.ConditionType { - return []data.ConditionType{health.ConditionActive} + return []data.ConditionType{ConditionActive} } func (t StreamStateReducer) Reduce(current *data.HealthStatus, _ interface{}, evtIface data.Event) (*data.HealthStatus, interface{}) { @@ -54,7 +55,7 @@ func (t StreamStateReducer) Reduce(current *data.HealthStatus, _ interface{}, ev current = data.NewHealthStatus(current.ID, conditions) } for i, cond := range conditions { - if cond.Type == health.ConditionActive { + if cond.Type == ConditionActive { newCond := data.NewCondition(cond.Type, evt.Timestamp(), &isActive, cond) newCond.ExtraData = ActiveConditionExtraData{NodeID: evt.NodeID, Region: evt.Region} conditions[i] = newCond @@ -74,7 +75,7 @@ func clearConditions(conditions []*data.Condition) []*data.Condition { } func GetLastActiveData(status *data.HealthStatus) ActiveConditionExtraData { - data, ok := status.Condition(health.ConditionActive).ExtraData.(ActiveConditionExtraData) + data, ok := status.Condition(ConditionActive).ExtraData.(ActiveConditionExtraData) if !ok { return ActiveConditionExtraData{} }