Skip to content

Commit

Permalink
feat: add flink example and use it in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rriski committed Nov 21, 2024
1 parent ce5b351 commit 8c256e3
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 37 deletions.
29 changes: 29 additions & 0 deletions docs/docs/api-reference/examples/flink.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
apiVersion: aiven.io/v1alpha1
kind: Flink
metadata:
name: my-flink
spec:
authSecretRef:
name: aiven-token
key: token

connInfoSecretTarget:
name: flink-secret
annotations:
foo: bar
labels:
baz: egg

project: my-aiven-project
cloudName: google-europe-west1
plan: business-4

maintenanceWindowDow: sunday
maintenanceWindowTime: 11:00:00

userConfig:
number_of_task_slots: 10
ip_filter:
- network: 0.0.0.0/32
description: whatever
- network: 10.20.0.0/16
81 changes: 81 additions & 0 deletions docs/docs/api-reference/flink.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,87 @@
title: "Flink"
---

## Usage example

??? example
```yaml
apiVersion: aiven.io/v1alpha1
kind: Flink
metadata:
name: my-flink
spec:
authSecretRef:
name: aiven-token
key: token

connInfoSecretTarget:
name: flink-secret
prefix: MY_SECRET_PREFIX_
annotations:
foo: bar
labels:
baz: egg

project: my-aiven-project
cloudName: google-europe-west1
plan: business-4

maintenanceWindowDow: sunday
maintenanceWindowTime: 11:00:00

userConfig:
number_of_task_slots: 10
ip_filter:
- network: 0.0.0.0
description: whatever
- network: 10.20.0.0/16
```

!!! info
To create this resource, a `Secret` containing Aiven token must be [created](/aiven-operator/authentication.html) first.

Apply the resource with:

```shell
kubectl apply -f example.yaml
```

Verify the newly created `Flink`:

```shell
kubectl get flinks my-flink
```

The output is similar to the following:
```shell
Name Project Region Plan State
my-flink my-aiven-project google-europe-west1 business-4 RUNNING
```

To view the details of the `Secret`, use the following command:
```shell
kubectl describe secret flink-secret
```

You can use the [jq](https://github.com/jqlang/jq) to quickly decode the `Secret`:

```shell
kubectl get secret flink-secret -o json | jq '.data | map_values(@base64d)'
```

The output is similar to the following:

```{ .json .no-copy }
{
"FLINK_HOST": "<secret>",
"FLINK_PORT": "<secret>",
"FLINK_USER": "<secret>",
"FLINK_PASSWORD": "<secret>",
"FLINK_URI": "<secret>",
"FLINK_HOSTS": "<secret>",
}
```

## Flink {: #Flink }

Flink is the Schema for the flinks API.
Expand Down
46 changes: 9 additions & 37 deletions tests/flink_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package tests

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -11,35 +10,6 @@ import (
flinkuserconfig "github.com/aiven/aiven-operator/api/v1alpha1/userconfig/service/flink"
)

func getFlinkYaml(project, name, cloudName string) string {
return fmt.Sprintf(`
apiVersion: aiven.io/v1alpha1
kind: Flink
metadata:
name: %[2]s
spec:
authSecretRef:
name: aiven-token
key: token
project: %[1]s
cloudName: %[3]s
plan: business-4
tags:
env: test
instance: foo
userConfig:
number_of_task_slots: 10
ip_filter:
- network: 0.0.0.0/32
description: bar
- network: 10.20.0.0/16
`, project, name, cloudName)
}

func TestFlink(t *testing.T) {
t.Parallel()
defer recoverPanic(t)
Expand All @@ -49,7 +19,13 @@ func TestFlink(t *testing.T) {
defer cancel()

name := randName("flink")
yml := getFlinkYaml(cfg.Project, name, cfg.PrimaryCloudName)
yml, err := loadExampleYaml("flink.yaml", map[string]string{
"google-europe-west1": cfg.PrimaryCloudName,
"my-aiven-project": cfg.Project,
"my-flink": name,
})
require.NoError(t, err)

s := NewSession(ctx, k8sClient, cfg.Project)

// Cleans test afterward
Expand All @@ -71,10 +47,6 @@ func TestFlink(t *testing.T) {
assert.Contains(t, serviceRunningStatesAiven, flinkAvn.State)
assert.Equal(t, flinkAvn.Plan, flink.Spec.Plan)
assert.Equal(t, flinkAvn.CloudName, flink.Spec.CloudName)
assert.Equal(t, map[string]string{"env": "test", "instance": "foo"}, flink.Spec.Tags)
flinkResp, err := avnClient.ServiceTags.Get(ctx, cfg.Project, name)
require.NoError(t, err)
assert.Equal(t, flinkResp.Tags, flink.Spec.Tags)

// UserConfig test
assert.Equal(t, anyPointer(10), flink.Spec.UserConfig.NumberOfTaskSlots)
Expand All @@ -84,7 +56,7 @@ func TestFlink(t *testing.T) {

// First entry
assert.Equal(t, "0.0.0.0/32", flink.Spec.UserConfig.IpFilter[0].Network)
assert.Equal(t, "bar", *flink.Spec.UserConfig.IpFilter[0].Description)
assert.Equal(t, "whatever", *flink.Spec.UserConfig.IpFilter[0].Description)

// Second entry
assert.Equal(t, "10.20.0.0/16", flink.Spec.UserConfig.IpFilter[1].Network)
Expand All @@ -96,7 +68,7 @@ func TestFlink(t *testing.T) {
assert.Equal(t, ipFilterAvn, flink.Spec.UserConfig.IpFilter)

// Secrets test
secret, err := s.GetSecret(flink.GetName())
secret, err := s.GetSecret(flink.Spec.ConnInfoSecretTarget.Name)
require.NoError(t, err)
assert.NotEmpty(t, secret.Data["FLINK_HOST"])
assert.NotEmpty(t, secret.Data["FLINK_USER"])
Expand Down

0 comments on commit 8c256e3

Please sign in to comment.