From cfb06bf7eb4c860ebdd5025c0ff092032497a248 Mon Sep 17 00:00:00 2001
From: Dimitrios Markou
Date: Fri, 12 Jul 2024 15:42:59 +0100
Subject: [PATCH] fix(evpn-bridge): fix system behaviour for pending objects

Signed-off-by: Dimitrios Markou
---
 .../subscriberframework/eventbus/eventbus.go | 18 +++--
 pkg/infradb/taskmanager/taskmanager.go       | 74 ++++++++++++++-----
 2 files changed, 69 insertions(+), 23 deletions(-)

diff --git a/pkg/infradb/subscriberframework/eventbus/eventbus.go b/pkg/infradb/subscriberframework/eventbus/eventbus.go
index 68ad6ff4..c949cd7e 100644
--- a/pkg/infradb/subscriberframework/eventbus/eventbus.go
+++ b/pkg/infradb/subscriberframework/eventbus/eventbus.go
@@ -6,6 +6,7 @@
 package eventbus
 
 import (
+	"fmt"
 	"log"
 	"sort"
 	"sync"
@@ -21,7 +22,6 @@
 type EventBus struct {
 	subscribers   map[string][]*Subscriber
 	eventHandlers map[string]EventHandler
 	subscriberL   sync.RWMutex
-	publishL      sync.RWMutex
 	mutex         sync.RWMutex
 }
@@ -92,7 +92,7 @@ func (e *EventBus) Subscribe(moduleName, eventType string, priority int, eventHa
 
 	subscriber := &Subscriber{
 		Name:     moduleName,
-		Ch:       make(chan interface{}, 1),
+		Ch:       make(chan interface{}),
 		Quit:     make(chan bool),
 		Priority: priority,
 	}
@@ -154,10 +154,16 @@ func (e *EventBus) UnsubscribeModule(moduleName string) bool {
 }
 
 // Publish api notifies the subscribers with certain eventType
-func (e *EventBus) Publish(objectData *ObjectData, subscriber *Subscriber) {
-	e.publishL.RLock()
-	defer e.publishL.RUnlock()
-	subscriber.Ch <- objectData
+func (e *EventBus) Publish(objectData *ObjectData, subscriber *Subscriber) error {
+	var err error
+
+	select {
+	case subscriber.Ch <- objectData:
+		log.Printf("Publish(): Notification is sent to subscriber %s\n", subscriber.Name)
+	default:
+		err = fmt.Errorf("channel for subscriber %s is busy", subscriber.Name)
+	}
+	return err
 }
 
 // Unsubscribe the subscriber, which delete the subscriber(all resources will be washed out)
diff --git a/pkg/infradb/taskmanager/taskmanager.go b/pkg/infradb/taskmanager/taskmanager.go
index 9cbc9754..e744bd6a 100644
--- a/pkg/infradb/taskmanager/taskmanager.go
+++ b/pkg/infradb/taskmanager/taskmanager.go
@@ -32,8 +32,9 @@
 type Task struct {
 	objectType      string
 	resourceVersion string
 	subIndex        int
-	retryTimer      time.Duration
-	subs            []*eventbus.Subscriber
+	// systemTimer is used only when we want to retry a Task because the Subscriber is unavailable or a TaskStatus has not been received
+	systemTimer time.Duration
+	subs        []*eventbus.Subscriber
 }
 
 // TaskStatus holds info related to the status that has been received
@@ -62,6 +63,7 @@ func newTask(name, objectType, resourceVersion string, subs []*eventbus.Subscrib
 		objectType:      objectType,
 		resourceVersion: resourceVersion,
 		subIndex:        0,
+		systemTimer:     1 * time.Second,
 		subs:            subs,
 	}
 }
@@ -96,13 +98,18 @@
 // StatusUpdated creates a task status and sends it for handling
 func (t *TaskManager) StatusUpdated(name, objectType, resourceVersion, notificationID string, dropTask bool, component *common.Component) {
 	taskStatus := newTaskStatus(name, objectType, resourceVersion, notificationID, dropTask, component)
-
-	// Do we need to make this call happen in a goroutine in order to not make the
-	// StatusUpdated function stuck in case that nobody reads what is written on the channel ?
-	// Is there any case where this can happen
-	// (nobody reads what is written on the channel and the StatusUpdated gets stuck) ?
-	t.taskStatusChan <- taskStatus
-	log.Printf("StatusUpdated(): New Task Status has been created and sent to channel: %+v\n", taskStatus)
+	log.Printf("StatusUpdated(): New Task Status has been created: %+v\n", taskStatus)
+
+	// We need a default case here so the call does not get stuck when we send to the channel but nobody is reading from it,
+	// e.g. a subscriber got stuck and doesn't reply. The task will be requeued after the timer expires. In the meantime
+	// the subscriber replies and a taskStatus is sent to the channel, but the call gets stuck there because the previous
+	// task has not been requeued yet, as the timer has not expired and the queue is empty (assuming only one task in the queue).
+	select {
+	case t.taskStatusChan <- taskStatus:
+		log.Printf("StatusUpdated(): Task Status has been sent to channel: %+v\n", taskStatus)
+	default:
+		log.Printf("StatusUpdated(): Task Status has not been sent to channel. Channel not available: %+v\n", taskStatus)
+	}
 }
 
 // ReplayFinished notifies that the replay of objects has finished
@@ -119,6 +126,9 @@ func (t *TaskManager) processTasks() {
 		task := t.taskQueue.Dequeue()
 		log.Printf("processTasks(): Task has been dequeued for processing: %+v\n", task)
 
+		// A new sub-list of the initial subscribers list will be generated based on the value of the subIndex.
+		// This sub-list can be equal to the initial list (subIndex equal to zero) or smaller than the initial
+		// list (subIndex greater than zero) if a requeue event has occurred.
 		subsToIterate := task.subs[task.subIndex:]
 	loopTwo:
 		for i, sub := range subsToIterate {
@@ -131,7 +141,21 @@
 				// (e.g. Maybe you have a timeout on the subscribers and you got the notification after the timeout have passed)
 				NotificationID: uuid.NewString(),
 			}
-			eventbus.EBus.Publish(objectData, sub)
+			if err := eventbus.EBus.Publish(objectData, sub); err != nil {
+				log.Printf("processTasks(): Failed to send notification: %+v\n", err)
+				log.Printf("processTasks(): Notification not sent to subscriber %+v with data %+v. The Task %+v will be requeued.\n", sub, objectData, task)
+				// We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task,
+				// so that we start again from the subscriber that returned an error or was unavailable for any reason. Increasing
+				// the subIndex value is always correct because after the requeue of the task we generate and iterate on a new
+				// sub-list of the remaining subscribers, which is equal to or smaller than the initial subscribers list.
+				task.subIndex += i
+				task.systemTimer *= 2
+				log.Printf("processTasks(): The Task will be requeued after %+v\n", task.systemTimer)
+				time.AfterFunc(task.systemTimer, func() {
+					t.taskQueue.Enqueue(task)
+				})
+				break loopTwo
+			}
 			log.Printf("processTasks(): Notification has been sent to subscriber %+v with data %+v\n", sub, objectData)
 
 		loopThree:
@@ -151,11 +175,17 @@
 					log.Printf("processTasks(): received notification id %+v doesn't equal the sent notification id %+v\n", taskStatus.notificationID, objectData.NotificationID)
 
 				// We need a timeout in case that the subscriber doesn't update the status at all for whatever reason.
-				// If that occurs then we just take a note which subscriber need to revisit and we requeue the task without any timer
+				// If that occurs then we just requeue the task with a timer
 				case <-time.After(30 * time.Second):
-					log.Printf("processTasks(): No task status has been received in the channel from subscriber %+v. The task %+v will be requeued immediately Task Status %+v\n", sub, task, taskStatus)
+					log.Printf("processTasks(): No task status has been received in the channel from subscriber %+v. The task %+v will be requeued. Task Status %+v\n", sub, task, taskStatus)
+					// We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task,
+					// so that we start again from the subscriber that returned an error or was unavailable for any reason.
 					task.subIndex += i
-					go t.taskQueue.Enqueue(task)
+					task.systemTimer *= 2
+					log.Printf("processTasks(): The Task will be requeued after %+v\n", task.systemTimer)
+					time.AfterFunc(task.systemTimer, func() {
+						t.taskQueue.Enqueue(task)
+					})
 					break loopThree
 				}
 			}
@@ -168,19 +198,29 @@
 				break loopTwo
 			}
 
+			// We re-initialize the systemTimer every time we receive a taskStatus, since that means the subscriber is available and has responded
+			task.systemTimer = 1 * time.Second
+
 			switch taskStatus.component.CompStatus {
 			case common.ComponentStatusSuccess:
 				log.Printf("processTasks(): Subscriber %+v has processed the task %+v successfully\n", sub, task)
 				continue loopTwo
-			default:
+			case common.ComponentStatusError:
 				log.Printf("processTasks(): Subscriber %+v has not processed the task %+v successfully\n", sub, task)
+				log.Printf("processTasks(): The Task will be requeued after %+v\n", taskStatus.component.Timer)
+				// We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task,
+				// so that we start again from the subscriber that returned an error or was unavailable for any reason. Increasing
+				// the subIndex value is always correct because after the requeue of the task we generate and iterate on a new
+				// sub-list of the remaining subscribers, which is equal to or smaller than the initial subscribers list.
 				task.subIndex += i
-				task.retryTimer = taskStatus.component.Timer
-				log.Printf("processTasks(): The Task will be requeued after %+v\n", task.retryTimer)
-				time.AfterFunc(task.retryTimer, func() {
+				time.AfterFunc(taskStatus.component.Timer, func() {
 					t.taskQueue.Enqueue(task)
 				})
 				break loopTwo
+			default:
+				log.Printf("processTasks(): Subscriber %+v has not provided a designated status for the task %+v\n", sub, task)
+				log.Printf("processTasks(): The task %+v will be dropped\n", task)
+				break loopTwo
 			}
 		}
 	}
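
Note for reviewers: the non-blocking send that replaces the old lock-guarded Publish is easier to see in isolation. The following is a minimal standalone sketch, not the real eventbus: Subscriber is trimmed to the two fields the pattern needs, and the names (publish, "module-a") are made up for illustration. Because the channel is now unbuffered, a send can only succeed while the subscriber is blocked in a receive; the default case turns a busy subscriber into an error instead of a hang.

package main

import (
	"fmt"
	"log"
	"time"
)

// Subscriber is trimmed to the two fields the pattern needs; the real
// eventbus Subscriber also carries Quit and Priority.
type Subscriber struct {
	Name string
	Ch   chan interface{} // unbuffered, as in the patch
}

// publish mirrors the shape of the patched Publish: select with a
// default case makes the channel send a non-blocking attempt.
func publish(data interface{}, sub *Subscriber) error {
	select {
	case sub.Ch <- data:
		log.Printf("notification sent to subscriber %s", sub.Name)
		return nil
	default:
		return fmt.Errorf("channel for subscriber %s is busy", sub.Name)
	}
}

func main() {
	sub := &Subscriber{Name: "module-a", Ch: make(chan interface{})}

	// Nobody is receiving on sub.Ch, so this returns the busy error
	// immediately instead of blocking the caller forever.
	if err := publish("object-data", sub); err != nil {
		log.Printf("publish failed: %v", err)
	}

	// With a goroutine actively receiving, the same send succeeds.
	done := make(chan struct{})
	go func() {
		log.Printf("subscriber got: %v", <-sub.Ch)
		close(done)
	}()
	time.Sleep(10 * time.Millisecond) // crude sync, demo only
	if err := publish("object-data", sub); err != nil {
		log.Printf("publish failed: %v", err)
	}
	<-done
}

Returning the error to the caller is what lets processTasks treat an unresponsive subscriber as a requeue event rather than deadlocking the whole task loop.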
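The requeue path has the same shape in both failure branches of the patch: bump subIndex, double systemTimer, and hand the task back to the queue via time.AfterFunc so the dequeue loop never sleeps on a backing-off task. Below is a minimal sketch under assumed names (a trimmed task struct, a buffered channel standing in for the real taskQueue, and notify as a hypothetical subscriber call that fails twice so the backoff is observable):

package main

import (
	"log"
	"time"
)

// task carries only the fields the retry logic touches; the real
// Task in taskmanager.go has more fields.
type task struct {
	name        string
	attempts    int
	systemTimer time.Duration
}

// notify is a hypothetical stand-in for publishing to a subscriber;
// it fails on the first two attempts, then succeeds.
func notify(t *task) bool {
	t.attempts++
	return t.attempts > 2
}

func main() {
	queue := make(chan *task, 8) // stands in for the real taskQueue
	queue <- &task{name: "example-task", systemTimer: 1 * time.Second}

	for tk := range queue {
		tk := tk // capture per iteration for the closure (pre-Go 1.22)
		if notify(tk) {
			// The subscriber responded: reset the backoff to its initial
			// value, as the patch does whenever a taskStatus arrives.
			tk.systemTimer = 1 * time.Second
			log.Printf("task %s delivered after %d attempts", tk.name, tk.attempts)
			return
		}
		// Delivery failed: double the timer and requeue the task only
		// after the timer fires (1s, then 2s, then 4s, ...).
		tk.systemTimer *= 2
		log.Printf("requeueing task %s after %v", tk.name, tk.systemTimer)
		time.AfterFunc(tk.systemTimer, func() { queue <- tk })
	}
}

Using time.AfterFunc instead of the old `go t.taskQueue.Enqueue(task)` is what makes the retry both delayed and non-blocking: the timer goroutine owns the requeue while the processing loop stays free to serve other tasks.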