Skip to content

Commit

Permalink
Merge branch 'main' into cronconfig
Browse files Browse the repository at this point in the history
  • Loading branch information
ericsmalling authored Jun 6, 2024
2 parents b413fb3 + d28e342 commit 1695ff4
Show file tree
Hide file tree
Showing 50 changed files with 245 additions and 182 deletions.
1 change: 1 addition & 0 deletions .github/workflows/kind_e2e_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ jobs:
- CreateSingleDseDatacenterCluster
- CreateSingleDseSearchDatacenterCluster
- CreateSingleDseGraphDatacenterCluster
- CreateSingleHcdDatacenterCluster
- ChangeDseWorkload
- PerNodeConfig/UserDefined
- RemoveLocalDcFromCluster
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG/CHANGELOG-1.17.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ When cutting a new release, update the `unreleased` heading to the tag being gen
* [BUGFIX] [#1322](https://github.com/k8ssandra/k8ssandra-operator/issues/1322) Fix bug where server-system-logger customisations from the Containers field would be overwritten when vector was enabled.
* [FEATURE] Add support for HCD 1.0
* [ENHANCEMENT] [#1329](https://github.com/k8ssandra/k8ssandra-operator/issues/1329) Add config emptyDir volume mount on Reaper deployment to allow read only root FS
* [BUGFIX] Fix HCD jvm options generation
* [ENHANCEMENT] [#1278](https://github.com/k8ssandra/k8ssandra-operator/issues/1278) Add toggle to allow the disabling of the Medusa purge CronJob creation
102 changes: 51 additions & 51 deletions apis/k8ssandra/v1alpha1/cassandraconfig_types.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions controllers/medusa/medusarestorejob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ func updateMedusaRestoreInitContainer(req *medusa.RestoreRequest) error {
if err := setRestoreMappingInRestoreContainer(req.RestoreJob.Status.RestoreMapping, req.Datacenter); err != nil {
return err
}

return setRestoreKeyInRestoreContainer(req.RestoreJob.Status.RestoreKey, req.Datacenter)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/cassandra/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func AllowAlterRfDuringRangeMovement(dcConfig *DatacenterConfig) {
func EnableSmartTokenAllocation(template *DatacenterConfig) {
// Note: we put int64 values because even if int values can be marshaled just fine,
// Unstructured.DeepCopy() would reject them since int is not a supported json type.
if template.ServerType == api.ServerDistributionDse {
if template.ServerType == api.ServerDistributionDse || template.ServerType == api.ServerDistributionHcd {
template.CassandraConfig.CassandraYaml.PutIfAbsent("allocate_tokens_for_local_replication_factor", int64(3))
}
}
35 changes: 35 additions & 0 deletions pkg/cassandra/config_premarshal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func Test_preMarshalConfig(t *testing.T) {
Simple1 *string `cass-config:"*:foo/simple1"`
Simple2 bool `cass-config:"*:foo/simple2"`
SimpleDSE *bool `cass-config:"[email protected]:foo/simple/dse"`
SimpleHCD *bool `cass-config:"[email protected]:foo/simple/hcd"`
}
type komplex struct {
ManyRestrictions *string `cass-config:"^3.11.x:foo/many-restrictions-3x;cassandra@>=4.x:foo/many-restrictions-4x"`
Expand All @@ -31,6 +32,10 @@ func Test_preMarshalConfig(t *testing.T) {
ManyRestrictionsDSE *string `cass-config:">=4.x:many-restrictions-cassandra;dse@>=6.8.x:many-restrictions-dse"`
ChildRecurseDSE *simple `cass-config:"dse@*:parent/;recurse"`
}
type hcd struct {
ManyRestrictionsHCD *string `cass-config:">=4.x:many-restrictions-cassandra;dse@>=6.8.x:many-restrictions-dse;hcd@>=1.x.x:many-restrictions-hcd"`
ChildRecurseHCD *simple `cass-config:"hcd@*:parent/;recurse"`
}
type invalid1 struct {
Field1 string `cass-config:"dse@*:path:invalid tag"`
}
Expand Down Expand Up @@ -118,6 +123,14 @@ func Test_preMarshalConfig(t *testing.T) {
map[string]interface{}{"foo": map[string]interface{}{"simple": map[string]interface{}{"dse": ptr.To(true)}}},
assert.NoError,
},
{
"simple HCD",
reflect.ValueOf(&simple{SimpleHCD: ptr.To(true)}),
semver.MustParse("1.0.0"),
"hcd",
map[string]interface{}{"foo": map[string]interface{}{"simple": map[string]interface{}{"hcd": ptr.To(true)}}},
assert.NoError,
},
{
"simple server type mismatch",
reflect.ValueOf(&simple{SimpleDSE: ptr.To(true)}),
Expand Down Expand Up @@ -236,6 +249,28 @@ func Test_preMarshalConfig(t *testing.T) {
},
assert.NoError,
},
{
"complex HCD 1.0",
reflect.ValueOf(&hcd{
ManyRestrictionsHCD: ptr.To("qix"),
ChildRecurseHCD: &simple{
SimpleHCD: ptr.To(true),
},
}),
semver.MustParse("1.0.0"),
"hcd",
map[string]interface{}{
"many-restrictions-hcd": ptr.To("qix"),
"parent": map[string]interface{}{
"foo": map[string]interface{}{
"simple": map[string]interface{}{
"hcd": ptr.To(true),
},
},
},
},
assert.NoError,
},
{
"complex DSE 6.8 with cassandra server type",
reflect.ValueOf(&dse{
Expand Down
10 changes: 0 additions & 10 deletions pkg/cassandra/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,13 +344,3 @@ func TestSmartTokenAllocCassandra(t *testing.T) {
_, exists := dcConfig.CassandraConfig.CassandraYaml["allocate_tokens_for_local_replication_factor"]
assert.False(t, exists, "allocate_tokens_for_local_replication_factor should not be set for Cassandra")
}

func TestSmartTokenAllocHcd(t *testing.T) {
dcConfig := &DatacenterConfig{
ServerType: api.ServerDistributionHcd,
}

EnableSmartTokenAllocation(dcConfig)
_, exists := dcConfig.CassandraConfig.CassandraYaml["allocate_tokens_for_local_replication_factor"]
assert.False(t, exists, "allocate_tokens_for_local_replication_factor should not be set for HCD")
}
8 changes: 5 additions & 3 deletions pkg/cassandra/datacenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,11 @@ func NewDatacenter(klusterKey types.NamespacedName, template *DatacenterConfig)

dc := &cassdcapi.CassandraDatacenter{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: template.Meta.Name,
Annotations: map[string]string{},
Namespace: namespace,
Name: template.Meta.Name,
Annotations: map[string]string{
cassdcapi.UpdateAllowedAnnotation: string(cassdcapi.AllowUpdateAlways),
},
Labels: utils.MergeMap(map[string]string{
api.NameLabel: api.NameLabelValue,
api.PartOfLabel: api.PartOfLabelValue,
Expand Down
27 changes: 27 additions & 0 deletions test/e2e/dse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/tls"
"encoding/base64"
"fmt"
"github.com/k8ssandra/k8ssandra-operator/test/kubectl"
"net/http"
"testing"
"time"
Expand Down Expand Up @@ -75,6 +76,32 @@ func createSingleDseDatacenterCluster(t *testing.T, ctx context.Context, namespa
checkStargateK8cStatusReady(t, f, ctx, kcKey, dcKey)
}

// createSingleHcdDatacenterCluster creates a K8ssandraCluster with one HCD
// CassandraDatacenter and verifies that the datacenter becomes ready, that a
// heap size is set in its generated config, and that CQL queries succeed.
func createSingleHcdDatacenterCluster(t *testing.T, ctx context.Context, namespace string, f *framework.E2eFramework) {
	t.Log("check that the K8ssandraCluster was created")
	kc := &api.K8ssandraCluster{}
	kcKey := types.NamespacedName{Namespace: namespace, Name: "test"}
	err := f.Client.Get(ctx, kcKey, kc)
	require.NoError(t, err, "failed to get K8ssandraCluster in namespace %s", namespace)
	dcKey := framework.ClusterKey{K8sContext: f.DataPlaneContexts[0], NamespacedName: types.NamespacedName{Namespace: namespace, Name: "dc1"}}
	checkDatacenterReady(t, ctx, dcKey, f)
	checkDatacenterHasHeapSizeSet(t, ctx, dcKey, f)
	assertCassandraDatacenterK8cStatusReady(ctx, t, f, kcKey, dcKey.Name)
	dcPrefix := DcPrefix(t, f, dcKey)

	t.Log("Check that we can communicate through CQL with HCD")
	_, err = f.ExecuteCql(ctx, f.DataPlaneContexts[0], namespace, kc.SanitizedName(), dcPrefix+"-default-sts-0",
		"SELECT * FROM system.local")
	require.NoError(t, err, "failed to execute CQL query against HCD")
	opts := kubectl.Options{
		Namespace: namespace,
		Context:   f.DataPlaneContexts[0],
	}
	// Inspect the server process command line to confirm the JVM heap flags were applied.
	output, err := kubectl.Exec(opts, dcPrefix+"-default-sts-0", "ps", "-aux")
	require.NoError(t, err, "failed to execute ps command")
	assert.Contains(t, output, "java -Dhcd.server_process -Xms536870912 -Xmx536870912", "expected heap size to be set to 512M")
}

// createSingleDseSearchDatacenterCluster creates a K8ssandraCluster with one CassandraDatacenter running with search enabled
func createSingleDseSearchDatacenterCluster(t *testing.T, ctx context.Context, namespace string, f *framework.E2eFramework) {
t.Log("check that the K8ssandraCluster was created")
Expand Down
103 changes: 23 additions & 80 deletions test/e2e/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,23 +183,23 @@ func TestOperator(t *testing.T) {
t.Run("CreateSingleDseDatacenterCluster", e2eTest(ctx, &e2eTestOpts{
testFunc: createSingleDseDatacenterCluster,
fixture: framework.NewTestFixture("single-dc-dse", controlPlane),
dse: true,
}))
t.Run("CreateSingleHcdDatacenterCluster", e2eTest(ctx, &e2eTestOpts{
testFunc: createSingleHcdDatacenterCluster,
fixture: framework.NewTestFixture("single-dc-hcd", controlPlane),
}))
t.Run("CreateSingleDseSearchDatacenterCluster", e2eTest(ctx, &e2eTestOpts{
testFunc: createSingleDseSearchDatacenterCluster,
fixture: framework.NewTestFixture("single-dc-dse-search", controlPlane),
dse: true,
installMinio: true,
}))
t.Run("CreateSingleDseGraphDatacenterCluster", e2eTest(ctx, &e2eTestOpts{
testFunc: createSingleDseGraphDatacenterCluster,
fixture: framework.NewTestFixture("single-dc-dse-graph", controlPlane),
dse: true,
}))
t.Run("ChangeDseWorkload", e2eTest(ctx, &e2eTestOpts{
testFunc: changeDseWorkload,
fixture: framework.NewTestFixture("single-dc-dse", controlPlane),
dse: true,
}))
t.Run("CreateStargateAndDatacenter", e2eTest(ctx, &e2eTestOpts{
testFunc: createStargateAndDatacenter,
Expand Down Expand Up @@ -450,9 +450,6 @@ type e2eTestOpts struct {
// an upgrade test.
initialVersion *string

// dse is used to specify if the e2e tests will run against DSE or Cassandra
dse bool

// installMinio is used to specify if the e2e tests will require to install Minio before creating the k8c object.
installMinio bool
}
Expand All @@ -462,7 +459,7 @@ type e2eTestFunc func(t *testing.T, ctx context.Context, namespace string, f *fr
func e2eTest(ctx context.Context, opts *e2eTestOpts) func(*testing.T) {
return func(t *testing.T) {

f, err := framework.NewE2eFramework(t, kubeconfigFile, opts.dse, controlPlane, dataPlanes...)
f, err := framework.NewE2eFramework(t, kubeconfigFile, controlPlane, dataPlanes...)
if err != nil {
t.Fatalf("failed to initialize test framework: %v", err)
}
Expand Down Expand Up @@ -604,7 +601,7 @@ func beforeTest(t *testing.T, f *framework.E2eFramework, opts *e2eTestOpts) erro
return nil
}

func upgradeToLatest(t *testing.T, ctx context.Context, f *framework.E2eFramework, namespace string) error {
func upgradeToLatest(t *testing.T, ctx context.Context, f *framework.E2eFramework, namespace, dcPrefix string) error {
deploymentConfig := framework.OperatorDeploymentConfig{
Namespace: namespace,
ClusterScoped: false,
Expand Down Expand Up @@ -716,7 +713,7 @@ func applyPollingDefaults() {
polling.medusaBackupDone.timeout = 10 * time.Minute
polling.medusaBackupDone.interval = 15 * time.Second

polling.medusaRestoreDone.timeout = 10 * time.Minute
polling.medusaRestoreDone.timeout = 15 * time.Minute
polling.medusaRestoreDone.interval = 15 * time.Second

polling.datacenterUpdating.timeout = 1 * time.Minute
Expand Down Expand Up @@ -955,51 +952,9 @@ func createSingleDatacenterClusterWithUpgrade(t *testing.T, ctx context.Context,
assertCassandraDatacenterK8cStatusReady(ctx, t, f, kcKey, dcKey.Name)
dcPrefix := DcPrefix(t, f, dcKey)

stargateKey := framework.ClusterKey{K8sContext: f.DataPlaneContexts[0], NamespacedName: types.NamespacedName{Namespace: namespace, Name: dcPrefix + "-stargate"}}
checkStargateReady(t, f, ctx, stargateKey)
checkStargateK8cStatusReady(t, f, ctx, kcKey, dcKey)

// Save the Stargate deployment resource hash to verify if it was modified by the upgrade.
// It'll allow to wait for the pod to be successfully upgraded before performing the Stargate API tests.
stargateDeploymentKey := framework.ClusterKey{K8sContext: f.DataPlaneContexts[0], NamespacedName: types.NamespacedName{Namespace: namespace, Name: dcPrefix + "-default-stargate-deployment"}}
initialStargateResourceHash := GetStargateResourceHash(t, f, ctx, stargateDeploymentKey)
initialStargatePodNames := GetStargatePodNames(t, f, ctx, stargateDeploymentKey)
require.Len(initialStargatePodNames, 1, "expected 1 Stargate pod in namespace %s", namespace)

t.Log("retrieve database credentials")
username, password, err := f.RetrieveDatabaseCredentials(ctx, f.DataPlaneContexts[0], namespace, k8ssandra.SanitizedName())
require.NoError(err, "failed to retrieve database credentials")

t.Log("deploying Stargate ingress routes in context", f.DataPlaneContexts[0])
stargateRestHostAndPort := ingressConfigs[f.DataPlaneContexts[0]].StargateRest
stargateGrpcHostAndPort := ingressConfigs[f.DataPlaneContexts[0]].StargateGrpc
stargateCqlHostAndPort := ingressConfigs[f.DataPlaneContexts[0]].StargateCql
f.DeployStargateIngresses(t, f.DataPlaneContexts[0], namespace, dcPrefix+"-stargate-service", stargateRestHostAndPort, stargateGrpcHostAndPort)
defer f.UndeployAllIngresses(t, f.DataPlaneContexts[0], namespace)

// Perform the upgrade
err = upgradeToLatest(t, ctx, f, namespace)
err = upgradeToLatest(t, ctx, f, namespace, dcPrefix)
require.NoError(err, "failed to upgrade to latest version")

// Wait for the Stargate deployment resource hash to change.
// It'll allow to wait for the pod to be successfully upgraded before performing the Stargate API tests.
// It's possible that this assertion will fail if the Stargate deployment resource hash is not changed.
newStargateResourceHash := waitForStargateUpgrade(t, f, ctx, stargateDeploymentKey, initialStargateResourceHash)

t.Logf("Stargate initial deployment resource hash: %s / Current hash: %s", initialStargateResourceHash, newStargateResourceHash)
if initialStargateResourceHash != newStargateResourceHash {
// Stargate deployment was modified after the upgrade, we need to wait for the new pod to be ready
t.Log("Stargate deployment updated, waiting for new pod to be ready")
require.Eventually(func() bool {
newStargatePodNames := GetStargatePodNames(t, f, ctx, stargateDeploymentKey)
return !utils.SliceContains(newStargatePodNames, initialStargatePodNames[0])
}, polling.stargateReady.timeout, polling.stargateReady.interval)
}

checkStargateApisReachable(t, ctx, f.DataPlaneContexts[0], namespace, dcPrefix, stargateRestHostAndPort, stargateGrpcHostAndPort, stargateCqlHostAndPort, username, password, false, f)

replication := map[string]int{DcName(t, f, dcKey): 1}
testStargateApis(t, f, ctx, f.DataPlaneContexts[0], namespace, dcPrefix, username, password, false, replication)
}

// createSingleDatacenterCluster creates a K8ssandraCluster with one CassandraDatacenter
Expand Down Expand Up @@ -1839,6 +1794,20 @@ func checkDatacenterReady(t *testing.T, ctx context.Context, key framework.Clust
}), polling.datacenterReady.timeout, polling.datacenterReady.interval, fmt.Sprintf("timed out waiting for datacenter %s to become ready", key.Name))
}

// checkDatacenterHasHeapSizeSet waits until the datacenter's generated config
// contains a positive initial_heap_size under jvm-server-options.
func checkDatacenterHasHeapSizeSet(t *testing.T, ctx context.Context, key framework.ClusterKey, f *framework.E2eFramework) {
	t.Logf("check that datacenter %s in cluster %s has its heap size set", key.Name, key.K8sContext)
	withDatacenter := f.NewWithDatacenter(ctx, key)
	require.Eventually(t, withDatacenter(func(dc *cassdcapi.CassandraDatacenter) bool {
		dcConfig, err := utils.UnmarshalToMap(dc.Spec.Config)
		if err != nil {
			t.Logf("failed to unmarshal datacenter %s config: %v", key.Name, err)
			return false
		}
		// Use comma-ok type assertions so an unexpected config shape retries
		// until the timeout instead of panicking inside the poll function.
		jvmOpts, ok := dcConfig["jvm-server-options"].(map[string]interface{})
		if !ok {
			return false
		}
		initialHeapSize, ok := jvmOpts["initial_heap_size"].(float64)
		return ok && initialHeapSize > 0
	}), 10*time.Second, 1*time.Second, fmt.Sprintf("timed out waiting for datacenter %s to have its heap size set", key.Name))
}

func checkDatacenterUpdating(t *testing.T, ctx context.Context, key framework.ClusterKey, f *framework.E2eFramework) {
t.Logf("check that datacenter %s in cluster %s is updating", key.Name, key.K8sContext)
withDatacenter := f.NewWithDatacenter(ctx, key)
Expand Down Expand Up @@ -1948,7 +1917,7 @@ func checkKeyspaceExists(
ctx context.Context,
k8sContext, namespace, clusterName, pod, keyspace string,
) {
assert.Eventually(t, func() bool {
require.Eventually(t, func() bool {
keyspaces, err := f.ExecuteCql(ctx, k8sContext, namespace, clusterName, pod, "describe keyspaces")
if err != nil {
t.Logf("failed to describe keyspaces: %v", err)
Expand Down Expand Up @@ -2108,32 +2077,6 @@ func DcName(
return cassdc.DatacenterName()
}

func waitForStargateUpgrade(t *testing.T, f *framework.E2eFramework, ctx context.Context, stargateDeploymentKey framework.ClusterKey, initialStargateResourceHash string) string {
stargateChan := make(chan string)
go func() {
for {
stargateDeploymentResourceHash := GetStargateResourceHash(t, f, ctx, stargateDeploymentKey)
if stargateDeploymentResourceHash != initialStargateResourceHash {
stargateChan <- stargateDeploymentResourceHash
return
}
time.Sleep(time.Second)
}
}()

stargateUpgradeTimeout := time.After(5 * time.Minute)
for {
select {
case newStargateResourceHash := <-stargateChan:
t.Logf("Stargate deployment resource hash changed to %s", newStargateResourceHash)
return newStargateResourceHash
case <-stargateUpgradeTimeout:
t.Log("Stargate deployment resource hash did not change")
return initialStargateResourceHash
}
}
}

func checkMetricsFiltersAbsence(t *testing.T, ctx context.Context, f *framework.E2eFramework, dcKey framework.ClusterKey) error {
t.Logf("check that metric filters are absent on dc %s in cluster %s", dcKey.Name, dcKey.K8sContext)
cassdc := &cassdcapi.CassandraDatacenter{}
Expand Down
9 changes: 3 additions & 6 deletions test/framework/e2e_framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,14 @@ var (
nodeToolStatusDN = regexp.MustCompile(`DN\s\s`)
)

func NewE2eFramework(t *testing.T, kubeconfigFile string, useDse bool, controlPlane string, dataPlanes ...string) (*E2eFramework, error) {
func NewE2eFramework(t *testing.T, kubeconfigFile string, controlPlane string, dataPlanes ...string) (*E2eFramework, error) {
config, err := clientcmd.LoadFromFile(kubeconfigFile)
if err != nil {
return nil, err
}

// Specify if DSE is used to adjust paths to binaries (cqlsh, ...)
cqlshBinLocation := "/opt/cassandra/bin/cqlsh"
if useDse {
cqlshBinLocation = "/opt/dse/bin/cqlsh"
}
cqlshBin := "cqlsh"

remoteClients := make(map[string]client.Client, 0)
t.Logf("Using config file: %s", kubeconfigFile)
Expand Down Expand Up @@ -98,7 +95,7 @@ func NewE2eFramework(t *testing.T, kubeconfigFile string, useDse bool, controlPl

f := NewFramework(remoteClients[controlPlane], controlPlane, validDataPlanes, remoteClients)

return &E2eFramework{Framework: f, cqlshBin: cqlshBinLocation}, nil
return &E2eFramework{Framework: f, cqlshBin: cqlshBin}, nil
}

func newRemoteClient(config *clientcmdapi.Config, context string) (client.Client, error) {
Expand Down
1 change: 1 addition & 0 deletions test/testdata/fixtures/add-dc-cass-only/k8ssandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ metadata:
spec:
cassandra:
serverVersion: "4.1.0"
serverImage: "k8ssandra/cass-management-api:4.1.0"
storageConfig:
cassandraDataVolumeClaimSpec:
storageClassName: standard
Expand Down
1 change: 1 addition & 0 deletions test/testdata/fixtures/add-dc/k8ssandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ metadata:
spec:
cassandra:
serverVersion: "3.11.14"
serverImage: "k8ssandra/cass-management-api:3.11.14"
storageConfig:
cassandraDataVolumeClaimSpec:
storageClassName: standard
Expand Down
1 change: 1 addition & 0 deletions test/testdata/fixtures/add-external-dc/cassdc1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ spec:
resources: {}
serverType: cassandra
serverVersion: 3.11.14
serverImage: "k8ssandra/cass-management-api:3.11.14"
size: 2
storageConfig:
cassandraDataVolumeClaimSpec:
Expand Down
Loading

0 comments on commit 1695ff4

Please sign in to comment.