Skip to content

Commit

Permalink
(feat): Add Fluent-bit to K8tls (#233)
Browse files Browse the repository at this point in the history
* feat: Add Fluent-bit to K8tls

Signed-off-by: Jones Jefferson <[email protected]>

* Fixed PR comments

Signed-off-by: Jones Jefferson <[email protected]>

---------

Signed-off-by: Jones Jefferson <[email protected]>
Co-authored-by: Jones Jefferson <[email protected]>
  • Loading branch information
JonesJefferson and Jones Jefferson authored Aug 14, 2024
1 parent 89835e7 commit 9476d9e
Show file tree
Hide file tree
Showing 11 changed files with 186 additions and 33 deletions.
14 changes: 14 additions & 0 deletions deployments/nimbus-k8tls/Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,20 @@ helm upgrade --install nimbus-k8tls . -n nimbus
| image.pullPolicy | string | Always | `nimbus-k8tls` adapter image pull policy |
| image.tag | string | latest | `nimbus-k8tls` adapter image tag |

Set the following values accordingly to send the k8tls report to elasticsearch (By default we send report to STDOUT)

##

| Key | Type | Default | Description |
|------------------------------|--------|--------------------|-----------------------------------------------------------------|
| output.elasticsearch.enabled | bool | false | Elasticsearch enabled or not |
| elasticsearch.host | string | localhost | Elasticsearch host |
| elasticsearch.user | string | elastic | Elastic user |
| elasticsearch.port | string | 9200 | Elasticsearch port |
| elasticsearch.index | string | findings | Elasticsearch index |
| output.elasticsearch.password| string | | The password in base64 encoded format |


## Verify if all the resources are up and running

Once done, the following resources will exist in your cluster:
Expand Down
39 changes: 39 additions & 0 deletions deployments/nimbus-k8tls/templates/configmap.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: fluent-bit-config
namespace: {{ include "nimbus-k8tls.fullname" . }}-env
data:
fluent-bit.conf: |
[SERVICE]
Flush 1
Log_Level info
Parsers_File parsers.conf
[INPUT]
Name tail
Path /tmp/compact_report.json
Parser json
Tag json.data
DB /tmp/compact_report.db
Read_from_Head true
Exit_On_Eof true
{{- if .Values.output.elasticsearch.enabled }}
[OUTPUT]
Name es
Match *
Host {{ .Values.output.elasticsearch.host }}
Port {{ .Values.output.elasticsearch.port }}
Index {{ .Values.output.elasticsearch.index }}
HTTP_User {{ .Values.output.elasticsearch.user }}
HTTP_Passwd ${ES_PASSWORD}
tls On
tls.verify Off
Suppress_Type_Name On
Replace_Dots On
{{- end }}
[OUTPUT]
Name stdout
Match *
5 changes: 5 additions & 0 deletions deployments/nimbus-k8tls/templates/daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,9 @@ spec:
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
env:
{{- if .Values.output.elasticsearch.enabled }}
- name: TTLSECONDSAFTERFINISHED
value: "{{ .Values.output.elasticsearch.ttlsecondsafterfinished }}"
{{- end }}
terminationGracePeriodSeconds: 10
4 changes: 4 additions & 0 deletions deployments/nimbus-k8tls/templates/namespace.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
apiVersion: v1
kind: Namespace
metadata:
name: {{ include "nimbus-k8tls.fullname" . }}-env
6 changes: 6 additions & 0 deletions deployments/nimbus-k8tls/templates/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,9 @@ rules:
- delete
- get
- update
{{- if .Values.output.elasticsearch.enabled }}
- apiGroups: [""]
resources: ["secrets"]
resourceNames: ["elasticsearch-password"]
verbs: ["get"]
{{- end }}
10 changes: 10 additions & 0 deletions deployments/nimbus-k8tls/templates/secret.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{{- if .Values.output.elasticsearch.enabled -}}
apiVersion: v1
kind: Secret
metadata:
name: elasticsearch-password
namespace: {{ include "nimbus-k8tls.fullname" . }}-env
type: Opaque
data:
es_password: {{ .Values.output.elasticsearch.password }}
{{- end }}
11 changes: 11 additions & 0 deletions deployments/nimbus-k8tls/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,14 @@ serviceAccount:
securityContext:
runAsNonRoot: true
runAsUser: 65533

output:
elasticsearch:
enabled: false
host: "localhost"
user: elastic
port: 9200
index: "findings"
password: "" # Password in base64 encoded format
ttlsecondsafterfinished: "10" # Amount of time to keep the pod around after job has been completed

7 changes: 7 additions & 0 deletions pkg/adapter/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,10 @@ type Request struct {
Name string
Namespace string
}

type ContextKey string

const (
K8sClientKey ContextKey = "k8sClient"
NamespaceNameKey ContextKey = "NamespaceName"
)
98 changes: 78 additions & 20 deletions pkg/adapter/nimbus-k8tls/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,31 @@ package builder
import (
"context"
"fmt"
"os"
"strconv"
"strings"

"github.com/5GSEC/nimbus/api/v1alpha1"
"github.com/5GSEC/nimbus/pkg/adapter/common"
"github.com/5GSEC/nimbus/pkg/adapter/idpool"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

"github.com/5GSEC/nimbus/api/v1alpha1"
"github.com/5GSEC/nimbus/pkg/adapter/idpool"
)

var (
DefaultSchedule = "@weekly"
backOffLimit = int32(5)
hostPathDirectoryOrCreate = corev1.HostPathDirectoryOrCreate
DefaultSchedule = "@weekly"
backOffLimit = int32(5)
)

func BuildCronJob(ctx context.Context, cwnp v1alpha1.ClusterNimbusPolicy) (*batchv1.CronJob, *corev1.ConfigMap) {
logger := log.FromContext(ctx)
for _, nimbusRule := range cwnp.Spec.NimbusRules {
id := nimbusRule.ID
if idpool.IsIdSupportedBy(id, "k8tls") {
cronJob, configMap := cronJobFor(id, nimbusRule)
cronJob, configMap := cronJobFor(ctx, id, nimbusRule)
cronJob.SetName(cwnp.Name + "-" + strings.ToLower(id))
cronJob.SetAnnotations(map[string]string{
"app.kubernetes.io/managed-by": "nimbus-k8tls",
Expand All @@ -41,31 +43,32 @@ func BuildCronJob(ctx context.Context, cwnp v1alpha1.ClusterNimbusPolicy) (*batc
return nil, nil
}

func cronJobFor(id string, rule v1alpha1.NimbusRules) (*batchv1.CronJob, *corev1.ConfigMap) {
func cronJobFor(ctx context.Context, id string, rule v1alpha1.NimbusRules) (*batchv1.CronJob, *corev1.ConfigMap) {
switch id {
case idpool.EnsureTLS:
return ensureTlsCronJob(rule)
return ensureTlsCronJob(ctx, rule)
default:
return nil, nil
}
}

func ensureTlsCronJob(rule v1alpha1.NimbusRules) (*batchv1.CronJob, *corev1.ConfigMap) {
func ensureTlsCronJob(ctx context.Context, rule v1alpha1.NimbusRules) (*batchv1.CronJob, *corev1.ConfigMap) {
schedule, scheduleKeyExists := rule.Rule.Params["schedule"]
externalAddresses, addrKeyExists := rule.Rule.Params["external_addresses"]
if scheduleKeyExists && addrKeyExists {
return cronJobForEnsureTls(schedule[0], externalAddresses...)
return cronJobForEnsureTls(ctx, schedule[0], externalAddresses...)
}
if scheduleKeyExists {
return cronJobForEnsureTls(schedule[0])
return cronJobForEnsureTls(ctx, schedule[0])
}
if addrKeyExists {
return cronJobForEnsureTls(DefaultSchedule, externalAddresses...)
return cronJobForEnsureTls(ctx, DefaultSchedule, externalAddresses...)
}
return cronJobForEnsureTls(DefaultSchedule)
return cronJobForEnsureTls(ctx, DefaultSchedule)
}

func cronJobForEnsureTls(schedule string, externalAddresses ...string) (*batchv1.CronJob, *corev1.ConfigMap) {
func cronJobForEnsureTls(ctx context.Context, schedule string, externalAddresses ...string) (*batchv1.CronJob, *corev1.ConfigMap) {
logger := log.FromContext(ctx)
cj := &batchv1.CronJob{
Spec: batchv1.CronJobSpec{
Schedule: schedule,
Expand All @@ -75,7 +78,7 @@ func cronJobForEnsureTls(schedule string, externalAddresses ...string) (*batchv1
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyNever,
Containers: []corev1.Container{
InitContainers: []corev1.Container{
{
Name: "k8tls",
Image: "kubearmor/k8tls:latest",
Expand All @@ -94,6 +97,25 @@ func cronJobForEnsureTls(schedule string, externalAddresses ...string) (*batchv1
},
},
},
Containers: []corev1.Container{
{
Name: "fluent-bit",
Image: "fluent/fluent-bit:latest",
ImagePullPolicy: corev1.PullAlways,
VolumeMounts: []corev1.VolumeMount{
{
Name: "fluent-bit-config",
MountPath: "/fluent-bit/etc/fluent-bit.conf",
SubPath: "fluent-bit.conf",
ReadOnly: true,
},
{
Name: "k8tls-report",
MountPath: "/tmp/",
},
},
},
},
Volumes: []corev1.Volume{
{
Name: "fips-config",
Expand All @@ -106,14 +128,21 @@ func cronJobForEnsureTls(schedule string, externalAddresses ...string) (*batchv1
},
},
{
Name: "k8tls-report",
Name: "fluent-bit-config",
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: "/tmp/",
Type: &hostPathDirectoryOrCreate,
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: "fluent-bit-config",
},
},
},
},
{
Name: "k8tls-report",
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
},
},
},
},
Expand All @@ -122,6 +151,35 @@ func cronJobForEnsureTls(schedule string, externalAddresses ...string) (*batchv1
},
}

// Fetch the elasticsearch password secret. If the secret is present, set TTLSecondsAfterFinished and reference the secret in the cronjob templateZ
var elasticsearchPasswordSecret corev1.Secret
err := ctx.Value(common.K8sClientKey).(client.Client).Get(ctx, client.ObjectKey{Namespace: ctx.Value(common.NamespaceNameKey).(string), Name: "elasticsearch-password"}, &elasticsearchPasswordSecret)
if err == nil {
// Convert string to int
i, err := strconv.ParseInt(os.Getenv("TTLSECONDSAFTERFINISHED"), 10, 32)
if err != nil {
logger.Error(err, "Error converting string to int", "TTLSECONDSAFTERFINISHED: ", os.Getenv("TTLSECONDSAFTERFINISHED"))
return nil, nil
}
// Convert int to int32
ttlSecondsAfterFinished := int32(i)
// If we are sending the report to elasticsearch, then we delete the pod spawned by job after 1 hour. Else we keep the pod
cj.Spec.JobTemplate.Spec.TTLSecondsAfterFinished = &ttlSecondsAfterFinished
cj.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Env = []corev1.EnvVar{
{
Name: "ES_PASSWORD",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: "elasticsearch-password",
},
Key: "es_password",
},
},
},
}
}

if len(externalAddresses) > 0 {
cm := buildConfigMap(externalAddresses)

Expand Down
20 changes: 8 additions & 12 deletions pkg/adapter/nimbus-k8tls/manager/k8tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,14 @@ import (
func setupK8tlsEnv(ctx context.Context, cwnp v1alpha1.ClusterNimbusPolicy, scheme *runtime.Scheme, k8sClient client.Client) error {
logger := log.FromContext(ctx)

ns := &corev1.Namespace{
TypeMeta: metav1.TypeMeta{
APIVersion: corev1.SchemeGroupVersion.String(),
Kind: "Namespace",
},
ObjectMeta: metav1.ObjectMeta{
Name: NamespaceName,
Labels: cwnp.Labels,
Annotations: map[string]string{
"app.kubernetes.io/managed-by": "nimbus-k8tls",
},
},
// Retrieve the namespace
ns := &corev1.Namespace{}
err := k8sClient.Get(ctx, client.ObjectKey{Name: NamespaceName}, ns)
if err != nil {
if errors.IsNotFound(err) {
logger.Error(err, "failed to fetch Namespace", "Namespace.Name", NamespaceName)
}
return err
}

cm := &corev1.ConfigMap{
Expand Down
5 changes: 4 additions & 1 deletion pkg/adapter/nimbus-k8tls/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func Run(ctx context.Context) {
deletedCronJobCh := make(chan common.Request)
go watcher.WatchCronJobs(ctx, updateCronJobCh, deletedCronJobCh)

// Get the namespace name within which the k8tls environment needs to be set
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -106,7 +107,9 @@ func createOrUpdateCronJob(ctx context.Context, cwnpName string) {
}

deleteDanglingCj(ctx, logger, cwnp)
cronJob, configMap := builder.BuildCronJob(ctx, cwnp)
newCtx := context.WithValue(ctx, common.K8sClientKey, k8sClient)
newCtx = context.WithValue(newCtx, common.NamespaceNameKey, NamespaceName)
cronJob, configMap := builder.BuildCronJob(newCtx, cwnp)

if cronJob != nil {
if err := setupK8tlsEnv(ctx, cwnp, scheme, k8sClient); err != nil {
Expand Down

0 comments on commit 9476d9e

Please sign in to comment.