group: first fully working group through cleanup
This changeset adds the first completely working submit-through-cleanup
flow, where all tables are properly cleaned up! We can actually see the
group of pods run and go away. Next I want to add back the kubectl
command so we can get an idea of job state in the queue, etc.

Signed-off-by: vsoch <[email protected]>
vsoch committed Aug 9, 2024
1 parent 96a0d61 commit 01efadf
Showing 12 changed files with 159 additions and 89 deletions.
11 changes: 3 additions & 8 deletions README.md
@@ -172,8 +172,8 @@ SELECT group_name, group_size from pods_provisional;

### TODO

- [ ] THIS NEXT: In-tree registry plugins (that are related to resources) should be run first to inform fluxion what nodes not to bind, where there are volumes, etc.
- [ ] I'd like a more efficient query (or strategy) to move pods from provisional into the worker queue. Right now I have three queries and it's too many.
- [ ] Figure out how In-tree registry plugins (that are related to resources) should be run to inform fluxion
- [ ] Optimize queries.
- [ ] Restarting with postgres shouldn't have crashloopbackoff when the database isn't ready yet
- [ ] Add back in the created at filter / sort to the queues (removed while testing / debugging harder stuff)
- [ ] The queue should inherit (and return) the start time (when the pod was first seen) "start" in scheduler.go
@@ -182,15 +182,10 @@ SELECT group_name, group_size from pods_provisional;
- [ ] spam submission and test reservations (and cancel)
- [ ] implement other queue strategies (fcfs and backfill with > 1 reservation depth)
- fcfs can work by only adding one job (first in provisional) to the worker queue at once, only when it's empty! lol.
- [ ] Decide what to do on events - currently we delete / cleanup when there is a decided timeout for pod/job
- Arguably, we need to respond to these events for services, etc., where a cleanup job is not scheduled.
- This means we need a way to issue cancel to fluxion, and for fluxion to distinguish between 404 and another error.
- we also need to look up jobid (flux) for a pod given deletion so we can issue cancel
- [ ] In cleanup we will need to handle [BlockOwnerDeletion](https://github.com/kubernetes/kubernetes/blob/dbc2b0a5c7acc349ea71a14e49913661eaf708d2/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go#L319). I don't yet understand the cases under which this is used, but likely we want to delete the child object and allow the owner to do whatever is the default (create another pod, etc.)
- [ ] In cleanup, do we need to handle [BlockOwnerDeletion](https://github.com/kubernetes/kubernetes/blob/dbc2b0a5c7acc349ea71a14e49913661eaf708d2/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go#L319)? I don't yet understand the cases under which this is used, but likely we want to delete the child object and allow the owner to do whatever is the default (create another pod, etc.)

Thinking:

- Need to walk through the deletion / update process - right now we have a cleanup event if there is a termination time; otherwise we wait for the pod event to reach the informer
- We can allow trying to schedule jobs in the future, although I'm not sure about that use case (add label to do this)
- What should we do if a pod is updated, and the group is removed?
- fluxion is deriving the nodes on its own, but we might get updated nodes from the scheduler. It might be good to think about how to use the fluxion-service container instead.
28 changes: 19 additions & 9 deletions docs/README.md
@@ -9,8 +9,7 @@ After install (see the [README](../README.md)) you can create any abstraction (p
| "fluxnetes.group-name" | The name of the group | fluxnetes-group-<namespace>-<name> |
| "fluxnetes.group-size" | The size of the group | 1 |

As you might guess, if you specify `fluxnetes` as the scheduler but don't provide any of the above, the defaults are used. This means a single
pod becomes a single member group.
As you might guess, if you specify `fluxnetes` as the scheduler but don't provide any of the above, the defaults are used. This means a single pod becomes a single member group.
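
As a concrete illustration, here is a minimal Go sketch (not taken from the codebase; the pod name, image, and group values are placeholders) of a pod that selects the `fluxnetes` scheduler and declares a group via the labels above:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// buildGroupedPod returns a pod that asks for the fluxnetes scheduler and
// declares membership in a group of size 4. Names and image are placeholders.
func buildGroupedPod(name, namespace string) *corev1.Pod {
	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: namespace,
			Labels: map[string]string{
				"fluxnetes.group-name": "my-group",
				"fluxnetes.group-size": "4",
			},
		},
		Spec: corev1.PodSpec{
			SchedulerName: "fluxnetes",
			Containers: []corev1.Container{
				{Name: "app", Image: "busybox", Command: []string{"sleep", "60"}},
			},
		},
	}
}

func main() {
	pod := buildGroupedPod("my-pod-0", "default")
	fmt.Println(pod.Spec.SchedulerName, pod.Labels["fluxnetes.group-name"])
}
```

Omitting both labels gives the defaults from the table, so the pod becomes its own group of size 1.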

### Duration

@@ -63,18 +62,29 @@ As you can see in the picture, there is a Queue Manager. The Queue manager is st
1. We receive single pods (after being sorted by the Fluxnetes plugin) according to group name and time created.
2. We add them to a provisional table, where we also save that information along with size.
- a single pod is a group of size 1
- this provisional table will be kept until the group is entirely completed, as we use it to get podspecs
3. For each run of the Kubernetes `ScheduleOne` function (meaning we receive a new pod) we:
- add to the provisional table
- check the table for pod groups that have reached the desired size
- submit the jobs to the worker queue that have, and remove from the provisional table
- Enqueue
- add to the pod provisional table and (if not known) the group provisional table, which holds a count
- Schedule
- check the table for pod groups that have reached the desired size
- submit the jobs to the worker queue that have, and remove from the provisional table
4. Once in the worker queue, they are ordered by Priority and scheduledAt time.
5. The worker function does a call to fluxion `MatchAllocateElseReserve`
5. The worker function does a call to fluxion `MatchAllocateElseReserve` (up to some reservation depth)
- A reservation is put back into the queue - it will be run again!
- The reservation can be saved somewhere to inform the user (some future kubectl plugin)
- we can also ask the worker to run its "work" function in the future, either at onset or some event in the run
6. If allocated, the event goes back to the main schedule loop and the binding occurs

We currently don't elegantly handle the scheduleCycle and bindingCycle call (in that we might want to give metadata to fluxion that goes into them). This needs to be done!
6. Events are received back in the main Schedule->Run function
- A successfully completed job with nodes is an allocation. We take the list of nodes and pod names and bind them all at once (see the binding sketch after this list)
- A cancelled job is cleaned up. This means something happened that led us to deem it unschedulable / unsatisfiable
- A group that has not yet been allocated or run won't show up here!
7. For both cancelled and completed, we perform cleanup
- The Fluxion Job ID is cancelled, if we have one
- The pod is deleted (terminated) and we walk up to the root object (e.g., Job) to delete the rest
- The pod is removed from the pods provisional table, and the group from pending
- This opens up the group name and namespace (unique identifiers) to accept new groups
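
To make the binding in step 6 concrete, here is a minimal sketch. It assumes the allocation comes back as parallel slices of pod names and node names and uses the plain client-go `Bind` call; the real plugin performs this inside the scheduler framework's binding cycle, so treat the helper below as illustrative only.

```go
package sketch

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// bindGroup binds each pod in an allocated group to the node fluxion chose
// for it. podNames[i] is assumed to pair with nodes[i].
func bindGroup(ctx context.Context, clientset kubernetes.Interface, namespace string, podNames, nodes []string) error {
	for i, podName := range podNames {
		binding := &corev1.Binding{
			ObjectMeta: metav1.ObjectMeta{Name: podName, Namespace: namespace},
			Target:     corev1.ObjectReference{Kind: "Node", Name: nodes[i]},
		}
		if err := clientset.CoreV1().Pods(namespace).Bind(ctx, binding, metav1.CreateOptions{}); err != nil {
			return err
		}
	}
	return nil
}
```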

For jobs with durations set that don't finish in time, a cleanup job is submitted that will trigger at that time and run the same cleanup function above. If pods / groups finish early, a deletion event is triggered that does the same. We currently don't elegantly handle the scheduleCycle and bindingCycle calls (in that we might want to give metadata to fluxion that goes into them). This needs to be done!
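
The trigger time for that cleanup job is simple arithmetic over the duration. A minimal sketch, assuming the duration is stored in seconds and measured from when the pod was first seen (the helper name and the "non-positive means no timeout" convention are illustrative assumptions; the actual queue integration is not shown):

```go
package sketch

import "time"

// cleanupTriggerTime returns when a timed cleanup for a group should fire:
// the time the pod was first seen plus its requested duration in seconds.
// A non-positive duration means no timed cleanup is scheduled.
func cleanupTriggerTime(firstSeen time.Time, durationSeconds int64) (time.Time, bool) {
	if durationSeconds <= 0 {
		return time.Time{}, false
	}
	return firstSeen.Add(time.Duration(durationSeconds) * time.Second), true
}
```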

#### Queue Strategies

Binary file modified docs/images/state-diagram.png
22 changes: 14 additions & 8 deletions kubernetes/pkg/fluxnetes/events.go
@@ -1,6 +1,9 @@
package fluxnetes

import (
"encoding/json"

groups "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/group"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/strategy/workers"

corev1 "k8s.io/api/core/v1"
@@ -50,14 +53,6 @@ func (q *Queue) DeletePodEvent(podObj interface{}) {
klog.Infof("Received delete event 'Running' for pod %s/%s", pod.Namespace, pod.Name)
case corev1.PodSucceeded:
klog.Infof("Received delete event 'Succeeded' for pod %s/%s", pod.Namespace, pod.Name)
// TODO insert submit cleanup here - get the fluxid from pending?
// TODO need to put somewhere to remove from pending
// Likely we can keep around the group name and flux id in a database, and get / delete from there.
// err = SubmitCleanup(ctx, pool, pod.Spec.ActiveDeadlineSeconds, job.Args.Podspec, int64(fluxID), true, []string{})
//if err != nil {
// klog.Errorf("Issue cleaning up deleted pod", err)
// }
//}}
case corev1.PodFailed:
klog.Infof("Received delete event 'Failed' for pod %s/%s", pod.Namespace, pod.Name)
case corev1.PodUnknown:
@@ -66,4 +61,15 @@ func (q *Queue) DeletePodEvent(podObj interface{}) {
klog.Infof("Received unknown update event %s for pod %s/%s", pod.Status.Phase, pod.Namespace, pod.Name)
}

// Get the fluxid from the database, and issue cleanup for the group:
// - deletes fluxID if it exists
// - cleans up Kubernetes objects up to parent with "true"
// - cleans up job in pending table
podspec, err := json.Marshal(pod)
if err != nil {
klog.Errorf("Issue marshalling podspec for Pod %s/%s", pod.Namespace, pod.Name)
}
groupName := groups.GetPodGroupName(pod)
fluxID, err := q.GetFluxID(pod.Namespace, groupName)
err = workers.Cleanup(q.Context, string(podspec), fluxID, true, groupName)
}
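
For context, a handler like `DeletePodEvent` is typically registered on the pod informer that the queue exposes via `GetInformer` (see queue.go below). The wiring here is a minimal illustrative sketch, not the plugin's actual startup code:

```go
package sketch

import (
	"k8s.io/client-go/tools/cache"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes"
)

// watchPodDeletes registers DeletePodEvent so that pod deletions trigger the
// fluxion cancel and table cleanup shown above, then runs the informer.
func watchPodDeletes(q *fluxnetes.Queue, stopCh <-chan struct{}) {
	informer := q.GetInformer()
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: q.DeletePodEvent,
	})
	go informer.Run(stopCh)
}
```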
12 changes: 7 additions & 5 deletions kubernetes/pkg/fluxnetes/fluxnetes.go
@@ -34,11 +34,13 @@ var (

// JobResult serializes a result from Fluxnetes in the scheduler back to metadata
type JobResult struct {
JobID int32 `json:"jobid"`
Nodes string `json:"nodes"`
PodID string `json:"podid"`
PodSpec string `json:"podspec"`
Names string `json:"names"`
JobID int32 `json:"jobid"`
Nodes string `json:"nodes"`
PodID string `json:"podid"`
PodSpec string `json:"podspec"`
Names string `json:"names"`
Namespace string `json:"namespace"`
GroupName string `json:"groupName"`
}

func (j JobResult) GetNodes() []string {
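
As a small sketch of what the scheduler side does with this serialized result (the JSON payload and the comma-separated node list below are hypothetical; only the field names come from the struct tags above):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// JobResult mirrors the struct above so this example stands alone.
type JobResult struct {
	JobID     int32  `json:"jobid"`
	Nodes     string `json:"nodes"`
	PodID     string `json:"podid"`
	PodSpec   string `json:"podspec"`
	Names     string `json:"names"`
	Namespace string `json:"namespace"`
	GroupName string `json:"groupName"`
}

func main() {
	// Hypothetical payload for illustration only.
	payload := []byte(`{"jobid": 42, "nodes": "node-1,node-2", "names": "pod-a,pod-b", "namespace": "default", "groupName": "my-group"}`)
	var result JobResult
	if err := json.Unmarshal(payload, &result); err != nil {
		panic(err)
	}
	fmt.Printf("group %s/%s: flux job %d allocated nodes %s for pods %s\n",
		result.Namespace, result.GroupName, result.JobID, result.Nodes, result.Names)
}
```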
13 changes: 10 additions & 3 deletions kubernetes/pkg/fluxnetes/queries/queries.go
@@ -1,7 +1,7 @@
package queries

const (
// Used to get the earliest timstamp for the group
// Used to get the earliest timestamp for the group
GetTimestampQuery = "select created_at from pods_provisional where group_name=$1 and namespace=$2 limit 1;"

// When we complete a job worker type after a successful MatchAllocate, this is how we send nodes back via an event
@@ -12,6 +12,9 @@ const (
DeleteReservationsQuery = "truncate reservations; delete from reservations;"
GetReservationsQuery = "select (group_name, flux_id) from reservations;"

// We need to get a single podspec for binding, etc
GetPodspecQuery = "select podspec from pods_provisional where group_name = $1 and name = $2 and namespace = $3;"

// This query should achieve the following
// 1. Select groups for which the size >= the number of pods we've seen
// 2. Then get a representative pod to model the resources for the group
@@ -26,18 +29,22 @@

// We delete from the provisional tables when a group is added to the work queues (and pending queue, above)
DeleteProvisionalGroupsQuery = "delete from groups_provisional where %s;"
DeleteGroupsQuery = "delete from pods_provisional where %s;"
DeleteProvisionalPodsQuery = "delete from pods_provisional where group_name = $1 and namespace = $2;"

// TODO add created_at back
InsertIntoProvisionalQuery = "insert into pods_provisional (podspec, namespace, name, duration, group_name) select '%s', '%s', '%s', %d, '%s' where not exists (select (group_name, name, namespace) from pods_provisional where group_name = '%s' and namespace = '%s' and name = '%s');"

// Enqueue queries
// 1. Single pods are added to the pods_provisional - this is how we track uniqueness (and eventually will grab all podspecs from here)
// 2. Groups are added to the groups_provisional, and this is where we can easily store a current cound
// 2. Groups are added to the groups_provisional, and this is where we can easily store a current count
// Note that we add a current_size of 1 here assuming the first creation is done paired with an existing pod (and then don't need to increment again)
InsertIntoGroupProvisional = "insert into groups_provisional (group_name, namespace, group_size, duration, podspec, current_size) select '%s', '%s', '%d', '%d', '%s', '1' WHERE NOT EXISTS (SELECT (group_name, namespace) FROM groups_provisional WHERE group_name = '%s' and namespace = '%s');"
IncrementGroupProvisional = "update groups_provisional set current_size = current_size + 1 where group_name = '%s' and namespace = '%s';"

// After allocate success, we update pending with the ID. We retrieve it to issue fluxion to cancel when it finishes
UpdatingPendingWithFluxID = "update pending_queue set flux_id = $1 where group_name = $2 and namespace = $3;"
GetFluxID = "select flux_id from pending_queue where group_name = $1 and namespace = $2;"

// Pending Queue queries
// 3. We always check if a group is in pending before Enqueue, because if so, we aren't allowed to modify / add to the group
IsPendingQuery = "select * from pending_queue where group_name = $1 and namespace = $2;"
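
A minimal sketch of the pending check described in point 3, using pgx in the same style as the queue code below (the helper name is illustrative, pgx v5 import paths are assumed, and the query constant is the one defined above):

```go
package sketch

import (
	"context"

	"github.com/jackc/pgx/v5/pgxpool"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/queries"
)

// groupIsPending reports whether a group is already in the pending queue,
// in which case Enqueue must not add more pods to it.
func groupIsPending(ctx context.Context, pool *pgxpool.Pool, groupName, namespace string) (bool, error) {
	rows, err := pool.Query(ctx, queries.IsPendingQuery, groupName, namespace)
	if err != nil {
		return false, err
	}
	defer rows.Close()
	pending := rows.Next()
	if err := rows.Err(); err != nil {
		return false, err
	}
	return pending, nil
}
```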
47 changes: 47 additions & 0 deletions kubernetes/pkg/fluxnetes/queue.go
@@ -2,6 +2,7 @@ package fluxnetes

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"os"
@@ -143,6 +144,52 @@ func (q *Queue) setupEvents() {
q.EventChannel = &QueueEvent{Function: trigger, Channel: c}
}

// Common queue / database functions across strategies!
// GetFluxID returns the flux ID, and -1 if not found (deleted)
func (q *Queue) GetFluxID(namespace, groupName string) (int64, error) {
var fluxID int32 = -1
pool, err := pgxpool.New(context.Background(), os.Getenv("DATABASE_URL"))
if err != nil {
klog.Errorf("Issue creating new pool %s", err)
return int64(fluxID), err
}
defer pool.Close()
result := pool.QueryRow(context.Background(), queries.GetFluxID, groupName, namespace)
err = result.Scan(&fluxID)

// This can simply mean it was already deleted from pending
if err != nil {
klog.Infof("Error retrieving FluxID for %s/%s: %s", groupName, namespace, err)
return int64(-1), err
}
return int64(fluxID), err
}

// Get a pod (Podspec) on demand
// We need to be able to do this to complete a scheduling cycle
// This podSpec will eventually need to go into the full request to
// ask fluxion for nodes, right now we still use a single representative one
func (q *Queue) GetPodSpec(namespace, name, groupName string) (*corev1.Pod, error) {

pool, err := pgxpool.New(context.Background(), os.Getenv("DATABASE_URL"))
if err != nil {
klog.Errorf("Issue creating new pool %s", err)
return nil, err
}
defer pool.Close()

var podspec string
result := pool.QueryRow(context.Background(), queries.GetPodspecQuery, groupName, name, namespace)
err = result.Scan(&podspec)
if err != nil {
klog.Infof("Error scanning podspec for %s/%s", namespace, name)
return nil, err
}
var pod corev1.Pod
err = json.Unmarshal([]byte(podspec), &pod)
return &pod, err
}

// GetInformer returns the pod informer to run as a go routine
func (q *Queue) GetInformer() cache.SharedIndexInformer {

19 changes: 7 additions & 12 deletions kubernetes/pkg/fluxnetes/strategy/provisional/provisional.go
@@ -54,7 +54,7 @@ type ProvisionalQueue struct {
pool *pgxpool.Pool
}

// incrementGroupProvisonal adds 1 to the count of the group provisional queue
// incrementGroupProvisional adds 1 to the count of the group provisional queue
func incrementGroupProvisional(
ctx context.Context,
pool *pgxpool.Pool,
@@ -132,7 +132,7 @@ func (q *ProvisionalQueue) Enqueue(
return types.PodEnqueueSuccess, nil
}

// getReadyGroups gets groups thta are ready for moving from provisional to pending
// getReadyGroups gets groups that are ready for moving from provisional to pending
// We also save the pod names so we can assign (bind) to nodes later
func (q *ProvisionalQueue) getReadyGroups(ctx context.Context, pool *pgxpool.Pool) ([]workers.JobArgs, error) {

@@ -214,17 +214,12 @@ func (q *ProvisionalQueue) deleteGroups(
}
klog.Infof("Query is %s", query)

// This deletes from the single pod provisional table
queryProvisional := fmt.Sprintf(queries.DeleteGroupsQuery, query)
_, err := pool.Exec(ctx, queryProvisional)
if err != nil {
klog.Infof("Error with delete provisional pods %s: %s", query, err)
return err
}
// Note that we don't delete from the single pod provisional table
// until we have used it to get the podspec (and job is complete)

// This from the grroup
// Delete from the group provisional table, which we don't need anymore
query = fmt.Sprintf(queries.DeleteProvisionalGroupsQuery, query)
_, err = pool.Exec(ctx, query)
_, err := pool.Exec(ctx, query)
if err != nil {
klog.Infof("Error with delete groups provisional %s: %s", query, err)
return err
@@ -250,7 +245,7 @@ func (q *ProvisionalQueue) insertPending(
result := pool.SendBatch(ctx, batch)
err := result.Close()
if err != nil {
klog.Errorf("Error comitting to send %d groups into pending %s", len(groups), err)
klog.Errorf("Error committing to send %d groups into pending %s", len(groups), err)
}
return err
}
17 changes: 11 additions & 6 deletions kubernetes/pkg/fluxnetes/strategy/workers/cleanup.go
@@ -183,7 +183,7 @@ func Cleanup(
if fluxID > -1 {
err = deleteFluxion(fluxID)
if err != nil {
klog.Infof("Error issuing cancel to fluxion for group %s/%s ", groupName)
klog.Infof("Error issuing cancel to fluxion for group %s", groupName)
}
return err
}
@@ -204,8 +204,13 @@
}
defer pool.Close()

// First check - a pod group in pending is not allowed to enqueue new pods.
// This means the job is submit / running (and not completed
// Delete from pending and pods provisional, meaning we are allowed to accept new pods for the group
_, err = pool.Exec(context.Background(), queries.DeleteProvisionalGroupsQuery, groupName, pod.Namespace)
if err != nil {
klog.Infof("Error deleting Pods %s/%s from provisional queue", pod.Namespace, pod.Name)
return err
}

_, err = pool.Exec(context.Background(), queries.DeleteFromPendingQuery, groupName, pod.Namespace)
if err != nil {
klog.Infof("Error deleting Pod %s/%s from pending queue", pod.Namespace, pod.Name)
@@ -242,11 +247,11 @@ func deleteFluxion(fluxID int64) error {
}

// Assume if there is an error we should try again
// TOOD:(vsoch) How to distinguish between cancel error
// TODO:(vsoch) How to distinguish between cancel error
// and possible already cancelled?
response, err := fluxion.Cancel(fluxionCtx, request)
_, err = fluxion.Cancel(fluxionCtx, request)
if err != nil {
klog.Errorf("[Fluxnetes] Issue with cancel %s %s", response.Error, err)
klog.Errorf("[Fluxnetes] Issue with cancel %s", err)
}
return err
}