Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instantiate Device Plugin #2979

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 129 additions & 3 deletions cns/service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net/http"
"os"
"os/signal"
"reflect"
"runtime"
"strconv"
"strings"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/Azure/azure-container-networking/cns/cnireconciler"
"github.com/Azure/azure-container-networking/cns/common"
"github.com/Azure/azure-container-networking/cns/configuration"
"github.com/Azure/azure-container-networking/cns/deviceplugin"
"github.com/Azure/azure-container-networking/cns/fsnotify"
"github.com/Azure/azure-container-networking/cns/grpc"
"github.com/Azure/azure-container-networking/cns/healthserver"
Expand Down Expand Up @@ -102,9 +104,11 @@ const (
// envVarEnableCNIConflistGeneration enables cni conflist generation if set (value doesn't matter)
envVarEnableCNIConflistGeneration = "CNS_ENABLE_CNI_CONFLIST_GENERATION"

cnsReqTimeout = 15 * time.Second
defaultLocalServerIP = "localhost"
defaultLocalServerPort = "10090"
cnsReqTimeout = 15 * time.Second
defaultLocalServerIP = "localhost"
defaultLocalServerPort = "10090"
defaultDevicePluginRetryInterval = 2 * time.Second
defaultNodeInfoCRDPollInterval = 5 * time.Second
)

type cniConflistScenario string
Expand Down Expand Up @@ -880,6 +884,43 @@ func main() {
}
}

if cnsconfig.EnableSwiftV2 && cnsconfig.EnableK8sDevicePlugin {
initialVnetNICCount := 0
initialIBNICCount := 0
aggarwal0009 marked this conversation as resolved.
Show resolved Hide resolved
// Create device plugin manager instance
pluginManager := deviceplugin.NewPluginManager(z)
pluginManager.AddPlugin(mtv1alpha1.DeviceTypeVnetNIC, initialVnetNICCount)
pluginManager.AddPlugin(mtv1alpha1.DeviceTypeInfiniBandNIC, initialIBNICCount)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Start device plugin manager in a separate goroutine
go func() {
for {
timraymond marked this conversation as resolved.
Show resolved Hide resolved
select {
case <-ctx.Done():
z.Info("Context canceled, stopping plugin manager")
return
default:
if pluginErr := pluginManager.Run(ctx); pluginErr != nil {
z.Error("plugin manager exited with error", zap.Error(pluginErr))
time.Sleep(defaultDevicePluginRetryInterval)
} else {
return
}
timraymond marked this conversation as resolved.
Show resolved Hide resolved
}
}
}()

// go routine to poll node info crd and update device counts
go func() {
if pollErr := pollNodeInfoCRDAndUpdatePlugin(ctx, z, pluginManager); pollErr != nil {
z.Error("Error in pollNodeInfoCRDAndUpdatePlugin", zap.Error(pollErr))
}
}()
}

// Conditionally initialize and start the gRPC server
if cnsconfig.GRPCSettings.Enable {
// Define gRPC server settings
Expand Down Expand Up @@ -1037,6 +1078,91 @@ func main() {
logger.Close()
}

// Poll CRD until it's set and update PluginManager
func pollNodeInfoCRDAndUpdatePlugin(ctx context.Context, zlog *zap.Logger, pluginManager *deviceplugin.PluginManager) error {
kubeConfig, err := ctrl.GetConfig()
if err != nil {
logger.Errorf("Failed to get kubeconfig for request controller: %v", err)
return errors.Wrap(err, "failed to get kubeconfig")
}
kubeConfig.UserAgent = "azure-cns-" + version

clientset, err := kubernetes.NewForConfig(kubeConfig)
if err != nil {
return errors.Wrap(err, "failed to build clientset")
}

nodeName, err := configuration.NodeName()
if err != nil {
return errors.Wrap(err, "failed to get NodeName")
}

node, err := clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
return errors.Wrapf(err, "failed to get node %s", nodeName)
}

// check the Node labels for Swift V2
if _, ok := node.Labels[configuration.LabelNodeSwiftV2]; !ok {
zlog.Info("Node is not labeled for Swift V2, skipping polling nodeinfo crd")
return nil
}

directcli, err := client.New(kubeConfig, client.Options{Scheme: multitenancy.Scheme})
if err != nil {
return errors.Wrap(err, "failed to create ctrl client")
}

nodeInfoCli := multitenancy.NodeInfoClient{
Cli: directcli,
}

for {
select {
case <-ctx.Done():
zlog.Info("Polling context canceled, exiting")
return nil
default:
// Fetch the CRD status
nodeInfo, err := nodeInfoCli.Get(ctx, node.Name)
if err != nil {
zlog.Error("Error fetching nodeinfo CRD", zap.Error(err))
return errors.Wrap(err, "failed to get nodeinfo crd")
}

// Check if the status is set
if !reflect.DeepEqual(nodeInfo.Status, mtv1alpha1.NodeInfoStatus{}) && len(nodeInfo.Status.DeviceInfos) > 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

github.com/google/go-cmp/cmp.Equal is better than reflect.DeepEqual for a lot of reasons. It's a drop-in replacement, and it does the right thing more consistently.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made this recommendation prior to learning that cmp was intended for tests only. Given that the intention here is to test whether this type is a zero value, I believe it should be sufficient to test whether nodeInfo.Status.DeviceInfos != nil && len(nodeInfo.Status.DeviceInfos) > 0. Second opinion might be nice though. @rbtr ?

// Create a map to count devices by type
deviceCounts := map[mtv1alpha1.DeviceType]int{
mtv1alpha1.DeviceTypeVnetNIC: 0,
mtv1alpha1.DeviceTypeInfiniBandNIC: 0,
}

// Aggregate device counts from the CRD
for _, deviceInfo := range nodeInfo.Status.DeviceInfos {
switch deviceInfo.DeviceType {
case mtv1alpha1.DeviceTypeVnetNIC, mtv1alpha1.DeviceTypeInfiniBandNIC:
deviceCounts[deviceInfo.DeviceType]++
default:
zlog.Error("Unknown device type", zap.String("deviceType", string(deviceInfo.DeviceType)))
}
}

// Update the plugin manager with device counts
for deviceType, count := range deviceCounts {
pluginManager.TrackDevices(deviceType, count)
}

// Exit polling loop once the CRD status is successfully processed
return nil
}

// Wait before polling again
time.Sleep(defaultNodeInfoCRDPollInterval)
timraymond marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

func InitializeMultiTenantController(ctx context.Context, httpRestService cns.HTTPService, cnsconfig configuration.CNSConfig) error {
var multiTenantController multitenantcontroller.RequestController
kubeConfig, err := ctrl.GetConfig()
Expand Down
9 changes: 9 additions & 0 deletions crd/multitenancy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,12 @@ func (n *NodeInfoClient) CreateOrUpdate(ctx context.Context, nodeInfo *v1alpha1.
}
return nil
}

// Get looks up the NodeInfo object called name through the client's Cli and
// returns a pointer to it. When the lookup fails, the returned NodeInfo is nil
// and the underlying error is returned wrapped.
func (n *NodeInfoClient) Get(ctx context.Context, name string) (*v1alpha1.NodeInfo, error) {
	key := client.ObjectKey{Name: name}
	result := &v1alpha1.NodeInfo{}

	err := n.Cli.Get(ctx, key, result)
	if err != nil {
		return nil, errors.Wrap(err, "error getting nodeinfo crd")
	}

	return result, nil
}
Loading