Skip to content

Commit

Permalink
feat: Dynamic module discovery in kymastatsreceiver (#80)
Browse files Browse the repository at this point in the history
  • Loading branch information
skhalash authored Jul 16, 2024
1 parent 90e6910 commit 62836db
Show file tree
Hide file tree
Showing 13 changed files with 361 additions and 64 deletions.
4 changes: 4 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ linters-settings:
alias:
- pkg: "k8s.io/apimachinery/pkg/apis/meta/v1"
alias: metav1
- pkg: "k8s.io/client-go/discovery/fake"
alias: discoveryfake
- pkg: "k8s.io/client-go/dynamic/fake"
alias: dynamicfake
- pkg: "k8s.io/client-go/testing"
alias: clienttesting

Expand Down
19 changes: 19 additions & 0 deletions internal/k8sconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/http"
"os"

"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"

"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -146,3 +147,21 @@ func MakeDynamicClient(apiConf APIConfig) (dynamic.Interface, error) {

return client, nil
}

func MakeDiscoveryClient(apiConf APIConfig) (discovery.DiscoveryInterface, error) {
if err := apiConf.Validate(); err != nil {
return nil, err
}

authConf, err := CreateRestConfig(apiConf)
if err != nil {
return nil, err
}

client, err := discovery.NewDiscoveryClientForConfig(authConf)
if err != nil {
return nil, err
}

return client, nil
}
2 changes: 2 additions & 0 deletions receiver/kymastatsreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ The following settings are required:
- `auth_type` (default = `serviceAccount`): Specifies the authentication method for accessing the Kubernetes API server.
Options include `none` (no authentication), `serviceAccount` (uses the default service account token assigned to the Pod), or `kubeConfig` (uses credentials from `~/.kube/config`).

- `module_groups`: The list of API groups to be used for Kyma module resource discovery.

The following settings are optional:

- `collection_interval` (default = `60s`): The Kyma Stats Receiver monitors Kyma custom resources using the Kubernetes API. It emits the collected metrics only once per collection interval. The `collection_interval` setting determines how frequently these metrics are emitted.
Expand Down
31 changes: 27 additions & 4 deletions receiver/kymastatsreceiver/config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package kymastatsreceiver

import (
"errors"

"go.opentelemetry.io/collector/receiver/scraperhelper"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"

"github.com/kyma-project/opentelemetry-collector-components/internal/k8sconfig"
Expand All @@ -14,16 +17,36 @@ type Config struct {
scraperhelper.ControllerConfig `mapstructure:",squash"`
metadata.MetricsBuilderConfig `mapstructure:",squash"`

ModuleGroups []string `mapstructure:"module_groups"`

// Used for unit testing only
makeDynamicClient func() (dynamic.Interface, error)
makeDiscoveryClient func() (discovery.DiscoveryInterface, error)
makeDynamicClient func() (dynamic.Interface, error)
}

var errEmptyModuleGroups = errors.New("empty module groups")

func (cfg *Config) Validate() error {
err := cfg.ControllerConfig.Validate()
if err != nil {
if err := cfg.APIConfig.Validate(); err != nil {
return err
}
return cfg.APIConfig.Validate()

if err := cfg.ControllerConfig.Validate(); err != nil {
return err
}

if len(cfg.ModuleGroups) == 0 {
return errEmptyModuleGroups
}

return nil
}

func (cfg *Config) getDiscoveryClient() (discovery.DiscoveryInterface, error) {
if cfg.makeDynamicClient != nil {
return cfg.makeDiscoveryClient()
}
return k8sconfig.MakeDiscoveryClient(cfg.APIConfig)
}

func (cfg *Config) getK8sDynamicClient() (dynamic.Interface, error) {
Expand Down
20 changes: 14 additions & 6 deletions receiver/kymastatsreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,35 +30,38 @@ func TestLoadConfig(t *testing.T) {
expectErr bool
}{
{
id: component.NewIDWithName(metadata.Type, "default"),
id: component.NewIDWithName(metadata.Type, ""),
expected: &Config{
ControllerConfig: scraperhelper.ControllerConfig{CollectionInterval: duration, InitialDelay: delay},
APIConfig: k8sconfig.APIConfig{
AuthType: "serviceAccount",
},
ControllerConfig: scraperhelper.ControllerConfig{CollectionInterval: duration, InitialDelay: delay},
MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(),
ModuleGroups: []string{"operator.kyma-project.io"},
},
},

{
id: component.NewIDWithName(metadata.Type, "k8s"),
id: component.NewIDWithName(metadata.Type, "kubeconfig"),
expected: &Config{
ControllerConfig: scraperhelper.ControllerConfig{CollectionInterval: 30 * time.Second, InitialDelay: delay},
APIConfig: k8sconfig.APIConfig{
AuthType: "kubeConfig",
Context: "k8s-context",
},
ControllerConfig: scraperhelper.ControllerConfig{CollectionInterval: 30 * time.Second, InitialDelay: delay},
MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(),
ModuleGroups: []string{"operator.kyma-project.io"},
},
},
{
id: component.NewIDWithName(metadata.Type, "sa"),
expected: &Config{
ControllerConfig: scraperhelper.ControllerConfig{CollectionInterval: 10 * time.Second, InitialDelay: delay},
APIConfig: k8sconfig.APIConfig{
AuthType: "serviceAccount",
},
ControllerConfig: scraperhelper.ControllerConfig{CollectionInterval: 10 * time.Second, InitialDelay: delay},
MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(),
ModuleGroups: []string{"operator.kyma-project.io"},
},
},
{
Expand All @@ -72,13 +75,18 @@ func TestLoadConfig(t *testing.T) {
{
id: component.NewIDWithName(metadata.Type, "none"),
expected: &Config{
ControllerConfig: scraperhelper.ControllerConfig{CollectionInterval: duration, InitialDelay: delay},
APIConfig: k8sconfig.APIConfig{
AuthType: "none",
},
ControllerConfig: scraperhelper.ControllerConfig{CollectionInterval: duration, InitialDelay: delay},
MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(),
ModuleGroups: []string{"operator.kyma-project.io"},
},
},
{
id: component.NewIDWithName(metadata.Type, "nomodulegroups"),
expectErr: true,
},
}

for _, tt := range tests {
Expand Down
27 changes: 16 additions & 11 deletions receiver/kymastatsreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,14 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/scraperhelper"
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/kyma-project/opentelemetry-collector-components/internal/k8sconfig"
"github.com/kyma-project/opentelemetry-collector-components/receiver/kymastatsreceiver/internal/metadata"
"github.com/kyma-project/opentelemetry-collector-components/receiver/kymastatsreceiver/internal/modulediscovery"
)

var (
typeStr = component.MustNewType("kymastats")
defaultModuleGVRs = []schema.GroupVersionResource{
{
Group: "operator.kyma-project.io",
Version: "v1alpha1",
Resource: "telemetries",
},
}
typeStr = component.MustNewType("kymastats")
)

func createDefaultConfig() component.Config {
Expand All @@ -48,11 +41,23 @@ func createMetricsReceiver(_ context.Context, params receiver.Settings, baseCfg
if !ok {
return nil, errors.New("invalid configuration")
}
client, err := config.getK8sDynamicClient()

discovery, err := config.getDiscoveryClient()
if err != nil {
return nil, err
}

dynamic, err := config.getK8sDynamicClient()
if err != nil {
return nil, err
}
scrp, err := newKymaScraper(client, params, defaultModuleGVRs, config.MetricsBuilderConfig)

scrp, err := newKymaScraper(
modulediscovery.New(discovery, params.Logger, config.ModuleGroups),
dynamic,
params,
config.MetricsBuilderConfig,
)
if err != nil {
return nil, err
}
Expand Down
22 changes: 12 additions & 10 deletions receiver/kymastatsreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
"go.opentelemetry.io/collector/receiver/receivertest"
"go.opentelemetry.io/collector/receiver/scraperhelper"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/discovery"
discoveryfake "k8s.io/client-go/discovery/fake"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/fake"
dynamicfake "k8s.io/client-go/dynamic/fake"

"github.com/kyma-project/opentelemetry-collector-components/internal/k8sconfig"
"github.com/kyma-project/opentelemetry-collector-components/receiver/kymastatsreceiver/internal/metadata"
Expand All @@ -34,16 +36,20 @@ func TestCreateMetricsReceiver(t *testing.T) {
{
name: "valid",
cfg: &Config{
APIConfig: k8sconfig.APIConfig{
AuthType: "kubeConfig",
},
ControllerConfig: scraperhelper.ControllerConfig{
CollectionInterval: 10 * time.Second,
InitialDelay: time.Second,
},

APIConfig: k8sconfig.APIConfig{
AuthType: "kubeConfig",
},
MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(),
makeDynamicClient: func() (dynamic.Interface, error) { return fake.NewSimpleDynamicClient(runtime.NewScheme()), nil },
makeDiscoveryClient: func() (discovery.DiscoveryInterface, error) {
return &discoveryfake.FakeDiscovery{}, nil
},
makeDynamicClient: func() (dynamic.Interface, error) {
return dynamicfake.NewSimpleDynamicClient(runtime.NewScheme()), nil
},
},
},
{
Expand Down Expand Up @@ -80,7 +86,6 @@ func TestCreateTraceReceiver(t *testing.T) {
context.Background(),
receivertest.NewNopSettings(),
&Config{

APIConfig: k8sconfig.APIConfig{
AuthType: "kubeConfig",
},
Expand All @@ -97,7 +102,6 @@ func TestCreateLogsReceiver(t *testing.T) {
context.Background(),
receivertest.NewNopSettings(),
&Config{

APIConfig: k8sconfig.APIConfig{
AuthType: "kubeConfig",
},
Expand All @@ -111,7 +115,6 @@ func TestCreateLogsReceiver(t *testing.T) {
func TestFactoryBadAuthType(t *testing.T) {
factory := NewFactory()
cfg := &Config{

APIConfig: k8sconfig.APIConfig{
AuthType: "none",
},
Expand All @@ -130,7 +133,6 @@ func TestFactoryNoneAuthType(t *testing.T) {
t.Setenv("KUBERNETES_SERVICE_PORT", "443")
factory := NewFactory()
cfg := &Config{

APIConfig: k8sconfig.APIConfig{
AuthType: "none",
},
Expand Down
66 changes: 66 additions & 0 deletions receiver/kymastatsreceiver/internal/modulediscovery/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package modulediscovery

import (
"fmt"
"slices"
"strings"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
)

type Client struct {
discovery discovery.DiscoveryInterface
logger *zap.Logger
moduleGroups []string
}

func New(discovery discovery.DiscoveryInterface, logger *zap.Logger, moduleGroups []string) *Client {
return &Client{
discovery: discovery,
logger: logger,
moduleGroups: moduleGroups,
}
}

func (c *Client) Discover() ([]schema.GroupVersionResource, error) {
// ServerPreferredResources returns API resources/groups of the preferred (usually, stored) API version.
// It guarantees that only version per resource/group is returned.
resourceLists, err := c.discovery.ServerPreferredResources()
if err != nil {
return nil, fmt.Errorf("failed to discover preferred resources: %w", err)
}

var gvrs []schema.GroupVersionResource
for _, resourceList := range resourceLists {
groupVersion, err := schema.ParseGroupVersion(resourceList.GroupVersion)
if err != nil {
return nil, fmt.Errorf("failed to parse groupVersion %s: %w", resourceList.GroupVersion, err)
}

if !slices.Contains(c.moduleGroups, groupVersion.Group) {
continue
}

c.logger.Debug("Discovered module group", zap.Any("groupVersion", groupVersion))

for _, resource := range resourceList.APIResources {
gvr := groupVersion.WithResource(resource.Name)
if isSubresource(resource.Name) {
c.logger.Debug("Skipping subresource", zap.Any("groupVersionResource", gvr))
continue
}

gvrs = append(gvrs, gvr)

c.logger.Debug("Discovered module resource", zap.Any("groupVersionResource", gvr))
}
}

return gvrs, nil
}

func isSubresource(resourceName string) bool {
return strings.Contains(resourceName, "/")
}
Loading

0 comments on commit 62836db

Please sign in to comment.