move schema check to allow expansions with external DCs #1273

Closed · wants to merge 7 commits
4 changes: 4 additions & 0 deletions CHANGELOG/CHANGELOG-1.14.md
@@ -13,6 +13,10 @@ Changelog for the K8ssandra Operator, new PRs should update the `unreleased` section

When cutting a new release, update the `unreleased` heading to the tag being generated and date, like `## vX.Y.Z - YYYY-MM-DD` and create a new placeholder section for `unreleased` entries.

## unreleased

* [BUGFIX] [#1272](https://github.com/k8ssandra/k8ssandra-operator/issues/1272) Move the schema check so that it runs before the CassandraDatacenter reaches the Ready state, to allow migrating from an external DC

## v1.14.0 - 2024-04-02

* [FEATURE] [#1242](https://github.com/k8ssandra/k8ssandra-operator/issues/1242) Allow for creation of replicated secrets with a prefix, so that we can distinguish between multiple secrets with the same origin but targeting different clusters.
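To make the release-cutting instruction above concrete, the rename would look like this (the tag and date here are hypothetical, chosen only for illustration):

```markdown
## unreleased

## v1.15.0 - 2024-05-13

* [BUGFIX] [#1272](https://github.com/k8ssandra/k8ssandra-operator/issues/1272) Move the schema check so that it runs before the CassandraDatacenter reaches the Ready state, to allow migrating from an external DC
```

The entries that sat under `## unreleased` end up beneath the new dated heading, and a fresh empty `## unreleased` placeholder is created above it.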
6 changes: 3 additions & 3 deletions Makefile
@@ -189,14 +189,14 @@ docker-push: ## Push docker image with the manager.
# - have enabled BuildKit. More info: https://docs.docker.com/develop/develop-images/build_enhancements/
# - be able to push the image to your registry (i.e. if you do not provide a valid value via IMG=<myregistry/image:<tag>> then the export will fail)
# To properly provide solutions that support more than one platform you should use this option.
-PLATFORMS ?= linux/arm64,linux/amd64,linux/s390x,linux/ppc64le
+PLATFORMS ?= linux/arm64,linux/amd64
.PHONY: docker-buildx
-docker-buildx: test ## Build and push docker image for the manager for cross-platform support
+docker-buildx: ## Build and push docker image for the manager for cross-platform support
Comment from the PR author:
Note: I needed to build amd64 images on my M2 pro and this was not working so I fixed it in the process.

	# copy existing Dockerfile and insert --platform=${BUILDPLATFORM} into Dockerfile.cross, and preserve the original Dockerfile
	sed -e '1 s/\(^FROM\)/FROM --platform=\$$\{BUILDPLATFORM\}/; t' -e ' 1,// s//FROM --platform=\$$\{BUILDPLATFORM\}/' Dockerfile > Dockerfile.cross
	- docker buildx create --name project-v3-builder
	docker buildx use project-v3-builder
-	- docker buildx build --push --platform=$(PLATFORMS) --tag ${IMG} -f Dockerfile.cross
+	- docker buildx build --push --platform=$(PLATFORMS) --tag ${IMG} -f Dockerfile.cross .
	- docker buildx rm project-v3-builder
	rm Dockerfile.cross
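As a usage sketch, cross-building an amd64 image from an Apple Silicon (arm64) host with the fixed target could look like the following; the image reference is a placeholder, and a configured buildx builder plus push access to the registry are assumed:

```sh
# IMG is illustrative; substitute a registry/tag you can push to
make docker-buildx IMG=registry.example.com/k8ssandra-operator:dev PLATFORMS=linux/amd64
```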

12 changes: 12 additions & 0 deletions apis/k8ssandra/v1alpha1/k8ssandracluster_types.go
@@ -201,6 +201,18 @@ func (in *K8ssandraCluster) GetInitializedDatacenters() []CassandraDatacenterTemplate
	return datacenters
}

// GetHealthyDatacenters returns the datacenters whose CassandraDatacenter status reports the DatacenterHealthy condition as true.
func (in *K8ssandraCluster) GetHealthyDatacenters() []CassandraDatacenterTemplate {
	datacenters := make([]CassandraDatacenterTemplate, 0)
	if in != nil && in.Spec.Cassandra != nil {
		for _, dc := range in.Spec.Cassandra.Datacenters {
			if status, found := in.Status.Datacenters[dc.Meta.Name]; found && status.Cassandra.GetConditionStatus(cassdcapi.DatacenterHealthy) == corev1.ConditionTrue {
				datacenters = append(datacenters, dc)
			}
		}
	}
	return datacenters
}

// SanitizedName returns a sanitized version of the name returned by CassClusterName()
func (in *K8ssandraCluster) SanitizedName() string {
	return cassdcapi.CleanupForKubernetes(in.CassClusterName())
12 changes: 7 additions & 5 deletions controllers/k8ssandra/datacenters.go
@@ -186,6 +186,13 @@ func (r *K8ssandraClusterReconciler) reconcileDatacenters(ctx context.Context, k
				return result.Done(), actualDcs
			}
		} else {
+			if len(actualDc.Status.NodeStatuses) != int(actualDc.Spec.Size) {
+				dcLogger.Info("Waiting for datacenter to have all nodes started")
+				return result.Done(), actualDcs
+			}
+			if recResult := r.checkSchemas(ctx, kc, actualDc, remoteClient, dcLogger); recResult.Completed() {
+				return recResult, actualDcs
+			}
			if !cassandra.DatacenterReady(actualDc) {
				dcLogger.Info("Waiting for datacenter to satisfy Ready condition")
				return result.Done(), actualDcs
@@ -203,11 +210,6 @@
		actualDcs = append(actualDcs, actualDc)

		if !actualDc.Spec.Stopped {
-
-			if recResult := r.checkSchemas(ctx, kc, actualDc, remoteClient, dcLogger); recResult.Completed() {
-				return recResult, actualDcs
-			}
-
			if annotations.HasAnnotationWithValue(kc, api.RebuildDcAnnotation, dcKey.Name) {
				if recResult := r.reconcileDcRebuild(ctx, kc, actualDc, remoteClient, dcLogger); recResult.Completed() {
					return recResult, actualDcs
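Taken together, the two hunks move the schema check out of the post-Ready path and into the branch that runs while the datacenter is still converging. A condensed sketch of the resulting order, with the rationale from the changelog entry as comments (identifiers are from the diff above; surrounding plumbing omitted):

```go
// 1. wait until every node in the DC has started
if len(actualDc.Status.NodeStatuses) != int(actualDc.Spec.Size) {
	return result.Done(), actualDcs
}
// 2. reconcile schema and keyspace replication *before* requiring Ready:
// per #1272, a DC being migrated from an external cluster cannot reach
// Ready until the system keyspaces are replicated to it, which the old
// post-Ready ordering prevented
if recResult := r.checkSchemas(ctx, kc, actualDc, remoteClient, dcLogger); recResult.Completed() {
	return recResult, actualDcs
}
// 3. only then gate the remaining reconciliation on the Ready condition
if !cassandra.DatacenterReady(actualDc) {
	return result.Done(), actualDcs
}
```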
28 changes: 4 additions & 24 deletions controllers/k8ssandra/medusa_reconciler_test.go
@@ -246,20 +246,9 @@ func createMultiDcClusterWithMedusa(t *testing.T, ctx context.Context, f *framew
	require.True(err != nil && errors.IsNotFound(err), "dc2 should not be created until dc1 is ready")

	t.Log("update dc1 status to ready")
-	err = f.PatchDatacenterStatus(ctx, dc1Key, func(dc *cassdcapi.CassandraDatacenter) {
-		dc.Status.CassandraOperatorProgress = cassdcapi.ProgressReady
-		dc.SetCondition(cassdcapi.DatacenterCondition{
-			Type:               cassdcapi.DatacenterReady,
-			Status:             corev1.ConditionTrue,
-			LastTransitionTime: metav1.Now(),
-		})
-	})
+	err = f.SetDatacenterStatusReady(ctx, dc1Key)
	require.NoError(err, "failed to update dc1 status to ready")

-	require.Eventually(func() bool {
-		return f.UpdateDatacenterGeneration(ctx, t, dc1Key)
-	}, timeout, interval, "failed to update dc1 generation")
-
	reconcileMedusaStandaloneDeployment(ctx, t, f, kc, "dc2", f.DataPlaneContexts[1])
	t.Log("check that dc2 was created")
	require.Eventually(f.DatacenterExists(ctx, dc2Key), timeout, interval)
@@ -290,20 +279,11 @@
	require.NoError(err, "failed to get dc2")

	t.Log("update dc2 status to ready")
-	err = f.PatchDatacenterStatus(ctx, dc2Key, func(dc *cassdcapi.CassandraDatacenter) {
-		dc.Status.CassandraOperatorProgress = cassdcapi.ProgressReady
-		dc.SetCondition(cassdcapi.DatacenterCondition{
-			Type:               cassdcapi.DatacenterReady,
-			Status:             corev1.ConditionTrue,
-			LastTransitionTime: metav1.Now(),
-		})
-	})
+	if err = f.SetDatacenterStatusReady(ctx, dc2Key); err != nil {
+		assert.Fail(t, "error setting status ready", err)
+	}
	require.NoError(err, "failed to update dc2 status to ready")

-	require.Eventually(func() bool {
-		return f.UpdateDatacenterGeneration(ctx, t, dc2Key)
-	}, timeout, interval, "failed to update dc2 generation")
-
	t.Log("check that dc2 was rebuilt")
	verifyRebuildTaskCreated(ctx, t, f, dc2Key, dc1Key)
	rebuildTaskKey := framework.NewClusterKey(f.DataPlaneContexts[1], kc.Namespace, "dc2-rebuild")
2 changes: 1 addition & 1 deletion controllers/k8ssandra/schemas.go
@@ -147,7 +147,7 @@ func (r *K8ssandraClusterReconciler) updateReplicationOfSystemKeyspaces(
		return recResult
	}

-	replication := cassandra.ComputeReplicationFromDatacenters(3, kc.Spec.ExternalDatacenters, kc.GetInitializedDatacenters()...)
+	replication := cassandra.ComputeReplicationFromDatacenters(3, kc.Spec.ExternalDatacenters, kc.GetHealthyDatacenters()...)

	logger.Info("Preparing to update replication for system keyspaces", "replication", replication)

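For intuition about what this call computes, here is a hypothetical stand-in for `cassandra.ComputeReplicationFromDatacenters`, under the assumption (not stated in this diff) that each managed DC contributes min(RF, size) replicas while external DCs, whose size the operator cannot observe, get the full factor:

```go
// computeReplicationSketch is an illustrative stand-in, not the library code.
// rf is the target replication factor (3 in the call above), external lists
// DC names from kc.Spec.ExternalDatacenters, and sizes maps healthy managed
// DCs to their node counts.
func computeReplicationSketch(rf int, external []string, sizes map[string]int) map[string]int {
	repl := make(map[string]int)
	for name, size := range sizes {
		if size < rf {
			repl[name] = size // never ask for more replicas than nodes
		} else {
			repl[name] = rf
		}
	}
	for _, name := range external {
		repl[name] = rf // external DC size is unknown; use the full factor
	}
	return repl
}
```

Under that reading, switching from `GetInitializedDatacenters` to `GetHealthyDatacenters` means a DC that has not yet reached the Healthy condition is simply absent from the map, so replication is only altered toward DCs that can actually hold it.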
11 changes: 11 additions & 0 deletions controllers/medusa/medusatask_controller_test.go
@@ -107,6 +107,17 @@ func testMedusaTasks(t *testing.T, ctx context.Context, f *framework.Framework,
			Status:             corev1.ConditionTrue,
			LastTransitionTime: metav1.Now(),
		})
		dc.Status.NodeStatuses = cassdcapi.CassandraStatusMap{
			"node1": cassdcapi.CassandraNodeStatus{
				HostID: "host1",
			},
			"node2": cassdcapi.CassandraNodeStatus{
				HostID: "host2",
			},
			"node3": cassdcapi.CassandraNodeStatus{
				HostID: "host3",
			},
		}
	})
	require.NoError(err, "failed to patch datacenter status")

41 changes: 41 additions & 0 deletions test/e2e/suite_test.go
@@ -230,6 +230,10 @@ func TestOperator(t *testing.T) {
		additionalNamespaces: []string{"dc2"},
		sutNamespace:         "k8ssandra-operator",
	}))
	t.Run("AddExternalDcToCluster", e2eTest(ctx, &e2eTestOpts{
		testFunc: addExternalDcToCluster,
		fixture:  framework.NewTestFixture("add-external-dc", controlPlane),
	}))
	t.Run("RemoveDcFromCluster", e2eTest(ctx, &e2eTestOpts{
		testFunc: removeDcFromCluster,
		fixture:  framework.NewTestFixture("remove-dc", controlPlane),
@@ -1380,6 +1384,43 @@ func addDcToClusterSameDataplane(t *testing.T, ctx context.Context, namespace st
	}
}

func addExternalDcToCluster(t *testing.T, ctx context.Context, namespace string, f *framework.E2eFramework) {
	require := require.New(t)

	t.Log("check that dc1 is ready")

	dc1Key := framework.ClusterKey{
		K8sContext: f.DataPlaneContexts[0],
		NamespacedName: types.NamespacedName{
			Namespace: namespace,
			Name:      "dc1",
		},
	}
	checkDatacenterReady(t, ctx, dc1Key, f)

	t.Log("add dc2 to cluster")
	// Get the IP address of the first Cassandra pod
	pods, err := f.GetCassandraDatacenterPods(t, ctx, dc1Key, dc1Key.NamespacedName.Name)
	require.NoError(err, "failed to get Cassandra pods")

	kcKey := client.ObjectKey{Namespace: namespace, Name: "test"}
	err = f.CreateExternalDc(namespace, pods[0].Status.PodIP)
	require.NoError(err, "failed to create external DC")

	kc := &api.K8ssandraCluster{}
	err = f.Client.Get(ctx, kcKey, kc)
	require.NoError(err, "failed to get K8ssandraCluster %s in namespace %s", kcKey.Name, namespace)

	dc2Key := framework.ClusterKey{
		K8sContext: f.DataPlaneContexts[0],
		NamespacedName: types.NamespacedName{
			Namespace: namespace,
			Name:      "dc2",
		},
	}
	checkDatacenterReady(t, ctx, dc2Key, f)
}

func removeDcFromCluster(t *testing.T, ctx context.Context, namespace string, f *framework.E2eFramework) {
	require := require.New(t)
	assert := assert.New(t)
34 changes: 34 additions & 0 deletions test/framework/e2e_framework.go
@@ -416,6 +416,40 @@ func (f *E2eFramework) CreateMedusaSecret(namespace string) error {
	return nil
}

func (f *E2eFramework) CreateExternalDc(namespace string, seedIp string) error {
	path := filepath.Join("..", "testdata", "fixtures", "add-external-dc", "k8ssandra2.yaml")

	// Read the file to get the content
	content, err := os.ReadFile(path)
	if err != nil {
		return err
	}

	// Replace the SEED value
	updatedContent := strings.ReplaceAll(string(content), "SEED", seedIp)

	// Write it back to a temporary file
	tempFile, err := os.CreateTemp("", "k8ssandra2.yaml")
	if err != nil {
		return err
	}
	defer os.Remove(tempFile.Name())

	_, err = tempFile.Write([]byte(updatedContent))
	if err != nil {
		return err
	}

	// Apply it
	options := kubectl.Options{Namespace: namespace, Context: f.DataPlaneContexts[0]}
	f.logger.Info("Create k8c with external DC", "Namespace", namespace, "Context", f.DataPlaneContexts[0])
	if err := kubectl.Apply(options, tempFile.Name()); err != nil {
		return err
	}

	return nil
}

type OperatorDeploymentConfig struct {
	Namespace     string
	ClusterScoped bool
17 changes: 17 additions & 0 deletions test/framework/framework.go
@@ -297,7 +297,19 @@ func (f *Framework) SetDatacenterStatusReady(ctx context.Context, key ClusterKey
			Status:             corev1.ConditionTrue,
			LastTransitionTime: now,
		})
		dc.Status.SetCondition(cassdcapi.DatacenterCondition{
			Type:               cassdcapi.DatacenterHealthy,
			Status:             corev1.ConditionTrue,
			LastTransitionTime: now,
		})
		dc.Status.ObservedGeneration = dc.Generation
		dc.Status.NodeStatuses = cassdcapi.CassandraStatusMap{}
		for i := 0; i < int(dc.Spec.Size); i++ {
			dc.Status.NodeStatuses[fmt.Sprintf("node%d", i)] = cassdcapi.CassandraNodeStatus{
				HostID: fmt.Sprintf("host%d", i),
			}
		}
	})
}

@@ -366,6 +378,11 @@ func (f *Framework) SetDatacenterStatusStopped(ctx context.Context, key ClusterKey
			Status:             corev1.ConditionTrue,
			LastTransitionTime: now,
		})
		dc.Status.SetCondition(cassdcapi.DatacenterCondition{
			Type:               cassdcapi.DatacenterHealthy,
			Status:             corev1.ConditionTrue,
			LastTransitionTime: now,
		})
		dc.Status.ObservedGeneration = dc.Generation
	})
}
95 changes: 95 additions & 0 deletions test/testdata/fixtures/add-external-dc/cassdc1.yaml
@@ -0,0 +1,95 @@
apiVersion: v1
kind: Secret
metadata:
  name: test-superuser
data:
  password: dDBwLVNlY3JldA==
  username: dGVzdC1zdXBlcnVzZXI=
type: Opaque
---
apiVersion: cassandra.datastax.com/v1beta1
kind: CassandraDatacenter
metadata:
  name: dc1
spec:
  clusterName: test
  config:
    cassandra-env-sh:
      additional-jvm-opts:
        - >-
          -Dcassandra.jmx.authorizer=org.apache.cassandra.auth.jmx.AuthorizationProxy
        - >-
          -Djava.security.auth.login.config=$CASSANDRA_HOME/conf/cassandra-jaas.config
        - '-Dcassandra.jmx.remote.login.config=CassandraLogin'
        - '-Dcom.sun.management.jmxremote.authenticate=true'
    cassandra-yaml:
      authenticator: PasswordAuthenticator
      authorizer: CassandraAuthorizer
      auto_snapshot: false
      cas_contention_timeout_in_ms: 60000
      commitlog_segment_size_in_mb: 2
      compaction_throughput_mb_per_sec: 0
      concurrent_compactors: 1
      concurrent_counter_writes: 2
      concurrent_reads: 2
      concurrent_writes: 2
      counter_cache_size_in_mb: 0
      counter_write_request_timeout_in_ms: 60000
      key_cache_size_in_mb: 0
      memtable_flush_writers: 1
      num_tokens: 256
      prepared_statements_cache_size_mb: 1
      range_request_timeout_in_ms: 60000
      read_request_timeout_in_ms: 60000
      request_timeout_in_ms: 60000
      role_manager: CassandraRoleManager
      slow_query_log_timeout_in_ms: 0
      sstable_preemptive_open_interval_in_mb: 0
      start_rpc: false
      thrift_prepared_statements_cache_size_mb: 1
      truncate_request_timeout_in_ms: 60000
      write_request_timeout_in_ms: 60000
    jvm-options:
      garbage_collector: G1GC
      initial_heap_size: 402653184
      max_heap_size: 402653184
  networking:
    hostNetwork: true
  podTemplateSpec:
    spec:
      containers:
        - env:
            - name: METRIC_FILTERS
              value: >-
                deny:org.apache.cassandra.metrics.Table
                deny:org.apache.cassandra.metrics.table
                allow:org.apache.cassandra.metrics.table.live_ss_table_count
                allow:org.apache.cassandra.metrics.Table.LiveSSTableCount
                allow:org.apache.cassandra.metrics.table.live_disk_space_used
                allow:org.apache.cassandra.metrics.table.LiveDiskSpaceUsed
                allow:org.apache.cassandra.metrics.Table.Pending
                allow:org.apache.cassandra.metrics.Table.Memtable
                allow:org.apache.cassandra.metrics.Table.Compaction
                allow:org.apache.cassandra.metrics.table.read
                allow:org.apache.cassandra.metrics.table.write
                allow:org.apache.cassandra.metrics.table.range
                allow:org.apache.cassandra.metrics.table.coordinator
                allow:org.apache.cassandra.metrics.table.dropped_mutations
            - name: MANAGEMENT_API_HEAP_SIZE
              value: '67108864'
          name: cassandra
          resources: {}
  resources: {}
  serverType: cassandra
  serverVersion: 3.11.14
  size: 2
  storageConfig:
    cassandraDataVolumeClaimSpec:
      accessModes:
        - ReadWriteOnce
      resources:
        requests:
          storage: 5Gi
      storageClassName: standard
  superuserSecretName: test-superuser
  systemLoggerResources: {}