Skip to content

Commit

Permalink
Revert "health: Create API to wait for a stream to become active (#180)…
Browse files Browse the repository at this point in the history
…" (#181)

This reverts commit 5ab219c.
  • Loading branch information
victorges authored Feb 1, 2024
1 parent 5ab219c commit da6403e
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 97 deletions.
27 changes: 3 additions & 24 deletions api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,14 @@ func (h *apiHandler) streamHealthHandler() chi.Router {
if opts.AuthURL != "" {
router.Use(authorization(opts.AuthURL))
}

regionalMiddlewares := []middleware{
router.Use(
streamStatus(healthcore),
regionProxy(opts.RegionalHostFormat, opts.OwnRegion),
}
regionProxy(opts.RegionalHostFormat, opts.OwnRegion))

h.withMetrics(router, "get_stream_health").
With(regionalMiddlewares...).
MethodFunc("GET", "/health", h.getStreamHealth)
h.withMetrics(router, "stream_health_events").
With(regionalMiddlewares...).
MethodFunc("GET", "/events", h.subscribeEvents)
h.withMetrics(router, "wait_stream_active").
MethodFunc("GET", "/wait-active", h.waitStreamActive)

return router
}
Expand Down Expand Up @@ -563,22 +558,6 @@ func (h *apiHandler) subscribeEvents(rw http.ResponseWriter, r *http.Request) {
}
}

func (h *apiHandler) waitStreamActive(rw http.ResponseWriter, r *http.Request) {
if h.core == nil {
respondError(rw, http.StatusNotImplemented, errors.New("stream healthcore is unavailable"))
return
}

streamID := apiParam(r, streamIDParam)
err := h.core.WaitActive(r.Context(), streamID)
if err != nil {
respondError(rw, http.StatusInternalServerError, err)
return
}

rw.WriteHeader(http.StatusNoContent)
}

func makeSSEEventChan(ctx context.Context, pastEvents []data.Event, subscription <-chan data.Event) <-chan jsse.Event {
if subscription == nil {
events := make(chan jsse.Event, len(pastEvents))
Expand Down
24 changes: 0 additions & 24 deletions health/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,20 +191,6 @@ func (c *Core) handleSingleEvent(evt data.Event) (err error) {
glog.Warningf("Buffer full for health event subscription, skipping message. streamId=%q, eventTs=%q", streamID, ts)
}
}

for _, cond := range record.LastStatus.Conditions {
if cond.Type != ConditionActive {
continue
}
// We flag the record as initialized unless, from the received events,
// we know for sure that the stream is inactive.
isInactive := cond.Status != nil && !*cond.Status
if !isInactive {
record.FlagInitialized()
}
break
}

return nil
}

Expand Down Expand Up @@ -261,16 +247,6 @@ func (c *Core) SubscribeEvents(ctx context.Context, manifestID string, lastEvtID
return pastEvents, subs, nil
}

func (c *Core) WaitActive(ctx context.Context, manifestID string) error {
// We actually create the record here if it doesn't exist, so that we can
// wait for it to be initialized.
record := c.storage.GetOrCreate(manifestID, c.conditionTypes)
if err := record.WaitInitialized(ctx); err != nil {
return err
}
return nil
}

func getPastEventsLocked(record *Record, lastEvtID *uuid.UUID, from, to *time.Time) ([]data.Event, error) {
fromIdx, toIdx := 0, len(record.PastEvents)
if lastEvtID != nil {
Expand Down
50 changes: 7 additions & 43 deletions health/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ type Record struct {
Conditions []data.ConditionType

sync.RWMutex
initialized chan struct{}
disposed chan struct{}
disposed chan struct{}

PastEvents []data.Event
EventsByID map[uuid.UUID]data.Event
Expand All @@ -33,43 +32,11 @@ func NewRecord(id string, conditionTypes []data.ConditionType) *Record {
conditions[i] = data.NewCondition(cond, time.Time{}, nil, nil)
}
return &Record{
ID: id,
Conditions: conditionTypes,
initialized: make(chan struct{}),
disposed: make(chan struct{}),
EventsByID: map[uuid.UUID]data.Event{},
LastStatus: data.NewHealthStatus(id, conditions),
}
}

// FlagInitialized will flag the record as initialized. It is meant to be called
// after the first event is processed, meaning the record is not empty anymore.
//
// This is used to allow waiting until a stream is started by creating its
// record in an uninitialized state first and calling `WaitInitialized`. The
// initialization flag is simply a channel that is closed, which will unblock
// all goroutines waiting to receive from it (`WaitInitialized`).
func (r *Record) FlagInitialized() {
if !r.IsInitialized() {
close(r.initialized)
}
}

func (r *Record) IsInitialized() bool {
select {
case <-r.initialized:
return true
default:
return false
}
}

func (r *Record) WaitInitialized(ctx context.Context) error {
select {
case <-r.initialized:
return nil
case <-ctx.Done():
return ctx.Err()
ID: id,
Conditions: conditionTypes,
disposed: make(chan struct{}),
EventsByID: map[uuid.UUID]data.Event{},
LastStatus: data.NewHealthStatus(id, conditions),
}
}

Expand Down Expand Up @@ -135,10 +102,7 @@ func (s *RecordStorage) StartCleanupLoop(ctx context.Context, ttl time.Duration)

func (s *RecordStorage) Get(id string) (*Record, bool) {
if saved, ok := s.records.Load(id); ok {
// Until Initialize is called, the record is considered inexistent
if record := saved.(*Record); record.IsInitialized() {
return record, true
}
return saved.(*Record), true
}
return nil, false
}
Expand Down
2 changes: 0 additions & 2 deletions health/reducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"github.com/livepeer/livepeer-data/pkg/event"
)

const ConditionActive data.ConditionType = "Active"

type Reducer interface {
Bindings() []event.BindingArgs
Conditions() []data.ConditionType
Expand Down
9 changes: 5 additions & 4 deletions health/reducers/stream_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"time"

"github.com/golang/glog"
"github.com/livepeer/livepeer-data/health"
"github.com/livepeer/livepeer-data/pkg/data"
"github.com/livepeer/livepeer-data/pkg/event"
)

const (
streamStateBindingKey = "stream.state.#"

ConditionActive data.ConditionType = "Active"
)

type ActiveConditionExtraData struct {
Expand All @@ -27,7 +28,7 @@ func (t StreamStateReducer) Bindings() []event.BindingArgs {
}

func (t StreamStateReducer) Conditions() []data.ConditionType {
return []data.ConditionType{health.ConditionActive}
return []data.ConditionType{ConditionActive}
}

func (t StreamStateReducer) Reduce(current *data.HealthStatus, _ interface{}, evtIface data.Event) (*data.HealthStatus, interface{}) {
Expand All @@ -54,7 +55,7 @@ func (t StreamStateReducer) Reduce(current *data.HealthStatus, _ interface{}, ev
current = data.NewHealthStatus(current.ID, conditions)
}
for i, cond := range conditions {
if cond.Type == health.ConditionActive {
if cond.Type == ConditionActive {
newCond := data.NewCondition(cond.Type, evt.Timestamp(), &isActive, cond)
newCond.ExtraData = ActiveConditionExtraData{NodeID: evt.NodeID, Region: evt.Region}
conditions[i] = newCond
Expand All @@ -74,7 +75,7 @@ func clearConditions(conditions []*data.Condition) []*data.Condition {
}

func GetLastActiveData(status *data.HealthStatus) ActiveConditionExtraData {
data, ok := status.Condition(health.ConditionActive).ExtraData.(ActiveConditionExtraData)
data, ok := status.Condition(ConditionActive).ExtraData.(ActiveConditionExtraData)
if !ok {
return ActiveConditionExtraData{}
}
Expand Down

0 comments on commit da6403e

Please sign in to comment.