Skip to content

Commit

Permalink
Merge pull request #406 from flaviocirillo/fogflow-fcfork
Browse files Browse the repository at this point in the history
Manage DEBUG logs more intelligently. Possibility to discard to impro…
  • Loading branch information
smartfog authored Sep 26, 2024
2 parents 9be1751 + 671b715 commit be9b523
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 63 deletions.
8 changes: 6 additions & 2 deletions broker/ngsiv1.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ func (tb *ThinBroker) NGSIV1_SubscribeContext(w rest.ResponseWriter, r *rest.Req
subReq := SubscribeContextRequest{}
subReq.Attributes = make([]string, 0)

DEBUG.Println("Subscription request from: ", r.RemoteAddr)
if LoggerIsEnabled(DEBUG) {
DEBUG.Println("Subscription request from: ", r.RemoteAddr)
}

err := r.DecodeJsonPayload(&subReq)
if err != nil {
Expand Down Expand Up @@ -255,7 +257,9 @@ func (tb *ThinBroker) NGSIV1_NotifyContextAvailability(w rest.ResponseWriter, r
tb.subLinks_lock.Lock()
mainSubID, exist := tb.availabilitySub2MainSub[subID]
if !exist {
DEBUG.Println("put it into the tempCache and handle it later")
if LoggerIsEnabled(DEBUG) {
DEBUG.Println("put it into the tempCache and handle it later")
}
tb.tmpNGSI9NotifyCache[subID] = &notifyContextAvailabilityReq
}
tb.subLinks_lock.Unlock()
Expand Down
18 changes: 7 additions & 11 deletions broker/thinBroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ type ThinBroker struct {
entityId2Subcriptions map[string][]string
e2sub_lock sync.RWMutex

isDebugEnabled bool

//counter of heartbeat
counter int64
}
Expand Down Expand Up @@ -70,8 +68,6 @@ func (tb *ThinBroker) Start(cfg *Config) {
tb.myProfile.BID = tb.myEntityId
tb.myProfile.MyURL = cfg.GetExternalBrokerURL()

tb.isDebugEnabled = cfg.Logging.DebugEnabled

// register itself to the IoT discovery
tb.registerMyself()
}
Expand Down Expand Up @@ -207,7 +203,7 @@ func (tb *ThinBroker) getEntity(eid string) *ContextElement {
}

func (tb *ThinBroker) deleteEntity(eid string) error {
if tb.isDebugEnabled {
if LoggerIsEnabled(DEBUG) {
DEBUG.Println(" TO REMOVE ENTITY ", eid)
}

Expand Down Expand Up @@ -450,15 +446,15 @@ func (tb *ThinBroker) notifySubscribers(ctxElem *ContextElement, correlator stri
originator := subscription.Subscriber.Correlator
if correlator != "" && originator != "" && correlator == originator {
beTheSame = true
if tb.isDebugEnabled {
if LoggerIsEnabled(DEBUG) {
DEBUG.Println("session ID from producer ", correlator, ", subscriber ", originator)
}
}
}
tb.subscriptions_lock.RUnlock()

if beTheSame {
if tb.isDebugEnabled {
if LoggerIsEnabled(DEBUG) {
DEBUG.Println(" ======= producer and subscriber are the same ===========")
}
continue
Expand Down Expand Up @@ -544,7 +540,7 @@ func (tb *ThinBroker) sendReliableNotifyToSubscriber(elements []ContextElement,
Tenant := subscription.Subscriber.Tenant

if subscription.Subscriber.RequireReliability && len(subscription.Subscriber.NotifyCache) > 0 {
if tb.isDebugEnabled {
if LoggerIsEnabled(DEBUG) {
DEBUG.Println("resend notify: ", len(subscription.Subscriber.NotifyCache))
}
for _, pCtxElem := range subscription.Subscriber.NotifyCache {
Expand All @@ -560,7 +556,7 @@ func (tb *ThinBroker) sendReliableNotifyToSubscriber(elements []ContextElement,
err := postNotifyContext(elements, sid, subscriberURL, DestinationBroker, Tenant, tb.SecurityCfg)

if err != nil {
if tb.isDebugEnabled {
if LoggerIsEnabled(DEBUG) {
DEBUG.Println("NOTIFY is not received by the subscriber, ", subscriberURL)
}

Expand Down Expand Up @@ -685,7 +681,7 @@ func (tb *ThinBroker) handleNGSI9Notify(mainSubID string, notifyContextAvailabil
action = "DELETE"
}

if tb.isDebugEnabled {
if LoggerIsEnabled(DEBUG) {
DEBUG.Println(action, " subID ", mainSubID, " subscription isSimpleByType ", tb.subscriptions[mainSubID].IsSimpleByType())
DEBUG.Println(tb.entityId2Subcriptions)
}
Expand All @@ -694,7 +690,7 @@ func (tb *ThinBroker) handleNGSI9Notify(mainSubID string, notifyContextAvailabil
registration := registrationResp.ContextRegistration
for _, eid := range registration.EntityIdList {

if tb.isDebugEnabled {
if LoggerIsEnabled(DEBUG) {
DEBUG.Println("===> ", eid, " , ", mainSubID)
}

Expand Down
4 changes: 3 additions & 1 deletion common/datamodel/datamodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,9 @@ func (worker *WorkerProfile) IsLive(duration int) bool {
delta := time.Since(worker.Last_Heartbeat_Update)

if int(delta.Seconds()) >= duration {
DEBUG.Println("Worker delta failed ", int(delta.Seconds()), " while duration is ", duration)
if LoggerIsEnabled(DEBUG) {
DEBUG.Println("Worker delta failed ", int(delta.Seconds()), " while duration is ", duration)
}
return false
} else {
return true
Expand Down
8 changes: 7 additions & 1 deletion common/ngsi/ngsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ var (
DEBUG *log.Logger
)

func LoggerIsEnabled(l *log.Logger) bool {
return (fmt.Sprintf("%T", l.Writer()) != "io.discard")
}

type SiteInfo struct {
ExternalAddress string `json:"externalAddress"`
GeohashID string `json:"geohashID"`
Expand Down Expand Up @@ -786,7 +790,9 @@ func (restriction *Restriction) GetScope() OperationScope {
func (restriction *Restriction) GetNearbyFilter() *NearBy {
for _, scope := range restriction.Scopes {

DEBUG.Println(" SCOPE: ", scope)
if LoggerIsEnabled(DEBUG) {
DEBUG.Println(" SCOPE: ", scope)
}

if scope.Type == "nearby" {
nearby := scope.Value.(NearBy)
Expand Down
16 changes: 9 additions & 7 deletions discovery/fastDiscovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"io/ioutil"
"net/http"
"os"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -50,8 +51,6 @@ type FastDiscovery struct {
delayStoreOnFile int
storeOnDisk bool

isDebugEnabled bool

// lock to control the update subscriptions in database
subscriptionsDbLock sync.RWMutex
storeSubscriptionsOnFileScheduled bool
Expand All @@ -74,7 +73,6 @@ func (fd *FastDiscovery) Init(config *Config) {
fd.storeSubscriptionsOnFileScheduled = false
fd.storeBrokersOnFileScheduled = false
fd.storeOnDisk = config.Discovery.StoreOnDisk
fd.isDebugEnabled = config.Logging.DebugEnabled
//INFO.Println("config.Discovery.DelayStoreRegistrationsOnFile ", config.Discovery.DelayStoreStoreOnFile)
fd.delayStoreOnFile = config.Discovery.DelayStoreOnFile

Expand Down Expand Up @@ -600,7 +598,9 @@ func (fd *FastDiscovery) updateSubscriptionsOnDisk() {
// somebody has the lock

if fd.storeSubscriptionsOnFileScheduled {
INFO.Println("A store on file for registrations is already scheduled")
if LoggerIsEnabled(DEBUG) {
DEBUG.Println("A store on file for registrations is already scheduled")
}
return
}

Expand Down Expand Up @@ -665,7 +665,9 @@ func (fd *FastDiscovery) updateBrokersOnDisk() {
// somebody has the lock

if fd.storeBrokersOnFileScheduled {
INFO.Println("A store on file for registrations is already scheduled")
if LoggerIsEnabled(DEBUG) {
DEBUG.Println("A store on file for registrations is already scheduled")
}
return
}

Expand All @@ -690,7 +692,7 @@ func (fd *FastDiscovery) updateBrokersOnDisk() {
ERROR.Println(err)
}
// err = ioutil.WriteFile("brokers.json", content, 0644)
err = ioutil.WriteFile(fd.dbFiles["brokers"], content, 0644)
err = os.WriteFile(fd.dbFiles["brokers"], content, 0644)
if err != nil {
ERROR.Println(err)
}
Expand All @@ -707,7 +709,7 @@ func (fd *FastDiscovery) readBrokersFromDisk() {
defer fd.brokersDbLock.Unlock()

// content, err := ioutil.ReadFile("brokers.json")
content, err := ioutil.ReadFile(fd.dbFiles["brokers"])
content, err := os.ReadFile(fd.dbFiles["brokers"])
if err != nil {
ERROR.Println(err)
}
Expand Down
6 changes: 1 addition & 5 deletions master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ type Master struct {
prevNumOfTask int
counter_lock sync.RWMutex

isDebugEnabled bool

//type of subscribed entities
subID2Type map[string]string
}
Expand All @@ -72,8 +70,6 @@ func (master *Master) Start(configuration *Config) {
master.discoveryURL = configuration.GetDiscoveryURL()
master.designerURL = configuration.GetDesignerURL()

master.isDebugEnabled = configuration.Logging.DebugEnabled

master.workers = make(map[string]*WorkerProfile)

master.operatorList = make(map[string]Operator)
Expand Down Expand Up @@ -526,7 +522,7 @@ func (master *Master) SelectWorker(locations []Point) string {
for _, worker := range master.workers {
// if this worker is already overloaded, check the next one
if worker.IsOverloaded() {
if master.isDebugEnabled {
if LoggerIsEnabled(DEBUG) {
DEBUG.Println("Worker", worker.WID, " has reached its capacity of ", worker.Capacity, " with ", worker.Workload, " tasks running")
}
continue
Expand Down
Loading

0 comments on commit be9b523

Please sign in to comment.