Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic SASL and TLS support for Kafka cloud events #5814

Merged
merged 2 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions charts/flyte-core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,20 @@ helm install gateway bitnami/contour -n flyte
| cloud_events.enable | bool | `false` | |
| cloud_events.eventsPublisher.eventTypes[0] | string | `"all"` | |
| cloud_events.eventsPublisher.topicName | string | `"arn:aws:sns:us-east-2:123456:123-my-topic"` | |
| cloud_events.kafka | object | `{"brokers":["mybroker:443"],"saslConfig":{"enabled":false,"handshake":true,"mechanism":"PLAIN","password":"","user":"kafka"},"tlsConfig":{"certPath":"/etc/ssl/certs/kafka-client.crt","enabled":false,"keyPath":"/etc/ssl/certs/kafka-client.key"},"version":"3.7.0"}` | Configuration for sending cloud events to Kafka |
| cloud_events.kafka.brokers | list | `["mybroker:443"]` | The kafka brokers to talk to |
| cloud_events.kafka.saslConfig | object | `{"enabled":false,"handshake":true,"mechanism":"PLAIN","password":"","user":"kafka"}` | SASL based authentication |
| cloud_events.kafka.saslConfig.enabled | bool | `false` | Whether to use SASL authentication |
| cloud_events.kafka.saslConfig.handshake | bool | `true` | Whether the send the SASL handsahke first |
| cloud_events.kafka.saslConfig.mechanism | string | `"PLAIN"` | Which SASL mechanism to use. Defaults to PLAIN |
| cloud_events.kafka.saslConfig.password | string | `""` | The password for the kafka user |
| cloud_events.kafka.saslConfig.user | string | `"kafka"` | The kafka user |
| cloud_events.kafka.tlsConfig | object | `{"certPath":"/etc/ssl/certs/kafka-client.crt","enabled":false,"keyPath":"/etc/ssl/certs/kafka-client.key"}` | Certificate based authentication |
| cloud_events.kafka.tlsConfig.certPath | string | `"/etc/ssl/certs/kafka-client.crt"` | Path to the client certificate |
| cloud_events.kafka.tlsConfig.enabled | bool | `false` | Whether to use certificate based authentication or TLS |
| cloud_events.kafka.tlsConfig.keyPath | string | `"/etc/ssl/certs/kafka-client.key"` | Path to the client private key |
| cloud_events.kafka.version | string | `"3.7.0"` | The version of Kafka |
| cloud_events.secretName | string | `""` | The name of the secret to use to alternatively load in cloud events configuration via a secret. Useful when the configuration contains secrets. |
| cloud_events.type | string | `"aws"` | |
| cluster_resource_manager | object | `{"config":{"cluster_resources":{"customData":[{"production":[{"projectQuotaCpu":{"value":"5"}},{"projectQuotaMemory":{"value":"4000Mi"}}]},{"staging":[{"projectQuotaCpu":{"value":"2"}},{"projectQuotaMemory":{"value":"3000Mi"}}]},{"development":[{"projectQuotaCpu":{"value":"4"}},{"projectQuotaMemory":{"value":"3000Mi"}}]}],"refreshInterval":"5m","standaloneDeployment":false,"templatePath":"/etc/flyte/clusterresource/templates"}},"enabled":true,"nodeSelector":{},"podAnnotations":{},"podEnv":{},"podLabels":{},"prometheus":{"enabled":false,"path":"/metrics","port":10254},"resources":{},"service_account_name":"flyteadmin","standaloneDeployment":false,"templates":[{"key":"aa_namespace","value":"apiVersion: v1\nkind: Namespace\nmetadata:\n name: {{ namespace }}\nspec:\n finalizers:\n - kubernetes\n"},{"key":"ab_project_resource_quota","value":"apiVersion: v1\nkind: ResourceQuota\nmetadata:\n name: project-quota\n namespace: {{ namespace }}\nspec:\n hard:\n limits.cpu: {{ projectQuotaCpu }}\n limits.memory: {{ projectQuotaMemory }}\n"}]}` | Configuration for the Cluster resource manager component. This is an optional component, that enables automatic cluster configuration. This is useful to set default quotas, manage namespaces etc that map to a project/domain |
| cluster_resource_manager.config | object | `{"cluster_resources":{"customData":[{"production":[{"projectQuotaCpu":{"value":"5"}},{"projectQuotaMemory":{"value":"4000Mi"}}]},{"staging":[{"projectQuotaCpu":{"value":"2"}},{"projectQuotaMemory":{"value":"3000Mi"}}]},{"development":[{"projectQuotaCpu":{"value":"4"}},{"projectQuotaMemory":{"value":"3000Mi"}}]}],"refreshInterval":"5m","standaloneDeployment":false,"templatePath":"/etc/flyte/clusterresource/templates"}}` | Configmap for ClusterResource parameters |
Expand Down
2 changes: 1 addition & 1 deletion charts/flyte-core/templates/admin/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ data:
externalEvents: {{ tpl (toYaml .) $ | nindent 6 }}
{{- end }}
{{- end }}
{{- if .Values.cloud_events.enable }}
{{- if and .Values.cloud_events.enable (not .Values.cloud_events.secretName) }}
{{- with .Values.cloud_events }}
cloud_events.yaml: |
cloudEvents: {{ tpl (toYaml .) $ | nindent 6 }}
Expand Down
4 changes: 4 additions & 0 deletions charts/flyte-core/templates/admin/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ spec:
name: flyte-admin-base-config
- configMap:
name: flyte-admin-clusters-config
{{- if .Values.cloud_events.secretName }}
- secret:
name: {{ .Values.cloud_events.secretName }}
{{- end }}
name: clusters-config-volume
{{- if .Values.cluster_resource_manager.enabled }}
- configMap:
Expand Down
30 changes: 30 additions & 0 deletions charts/flyte-core/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,9 @@ external_events:
# Cloud events are used to send events (unprocessed, as Admin see them) in cloud event format to
# an SNS topic (or gcp equivalent)
cloud_events:
# -- The name of the secret to use to alternatively load in cloud events configuration via a secret. Useful when the
# configuration contains secrets.
secretName: ""
Sovietaced marked this conversation as resolved.
Show resolved Hide resolved
enable: false
type: aws
aws:
Expand All @@ -955,6 +958,33 @@ cloud_events:
topicName: "arn:aws:sns:us-east-2:123456:123-my-topic"
eventTypes:
- all # Or workflow, node, task. Or "*"
# -- Configuration for sending cloud events to Kafka
kafka:
# -- The version of Kafka
version: "3.7.0"
# -- The kafka brokers to talk to
brokers:
- mybroker:443
# -- SASL based authentication
saslConfig:
# -- Whether to use SASL authentication
enabled: false
# -- The kafka user
user: kafka
# -- The password for the kafka user
password: ""
# -- Whether the send the SASL handsahke first
handshake: true
# -- Which SASL mechanism to use. Defaults to PLAIN
mechanism: PLAIN
# -- Certificate based authentication
tlsConfig:
# -- Whether to use certificate based authentication or TLS
enabled: false
# -- Path to the client certificate
certPath: /etc/ssl/certs/kafka-client.crt
# -- Path to the client private key
keyPath: /etc/ssl/certs/kafka-client.key

# -- Configuration for the Cluster resource manager component. This is an optional component, that enables automatic
# cluster configuration. This is useful to set default quotas, manage namespaces etc that map to a project/domain
Expand Down
9 changes: 6 additions & 3 deletions docker/sandbox-bundled/manifests/complete-agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,9 @@ data:
disabled: false
seedProjects:
- flytesnacks
seedProjectsWithDetails:
- description: Default project setup.
name: flytesnacks
dataCatalog:
disabled: false
propeller:
Expand Down Expand Up @@ -816,7 +819,7 @@ type: Opaque
---
apiVersion: v1
data:
haSharedSecret: M0MwZjRDRlRMRVg5eFlNWA==
haSharedSecret: Mm96eUJNNUlWUzB6dG5xag==
proxyPassword: ""
proxyUsername: ""
kind: Secret
Expand Down Expand Up @@ -1247,7 +1250,7 @@ spec:
metadata:
annotations:
checksum/cluster-resource-templates: 6fd9b172465e3089fcc59f738b92b8dc4d8939360c19de8ee65f68b0e7422035
checksum/configuration: dc6e26fec37cad413a92bf06f2840ea1e497284312275ff06e22b152dee1566b
checksum/configuration: a823eaadac5f3a4358c8acf628ebeb3719f88312af520d2c253de2579dff262d
checksum/configuration-secret: 09216ffaa3d29e14f88b1f30af580d02a2a5e014de4d750b7f275cc07ed4e914
labels:
app.kubernetes.io/component: flyte-binary
Expand Down Expand Up @@ -1413,7 +1416,7 @@ spec:
metadata:
annotations:
checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81
checksum/secret: 49b88f7ed6b4bec4cdb0305c1d990514d9b75690607d7ae75d5862da9a3b2a29
checksum/secret: d313245b88895f79af2db62a30442bfaf128d845a6f11fbec7d80e8b342ed247
labels:
app: docker-registry
release: flyte-sandbox
Expand Down
9 changes: 6 additions & 3 deletions docker/sandbox-bundled/manifests/complete.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,9 @@ data:
disabled: false
seedProjects:
- flytesnacks
seedProjectsWithDetails:
- description: Default project setup.
name: flytesnacks
dataCatalog:
disabled: false
propeller:
Expand Down Expand Up @@ -798,7 +801,7 @@ type: Opaque
---
apiVersion: v1
data:
haSharedSecret: ekx6Z2kxS3FBYjV5dExlMw==
haSharedSecret: RHRoWE50MnFPOEUxMmZuNA==
proxyPassword: ""
proxyUsername: ""
kind: Secret
Expand Down Expand Up @@ -1196,7 +1199,7 @@ spec:
metadata:
annotations:
checksum/cluster-resource-templates: 6fd9b172465e3089fcc59f738b92b8dc4d8939360c19de8ee65f68b0e7422035
checksum/configuration: a6f3ea502338c626b7824453ce7dc8b6fcd441d68865c075e2e74d797bc607fa
checksum/configuration: c2649df6bcb523f120c73b0fdeec5d9516f555eab12e4eae78b04dea2cf2abae
checksum/configuration-secret: 09216ffaa3d29e14f88b1f30af580d02a2a5e014de4d750b7f275cc07ed4e914
labels:
app.kubernetes.io/component: flyte-binary
Expand Down Expand Up @@ -1362,7 +1365,7 @@ spec:
metadata:
annotations:
checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81
checksum/secret: 9b64bfe991cd6ce4394fa9c2651b0bbfe4834024ece293b3ac9688111d6fe5d3
checksum/secret: 3b81a8307491b87f506cfc21f0ba759872ef0b6666427399fa52db75188b6f7c
labels:
app: docker-registry
release: flyte-sandbox
Expand Down
4 changes: 2 additions & 2 deletions docker/sandbox-bundled/manifests/dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ metadata:
---
apiVersion: v1
data:
haSharedSecret: MW90empzaUNBd2FlV09QSw==
haSharedSecret: VVF2SndnTXM3cEVGbFM3Mw==
proxyPassword: ""
proxyUsername: ""
kind: Secret
Expand Down Expand Up @@ -934,7 +934,7 @@ spec:
metadata:
annotations:
checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81
checksum/secret: ba78cd87d2f6685980b95bd20913088b3a07fa48e9a414693277e3df134710ad
checksum/secret: ca5ab8524ec246e8321ad14b55284b4c6f8a488ddfa80377989c1529fa51af45
labels:
app: docker-registry
release: flyte-sandbox
Expand Down
2 changes: 2 additions & 0 deletions flyteadmin/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,5 @@ issues:
exclude-rules:
- path: pkg/workflowengine/impl/prepare_execution.go
text: "copies lock"
- path: pkg/runtime/interfaces/application_configuration.go
text: "G402: TLS InsecureSkipVerify may be true."
7 changes: 1 addition & 6 deletions flyteadmin/pkg/async/cloudevent/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,7 @@ func NewCloudEventsPublisher(ctx context.Context, db repositoryInterfaces.Reposi

case cloudEventImplementations.Kafka:
saramaConfig := sarama.NewConfig()
var err error
saramaConfig.Version, err = sarama.ParseKafkaVersion(cloudEventsConfig.KafkaConfig.Version)
if err != nil {
logger.Fatalf(ctx, "failed to parse kafka version, %v", err)
panic(err)
}
cloudEventsConfig.KafkaConfig.UpdateSaramaConfig(ctx, saramaConfig)
kafkaSender, err := kafka_sarama.NewSender(cloudEventsConfig.KafkaConfig.Brokers, saramaConfig, cloudEventsConfig.EventsPublisherConfig.TopicName)
if err != nil {
panic(err)
Expand Down
68 changes: 68 additions & 0 deletions flyteadmin/pkg/runtime/interfaces/application_configuration.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package interfaces

import (
"context"
"crypto/tls"

"github.com/Shopify/sarama"
"github.com/golang/protobuf/ptypes/wrappers"
"golang.org/x/time/rate"

Expand Down Expand Up @@ -231,11 +235,75 @@
ProjectID string `json:"projectId"`
}

// This section holds SASL config for Kafka
type SASLConfig struct {
// Whether to use SASL
Enabled bool `json:"enabled"`
// The username
User string `json:"user"`
// The password
Password string `json:"password"`
Handshake bool `json:"handshake"`
// Which SASL Mechanism to use. Defaults to PLAIN
Mechanism sarama.SASLMechanism `json:"mechanism"`
}

// This section holds TLS config for Kafka clients
type TLSConfig struct {
// Whether to use TLS
Enabled bool `json:"enabled"`
// Whether to skip certificate verification
InsecureSkipVerify bool `json:"insecureSkipVerify"`
// The location of the client certificate
CertPath string `json:"certPath"`
// The location of the client private key
KeyPath string `json:"keyPath"`
}

// This section holds configs for Kafka clients
type KafkaConfig struct {
// The version of Kafka, e.g. 2.1.0, 0.8.2.0
Version string `json:"version"`
// kafka broker addresses
Brokers []string `json:"brokers"`
// sasl config
SASLConfig SASLConfig `json:"saslConfig"`
// tls config
TLSConfig TLSConfig `json:"tlsConfig"`
}

func (k KafkaConfig) UpdateSaramaConfig(ctx context.Context, s *sarama.Config) {
var err error
s.Version, err = sarama.ParseKafkaVersion(k.Version)
if err != nil {
panic(err)

Check warning on line 279 in flyteadmin/pkg/runtime/interfaces/application_configuration.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/runtime/interfaces/application_configuration.go#L279

Added line #L279 was not covered by tests
}

if k.SASLConfig.Enabled {
s.Net.SASL.Enable = true
s.Net.SASL.User = k.SASLConfig.User
s.Net.SASL.Password = k.SASLConfig.Password
s.Net.SASL.Handshake = k.SASLConfig.Handshake

if k.SASLConfig.Mechanism == "" {
k.SASLConfig.Mechanism = sarama.SASLTypePlaintext
}
s.Net.SASL.Mechanism = k.SASLConfig.Mechanism

Check warning on line 291 in flyteadmin/pkg/runtime/interfaces/application_configuration.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/runtime/interfaces/application_configuration.go#L283-L291

Added lines #L283 - L291 were not covered by tests
}

if k.TLSConfig.Enabled {
s.Net.TLS.Enable = true
s.Net.TLS.Config = &tls.Config{
InsecureSkipVerify: k.TLSConfig.InsecureSkipVerify,
}
if k.TLSConfig.KeyPath != "" && k.TLSConfig.CertPath != "" {
cert, err := tls.LoadX509KeyPair(k.TLSConfig.CertPath, k.TLSConfig.KeyPath)
if err != nil {
panic(err)

Check warning on line 302 in flyteadmin/pkg/runtime/interfaces/application_configuration.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/runtime/interfaces/application_configuration.go#L295-L302

Added lines #L295 - L302 were not covered by tests
}
s.Net.TLS.Config.Certificates = []tls.Certificate{cert}

Check warning on line 304 in flyteadmin/pkg/runtime/interfaces/application_configuration.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/runtime/interfaces/application_configuration.go#L304

Added line #L304 was not covered by tests
}
}
}

// This section holds configuration for the event scheduler used to schedule workflow executions.
Expand Down
Loading