Skip to content

Commit

Permalink
Add new annotation cassandra.datastax.com/track-cleanup-tasks that en…
Browse files Browse the repository at this point in the history
…ables tracking cleanup after scale up. This will wait for the cleanup to finish before scale up is marked as completed. Also, add a unit test for the CheckRackLabels()
  • Loading branch information
burmanm committed Oct 25, 2024
1 parent c82c668 commit fa86b4b
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Changelog for Cass Operator, new PRs should update the `main / unreleased` secti
## unreleased

* [FEATURE] [#651](https://github.com/k8ssandra/cass-operator/issues/651) Add tsreload task for DSE deployments and ability to check if sync operation is available on the mgmt-api side
* [ENHANCEMENT] [#722](https://github.com/k8ssandra/cass-operator/issues/722) Add back the ability to track cleanup task before marking scale up as done. This is controlled by an annotation cassandra.datastax.com/track-cleanup-tasks
* [BUGFIX] [#705](https://github.com/k8ssandra/cass-operator/issues/705) Ensure ConfigSecret has annotations map before trying to set a value

## v1.22.4
Expand Down
23 changes: 23 additions & 0 deletions apis/cassandra/v1beta1/cassandradatacenter_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ const (
// UseClientBuilderAnnotation enforces the usage of new config builder from k8ssandra-client for versions that would otherwise use the cass-config-builder
UseClientBuilderAnnotation = "cassandra.datastax.com/use-new-config-builder"

// TrackCleanupTasksAnnotation enforces the operator to track cleanup tasks after doing scale up. This prevents other operations to take place until the cleanup
// task has completed.
TrackCleanupTasksAnnotation = "cassandra.datastax.com/track-cleanup-tasks"

AllowUpdateAlways AllowUpdateType = "always"
AllowUpdateOnce AllowUpdateType = "once"

Expand Down Expand Up @@ -563,6 +567,25 @@ func (status *CassandraDatacenterStatus) GetConditionStatus(conditionType Datace
return corev1.ConditionUnknown
}

func (status *CassandraDatacenterStatus) AddTaskToTrack(objectMeta metav1.ObjectMeta) {
if status.TrackedTasks == nil {
status.TrackedTasks = make([]corev1.ObjectReference, 0, 1)
}

status.TrackedTasks = append(status.TrackedTasks, corev1.ObjectReference{
Name: objectMeta.Name,
Namespace: objectMeta.Namespace,
})
}

func (status *CassandraDatacenterStatus) RemoveTrackedTask(objectMeta metav1.ObjectMeta) {
for index, task := range status.TrackedTasks {
if task.Name == objectMeta.Name && task.Namespace == objectMeta.Namespace {
status.TrackedTasks = append(status.TrackedTasks[:index], status.TrackedTasks[index+1:]...)
}
}
}

func (dc *CassandraDatacenter) GetConditionStatus(conditionType DatacenterConditionType) corev1.ConditionStatus {
return (&dc.Status).GetConditionStatus(conditionType)
}
Expand Down
60 changes: 59 additions & 1 deletion pkg/reconciliation/reconcile_racks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2274,10 +2274,27 @@ func (rc *ReconciliationContext) CheckCassandraNodeStatuses() result.ReconcileRe

func (rc *ReconciliationContext) cleanupAfterScaling() result.ReconcileResult {
if !metav1.HasAnnotation(rc.Datacenter.ObjectMeta, api.NoAutomatedCleanupAnnotation) {

if metav1.HasAnnotation(rc.Datacenter.ObjectMeta, api.TrackCleanupTasksAnnotation) {
// Verify if the cleanup task has completed before moving on the with ScalingUp finished
task, err := rc.findActiveTask(taskapi.CommandCleanup)
if err != nil {
return result.Error(err)
}

if task != nil {
return rc.activeTaskCompleted(task)
}
}

// Create the cleanup task
if err := rc.createTask(taskapi.CommandCleanup); err != nil {
return result.Error(err)
}

if metav1.HasAnnotation(rc.Datacenter.ObjectMeta, api.TrackCleanupTasksAnnotation) {
return result.RequeueSoon(10)
}
}

return result.Continue()
Expand Down Expand Up @@ -2319,7 +2336,48 @@ func (rc *ReconciliationContext) createTask(command taskapi.CassandraCommand) er
return err
}

return nil
if !metav1.HasAnnotation(rc.Datacenter.ObjectMeta, api.TrackCleanupTasksAnnotation) {
return nil
}

dcPatch := client.MergeFrom(dc.DeepCopy())

rc.Datacenter.Status.AddTaskToTrack(task.ObjectMeta)

return rc.Client.Status().Patch(rc.Ctx, dc, dcPatch)
}

func (rc *ReconciliationContext) activeTaskCompleted(task *taskapi.CassandraTask) result.ReconcileResult {
if task.Status.CompletionTime != nil {
// Job was completed, remove it from followed task
dc := rc.Datacenter
dcPatch := client.MergeFrom(dc.DeepCopy())
rc.Datacenter.Status.RemoveTrackedTask(task.ObjectMeta)
if err := rc.Client.Status().Patch(rc.Ctx, dc, dcPatch); err != nil {
return result.Error(err)
}
return result.Continue()
}
return result.RequeueSoon(10)
}

func (rc *ReconciliationContext) findActiveTask(command taskapi.CassandraCommand) (*taskapi.CassandraTask, error) {
if len(rc.Datacenter.Status.TrackedTasks) > 0 {
for _, taskMeta := range rc.Datacenter.Status.TrackedTasks {
taskKey := types.NamespacedName{Name: taskMeta.Name, Namespace: taskMeta.Namespace}
task := &taskapi.CassandraTask{}
if err := rc.Client.Get(rc.Ctx, taskKey, task); err != nil {
return nil, err
}

for _, job := range task.Spec.Jobs {
if job.Command == command {
return task, nil
}
}
}
}
return nil, nil
}

func (rc *ReconciliationContext) CheckClearActionConditions() result.ReconcileResult {
Expand Down
78 changes: 78 additions & 0 deletions pkg/reconciliation/reconcile_racks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1631,6 +1631,49 @@ func TestCleanupAfterScaling(t *testing.T) {
r := rc.cleanupAfterScaling()
assert.Equal(result.Continue(), r, "expected result of result.Continue()")
assert.Equal(taskapi.CommandCleanup, task.Spec.Jobs[0].Command)
assert.Equal(0, len(rc.Datacenter.Status.TrackedTasks))
}

func TestCleanupAfterScalingWithTracker(t *testing.T) {
rc, _, cleanupMockScr := setupTest()
defer cleanupMockScr()
assert := assert.New(t)

// Setup annotation

mockClient := mocks.NewClient(t)
rc.Client = mockClient

metav1.SetMetaDataAnnotation(&rc.Datacenter.ObjectMeta, api.TrackCleanupTasksAnnotation, "true")

var task *taskapi.CassandraTask
// 1. Create task - return ok
k8sMockClientCreate(rc.Client.(*mocks.Client), nil).
Run(func(args mock.Arguments) {
arg := args.Get(1).(*taskapi.CassandraTask)
task = arg
}).
Times(1)

k8sMockClientStatusPatch(mockClient.Status().(*mocks.SubResourceClient), nil).Once()

r := rc.cleanupAfterScaling()
assert.Equal(taskapi.CommandCleanup, task.Spec.Jobs[0].Command)
assert.Equal(result.RequeueSoon(10), r, "expected result of result.RequeueSoon(10)")
assert.Equal(1, len(rc.Datacenter.Status.TrackedTasks))
// 3. GET - return completed task
k8sMockClientGet(rc.Client.(*mocks.Client), nil).
Run(func(args mock.Arguments) {
arg := args.Get(2).(*taskapi.CassandraTask)
task.DeepCopyInto(arg)
timeNow := metav1.Now()
arg.Status.CompletionTime = &timeNow
}).Once()
// 4. Patch to datacenter status
k8sMockClientStatusPatch(mockClient.Status().(*mocks.SubResourceClient), nil).Once()
r = rc.cleanupAfterScaling()
assert.Equal(result.Continue(), r, "expected result of result.Continue()")
assert.Equal(0, len(rc.Datacenter.Status.TrackedTasks))
}

func TestStripPassword(t *testing.T) {
Expand Down Expand Up @@ -2874,3 +2917,38 @@ func TestDatacenterPodsOldLabels(t *testing.T) {
// We should still find the pods
assert.Equal(int(*desiredStatefulSet.Spec.Replicas), len(rc.datacenterPods()))
}

func TestCheckRackLabels(t *testing.T) {
rc, _, cleanupMockScr := setupTest()
defer cleanupMockScr()
require := require.New(t)
err := rc.CalculateRackInformation()
require.NoError(err)

desiredStatefulSet, err := newStatefulSetForCassandraDatacenter(
nil,
"default",
rc.Datacenter,
3)
require.NoErrorf(err, "error occurred creating statefulset")

desiredStatefulSet.Status.ReadyReplicas = *desiredStatefulSet.Spec.Replicas

trackObjects := []runtime.Object{
desiredStatefulSet,
rc.Datacenter,
}
rc.Client = fake.NewClientBuilder().WithStatusSubresource(rc.Datacenter).WithRuntimeObjects(trackObjects...).Build()

rc.statefulSets = []*appsv1.StatefulSet{desiredStatefulSet}

res := rc.CheckRackLabels()
require.Equal(result.Continue(), res, "Label updates should not cause errors")
require.Subset(desiredStatefulSet.Labels, rc.Datacenter.GetRackLabels("default"))
desiredStatefulSet.Labels[api.RackLabel] = "r1"
require.NotSubset(desiredStatefulSet.Labels, rc.Datacenter.GetRackLabels("default"))

res = rc.CheckRackLabels()
require.Equal(result.Continue(), res, "Label updates should not cause errors")
require.Subset(desiredStatefulSet.Labels, rc.Datacenter.GetRackLabels("default"))
}
2 changes: 1 addition & 1 deletion tests/testdata/default-two-rack-two-node-dc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ spec:
clusterName: cluster1
datacenterName: My_Super_Dc
serverType: cassandra
serverVersion: "4.0.10"
serverVersion: "4.1.7"
managementApiAuth:
insecure: {}
size: 2
Expand Down

0 comments on commit fa86b4b

Please sign in to comment.