diff --git a/common/datamodel/datamodel.go b/common/datamodel/datamodel.go index b680eb5c..a49a3c80 100644 --- a/common/datamodel/datamodel.go +++ b/common/datamodel/datamodel.go @@ -296,6 +296,7 @@ func (worker *WorkerProfile) IsLive(duration int) bool { delta := time.Since(worker.Last_Heartbeat_Update) if int(delta.Seconds()) >= duration { + DEBUG.Println("Worker delta failed ", int(delta.Seconds()), " while duration is ", duration) return false } else { return true diff --git a/master/master.go b/master/master.go index 7794e4c4..10218600 100644 --- a/master/master.go +++ b/master/master.go @@ -158,9 +158,9 @@ func (master *Master) onTimer() { master.workerList_lock.Lock() for workerID, worker := range master.workers { duration := master.cfg.Worker.HeartbeatInterval * master.cfg.Worker.DetectionDuration - if worker.IsLive(duration) { + if !worker.IsLive(duration) { delete(master.workers, workerID) - INFO.Println("REMOVE worker " + workerID + " from the list") + INFO.Println("REMOVE worker " + workerID + " from the list, because worker heartbeat " + strconv.Itoa(duration) + " is not live") } } diff --git a/master/taskMgr.go b/master/taskMgr.go index 1754aab9..5bf6d0ae 100644 --- a/master/taskMgr.go +++ b/master/taskMgr.go @@ -290,7 +290,7 @@ func (flow *FogFlow) expandExecutionPlan(entityID string, inputSubscription *Inp // check if the location in this input entity is changed locationChanged := false for i := 0; i < len(task.Inputs); i++ { - if task.Inputs[i].ID == entity.ID && task.Inputs[i].Location.IsEqual(&entity.Location) { + if task.Inputs[i].ID == entity.ID && !(task.Inputs[i].Location.IsEqual(&entity.Location)) { locationChanged = true DEBUG.Println("[location changed] entity: ", entity.ID) // update the input entities with the new location @@ -432,7 +432,7 @@ func (flow *FogFlow) removeExecutionPlan(entityID string, inputSubscription *Inp task.removeInput(entityID) //if any of the input streams is delete, the task will be terminated - if flow.checkInputsOfTaskInstance(task) { + if !flow.checkInputsOfTaskInstance(task) { // remove this task DEBUG.Printf("removing an existing task %+v\r\n", task) @@ -508,7 +508,7 @@ func (flow *FogFlow) updateGroupedKeyValueTable(sub *InputSubscription, entityID if groupKey == "ALL" { key := name + "-" + groupKey _, exist := flow.UniqueKeys[key] - if exist { + if !exist { flow.UniqueKeys[key] = make([]interface{}, 0) flow.UniqueKeys[key] = append(flow.UniqueKeys[key], "ALL") } @@ -537,7 +537,7 @@ func (flow *FogFlow) updateGroupedKeyValueTable(sub *InputSubscription, entityID } } - if inList { + if !inList { flow.UniqueKeys[key] = append(flow.UniqueKeys[key], value) } } else { // create a new key @@ -881,7 +881,7 @@ func (tMgr *TaskMgr) HandleContextAvailabilityUpdate(subID string, entityAction tMgr.subID2FogFunc_lock.RLock() funcName, fogFunctionExist := tMgr.subID2FogFunc[subID] - if fogFunctionExist { + if !fogFunctionExist { INFO.Println("this subscripption is not issued by me") tMgr.subID2FogFunc_lock.RUnlock() return @@ -893,7 +893,7 @@ func (tMgr *TaskMgr) HandleContextAvailabilityUpdate(subID string, entityAction defer tMgr.fogFlows_lock.Unlock() fogflow, fogFlowExist := tMgr.fogFlows[funcName] - if fogFlowExist { + if !fogFlowExist { INFO.Println("no flow established for this function: ", funcName) return }