Skip to content

Commit

Permalink
scheduler: delays closing of deleted lifo queues (#1953)
Browse files Browse the repository at this point in the history
Closing of a lifo queue on route deletion while in-flight requests try
to get a slot from it results in an `Unknown error for route based LIFO: queue closed for host`
error and a 500 response to the client.

To mitigate the problem, this change puts deleted queues aside and closes them after one minute.
The proper fix would be to close the queue when all of the route's in-flight requests complete, see #202.

Updates #1851

Signed-off-by: Alexander Yastrebov <[email protected]>
  • Loading branch information
AlexanderYastrebov authored Feb 21, 2022
1 parent 1a1d1eb commit 196a5a5
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 77 deletions.
5 changes: 5 additions & 0 deletions scheduler/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package scheduler

var (
	// ExportQueueCloseDelay exposes the package-private queueCloseDelay
	// to tests so they can shorten the delayed close of deleted queues.
	ExportQueueCloseDelay = &queueCloseDelay
)
148 changes: 90 additions & 58 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type Queue struct {
// Options provides options for the registry.
type Options struct {

// MetricsUpdateTimeout defines the frequence of how often the LIFO metrics
// MetricsUpdateTimeout defines the frequency of how often the LIFO metrics
// are updated when they are enabled. Defaults to 1s.
MetricsUpdateTimeout time.Duration

Expand All @@ -95,11 +95,22 @@ type Options struct {
//
type Registry struct {
options Options
queues *sync.Map
measuring bool
quit chan struct{}

mu sync.Mutex
queues map[queueId]*Queue
deleted map[*Queue]time.Time
}

// queueId identifies a queue within the registry: by route id for
// per-route lifo filters (grouped == false) or by group name for
// lifoGroup filters (grouped == true).
type queueId struct {
	name    string
	grouped bool
}

// queueCloseDelay is the amount of time to wait before closing the
// deleted queues, giving their in-flight requests a chance to complete.
var queueCloseDelay = 1 * time.Minute

// LIFOFilter is the interface that needs to be implemented by the filters that
// use a LIFO queue maintained by the registry.
type LIFOFilter interface {
Expand Down Expand Up @@ -184,8 +195,9 @@ func RegistryWith(o Options) *Registry {

return &Registry{
options: o,
queues: new(sync.Map),
quit: make(chan struct{}),
queues: make(map[queueId]*Queue),
deleted: make(map[*Queue]time.Time),
}
}

Expand All @@ -194,6 +206,23 @@ func NewRegistry() *Registry {
return RegistryWith(Options{})
}

// getQueue returns the queue registered under id, creating and
// registering a new one when absent. An existing queue whose config
// differs from c is reconfigured in place.
func (r *Registry) getQueue(id queueId, c Config) *Queue {
	r.mu.Lock()
	defer r.mu.Unlock()

	if existing, ok := r.queues[id]; ok {
		if existing.config != c {
			existing.config = c
			existing.reconfigure()
		}
		return existing
	}

	created := r.newQueue(id.name, c)
	r.queues[id] = created
	return created
}

func (r *Registry) newQueue(name string, c Config) *Queue {
q := &Queue{
config: c,
Expand Down Expand Up @@ -222,6 +251,28 @@ func (r *Registry) newQueue(name string, c Config) *Queue {
return q
}

// deleteUnused removes queues that no route references anymore. Instead
// of closing them immediately — which would fail their in-flight
// requests — the queues are parked in r.deleted and only closed once
// they have been parked for longer than queueCloseDelay.
func (r *Registry) deleteUnused(inUse map[queueId]struct{}) {
	r.mu.Lock()
	defer r.mu.Unlock()

	now := time.Now()
	cutoff := now.Add(-queueCloseDelay)

	// Close parked queues whose grace period has elapsed.
	var expired []*Queue
	for q, deletedAt := range r.deleted {
		if deletedAt.Before(cutoff) {
			expired = append(expired, q)
		}
	}
	for _, q := range expired {
		delete(r.deleted, q)
		q.close()
	}

	// Park queues that are no longer in use, stamped with the
	// current time so a later call knows when to close them.
	var unused []queueId
	for id := range r.queues {
		if _, used := inUse[id]; !used {
			unused = append(unused, id)
		}
	}
	for _, id := range unused {
		r.deleted[r.queues[id]] = now
		delete(r.queues, id)
	}
}

// Returns routing.PreProcessor that ensures single lifo filter instance per route
//
// Registry can not implement routing.PreProcessor directly due to unfortunate method name clash with routing.PostProcessor
Expand Down Expand Up @@ -261,7 +312,7 @@ func (registryPreProcessor) Do(routes []*eskip.Route) []*eskip.Route {
// It preserves the existing queue when available.
func (r *Registry) Do(routes []*routing.Route) []*routing.Route {
rr := make([]*routing.Route, len(routes))
existingKeys := make(map[string]bool)
inUse := make(map[queueId]struct{})
groups := make(map[string][]GroupedLIFOFilter)

for i, ri := range routes {
Expand All @@ -280,24 +331,11 @@ func (r *Registry) Do(routes []*routing.Route) []*routing.Route {
}

lifoCount++
var q *Queue
key := fmt.Sprintf("lifo::%s", ri.Id)
existingKeys[key] = true
c := lf.Config()
qi, ok := r.queues.Load(key)
if ok {
// Will not reach here if routes were pre-processed
// because key is derived from the unique route id and
// pre-processor ensures single lifo filter instance per route
q = qi.(*Queue)
if q.config != c {
q.config = c
q.reconfigure()
}
} else {
q = r.newQueue(ri.Id, c)
r.queues.Store(key, q)
}

id := queueId{ri.Id, false}
inUse[id] = struct{}{}

q := r.getQueue(id, lf.Config())

lf.SetQueue(q)
}
Expand Down Expand Up @@ -327,34 +365,17 @@ func (r *Registry) Do(routes []*routing.Route) []*routing.Route {
foundConfig = true
}

var q *Queue
key := fmt.Sprintf("group-lifo::%s", name)
existingKeys[key] = true
qi, ok := r.queues.Load(key)
if ok {
q = qi.(*Queue)
if q.config != c {
q.config = c
q.reconfigure()
}
} else {
q = r.newQueue(name, c)
r.queues.Store(key, q)
}
id := queueId{name, true}
inUse[id] = struct{}{}

q := r.getQueue(id, c)

for _, glf := range group {
glf.SetQueue(q)
}
}

r.queues.Range(func(key, qi interface{}) bool {
if !existingKeys[key.(string)] {
qi.(*Queue).close()
r.queues.Delete(key)
}

return true
})
r.deleteUnused(inUse)

return rr
}
Expand All @@ -367,30 +388,41 @@ func (r *Registry) measure() {
r.measuring = true
go func() {
for {
r.queues.Range(func(_, value interface{}) bool {
q := value.(*Queue)
s := q.Status()
r.options.Metrics.UpdateGauge(q.activeRequestsMetricsKey, float64(s.ActiveRequests))
r.options.Metrics.UpdateGauge(q.queuedRequestsMetricsKey, float64(s.QueuedRequests))
return true
})

select {
case <-time.After(r.options.MetricsUpdateTimeout):
r.updateMetrics()
case <-r.quit:
return
}
}
}()
}

// Close closes the registry, including gracefull tearing down the stored
// queues.
// updateMetrics publishes the current active and queued request counts
// of every registered queue as gauges. It is called periodically from
// the measure loop. Queues parked in r.deleted pending delayed close
// are not reported.
func (r *Registry) updateMetrics() {
	r.mu.Lock()
	defer r.mu.Unlock()

	for _, q := range r.queues {
		s := q.Status()
		r.options.Metrics.UpdateGauge(q.activeRequestsMetricsKey, float64(s.ActiveRequests))
		r.options.Metrics.UpdateGauge(q.queuedRequestsMetricsKey, float64(s.QueuedRequests))
	}
}

// Close closes the registry, including graceful tearing down the stored queues.
func (r *Registry) Close() {
r.queues.Range(func(_, value interface{}) bool {
value.(*Queue).close()
return true
})
r.mu.Lock()
defer r.mu.Unlock()

for q := range r.deleted {
delete(r.deleted, q)
q.close()
}

for id, q := range r.queues {
delete(r.queues, id)
q.close()
}

close(r.quit)
}
62 changes: 43 additions & 19 deletions scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func TestScheduler(t *testing.T) {

func TestConfig(t *testing.T) {
waitForStatus := func(t *testing.T, q *scheduler.Queue, s scheduler.QueueStatus) {
t.Helper()
timeout := time.After(120 * time.Millisecond)
for {
if q.Status() == s {
Expand All @@ -196,6 +197,9 @@ func TestConfig(t *testing.T) {
}
}

const testQueueCloseDelay = 1 * time.Second
*scheduler.ExportQueueCloseDelay = testQueueCloseDelay

initTest := func(doc string) (*routing.Routing, *testdataclient.Client, func()) {
cli, err := testdataclient.NewDoc(doc)
if err != nil {
Expand All @@ -220,6 +224,21 @@ func TestConfig(t *testing.T) {
}
}

// updateDoc applies a routing document update (upserts and deletions)
// and sleeps briefly so the routing table can pick up the change.
// NOTE(review): the 120ms wait is timing-based, matching the
// waitForStatus timeout above — verify it is long enough under -race.
updateDoc := func(t *testing.T, dc *testdataclient.Client, upsertDoc string, deletedIDs []string) {
	t.Helper()
	if err := dc.UpdateDoc(upsertDoc, deletedIDs); err != nil {
		t.Fatal(err)
	}
	time.Sleep(120 * time.Millisecond)
}

// getQueue resolves path to a route and returns the queue attached to
// the route's first filter, which is expected to be a LIFOFilter.
getQueue := func(path string, rt *routing.Routing) *scheduler.Queue {
	route, _ := rt.Route(&http.Request{URL: &url.URL{Path: path}})
	return route.Filters[0].Filter.(scheduler.LIFOFilter).GetQueue()
}

t.Run("group config applied", func(t *testing.T) {
const doc = `
g1: Path("/one") -> lifoGroup("g", 2, 2) -> <shunt>;
Expand Down Expand Up @@ -273,10 +292,7 @@ func TestConfig(t *testing.T) {
waitForStatus(t, q, scheduler.QueueStatus{ActiveRequests: 2, QueuedRequests: 2})

// change the configuration, should decrease the queue size:
const updateDoc = `route: * -> lifo(2, 1) -> <shunt>`
if err := dc.UpdateDoc(updateDoc, nil); err != nil {
t.Fatal(err)
}
updateDoc(t, dc, `route: * -> lifo(2, 1) -> <shunt>`, nil)

waitForStatus(t, q, scheduler.QueueStatus{ActiveRequests: 2, QueuedRequests: 1})
})
Expand Down Expand Up @@ -309,19 +325,15 @@ func TestConfig(t *testing.T) {
waitForStatus(t, q, scheduler.QueueStatus{ActiveRequests: 2, QueuedRequests: 2})

// change the configuration, should decrease the queue size:
const updateDoc = `
updateDoc(t, dc, `
g1: Path("/one") -> lifoGroup("g", 2, 1) -> <shunt>;
g2: Path("/two") -> lifoGroup("g") -> <shunt>;
`

if err := dc.UpdateDoc(updateDoc, nil); err != nil {
t.Fatal(err)
}
`, nil)

waitForStatus(t, q, scheduler.QueueStatus{ActiveRequests: 2, QueuedRequests: 1})
})

t.Run("queue gets closed when removed", func(t *testing.T) {
t.Run("queue gets closed when removed after delay", func(t *testing.T) {
const doc = `
g1: Path("/one") -> lifo(2, 2) -> <shunt>;
g2: Path("/two") -> lifo(2, 2) -> <shunt>;
Expand All @@ -330,16 +342,28 @@ func TestConfig(t *testing.T) {
rt, dc, close := initTest(doc)
defer close()

req := &http.Request{URL: &url.URL{Path: "/one"}}
r, _ := rt.Route(req)
f := r.Filters[0]
q := f.Filter.(scheduler.LIFOFilter).GetQueue()
q1 := getQueue("/one", rt)
q2 := getQueue("/two", rt)

if err := dc.UpdateDoc("", []string{"g1"}); err != nil {
t.Fatal(err)
}
waitForStatus(t, q1, scheduler.QueueStatus{Closed: false})
waitForStatus(t, q2, scheduler.QueueStatus{Closed: false})

updateDoc(t, dc, "", []string{"g1"})

// Queue is not closed immediately when deleted
waitForStatus(t, q1, scheduler.QueueStatus{Closed: false})
waitForStatus(t, q2, scheduler.QueueStatus{Closed: false})

// An update triggers closing of the deleted queue if it
// was deleted more than testQueueCloseDelay ago
time.Sleep(testQueueCloseDelay)
updateDoc(t, dc, `g3: Path("/three") -> lifo(2, 2) -> <shunt>;`, nil)

q3 := getQueue("/three", rt)

waitForStatus(t, q, scheduler.QueueStatus{Closed: true})
waitForStatus(t, q1, scheduler.QueueStatus{Closed: true})
waitForStatus(t, q2, scheduler.QueueStatus{Closed: false})
waitForStatus(t, q3, scheduler.QueueStatus{Closed: false})
})
}

Expand Down

0 comments on commit 196a5a5

Please sign in to comment.