Skip to content

Commit

Permalink
ObservedGeneration and update block (#1298)
Browse files Browse the repository at this point in the history
* Add new status field: ObservedGeneration and ensure it is updated at the end of the reconciliation

* Upgrade operator test should verify the state of CassandraDatacenter also, not just Stargate

* Add new SetConditionStatus to reduce duplication in the code, add restricted values for AutomatedUpdate, "always" and "once" and verify them in the webhook.

Remove tee outputting to stdout in the helm prepare script.

UpdateStatus should only delete and reset the ClusterRequiresUpdate if it was allowed to update in the first place. Also, we should listen to changes and reconcile if annotations change.

Add additional checks to the GenerationCheck test to ensure we do not touch the CassandraDatacenter unless given permission.

* Add new refresh K8ssandraTask

* Re-enable tests

* Make the process rely entirely on annotations, removing the separate task

* Change the name of the refresh task to be predictable

* Add documentation

* Add reconcile request after updating the Datacenter

---------

Co-authored-by: Olivier Michallat <[email protected]>
  • Loading branch information
burmanm and olim7t authored Jul 25, 2024
1 parent 1a62958 commit 99a34d5
Show file tree
Hide file tree
Showing 22 changed files with 396 additions and 55 deletions.
1 change: 1 addition & 0 deletions CHANGELOG/CHANGELOG-1.18.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ When cutting a new release, update the `unreleased` heading to the tag being gen
* [CHANGE] Update cassandra-medusa to 0.22.0
* [CHANGE] Update cass-operator to v1.22.0
* [FEATURE] [#1310](https://github.com/k8ssandra/k8ssandra-operator/issues/1310) Enhance the MedusaBackupSchedule API to allow scheduling purge tasks
* [ENHANCEMENT] [#1274](https://github.com/k8ssandra/k8ssandra-operator/issues/1274) On upgrade, do not modify the CassandraDatacenter object unless instructed with an annotation `k8ssandra.io/autoupdate-spec` with value `once` or `always`
* [BUGFIX] [#1222](https://github.com/k8ssandra/k8ssandra-operator/issues/1222) Consider DC-level config when validating numToken updates in webhook
* [BUGFIX] [#1366](https://github.com/k8ssandra/k8ssandra-operator/issues/1366) Reaper deployment can't be created on OpenShift due to missing RBAC rule
9 changes: 9 additions & 0 deletions apis/k8ssandra/v1alpha1/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,17 @@ const (

// Annotation to indicate the purpose of a given resource.
PurposeAnnotation = "k8ssandra.io/purpose"

// AutomatedUpdateAnnotation is an annotation that allows the Datacenters to be updated even if no changes were done to the K8ssandraCluster spec
AutomatedUpdateAnnotation = "k8ssandra.io/autoupdate-spec"

AllowUpdateAlways AllowUpdateType = "always"
AllowUpdateOnce AllowUpdateType = "once"
)

// AllowUpdateType is the restricted set of values accepted for the
// AutomatedUpdateAnnotation: AllowUpdateAlways ("always") or
// AllowUpdateOnce ("once"). The webhook rejects any other value.
// TODO Use the accepted values from cass-operator's api instead to prevent drift, once Kubernetes dependencies are updated in k8ssandra-operator
type AllowUpdateType string

var (
SystemKeyspaces = []string{"system_traces", "system_distributed", "system_auth"}
DseKeyspaces = []string{"dse_leases", "dse_perf", "dse_security"}
Expand Down
20 changes: 20 additions & 0 deletions apis/k8ssandra/v1alpha1/k8ssandracluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,17 @@ type K8ssandraClusterStatus struct {

// +kubebuilder:default=None
Error string `json:"error,omitempty"`

// ObservedGeneration is the last observed generation of the K8ssandraCluster.
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
}

type K8ssandraClusterConditionType string

const (
ClusterRequiresUpdate K8ssandraClusterConditionType = "RequiresUpdate"
)

type DecommissionProgress string

const (
Expand Down Expand Up @@ -511,6 +518,15 @@ func (s *K8ssandraClusterStatus) GetConditionStatus(conditionType K8ssandraClust
return corev1.ConditionUnknown
}

// SetConditionStatus sets (or updates) the condition of the given type to the
// given status, stamping the transition time with the current clock. It is a
// convenience wrapper around SetCondition that removes boilerplate at call sites.
func (s *K8ssandraClusterStatus) SetConditionStatus(conditionType K8ssandraClusterConditionType, status corev1.ConditionStatus) {
	transitionTime := metav1.Now()
	condition := K8ssandraClusterCondition{
		Type:               conditionType,
		Status:             status,
		LastTransitionTime: &transitionTime,
	}
	s.SetCondition(condition)
}

func (s *K8ssandraClusterStatus) SetCondition(condition K8ssandraClusterCondition) {
for i, c := range s.Conditions {
if c.Type == condition.Type {
Expand Down Expand Up @@ -548,3 +564,7 @@ func (sd *ServerDistribution) IsDse() bool {
func (kc *K8ssandraCluster) GetClusterIdHash() string {
return utils.HashNameNamespace(kc.Name, kc.Namespace)
}

// GenerationChanged reports whether the object's metadata generation has
// advanced past the generation most recently recorded in the status, i.e.
// whether there is a spec change the reconciler has not yet observed.
func (k *K8ssandraCluster) GenerationChanged() bool {
	return k.Generation > k.Status.ObservedGeneration
}
21 changes: 21 additions & 0 deletions apis/k8ssandra/v1alpha1/k8ssandracluster_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package v1alpha1
import (
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
"sigs.k8s.io/yaml"

Expand Down Expand Up @@ -194,3 +195,23 @@ metadata:
assert.Equal(t, "nodePortSvcLabelValue1", dc.Meta.ServiceConfig.NodePortService.Labels["nodePortSvcLabel1"])
assert.Equal(t, "nodePortSvcAnnotationValue1", dc.Meta.ServiceConfig.NodePortService.Annotations["nodePortSvcAnnotation1"])
}

// TestGenerationChanged verifies that GenerationChanged compares the object's
// metadata generation against the last observed generation in the status.
func TestGenerationChanged(t *testing.T) {
	assert := assert.New(t)
	kc := &K8ssandraCluster{
		ObjectMeta: metav1.ObjectMeta{
			Generation: 2,
		},
		Spec: K8ssandraClusterSpec{},
		Status: K8ssandraClusterStatus{
			ObservedGeneration: 0,
		},
	}

	// Status lags behind the metadata generation.
	assert.True(kc.GenerationChanged())

	// Status has caught up.
	kc.Status.ObservedGeneration = 2
	assert.False(kc.GenerationChanged())

	// A new spec edit bumps the generation again.
	kc.ObjectMeta.Generation = 3
	assert.True(kc.GenerationChanged())
}
9 changes: 9 additions & 0 deletions apis/k8ssandra/v1alpha1/k8ssandracluster_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/Masterminds/semver/v3"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/validation"

"github.com/k8ssandra/k8ssandra-operator/pkg/clientcache"
Expand Down Expand Up @@ -100,6 +101,14 @@ func (r *K8ssandraCluster) validateK8ssandraCluster() error {
}
}

if metav1.HasAnnotation(r.ObjectMeta, AutomatedUpdateAnnotation) {
// Allow only always and once in the annotation
annotationValue := r.ObjectMeta.GetAnnotations()[AutomatedUpdateAnnotation]
if annotationValue != string(AllowUpdateAlways) && annotationValue != string(AllowUpdateOnce) {
return fmt.Errorf("invalid value for %s annotation: %s", AutomatedUpdateAnnotation, annotationValue)
}
}

if err := r.ValidateMedusa(); err != nil {
return err
}
Expand Down
18 changes: 18 additions & 0 deletions apis/k8ssandra/v1alpha1/k8ssandracluster_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func TestWebhook(t *testing.T) {
t.Run("MedusaPrefixMissing", testMedusaPrefixMissing)
t.Run("InvalidDcName", testInvalidDcName)
t.Run("MedusaConfigNonLocalNamespace", testMedusaNonLocalNamespace)
t.Run("AutomatedUpdateAnnotation", testAutomatedUpdateAnnotation)
}

func testContextValidation(t *testing.T) {
Expand Down Expand Up @@ -577,3 +578,20 @@ func TestValidateUpdateNumTokens(t *testing.T) {
}
}
}

// testAutomatedUpdateAnnotation verifies that the webhook accepts only the
// values "once" and "always" for the autoupdate-spec annotation and rejects
// any other value.
func testAutomatedUpdateAnnotation(t *testing.T) {
	require := require.New(t)
	createNamespace(require, "automated-update-namespace")
	cluster := createMinimalClusterObj("automated-update-test", "automated-update-namespace")

	// Without the annotation the cluster is valid.
	require.NoError(cluster.validateK8ssandraCluster())

	// The values "once" and "always" must be accepted.
	metav1.SetMetaDataAnnotation(&cluster.ObjectMeta, AutomatedUpdateAnnotation, string(AllowUpdateOnce))
	require.NoError(cluster.validateK8ssandraCluster())

	metav1.SetMetaDataAnnotation(&cluster.ObjectMeta, AutomatedUpdateAnnotation, string(AllowUpdateAlways))
	require.NoError(cluster.validateK8ssandraCluster())

	// Any other value must be rejected.
	metav1.SetMetaDataAnnotation(&cluster.ObjectMeta, AutomatedUpdateAnnotation, "true")
	require.Error(cluster.validateK8ssandraCluster())
}
5 changes: 5 additions & 0 deletions charts/k8ssandra-operator/crds/k8ssandra-operator-crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31025,6 +31025,11 @@ spec:
error:
default: None
type: string
observedGeneration:
description: ObservedGeneration is the last observed generation of
the K8ssandraCluster.
format: int64
type: integer
type: object
type: object
served: true
Expand Down
5 changes: 5 additions & 0 deletions config/crd/bases/k8ssandra.io_k8ssandraclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30963,6 +30963,11 @@ spec:
error:
default: None
type: string
observedGeneration:
description: ObservedGeneration is the last observed generation of
the K8ssandraCluster.
format: int64
type: integer
type: object
type: object
served: true
Expand Down
10 changes: 6 additions & 4 deletions controllers/k8ssandra/add_dc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ func addDcSetupForSingleDc(ctx context.Context, t *testing.T, f *framework.Frame
require := require.New(t)
kc := &api.K8ssandraCluster{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: "add-dc-test",
Namespace: namespace,
Name: "add-dc-test",
Annotations: map[string]string{api.AutomatedUpdateAnnotation: string(api.AllowUpdateAlways)},
},
Spec: api.K8ssandraClusterSpec{
Cassandra: &api.CassandraClusterTemplate{
Expand Down Expand Up @@ -119,8 +120,9 @@ func addDcSetupForMultiDc(ctx context.Context, t *testing.T, f *framework.Framew
require := require.New(t)
kc := &api.K8ssandraCluster{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: "add-dc-test",
Namespace: namespace,
Name: "add-dc-test",
Annotations: map[string]string{api.AutomatedUpdateAnnotation: string(api.AllowUpdateAlways)},
},
Spec: api.K8ssandraClusterSpec{
Cassandra: &api.CassandraClusterTemplate{
Expand Down
12 changes: 4 additions & 8 deletions controllers/k8ssandra/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,11 @@ func createSingleDcClusterNoAuth(t *testing.T, ctx context.Context, f *framework
err := f.Client.Create(ctx, kc)
require.NoError(t, err, "failed to create K8ssandraCluster")

kcKey := framework.ClusterKey{K8sContext: f.ControlPlaneContext, NamespacedName: types.NamespacedName{Namespace: namespace, Name: kc.Name}}
dcKey := framework.ClusterKey{K8sContext: f.DataPlaneContexts[1], NamespacedName: types.NamespacedName{Namespace: namespace, Name: "dc1"}}
reaperKey := framework.ClusterKey{K8sContext: f.DataPlaneContexts[1], NamespacedName: types.NamespacedName{Namespace: namespace, Name: "cluster1-dc1-reaper"}}
stargateKey := framework.ClusterKey{K8sContext: f.DataPlaneContexts[1], NamespacedName: types.NamespacedName{Namespace: namespace, Name: "cluster1-dc1-stargate"}}

verifyFinalizerAdded(ctx, t, f, kcKey.NamespacedName)
verifyFinalizerAdded(ctx, t, f, kc)
verifySuperuserSecretCreated(ctx, t, f, kc)
verifySecretNotCreated(ctx, t, f, kc.Namespace, reaper.DefaultUserSecretName(kc.SanitizedName()))
verifyReplicatedSecretReconciled(ctx, t, f, kc)
Expand Down Expand Up @@ -165,12 +164,11 @@ func createSingleDcClusterAuth(t *testing.T, ctx context.Context, f *framework.F
err := f.Client.Create(ctx, kc)
require.NoError(t, err, "failed to create K8ssandraCluster")

kcKey := framework.ClusterKey{K8sContext: f.ControlPlaneContext, NamespacedName: types.NamespacedName{Namespace: namespace, Name: kc.Name}}
dcKey := framework.ClusterKey{K8sContext: f.DataPlaneContexts[1], NamespacedName: types.NamespacedName{Namespace: namespace, Name: "dc1"}}
reaperKey := framework.ClusterKey{K8sContext: f.DataPlaneContexts[1], NamespacedName: types.NamespacedName{Namespace: namespace, Name: "cluster1-dc1-reaper"}}
stargateKey := framework.ClusterKey{K8sContext: f.DataPlaneContexts[1], NamespacedName: types.NamespacedName{Namespace: namespace, Name: "cluster1-dc1-stargate"}}

verifyFinalizerAdded(ctx, t, f, kcKey.NamespacedName)
verifyFinalizerAdded(ctx, t, f, kc)
verifySuperuserSecretCreated(ctx, t, f, kc)
verifySecretCreated(ctx, t, f, kc.Namespace, reaper.DefaultUserSecretName(kc.Name))
verifyReplicatedSecretReconciled(ctx, t, f, kc)
Expand Down Expand Up @@ -285,12 +283,11 @@ func createSingleDcClusterAuthExternalSecrets(t *testing.T, ctx context.Context,
err := f.Client.Create(ctx, kc)
require.NoError(t, err, "failed to create K8ssandraCluster")

kcKey := framework.ClusterKey{K8sContext: f.ControlPlaneContext, NamespacedName: types.NamespacedName{Namespace: namespace, Name: kc.Name}}
dcKey := framework.ClusterKey{K8sContext: f.DataPlaneContexts[1], NamespacedName: types.NamespacedName{Namespace: namespace, Name: "dc1"}}
reaperKey := framework.ClusterKey{K8sContext: f.DataPlaneContexts[1], NamespacedName: types.NamespacedName{Namespace: namespace, Name: "cluster1-dc1-reaper"}}
stargateKey := framework.ClusterKey{K8sContext: f.DataPlaneContexts[1], NamespacedName: types.NamespacedName{Namespace: namespace, Name: "cluster1-dc1-stargate"}}

verifyFinalizerAdded(ctx, t, f, kcKey.NamespacedName)
verifyFinalizerAdded(ctx, t, f, kc)
verifySuperuserSecretNotCreated(ctx, t, f, kc)

// verify not created
Expand Down Expand Up @@ -416,10 +413,9 @@ func createSingleDcClusterExternalInternode(t *testing.T, ctx context.Context, f
err := f.Client.Create(ctx, kc)
require.NoError(err, "failed to create K8ssandraCluster")

kcKey := framework.ClusterKey{K8sContext: f.ControlPlaneContext, NamespacedName: types.NamespacedName{Namespace: namespace, Name: kc.Name}}
dcKey := framework.ClusterKey{K8sContext: f.DataPlaneContexts[1], NamespacedName: types.NamespacedName{Namespace: namespace, Name: "dc1"}}

verifyFinalizerAdded(ctx, t, f, kcKey.NamespacedName)
verifyFinalizerAdded(ctx, t, f, kc)
verifySuperuserSecretCreated(ctx, t, f, kc)
verifyReplicatedSecretReconciled(ctx, t, f, kc)

Expand Down
2 changes: 1 addition & 1 deletion controllers/k8ssandra/cassandra_metrics_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func createSingleDcClusterWithMetricsAgent(t *testing.T, ctx context.Context, f
err := f.Client.Create(ctx, kc)
require.NoError(err, "failed to create K8ssandraCluster")

verifyFinalizerAdded(ctx, t, f, client.ObjectKey{Namespace: kc.Namespace, Name: kc.Name})
verifyFinalizerAdded(ctx, t, f, kc)

verifySuperuserSecretCreated(ctx, t, f, kc)

Expand Down
76 changes: 67 additions & 9 deletions controllers/k8ssandra/datacenters.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@ import (
"github.com/go-logr/logr"
cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
cassctlapi "github.com/k8ssandra/cass-operator/apis/control/v1alpha1"
ktaskapi "github.com/k8ssandra/k8ssandra-operator/apis/control/v1alpha1"
api "github.com/k8ssandra/k8ssandra-operator/apis/k8ssandra/v1alpha1"
telemetryapi "github.com/k8ssandra/k8ssandra-operator/apis/telemetry/v1alpha1"
"github.com/k8ssandra/k8ssandra-operator/pkg/annotations"
"github.com/k8ssandra/k8ssandra-operator/pkg/cassandra"
"github.com/k8ssandra/k8ssandra-operator/pkg/labels"
"github.com/k8ssandra/k8ssandra-operator/pkg/result"
"github.com/k8ssandra/k8ssandra-operator/pkg/secret"
agent "github.com/k8ssandra/k8ssandra-operator/pkg/telemetry/cassandra_agent"
"github.com/k8ssandra/k8ssandra-operator/pkg/utils"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -30,6 +33,10 @@ const (
rebuildNodesLabel = "k8ssandra.io/rebuild-nodes"
)

// AllowUpdate reports whether the reconciler may push changes down to the
// CassandraDatacenters: either the K8ssandraCluster spec itself changed
// (generation advanced past the observed one), or the user opted in via
// the autoupdate annotation.
func AllowUpdate(kc *api.K8ssandraCluster) bool {
	if kc.GenerationChanged() {
		return true
	}
	return metav1.HasAnnotation(kc.ObjectMeta, api.AutomatedUpdateAnnotation)
}

func (r *K8ssandraClusterReconciler) reconcileDatacenters(ctx context.Context, kc *api.K8ssandraCluster, logger logr.Logger) (result.ReconcileResult, []*cassdcapi.CassandraDatacenter) {
kcKey := utils.GetKey(kc)

Expand Down Expand Up @@ -143,9 +150,15 @@ func (r *K8ssandraClusterReconciler) reconcileDatacenters(ctx context.Context, k

r.setStatusForDatacenter(kc, actualDc)

if !annotations.CompareHashAnnotations(actualDc, desiredDc) {
dcLogger.Info("Updating datacenter")

if !annotations.CompareHashAnnotations(actualDc, desiredDc) && !AllowUpdate(kc) {
logger.Info("Datacenter requires an update, but we're not allowed to do it", "CassandraDatacenter", dcKey)
// We're not allowed to update, but need to
patch := client.MergeFrom(kc.DeepCopy())
kc.Status.SetConditionStatus(api.ClusterRequiresUpdate, corev1.ConditionTrue)
if err := r.Client.Status().Patch(ctx, kc, patch); err != nil {
return result.Error(fmt.Errorf("failed to set %s condition: %v", api.ClusterRequiresUpdate, err)), actualDcs
}
} else if !annotations.CompareHashAnnotations(actualDc, desiredDc) {
if actualDc.Spec.SuperuserSecretName != desiredDc.Spec.SuperuserSecretName {
// If actualDc is created with SuperuserSecretName, it can't be changed anymore. We should reject all changes coming from K8ssandraCluster
desiredDc.Spec.SuperuserSecretName = actualDc.Spec.SuperuserSecretName
Expand Down Expand Up @@ -178,6 +191,8 @@ func (r *K8ssandraClusterReconciler) reconcileDatacenters(ctx context.Context, k
dcLogger.Error(err, "Failed to update datacenter")
return result.Error(err), actualDcs
}

return result.RequeueSoon(r.DefaultDelay), actualDcs
}

if actualDc.Spec.Stopped {
Expand Down Expand Up @@ -233,17 +248,60 @@ func (r *K8ssandraClusterReconciler) reconcileDatacenters(ctx context.Context, k
}
}

if AllowUpdate(kc) {
dcsRequiringUpdate := make([]string, 0, len(actualDcs))
for _, dc := range actualDcs {
if dc.Status.GetConditionStatus(cassdcapi.DatacenterRequiresUpdate) == corev1.ConditionTrue {
dcsRequiringUpdate = append(dcsRequiringUpdate, dc.ObjectMeta.Name)
}
}

if len(dcsRequiringUpdate) > 0 {
generatedName := fmt.Sprintf("refresh-%d-%d", kc.Generation, kc.Status.ObservedGeneration)
internalTask := &ktaskapi.K8ssandraTask{}
err := r.Get(ctx, types.NamespacedName{Namespace: kc.Namespace, Name: generatedName}, internalTask)
// If task wasn't found, create it and if task is still running, requeue
if errors.IsNotFound(err) {
// Delegate work to the task controller for Datacenter operations
task := &ktaskapi.K8ssandraTask{
ObjectMeta: metav1.ObjectMeta{
Namespace: kc.Namespace,
Name: generatedName,
Labels: labels.WatchedByK8ssandraClusterLabels(kcKey),
},
Spec: ktaskapi.K8ssandraTaskSpec{
Cluster: corev1.ObjectReference{
Name: kc.Name,
},
Datacenters: dcsRequiringUpdate,
Template: cassctlapi.CassandraTaskTemplate{
Jobs: []cassctlapi.CassandraJob{{
Name: fmt.Sprintf("refresh-%s", kc.Name),
Command: "refresh",
}},
},
DcConcurrencyPolicy: batchv1.ForbidConcurrent,
},
}

if err := r.Create(ctx, task); err != nil {
return result.Error(err), actualDcs
}

return result.RequeueSoon(r.DefaultDelay), actualDcs

} else if internalTask.Status.CompletionTime.IsZero() {
return result.RequeueSoon(r.DefaultDelay), actualDcs
}
}
}

// If we reach this point all CassandraDatacenters are ready. We only set the
// CassandraInitialized condition if it is unset, i.e., only once. This allows us to
// distinguish whether we are deploying a CassandraDatacenter as part of a new cluster
// or as part of an existing cluster.
if kc.Status.GetConditionStatus(api.CassandraInitialized) == corev1.ConditionUnknown {
now := metav1.Now()
kc.Status.SetCondition(api.K8ssandraClusterCondition{
Type: api.CassandraInitialized,
Status: corev1.ConditionTrue,
LastTransitionTime: &now,
})
kc.Status.SetConditionStatus(api.CassandraInitialized, corev1.ConditionTrue)
}

return result.Continue(), actualDcs
Expand Down
Loading

0 comments on commit 99a34d5

Please sign in to comment.