diff --git a/pkg/controller.v1/tensorflow/controller.go b/pkg/controller.v1/tensorflow/controller.go index 5631862fa4..7e69516958 100644 --- a/pkg/controller.v1/tensorflow/controller.go +++ b/pkg/controller.v1/tensorflow/controller.go @@ -273,13 +273,13 @@ func (tc *TFController) processNextWorkItem() bool { } // Wait until queuing annotation is removed - if tfJob.Annotations != nil { - if suspend, exist := tfJob.Annotations[suspendInQueue]; exist && suspend == "true" { - infoMsg := fmt.Sprintf("Annotation %s is found, operator will not process until removed", suspendInQueue) - tflogger.LoggerForKey(key).Info(infoMsg) - return true - } - } + // if tfJob.Annotations != nil { + // if suspend, exist := tfJob.Annotations[suspendInQueue]; exist && suspend == "true" { + // infoMsg := fmt.Sprintf("Annotation %s is found, operator will not process until removed", suspendInQueue) + // tflogger.LoggerForKey(key).Info(infoMsg) + // return true + // } + // } // Sync TFJob to match the actual state to this desired state. forget, err := tc.syncHandler(key) @@ -354,6 +354,15 @@ func (tc *TFController) syncTFJob(key string) (bool, error) { return true, err } +func isSuspend(tfjob *tfv1.TFJob) bool { + if tfjob.Annotations != nil { + if suspend, exist := tfjob.Annotations[suspendInQueue]; exist && suspend == "true" { + return true + } + } + return false +} + // reconcileTFJobs checks and updates replicas for each given TFReplicaSpec. // It will requeue the tfjob in case of an error while creating/deleting pods/services. func (tc *TFController) reconcileTFJobs(tfjob *tfv1.TFJob) error { @@ -428,12 +437,12 @@ func (tc *TFController) reconcileTFJobs(tfjob *tfv1.TFJob) error { } // If the TFJob is terminated, delete all pods and services. - if isSucceeded(tfjob.Status) || isFailed(tfjob.Status) || tfJobExceedsLimit { + if isSucceeded(tfjob.Status) || isFailed(tfjob.Status) || tfJobExceedsLimit || isSuspend(tfjob) { // If TTL is set, you need to wait until the TTL time before reclaiming resources. ttlDuration, shouldWaitPodTTL := getPodTTL(tfjob) - if !shouldWaitPodTTL || isPodTTLReached(tfjob, ttlDuration) { + if !shouldWaitPodTTL || isPodTTLReached(tfjob, ttlDuration) || isSuspend(tfjob) { if err := tc.deletePodsAndServices(tfjob, pods); err != nil { return err } diff --git a/pkg/controller.v1/tensorflow/job.go b/pkg/controller.v1/tensorflow/job.go index b2eef33443..1150caf91c 100644 --- a/pkg/controller.v1/tensorflow/job.go +++ b/pkg/controller.v1/tensorflow/job.go @@ -2,12 +2,13 @@ package tensorflow import ( "fmt" - "github.com/kubeflow/tf-operator/pkg/util" "reflect" "time" + "github.com/kubeflow/tf-operator/pkg/util" + log "github.com/sirupsen/logrus" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/client-go/kubernetes/scheme" @@ -182,8 +183,9 @@ func (tc *TFController) deletePodsAndServices(tfJob *tfv1.TFJob, pods []*v1.Pod) return nil } + isSuspend := isSuspend(tfJob) // Delete nothing when the cleanPodPolicy is None. - if *tfJob.Spec.CleanPodPolicy == common.CleanPodPolicyNone { + if *tfJob.Spec.CleanPodPolicy == common.CleanPodPolicyNone && !isSuspend { return nil }