fix(evpn-bridge): fix system behaviour for pending objects
Signed-off-by: Dimitrios Markou <[email protected]>
mardim91 authored and vjambeka committed Nov 19, 2024
1 parent bd3c72b commit cfb06bf
Showing 2 changed files with 69 additions and 23 deletions.
18 changes: 12 additions & 6 deletions pkg/infradb/subscriberframework/eventbus/eventbus.go
@@ -6,6 +6,7 @@
package eventbus

import (
"fmt"
"log"
"sort"
"sync"
@@ -21,7 +22,6 @@ type EventBus struct {
subscribers map[string][]*Subscriber
eventHandlers map[string]EventHandler
subscriberL sync.RWMutex
publishL sync.RWMutex
mutex sync.RWMutex
}

@@ -92,7 +92,7 @@ func (e *EventBus) Subscribe(moduleName, eventType string, priority int, eventHa

subscriber := &Subscriber{
Name: moduleName,
Ch: make(chan interface{}, 1),
Ch: make(chan interface{}),
Quit: make(chan bool),
Priority: priority,
}
@@ -154,10 +154,16 @@ func (e *EventBus) UnsubscribeModule(moduleName string) bool {
}

// Publish api notifies the subscribers with certain eventType
func (e *EventBus) Publish(objectData *ObjectData, subscriber *Subscriber) {
e.publishL.RLock()
defer e.publishL.RUnlock()
subscriber.Ch <- objectData
func (e *EventBus) Publish(objectData *ObjectData, subscriber *Subscriber) error {
var err error

select {
case subscriber.Ch <- objectData:
log.Printf("Publish(): Notification is sent to subscriber %s\n", subscriber.Name)
default:
err = fmt.Errorf("channel for subscriber %s is busy", subscriber.Name)
}
return err
}

// Unsubscribe the subscriber, which delete the subscriber(all resources will be washed out)
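The new Publish signature replaces the blocking channel send with the standard non-blocking send pattern: since the subscriber channel is now unbuffered, the default branch of the select fires whenever the subscriber is not currently blocked on a receive, and the caller gets an error back instead of hanging. A minimal, self-contained sketch of that pattern (hypothetical names, not this package's API):

```go
package main

import (
	"fmt"
	"time"
)

// trySend mirrors the non-blocking send used by the new Publish: it succeeds
// only if a receiver is ready and returns an error instead of blocking.
func trySend(ch chan<- string, msg string) error {
	select {
	case ch <- msg:
		return nil
	default:
		return fmt.Errorf("channel is busy")
	}
}

func main() {
	ch := make(chan string) // unbuffered, like the subscriber channel after this change

	// No receiver yet: the send falls through to default and an error is returned.
	fmt.Println(trySend(ch, "first"))

	// Start a receiver, give it a moment to block on the channel, then retry.
	go func() { fmt.Println("received:", <-ch) }()
	time.Sleep(10 * time.Millisecond)
	fmt.Println(trySend(ch, "second"))

	time.Sleep(10 * time.Millisecond) // let the receiver print before exiting
}
```

The caller shown in this commit is processTasks in the next file, which reacts to the returned error by requeueing the task with a doubling timer.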
74 changes: 57 additions & 17 deletions pkg/infradb/taskmanager/taskmanager.go
@@ -32,8 +32,9 @@ type Task struct {
objectType string
resourceVersion string
subIndex int
retryTimer time.Duration
subs []*eventbus.Subscriber
// systemTimer is used only when we want to retry a Task due to unavailability of the Subscriber or not receiving a TaskStatus
systemTimer time.Duration
subs []*eventbus.Subscriber
}

// TaskStatus holds info related to the status that has been received
@@ -62,6 +63,7 @@ func newTask(name, objectType, resourceVersion string, subs []*eventbus.Subscrib
objectType: objectType,
resourceVersion: resourceVersion,
subIndex: 0,
systemTimer: 1 * time.Second,
subs: subs,
}
}
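The systemTimer replaces the per-status retryTimer and starts at one second; as the hunks below show, it is doubled before every requeue caused by an unavailable or unresponsive subscriber and reset once a status is received. A tiny sketch of the resulting delay schedule (illustration only, not repository code):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	systemTimer := 1 * time.Second // initial value set in newTask
	for attempt := 1; attempt <= 5; attempt++ {
		systemTimer *= 2 // doubled before each requeue, as in processTasks
		fmt.Printf("requeue attempt %d scheduled after %v\n", attempt, systemTimer)
	}
	systemTimer = 1 * time.Second // reset as soon as a task status is received
	fmt.Println("subscriber responded, timer reset to", systemTimer)
}
```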
@@ -96,13 +98,18 @@ func (t *TaskManager) CreateTask(name, objectType, resourceVersion string, subs
// StatusUpdated creates a task status and sends it for handling
func (t *TaskManager) StatusUpdated(name, objectType, resourceVersion, notificationID string, dropTask bool, component *common.Component) {
taskStatus := newTaskStatus(name, objectType, resourceVersion, notificationID, dropTask, component)

// Do we need to make this call happen in a goroutine in order to not make the
// StatusUpdated function stuck in case that nobody reads what is written on the channel ?
// Is there any case where this can happen
// (nobody reads what is written on the channel and the StatusUpdated gets stuck) ?
t.taskStatusChan <- taskStatus
log.Printf("StatusUpdated(): New Task Status has been created and sent to channel: %+v\n", taskStatus)
log.Printf("StatusUpdated(): New Task Status has been created: %+v\n", taskStatus)

// We need a default case here so the call does not block when we send to the channel but nobody is reading from it.
// e.g. a subscriber gets stuck and doesn't reply, so the task will be requeued after its timer expires. In the meantime
// the subscriber replies and a taskStatus is sent to the channel, but the send would block because the previous task has not
// been requeued yet (its timer has not expired) and the queue is empty (we assume there is only one task in the queue).
select {
case t.taskStatusChan <- taskStatus:
log.Printf("StatusUpdated(): Task Status has been sent to channel: %+v\n", taskStatus)
default:
log.Printf("StatusUpdated(): Task Status has not been sent to channel. Channel not available: %+v\n", taskStatus)
}
}

// ReplayFinished notifies that the replay of objects has finished
@@ -119,6 +126,9 @@ func (t *TaskManager) processTasks() {
task := t.taskQueue.Dequeue()
log.Printf("processTasks(): Task has been dequeued for processing: %+v\n", task)

// A new sub-list of the initial subscriber list is generated based on the value of subIndex.
// This sub-list is equal to the initial list when subIndex is zero, or smaller than the initial
// list when subIndex is greater than zero because a requeue event has occurred.
subsToIterate := task.subs[task.subIndex:]
loopTwo:
for i, sub := range subsToIterate {
@@ -131,7 +141,21 @@
// (e.g. Maybe you have a timeout on the subscribers and you got the notification after the timeout have passed)
NotificationID: uuid.NewString(),
}
eventbus.EBus.Publish(objectData, sub)
if err := eventbus.EBus.Publish(objectData, sub); err != nil {
log.Printf("processTasks(): Failed to sent notification: %+v\n", err)
log.Printf("processTasks(): Notification not sent to subscriber %+v with data %+v. The Task %+v will be requeued.\n", sub, objectData, task)
// We keep this subIndex so we know from which subscriber to start iterating after the Task is requeued,
// i.e. we start again from the subscriber that returned an error or was unavailable for any reason. Increasing
// the subIndex value is always correct because after the requeue of the task we generate and iterate over a new sub-list
// of the remaining subscribers, which is equal to or smaller than the initial subscriber list.
task.subIndex += i
task.systemTimer *= 2
log.Printf("processTasks(): The Task will be requeued after %+v\n", task.systemTimer)
time.AfterFunc(task.systemTimer, func() {
t.taskQueue.Enqueue(task)
})
break loopTwo
}
log.Printf("processTasks(): Notification has been sent to subscriber %+v with data %+v\n", sub, objectData)

loopThree:
@@ -151,11 +175,17 @@ func (t *TaskManager) processTasks() {
log.Printf("processTasks(): received notification id %+v doesn't equal the sent notification id %+v\n", taskStatus.notificationID, objectData.NotificationID)

// We need a timeout in case that the subscriber doesn't update the status at all for whatever reason.
// If that occurs then we just take a note which subscriber need to revisit and we requeue the task without any timer
// If that occurs then we just requeue the task with a timer
case <-time.After(30 * time.Second):
log.Printf("processTasks(): No task status has been received in the channel from subscriber %+v. The task %+v will be requeued immediately Task Status %+v\n", sub, task, taskStatus)
log.Printf("processTasks(): No task status has been received in the channel from subscriber %+v. The task %+v will be requeued. Task Status %+v\n", sub, task, taskStatus)
// We keep this subIndex so we know from which subscriber to start iterating after the Task is requeued,
// i.e. we start again from the subscriber that returned an error or was unavailable for any reason.
task.subIndex += i
go t.taskQueue.Enqueue(task)
task.systemTimer *= 2
log.Printf("processTasks(): The Task will be requeued after %+v\n", task.systemTimer)
time.AfterFunc(task.systemTimer, func() {
t.taskQueue.Enqueue(task)
})
break loopThree
}
}
@@ -168,19 +198,29 @@ func (t *TaskManager) processTasks() {
break loopTwo
}

// We re-initialize the systemTimer every time we get a taskStatus, which means that the subscriber is available and has responded.
task.systemTimer = 1 * time.Second

switch taskStatus.component.CompStatus {
case common.ComponentStatusSuccess:
log.Printf("processTasks(): Subscriber %+v has processed the task %+v successfully\n", sub, task)
continue loopTwo
default:
case common.ComponentStatusError:
log.Printf("processTasks(): Subscriber %+v has not processed the task %+v successfully\n", sub, task)
log.Printf("processTasks(): The Task will be requeued after %+v\n", taskStatus.component.Timer)
// We keep this subIndex so we know from which subscriber to start iterating after the Task is requeued,
// i.e. we start again from the subscriber that returned an error or was unavailable for any reason. Increasing
// the subIndex value is always correct because after the requeue of the task we generate and iterate over a new sub-list
// of the remaining subscribers, which is equal to or smaller than the initial subscriber list.
task.subIndex += i
task.retryTimer = taskStatus.component.Timer
log.Printf("processTasks(): The Task will be requeued after %+v\n", task.retryTimer)
time.AfterFunc(task.retryTimer, func() {
time.AfterFunc(taskStatus.component.Timer, func() {
t.taskQueue.Enqueue(task)
})
break loopTwo
default:
log.Printf("processTasks(): Subscriber %+v has not provided designated status for the task %+v\n", sub, task)
log.Printf("processTasks(): The task %+v will be dropped\n", task)
break loopTwo
}
}
}
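Putting the pieces together, processTasks now pairs a receive timeout on the status channel with a delayed requeue via time.AfterFunc and a doubling backoff. A condensed, self-contained sketch of that control flow, with shortened durations and hypothetical type names rather than the repository's own:

```go
package main

import (
	"fmt"
	"time"
)

// task mirrors only the fields needed here: a name and the doubling systemTimer.
type task struct {
	name        string
	systemTimer time.Duration
}

func main() {
	queue := make(chan *task, 8)  // stands in for the task queue
	statusCh := make(chan string) // stands in for the subscriber's status channel

	queue <- &task{name: "example-object", systemTimer: 100 * time.Millisecond}

	// Simulate a subscriber that only replies on the third delivery attempt.
	deliveries := 0

	for tk := range queue {
		deliveries++
		if deliveries == 3 {
			go func() { statusCh <- "success" }()
		}

		select {
		case st := <-statusCh:
			// A status arrived: the subscriber is responsive, so reset the backoff.
			tk.systemTimer = 100 * time.Millisecond
			fmt.Println("status received:", st)
			return
		case <-time.After(50 * time.Millisecond): // 30 seconds in the real code
			// No status arrived in time: double the backoff and requeue once it expires.
			tk.systemTimer *= 2
			fmt.Printf("timeout, requeueing %s after %v\n", tk.name, tk.systemTimer)
			requeued := tk
			time.AfterFunc(requeued.systemTimer, func() { queue <- requeued })
		}
	}
}
```

The real code additionally tracks subIndex so that a requeued task resumes from the subscriber that failed or timed out instead of notifying every subscriber again.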
