Skip to content

Commit

Permalink
support batch id label for canary strategy
Browse files Browse the repository at this point in the history
Signed-off-by: zhihao jian <[email protected]>
  • Loading branch information
zhihao jian committed Jan 13, 2025
1 parent faa2d03 commit 227898c
Show file tree
Hide file tree
Showing 11 changed files with 94 additions and 14 deletions.
2 changes: 2 additions & 0 deletions pkg/controller/batchrelease/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type BatchContext struct {
CurrentBatch int32 `json:"currentBatchIndex"`
// workload update revision
UpdateRevision string `json:"updateRevision,omitempty"`
// stable revision
StableRevision string `json:"stableRevision,omitempty"`

// workload replicas
Replicas int32 `json:"replicas"`
Expand Down
17 changes: 14 additions & 3 deletions pkg/controller/batchrelease/control/canarystyle/control_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,19 @@ func (rc *realCanaryController) UpgradeBatch() error {
return fmt.Errorf("wait canary workload %v reconcile", canary.GetCanaryInfo().LogKey)
}

batchContext := rc.CalculateBatchContext(rc.release)
batchContext, err := rc.CalculateBatchContext(rc.release)
if err != nil {
return err
}
klog.Infof("BatchRelease %v calculated context when upgrade batch: %s",
klog.KObj(rc.release), batchContext.Log())

return canary.UpgradeBatch(batchContext)
err = canary.UpgradeBatch(batchContext)
if err != nil {
return err
}

return rc.patcher.PatchPodBatchLabel(batchContext)
}

func (rc *realCanaryController) CheckBatchReady() error {
Expand All @@ -129,7 +137,10 @@ func (rc *realCanaryController) CheckBatchReady() error {
return fmt.Errorf("wait canary workload %v reconcile", canary.GetCanaryInfo().LogKey)
}

batchContext := rc.CalculateBatchContext(rc.release)
batchContext, err := rc.CalculateBatchContext(rc.release)
if err != nil {
return err
}
klog.Infof("BatchRelease %v calculated context when check batch ready: %s",
klog.KObj(rc.release), batchContext.Log())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type realCanaryController struct {
canaryObject *apps.Deployment
canaryClient client.Client
objectKey types.NamespacedName
canaryPods []*corev1.Pod
}

func newCanary(cli client.Client, key types.NamespacedName) realCanaryController {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/openkruise/rollouts/pkg/util"
utilclient "github.com/openkruise/rollouts/pkg/util/client"
apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -82,19 +83,40 @@ func (rc *realController) BuildCanaryController(release *v1beta1.BatchRelease) (
return rc, nil
}

func (rc *realController) CalculateBatchContext(release *v1beta1.BatchRelease) *batchcontext.BatchContext {
func (rc *realController) CalculateBatchContext(release *v1beta1.BatchRelease) (*batchcontext.BatchContext, error) {
rolloutID := release.Spec.ReleasePlan.RolloutID
if rolloutID != "" {
// if rollout-id is set, the pod will be patched batch label,
// so we have to list pod here.
if _, err := rc.ListOwnedPods(); err != nil {
return nil, err
}
}
replicas := *rc.stableObject.Spec.Replicas
currentBatch := release.Status.CanaryStatus.CurrentBatch
desiredUpdate := int32(control.CalculateBatchReplicas(release, int(replicas), int(currentBatch)))

return &batchcontext.BatchContext{
Pods: rc.canaryPods,
RolloutID: rolloutID,
Replicas: replicas,
UpdateRevision: release.Status.UpdateRevision,
StableRevision: release.Status.StableRevision,
CurrentBatch: currentBatch,
DesiredUpdatedReplicas: desiredUpdate,
FailureThreshold: release.Spec.ReleasePlan.FailureThreshold,
UpdatedReplicas: rc.canaryObject.Status.Replicas,
UpdatedReadyReplicas: rc.canaryObject.Status.AvailableReplicas,
}
FilterFunc: func(pods []*v1.Pod, ctx *batchcontext.BatchContext) []*v1.Pod {
filteredPods := make([]*v1.Pod, 0, len(pods))
for i := range pods {
if !util.IsConsistentWithRevision(pods[i], ctx.StableRevision) {
filteredPods = append(filteredPods, pods[i])
}
}
return filteredPods
},
}, nil
}

func (rc *realController) getLatestTemplate() (*v1.PodTemplateSpec, error) {
Expand All @@ -104,3 +126,12 @@ func (rc *realController) getLatestTemplate() (*v1.PodTemplateSpec, error) {
}
return &rc.stableObject.Spec.Template, nil
}

func (rc *realController) ListOwnedPods() ([]*corev1.Pod, error) {
if rc.canaryPods != nil {
return rc.canaryPods, nil
}
var err error
rc.canaryPods, err = util.ListOwnedPods(rc.canaryClient, rc.canaryObject)
return rc.canaryPods, err
}
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ func TestCalculateBatchContext(t *testing.T) {
canaryObject: canary,
},
}
got := control.CalculateBatchContext(cs.release())
got, err := control.CalculateBatchContext(cs.release())
Expect(err).NotTo(HaveOccurred())
Expect(reflect.DeepEqual(got, cs.result)).Should(BeTrue())
})
}
Expand Down Expand Up @@ -290,7 +291,8 @@ func TestRealCanaryController(t *testing.T) {
Expect(util.EqualIgnoreHash(&c.canaryObject.Spec.Template, &deployment.Spec.Template)).Should(BeTrue())

// check rolling
batchContext := c.CalculateBatchContext(release)
batchContext, err := c.CalculateBatchContext(release)
Expect(err).NotTo(HaveOccurred())
err = controller.UpgradeBatch(batchContext)
Expect(err).NotTo(HaveOccurred())
canary := getCanaryDeployment(release, deployment, c)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Interface interface {
BuildCanaryController(release *v1beta1.BatchRelease) (CanaryInterface, error)
// CalculateBatchContext calculate the current batch context according to
// our release plan and the statues of stable workload and canary workload.
CalculateBatchContext(release *v1beta1.BatchRelease) *batchcontext.BatchContext
CalculateBatchContext(release *v1beta1.BatchRelease) (*batchcontext.BatchContext, error)
}

// CanaryInterface contains the methods about canary workload
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,15 @@ func (rc *realController) CalculateBatchContext(release *v1beta1.BatchRelease) (
NoNeedUpdatedReplicas: noNeedUpdate,
PlannedUpdatedReplicas: plannedUpdate,
DesiredUpdatedReplicas: desiredUpdate,
FilterFunc: func(pods []*corev1.Pod, ctx *batchcontext.BatchContext) []*corev1.Pod {
filteredPods := make([]*corev1.Pod, 0, len(pods))
for i := range pods {
if util.IsConsistentWithRevision(pods[i], ctx.UpdateRevision) {
filteredPods = append(filteredPods, pods[i])
}
}
return filteredPods
},
}

if noNeedUpdate != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (rc *realController) BuildController() (partitionstyle.Interface, error) {
}
rc.object = object

//update this function
// update this function
rc.WorkloadInfo = util.ParseWorkload(object)

// for Advanced DaemonSet which has no updatedReadyReplicas field, we should
Expand Down Expand Up @@ -189,6 +189,15 @@ func (rc *realController) CalculateBatchContext(release *v1beta1.BatchRelease) (
NoNeedUpdatedReplicas: noNeedUpdate,
PlannedUpdatedReplicas: plannedUpdate,
DesiredUpdatedReplicas: desiredUpdate,
FilterFunc: func(pods []*corev1.Pod, ctx *batchcontext.BatchContext) []*corev1.Pod {
filteredPods := make([]*corev1.Pod, 0, len(pods))
for i := range pods {
if util.IsConsistentWithRevision(pods[i], ctx.UpdateRevision) {
filteredPods = append(filteredPods, pods[i])
}
}
return filteredPods
},
}

if noNeedUpdate != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func (rc *realController) CalculateBatchContext(release *v1beta1.BatchRelease) (
RolloutID: rolloutID,
CurrentBatch: currentBatch,
UpdateRevision: release.Status.UpdateRevision,
StableRevision: release.Status.StableRevision,
DesiredPartition: desiredPartition,
FailureThreshold: release.Spec.ReleasePlan.FailureThreshold,

Expand All @@ -183,6 +184,15 @@ func (rc *realController) CalculateBatchContext(release *v1beta1.BatchRelease) (
UpdatedReadyReplicas: rc.Status.UpdatedReadyReplicas,
PlannedUpdatedReplicas: PlannedUpdatedReplicas,
DesiredUpdatedReplicas: PlannedUpdatedReplicas,
FilterFunc: func(pods []*corev1.Pod, ctx *batchcontext.BatchContext) []*corev1.Pod {
filteredPods := make([]*corev1.Pod, 0, len(pods))
for i := range pods {
if util.IsConsistentWithRevision(pods[i], ctx.UpdateRevision) {
filteredPods = append(filteredPods, pods[i])
}
}
return filteredPods
},
}, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,15 @@ func (rc *realController) CalculateBatchContext(release *v1beta1.BatchRelease) (
NoNeedUpdatedReplicas: noNeedUpdate,
PlannedUpdatedReplicas: plannedUpdate,
DesiredUpdatedReplicas: desiredUpdate,
FilterFunc: func(pods []*corev1.Pod, ctx *batchcontext.BatchContext) []*corev1.Pod {
filteredPods := make([]*corev1.Pod, 0, len(pods))
for i := range pods {
if util.IsConsistentWithRevision(pods[i], ctx.UpdateRevision) {
filteredPods = append(filteredPods, pods[i])
}
}
return filteredPods
},
}

if noNeedUpdate != nil {
Expand Down
6 changes: 1 addition & 5 deletions pkg/controller/batchrelease/labelpatch/patcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,7 @@ func (r *realPatcher) patchPodBatchLabel(pods []*corev1.Pod, ctx *batchcontext.B
klog.InfoS("Pod is being deleted, skip patching", "pod", klog.KObj(pod), "rollout", r.logKey)
continue
}
// we don't patch label for the active old revision pod
if !util.IsConsistentWithRevision(pod, ctx.UpdateRevision) {
klog.InfoS("Pod is not consistent with revision, skip patching", "pod", klog.KObj(pod), "rollout", r.logKey)
continue
}

if pod.Labels[v1beta1.RolloutIDLabel] != ctx.RolloutID {
// for example: new/recreated pods
updatedButUnpatchedPods = append(updatedButUnpatchedPods, pod)
Expand Down

0 comments on commit 227898c

Please sign in to comment.