From 5801d6ac15c215ce018ab69bc901d729dd0358d7 Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Wed, 8 May 2024 18:11:47 +0300 Subject: [PATCH] Make the process entirely in the annotations, no more task --- .../control/k8ssandratask_controller.go | 108 ------------------ .../control/k8ssandratask_controller_test.go | 74 ------------ controllers/k8ssandra/datacenters.go | 52 +++++++++ .../k8ssandracluster_controller_test.go | 47 ++++++++ 4 files changed, 99 insertions(+), 182 deletions(-) diff --git a/controllers/control/k8ssandratask_controller.go b/controllers/control/k8ssandratask_controller.go index c45a75890..8301f0ac2 100644 --- a/controllers/control/k8ssandratask_controller.go +++ b/controllers/control/k8ssandratask_controller.go @@ -23,7 +23,6 @@ import ( "time" "github.com/go-logr/logr" - cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" cassapi "github.com/k8ssandra/cass-operator/apis/control/v1alpha1" k8capi "github.com/k8ssandra/k8ssandra-operator/apis/k8ssandra/v1alpha1" "github.com/k8ssandra/k8ssandra-operator/pkg/clientcache" @@ -55,8 +54,6 @@ import ( const ( k8ssandraTaskFinalizer = "control.k8ssandra.io/finalizer" - InternalTaskAnnotation = "control.k8ssandra.io/internal-command" - internalRefreshCommand = "refresh" defaultTTL = time.Duration(86400) * time.Second ) @@ -152,12 +149,6 @@ func (r *K8ssandraTaskReconciler) Reconcile(ctx context.Context, req ctrl.Reques return r.reportInvalidSpec(ctx, kTask, "unknown K8ssandraCluster %s.%s", kcKey.Namespace, kcKey.Name) } - if kTask.Spec.Template.Jobs[0].Command == "refresh" { - if !metav1.HasAnnotation(kc.ObjectMeta, InternalTaskAnnotation) { - return r.executeRefreshTask(ctx, kTask, kc) - } - } - if dcs, err := filterDcs(kc, kTask.Spec.Datacenters); err != nil { return r.reportInvalidSpec(ctx, kTask, err.Error()) } else { @@ -210,92 +201,6 @@ func (r *K8ssandraTaskReconciler) Reconcile(ctx context.Context, req ctrl.Reques } } -func (r *K8ssandraTaskReconciler) executeRefreshTask(ctx context.Context, kTask *api.K8ssandraTask, kc *k8capi.K8ssandraCluster) (ctrl.Result, error) { - if kTask.Status.StartTime == nil { - patch := client.MergeFrom(kTask.DeepCopy()) - now := metav1.Now() - kTask.Status.StartTime = &now - kTask.Status.Active = 1 - kTask.SetCondition(cassapi.JobRunning, metav1.ConditionTrue) - if err := r.Status().Patch(ctx, kTask, patch); err != nil { - return ctrl.Result{}, err - } - } - - // First verify if K8ssandraCluster itself has "UpdateRequired" and process it until it no longer has it. - if kc.Status.GetConditionStatus(k8capi.ClusterRequiresUpdate) == corev1.ConditionTrue { - if metav1.HasAnnotation(kc.ObjectMeta, k8capi.AutomatedUpdateAnnotation) { - return ctrl.Result{Requeue: true}, nil - } else { - patch := client.MergeFrom(kc.DeepCopy()) - metav1.SetMetaDataAnnotation(&kc.ObjectMeta, k8capi.AutomatedUpdateAnnotation, string(k8capi.AllowUpdateOnce)) - if err := r.Patch(ctx, kc, patch); err != nil { - return ctrl.Result{}, err - } - return ctrl.Result{Requeue: true}, nil - } - } - - internalTask := &api.K8ssandraTask{} - err := r.Get(ctx, types.NamespacedName{Namespace: kTask.Namespace, Name: kTask.Name + "-refresh-internal"}, internalTask) - // If task wasn't found, create it and if task is still running, requeue - if k8serrors.IsNotFound(err) { - // Then verify that no CassandraDatacenter has "UpdateRequired" and if they do, create new tasks to execute them - dcs, err := r.datacenters(ctx, kc) - if err != nil { - return ctrl.Result{}, err - } - - dcsRequiringUpdate := make([]string, 0, len(dcs)) - for _, dc := range dcs { - if dc.Status.GetConditionStatus("RequiresUpdate") == corev1.ConditionTrue { // TODO Update this to cassdcapi const when cass-operator is updatedß - dcsRequiringUpdate = append(dcsRequiringUpdate, dc.DatacenterName()) // TODO Ís this the correct reference? - } - } - - if len(dcsRequiringUpdate) > 0 { - // Delegate work to the task controller for Datacenter operations - task := &api.K8ssandraTask{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: kTask.Namespace, - Name: kTask.Name + "-refresh-internal", - Annotations: map[string]string{ - InternalTaskAnnotation: internalRefreshCommand, - }, - }, - Spec: api.K8ssandraTaskSpec{ - Datacenters: make([]string, len(dcsRequiringUpdate)), - Template: kTask.Spec.Template, - }, - } - - if err := r.Create(ctx, task); err != nil { - return ctrl.Result{}, err - } - } - } else if err != nil { - return ctrl.Result{}, err - } else { - // Verify if the job is completed - if internalTask.Status.CompletionTime.IsZero() { - return ctrl.Result{Requeue: true}, nil - } - } - - patch := client.MergeFrom(kTask.DeepCopy()) - now := metav1.Now() - kTask.Status.CompletionTime = &now - kTask.Status.Succeeded = 1 - kTask.Status.Active = 0 - kTask.SetCondition(cassapi.JobComplete, metav1.ConditionTrue) - kTask.SetCondition(cassapi.JobRunning, metav1.ConditionFalse) - if err := r.Status().Patch(ctx, kTask, patch); err != nil { - return ctrl.Result{}, err - } - - return ctrl.Result{}, nil -} - func (r *K8ssandraTaskReconciler) getCluster(ctx context.Context, kcKey client.ObjectKey) (*k8capi.K8ssandraCluster, bool, error) { kc := &k8capi.K8ssandraCluster{} if err := r.Get(ctx, kcKey, kc); k8serrors.IsNotFound(err) { @@ -306,19 +211,6 @@ func (r *K8ssandraTaskReconciler) getCluster(ctx context.Context, kcKey client.O return kc, true, nil } -func (r *K8ssandraTaskReconciler) datacenters(ctx context.Context, kc *k8capi.K8ssandraCluster) ([]cassdcapi.CassandraDatacenter, error) { - dcs := make([]cassdcapi.CassandraDatacenter, 0, len(kc.Spec.Cassandra.Datacenters)) - for _, dcTemplate := range kc.Spec.Cassandra.Datacenters { - dcKey := client.ObjectKey{Namespace: utils.FirstNonEmptyString(dcTemplate.Meta.Namespace, kc.Namespace), Name: dcTemplate.Meta.Name} - dc := &cassdcapi.CassandraDatacenter{} - if err := r.Get(ctx, dcKey, dc); err != nil { - return nil, err - } - dcs = append(dcs, *dc) - } - return dcs, nil -} - func (r *K8ssandraTaskReconciler) deleteCassandraTasks( ctx context.Context, kTask *api.K8ssandraTask, diff --git a/controllers/control/k8ssandratask_controller_test.go b/controllers/control/k8ssandratask_controller_test.go index 101b672f8..d0b4f0d5c 100644 --- a/controllers/control/k8ssandratask_controller_test.go +++ b/controllers/control/k8ssandratask_controller_test.go @@ -2,7 +2,6 @@ package control import ( "context" - "fmt" "testing" "time" @@ -10,7 +9,6 @@ import ( cassapi "github.com/k8ssandra/cass-operator/apis/control/v1alpha1" api "github.com/k8ssandra/k8ssandra-operator/apis/control/v1alpha1" k8capi "github.com/k8ssandra/k8ssandra-operator/apis/k8ssandra/v1alpha1" - "github.com/k8ssandra/k8ssandra-operator/pkg/cassandra" "github.com/k8ssandra/k8ssandra-operator/pkg/clientcache" "github.com/k8ssandra/k8ssandra-operator/pkg/config" testutils "github.com/k8ssandra/k8ssandra-operator/pkg/test" @@ -73,7 +71,6 @@ func TestK8ssandraTask(t *testing.T) { t.Run("ExecuteSequentialK8ssandraTask", testEnv.ControllerTest(ctx, executeSequentialK8ssandraTask)) t.Run("DeleteK8ssandraTask", testEnv.ControllerTest(ctx, deleteK8ssandraTask)) t.Run("ExpireK8ssandraTask", testEnv.ControllerTest(ctx, expireK8ssandraTask)) - t.Run("RefreshK8ssandraCluster", testEnv.ControllerTest(ctx, refreshK8ssandraTask)) } // executeParallelK8ssandraTask creates and runs a K8ssandraTask with parallel DC processing. @@ -381,65 +378,6 @@ func expireK8ssandraTask(t *testing.T, ctx context.Context, f *framework.Framewo require.Eventually(func() bool { return !f.K8ssandraTaskExists(ctx, k8TaskKey)() }, timeout, interval) } -func refreshK8ssandraTask(t *testing.T, ctx context.Context, f *framework.Framework, namespace string) { - require := require.New(t) - - kc := newCluster(namespace, "kc", - newDc("dc1", f.DataPlaneContexts[0])) - require.NoError(f.Client.Create(ctx, kc), "failed to create K8ssandraCluster") - - kc.Status.SetConditionStatus(k8capi.ClusterRequiresUpdate, corev1.ConditionTrue) - require.NoError(f.Client.Status().Update(ctx, kc)) - - dcConfig := cassandra.Coalesce(kc.CassClusterName(), kc.Spec.Cassandra.DeepCopy(), kc.Spec.Cassandra.Datacenters[0].DeepCopy()) - dc, err := cassandra.NewDatacenter(types.NamespacedName{Namespace: kc.Namespace, Name: kc.Name}, dcConfig) - require.NoError(err) - require.NoError(f.Client.Create(ctx, dc)) - - t.Log("Create a K8ssandraTask with TTL") - ttl := new(int32) - *ttl = 1 - k8Task := &api.K8ssandraTask{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: namespace, - Name: "refresh", - }, - Spec: api.K8ssandraTaskSpec{ - Cluster: corev1.ObjectReference{ - Name: "kc", - }, - Template: cassapi.CassandraTaskTemplate{ - TTLSecondsAfterFinished: ttl, - Jobs: []cassapi.CassandraJob{{ - Name: "job1", - Command: "refresh", - }}, - }, - DcConcurrencyPolicy: batchv1.ForbidConcurrent, - }, - } - require.NoError(f.Client.Create(ctx, k8Task), "failed to create K8ssandraTask") - - require.Eventually(func() bool { - if err := f.Client.Get(ctx, types.NamespacedName{Namespace: kc.Namespace, Name: kc.Name}, kc); err != nil { - return false - } - if !metav1.HasAnnotation(kc.ObjectMeta, k8capi.AutomatedUpdateAnnotation) { - return false - } - return kc.Annotations[k8capi.AutomatedUpdateAnnotation] == string(k8capi.AllowUpdateOnce) - }, timeout, interval) - // First case, there's only changes in K8ssandraCluster - t.Log("Mark the K8ssandraCluster as updated if the annotation was added") - require.Equal(corev1.ConditionTrue, kc.Status.GetConditionStatus(k8capi.ClusterRequiresUpdate)) - kc.Status.SetConditionStatus(k8capi.ClusterRequiresUpdate, corev1.ConditionFalse) - require.NoError(f.Client.Status().Update(ctx, kc)) - - waitForTaskCompletion(ctx, t, f, newClusterKey(f.ControlPlaneContext, namespace, "refresh")) - // Second case, we also have changes in the CassandraDatacenter, even after updating K8ssandraCluster - -} - func newCluster(namespace, name string, dcs ...k8capi.CassandraDatacenterTemplate) *k8capi.K8ssandraCluster { return &k8capi.K8ssandraCluster{ ObjectMeta: metav1.ObjectMeta{ @@ -472,18 +410,6 @@ func newDc(name string, k8sContext string) k8capi.CassandraDatacenterTemplate { } } -func waitForTaskCompletion(ctx context.Context, t *testing.T, f *framework.Framework, taskKey framework.ClusterKey) { - require.Eventually(t, func() bool { - k8Task := &api.K8ssandraTask{} - require.NoError(t, f.Get(ctx, taskKey, k8Task)) - fmt.Printf("k8Task.Status: %+v\n", k8Task.Status) - return k8Task.Status.Active == 0 && - k8Task.Status.Succeeded > 0 && - k8Task.GetConditionStatus(cassapi.JobRunning) == metav1.ConditionFalse && - k8Task.GetConditionStatus(cassapi.JobComplete) == metav1.ConditionTrue - }, timeout, interval) -} - func newClusterKey(k8sContext, namespace, name string) framework.ClusterKey { return framework.ClusterKey{ NamespacedName: types.NamespacedName{Namespace: namespace, Name: name}, diff --git a/controllers/k8ssandra/datacenters.go b/controllers/k8ssandra/datacenters.go index 378e69be4..fd25376d3 100644 --- a/controllers/k8ssandra/datacenters.go +++ b/controllers/k8ssandra/datacenters.go @@ -6,19 +6,23 @@ import ( "sort" "strconv" "strings" + "time" "github.com/Masterminds/semver/v3" "github.com/go-logr/logr" cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" cassctlapi "github.com/k8ssandra/cass-operator/apis/control/v1alpha1" + ktaskapi "github.com/k8ssandra/k8ssandra-operator/apis/control/v1alpha1" api "github.com/k8ssandra/k8ssandra-operator/apis/k8ssandra/v1alpha1" telemetryapi "github.com/k8ssandra/k8ssandra-operator/apis/telemetry/v1alpha1" "github.com/k8ssandra/k8ssandra-operator/pkg/annotations" "github.com/k8ssandra/k8ssandra-operator/pkg/cassandra" + "github.com/k8ssandra/k8ssandra-operator/pkg/labels" "github.com/k8ssandra/k8ssandra-operator/pkg/result" "github.com/k8ssandra/k8ssandra-operator/pkg/secret" agent "github.com/k8ssandra/k8ssandra-operator/pkg/telemetry/cassandra_agent" "github.com/k8ssandra/k8ssandra-operator/pkg/utils" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -243,6 +247,54 @@ func (r *K8ssandraClusterReconciler) reconcileDatacenters(ctx context.Context, k } } + if AllowUpdate(kc) { + dcsRequiringUpdate := make([]string, 0, len(actualDcs)) + for _, dc := range actualDcs { + if dc.Status.GetConditionStatus("RequiresUpdate") == corev1.ConditionTrue { // TODO Update this to cassdcapi const when cass-operator is updatedß + dcsRequiringUpdate = append(dcsRequiringUpdate, dc.DatacenterName()) + } + } + + if len(dcsRequiringUpdate) > 0 { + generatedName := fmt.Sprintf("refresh-%d", time.Now().Unix()) + internalTask := &ktaskapi.K8ssandraTask{} + err := r.Get(ctx, types.NamespacedName{Namespace: kc.Namespace, Name: generatedName}, internalTask) + // If task wasn't found, create it and if task is still running, requeue + if errors.IsNotFound(err) { + // Delegate work to the task controller for Datacenter operations + task := &ktaskapi.K8ssandraTask{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: kc.Namespace, + Name: generatedName, + Labels: labels.WatchedByK8ssandraClusterLabels(kcKey), + }, + Spec: ktaskapi.K8ssandraTaskSpec{ + Cluster: corev1.ObjectReference{ + Name: kc.Name, + }, + Datacenters: make([]string, len(dcsRequiringUpdate)), + Template: cassctlapi.CassandraTaskTemplate{ + Jobs: []cassctlapi.CassandraJob{{ + Name: fmt.Sprintf("refresh-%s", kc.Name), + Command: "refresh", + }}, + }, + DcConcurrencyPolicy: batchv1.ForbidConcurrent, + }, + } + + if err := r.Create(ctx, task); err != nil { + return result.Error(err), actualDcs + } + + return result.RequeueSoon(r.DefaultDelay), actualDcs + + } else if internalTask.Status.CompletionTime.IsZero() { + return result.RequeueSoon(r.DefaultDelay), actualDcs + } + } + } + // If we reach this point all CassandraDatacenters are ready. We only set the // CassandraInitialized condition if it is unset, i.e., only once. This allows us to // distinguish whether we are deploying a CassandraDatacenter as part of a new cluster diff --git a/controllers/k8ssandra/k8ssandracluster_controller_test.go b/controllers/k8ssandra/k8ssandracluster_controller_test.go index 19b402261..654bb7d54 100644 --- a/controllers/k8ssandra/k8ssandracluster_controller_test.go +++ b/controllers/k8ssandra/k8ssandracluster_controller_test.go @@ -2721,3 +2721,50 @@ func testGenerationCheck(t *testing.T, ctx context.Context, f *framework.Framewo require.NoError(f.Get(ctx, dcKey, dc), "failed to get CassandraDatacenter dc1") require.NotEqual("gibberish", dc.Annotations[api.ResourceHashAnnotation]) } + +// func refreshK8ssandraTask(t *testing.T, ctx context.Context, f *framework.Framework, namespace string) { +// require := require.New(t) + +// kc := newCluster(namespace, "kc", +// newDc("dc1", f.DataPlaneContexts[0])) +// require.NoError(f.Client.Create(ctx, kc), "failed to create K8ssandraCluster") + +// kcKey := types.NamespacedName{Namespace: kc.Namespace, Name: kc.Name} +// kc.Status.SetConditionStatus(k8capi.ClusterRequiresUpdate, corev1.ConditionTrue) +// require.NoError(f.Client.Status().Update(ctx, kc)) + +// dcKey := types.NamespacedName{Namespace: kc.Namespace, Name: kc.Name} +// dcConfig := cassandra.Coalesce(kc.CassClusterName(), kc.Spec.Cassandra.DeepCopy(), kc.Spec.Cassandra.Datacenters[0].DeepCopy()) +// dc, err := cassandra.NewDatacenter(dcKey, dcConfig) +// require.NoError(err) +// require.NoError(f.Client.Create(ctx, dc)) + +// require.NoError(f.Client.Get(ctx, dcKey, dc)) +// dc.Status.SetCondition(*cassdcapi.NewDatacenterCondition("RequiresUpdate", corev1.ConditionTrue)) +// require.NoError(f.Client.Status().Update(ctx, dc)) + +// require.NoError(f.Client.Get(ctx, kcKey, kc)) +// metav1.SetMetaDataAnnotation(&kc.ObjectMeta, k8capi.AutomatedUpdateAnnotation, string(k8capi.AllowUpdateOnce)) + +// require.Eventually(func() bool { +// // Find the correct task +// tasks := &api.K8ssandraTaskList{} +// }, timeout, interval) + +// require.Eventually(func() bool { +// if err := f.Client.Get(ctx, dcKey, dc); err != nil { +// return false +// } +// return metav1.HasAnnotation(dc.ObjectMeta, "cassandra.datastax.com/autoupdate-spec") +// }, timeout, interval) + +// // First case, there's only changes in K8ssandraCluster +// t.Log("Mark the K8ssandraCluster as updated if the annotation was added") +// require.Equal(corev1.ConditionTrue, kc.Status.GetConditionStatus(k8capi.ClusterRequiresUpdate)) +// kc.Status.SetConditionStatus(k8capi.ClusterRequiresUpdate, corev1.ConditionFalse) +// require.NoError(f.Client.Status().Update(ctx, kc)) + +// waitForTaskCompletion(ctx, t, f, newClusterKey(f.ControlPlaneContext, namespace, "refresh")) +// // Second case, we also have changes in the CassandraDatacenter, even after updating K8ssandraCluster + +// }