Skip to content

Commit

Permalink
Add compaction and scrub tasks with tests. Improve testing in control…
Browse files Browse the repository at this point in the history
… tasks envtests by allowing to verify the payload sent to the management-api
  • Loading branch information
burmanm committed Oct 16, 2023
1 parent 277d9b1 commit f9388c9
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 16 deletions.
3 changes: 3 additions & 0 deletions apis/control/v1alpha1/cassandratask_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ type JobArguments struct {
SkipCorrupted bool `json:"skip_corrupted,omitempty"`

// Compaction arguments
SplitOutput bool `json:"split_output,omitempty"`
StartToken string `json:"start_token,omitempty"`
EndToken string `json:"end_token,omitempty"`

// NewTokens is a map of pod names to their newly-assigned tokens. Required for the move
// command, ignored otherwise. Pods referenced in this map must exist; any existing pod not
Expand Down
7 changes: 7 additions & 0 deletions config/crd/bases/control.k8ssandra.io_cassandratasks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ spec:
args:
description: Arguments are additional parameters for the command
properties:
end_token:
type: string
jobs:
type: integer
keyspace_name:
Expand All @@ -133,6 +135,11 @@ spec:
type: boolean
source_datacenter:
type: string
split_output:
description: Compaction arguments
type: boolean
start_token:
type: string
tables:
items:
type: string
Expand Down
3 changes: 0 additions & 3 deletions internal/controllers/control/cassandratask_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,12 +347,9 @@ JobDefinition:
logger.V(1).Info("This job isn't complete yet or there's an error, requeueing", "requeueAfter", res.RequeueAfter)
break
}
// completedCount++
logger.V(1).Info("We're returning from this", "completed", completed, "failed", failed)
}

if res.RequeueAfter == 0 && !res.Requeue {
logger.V(1).Info("No requeues queued..", "completed", completed, "failed", failed, "jobCount", len(cassTask.Spec.Jobs))
// Job has been completed
cassTask.GetLabels()[taskStatusLabel] = completedTaskLabelValue
if errUpdate := r.Client.Update(ctx, &cassTask); errUpdate != nil {
Expand Down
68 changes: 59 additions & 9 deletions internal/controllers/control/cassandratask_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package control

import (
"context"
"encoding/json"
"fmt"
"math/rand"
"net/http/httptest"
Expand Down Expand Up @@ -259,7 +260,7 @@ var _ = Describe("CassandraTask controller tests", func() {
Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount))

// verifyPodsHaveAnnotations(testNamespaceName, string(task.UID))
Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1))
Expect(completedTask.Status.Succeeded).To(BeNumerically("==", nodeCount))

Expect(len(completedTask.Status.Conditions)).To(Equal(2))
for _, cond := range completedTask.Status.Conditions {
Expand All @@ -282,8 +283,7 @@ var _ = Describe("CassandraTask controller tests", func() {
Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount))
Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount))

// verifyPodsHaveAnnotations(testNamespaceName, string(task.UID))
Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1))
Expect(completedTask.Status.Succeeded).To(BeNumerically("==", nodeCount))
})
It("Runs a node move task against the datacenter pods", func() {
By("Creating a task for move")
Expand All @@ -305,7 +305,7 @@ var _ = Describe("CassandraTask controller tests", func() {
Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", 3))
Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 3))

Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 3))
Expect(completedTask.Status.Succeeded).To(BeNumerically("==", 3))
})
It("Runs a flush task against the datacenter pods", func() {
By("Creating a task for flush")
Expand All @@ -320,7 +320,7 @@ var _ = Describe("CassandraTask controller tests", func() {
Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount))
Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount))

Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1))
Expect(completedTask.Status.Succeeded).To(BeNumerically("==", nodeCount))
})
It("Runs a flush task against a pod", func() {
By("Creating a task for flush")
Expand All @@ -336,7 +336,7 @@ var _ = Describe("CassandraTask controller tests", func() {
Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", 1))
Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 1))

Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1))
Expect(completedTask.Status.Succeeded).To(BeNumerically("==", 1))
})

It("Runs a garbagecollect task against the datacenter pods", func() {
Expand All @@ -351,8 +351,58 @@ var _ = Describe("CassandraTask controller tests", func() {
Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount))
Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount))

// verifyPodsHaveAnnotations(testNamespaceName, string(task.UID))
Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1))
Expect(completedTask.Status.Succeeded).To(BeNumerically("==", nodeCount))
})

It("Runs a scrub task against a pod", func() {
By("Creating a task for scrub")

taskKey, task := buildTask(api.CommandScrub, testNamespaceName)
task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1"
task.Spec.Jobs[0].Arguments.PodName = fmt.Sprintf("%s-%s-r0-sts-0", clusterName, testDatacenterName)
task.Spec.Jobs[0].Arguments.NoValidate = false
Expect(k8sClient.Create(context.Background(), task)).Should(Succeed())

completedTask := waitForTaskCompletion(taskKey)

Expect(callDetails.URLCounts["/api/v1/ops/tables/scrub"]).To(Equal(1))
Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", 1))
Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 1))

Expect(completedTask.Status.Succeeded).To(BeNumerically("==", 1))

// Payloads should be of type ScrubRequest
var sreq httphelper.ScrubRequest
Expect(json.Unmarshal(callDetails.Payloads[0], &sreq)).Should(Succeed())
Expect(sreq.CheckData).To(BeTrue())
Expect(sreq.KeyspaceName).To(Equal("ks1"))
})

FIt("Runs a compaction task against a pod", func() {
By("Creating a task for compaction")

taskKey, task := buildTask(api.CommandCompaction, testNamespaceName)
task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1"
task.Spec.Jobs[0].Arguments.PodName = fmt.Sprintf("%s-%s-r0-sts-0", clusterName, testDatacenterName)
task.Spec.Jobs[0].Arguments.Tables = []string{"table1"}
task.Spec.Jobs[0].Arguments.SplitOutput = true

Expect(k8sClient.Create(context.Background(), task)).Should(Succeed())

completedTask := waitForTaskCompletion(taskKey)

Expect(callDetails.URLCounts["/api/v1/ops/tables/compact"]).To(Equal(1))
Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", 1))
Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 1))

Expect(completedTask.Status.Succeeded).To(BeNumerically("==", 1))

// Payloads should be of type CompactRequest
var req httphelper.CompactRequest
Expect(json.Unmarshal(callDetails.Payloads[0], &req)).Should(Succeed())
Expect(req.KeyspaceName).To(Equal("ks1"))
Expect(req.SplitOutput).To(BeTrue())
Expect(len(req.Tables)).To(BeNumerically("==", 1))
})

When("Running cleanup twice in the same datacenter", func() {
Expand Down Expand Up @@ -448,7 +498,7 @@ var _ = Describe("CassandraTask controller tests", func() {
})
})
})
FContext("Sync jobs", func() {
Context("Sync jobs", func() {
var testNamespaceName string
BeforeEach(func() {
By("Creating fake synchronous mgmt-api server")
Expand Down
3 changes: 3 additions & 0 deletions internal/controllers/control/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,9 @@ func createCompactRequest(taskConfig *TaskConfiguration) *httphelper.CompactRequ
return &httphelper.CompactRequest{
KeyspaceName: taskConfig.Arguments.KeyspaceName,
Tables: taskConfig.Arguments.Tables,
SplitOutput: taskConfig.Arguments.SplitOutput,
StartToken: taskConfig.Arguments.StartToken,
EndToken: taskConfig.Arguments.EndToken,
}
}

Expand Down
28 changes: 24 additions & 4 deletions pkg/httphelper/server_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package httphelper

import (
"fmt"
"io"
"net"
"net/http"
"net/http/httptest"
Expand All @@ -17,7 +18,9 @@ var featuresReply = `{
"async_upgrade_sstable_task",
"async_move_task",
"async_gc_task",
"async_flush_task"
"async_flush_task",
"async_scrub_task",
"async_compaction_task"
]
}`

Expand All @@ -36,11 +39,13 @@ func mgmtApiListener() (net.Listener, error) {

type CallDetails struct {
URLCounts map[string]int
Payloads [][]byte
}

func NewCallDetails() *CallDetails {
return &CallDetails{
URLCounts: make(map[string]int),
Payloads: make([][]byte, 0),
}
}

Expand Down Expand Up @@ -68,7 +73,15 @@ func FakeExecutorServerWithDetails(callDetails *CallDetails) (*httptest.Server,
w.WriteHeader(http.StatusOK)
jobId := query.Get("job_id")
_, err = w.Write([]byte(fmt.Sprintf(jobDetailsCompleted, jobId)))
} else if r.Method == http.MethodPost && (r.URL.Path == "/api/v1/ops/keyspace/cleanup" || r.URL.Path == "/api/v1/ops/node/rebuild" || r.URL.Path == "/api/v1/ops/tables/sstables/upgrade" || r.URL.Path == "/api/v0/ops/node/move" || r.URL.Path == "/api/v1/ops/tables/compact" || r.URL.Path == "/api/v1/ops/tables/scrub" || r.URL.Path == "/api/v1/ops/tables/flush" || r.URL.Path == "/api/v1/ops/tables/garbagecollect") {
} else if r.Method == http.MethodPost &&
(r.URL.Path == "/api/v1/ops/keyspace/cleanup" ||
r.URL.Path == "/api/v1/ops/node/rebuild" ||
r.URL.Path == "/api/v1/ops/tables/sstables/upgrade" ||
r.URL.Path == "/api/v0/ops/node/move" ||
r.URL.Path == "/api/v1/ops/tables/compact" ||
r.URL.Path == "/api/v1/ops/tables/scrub" ||
r.URL.Path == "/api/v1/ops/tables/flush" ||
r.URL.Path == "/api/v1/ops/tables/garbagecollect") {
w.WriteHeader(http.StatusOK)
// Write jobId
jobId++
Expand All @@ -87,8 +100,6 @@ func FakeExecutorServerWithDetails(callDetails *CallDetails) (*httptest.Server,
func FakeExecutorServerWithDetailsFails(callDetails *CallDetails) (*httptest.Server, error) {
jobId := 0

// TODO Repeated code from above.. refactor

return FakeMgmtApiServer(callDetails, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
query, err := url.ParseQuery(r.URL.RawQuery)
if err != nil {
Expand Down Expand Up @@ -135,6 +146,15 @@ func FakeMgmtApiServer(callDetails *CallDetails, handlerFunc http.HandlerFunc) (
callerFunc := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if callDetails != nil {
callDetails.incr(r.URL.Path)

if r.ContentLength > 0 {
payload, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
callDetails.Payloads = append(callDetails.Payloads, payload)
}
}
handlerFunc(w, r)
})
Expand Down

0 comments on commit f9388c9

Please sign in to comment.