Skip to content

Commit

Permalink
group: first fully working group through cleanup (#16)
Browse files Browse the repository at this point in the history
* group: first fully working group through cleanup

This changeset adds the first completely working submit through
cleanup, where all tables are properly cleaned up! We can actually
see the group of pods run and go away. Next I want to add back
the kubectl command so we can get an idea of job state in the queue,
etc.

Signed-off-by: vsoch <[email protected]>

---------

Signed-off-by: vsoch <[email protected]>
Co-authored-by: vsoch <[email protected]>
  • Loading branch information
vsoch and vsoch authored Aug 9, 2024
1 parent 62baf69 commit d6437af
Show file tree
Hide file tree
Showing 14 changed files with 241 additions and 104 deletions.
5 changes: 5 additions & 0 deletions .github/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ echo "Sleeping 1 minute waiting for scheduler deploy"
sleep 60
kubectl get pods

# Note that exiting early here on success, we will test this further with the kubectl command TBA
# since all pods are cleaned up. Or we need a way to distinguish between the terminated event
# because the job finished and terminated otherwise (and needs the pod cleaned up)
exit 0

# This will get the fluence image (which has scheduler and sidecar), which should be first
fluxnetes_pod=$(kubectl get pods -o json | jq -r .items[0].metadata.name)
echo "Found fluxnetes pod ${fluxnetes_pod}"
Expand Down
11 changes: 3 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,25 +172,20 @@ SELECT group_name, group_size from pods_provisional;

### TODO

- [ ] I'd like a more efficient query (or strategy) to move pods from provisional into the worker queue. Right now I have three queries and it's too many.
- [ ] Figure out how In-tree registry plugins (that are related to resources) should be run to inform fluxion
- [ ] Optimize queries.
- [ ] Restarting with postgres shouldn't have crashloopbackoff when the database isn't ready yet
- [ ] In-tree registry plugins (that are related to resources) should be run first to inform fluxion what nodes not to bind, where there are volumes, etc.
- [ ] Add back in the created at filter / sort to the queues (removed when was testing / debugging harder stuff)
- [ ] The queue should inherit (and return) the start time (when the pod was first seen) "start" in scheduler.go
- Testing:
- [ ] need to test duration / completion time works (run job with short duration, should be cancelled/cleaned up)
- [ ] spam submission and test reservations (and cancel)
- [ ] implement other queue strategies (fcfs and backfill with > 1 reservation depth)
- fcfs can work by only adding one job (first in provisional) to the worker queue at once, only when it's empty! lol.
- [ ] Decide what to do on events - currently we delete / cleanup when there is a decided timeout for pod/job
- Arguably, we need to respond to these events for services, etc., where a cleanup job is not scheduled.
- This means we need a way to issue cancel to fluxion, and for fluxion to distinguish between 404 and another error.
- we also need to look up jobid (flux) for a pod given deletion so we can issue cancel
- [ ] In cleanup we will need to handle [BlockOwnerDeletion](https://github.com/kubernetes/kubernetes/blob/dbc2b0a5c7acc349ea71a14e49913661eaf708d2/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go#L319). I don't yet understand the cases under which this is used, but likely we want to delete the child object and allow the owner to do whatever is the default (create another pod, etc.)
- [ ] In cleanup do we need to handle [BlockOwnerDeletion](https://github.com/kubernetes/kubernetes/blob/dbc2b0a5c7acc349ea71a14e49913661eaf708d2/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go#L319). I don't yet understand the cases under which this is used, but likely we want to delete the child object and allow the owner to do whatever is the default (create another pod, etc.)

Thinking:

- Need to walk through deletion / update process - right now we have cleanup event if there is termination time, otherwise we wait for pod event to informer
- We can allow trying to schedule jobs in the future, although I'm not sure about that use case (add label to do this)
- What should we do if a pod is updated, and the group is removed?
- fluxion is deriving the nodes on its own, but we might get updated nodes from the scheduler. It might be good to think about how to use the fluxion-service container instead.
Expand Down
28 changes: 19 additions & 9 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ After install (see the [README](../README.md)) you can create any abstraction (p
| "fluxnetes.group-name" | The name of the group | fluxnetes-group-<namespace>-<name> |
| "fluxnetes.group-size" | The size of the group | 1 |

As you might guess, if you specify `fluxnetes` as the scheduler but don't provide any of the above, the defaults are used. This means a single
pod becomes a single member group.
As you might guess, if you specify `fluxnetes` as the scheduler but don't provide any of the above, the defaults are used. This means a single pod becomes a single member group.

### Duration

Expand Down Expand Up @@ -63,18 +62,29 @@ As you can see in the picture, there is a Queue Manager. The Queue manager is st
1. We receive single pods (after being sorted by the Fluxnetes plugin) according to group name and time created.
2. We add them to a provisional table, where we also save that information along with size.
- a single pod is a group of size 1
- this provisional table will be kept until the group is entirely completed, as we use it to get podspecs
3. For each run of the Kubernetes `ScheduleOne` function (meaning we receive a new pod) we:
- add to the provisional table
- check the table for pod groups that have reached the desired size
- submit the jobs to the worker queue that have, and remove from the provisional table
- Enqueue
- add to the pod provisional table and (if not known) the group provisional table, which holds a count
- Schedule
- check the table for pod groups that have reached the desired size
- submit the jobs to the worker queue that have, and remove from the provisional table
4. Once in the worker queue, they are ordered by Priority and scheduledAt time.
5. The worker function does a call to fluxion `MatchAllocateElseReserve`
5. The worker function does a call to fluxion `MatchAllocateElseReserve` (up to some reservation depth)
- A reservation is put back into the queue - it will be run again!
- The reservation can be saved somewhere to inform the user (some future kubectl plugin)
- we can also ask the worker to run its "work" function in the future, either at onset or some event in the run
6. If allocated, the event goes back to the main schedule loop and the binding occurs

We currently don't elegantly handle the scheduleCycle and bindingCycle call (in that we might want to give metadata to fluxion that goes into them). This needs to be done!
6. Events are received back in the main Schedule->Run function
- A successfully completed job with nodes is an allocation. We take the list of nodes and pod names and bind them all at once.
- A cancelled job is cleaned up. This means something happened that we deemed it unschedulable / unsatisfiable
- A group that has not yet been allocated or run won't show up here!
7. For both cancelled and completed, we perform cleanup
- The Fluxion Job ID is cancelled, if we have one
- The pod is deleted (terminated) and we walk up to the root object (e.g., Job) to delete the rest
- The pod is removed from the pods provisional table, and the group from pending
- This opens up the group name and namespace (unique identifiers) to accept new groups

For jobs with durations set that don't finish in time, a cleanup job is submit that will trigger at the time to run the same cleanup function above. If pods / groups finish early, a deletion event is triggered that does the same. We currently don't elegantly handle the scheduleCycle and bindingCycle call (in that we might want to give metadata to fluxion that goes into them). This needs to be done!

#### Queue Strategies

Expand Down
Binary file modified docs/images/state-diagram.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
20 changes: 20 additions & 0 deletions examples/job-10.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
apiVersion: batch/v1
kind: Job
metadata:
name: job
spec:
completions: 10
parallelism: 10
template:
metadata:
labels:
fluxnetes.group-name: job
fluxnetes.group-size: "10"
spec:
schedulerName: fluxnetes
containers:
- name: job
image: busybox
command: [echo, potato]
restartPolicy: Never
backoffLimit: 4
22 changes: 14 additions & 8 deletions kubernetes/pkg/fluxnetes/events.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package fluxnetes

import (
"encoding/json"

groups "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/group"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/strategy/workers"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -50,14 +53,6 @@ func (q *Queue) DeletePodEvent(podObj interface{}) {
klog.Infof("Received delete event 'Running' for pod %s/%s", pod.Namespace, pod.Name)
case corev1.PodSucceeded:
klog.Infof("Received delete event 'Succeeded' for pod %s/%s", pod.Namespace, pod.Name)
// TODO insert submit cleanup here - get the fluxid from pending?
// TODO need to put somewhere to remove from pending
// Likely we can keep around the group name and flux id in a database, and get / delete from there.
// err = SubmitCleanup(ctx, pool, pod.Spec.ActiveDeadlineSeconds, job.Args.Podspec, int64(fluxID), true, []string{})
//if err != nil {
// klog.Errorf("Issue cleaning up deleted pod", err)
// }
//}}
case corev1.PodFailed:
klog.Infof("Received delete event 'Failed' for pod %s/%s", pod.Namespace, pod.Name)
case corev1.PodUnknown:
Expand All @@ -66,4 +61,15 @@ func (q *Queue) DeletePodEvent(podObj interface{}) {
klog.Infof("Received unknown update event %s for pod %s/%s", pod.Status.Phase, pod.Namespace, pod.Name)
}

// Get the fluxid from the database, and issue cleanup for the group:
// - deletes fluxID if it exists
// - cleans up Kubernetes objects up to parent with "true"
// - cleans up job in pending table
podspec, err := json.Marshal(pod)
if err != nil {
klog.Errorf("Issue marshalling podspec for Pod %s/%s", pod.Namespace, pod.Name)
}
groupName := groups.GetPodGroupName(pod)
fluxID, err := q.GetFluxID(pod.Namespace, groupName)
err = workers.Cleanup(q.Context, string(podspec), fluxID, true, groupName)
}
15 changes: 11 additions & 4 deletions kubernetes/pkg/fluxnetes/fluxnetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,23 @@ var (

// JobResult serializes a result from Fluxnetes in the scheduler back to metadata
type JobResult struct {
JobID int32 `json:"jobid"`
Nodes string `json:"nodes"`
PodID string `json:"podid"`
PodSpec string `json:"podspec"`
JobID int32 `json:"jobid"`
Nodes string `json:"nodes"`
PodID string `json:"podid"`
PodSpec string `json:"podspec"`
Names string `json:"names"`
Namespace string `json:"namespace"`
GroupName string `json:"groupName"`
}

func (j JobResult) GetNodes() []string {
return strings.Split(j.Nodes, ",")
}

func (j JobResult) GetPodNames() []string {
return strings.Split(j.Names, ",")
}

// Fluxnetes (as a plugin) is only enabled for the queue sort
type Fluxnetes struct{}

Expand Down
19 changes: 14 additions & 5 deletions kubernetes/pkg/fluxnetes/queries/queries.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package queries

const (
// Used to get the earliest timstamp for the group
// Used to get the earliest timestamp for the group
GetTimestampQuery = "select created_at from pods_provisional where group_name=$1 and namespace=$2 limit 1;"

// When we complete a job worker type after a successful MatchAllocate, this is how we send nodes back via an event
Expand All @@ -12,30 +12,39 @@ const (
DeleteReservationsQuery = "truncate reservations; delete from reservations;"
GetReservationsQuery = "select (group_name, flux_id) from reservations;"

// We need to get a single podspec for binding, etc
GetPodspecQuery = "select podspec from pods_provisional where group_name = $1 and name = $2 and namespace = $3;"

// This query should achieve the following
// 1. Select groups for which the size >= the number of pods we've seen
// 2. Then get a representative pod to model the resources for the group
// TODO add back created by and then sort by it desc
SelectGroupsAtSizeQuery = "select group_name, group_size, duration, podspec, namespace from groups_provisional where current_size >= group_size;"
SelectRepresentativePodQuery = `select podspec from pods_provisional where group_name = $1 and namespace = $2;`
SelectGroupsAtSizeQuery = "select group_name, group_size, duration, podspec, namespace from groups_provisional where current_size >= group_size;"

// This currently will use one podspec (and all names) and we eventually want it to use all podspecs
SelectPodsQuery = `select name, podspec from pods_provisional where group_name = $1 and namespace = $2;`

// Pending queue - inserted after moving from provisional
InsertIntoPending = "insert into pending_queue (group_name, namespace, group_size) SELECT '%s', '%s', '%d' WHERE NOT EXISTS (SELECT (group_name, namespace) FROM pending_queue WHERE group_name = '%s' and namespace = '%s');"

// We delete from the provisional tables when a group is added to the work queues (and pending queue, above)
DeleteProvisionalGroupsQuery = "delete from groups_provisional where %s;"
DeleteGroupsQuery = "delete from pods_provisional where %s;"
DeleteProvisionalPodsQuery = "delete from pods_provisional where group_name = $1 and namespace = $2;"

// TODO add created_at back
InsertIntoProvisionalQuery = "insert into pods_provisional (podspec, namespace, name, duration, group_name) select '%s', '%s', '%s', %d, '%s' where not exists (select (group_name, name, namespace) from pods_provisional where group_name = '%s' and namespace = '%s' and name = '%s');"

// Enqueue queries
// 1. Single pods are added to the pods_provisional - this is how we track uniqueness (and eventually will grab all podspecs from here)
// 2. Groups are added to the groups_provisional, and this is where we can easily store a current cound
// 2. Groups are added to the groups_provisional, and this is where we can easily store a current count
// Note that we add a current_size of 1 here assuming the first creation is done paired with an existing pod (and then don't need to increment again)
InsertIntoGroupProvisional = "insert into groups_provisional (group_name, namespace, group_size, duration, podspec, current_size) select '%s', '%s', '%d', '%d', '%s', '1' WHERE NOT EXISTS (SELECT (group_name, namespace) FROM groups_provisional WHERE group_name = '%s' and namespace = '%s');"
IncrementGroupProvisional = "update groups_provisional set current_size = current_size + 1 where group_name = '%s' and namespace = '%s';"

// After allocate success, we update pending with the ID. We retrieve it to issue fluxion to cancel when it finishes
UpdatingPendingWithFluxID = "update pending_queue set flux_id = $1 where group_name = $2 and namespace = $3;"
GetFluxID = "select flux_id from pending_queue where group_name = $1 and namespace = $2;"

// Pending Queue queries
// 3. We always check if a group is in pending before Enqueue, because if so, we aren't allowed to modify / add to the group
IsPendingQuery = "select * from pending_queue where group_name = $1 and namespace = $2;"
Expand Down
47 changes: 47 additions & 0 deletions kubernetes/pkg/fluxnetes/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package fluxnetes

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"os"
Expand Down Expand Up @@ -143,6 +144,52 @@ func (q *Queue) setupEvents() {
q.EventChannel = &QueueEvent{Function: trigger, Channel: c}
}

// Common queue / database functions across strategies!
// GetFluxID returns the flux ID, and -1 if not found (deleted)
func (q *Queue) GetFluxID(namespace, groupName string) (int64, error) {
var fluxID int32 = -1
pool, err := pgxpool.New(context.Background(), os.Getenv("DATABASE_URL"))
if err != nil {
klog.Errorf("Issue creating new pool %s", err)
return int64(fluxID), err
}
defer pool.Close()
result := pool.QueryRow(context.Background(), queries.GetFluxID, groupName, namespace)
err = result.Scan(&fluxID)

// This can simply mean it was already deleted from pending
if err != nil {
klog.Infof("Error retrieving FluxID for %s/%s: %s", groupName, namespace, err)
return int64(-1), err
}
return int64(fluxID), err
}

// Get a pod (Podspec) on demand
// We need to be able to do this to complete a scheduling cycle
// This podSpec will eventually need to go into the full request to
// ask fluxion for nodes, right now we still use a single representative one
func (q *Queue) GetPodSpec(namespace, name, groupName string) (*corev1.Pod, error) {

pool, err := pgxpool.New(context.Background(), os.Getenv("DATABASE_URL"))
if err != nil {
klog.Errorf("Issue creating new pool %s", err)
return nil, err
}
defer pool.Close()

var podspec string
result := pool.QueryRow(context.Background(), queries.GetPodspecQuery, groupName, name, namespace)
err = result.Scan(&podspec)
if err != nil {
klog.Infof("Error scanning podspec for %s/%s", namespace, name)
return nil, err
}
var pod corev1.Pod
err = json.Unmarshal([]byte(podspec), &pod)
return &pod, err
}

// GetInformer returns the pod informer to run as a go routine
func (q *Queue) GetInformer() cache.SharedIndexInformer {

Expand Down
Loading

0 comments on commit d6437af

Please sign in to comment.