Skip to content

Commit

Permalink
Merge pull request #408 from rksharma95/feat-secure-grpc
Browse files Browse the repository at this point in the history
feat(gRPC): add mtls support
  • Loading branch information
daemon1024 authored Mar 15, 2024
2 parents 4da2792 + 32e59fe commit bb54856
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 337 deletions.
4 changes: 4 additions & 0 deletions cmd/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ func init() {
rootCmd.AddCommand(logCmd)

logCmd.Flags().StringVar(&logOptions.GRPC, "gRPC", "", "gRPC server information")
logCmd.Flags().BoolVar(&logOptions.Insecure, "insecure", true, "connect to kubearmor on an insecure connection")
logCmd.Flags().StringVar(&logOptions.TlsCertPath, "tlsCertPath", "/var/lib/kubearmor/tls", "path to the ca.crt, client.crt, and client.key if certs are provided locally")
logCmd.Flags().StringVar(&logOptions.TlsCertProvider, "tlsCertProvider", "self", "{self|external} self: dynamically crete client certificates, external: provide client certificate and key with --tlsCertPath")
logCmd.Flags().BoolVar(&logOptions.ReadCAFromSecret, "readCAFromSecret", true, "true if ca cert to be read from k8s secret on cluster running kubearmor")
logCmd.Flags().StringVar(&logOptions.MsgPath, "msgPath", "none", "Output location for messages, {path|stdout|none}")
logCmd.Flags().StringVar(&logOptions.LogPath, "logPath", "stdout", "Output location for alerts and logs, {path|stdout|none}")
logCmd.Flags().StringVar(&logOptions.LogFilter, "logFilter", "policy", "Filter for what kinds of alerts and logs to receive, {policy|system|all}")
Expand Down
15 changes: 2 additions & 13 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,9 @@ module github.com/kubearmor/kubearmor-client

go 1.22

toolchain go1.22.1

replace (
github.com/etcd-io/bbolt => go.etcd.io/bbolt v1.3.6
github.com/optiopay/kafka => github.com/cilium/kafka v0.0.0-20180809090225-01ce283b732b
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.26.4
k8s.io/component-base => k8s.io/component-base v0.26.4
)

require (
github.com/blang/semver v3.5.1+incompatible
github.com/cilium/cilium v1.15.2
github.com/cilium/cilium v1.14.5
github.com/clarketm/json v1.17.1
github.com/docker/docker v25.0.4+incompatible
github.com/fatih/color v1.16.0
Expand Down Expand Up @@ -203,7 +194,6 @@ require (
github.com/google/go-github/v55 v55.0.0 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/pprof v0.0.0-20231023181126-ff6d637d2a7b // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
Expand Down Expand Up @@ -358,7 +348,6 @@ require (
go.uber.org/dig v1.17.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
go4.org/netipx v0.0.0-20231129151722-fdeea329fbba // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/oauth2 v0.18.0 // indirect
Expand All @@ -376,7 +365,7 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiserver v0.29.0 // indirect
k8s.io/apiserver v0.29.2 // indirect
k8s.io/component-base v0.29.2 // indirect
k8s.io/klog/v2 v2.120.1 // indirect
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
Expand Down
321 changes: 31 additions & 290 deletions go.sum

Large diffs are not rendered by default.

22 changes: 22 additions & 0 deletions k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
package k8s

import (
"context"

"github.com/rs/zerolog/log"
apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
Expand All @@ -32,6 +35,10 @@ var (
KubeConfig string
// ContextName specifies the name of kubeconfig context
ContextName string
// kubearmor-ca secret label
KubeArmorCALabels = map[string]string{
"kubearmor-app": "kubearmor-ca",
}
)

// ConnectK8sClient Function
Expand Down Expand Up @@ -80,3 +87,18 @@ func ConnectK8sClient() (*Client, error) {
Config: config,
}, nil
}

func GetKubeArmorCaSecret(client kubernetes.Interface) (string, string) {
secret, err := client.CoreV1().Secrets("").List(context.Background(), v1.ListOptions{
LabelSelector: v1.FormatLabelSelector(&v1.LabelSelector{MatchLabels: KubeArmorCALabels}),
})
if err != nil {
log.Error().Msgf("error getting kubearmor ca secret: %v", err)
return "", ""
}
if len(secret.Items) < 1 {
log.Error().Msgf("no kubearmor ca secret found in the cluster: %v", err)
return "", ""
}
return secret.Items[0].Name, secret.Items[0].Namespace
}
62 changes: 41 additions & 21 deletions log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import (

type regexType *regexp.Regexp

const (
SelfCertProvider string = "self"
ExternalCertLoader string = "external"
)

// Regex Compiled Structs
var (
CNamespace regexType
Expand All @@ -34,21 +39,25 @@ var (

// Options Structure
type Options struct {
GRPC string
MsgPath string
LogPath string
LogFilter string
JSON bool
Namespace string
LogType string
Operation string
ContainerName string
PodName string
Source string
Resource string
Limit uint32
Selector []string
EventChan chan EventInfo // channel to send events on
GRPC string
Insecure bool
TlsCertPath string

Check warning on line 44 in log/log.go

View workflow job for this annotation

GitHub Actions / go-lint

struct field TlsCertPath should be TLSCertPath
TlsCertProvider string

Check warning on line 45 in log/log.go

View workflow job for this annotation

GitHub Actions / go-lint

struct field TlsCertProvider should be TLSCertProvider
ReadCAFromSecret bool
MsgPath string
LogPath string
LogFilter string
JSON bool
Namespace string
LogType string
Operation string
ContainerName string
PodName string
Source string
Resource string
Limit uint32
Selector []string
EventChan chan EventInfo // channel to send events on
}

// StopChan Channel
Expand Down Expand Up @@ -144,9 +153,20 @@ func StartObserver(c *k8s.Client, o Options) error {
}

// create client
logClient := NewClient(gRPC, o.MsgPath, o.LogPath, o.LogFilter, o.Limit)
if logClient == nil {
return errors.New("unable to create log client")
logClient, err := NewClient(gRPC, o, c.K8sClientset)
if err != nil {
if o.Insecure && !isDialingError(err) {
// retry connecting to the server on secured channel
fmt.Fprintf(os.Stderr, "Failed to connect on insecure channel\n(%s)\n", err)
fmt.Fprint(os.Stderr, "Trying to reconnect using secured channel...\n")
o.Insecure = false
logClient, err = NewClient(gRPC, o, c.K8sClientset)
if err != nil {
return fmt.Errorf("unable to create log client, error=%s", err)
}
} else {
return fmt.Errorf("unable to create log client, error=%s", err)
}
}

fmt.Fprintf(os.Stderr, "Created a gRPC client (%s)\n", gRPC)
Expand All @@ -160,10 +180,10 @@ func StartObserver(c *k8s.Client, o Options) error {
if o.MsgPath != "none" {
// watch messages
go logClient.WatchMessages(o.MsgPath, o.JSON)
fmt.Fprintln(os.Stderr, "Started to watch messages")
fmt.Fprintln(os.Stdout, "Started to watch messages")
}

err := regexCompile(o)
err = regexCompile(o)
if err != nil {
fmt.Print(err)
return err
Expand All @@ -174,7 +194,7 @@ func StartObserver(c *k8s.Client, o Options) error {
if o.LogFilter == "all" || o.LogFilter == "policy" {
// watch alerts
go logClient.WatchAlerts(o)
fmt.Fprintln(os.Stderr, "Started to watch alerts")
fmt.Fprintln(os.Stdout, "Started to watch alerts")
}

if o.LogFilter == "all" || o.LogFilter == "system" {
Expand Down
44 changes: 31 additions & 13 deletions log/logClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import (
pb "github.com/kubearmor/KubeArmor/protobuf"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"k8s.io/client-go/kubernetes"
)

// EventInfo Event data signalled on EventChan
Expand Down Expand Up @@ -100,18 +103,29 @@ type Feeder struct {
}

// NewClient Function
func NewClient(server, msgPath, logPath, logFilter string, limit uint32) *Feeder {
func NewClient(server string, o Options, c kubernetes.Interface) (*Feeder, error) {
fd := &Feeder{}

fd.Running = true

fd.server = server

fd.limit = limit
fd.limit = o.Limit

conn, err := grpc.Dial(fd.server, grpc.WithInsecure())
var creds credentials.TransportCredentials
if !o.Insecure {
tlsCreds, err := loadTLSCredentials(c, o)
if err != nil {
return nil, err
}
creds = tlsCreds
} else {
creds = insecure.NewCredentials()
}
conn, err := grpc.Dial(fd.server, grpc.WithTransportCredentials(creds))
if err != nil {
return nil
fmt.Fprintf(os.Stderr, "Error dialing the server: %s", err)
return nil, err
}
fd.conn = conn

Expand All @@ -120,39 +134,39 @@ func NewClient(server, msgPath, logPath, logFilter string, limit uint32) *Feeder
msgIn := pb.RequestMessage{}
msgIn.Filter = ""

if msgPath != "none" {
if o.MsgPath != "none" {
msgStream, err := fd.client.WatchMessages(context.Background(), &msgIn)
if err != nil {
return nil
return nil, err
}
fd.msgStream = msgStream
}

alertIn := pb.RequestMessage{}
alertIn.Filter = logFilter
alertIn.Filter = o.LogFilter

if logPath != "none" && (alertIn.Filter == "all" || alertIn.Filter == "policy") {
if o.LogPath != "none" && (alertIn.Filter == "all" || alertIn.Filter == "policy") {
alertStream, err := fd.client.WatchAlerts(context.Background(), &alertIn)
if err != nil {
return nil
return nil, err
}
fd.alertStream = alertStream
}

logIn := pb.RequestMessage{}
logIn.Filter = logFilter
logIn.Filter = o.LogFilter

if logPath != "none" && (logIn.Filter == "all" || logIn.Filter == "system") {
if o.LogPath != "none" && (logIn.Filter == "all" || logIn.Filter == "system") {
logStream, err := fd.client.WatchLogs(context.Background(), &logIn)
if err != nil {
return nil
return nil, err
}
fd.logStream = logStream
}

fd.WgClient = sync.WaitGroup{}

return fd
return fd, nil
}

// DoHealthCheck Function
Expand Down Expand Up @@ -477,3 +491,7 @@ func selectLabels(o Options, labels []string) error {
}
return errors.New("Not found any flag")
}

func isDialingError(err error) bool {
return strings.Contains(err.Error(), "Error while dialing")
}
47 changes: 47 additions & 0 deletions log/tls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright 2021 Authors of KubeArmor

// Package log connects and observes telemetry from KubeArmor
package log

import (
"crypto/tls"
"fmt"
"time"

"github.com/kubearmor/KubeArmor/KubeArmor/cert"
"github.com/kubearmor/kubearmor-client/k8s"
"google.golang.org/grpc/credentials"
"k8s.io/client-go/kubernetes"
)

func loadTLSCredentials(client kubernetes.Interface, o Options) (credentials.TransportCredentials, error) {
var secret, namespace string
var clientCertCfg cert.CertConfig
if o.ReadCAFromSecret {
secret, namespace = k8s.GetKubeArmorCaSecret(client)
if secret == "" || namespace == "" {
return credentials.NewTLS(&tls.Config{MinVersion: tls.VersionTLS12}), fmt.Errorf("error getting kubearmor ca secret")
}
}
if o.TlsCertProvider == SelfCertProvider {
// create certificate configurations
clientCertCfg = cert.DefaultKubeArmorClientConfig
clientCertCfg.NotAfter = time.Now().AddDate(1, 0, 0) //valid for 1 year
}
tlsConfig := cert.TlsConfig{
CertCfg: clientCertCfg,
ReadCACertFromSecret: o.ReadCAFromSecret,
Secret: secret,
Namespace: namespace,
K8sClient: client.(*kubernetes.Clientset),
CertPath: cert.GetClientCertPath(o.TlsCertPath),
CertProvider: o.TlsCertProvider,
CACertPath: cert.GetCACertPath(o.TlsCertPath),
}
creds, err := cert.NewTlsCredentialManager(&tlsConfig).CreateTlsClientCredentials()
if err != nil {
fmt.Println(err.Error())
}
return creds, err
}

0 comments on commit bb54856

Please sign in to comment.