Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

truststore reload task #686

Merged
merged 22 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
3e8f962
Make sure build dir is in .gitignore.
Miles-Garnsey Aug 19, 2024
0ad4b43
Add reload truststore cassandraTask.
Miles-Garnsey Aug 21, 2024
b306f1a
Remove async function, it is unused.
Miles-Garnsey Aug 21, 2024
427f00d
Fix comment, add codegen stuff.
Miles-Garnsey Aug 21, 2024
deb3ea7
Undo gitignore change.
Miles-Garnsey Aug 21, 2024
db54a5c
Fix unit test.
Miles-Garnsey Aug 21, 2024
d47aab7
Fix path for management API url.
Miles-Garnsey Aug 21, 2024
0bd0b24
Ensure the tsreload happens only against one pod.
Miles-Garnsey Aug 21, 2024
4ff3b04
Make sure we're using POST as method.
Miles-Garnsey Aug 21, 2024
64465e0
Rename truststore reload endpoints per Micke's suggestions.
Miles-Garnsey Aug 22, 2024
0b7c1c1
Ensure the reload task is in the FakeExecutorServerWithDetails.
Miles-Garnsey Aug 22, 2024
26f1f78
Add endpoint to the right place in the tests.
Miles-Garnsey Aug 28, 2024
a10bae9
Rename several variables to reflect that we might later have client t…
Miles-Garnsey Aug 28, 2024
5b4c3a8
Error handling for when a pod does not support the new ts reload func…
Miles-Garnsey Aug 28, 2024
2c985c7
Implement some of Micke's feedback.
Miles-Garnsey Aug 28, 2024
f626539
Add more rigorous tests.
Miles-Garnsey Aug 28, 2024
09b2e39
Fix all tests so we test failed features endpoint for a sync feature,…
Miles-Garnsey Aug 29, 2024
510e147
Refer to tsreload using more dsetool like language.
Miles-Garnsey Aug 29, 2024
a0fb94f
Fix issue where returned stuff from goroutine was getting missed.
Miles-Garnsey Aug 30, 2024
1472387
Micke's suggestion for additional test validation.
Miles-Garnsey Aug 30, 2024
75c938e
Working to get task to fail.
Miles-Garnsey Aug 30, 2024
e55a640
Ensure that waitForTaskFailed works as expected.
Miles-Garnsey Aug 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apis/control/v1alpha1/cassandratask_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ const (
CommandGarbageCollect CassandraCommand = "garbagecollect"
CommandFlush CassandraCommand = "flush"
CommandRefresh CassandraCommand = "refresh"
CommandTSReload CassandraCommand = "tsreload"
)

type CassandraJob struct {
Expand Down
23 changes: 16 additions & 7 deletions internal/controllers/control/cassandratask_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ type TaskConfiguration struct {
// Execution functionality per pod
AsyncFeature httphelper.Feature
AsyncFunc AsyncTaskExecutorFunc
SyncFeature httphelper.Feature
SyncFunc SyncTaskExecutorFunc
PodFilter PodFilterFunc

Expand Down Expand Up @@ -317,6 +318,8 @@ JobDefinition:
}
completed = taskConfig.Completed
break JobDefinition
case api.CommandTSReload:
inodeTsReload(taskConfig)
default:
err = fmt.Errorf("unknown job command: %s", job.Command)
return ctrl.Result{}, err
Expand Down Expand Up @@ -647,13 +650,9 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc
if !taskConfig.Filter(&pod) {
continue
}

features := &httphelper.FeatureSet{}
if taskConfig.AsyncFeature != "" {
features, err = nodeMgmtClient.FeatureSet(&pod)
if err != nil {
return ctrl.Result{}, failed, completed, errMsg, err
}
features, err := nodeMgmtClient.FeatureSet(&pod)
if err != nil {
return ctrl.Result{}, failed, completed, errMsg, err
}

if pod.Annotations == nil {
Expand Down Expand Up @@ -786,6 +785,16 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc
return ctrl.Result{}, failed, completed, errMsg, err
}

if taskConfig.SyncFeature != "" {
if !features.Supports(taskConfig.SyncFeature) {
logger.Error(err, "Pod doesn't support this feature", "Pod", pod, "Feature", taskConfig.SyncFeature)
jobStatus.Status = podJobError
failed++
errMsg = fmt.Sprintf("Pod %s doesn't support %s feature", pod.Name, taskConfig.SyncFeature)
return ctrl.Result{}, failed, completed, errMsg, err
}
}

jobId := strconv.Itoa(idx)

// This pod should run next, mark it
Expand Down
53 changes: 53 additions & 0 deletions internal/controllers/control/cassandratask_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,18 @@ func waitForTaskCompletion(taskKey types.NamespacedName) *api.CassandraTask {
return emptyTask
}

// waitForTaskFailed repeatedly fetches the CassandraTask identified by taskKey
// until its status reports at least one failed job, using a five second
// Eventually timeout. Any error from the Get call fails the assertion
// immediately. It returns the most recently fetched task.
func waitForTaskFailed(taskKey types.NamespacedName) *api.CassandraTask {
	var failedTask *api.CassandraTask

	// A fresh object is allocated on every poll so that fields from a
	// previous fetch never linger in the returned task.
	hasFailed := func() bool {
		failedTask = &api.CassandraTask{}
		err := k8sClient.Get(context.TODO(), taskKey, failedTask)
		Expect(err).ToNot(HaveOccurred())

		return failedTask.Status.Failed > 0
	}

	Eventually(hasFailed, 5*time.Second).Should(BeTrue())
	return failedTask
}

var _ = Describe("CassandraTask controller tests", func() {
Describe("Execute jobs against all pods", func() {
JobRunningRequeue = time.Duration(1 * time.Millisecond)
Expand Down Expand Up @@ -622,6 +634,47 @@ var _ = Describe("CassandraTask controller tests", func() {
Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(Equal(0))
Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 1))

// verifyPodsHaveAnnotations(testNamespaceName, string(task.UID))
Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1))
})
It("Runs a ts reload task against a pod and fails", func() {
By("Creating a task for tsreload")
taskKey, task := buildTask(api.CommandTSReload, testNamespaceName)
task.Spec.Jobs[0].Arguments.PodName = fmt.Sprintf("%s-%s-r0-sts-0", clusterName, testDatacenterName)
Expect(k8sClient.Create(context.Background(), task)).Should(Succeed())

completedTask := waitForTaskFailed(taskKey)

Expect(callDetails.URLCounts["/api/v0/ops/node/encryption/internode/truststore/reload"]).To(Equal(0)) // This doesn't get called because the test of whether the feature exists doesn't pass. The features endpoint doesn't exist in this mock server.

// verifyPodsHaveAnnotations(testNamespaceName, string(task.UID))
Expect(completedTask.Status.Succeeded).To(BeNumerically("==", 0))
Miles-Garnsey marked this conversation as resolved.
Show resolved Hide resolved
Expect(completedTask.Status.Failed).To(BeNumerically(">", 0))
})
})
Context("successful SyncFeature usage", func() {
var testNamespaceName string
BeforeEach(func() {
By("Creating fake synchronous mgmt-api server")
var err error
callDetails = httphelper.NewCallDetails()
mockServer, err = httphelper.FakeServerWithSyncFeaturesEndpoint(callDetails)
testNamespaceName = fmt.Sprintf("test-sync-task-%d", rand.Int31())
Expect(err).ToNot(HaveOccurred())
mockServer.Start()
By("create datacenter", createDatacenter(testDatacenterName, testNamespaceName))
})
It("Runs a ts reload task against a pod", func() {
Miles-Garnsey marked this conversation as resolved.
Show resolved Hide resolved
By("Creating a task for tsreload")
taskKey, task := buildTask(api.CommandTSReload, testNamespaceName)
task.Spec.Jobs[0].Arguments.PodName = fmt.Sprintf("%s-%s-r0-sts-0", clusterName, testDatacenterName)
Expect(k8sClient.Create(context.Background(), task)).Should(Succeed())

completedTask := waitForTaskCompletion(taskKey)

Expect(callDetails.URLCounts["/api/v0/ops/node/encryption/internode/truststore/reload"]).To(Equal(1))
Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 0)) // The features endpoint must be queried at least once to confirm the pod supports the truststore reload feature.
Miles-Garnsey marked this conversation as resolved.
Show resolved Hide resolved

// verifyPodsHaveAnnotations(testNamespaceName, string(task.UID))
Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1))
})
Expand Down
13 changes: 13 additions & 0 deletions internal/controllers/control/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,19 @@ func (r *CassandraTaskReconciler) refreshDatacenter(ctx context.Context, dc *cas
}
}
return ctrl.Result{RequeueAfter: JobRunningRequeue}, nil

}

// ts reload functionality

// inodeTsReload prepares taskConfig for an internode truststore reload job.
// It marks httphelper.ReloadInodeTruststore as the required sync feature,
// installs inodeTsReloadSync as the per-pod sync function and selects pods
// with genericPodFilter.
func inodeTsReload(taskConfig *TaskConfiguration) {
	taskConfig.SyncFeature = httphelper.ReloadInodeTruststore
	taskConfig.SyncFunc = inodeTsReloadSync
	taskConfig.PodFilter = genericPodFilter
}

// inodeTsReloadSync is the sync function that inodeTsReload installs on the
// task configuration. It delegates to CallinodeTsReloadEndpoint on
// nodeMgmtClient for the given pod and returns that call's error unchanged.
// taskConfig is accepted to match the sync function signature and is not read.
func inodeTsReloadSync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error {
return nodeMgmtClient.CallinodeTsReloadEndpoint(pod)
}

// Common functions
Expand Down
2 changes: 2 additions & 0 deletions internal/envtest/fake_mgmtapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ func FakeServer(cli client.Client, logger logr.Logger, podKey types.NamespacedNa
w.WriteHeader(http.StatusOK)
case "/api/v0/ops/keyspace/cleanup":
w.WriteHeader(http.StatusOK)
case "/api/v0/ops/node/encryption/internode/truststore/reload":
w.WriteHeader(http.StatusOK)
default:
w.WriteHeader(http.StatusNotFound)
}
Expand Down
28 changes: 28 additions & 0 deletions pkg/httphelper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ const (
Move Feature = "async_move_task"
AsyncGarbageCollect Feature = "async_gc_task"
AsyncFlush Feature = "async_flush_task"
ReloadInodeTruststore Feature = "reload_internode_truststore"
)

func (f *FeatureSet) UnmarshalJSON(b []byte) error {
Expand Down Expand Up @@ -585,6 +586,33 @@ func (client *NodeMgmtClient) CallCompactionEndpoint(pod *corev1.Pod, compactReq
return nil
}

// CallTSReloadEndpoint calls the async version of TSReload
func (client *NodeMgmtClient) CallinodeTsReloadEndpoint(pod *corev1.Pod) error {
client.Log.Info(
"calling Management API TS REload endpoint - POST /api/v0/node/encryption/internode/truststore/reload",
"pod", pod.Name,
)
podHost, podPort, err := BuildPodHostFromPod(pod)
if err != nil {
return err
}

request := nodeMgmtRequest{
endpoint: "/api/v0/ops/node/encryption/internode/truststore/reload",
host: podHost,
port: podPort,
method: http.MethodPost,
timeout: 60 * time.Second,
Miles-Garnsey marked this conversation as resolved.
Show resolved Hide resolved
}

_, err = callNodeMgmtEndpoint(client, request, "application/json")
if err != nil {
return err
}

return nil
}

type ScrubRequest struct {
DisableSnapshot bool `json:"disable_snapshot"`
SkipCorrupted bool `json:"skip_corrupted"`
Expand Down
25 changes: 24 additions & 1 deletion pkg/httphelper/server_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ var featuresReply = `{
"async_gc_task",
"async_flush_task",
"async_scrub_task",
"async_compaction_task"
"async_compaction_task",
"reload_internode_truststore"
]
}`

Expand Down Expand Up @@ -128,10 +129,32 @@ func FakeExecutorServerWithDetailsFails(callDetails *CallDetails) (*httptest.Ser
}))
}

func FakeServerWithSyncFeaturesEndpoint(callDetails *CallDetails) (*httptest.Server, error) {
return FakeMgmtApiServer(callDetails, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, err := url.ParseQuery(r.URL.RawQuery)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
}
if r.Method == http.MethodGet && r.RequestURI == "/api/v0/metadata/versions/features" {
w.WriteHeader(http.StatusOK)
_, err = w.Write([]byte(featuresReply))
} else if r.Method == http.MethodPost && r.URL.Path == "/api/v0/ops/node/encryption/internode/truststore/reload" {
Miles-Garnsey marked this conversation as resolved.
Show resolved Hide resolved
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusNotFound)
}
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
}
}))
}

func FakeServerWithoutFeaturesEndpoint(callDetails *CallDetails) (*httptest.Server, error) {
return FakeMgmtApiServer(callDetails, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodPost && (r.URL.Path == "/api/v0/ops/keyspace/cleanup" || r.URL.Path == "/api/v0/ops/tables/sstables/upgrade" || r.URL.Path == "/api/v0/ops/node/drain" || r.URL.Path == "/api/v0/ops/tables/flush" || r.URL.Path == "/api/v0/ops/tables/garbagecollect" || r.URL.Path == "/api/v0/ops/tables/compact") {
w.WriteHeader(http.StatusOK)
} else if r.Method == http.MethodPost && r.URL.Path == "/api/v0/ops/node/encryption/internode/truststore/reload" {
Miles-Garnsey marked this conversation as resolved.
Show resolved Hide resolved
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusNotFound)
}
Expand Down
Loading