diff --git a/broker/ngsiv1.go b/broker/ngsiv1.go index 6663b0a8..a817ff30 100644 --- a/broker/ngsiv1.go +++ b/broker/ngsiv1.go @@ -174,12 +174,16 @@ func (tb *ThinBroker) NGSIV1_SubscribeContext(w rest.ResponseWriter, r *rest.Req tb.subscriptions[subID] = &subReq tb.subscriptions_lock.Unlock() // take actions - if subReq.Subscriber.IsInternal == true { + if subReq.Subscriber.IsInternal { INFO.Println("internal subscription coming from another broker") for _, entity := range subReq.Entities { tb.e2sub_lock.Lock() - tb.entityId2Subcriptions[entity.ID] = append(tb.entityId2Subcriptions[entity.ID], subID) + if tb.subscriptions[subID].IsSimpleByType() { + tb.entityId2Subcriptions["*"] = append(tb.entityId2Subcriptions["*"], subID) + } else { + tb.entityId2Subcriptions[entity.ID] = append(tb.entityId2Subcriptions[entity.ID], subID) + } tb.e2sub_lock.Unlock() } tb.notifyOneSubscriberWithCurrentStatus(subReq.Entities, subID) @@ -250,13 +254,13 @@ func (tb *ThinBroker) NGSIV1_NotifyContextAvailability(w rest.ResponseWriter, r //map it to the main subscription tb.subLinks_lock.Lock() mainSubID, exist := tb.availabilitySub2MainSub[subID] - if exist == false { + if !exist { DEBUG.Println("put it into the tempCache and handle it later") tb.tmpNGSI9NotifyCache[subID] = ¬ifyContextAvailabilityReq } tb.subLinks_lock.Unlock() - if exist == true { + if exist { tb.handleNGSI9Notify(mainSubID, ¬ifyContextAvailabilityReq) } } diff --git a/broker/thinBroker.go b/broker/thinBroker.go index 25df2de0..698618fb 100644 --- a/broker/thinBroker.go +++ b/broker/thinBroker.go @@ -92,13 +92,13 @@ func (tb *ThinBroker) OnTimer() { // for every 2 second hasCachedNotification := false tb.subscriptions_lock.Lock() if subscription, exist := tb.subscriptions[sid]; exist { - if subscription.Subscriber.RequireReliability == true && len(subscription.Subscriber.NotifyCache) > 0 { + if subscription.Subscriber.RequireReliability && len(subscription.Subscriber.NotifyCache) > 0 { hasCachedNotification = true } } tb.subscriptions_lock.Unlock() - if hasCachedNotification == true { + if hasCachedNotification { elements := make([]ContextElement, 0) tb.sendReliableNotify(elements, sid) } @@ -663,7 +663,17 @@ func (tb *ThinBroker) UnsubscribeContextAvailability(sid string) error { return err } +func stringsContains(slice []string, e string) bool { + for _, sliceElement := range slice { + if sliceElement == e { + return true + } + } + return false +} + func (tb *ThinBroker) handleNGSI9Notify(mainSubID string, notifyContextAvailabilityReq *NotifyContextAvailabilityRequest) { + var action string notifyContextAvailabilityReq.ErrorCode.Code = 301 switch notifyContextAvailabilityReq.ErrorCode.Code { @@ -674,15 +684,30 @@ func (tb *ThinBroker) handleNGSI9Notify(mainSubID string, notifyContextAvailabil case 410: action = "DELETE" } - INFO.Println(action, " subID ", mainSubID) + + if tb.isDebugEnabled { + DEBUG.Println(action, " subID ", mainSubID, " subscription isSimpleByType ", tb.subscriptions[mainSubID].IsSimpleByType()) + DEBUG.Println(tb.entityId2Subcriptions) + } + for _, registrationResp := range notifyContextAvailabilityReq.ContextRegistrationResponseList { registration := registrationResp.ContextRegistration for _, eid := range registration.EntityIdList { - INFO.Println("===> ", eid, " , ", mainSubID) + + if tb.isDebugEnabled { + DEBUG.Println("===> ", eid, " , ", mainSubID) + } + tb.e2sub_lock.Lock() if action == "CREATE" { - tb.entityId2Subcriptions[eid.ID] = append(tb.entityId2Subcriptions[eid.ID], mainSubID) + if tb.subscriptions[mainSubID].IsSimpleByType() { + if !stringsContains(tb.entityId2Subcriptions["*"], mainSubID) { + tb.entityId2Subcriptions["*"] = append(tb.entityId2Subcriptions["*"], mainSubID) + } + } else { + tb.entityId2Subcriptions[eid.ID] = append(tb.entityId2Subcriptions[eid.ID], mainSubID) + } } else if action == "DELETE" { subList := tb.entityId2Subcriptions[eid.ID] for i, id := range subList { @@ -692,16 +717,16 @@ func (tb *ThinBroker) handleNGSI9Notify(mainSubID string, notifyContextAvailabil } } } else if action == "UPDATE" { - existFlag := false - for _, subID := range tb.entityId2Subcriptions[eid.ID] { - if subID == mainSubID { - existFlag = true - break + if tb.subscriptions[mainSubID].IsSimpleByType() { + if !stringsContains(tb.entityId2Subcriptions["*"], mainSubID) { + tb.entityId2Subcriptions["*"] = append(tb.entityId2Subcriptions["*"], mainSubID) + } + } else { + if !stringsContains(tb.entityId2Subcriptions[eid.ID], mainSubID) { + tb.entityId2Subcriptions[eid.ID] = append(tb.entityId2Subcriptions[eid.ID], mainSubID) } } - if existFlag == false { - tb.entityId2Subcriptions[eid.ID] = append(tb.entityId2Subcriptions[eid.ID], mainSubID) - } + } tb.e2sub_lock.Unlock() @@ -717,24 +742,34 @@ func (tb *ThinBroker) handleNGSI9Notify(mainSubID string, notifyContextAvailabil tb.notifyOneSubscriberWithCurrentStatus(registration.EntityIdList, mainSubID) } } else { - //for matched entities provided by other IoT Brokers - newSubscription := SubscribeContextRequest{} - newSubscription.Entities = registration.EntityIdList - newSubscription.Reference = tb.MyURL - newSubscription.Subscriber.BrokerURL = registration.ProvidingApplication - if action == "CREATE" || action == "UPDATE" { - sid, err := subscribeContextProvider(&newSubscription, registration.ProvidingApplication, tb.SecurityCfg) - if err == nil { - // INFO.Println("issue a new subscription ", sid) + // this check is to subscribe to the data only for complex subscription + if !tb.subscriptions[mainSubID].IsSimpleByType() || stringsContains(tb.entityId2Subcriptions["*"], mainSubID) { + + //for matched entities provided by other IoT Brokers + newSubscription := SubscribeContextRequest{} + if tb.subscriptions[mainSubID].IsSimpleByType() { + entity := tb.subscriptions[mainSubID].Entities[0] + newSubscription.Entities = append(newSubscription.Entities, entity) + } else { + newSubscription.Entities = registration.EntityIdList + } + newSubscription.Reference = tb.MyURL + newSubscription.Subscriber.BrokerURL = registration.ProvidingApplication + + if action == "CREATE" || action == "UPDATE" { + sid, err := subscribeContextProvider(&newSubscription, registration.ProvidingApplication, tb.SecurityCfg) + if err == nil { + // INFO.Println("issue a new subscription ", sid) - tb.subscriptions_lock.Lock() - tb.subscriptions[sid] = &newSubscription - tb.subscriptions_lock.Unlock() + tb.subscriptions_lock.Lock() + tb.subscriptions[sid] = &newSubscription + tb.subscriptions_lock.Unlock() - tb.subLinks_lock.Lock() - tb.main2Other[mainSubID] = append(tb.main2Other[mainSubID], sid) - tb.subLinks_lock.Unlock() + tb.subLinks_lock.Lock() + tb.main2Other[mainSubID] = append(tb.main2Other[mainSubID], sid) + tb.subLinks_lock.Unlock() + } } } } diff --git a/common/config/config.go b/common/config/config.go index 45508468..2bec15a3 100644 --- a/common/config/config.go +++ b/common/config/config.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" "log" "os" "path/filepath" @@ -227,7 +226,7 @@ func (c *Config) GetMessageBus() string { var logTargets map[string]io.Writer = map[string]io.Writer{ "stdout": os.Stdout, "stderr": os.Stderr, - "discard": ioutil.Discard, + "discard": io.Discard, } func (c *Config) SetLogTargets() { @@ -238,7 +237,7 @@ func (c *Config) SetLogTargets() { INFO = log.New(target, "INFO: ", log.Ldate|log.Ltime) target, ok = logTargets[c.Logging.Protocol] if !ok { - target = ioutil.Discard + target = io.Discard } PROTOCOL = log.New(target, "PROTOCOL: ", log.Ldate|log.Ltime) target, ok = logTargets[c.Logging.Errlog] @@ -248,7 +247,7 @@ func (c *Config) SetLogTargets() { ERROR = log.New(target, "ERROR: ", log.Ldate|log.Ltime|log.Lshortfile) target, ok = logTargets[c.Logging.Debug] if !ok { - target = ioutil.Discard + target = io.Discard } DEBUG = log.New(target, "DEBUG: ", log.Ldate|log.Ltime|log.Lshortfile) } diff --git a/common/datamodel/datamodel.go b/common/datamodel/datamodel.go index a49a3c80..0998cef9 100644 --- a/common/datamodel/datamodel.go +++ b/common/datamodel/datamodel.go @@ -106,6 +106,15 @@ type InputStreamConfig struct { Scoped bool `json:"scoped"` } +// This is to state that we do not need to save all registrations if the subscription is simply by type +func (inputSelect *InputStreamConfig) IsSimpleByType() bool { + if !inputSelect.Scoped && inputSelect.GroupBy != "EntityID" { + return true + } else { + return false + } +} + type OutputStreamConfig struct { EntityType string `json:"entity_type"` } diff --git a/common/ngsi/ngsi.go b/common/ngsi/ngsi.go index 703fa651..f5548b5f 100644 --- a/common/ngsi/ngsi.go +++ b/common/ngsi/ngsi.go @@ -835,9 +835,7 @@ func (registredEntity *EntityRegistration) GetLocation() Point { return Point{0.0, 0.0} } -// // used by master to group the received input -// func (registredEntity *EntityRegistration) IsMatched(restrictions map[string]interface{}) bool { matched := true @@ -938,6 +936,20 @@ type SubscribeContextRequest struct { Subscriber Subscriber } +func (subscribeContextRequest *SubscribeContextRequest) IsSimpleByType() bool { + var flag = true + + if len(subscribeContextRequest.Restriction.Scopes) == 0 { + if len(subscribeContextRequest.Entities) == 1 { + if subscribeContextRequest.Entities[0].ID == "" { + flag = true + } + } + } + + return flag +} + type SubscriptionRequest struct { Attributes []string `json:"attributes,omitempty"` Subscriber Subscriber @@ -1375,7 +1387,7 @@ type ConfigCommand struct { CorrelatorID string `json:"correlatorID"` } -//To handle RegisterContextRequest coming from IoT Agent +// To handle RegisterContextRequest coming from IoT Agent type RegisterContextRequest1 struct { ContextRegistrations []ContextRegistration1 `json:"contextRegistrations,omitempty"` Duration string `json:"duration,omitempty"` @@ -1441,9 +1453,7 @@ type LDNotifyContextRequest struct { NotifyAt string `json:"notifiedAt,omitempty"` } -// // the part to deal with NGSI v1 update supported by Orion Context Broker -// func (element *ContextElement) SetEntityID() { if element.ID != "" { element.Entity.ID = element.ID diff --git a/master/taskMgr.go b/master/taskMgr.go index 5bf6d0ae..e13672cc 100644 --- a/master/taskMgr.go +++ b/master/taskMgr.go @@ -99,7 +99,9 @@ func (gf *GroupInfo) GetHash() string { sortedpairs := make([]*KVPair, 0) for k, v := range *gf { - DEBUG.Println("group k: %s, v: %+v\r\n", k, v) + if fmt.Sprintf("%T", DEBUG.Writer()) != "io.discard" { + DEBUG.Printf("group k: %s, v: %+v\r\n", k, v) + } kvpair := KVPair{} kvpair.Key = k @@ -118,7 +120,7 @@ func (gf *GroupInfo) GetHash() string { } } - // generate the has code + // generate the hash code text := "" for _, pair := range sortedpairs { temp, _ := json.Marshal(pair.Value) @@ -150,41 +152,49 @@ func (flow *FogFlow) Init() { } // to update the execution plan based on the changes of registered context availability -func (flow *FogFlow) MetadataDrivenTaskOrchestration(subID string, entityAction string, registredEntity *EntityRegistration, workerSelection ProximityWorkerSelectionFn) []*DeploymentAction { - if _, exist := flow.Subscriptions[subID]; exist == false { +func (flow *FogFlow) MetadataDrivenTaskOrchestration(subID string, entityAction string, registeredEntity *EntityRegistration, workerSelection ProximityWorkerSelectionFn) []*DeploymentAction { + if _, exist := flow.Subscriptions[subID]; !exist { DEBUG.Println(subID, "subscription does not exist any more") return nil } inputSubscription := flow.Subscriptions[subID] - entityID := registredEntity.ID + entityID := registeredEntity.ID + if inputSubscription.InputSelector.IsSimpleByType() { + // This check is to see if the selection is only type (no entityId, no scope) + entityID = "*" + } DEBUG.Println(entityAction, " entity ", entityID, "from Subscription ", subID) switch entityAction { case "CREATE", "UPDATE": //update context availability if _, exist := inputSubscription.ReceivedEntityRegistrations[entityID]; exist { - existEntityRegistration := inputSubscription.ReceivedEntityRegistrations[entityID] - existEntityRegistration.Update(registredEntity) + if !inputSubscription.InputSelector.IsSimpleByType() { + // This check is to see if the selection is only type (no entityId, no scope) + // if it is not the case, then the registeredEntity is important + existEntityRegistration := inputSubscription.ReceivedEntityRegistrations[entityID] + existEntityRegistration.Update(registeredEntity) + } } else { - inputSubscription.ReceivedEntityRegistrations[entityID] = registredEntity + inputSubscription.ReceivedEntityRegistrations[entityID] = registeredEntity } //update the group key-value table for orchestration flow.updateGroupedKeyValueTable(inputSubscription, entityID) //check what needs to be instantiated when all required inputs are available - if flow.checkInputAvailability() == true { + if flow.checkInputAvailability() { return flow.expandExecutionPlan(entityID, inputSubscription, workerSelection) } case "DELETE": _, exist := inputSubscription.ReceivedEntityRegistrations[entityID] - if exist == false { - INFO.Println("entity registration has not arrived yet") + if !exist { + INFO.Println("entity registration has not arrived yet, entityId: ", registeredEntity.ID) return nil } - if flow.checkInputAvailability() == true { + if flow.checkInputAvailability() { return flow.removeExecutionPlan(entityID, inputSubscription) } @@ -242,12 +252,13 @@ func (flow *FogFlow) expandExecutionPlan(entityID string, inputSubscription *Inp // check if the associated task instance is already created if task, exist := flow.ExecutionPlan[hashID]; exist { entitiesList := flow.searchRelevantEntities(&group, entityID) + DEBUG.Println("[entitiesList]: ", entitiesList) for _, entity := range entitiesList { newInput := true for _, input := range task.Inputs { + inputSubscription.InputSelector.IsSimpleByType() // If (input.ID != "") it means that (selector.Scoped || selector.GroupBy == "EntityID") - // see later in this file why that - if input.ID != "" { + if !inputSubscription.InputSelector.IsSimpleByType() { if input.ID == entity.ID { newInput = false break @@ -289,6 +300,7 @@ func (flow *FogFlow) expandExecutionPlan(entityID string, inputSubscription *Inp } else { // check if the location in this input entity is changed locationChanged := false + DEBUG.Printf("Length of Task Input %d", len(task.Inputs)) for i := 0; i < len(task.Inputs); i++ { if task.Inputs[i].ID == entity.ID && !(task.Inputs[i].Location.IsEqual(&entity.Location)) { locationChanged = true @@ -502,18 +514,18 @@ func (flow *FogFlow) removeGroupKeyFromTable(groupInfo *GroupInfo) { func (flow *FogFlow) updateGroupedKeyValueTable(sub *InputSubscription, entityID string) { selector := sub.InputSelector - name := selector.EntityType + entityType := selector.EntityType groupKey := selector.GroupBy if groupKey == "ALL" { - key := name + "-" + groupKey + key := entityType + "-" + groupKey _, exist := flow.UniqueKeys[key] if !exist { flow.UniqueKeys[key] = make([]interface{}, 0) flow.UniqueKeys[key] = append(flow.UniqueKeys[key], "ALL") } } else { - key := name + "-" + groupKey + key := entityType + "-" + groupKey entity := sub.ReceivedEntityRegistrations[entityID] var value interface{} @@ -636,15 +648,16 @@ func (flow *FogFlow) searchRelevantEntities(group *GroupInfo, updatedEntityID st // filtering entityloop: for _, entityRegistration := range inputSub.ReceivedEntityRegistrations { + + DEBUG.Println("entityRegistration", entityRegistration) + if entityRegistration.IsMatched(restrictions) { inputEntity := InputEntity{} // The following if is to check if it is really necessary to subscribe for each matching entity (that might thousands) // it is necessary it is grouped per entityID // and it is necessary if it is scoped because only the entities within the scope - if selector.Scoped || selector.GroupBy == "EntityID" { - inputEntity.ID = entityRegistration.ID - } else { + if selector.IsSimpleByType() { // if we are here, it is because we want to subscribe per type and not care the entityId // In this way the subscription will be minimal and faster for _, entity := range entities { @@ -652,6 +665,8 @@ func (flow *FogFlow) searchRelevantEntities(group *GroupInfo, updatedEntityID st continue entityloop } } + } else { + inputEntity.ID = entityRegistration.ID } inputEntity.Type = entityRegistration.Type @@ -737,7 +752,7 @@ func (tMgr *TaskMgr) handleTaskIntent(taskIntent *TaskIntent) { } func (tMgr *TaskMgr) handleSynchronousTaskIntent(taskIntent *TaskIntent) { - INFO.Println("[SYNC]orchestrating task intent: %+v", taskIntent) + INFO.Printf("[SYNC]orchestrating task intent: %+v", taskIntent) fogflow := FogFlow{} @@ -858,13 +873,13 @@ func (tMgr *TaskMgr) selector2Subscription(inputSelector *InputStreamConfig, geo // apply the required attributes availabilitySubscription.Attributes = make([]string, 0) for _, attribute := range inputSelector.SelectedAttributes { - if strings.EqualFold(attribute, "all") == false { + if !strings.EqualFold(attribute, "all") { availabilitySubscription.Attributes = append(availabilitySubscription.Attributes, attribute) } } // apply the required geoscope - if inputSelector.Scoped == true { + if inputSelector.Scoped { availabilitySubscription.Restriction.Scopes = append(availabilitySubscription.Restriction.Scopes, geoscope) } @@ -882,7 +897,7 @@ func (tMgr *TaskMgr) HandleContextAvailabilityUpdate(subID string, entityAction tMgr.subID2FogFunc_lock.RLock() funcName, fogFunctionExist := tMgr.subID2FogFunc[subID] if !fogFunctionExist { - INFO.Println("this subscripption is not issued by me") + INFO.Println("this subscription is not issued by me") tMgr.subID2FogFunc_lock.RUnlock() return } @@ -901,7 +916,9 @@ func (tMgr *TaskMgr) HandleContextAvailabilityUpdate(subID string, entityAction // derive the deployment actions according to the received registration deploymentActions := fogflow.MetadataDrivenTaskOrchestration(subID, entityAction, entityRegistration, tMgr.master.SelectWorker) if deploymentActions == nil || len(deploymentActions) == 0 { - DEBUG.Println("nothing is triggered!!!") + if tMgr.master.isDebugEnabled { + DEBUG.Println("nothing is triggered!!!") + } return } diff --git a/worker/dockerengine.go b/worker/dockerengine.go index 58486533..c3017b8b 100644 --- a/worker/dockerengine.go +++ b/worker/dockerengine.go @@ -59,12 +59,11 @@ func (dockerengine *DockerEngine) Init(cfg *Config) bool { func (dockerengine *DockerEngine) PullImage(dockerImage string) (string, error) { auth := docker.AuthConfiguration{} - if dockerengine.workerCfg.Worker.Registry.IsConfigured() == true { + if dockerengine.workerCfg.Worker.Registry.IsConfigured() { auth.Username = dockerengine.workerCfg.Worker.Registry.Username auth.Password = dockerengine.workerCfg.Worker.Registry.Password auth.Email = dockerengine.workerCfg.Worker.Registry.Email auth.ServerAddress = dockerengine.workerCfg.Worker.Registry.ServerAddress - dockerImage = dockerImage } DEBUG.Printf("options : %+v\r\n", auth) @@ -127,14 +126,14 @@ func (dockerengine *DockerEngine) findFreePortNumber() int { return l.Addr().(*net.TCPAddr).Port } -//functionCode string, taskID string, adminCfg []interface{}, servicePorts []string) +// functionCode string, taskID string, adminCfg []interface{}, servicePorts []string) func (dockerengine *DockerEngine) StartTask(task *ScheduledTaskInstance, brokerURL string) (string, string, error) { dockerImage := task.DockerImage INFO.Println("to execute Task [", task.ID, "] to perform Operation [", dockerImage, "] with parameters [", task.Parameters, "]") // first check the image locally - if dockerengine.InspectImage(dockerImage) == false { + if !dockerengine.InspectImage(dockerImage) { // if the image does not exist locally, try to fetch it from docker hub _, pullError := dockerengine.PullImage(dockerImage) if pullError != nil {