Skip to content

Commit

Permalink
fix: do not block rebalancing service
Browse files Browse the repository at this point in the history
  • Loading branch information
byashimov committed Jul 1, 2024
1 parent 7440c96 commit d3adb28
Show file tree
Hide file tree
Showing 24 changed files with 98 additions and 80 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- Ignore `ClickhouseRole` deletion error (missing database)
- Ignore `ClickhouseGrant` deletion errors (missing database, service, role)
- Do not block service operations in `REBALANCING` state

## v0.21.0 - 2024-06-25

Expand Down
12 changes: 11 additions & 1 deletion controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/service"
"github.com/liip/sheriff"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -57,7 +58,16 @@ func checkServiceIsRunning(ctx context.Context, _ *aiven.Client, avnGen avngen.C
}
return false, err
}
return s.State == "RUNNING", nil
return serviceIsRunning(s.State), nil
}

// serviceIsRunning returns "true" when a service is in operational state, i.e. "running"
func serviceIsRunning[T service.ServiceStateType | string](state T) bool {
switch service.ServiceStateType(state) {
case service.ServiceStateTypeRunning, service.ServiceStateTypeRebalancing:
return true
}
return false
}

func getInitializedCondition(reason, message string) metav1.Condition {
Expand Down
64 changes: 33 additions & 31 deletions controllers/generic_service_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,42 +180,44 @@ func (h *genericServiceHandler) get(ctx context.Context, avn *aiven.Client, avnG
}

status := o.getServiceStatus()
status.State = s.State
if s.State == "RUNNING" {
meta.SetStatusCondition(&status.Conditions,
getRunningCondition(metav1.ConditionTrue, "CheckRunning", "Instance is running on Aiven side"))

metav1.SetMetaDataAnnotation(o.getObjectMeta(), instanceIsRunningAnnotation, "true")

// Some services get secrets after they are running only,
// like ip addresses (hosts)
secret, err := o.newSecret(ctx, s)
if err != nil || secret == nil {
return secret, err
}
if !serviceIsRunning(s.State) {
status.State = s.State
return nil, nil
}

switch o.getServiceType() {
case "kafka", "pg", "mysql", "cassandra":
// CA_CERT can be used with these service types only
default:
return secret, nil
}
status.State = "RUNNING" // overrides REBALANCING
meta.SetStatusCondition(&status.Conditions,
getRunningCondition(metav1.ConditionTrue, "CheckRunning", "Instance is running on Aiven side"))

cert, err := avnGen.ProjectKmsGetCA(ctx, spec.Project)
if err != nil {
return nil, fmt.Errorf("cannot retrieve project CA certificate: %w", err)
}
metav1.SetMetaDataAnnotation(o.getObjectMeta(), instanceIsRunningAnnotation, "true")

// We don't expect the StringData map to be empty, it must panic.
prefix := getSecretPrefix(o)
secret.StringData[prefix+"CA_CERT"] = cert
if o.getServiceType() == "kafka" {
// todo: backward compatibility, remove in future releases
secret.StringData["CA_CERT"] = cert
}
// Some services get secrets after they are running only,
// like ip addresses (hosts)
secret, err := o.newSecret(ctx, s)
if err != nil || secret == nil {
return secret, err
}

switch o.getServiceType() {
case "kafka", "pg", "mysql", "cassandra":
// CA_CERT can be used with these service types only
default:
return secret, nil
}
return nil, nil

cert, err := avnGen.ProjectKmsGetCA(ctx, spec.Project)
if err != nil {
return nil, fmt.Errorf("cannot retrieve project CA certificate: %w", err)
}

// We don't expect the StringData map to be empty, it must panic.
prefix := getSecretPrefix(o)
secret.StringData[prefix+"CA_CERT"] = cert
if o.getServiceType() == "kafka" {
// todo: backward compatibility, remove in future releases
secret.StringData["CA_CERT"] = cert
}
return secret, nil
}

// checkPreconditions not required for now by services to be implemented
Expand Down
4 changes: 2 additions & 2 deletions tests/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ func TestCassandra(t *testing.T) {
csAvn, err := avnClient.Services.Get(ctx, cfg.Project, name)
require.NoError(t, err)
assert.Equal(t, csAvn.Name, cs.GetName())
assert.Equal(t, "RUNNING", cs.Status.State)
assert.Equal(t, csAvn.State, cs.Status.State)
assert.Equal(t, serviceRunningState, cs.Status.State)
assert.Contains(t, serviceRunningStatesAiven, csAvn.State)
assert.Equal(t, csAvn.Plan, cs.Spec.Plan)
assert.Equal(t, csAvn.CloudName, cs.Spec.CloudName)
assert.Equal(t, "450GiB", cs.Spec.DiskSpace)
Expand Down
4 changes: 2 additions & 2 deletions tests/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ func TestClickhouse(t *testing.T) {
chAvn, err := avnClient.Services.Get(ctx, cfg.Project, chName)
require.NoError(t, err)
assert.Equal(t, chAvn.Name, ch.GetName())
assert.Equal(t, "RUNNING", ch.Status.State)
assert.Equal(t, chAvn.State, ch.Status.State)
assert.Equal(t, serviceRunningState, ch.Status.State)
assert.Contains(t, serviceRunningStatesAiven, chAvn.State)
assert.Equal(t, chAvn.Plan, ch.Spec.Plan)
assert.Equal(t, chAvn.CloudName, ch.Spec.CloudName)
assert.Equal(t, map[string]string{"env": "test", "instance": "foo"}, ch.Spec.Tags)
Expand Down
4 changes: 2 additions & 2 deletions tests/clickhouseuser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ func TestClickhouseUser(t *testing.T) {
chAvn, err := avnClient.Services.Get(ctx, cfg.Project, chName)
require.NoError(t, err)
assert.Equal(t, chAvn.Name, ch.GetName())
assert.Equal(t, "RUNNING", ch.Status.State)
assert.Equal(t, chAvn.State, ch.Status.State)
assert.Equal(t, serviceRunningState, ch.Status.State)
assert.Contains(t, serviceRunningStatesAiven, chAvn.State)
assert.Equal(t, chAvn.Plan, ch.Spec.Plan)
assert.Equal(t, chAvn.CloudName, ch.Spec.CloudName)

Expand Down
4 changes: 2 additions & 2 deletions tests/connectionpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ func TestConnectionPool(t *testing.T) {
pgAvn, err := avnClient.Services.Get(ctx, cfg.Project, pgName)
require.NoError(t, err)
assert.Equal(t, pgAvn.Name, pg.GetName())
assert.Equal(t, "RUNNING", pg.Status.State)
assert.Equal(t, pgAvn.State, pg.Status.State)
assert.Equal(t, serviceRunningState, pg.Status.State)
assert.Contains(t, serviceRunningStatesAiven, pgAvn.State)
assert.Equal(t, pgAvn.Plan, pg.Spec.Plan)
assert.Equal(t, pgAvn.CloudName, pg.Spec.CloudName)

Expand Down
4 changes: 2 additions & 2 deletions tests/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ func TestDatabase(t *testing.T) {
pgAvn, err := avnClient.Services.Get(ctx, cfg.Project, pgName)
require.NoError(t, err)
assert.Equal(t, pgAvn.Name, pg.GetName())
assert.Equal(t, "RUNNING", pg.Status.State)
assert.Equal(t, pgAvn.State, pg.Status.State)
assert.Equal(t, serviceRunningState, pg.Status.State)
assert.Contains(t, serviceRunningStatesAiven, pgAvn.State)
assert.Equal(t, pgAvn.Plan, pg.Spec.Plan)
assert.Equal(t, pgAvn.CloudName, pg.Spec.CloudName)

Expand Down
5 changes: 5 additions & 0 deletions tests/generic_service_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ import (
"github.com/aiven/aiven-operator/api/v1alpha1"
)

const serviceRunningState = "RUNNING"

// serviceRunningStatesAiven these Aiven service states match to RUNNING state in kube
var serviceRunningStatesAiven = []string{"RUNNING", "REBALANCING"}

func getCreateServiceYaml(project, pgName string) string {
return fmt.Sprintf(`
apiVersion: aiven.io/v1alpha1
Expand Down
4 changes: 2 additions & 2 deletions tests/grafana_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ func TestGrafana(t *testing.T) {
grafanaAvn, err := avnClient.Services.Get(ctx, cfg.Project, name)
require.NoError(t, err)
assert.Equal(t, grafanaAvn.Name, grafana.GetName())
assert.Equal(t, "RUNNING", grafana.Status.State)
assert.Equal(t, grafanaAvn.State, grafana.Status.State)
assert.Equal(t, serviceRunningState, grafana.Status.State)
assert.Contains(t, serviceRunningStatesAiven, grafanaAvn.State)
assert.Equal(t, grafanaAvn.Plan, grafana.Spec.Plan)
assert.Equal(t, grafanaAvn.CloudName, grafana.Spec.CloudName)
assert.Equal(t, map[string]string{"env": "test", "instance": "foo"}, grafana.Spec.Tags)
Expand Down
4 changes: 2 additions & 2 deletions tests/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ func TestKafka(t *testing.T) {
ksAvn, err := avnClient.Services.Get(ctx, cfg.Project, name)
require.NoError(t, err)
assert.Equal(t, ksAvn.Name, ks.GetName())
assert.Equal(t, "RUNNING", ks.Status.State)
assert.Equal(t, ksAvn.State, ks.Status.State)
assert.Equal(t, serviceRunningState, ks.Status.State)
assert.Contains(t, serviceRunningStatesAiven, ksAvn.State)
assert.Equal(t, ksAvn.Plan, ks.Spec.Plan)
assert.Equal(t, ksAvn.CloudName, ks.Spec.CloudName)
assert.Equal(t, "600GiB", ks.Spec.DiskSpace)
Expand Down
4 changes: 2 additions & 2 deletions tests/kafka_with_projectvpc_ref_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ func TestKafkaWithProjectVPCRef(t *testing.T) {
kafkaAvn, err := avnClient.Services.Get(ctx, cfg.Project, kafkaName)
require.NoError(t, err)
assert.Equal(t, kafkaAvn.Name, kafka.GetName())
assert.Equal(t, "RUNNING", kafka.Status.State)
assert.Equal(t, kafkaAvn.State, kafka.Status.State)
assert.Equal(t, serviceRunningState, kafka.Status.State)
assert.Contains(t, serviceRunningStatesAiven, kafkaAvn.State)
assert.Equal(t, kafkaAvn.Plan, kafka.Spec.Plan)
assert.Equal(t, kafkaAvn.CloudName, kafka.Spec.CloudName)

Expand Down
4 changes: 2 additions & 2 deletions tests/kafkaacl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ func TestKafkaACL(t *testing.T) {
kafkaAvn, err := avnClient.Services.Get(ctx, cfg.Project, kafkaName)
require.NoError(t, err)
assert.Equal(t, kafkaAvn.Name, kafka.GetName())
assert.Equal(t, "RUNNING", kafka.Status.State)
assert.Equal(t, kafkaAvn.State, kafka.Status.State)
assert.Equal(t, serviceRunningState, kafka.Status.State)
assert.Contains(t, serviceRunningStatesAiven, kafkaAvn.State)
assert.Equal(t, kafkaAvn.Plan, kafka.Spec.Plan)
assert.Equal(t, kafkaAvn.CloudName, kafka.Spec.CloudName)

Expand Down
4 changes: 2 additions & 2 deletions tests/kafkaconnect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ func TestKafkaConnect(t *testing.T) {
kcAvn, err := avnClient.Services.Get(ctx, cfg.Project, name)
require.NoError(t, err)
assert.Equal(t, kcAvn.Name, kc.GetName())
assert.Equal(t, "RUNNING", kc.Status.State)
assert.Equal(t, kcAvn.State, kc.Status.State)
assert.Equal(t, serviceRunningState, kc.Status.State)
assert.Contains(t, serviceRunningStatesAiven, kcAvn.State)
assert.Equal(t, kcAvn.Plan, kc.Spec.Plan)
assert.Equal(t, kcAvn.CloudName, kc.Spec.CloudName)
assert.Equal(t, map[string]string{"env": "test", "instance": "foo"}, kc.Spec.Tags)
Expand Down
4 changes: 2 additions & 2 deletions tests/kafkaschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ func TestKafkaSchema(t *testing.T) {
kafkaAvn, err := avnClient.Services.Get(ctx, cfg.Project, kafkaName)
require.NoError(t, err)
assert.Equal(t, kafkaAvn.Name, kafka.GetName())
assert.Equal(t, "RUNNING", kafka.Status.State)
assert.Equal(t, kafkaAvn.State, kafka.Status.State)
assert.Equal(t, serviceRunningState, kafka.Status.State)
assert.Contains(t, serviceRunningStatesAiven, kafkaAvn.State)
assert.Equal(t, kafkaAvn.Plan, kafka.Spec.Plan)
assert.Equal(t, kafkaAvn.CloudName, kafka.Spec.CloudName)
require.NotNil(t, kafka.Spec.UserConfig)
Expand Down
2 changes: 1 addition & 1 deletion tests/kafkatopic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestKafkaTopic(t *testing.T) {
ksAvn, err := avnClient.Services.Get(ctx, cfg.Project, ksName)
require.NoError(t, err)
assert.Equal(t, ksAvn.Name, ks.GetName())
assert.Equal(t, ksAvn.State, ks.Status.State)
assert.Contains(t, serviceRunningStatesAiven, ksAvn.State)
assert.Equal(t, ksAvn.Plan, ks.Spec.Plan)
assert.Equal(t, ksAvn.CloudName, ks.Spec.CloudName)

Expand Down
2 changes: 1 addition & 1 deletion tests/kafkschemaregistryaacl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestKafkaSchemaRegistryACL(t *testing.T) {
kafkaAvn, err := avnGen.ServiceGet(ctx, cfg.Project, kafkaName)
require.NoError(t, err)
assert.Equal(t, kafkaAvn.ServiceName, kafka.GetName())
assert.Equal(t, "RUNNING", kafka.Status.State)
assert.Equal(t, serviceRunningState, kafka.Status.State)
assert.EqualValues(t, kafkaAvn.State, kafka.Status.State)
assert.Equal(t, kafkaAvn.Plan, kafka.Spec.Plan)
assert.Equal(t, kafkaAvn.CloudName, kafka.Spec.CloudName)
Expand Down
4 changes: 2 additions & 2 deletions tests/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ func TestMySQL(t *testing.T) {
msAvn, err := avnClient.Services.Get(ctx, cfg.Project, name)
require.NoError(t, err)
assert.Equal(t, msAvn.Name, ms.GetName())
assert.Equal(t, "RUNNING", ms.Status.State)
assert.Equal(t, msAvn.State, ms.Status.State)
assert.Equal(t, serviceRunningState, ms.Status.State)
assert.Contains(t, serviceRunningStatesAiven, msAvn.State)
assert.Equal(t, msAvn.Plan, ms.Spec.Plan)
assert.Equal(t, msAvn.CloudName, ms.Spec.CloudName)
assert.Equal(t, "100GiB", ms.Spec.DiskSpace)
Expand Down
4 changes: 2 additions & 2 deletions tests/opensearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ func TestOpenSearch(t *testing.T) {
osAvn, err := avnClient.Services.Get(ctx, cfg.Project, name)
require.NoError(t, err)
assert.Equal(t, osAvn.Name, os.GetName())
assert.Equal(t, "RUNNING", os.Status.State)
assert.Equal(t, osAvn.State, os.Status.State)
assert.Equal(t, serviceRunningState, os.Status.State)
assert.Contains(t, serviceRunningStatesAiven, osAvn.State)
assert.Equal(t, osAvn.Plan, os.Spec.Plan)
assert.Equal(t, osAvn.CloudName, os.Spec.CloudName)
assert.Equal(t, "240GiB", os.Spec.DiskSpace)
Expand Down
12 changes: 6 additions & 6 deletions tests/postgresql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ func TestPgReadReplica(t *testing.T) {
masterAvn, err := avnClient.Services.Get(ctx, cfg.Project, masterName)
require.NoError(t, err)
assert.Equal(t, masterAvn.Name, master.GetName())
assert.Equal(t, "RUNNING", master.Status.State)
assert.Equal(t, masterAvn.State, master.Status.State)
assert.Equal(t, serviceRunningState, master.Status.State)
assert.Contains(t, serviceRunningStatesAiven, masterAvn.State)
assert.Equal(t, masterAvn.Plan, master.Spec.Plan)
assert.Equal(t, masterAvn.CloudName, master.Spec.CloudName)
assert.NotNil(t, masterAvn.UserConfig) // "Aiven instance has defaults set"
Expand All @@ -109,8 +109,8 @@ func TestPgReadReplica(t *testing.T) {
replicaAvn, err := avnClient.Services.Get(ctx, cfg.Project, replicaName)
require.NoError(t, err)
assert.Equal(t, replicaAvn.Name, replica.GetName())
assert.Equal(t, "RUNNING", replica.Status.State)
assert.Equal(t, replicaAvn.State, replica.Status.State)
assert.Equal(t, serviceRunningState, replica.Status.State)
assert.Contains(t, serviceRunningStatesAiven, replicaAvn.State)
assert.Equal(t, replicaAvn.Plan, replica.Spec.Plan)
assert.Equal(t, replicaAvn.CloudName, replica.Spec.CloudName)
assert.Equal(t, map[string]string{"env": "test", "instance": "replica"}, replica.Spec.Tags)
Expand Down Expand Up @@ -210,8 +210,8 @@ func TestPgCustomPrefix(t *testing.T) {
pgAvn, err := avnClient.Services.Get(ctx, cfg.Project, pgName)
require.NoError(t, err)
assert.Equal(t, pgAvn.Name, pg.GetName())
assert.Equal(t, "RUNNING", pg.Status.State)
assert.Equal(t, pgAvn.State, pg.Status.State)
assert.Equal(t, serviceRunningState, pg.Status.State)
assert.Contains(t, serviceRunningStatesAiven, pgAvn.State)
assert.Equal(t, pgAvn.Plan, pg.Spec.Plan)
assert.Equal(t, pgAvn.CloudName, pg.Spec.CloudName)
assert.Equal(t, map[string]string{"env": "prod", "instance": "pg"}, pg.Spec.Tags)
Expand Down
4 changes: 2 additions & 2 deletions tests/projectvpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ func TestProjectVPCID(t *testing.T) {
kafkaAvn, err := avnClient.Services.Get(ctx, cfg.Project, kafkaName)
require.NoError(t, err)
assert.Equal(t, kafkaAvn.Name, kafka.GetName())
assert.Equal(t, "RUNNING", kafka.Status.State)
assert.Equal(t, kafkaAvn.State, kafka.Status.State)
assert.Equal(t, serviceRunningState, kafka.Status.State)
assert.Contains(t, serviceRunningStatesAiven, kafkaAvn.State)
assert.Equal(t, kafkaAvn.Plan, kafka.Spec.Plan)
assert.Equal(t, kafkaAvn.CloudName, kafka.Spec.CloudName)

Expand Down
4 changes: 2 additions & 2 deletions tests/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func TestRedis(t *testing.T) {
rsAvn, err := avnClient.Services.Get(ctx, cfg.Project, name)
require.NoError(t, err)
assert.Equal(t, rsAvn.Name, rs.GetName())
assert.Equal(t, "RUNNING", rs.Status.State)
assert.Equal(t, rsAvn.State, rs.Status.State)
assert.Equal(t, serviceRunningState, rs.Status.State)
assert.Contains(t, serviceRunningStatesAiven, rsAvn.State)
assert.Equal(t, rsAvn.Plan, rs.Spec.Plan)
assert.Equal(t, rsAvn.CloudName, rs.Spec.CloudName)
assert.Equal(t, map[string]string{"env": "test", "instance": "foo"}, rs.Spec.Tags)
Expand Down
12 changes: 6 additions & 6 deletions tests/serviceintegration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestServiceIntegrationClickhousePostgreSQL(t *testing.T) {
chAvn, err := avnClient.Services.Get(ctx, cfg.Project, chName)
require.NoError(t, err)
assert.Equal(t, chAvn.Name, ch.GetName())
assert.Equal(t, chAvn.State, ch.Status.State)
assert.Contains(t, serviceRunningStatesAiven, chAvn.State)
assert.Equal(t, chAvn.Plan, ch.Spec.Plan)
assert.Equal(t, chAvn.CloudName, ch.Spec.CloudName)
assert.Equal(t, chAvn.MaintenanceWindow.DayOfWeek, ch.Spec.MaintenanceWindowDow)
Expand All @@ -66,7 +66,7 @@ func TestServiceIntegrationClickhousePostgreSQL(t *testing.T) {
pgAvn, err := avnClient.Services.Get(ctx, cfg.Project, pgName)
require.NoError(t, err)
assert.Equal(t, pgAvn.Name, pg.GetName())
assert.Equal(t, pgAvn.State, pg.Status.State)
assert.Contains(t, serviceRunningStatesAiven, pgAvn.State)
assert.Equal(t, pgAvn.Plan, pg.Spec.Plan)
assert.Equal(t, pgAvn.CloudName, pg.Spec.CloudName)
assert.Equal(t, pgAvn.MaintenanceWindow.DayOfWeek, pg.Spec.MaintenanceWindowDow)
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestServiceIntegrationKafkaLogs(t *testing.T) {
ksAvn, err := avnClient.Services.Get(ctx, cfg.Project, ksName)
require.NoError(t, err)
assert.Equal(t, ksAvn.Name, ks.GetName())
assert.Equal(t, ksAvn.State, ks.Status.State)
assert.Contains(t, serviceRunningStatesAiven, ksAvn.State)
assert.Equal(t, ksAvn.Plan, ks.Spec.Plan)
assert.Equal(t, ksAvn.CloudName, ks.Spec.CloudName)

Expand Down Expand Up @@ -197,15 +197,15 @@ func TestServiceIntegrationKafkaConnect(t *testing.T) {
ksAvn, err := avnClient.Services.Get(ctx, cfg.Project, ksName)
require.NoError(t, err)
assert.Equal(t, ksAvn.Name, ks.GetName())
assert.Equal(t, ksAvn.State, ks.Status.State)
assert.Contains(t, serviceRunningStatesAiven, ksAvn.State)
assert.Equal(t, ksAvn.Plan, ks.Spec.Plan)
assert.Equal(t, ksAvn.CloudName, ks.Spec.CloudName)

// Validates KafkaConnect
kcAvn, err := avnClient.Services.Get(ctx, cfg.Project, kcName)
require.NoError(t, err)
assert.Equal(t, kcAvn.Name, kc.GetName())
assert.Equal(t, kcAvn.State, kc.Status.State)
assert.Contains(t, serviceRunningStatesAiven, kcAvn.State)
assert.Equal(t, kcAvn.Plan, kc.Spec.Plan)
assert.Equal(t, kcAvn.CloudName, kc.Spec.CloudName)
assert.Equal(t, "read_committed", *kc.Spec.UserConfig.KafkaConnect.ConsumerIsolationLevel)
Expand Down Expand Up @@ -272,7 +272,7 @@ func TestServiceIntegrationDatadog(t *testing.T) {
pgAvn, err := avnClient.Services.Get(ctx, cfg.Project, pgName)
require.NoError(t, err)
assert.Equal(t, pgAvn.Name, pg.GetName())
assert.Equal(t, pgAvn.State, pg.Status.State)
assert.Contains(t, serviceRunningStatesAiven, pgAvn.State)
assert.Equal(t, pgAvn.Plan, pg.Spec.Plan)

// Validates Datadog
Expand Down
Loading

0 comments on commit d3adb28

Please sign in to comment.