Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bypass some operations when a fogfunction is looking only for a type … #405

Merged
merged 1 commit into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions broker/ngsiv1.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,16 @@ func (tb *ThinBroker) NGSIV1_SubscribeContext(w rest.ResponseWriter, r *rest.Req
tb.subscriptions[subID] = &subReq
tb.subscriptions_lock.Unlock()
// take actions
if subReq.Subscriber.IsInternal == true {
if subReq.Subscriber.IsInternal {
INFO.Println("internal subscription coming from another broker")

for _, entity := range subReq.Entities {
tb.e2sub_lock.Lock()
tb.entityId2Subcriptions[entity.ID] = append(tb.entityId2Subcriptions[entity.ID], subID)
if tb.subscriptions[subID].IsSimpleByType() {
tb.entityId2Subcriptions["*"] = append(tb.entityId2Subcriptions["*"], subID)
} else {
tb.entityId2Subcriptions[entity.ID] = append(tb.entityId2Subcriptions[entity.ID], subID)
}
tb.e2sub_lock.Unlock()
}
tb.notifyOneSubscriberWithCurrentStatus(subReq.Entities, subID)
Expand Down Expand Up @@ -250,13 +254,13 @@ func (tb *ThinBroker) NGSIV1_NotifyContextAvailability(w rest.ResponseWriter, r
//map it to the main subscription
tb.subLinks_lock.Lock()
mainSubID, exist := tb.availabilitySub2MainSub[subID]
if exist == false {
if !exist {
DEBUG.Println("put it into the tempCache and handle it later")
tb.tmpNGSI9NotifyCache[subID] = &notifyContextAvailabilityReq
}
tb.subLinks_lock.Unlock()

if exist == true {
if exist {
tb.handleNGSI9Notify(mainSubID, &notifyContextAvailabilityReq)
}
}
91 changes: 63 additions & 28 deletions broker/thinBroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ func (tb *ThinBroker) OnTimer() { // for every 2 second
hasCachedNotification := false
tb.subscriptions_lock.Lock()
if subscription, exist := tb.subscriptions[sid]; exist {
if subscription.Subscriber.RequireReliability == true && len(subscription.Subscriber.NotifyCache) > 0 {
if subscription.Subscriber.RequireReliability && len(subscription.Subscriber.NotifyCache) > 0 {
hasCachedNotification = true
}
}
tb.subscriptions_lock.Unlock()

if hasCachedNotification == true {
if hasCachedNotification {
elements := make([]ContextElement, 0)
tb.sendReliableNotify(elements, sid)
}
Expand Down Expand Up @@ -663,7 +663,17 @@ func (tb *ThinBroker) UnsubscribeContextAvailability(sid string) error {
return err
}

func stringsContains(slice []string, e string) bool {
for _, sliceElement := range slice {
if sliceElement == e {
return true
}
}
return false
}

func (tb *ThinBroker) handleNGSI9Notify(mainSubID string, notifyContextAvailabilityReq *NotifyContextAvailabilityRequest) {

var action string
notifyContextAvailabilityReq.ErrorCode.Code = 301
switch notifyContextAvailabilityReq.ErrorCode.Code {
Expand All @@ -674,15 +684,30 @@ func (tb *ThinBroker) handleNGSI9Notify(mainSubID string, notifyContextAvailabil
case 410:
action = "DELETE"
}
INFO.Println(action, " subID ", mainSubID)

if tb.isDebugEnabled {
DEBUG.Println(action, " subID ", mainSubID, " subscription isSimpleByType ", tb.subscriptions[mainSubID].IsSimpleByType())
DEBUG.Println(tb.entityId2Subcriptions)
}

for _, registrationResp := range notifyContextAvailabilityReq.ContextRegistrationResponseList {
registration := registrationResp.ContextRegistration
for _, eid := range registration.EntityIdList {
INFO.Println("===> ", eid, " , ", mainSubID)

if tb.isDebugEnabled {
DEBUG.Println("===> ", eid, " , ", mainSubID)
}

tb.e2sub_lock.Lock()

if action == "CREATE" {
tb.entityId2Subcriptions[eid.ID] = append(tb.entityId2Subcriptions[eid.ID], mainSubID)
if tb.subscriptions[mainSubID].IsSimpleByType() {
if !stringsContains(tb.entityId2Subcriptions["*"], mainSubID) {
tb.entityId2Subcriptions["*"] = append(tb.entityId2Subcriptions["*"], mainSubID)
}
} else {
tb.entityId2Subcriptions[eid.ID] = append(tb.entityId2Subcriptions[eid.ID], mainSubID)
}
} else if action == "DELETE" {
subList := tb.entityId2Subcriptions[eid.ID]
for i, id := range subList {
Expand All @@ -692,16 +717,16 @@ func (tb *ThinBroker) handleNGSI9Notify(mainSubID string, notifyContextAvailabil
}
}
} else if action == "UPDATE" {
existFlag := false
for _, subID := range tb.entityId2Subcriptions[eid.ID] {
if subID == mainSubID {
existFlag = true
break
if tb.subscriptions[mainSubID].IsSimpleByType() {
if !stringsContains(tb.entityId2Subcriptions["*"], mainSubID) {
tb.entityId2Subcriptions["*"] = append(tb.entityId2Subcriptions["*"], mainSubID)
}
} else {
if !stringsContains(tb.entityId2Subcriptions[eid.ID], mainSubID) {
tb.entityId2Subcriptions[eid.ID] = append(tb.entityId2Subcriptions[eid.ID], mainSubID)
}
}
if existFlag == false {
tb.entityId2Subcriptions[eid.ID] = append(tb.entityId2Subcriptions[eid.ID], mainSubID)
}

}

tb.e2sub_lock.Unlock()
Expand All @@ -717,24 +742,34 @@ func (tb *ThinBroker) handleNGSI9Notify(mainSubID string, notifyContextAvailabil
tb.notifyOneSubscriberWithCurrentStatus(registration.EntityIdList, mainSubID)
}
} else {
//for matched entities provided by other IoT Brokers
newSubscription := SubscribeContextRequest{}
newSubscription.Entities = registration.EntityIdList
newSubscription.Reference = tb.MyURL
newSubscription.Subscriber.BrokerURL = registration.ProvidingApplication

if action == "CREATE" || action == "UPDATE" {
sid, err := subscribeContextProvider(&newSubscription, registration.ProvidingApplication, tb.SecurityCfg)
if err == nil {
// INFO.Println("issue a new subscription ", sid)
// this check is to subscribe to the data only for complex subscription
if !tb.subscriptions[mainSubID].IsSimpleByType() || stringsContains(tb.entityId2Subcriptions["*"], mainSubID) {

//for matched entities provided by other IoT Brokers
newSubscription := SubscribeContextRequest{}
if tb.subscriptions[mainSubID].IsSimpleByType() {
entity := tb.subscriptions[mainSubID].Entities[0]
newSubscription.Entities = append(newSubscription.Entities, entity)
} else {
newSubscription.Entities = registration.EntityIdList
}
newSubscription.Reference = tb.MyURL
newSubscription.Subscriber.BrokerURL = registration.ProvidingApplication

if action == "CREATE" || action == "UPDATE" {
sid, err := subscribeContextProvider(&newSubscription, registration.ProvidingApplication, tb.SecurityCfg)
if err == nil {
// INFO.Println("issue a new subscription ", sid)

tb.subscriptions_lock.Lock()
tb.subscriptions[sid] = &newSubscription
tb.subscriptions_lock.Unlock()
tb.subscriptions_lock.Lock()
tb.subscriptions[sid] = &newSubscription
tb.subscriptions_lock.Unlock()

tb.subLinks_lock.Lock()
tb.main2Other[mainSubID] = append(tb.main2Other[mainSubID], sid)
tb.subLinks_lock.Unlock()
tb.subLinks_lock.Lock()
tb.main2Other[mainSubID] = append(tb.main2Other[mainSubID], sid)
tb.subLinks_lock.Unlock()
}
}
}
}
Expand Down
7 changes: 3 additions & 4 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"path/filepath"
Expand Down Expand Up @@ -227,7 +226,7 @@ func (c *Config) GetMessageBus() string {
var logTargets map[string]io.Writer = map[string]io.Writer{
"stdout": os.Stdout,
"stderr": os.Stderr,
"discard": ioutil.Discard,
"discard": io.Discard,
}

func (c *Config) SetLogTargets() {
Expand All @@ -238,7 +237,7 @@ func (c *Config) SetLogTargets() {
INFO = log.New(target, "INFO: ", log.Ldate|log.Ltime)
target, ok = logTargets[c.Logging.Protocol]
if !ok {
target = ioutil.Discard
target = io.Discard
}
PROTOCOL = log.New(target, "PROTOCOL: ", log.Ldate|log.Ltime)
target, ok = logTargets[c.Logging.Errlog]
Expand All @@ -248,7 +247,7 @@ func (c *Config) SetLogTargets() {
ERROR = log.New(target, "ERROR: ", log.Ldate|log.Ltime|log.Lshortfile)
target, ok = logTargets[c.Logging.Debug]
if !ok {
target = ioutil.Discard
target = io.Discard
}
DEBUG = log.New(target, "DEBUG: ", log.Ldate|log.Ltime|log.Lshortfile)
}
Expand Down
9 changes: 9 additions & 0 deletions common/datamodel/datamodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ type InputStreamConfig struct {
Scoped bool `json:"scoped"`
}

// This is to state that we do not need to save all registrations if the subscription is simply by type
func (inputSelect *InputStreamConfig) IsSimpleByType() bool {
if !inputSelect.Scoped && inputSelect.GroupBy != "EntityID" {
return true
} else {
return false
}
}

type OutputStreamConfig struct {
EntityType string `json:"entity_type"`
}
Expand Down
20 changes: 15 additions & 5 deletions common/ngsi/ngsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,9 +835,7 @@ func (registredEntity *EntityRegistration) GetLocation() Point {
return Point{0.0, 0.0}
}

//
// used by master to group the received input
//
func (registredEntity *EntityRegistration) IsMatched(restrictions map[string]interface{}) bool {
matched := true

Expand Down Expand Up @@ -938,6 +936,20 @@ type SubscribeContextRequest struct {
Subscriber Subscriber
}

func (subscribeContextRequest *SubscribeContextRequest) IsSimpleByType() bool {
var flag = true

if len(subscribeContextRequest.Restriction.Scopes) == 0 {
if len(subscribeContextRequest.Entities) == 1 {
if subscribeContextRequest.Entities[0].ID == "" {
flag = true
}
}
}

return flag
}

type SubscriptionRequest struct {
Attributes []string `json:"attributes,omitempty"`
Subscriber Subscriber
Expand Down Expand Up @@ -1375,7 +1387,7 @@ type ConfigCommand struct {
CorrelatorID string `json:"correlatorID"`
}

//To handle RegisterContextRequest coming from IoT Agent
// To handle RegisterContextRequest coming from IoT Agent
type RegisterContextRequest1 struct {
ContextRegistrations []ContextRegistration1 `json:"contextRegistrations,omitempty"`
Duration string `json:"duration,omitempty"`
Expand Down Expand Up @@ -1441,9 +1453,7 @@ type LDNotifyContextRequest struct {
NotifyAt string `json:"notifiedAt,omitempty"`
}

//
// the part to deal with NGSI v1 update supported by Orion Context Broker
//
func (element *ContextElement) SetEntityID() {
if element.ID != "" {
element.Entity.ID = element.ID
Expand Down
Loading
Loading