Merge pull request #18 from converged-computing/allow-completed-state
cleanup: allow jobs to stay in completed
vsoch authored Aug 9, 2024
2 parents f925ffa + 2d9f829 commit 0d577aa
Showing 9 changed files with 102 additions and 35 deletions.
5 changes: 0 additions & 5 deletions .github/test.sh
@@ -26,11 +26,6 @@ echo "Sleeping 1 minute waiting for scheduler deploy"
sleep 60
kubectl get pods

# Note that we exit early here on success; we will test this further with the kubectl command (TBA),
# since all pods are cleaned up. Otherwise we need a way to distinguish between a terminated event
# for a job that finished and one terminated for another reason (where the pod needs to be cleaned up)
exit 0

# This will get the fluence image (which has scheduler and sidecar), which should be first
fluxnetes_pod=$(kubectl get pods -o json | jq -r .items[0].metadata.name)
echo "Found fluxnetes pod ${fluxnetes_pod}"
1 change: 1 addition & 0 deletions README.md
@@ -176,6 +176,7 @@ SELECT group_name, group_size from pods_provisional;
- we likely want to move the assume-pod step outside of that schedule function, or ensure the pod passed in matches.
- [ ] Optimize queries.
- [ ] Restarting with postgres shouldn't hit CrashLoopBackOff when the database isn't ready yet
- [ ] need to cancel reservations and clear the table at the end of each cycle
- [ ] The queue should inherit (and return) the start time (when the pod was first seen), the "start" in scheduler.go
- Testing:
- [ ] need to test that duration / completion time works (run a job with a short duration; it should be cancelled / cleaned up)
2 changes: 1 addition & 1 deletion docs/README.md
@@ -9,7 +9,7 @@ After install (see the [README](../README.md)) you can create any abstraction (p
| "fluxnetes.group-name" | The name of the group | fluxnetes-group-<namespace>-<name> |
| "fluxnetes.group-size" | The size of the group | 1 |

As you might guess, if you specify `fluxnetes` as the scheduler but don't provide any of the above, the defaults are used. This means a single pod becomes a single-member group.
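For concreteness, here is a minimal sketch in Go of one member of a two-pod group, assuming the keys above are applied as pod labels (whether they are read as labels or annotations is an assumption here, and every name in the snippet is illustrative):

```go
package example

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// groupPod builds one member of a two-pod group scheduled by fluxnetes.
// The group name and size come from the table above.
func groupPod(index string) *corev1.Pod {
	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "my-group-" + index,
			Namespace: "default",
			Labels: map[string]string{
				"fluxnetes.group-name": "my-group",
				"fluxnetes.group-size": "2",
			},
		},
		Spec: corev1.PodSpec{
			SchedulerName: "fluxnetes",
			Containers: []corev1.Container{
				{Name: "app", Image: "busybox:latest", Command: []string{"sleep", "60"}},
			},
		},
	}
}
```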

### Duration

27 changes: 25 additions & 2 deletions kubernetes/pkg/fluxnetes/events.go
@@ -3,6 +3,8 @@ package fluxnetes
import (
"encoding/json"

podutil "k8s.io/kubernetes/pkg/api/v1/pod"

groups "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/group"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/strategy/workers"

@@ -60,7 +62,6 @@ func (q *Queue) DeletePodEvent(podObj interface{}) {
default:
klog.Infof("Received unknown update event %s for pod %s/%s", pod.Status.Phase, pod.Namespace, pod.Name)
}

// Get the fluxid from the database, and issue cleanup for the group:
// - deletes fluxID if it exists
// - cleans up Kubernetes objects up to parent with "true"
@@ -70,6 +71,28 @@ func (q *Queue) DeletePodEvent(podObj interface{}) {
klog.Errorf("Issue marshalling podspec for Pod %s/%s", pod.Namespace, pod.Name)
}
groupName := groups.GetPodGroupName(pod)

// Since this is a termination event (meaning a single pod has terminated),
// we only want to cancel the fluxion job if ALL pods in the group are done.
// We don't want to delete the Kubernetes objects here - that should happen on
// its own: either never (if no timeout is set) or via a cancel job if a duration is set
pods, err := q.GetGroupPods(pod.Namespace, groupName)
if err != nil {
klog.Errorf("Issue getting group pods for %s", groupName)
}
fluxID, err := q.GetFluxID(pod.Namespace, groupName)
err = workers.Cleanup(q.Context, string(podspec), fluxID, true, groupName)

// Determine finished status (delete via fluxion flux id ONLY if all are finished)
finished := true
for _, pod := range pods {
isFinished := podutil.IsPodPhaseTerminal(pod.Status.Phase)
if !isFinished {
finished = false
break
}
}
if !finished {
fluxID = -1
}
err = workers.Cleanup(q.Context, string(podspec), fluxID, false, groupName)
}
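The loop above is the heart of the change: the fluxion cancel only fires once every pod in the group is terminal, and fluxID is reset to -1 otherwise so the worker skips the fluxion side. As a standalone sketch (the helper name is hypothetical, not part of this PR), the gate could be factored as:

```go
package example

import (
	corev1 "k8s.io/api/core/v1"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
)

// groupFinished is a hypothetical helper for the check above: true only
// when every pod in the group is in a terminal phase (Succeeded or Failed).
func groupFinished(pods []*corev1.Pod) bool {
	for _, pod := range pods {
		if !podutil.IsPodPhaseTerminal(pod.Status.Phase) {
			return false
		}
	}
	return true
}
```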
1 change: 1 addition & 0 deletions kubernetes/pkg/fluxnetes/queries/queries.go
@@ -14,6 +14,7 @@ const (

// We need to get a single podspec for binding, etc
GetPodspecQuery = "select podspec from pods_provisional where group_name = $1 and name = $2 and namespace = $3;"
GetPodsQuery = "select name, podspec from pods_provisional where group_name = $1 and namespace = $2;"

// This query should achieve the following
// 1. Select groups for which the size >= the number of pods we've seen
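The truncated comment above describes the readiness query over pods_provisional. A plausible shape for it, declared the way this file declares its other queries (the real statement sits outside the hunk, so treat this reconstruction as an assumption):

```go
package example

// selectReadyGroupsQuery is a hypothetical reconstruction: pick groups whose
// number of seen pods has reached the declared group size.
const selectReadyGroupsQuery = `
select group_name, namespace, group_size
from pods_provisional
group by group_name, namespace, group_size
having count(*) >= group_size;`
```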
47 changes: 47 additions & 0 deletions kubernetes/pkg/fluxnetes/queue.go
@@ -8,7 +8,10 @@ import (
"os"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
klog "k8s.io/klog/v2"

"github.com/jackc/pgx/v5"
@@ -165,6 +168,50 @@ func (q *Queue) GetFluxID(namespace, groupName string) (int64, error) {
return int64(fluxID), err
}

// Get all pods in a group
func (q *Queue) GetGroupPods(namespace, groupName string) ([]*corev1.Pod, error) {
podlist := []*corev1.Pod{}
pool, err := pgxpool.New(context.Background(), os.Getenv("DATABASE_URL"))
if err != nil {
klog.Errorf("Issue creating new pool %s", err)
return podlist, err
}
defer pool.Close()

podRows, err := pool.Query(q.Context, queries.GetPodsQuery, groupName, namespace)
if err != nil {
klog.Infof("GetPodsQuery Error: query for pods for group %s: %s", groupName, err)
return nil, err
}
pods, err := pgx.CollectRows(podRows, pgx.RowToStructByName[types.PodModel])
if err != nil {
klog.Infof("GetPodsQuery Error: collect rows for groups %s: %s", groupName, err)
return nil, err
}

// We need to get a live pod to determine if it is done
config, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}

// Assemble one podspec, and the list of pods that we will need
for _, item := range pods {
// Get the live pod with the API, ignore not found
pod, err := clientset.CoreV1().Pods(namespace).Get(q.Context, item.Name, metav1.GetOptions{})
if err != nil && !errors.IsNotFound(err) {
klog.Infof("Error retrieving Pod %s/%s: %s", namespace, item.Name, err)
} else {
podlist = append(podlist, pod)
}
}
return podlist, nil
}

// Get a pod (Podspec) on demand
// We need to be able to do this to complete a scheduling cycle
// This podSpec will eventually need to go into the full request to
26 changes: 2 additions & 24 deletions kubernetes/pkg/fluxnetes/strategy/provisional/provisional.go
@@ -19,28 +19,6 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/types"
)

// Job Database Model we are retrieving for jobs
// We will eventually want more than these three
type JobModel struct {
GroupName string `db:"group_name"`
Namespace string `db:"namespace"`
GroupSize int32 `db:"group_size"`
Duration int32 `db:"duration"`
Podspec string `db:"podspec"`
}

// This collects the individual pod names and podspecs for the group
type PodModel struct {
Name string `db:"name"`
Podspec string `db:"podspec"`
}

// GroupModel provides the group name and namespace for groups at size
type GroupModel struct {
GroupName string `db:"group_name"`
Namespace string `db:"namespace"`
}

// The provisional queue is a custom queue (to go along with a queue strategy attached
// to a Fluxnetes.Queue) that handles ingesting single pods, and delivering them
// in a particular way (e.g., sorted by timestamp, by group, etc). Since these
@@ -144,7 +122,7 @@ func (q *ProvisionalQueue) getReadyGroups(ctx context.Context, pool *pgxpool.Poo
}
defer rows.Close()

models, err := pgx.CollectRows(rows, pgx.RowToStructByName[JobModel])
models, err := pgx.CollectRows(rows, pgx.RowToStructByName[types.JobModel])
if err != nil {
klog.Infof("GetReadGroups Error: collect rows for groups at size: %s", err)
return nil, err
@@ -167,7 +145,7 @@ func (q *ProvisionalQueue) getReadyGroups(ctx context.Context, pool *pgxpool.Poo
return nil, err
}

pods, err := pgx.CollectRows(podRows, pgx.RowToStructByName[PodModel])
pods, err := pgx.CollectRows(podRows, pgx.RowToStructByName[types.PodModel])
if err != nil {
klog.Infof("SelectPodsQuery Error: collect rows for groups %s: %s", model.GroupName, err)
return nil, err
6 changes: 3 additions & 3 deletions kubernetes/pkg/fluxnetes/strategy/workers/cleanup.go
@@ -226,8 +226,7 @@ func deleteFluxion(fluxID int64) error {
// see: https://riverqueue.com/docs/job-retries
conn, err := grpc.Dial("127.0.0.1:4242", grpc.WithInsecure())
if err != nil {
klog.Error("[Fluxnetes] AskFlux error connecting to server: %v\n", err)
return err
return fmt.Errorf("[Fluxnetes] AskFlux error connecting to server: %v\n", err)
}
defer conn.Close()

@@ -250,7 +249,8 @@ func deleteFluxion(fluxID int64) error {
// and possibly already cancelled?
_, err = fluxion.Cancel(fluxionCtx, request)
if err != nil {
klog.Errorf("[Fluxnetes] Issue with cancel %s", err)
return fmt.Errorf("[Fluxnetes] Issue with cancel %s", err)
}
klog.Infof("[Fluxnetes] Successful cancel for jobid %d", fluxID)
return err
}
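Swapping klog calls for returned errors is what lets River retry a failed cancel: per the linked docs, a worker whose Work method returns a non-nil error is retried under its retry policy. A minimal sketch of that contract (the args and worker types here are illustrative, not the PR's actual types, and deleteFluxion is stubbed so the snippet is self-contained):

```go
package example

import (
	"context"

	"github.com/riverqueue/river"
)

// Stand-in for the PR's deleteFluxion so this sketch compiles on its own.
var deleteFluxion = func(fluxID int64) error { return nil }

// CleanupArgs is an illustrative payload for a cleanup job.
type CleanupArgs struct {
	FluxID    int64  `json:"flux_id"`
	GroupName string `json:"group_name"`
}

func (CleanupArgs) Kind() string { return "cleanup" }

// CleanupWorker shows the retry contract: a non-nil error from Work
// marks the job as errored, and River retries it with backoff.
type CleanupWorker struct {
	river.WorkerDefaults[CleanupArgs]
}

func (w *CleanupWorker) Work(ctx context.Context, job *river.Job[CleanupArgs]) error {
	// Returning the error (rather than only logging it) is what
	// schedules the retry.
	return deleteFluxion(job.Args.FluxID)
}
```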
22 changes: 22 additions & 0 deletions kubernetes/pkg/fluxnetes/types/types.go
@@ -18,3 +18,25 @@ const (
// Unknown means some other error happened (usually not related to pod)
Unknown
)

// Job Database Model we are retrieving for jobs
// We will eventually want more than these three
type JobModel struct {
GroupName string `db:"group_name"`
Namespace string `db:"namespace"`
GroupSize int32 `db:"group_size"`
Duration int32 `db:"duration"`
Podspec string `db:"podspec"`
}

// This collects the individual pod names and podspecs for the group
type PodModel struct {
Name string `db:"name"`
Podspec string `db:"podspec"`
}

// GroupModel provides the group name and namespace for groups at size
type GroupModel struct {
GroupName string `db:"group_name"`
Namespace string `db:"namespace"`
}
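These db struct tags are what pgx v5's RowToStructByName uses to match selected columns to struct fields, which is why the models can move into a shared types package and serve both queue.go and provisional.go. A minimal round-trip sketch using the GetPodsQuery added in this PR:

```go
package example

import (
	"context"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

// PodModel mirrors the type above; the db tags must line up with the
// column names selected by the query.
type PodModel struct {
	Name    string `db:"name"`
	Podspec string `db:"podspec"`
}

func getGroupPods(ctx context.Context, pool *pgxpool.Pool, group, namespace string) ([]PodModel, error) {
	rows, err := pool.Query(ctx,
		"select name, podspec from pods_provisional where group_name = $1 and namespace = $2;",
		group, namespace)
	if err != nil {
		return nil, err
	}
	// CollectRows drains and closes rows, mapping each one by column name.
	return pgx.CollectRows(rows, pgx.RowToStructByName[PodModel])
}
```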
