diff --git a/apis/control/v1alpha1/cassandratask_types.go b/apis/control/v1alpha1/cassandratask_types.go index d02703b3..ffc1b701 100644 --- a/apis/control/v1alpha1/cassandratask_types.go +++ b/apis/control/v1alpha1/cassandratask_types.go @@ -81,6 +81,7 @@ const ( CommandGarbageCollect CassandraCommand = "garbagecollect" CommandFlush CassandraCommand = "flush" CommandRefresh CassandraCommand = "refresh" + CommandTSReload CassandraCommand = "tsreload" ) type CassandraJob struct { diff --git a/internal/controllers/control/cassandratask_controller.go b/internal/controllers/control/cassandratask_controller.go index 45bd78fd..6aaa5627 100644 --- a/internal/controllers/control/cassandratask_controller.go +++ b/internal/controllers/control/cassandratask_controller.go @@ -98,6 +98,7 @@ type TaskConfiguration struct { // Execution functionality per pod AsyncFeature httphelper.Feature AsyncFunc AsyncTaskExecutorFunc + SyncFeature httphelper.Feature SyncFunc SyncTaskExecutorFunc PodFilter PodFilterFunc @@ -317,6 +318,8 @@ JobDefinition: } completed = taskConfig.Completed break JobDefinition + case api.CommandTSReload: + inodeTsReload(taskConfig) default: err = fmt.Errorf("unknown job command: %s", job.Command) return ctrl.Result{}, err @@ -647,13 +650,9 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc if !taskConfig.Filter(&pod) { continue } - - features := &httphelper.FeatureSet{} - if taskConfig.AsyncFeature != "" { - features, err = nodeMgmtClient.FeatureSet(&pod) - if err != nil { - return ctrl.Result{}, failed, completed, errMsg, err - } + features, err := nodeMgmtClient.FeatureSet(&pod) + if err != nil { + return ctrl.Result{}, failed, completed, errMsg, err } if pod.Annotations == nil { @@ -786,6 +785,16 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc return ctrl.Result{}, failed, completed, errMsg, err } + if taskConfig.SyncFeature != "" { + if !features.Supports(taskConfig.SyncFeature) { + logger.Error(err, "Pod doesn't support this feature", "Pod", pod, "Feature", taskConfig.SyncFeature) + jobStatus.Status = podJobError + failed++ + errMsg = fmt.Sprintf("Pod %s doesn't support %s feature", pod.Name, taskConfig.SyncFeature) + return ctrl.Result{}, failed, completed, errMsg, err + } + } + jobId := strconv.Itoa(idx) // This pod should run next, mark it diff --git a/internal/controllers/control/cassandratask_controller_test.go b/internal/controllers/control/cassandratask_controller_test.go index 8e1396ef..9e722fd4 100644 --- a/internal/controllers/control/cassandratask_controller_test.go +++ b/internal/controllers/control/cassandratask_controller_test.go @@ -228,6 +228,18 @@ func waitForTaskCompletion(taskKey types.NamespacedName) *api.CassandraTask { return emptyTask } +func waitForTaskFailed(taskKey types.NamespacedName) *api.CassandraTask { + var emptyTask *api.CassandraTask + Eventually(func() bool { + emptyTask = &api.CassandraTask{} + err := k8sClient.Get(context.TODO(), taskKey, emptyTask) + Expect(err).ToNot(HaveOccurred()) + + return emptyTask.Status.Failed > 0 + }, time.Duration(5*time.Second)).Should(BeTrue()) + return emptyTask +} + var _ = Describe("CassandraTask controller tests", func() { Describe("Execute jobs against all pods", func() { JobRunningRequeue = time.Duration(1 * time.Millisecond) @@ -622,6 +634,47 @@ var _ = Describe("CassandraTask controller tests", func() { Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(Equal(0)) Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 1)) + // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) + Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) + }) + It("Runs a ts reload task against a pod and fails", func() { + By("Creating a task for tsreload") + taskKey, task := buildTask(api.CommandTSReload, testNamespaceName) + task.Spec.Jobs[0].Arguments.PodName = fmt.Sprintf("%s-%s-r0-sts-0", clusterName, testDatacenterName) + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) + + completedTask := waitForTaskFailed(taskKey) + + Expect(callDetails.URLCounts["/api/v0/ops/node/encryption/internode/truststore/reload"]).To(Equal(0)) // This doesn't get called because the test of whether the feature exists doesn't pass. The features endpoint doesn't exist in this mock server. + + // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) + Expect(completedTask.Status.Succeeded).To(BeNumerically("==", 0)) + Expect(completedTask.Status.Failed).To(BeNumerically(">", 0)) + }) + }) + Context("successful SyncFeature usage", func() { + var testNamespaceName string + BeforeEach(func() { + By("Creating fake synchronous mgmt-api server") + var err error + callDetails = httphelper.NewCallDetails() + mockServer, err = httphelper.FakeServerWithSyncFeaturesEndpoint(callDetails) + testNamespaceName = fmt.Sprintf("test-sync-task-%d", rand.Int31()) + Expect(err).ToNot(HaveOccurred()) + mockServer.Start() + By("create datacenter", createDatacenter(testDatacenterName, testNamespaceName)) + }) + It("Runs a ts reload task against a pod", func() { + By("Creating a task for tsreload") + taskKey, task := buildTask(api.CommandTSReload, testNamespaceName) + task.Spec.Jobs[0].Arguments.PodName = fmt.Sprintf("%s-%s-r0-sts-0", clusterName, testDatacenterName) + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) + + completedTask := waitForTaskCompletion(taskKey) + + Expect(callDetails.URLCounts["/api/v0/ops/node/encryption/internode/truststore/reload"]).To(Equal(1)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 0)) // This doesn't get called because the test of whether the feature exists doesn't pass. + // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) }) diff --git a/internal/controllers/control/jobs.go b/internal/controllers/control/jobs.go index 3fa41f7f..591d902f 100644 --- a/internal/controllers/control/jobs.go +++ b/internal/controllers/control/jobs.go @@ -442,6 +442,19 @@ func (r *CassandraTaskReconciler) refreshDatacenter(ctx context.Context, dc *cas } } return ctrl.Result{RequeueAfter: JobRunningRequeue}, nil + +} + +// ts reload functionality + +func inodeTsReload(taskConfig *TaskConfiguration) { + taskConfig.PodFilter = genericPodFilter + taskConfig.SyncFunc = inodeTsReloadSync + taskConfig.SyncFeature = httphelper.ReloadInodeTruststore +} + +func inodeTsReloadSync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error { + return nodeMgmtClient.CallinodeTsReloadEndpoint(pod) } // Common functions diff --git a/internal/envtest/fake_mgmtapi.go b/internal/envtest/fake_mgmtapi.go index 4b4fa212..36b90af6 100644 --- a/internal/envtest/fake_mgmtapi.go +++ b/internal/envtest/fake_mgmtapi.go @@ -84,6 +84,8 @@ func FakeServer(cli client.Client, logger logr.Logger, podKey types.NamespacedNa w.WriteHeader(http.StatusOK) case "/api/v0/ops/keyspace/cleanup": w.WriteHeader(http.StatusOK) + case "/api/v0/ops/node/encryption/internode/truststore/reload": + w.WriteHeader(http.StatusOK) default: w.WriteHeader(http.StatusNotFound) } diff --git a/pkg/httphelper/client.go b/pkg/httphelper/client.go index f2b9603c..4b731fca 100644 --- a/pkg/httphelper/client.go +++ b/pkg/httphelper/client.go @@ -140,6 +140,7 @@ const ( Move Feature = "async_move_task" AsyncGarbageCollect Feature = "async_gc_task" AsyncFlush Feature = "async_flush_task" + ReloadInodeTruststore Feature = "reload_internode_truststore" ) func (f *FeatureSet) UnmarshalJSON(b []byte) error { @@ -585,6 +586,33 @@ func (client *NodeMgmtClient) CallCompactionEndpoint(pod *corev1.Pod, compactReq return nil } +// CallTSReloadEndpoint calls the async version of TSReload +func (client *NodeMgmtClient) CallinodeTsReloadEndpoint(pod *corev1.Pod) error { + client.Log.Info( + "calling Management API TS REload endpoint - POST /api/v0/node/encryption/internode/truststore/reload", + "pod", pod.Name, + ) + podHost, podPort, err := BuildPodHostFromPod(pod) + if err != nil { + return err + } + + request := nodeMgmtRequest{ + endpoint: "/api/v0/ops/node/encryption/internode/truststore/reload", + host: podHost, + port: podPort, + method: http.MethodPost, + timeout: 60 * time.Second, + } + + _, err = callNodeMgmtEndpoint(client, request, "application/json") + if err != nil { + return err + } + + return nil +} + type ScrubRequest struct { DisableSnapshot bool `json:"disable_snapshot"` SkipCorrupted bool `json:"skip_corrupted"` diff --git a/pkg/httphelper/server_test_utils.go b/pkg/httphelper/server_test_utils.go index 26cf8909..37ea0679 100644 --- a/pkg/httphelper/server_test_utils.go +++ b/pkg/httphelper/server_test_utils.go @@ -20,7 +20,8 @@ var featuresReply = `{ "async_gc_task", "async_flush_task", "async_scrub_task", - "async_compaction_task" + "async_compaction_task", + "reload_internode_truststore" ] }` @@ -128,10 +129,32 @@ func FakeExecutorServerWithDetailsFails(callDetails *CallDetails) (*httptest.Ser })) } +func FakeServerWithSyncFeaturesEndpoint(callDetails *CallDetails) (*httptest.Server, error) { + return FakeMgmtApiServer(callDetails, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, err := url.ParseQuery(r.URL.RawQuery) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + } + if r.Method == http.MethodGet && r.RequestURI == "/api/v0/metadata/versions/features" { + w.WriteHeader(http.StatusOK) + _, err = w.Write([]byte(featuresReply)) + } else if r.Method == http.MethodPost && r.URL.Path == "/api/v0/ops/node/encryption/internode/truststore/reload" { + w.WriteHeader(http.StatusOK) + } else { + w.WriteHeader(http.StatusNotFound) + } + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + } + })) +} + func FakeServerWithoutFeaturesEndpoint(callDetails *CallDetails) (*httptest.Server, error) { return FakeMgmtApiServer(callDetails, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.Method == http.MethodPost && (r.URL.Path == "/api/v0/ops/keyspace/cleanup" || r.URL.Path == "/api/v0/ops/tables/sstables/upgrade" || r.URL.Path == "/api/v0/ops/node/drain" || r.URL.Path == "/api/v0/ops/tables/flush" || r.URL.Path == "/api/v0/ops/tables/garbagecollect" || r.URL.Path == "/api/v0/ops/tables/compact") { w.WriteHeader(http.StatusOK) + } else if r.Method == http.MethodPost && r.URL.Path == "/api/v0/ops/node/encryption/internode/truststore/reload" { + w.WriteHeader(http.StatusOK) } else { w.WriteHeader(http.StatusNotFound) }