Skip to content

Commit

Permalink
kcgid: gc: clean up orphaned pending pods
Browse files Browse the repository at this point in the history
  • Loading branch information
xdavidwu committed Jun 28, 2024
1 parent 39f311b commit 000877d
Showing 1 changed file with 57 additions and 3 deletions.
60 changes: 57 additions & 3 deletions internal/cgid/kubernetes/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func cleanupOldGeneration(log logr.Logger, c client.Client, current *kubecgiv1al
panic("cannot list pods")
}

// pending is always deleted
keep := map[corev1.PodPhase]bool{}

if current.Spec.HistoryLimit != nil {
Expand All @@ -49,9 +50,8 @@ func cleanupOldGeneration(log logr.Logger, c client.Client, current *kubecgiv1al
if generation >= current.Generation {
continue
}

if pod.Status.Phase != corev1.PodSucceeded && pod.Status.Phase != corev1.PodFailed {
log.Info("found non-terminated pod of previous geneation",
if pod.Status.Phase == corev1.PodRunning {
log.Info("found running pod of previous geneation",
"pod", pod.Name, "generation", generation, "phase", pod.Status.Phase)
continue
}
Expand Down Expand Up @@ -137,6 +137,51 @@ func deleteUnlessLastN(log logr.Logger, c client.WithWatch, n int32, listOpts ..
}
}

func deleteAll(log logr.Logger, c client.WithWatch, listOpts ...client.ListOption) {
// deletion may race with other policy or instance, thus ignoring not found
// last n should always be available as long as the order instances see is the same

var list corev1.PodList
err := c.List(context.Background(), &list, listOpts...)
if err != nil {
log.Error(err, "cannot list pods")
panic("cannot list pods")
}

l := int32(len(list.Items))
for i := int32(0); i < l; i += 1 {
err = client.IgnoreNotFound(c.Delete(context.Background(), &list.Items[i]))
if err != nil {
log.Error(err, "cannot delete pod", "pod", list.Items[i].Name)
}
}

watcher, err := watchtools.NewRetryWatcher(
list.ResourceVersion,
watcherWithOpts(context.Background(), c, &list, listOpts...),
)
if err != nil {
log.Error(err, "cannot watch pods")
panic("cannot watch pods")
}
results := watcher.ResultChan()
for {
ev, ok := <-results
if !ok {
log.Error(nil, "watch channel closed")
panic("watch channel closed")
}
if ev.Type == watch.Added {
pod := ev.Object.(*corev1.Pod)
log.Info("remove pod", "pod", pod.Name)
err = client.IgnoreNotFound(c.Delete(context.Background(), pod))
if err != nil {
log.Error(err, "cannot delete pod", "pod", pod.Name)
}
}
}
}

func CollectGarbage(log logr.Logger, c client.WithWatch, apiset *kubecgiv1alpha1.APISet) {
cleanupOldGeneration(log.WithValues("policy", "previousVersions"), c, apiset)

Expand Down Expand Up @@ -170,4 +215,13 @@ func CollectGarbage(log logr.Logger, c client.WithWatch, apiset *kubecgiv1alpha1
client.MatchingLabels{gcKey: "true"},
client.MatchingFields{"status.phase": string(phase)})
}
go deleteAll(
log.WithValues("policy", "orphanedPending"),
c,
client.InNamespace(apiset.Namespace),
client.MatchingLabels{managedByKey: manager},
client.MatchingLabels{generationKey: gen},
client.MatchingLabels{gcKey: "true"},
client.MatchingFields{"status.phase": string(corev1.PodPending)})
// TODO for running pod, define a deadline for it to terminate?
}

0 comments on commit 000877d

Please sign in to comment.