Skip to content

Commit

Permalink
Merge pull request #404 from flaviocirillo/fogflow-fcfork
Browse files Browse the repository at this point in the history
Fixed bug when removing bad coding habits on checking boolean
  • Loading branch information
smartfog authored Sep 18, 2024
2 parents c10648f + 76c76e5 commit e72afcd
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 8 deletions.
1 change: 1 addition & 0 deletions common/datamodel/datamodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ func (worker *WorkerProfile) IsLive(duration int) bool {
delta := time.Since(worker.Last_Heartbeat_Update)

if int(delta.Seconds()) >= duration {
DEBUG.Println("Worker delta failed ", int(delta.Seconds()), " while duration is ", duration)
return false
} else {
return true
Expand Down
4 changes: 2 additions & 2 deletions master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ func (master *Master) onTimer() {
master.workerList_lock.Lock()
for workerID, worker := range master.workers {
duration := master.cfg.Worker.HeartbeatInterval * master.cfg.Worker.DetectionDuration
if worker.IsLive(duration) {
if !worker.IsLive(duration) {
delete(master.workers, workerID)
INFO.Println("REMOVE worker " + workerID + " from the list")
INFO.Println("REMOVE worker " + workerID + " from the list, because worker heartbeat " + strconv.Itoa(duration) + " is not live")
}
}

Expand Down
12 changes: 6 additions & 6 deletions master/taskMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (flow *FogFlow) expandExecutionPlan(entityID string, inputSubscription *Inp
// check if the location in this input entity is changed
locationChanged := false
for i := 0; i < len(task.Inputs); i++ {
if task.Inputs[i].ID == entity.ID && task.Inputs[i].Location.IsEqual(&entity.Location) {
if task.Inputs[i].ID == entity.ID && !(task.Inputs[i].Location.IsEqual(&entity.Location)) {
locationChanged = true
DEBUG.Println("[location changed] entity: ", entity.ID)
// update the input entities with the new location
Expand Down Expand Up @@ -432,7 +432,7 @@ func (flow *FogFlow) removeExecutionPlan(entityID string, inputSubscription *Inp
task.removeInput(entityID)

//if any of the input streams is delete, the task will be terminated
if flow.checkInputsOfTaskInstance(task) {
if !flow.checkInputsOfTaskInstance(task) {
// remove this task
DEBUG.Printf("removing an existing task %+v\r\n", task)

Expand Down Expand Up @@ -508,7 +508,7 @@ func (flow *FogFlow) updateGroupedKeyValueTable(sub *InputSubscription, entityID
if groupKey == "ALL" {
key := name + "-" + groupKey
_, exist := flow.UniqueKeys[key]
if exist {
if !exist {
flow.UniqueKeys[key] = make([]interface{}, 0)
flow.UniqueKeys[key] = append(flow.UniqueKeys[key], "ALL")
}
Expand Down Expand Up @@ -537,7 +537,7 @@ func (flow *FogFlow) updateGroupedKeyValueTable(sub *InputSubscription, entityID
}
}

if inList {
if !inList {
flow.UniqueKeys[key] = append(flow.UniqueKeys[key], value)
}
} else { // create a new key
Expand Down Expand Up @@ -881,7 +881,7 @@ func (tMgr *TaskMgr) HandleContextAvailabilityUpdate(subID string, entityAction

tMgr.subID2FogFunc_lock.RLock()
funcName, fogFunctionExist := tMgr.subID2FogFunc[subID]
if fogFunctionExist {
if !fogFunctionExist {
INFO.Println("this subscripption is not issued by me")
tMgr.subID2FogFunc_lock.RUnlock()
return
Expand All @@ -893,7 +893,7 @@ func (tMgr *TaskMgr) HandleContextAvailabilityUpdate(subID string, entityAction
defer tMgr.fogFlows_lock.Unlock()

fogflow, fogFlowExist := tMgr.fogFlows[funcName]
if fogFlowExist {
if !fogFlowExist {
INFO.Println("no flow established for this function: ", funcName)
return
}
Expand Down

0 comments on commit e72afcd

Please sign in to comment.