Skip to content

Commit

Permalink
fix(evpn-bridge): added resourceVer check and unit test fix
Browse files Browse the repository at this point in the history
Signed-off-by: Saikumar, Banoth <[email protected]>
  • Loading branch information
Inbanoth committed Apr 18, 2024
1 parent 1ee4c48 commit dd31062
Show file tree
Hide file tree
Showing 14 changed files with 128 additions and 34 deletions.
15 changes: 15 additions & 0 deletions pkg/LinuxCIModule/lci.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,21 @@ func handlebp(objectData *eventbus.ObjectData) {
log.Printf("LCI : GetBP error: %s\n", err)
return
}
if objectData.ResourceVersion != BP.ResourceVersion {
log.Printf("LVM: Mismatch in resoruce version %+v\n and bp resource version %+v\n", objectData.ResourceVersion, BP.ResourceVersion)
comp.Name = lciComp
comp.CompStatus = common.ComponentStatusError
if comp.Timer == 0 {
comp.Timer = 2 * time.Second
} else {
comp.Timer *= 2
}
err := infradb.UpdateBPStatus(objectData.Name, objectData.ResourceVersion, objectData.NotificationID, nil, comp)
if err != nil {
log.Printf("error in updating bp status: %s\n", err)
}
return
}
if len(BP.Status.Components) != 0 {
for i := 0; i < len(BP.Status.Components); i++ {
if BP.Status.Components[i].Name == "lci" {
Expand Down
27 changes: 18 additions & 9 deletions pkg/LinuxGeneralModule/lgm.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,25 @@ func (h *ModulelgmHandler) HandleEvent(eventType string, objectData *eventbus.Ob
func handleLB(objectData *eventbus.ObjectData) {
var comp common.Component
lb, err := infradb.GetLB(objectData.Name)
if err == nil {
log.Printf("LGM : GetLB Name: %s\n", lb.Name)
} else {
if err != nil {
log.Printf("LGM: GetLB error: %s %s\n", err, objectData.Name)
return
}
if objectData.ResourceVersion != lb.ResourceVersion {
log.Printf("LGM: Mismatch in resoruce version %+v\n and lb resource version %+v\n", objectData.ResourceVersion, lb.ResourceVersion)
comp.Name = lgmComp
comp.CompStatus = common.ComponentStatusError
if comp.Timer == 0 {
comp.Timer = 2 * time.Second
} else {
comp.Timer *= 2
}
err := infradb.UpdateLBStatus(objectData.Name, objectData.ResourceVersion, objectData.NotificationID, nil, comp)
if err != nil {
log.Printf("error in updating lb status: %s\n", err)
}
return
}
if len(lb.Status.Components) != 0 {
for i := 0; i < len(lb.Status.Components); i++ {
if lb.Status.Components[i].Name == lgmComp {
Expand Down Expand Up @@ -148,9 +161,7 @@ func handleLB(objectData *eventbus.ObjectData) {
func handlesvi(objectData *eventbus.ObjectData) {
var comp common.Component
svi, err := infradb.GetSvi(objectData.Name)
if err == nil {
log.Printf("LGM : GetSvi Name: %s\n", svi.Name)
} else {
if err != nil {
log.Printf("LGM: GetSvi error: %s %s\n", err, objectData.Name)
return
}
Expand Down Expand Up @@ -221,9 +232,7 @@ func handlesvi(objectData *eventbus.ObjectData) {
func handlevrf(objectData *eventbus.ObjectData) {
var comp common.Component
vrf, err := infradb.GetVrf(objectData.Name)
if err == nil {
log.Printf("LGM : GetVRF Name: %s\n", vrf.Name)
} else {
if err != nil {
log.Printf("LGM: GetVRF error: %s %s\n", err, objectData.Name)
return
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/LinuxVendorModule/ipu/ipu.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,21 @@ func handlebp(objectData *eventbus.ObjectData) {
log.Printf("LVM : GetBP error: %s\n", err)
return
}
if objectData.ResourceVersion != bp.ResourceVersion {
log.Printf("LVM: Mismatch in resoruce version %+v\n and bp resource version %+v\n", objectData.ResourceVersion, bp.ResourceVersion)
comp.Name = lvmComp
comp.CompStatus = common.ComponentStatusError
if comp.Timer == 0 {
comp.Timer = 2 * time.Second
} else {
comp.Timer *= 2
}
err := infradb.UpdateBPStatus(objectData.Name, objectData.ResourceVersion, objectData.NotificationID, nil, comp)
if err != nil {
log.Printf("error in updating bp status: %s\n", err)
}
return
}
if len(bp.Status.Components) != 0 {
for i := 0; i < len(bp.Status.Components); i++ {
if bp.Status.Components[i].Name == lvmComp {
Expand Down
6 changes: 6 additions & 0 deletions pkg/bridge/bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,17 @@ var (
Len: 24,
},
},
Status: &pb.LogicalBridgeStatus{
OperStatus: pb.LBOperStatus_LB_OPER_STATUS_DOWN,
Components: []*pb.Component{{Name: "dummy", Status: pb.CompStatus_COMP_STATUS_PENDING}},
},
}
testLogicalBridgeWithStatus = pb.LogicalBridge{
Name: testLogicalBridgeName,
Spec: testLogicalBridge.Spec,
Status: &pb.LogicalBridgeStatus{
OperStatus: pb.LBOperStatus_LB_OPER_STATUS_DOWN,
Components: []*pb.Component{{Name: "dummy", Status: pb.CompStatus_COMP_STATUS_PENDING}},
},
}
)
Expand Down Expand Up @@ -118,6 +123,7 @@ func Test_CreateLogicalBridge(t *testing.T) {
},
Status: &pb.LogicalBridgeStatus{
OperStatus: pb.LBOperStatus_LB_OPER_STATUS_DOWN,
Components: []*pb.Component{{Name: "dummy", Status: pb.CompStatus_COMP_STATUS_PENDING}},
},
},
errCode: codes.OK,
Expand Down
3 changes: 3 additions & 0 deletions pkg/bridge/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
pb "github.com/opiproject/opi-api/network/evpn-gw/v1alpha1/gen/go"

"github.com/opiproject/opi-evpn-bridge/pkg/infradb"
"github.com/opiproject/opi-evpn-bridge/pkg/infradb/subscriberframework/eventbus"
"github.com/opiproject/opi-evpn-bridge/pkg/utils/mocks"
)

Expand Down Expand Up @@ -144,6 +145,8 @@ func newTestEnv(ctx context.Context, t *testing.T) *testEnv {
env.mockNetlink = mocks.NewNetlink(t)
env.mockFrr = mocks.NewFrr(t)
env.opi = NewServer()
eb := eventbus.EBus
eb.StartSubscriber("dummy", "logical-bridge", 1, nil)
_ = infradb.NewInfraDB("", "gomap")
conn, err := grpc.DialContext(ctx,
"",
Expand Down
4 changes: 0 additions & 4 deletions pkg/frr/frr.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ func handlesvi(objectData *eventbus.ObjectData) {
return
}

log.Printf("FRR :GetSvi Name: %s\n", svi.Name)

if objectData.ResourceVersion != svi.ResourceVersion {
log.Printf("FRR: Mismatch in resoruce version %+v\n and svi resource version %+v\n", objectData.ResourceVersion, svi.ResourceVersion)
comp.Name = frrComp
Expand Down Expand Up @@ -132,8 +130,6 @@ func handlevrf(objectData *eventbus.ObjectData) {
return
}

log.Printf("FRR :GetVRF Name: %s\n", vrf.Name)

if len(vrf.Status.Components) != 0 {
for i := 0; i < len(vrf.Status.Components); i++ {
if vrf.Status.Components[i].Name == frrComp {
Expand Down
12 changes: 12 additions & 0 deletions pkg/infradb/infradb.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func CreateLB(lb *LogicalBridge) error {
subscribers := eventbus.EBus.GetSubscribers("logical-bridge")
if subscribers == nil {
log.Println("CreateLB(): No subscribers for Logical Bridge objects")
return errors.New("no subscribers found for logical bridge")
}

log.Printf("CreateLB(): Create Logical Bridge: %+v\n", lb)
Expand Down Expand Up @@ -144,6 +145,7 @@ func DeleteLB(name string) error {
subscribers := eventbus.EBus.GetSubscribers("logical-bridge")
if subscribers == nil {
log.Println("DeleteLB(): No subscribers for Logical Bridge objects")
return errors.New("no subscribers found for logical bridge")
}

lb := LogicalBridge{}
Expand Down Expand Up @@ -242,6 +244,7 @@ func UpdateLB(lb *LogicalBridge) error {
subscribers := eventbus.EBus.GetSubscribers("logical-bridge")
if subscribers == nil {
log.Println("UpdateLB(): No subscribers for Logical Bridge objects")
return errors.New("no subscribers found for logical bridge")
}

err := infradb.client.Set(lb.Name, lb)
Expand Down Expand Up @@ -386,6 +389,7 @@ func CreateBP(bp *BridgePort) error {
subscribers := eventbus.EBus.GetSubscribers("bridge-port")
if subscribers == nil {
log.Println("CreateBP(): No subscribers for Bridge Port objects")
return errors.New("no subscribers found for bridge port")
}

// Dimitris: Do I need to add here a check for MAC uniquness in of BP ?
Expand Down Expand Up @@ -481,6 +485,7 @@ func DeleteBP(name string) error {
subscribers := eventbus.EBus.GetSubscribers("bridge-port")
if subscribers == nil {
log.Println("DeleteBP(): No subscribers for Bridge Port objects")
return errors.New("no subscribers found for bridge port")
}

bp := BridgePort{}
Expand Down Expand Up @@ -571,6 +576,7 @@ func UpdateBP(bp *BridgePort) error {
subscribers := eventbus.EBus.GetSubscribers("bridge-port")
if subscribers == nil {
log.Println("UpdateBP(): No subscribers for Bridge Port objects")
return errors.New("no subscribers found for bridge port")
}

err := infradb.client.Set(bp.Name, bp)
Expand Down Expand Up @@ -726,6 +732,7 @@ func CreateVrf(vrf *Vrf) error {
subscribers := eventbus.EBus.GetSubscribers("vrf")
if subscribers == nil {
log.Println("CreateVrf(): No subscribers for Vrf objects")
return errors.New("no subscribers found for vrf")
}

log.Printf("CreateVrf(): Create Vrf: %+v\n", vrf)
Expand Down Expand Up @@ -798,6 +805,7 @@ func DeleteVrf(name string) error {
subscribers := eventbus.EBus.GetSubscribers("vrf")
if subscribers == nil {
log.Println("DeleteVrf(): No subscribers for Vrf objects")
return errors.New("no subscribers found for vrf")
}

vrf := Vrf{}
Expand Down Expand Up @@ -890,6 +898,7 @@ func UpdateVrf(vrf *Vrf) error {
subscribers := eventbus.EBus.GetSubscribers("vrf")
if subscribers == nil {
log.Println("CreateVrf(): No subscribers for Vrf objects")
return errors.New("no subscribers found for vrf")
}

err := infradb.client.Set(vrf.Name, vrf)
Expand Down Expand Up @@ -1035,6 +1044,7 @@ func CreateSvi(svi *Svi) error {
subscribers := eventbus.EBus.GetSubscribers("svi")
if subscribers == nil {
log.Println("CreateSvi(): No subscribers for SVI objects")
return errors.New("no subscribers found for svi")
}

log.Printf("CreateSvi(): Create SVI: %+v\n", svi)
Expand Down Expand Up @@ -1125,6 +1135,7 @@ func DeleteSvi(name string) error {
subscribers := eventbus.EBus.GetSubscribers("svi")
if subscribers == nil {
log.Println("DeleteSvi(): No subscribers for SVI objects")
return errors.New("no subscribers found for svi")
}

svi := Svi{}
Expand Down Expand Up @@ -1212,6 +1223,7 @@ func UpdateSvi(svi *Svi) error {
subscribers := eventbus.EBus.GetSubscribers("svi")
if subscribers == nil {
log.Println("UpdateSvi(): No subscribers for SVI objects")
return errors.New("no subscribers found for svi")
}

err := infradb.client.Set(svi.Name, svi)
Expand Down
56 changes: 35 additions & 21 deletions pkg/infradb/subscriberframework/eventbus/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,31 +45,33 @@ type ObjectData struct {

// StartSubscriber will be called by the modules to initialize and start listening for events
func (e *EventBus) StartSubscriber(moduleName, eventType string, priority int, eventHandler EventHandler) {
subscriber := e.Subscribe(moduleName, eventType, priority, eventHandler)

go func() {
for {
select {
case event := <-subscriber.Ch:
log.Printf("\nSubscriber %s for %s received \n", moduleName, eventType)

handlerKey := moduleName + "." + eventType
if handler, ok := e.eventHandlers[handlerKey]; ok {
if objectData, ok := event.(*ObjectData); ok {
handler.HandleEvent(eventType, objectData)
if !e.subscriberExist(eventType, moduleName) {
subscriber := e.Subscribe(moduleName, eventType, priority, eventHandler)

go func() {
for {
select {
case event := <-subscriber.Ch:
log.Printf("\nSubscriber %s for %s received \n", moduleName, eventType)

handlerKey := moduleName + "." + eventType
if handler, ok := e.eventHandlers[handlerKey]; ok {
if objectData, ok := event.(*ObjectData); ok {
handler.HandleEvent(eventType, objectData)
} else {
subscriber.Ch <- "error: unexpected event type"
}
// handler.HandleEvent(eventType, event)
} else {
subscriber.Ch <- "error: unexpected event type"
subscriber.Ch <- "error: no event handler found"
}
// handler.HandleEvent(eventType, event)
} else {
subscriber.Ch <- "error: no event handler found"
case <-subscriber.Quit:
close(subscriber.Ch)
return
}
case <-subscriber.Quit:
close(subscriber.Ch)
return
}
}
}()
}()
}
}

// NewEventBus initializes ann EventBus object
Expand Down Expand Up @@ -113,6 +115,18 @@ func (e *EventBus) GetSubscribers(eventType string) []*Subscriber {
return e.subscribers[eventType]
}

func (e *EventBus) subscriberExist(eventType string, moduleName string) bool {
subList := e.GetSubscribers(eventType)
if len(subList) != 0 {
for _, s := range subList {
if s.Name == moduleName {
return true
}
}
}
return false
}

// Publish api notifies the subscribers with certain eventType
func (e *EventBus) Publish(objectData *ObjectData, subscriber *Subscriber) {
e.publishL.RLock()
Expand Down
4 changes: 4 additions & 0 deletions pkg/port/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/opiproject/opi-evpn-bridge/pkg/bridge"
"github.com/opiproject/opi-evpn-bridge/pkg/infradb"
"github.com/opiproject/opi-evpn-bridge/pkg/infradb/subscriberframework/eventbus"
"github.com/opiproject/opi-evpn-bridge/pkg/utils/mocks"
)

Expand Down Expand Up @@ -151,6 +152,9 @@ func newTestEnv(ctx context.Context, t *testing.T) *testEnv {
env.mockFrr = mocks.NewFrr(t)
env.opi = NewServer()
env.lbServer = bridge.NewServer()
eb := eventbus.EBus
eb.StartSubscriber("dummy", "logical-bridge", 1, nil)
eb.StartSubscriber("dummy", "bridge-port", 1, nil)
_ = infradb.NewInfraDB("", "gomap")
conn, err := grpc.DialContext(ctx,
"",
Expand Down
5 changes: 5 additions & 0 deletions pkg/port/port_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,17 @@ var (
Ptype: pb.BridgePortType_BRIDGE_PORT_TYPE_TRUNK,
LogicalBridges: []string{testLogicalBridgeName},
},
Status: &pb.BridgePortStatus{
OperStatus: pb.BPOperStatus_BP_OPER_STATUS_DOWN,
Components: []*pb.Component{{Name: "dummy", Status: pb.CompStatus_COMP_STATUS_PENDING}},
},
}
testBridgePortWithStatus = pb.BridgePort{
Name: testBridgePortName,
Spec: testBridgePort.Spec,
Status: &pb.BridgePortStatus{
OperStatus: pb.BPOperStatus_BP_OPER_STATUS_DOWN,
Components: []*pb.Component{{Name: "dummy", Status: pb.CompStatus_COMP_STATUS_PENDING}},
},
}
)
Expand Down
5 changes: 5 additions & 0 deletions pkg/svi/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/opiproject/opi-evpn-bridge/pkg/bridge"
"github.com/opiproject/opi-evpn-bridge/pkg/infradb"
"github.com/opiproject/opi-evpn-bridge/pkg/infradb/subscriberframework/eventbus"
"github.com/opiproject/opi-evpn-bridge/pkg/utils/mocks"
"github.com/opiproject/opi-evpn-bridge/pkg/vrf"
)
Expand Down Expand Up @@ -179,6 +180,10 @@ func newTestEnv(ctx context.Context, t *testing.T) *testEnv {
env.opi = NewServer()
env.lbServer = bridge.NewServer()
env.vrfServer = vrf.NewServer()
eb := eventbus.EBus
eb.StartSubscriber("dummy", "logical-bridge", 1, nil)
eb.StartSubscriber("dummy", "vrf", 1, nil)
eb.StartSubscriber("dummy", "svi", 1, nil)
_ = infradb.NewInfraDB("", "gomap")
conn, err := grpc.DialContext(ctx,
"",
Expand Down
1 change: 1 addition & 0 deletions pkg/svi/svi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var (
Spec: testSvi.Spec,
Status: &pb.SviStatus{
OperStatus: pb.SVIOperStatus_SVI_OPER_STATUS_DOWN,
Components: []*pb.Component{{Name: "dummy", Status: pb.CompStatus_COMP_STATUS_PENDING}},
},
}
)
Expand Down
Loading

0 comments on commit dd31062

Please sign in to comment.