Skip to content

Commit

Permalink
Merge pull request #402 from flaviocirillo/fogflow-fcfork
Browse files Browse the repository at this point in the history
Loop avoidance and minimize subscription
  • Loading branch information
smartfog authored Sep 17, 2024
2 parents 2acdb0e + e10ec64 commit 2feca88
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 82 deletions.
9 changes: 9 additions & 0 deletions broker/ngsild.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ func (tb *ThinBroker) NGSILD_UpdateContext(w rest.ResponseWriter, r *rest.Reques

// DEBUG.Println(updateCtxReq)

// check and add the "Fiware-Correlator" header into the update message
updateCtxReq.Correlator = r.Header.Get("Fiware-Correlator")

if numUpdates > 0 {
tb.handleInternalUpdateContext(&updateCtxReq)
}
Expand All @@ -46,6 +49,9 @@ func (tb *ThinBroker) NGSILD_CreateEntity(w rest.ResponseWriter, r *rest.Request
updateCtxReq := UpdateContextRequest{}
numUpdates := updateCtxReq.ReadFromNGSILD(ngsildUpsert)

// check and add the "Fiware-Correlator" header into the update message
updateCtxReq.Correlator = r.Header.Get("Fiware-Correlator")

// DEBUG.Println(updateCtxReq)

if numUpdates > 0 {
Expand Down Expand Up @@ -153,6 +159,9 @@ func (tb *ThinBroker) NGSILD_NotifyContext(w rest.ResponseWriter, r *rest.Reques
updateCtxReq := UpdateContextRequest{}
numUpdates := updateCtxReq.ReadFromNGSILD(ngsildUpsert)

// check and add the "Fiware-Correlator" header into the update message
updateCtxReq.Correlator = r.Header.Get("Fiware-Correlator")

if numUpdates > 0 {
tb.handleInternalUpdateContext(&updateCtxReq)
}
Expand Down
2 changes: 1 addition & 1 deletion broker/ngsiv1.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func (tb *ThinBroker) NGSIV1_UpdateContext(w rest.ResponseWriter, r *rest.Reques
updateCtxReq := UpdateContextRequest{}
err := r.DecodeJsonPayload(&updateCtxReq)
if err != nil {
DEBUG.Println("not able to decode the orion updates")
ERROR.Println("not able to decode the orion updates")
rest.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand Down
30 changes: 22 additions & 8 deletions broker/thinBroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type ThinBroker struct {
entityId2Subcriptions map[string][]string
e2sub_lock sync.RWMutex

isDebugEnabled bool

//counter of heartbeat
counter int64
}
Expand Down Expand Up @@ -68,6 +70,8 @@ func (tb *ThinBroker) Start(cfg *Config) {
tb.myProfile.BID = tb.myEntityId
tb.myProfile.MyURL = cfg.GetExternalBrokerURL()

tb.isDebugEnabled = cfg.Logging.DebugEnabled

// register itself to the IoT discovery
tb.registerMyself()
}
Expand Down Expand Up @@ -203,7 +207,9 @@ func (tb *ThinBroker) getEntity(eid string) *ContextElement {
}

func (tb *ThinBroker) deleteEntity(eid string) error {
DEBUG.Println(" TO REMOVE ENTITY ", eid)
if tb.isDebugEnabled {
DEBUG.Println(" TO REMOVE ENTITY ", eid)
}

//remove it from the local entity map
tb.entities_lock.Lock()
Expand Down Expand Up @@ -444,17 +450,21 @@ func (tb *ThinBroker) notifySubscribers(ctxElem *ContextElement, correlator stri
originator := subscription.Subscriber.Correlator
if correlator != "" && originator != "" && correlator == originator {
beTheSame = true
DEBUG.Println("session ID from producer ", correlator, ", subscriber ", originator)
if tb.isDebugEnabled {
DEBUG.Println("session ID from producer ", correlator, ", subscriber ", originator)
}
}
}
tb.subscriptions_lock.RUnlock()

if beTheSame == true {
DEBUG.Println(" ======= producer and subscriber are the same ===========")
if beTheSame {
if tb.isDebugEnabled {
DEBUG.Println(" ======= producer and subscriber are the same ===========")
}
continue
}

if checkSelectedAttributes == true {
if checkSelectedAttributes {
selectedAttributes := make([]string, 0)

tb.subscriptions_lock.RLock()
Expand Down Expand Up @@ -533,8 +543,10 @@ func (tb *ThinBroker) sendReliableNotifyToSubscriber(elements []ContextElement,
DestinationBroker := subscription.Subscriber.DestinationType
Tenant := subscription.Subscriber.Tenant

if subscription.Subscriber.RequireReliability == true && len(subscription.Subscriber.NotifyCache) > 0 {
DEBUG.Println("resend notify: ", len(subscription.Subscriber.NotifyCache))
if subscription.Subscriber.RequireReliability && len(subscription.Subscriber.NotifyCache) > 0 {
if tb.isDebugEnabled {
DEBUG.Println("resend notify: ", len(subscription.Subscriber.NotifyCache))
}
for _, pCtxElem := range subscription.Subscriber.NotifyCache {
elements = append(elements, *pCtxElem)
}
Expand All @@ -548,7 +560,9 @@ func (tb *ThinBroker) sendReliableNotifyToSubscriber(elements []ContextElement,
err := postNotifyContext(elements, sid, subscriberURL, DestinationBroker, Tenant, tb.SecurityCfg)

if err != nil {
DEBUG.Println("NOTIFY is not received by the subscriber, ", subscriberURL)
if tb.isDebugEnabled {
DEBUG.Println("NOTIFY is not received by the subscriber, ", subscriberURL)
}

tb.subscriptions_lock.Lock()
if subscription, exist := tb.subscriptions[sid]; exist {
Expand Down
9 changes: 5 additions & 4 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ type Config struct {
Location PhysicalLocation `json:"physical_location"`
SiteID string `json:"site_id"`
Logging struct {
Info string `json:"info"`
Protocol string `json:"protocol"`
Errlog string `json:"error"`
Debug string `json:"debug"`
Info string `json:"info"`
Protocol string `json:"protocol"`
Errlog string `json:"error"`
Debug string `json:"debug"`
DebugEnabled bool `json:"debugEnabled"`
} `json:"logging"`
Discovery struct {
HostIP string `json:"host_ip"`
Expand Down
5 changes: 4 additions & 1 deletion discovery/fastDiscovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type FastDiscovery struct {
delayStoreOnFile int
storeOnDisk bool

isDebugEnabled bool

// lock to control the update subscriptions in database
subscriptionsDbLock sync.RWMutex
storeSubscriptionsOnFileScheduled bool
Expand All @@ -72,6 +74,7 @@ func (fd *FastDiscovery) Init(config *Config) {
fd.storeSubscriptionsOnFileScheduled = false
fd.storeBrokersOnFileScheduled = false
fd.storeOnDisk = config.Discovery.StoreOnDisk
fd.isDebugEnabled = config.Logging.DebugEnabled
//INFO.Println("config.Discovery.DelayStoreRegistrationsOnFile ", config.Discovery.DelayStoreStoreOnFile)
fd.delayStoreOnFile = config.Discovery.DelayStoreOnFile

Expand Down Expand Up @@ -281,7 +284,7 @@ func (fd *FastDiscovery) SubscribeContextAvailability(w rest.ResponseWriter, r *
go fd.handleSubscribeCtxAvailability(&subscribeCtxAvailabilityReq)
}

//receive updateContextAvailability for subscription
// receive updateContextAvailability for subscription
func (fd *FastDiscovery) UpdateLDContextAvailability(w rest.ResponseWriter, r *rest.Request) {
sid := r.PathParam("sid")
subscribeCtxAvailabilityReq := SubscribeContextAvailabilityRequest{}
Expand Down
81 changes: 39 additions & 42 deletions master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type Master struct {
prevNumOfTask int
counter_lock sync.RWMutex

isDebugEnabled bool

//type of subscribed entities
subID2Type map[string]string
}
Expand All @@ -70,6 +72,8 @@ func (master *Master) Start(configuration *Config) {
master.discoveryURL = configuration.GetDiscoveryURL()
master.designerURL = configuration.GetDesignerURL()

master.isDebugEnabled = configuration.Logging.DebugEnabled

master.workers = make(map[string]*WorkerProfile)

master.operatorList = make(map[string]Operator)
Expand Down Expand Up @@ -154,7 +158,7 @@ func (master *Master) onTimer() {
master.workerList_lock.Lock()
for workerID, worker := range master.workers {
duration := master.cfg.Worker.HeartbeatInterval * master.cfg.Worker.DetectionDuration
if worker.IsLive(duration) == false {
if worker.IsLive(duration) {
delete(master.workers, workerID)
INFO.Println("REMOVE worker " + workerID + " from the list")
}
Expand Down Expand Up @@ -265,43 +269,43 @@ func (master *Master) contextRegistration2EntityRegistration(entityId *EntityId,
return &entityRegistration
}

func (master *Master) contextRegistration2EntityRegistration_tbd(entityId *EntityId, ctxRegistration *ContextRegistration) *EntityRegistration {
entityRegistration := EntityRegistration{}
// func (master *Master) contextRegistration2EntityRegistration_tbd(entityId *EntityId, ctxRegistration *ContextRegistration) *EntityRegistration {
// entityRegistration := EntityRegistration{}

ctxObj := master.RetrieveContextEntity(entityId.ID)
if ctxObj == nil {
entityRegistration.ID = entityId.ID
entityRegistration.Type = entityId.Type
// ctxObj := master.RetrieveContextEntity(entityId.ID)
// if ctxObj == nil {
// entityRegistration.ID = entityId.ID
// entityRegistration.Type = entityId.Type

entityRegistration.AttributesList = make(map[string]ContextRegistrationAttribute)
entityRegistration.MetadataList = make(map[string]ContextMetadata)
} else {
entityRegistration.ID = ctxObj.Entity.ID
entityRegistration.Type = ctxObj.Entity.Type
// entityRegistration.AttributesList = make(map[string]ContextRegistrationAttribute)
// entityRegistration.MetadataList = make(map[string]ContextMetadata)
// } else {
// entityRegistration.ID = ctxObj.Entity.ID
// entityRegistration.Type = ctxObj.Entity.Type

entityRegistration.AttributesList = make(map[string]ContextRegistrationAttribute)
for attrName, attrValue := range ctxObj.Attributes {
attributeRegistration := ContextRegistrationAttribute{}
attributeRegistration.Name = attrName
attributeRegistration.Type = attrValue.Type
entityRegistration.AttributesList[attrName] = attributeRegistration
}
// entityRegistration.AttributesList = make(map[string]ContextRegistrationAttribute)
// for attrName, attrValue := range ctxObj.Attributes {
// attributeRegistration := ContextRegistrationAttribute{}
// attributeRegistration.Name = attrName
// attributeRegistration.Type = attrValue.Type
// entityRegistration.AttributesList[attrName] = attributeRegistration
// }

entityRegistration.MetadataList = make(map[string]ContextMetadata)
for metaname, ctxmeta := range ctxObj.Metadata {
cm := ContextMetadata{}
cm.Name = metaname
cm.Type = ctxmeta.Type
cm.Value = ctxmeta.Value
// entityRegistration.MetadataList = make(map[string]ContextMetadata)
// for metaname, ctxmeta := range ctxObj.Metadata {
// cm := ContextMetadata{}
// cm.Name = metaname
// cm.Type = ctxmeta.Type
// cm.Value = ctxmeta.Value

entityRegistration.MetadataList[metaname] = cm
}
}
// entityRegistration.MetadataList[metaname] = cm
// }
// }

entityRegistration.ProvidingApplication = ctxRegistration.ProvidingApplication
// entityRegistration.ProvidingApplication = ctxRegistration.ProvidingApplication

return &entityRegistration
}
// return &entityRegistration
// }

func (master *Master) subscribeContextAvailability(availabilitySubscription *SubscribeContextAvailabilityRequest) string {
availabilitySubscription.Reference = master.myURL + "/notifyContextAvailability"
Expand All @@ -324,9 +328,7 @@ func (master *Master) unsubscribeContextAvailability(sid string) {
}
}

//
// to deal with the communication between master and workers via rabbitmq
//
func (master *Master) Process(msg *RecvMessage) error {
switch msg.Type {
case "WORKER_JOIN":
Expand Down Expand Up @@ -467,9 +469,7 @@ func (master *Master) RemoveInputEntity(flowInfo FlowInfo) {
master.communicator.Publish(&taskMsg)
}

//
// the shared functions for function manager and topology manager to call
//
func (master *Master) RetrieveContextEntity(eid string) *ContextObject {
query := QueryContextRequest{}

Expand Down Expand Up @@ -507,9 +507,7 @@ func (master *Master) GetStatus(w rest.ResponseWriter, r *rest.Request) {
w.WriteJson(profile)
}

//
// to select the worker that is closest to the given points
//
func (master *Master) SelectWorker(locations []Point) string {
master.workerList_lock.RLock()
defer master.workerList_lock.RUnlock()
Expand All @@ -527,7 +525,10 @@ func (master *Master) SelectWorker(locations []Point) string {
closestTotalDistance := uint64(math.MaxUint64)
for _, worker := range master.workers {
// if this worker is already overloaded, check the next one
if worker.IsOverloaded() == true {
if worker.IsOverloaded() {
master.isDebugEnabled {
DEBUG.Println("Worker", worker.WID, " has reached its capacity of ", worker.Capacity, " with ", worker.Workload, " tasks running")
}
continue
}

Expand Down Expand Up @@ -555,9 +556,7 @@ func (master *Master) SelectWorker(locations []Point) string {
return closestWorkerID
}

//
// query the topology from Designer based on the given name
//
func (master *Master) getTopologyByName(name string) *Topology {
designerURL := fmt.Sprintf("%s/topology/%s", master.cfg.GetDesignerURL(), name)
fmt.Println(designerURL)
Expand Down Expand Up @@ -598,9 +597,7 @@ func (master *Master) getTopologyByName(name string) *Topology {
return &topology
}

//
// to select the right docker image of an operator for the selected worker
//
func (master *Master) DetermineDockerImage(operatorName string, wID string) string {
master.workerList_lock.RLock()
wProfile := master.workers[wID]
Expand Down
Loading

0 comments on commit 2feca88

Please sign in to comment.