Merge pull request #15 from converged-computing/add-unschedulable
add unschedulable status
vsoch authored Aug 8, 2024
2 parents 606e18b + f387611 commit 62baf69
Showing 22 changed files with 487 additions and 175 deletions.
13 changes: 9 additions & 4 deletions README.md
@@ -175,29 +175,34 @@ SELECT group_name, group_size from pods_provisional;
- [ ] I'd like a more efficient query (or strategy) to move pods from provisional into the worker queue. Right now I have three queries and it's too many.
- [ ] Restarting with postgres shouldn't have crashloopbackoff when the database isn't ready yet
- [ ] In-tree registry plugins (that are related to resources) should be run first to inform fluxion what nodes not to bind, where there are volumes, etc.
- [ ] Add back in the created-at filter / sort to the queues (removed while testing / debugging harder stuff)
- [ ] The queue should inherit (and return) the start time (when the pod was first seen) "start" in scheduler.go
- Testing:
  - [ ] need to test that duration / completion time works (run a job with a short duration; it should be cancelled / cleaned up)
- [ ] spam submission and test reservations (and cancel)
- [ ] implement other queue strategies (fcfs and backfill with > 1 reservation depth)
  - fcfs can work by only adding one job (the first in provisional) to the worker queue at a time, and only when that queue is empty (see the sketch after this list)
- [ ] create state diagram that shows how stuff works
- [ ] Decide what to do on events - currently we delete / cleanup when there is a decided timeout for pod/job
- Arguably, we need to respond to these events for services, etc., where a cleanup job is not scheduled.
- This means we need a way to issue cancel to fluxion, and for fluxion to distinguish between 404 and another error.
- [ ] When a job is not able to schedule, it should go into a rejected queue, which should finish and return a NOT SCHEDULABLE status.
  - we also need to look up the jobid (flux) for a pod on deletion so we can issue a cancel
- [ ] In cleanup we will need to handle [BlockOwnerDeletion](https://github.com/kubernetes/kubernetes/blob/dbc2b0a5c7acc349ea71a14e49913661eaf708d2/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go#L319). I don't yet understand the cases under which this is used, but likely we want to delete the child object and allow the owner to do whatever is the default (create another pod, etc.)
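
A rough sketch of the fcfs idea above (the `FCFS` type, the helpers it calls, and the worker-args builder are all assumptions, not the plugin's actual strategy interface):

```go
// Hypothetical fcfs sketch: admit at most one group per scheduling pass, and only
// when the worker queue is empty. workerQueueHasJobs, oldestReadyGroup, and
// newWorkerJobArgs are assumed helpers, not functions that exist in this repo.
package strategies

import (
	"context"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/riverqueue/river"
)

type FCFS struct{}

func (s FCFS) Schedule(ctx context.Context, pool *pgxpool.Pool) ([]river.InsertManyParams, error) {
	busy, err := workerQueueHasJobs(ctx, pool)
	if err != nil || busy {
		// Either an error, or something is already queued - fcfs admits nothing new.
		return nil, err
	}
	group, err := oldestReadyGroup(ctx, pool)
	if err != nil || group == nil {
		// No group has reached its size yet.
		return nil, err
	}
	// Exactly one river job, for the oldest group that is ready.
	return []river.InsertManyParams{{Args: newWorkerJobArgs(group)}}, nil
}
```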

Thinking:

- How do we distinguish between a cancel to fluxion (error) vs. error because it was already cancelled?
- How would that happen?
- Need to walk through the deletion / update process - right now we have a cleanup event if there is a termination time; otherwise we wait for a pod event from the informer
- We can allow trying to schedule jobs in the future, although I'm not sure about that use case (add label to do this)
- What should we do if a pod is updated, and the group is removed?
- fluxion is deriving the nodes on its own, but we might get updated nodes from the scheduler. It might be good to think about how to use the fluxion-service container instead.
- would it be more efficient to retrieve the podspec from Kubernetes instead of putting it into the database?

TODO:

- test a job that requests too many resources and won't pass (it should not make it to provisional or the pending_queue)
- can we do a satisfy check first?
- we probably need a unique constraint on the insert...
- when that works, a pod that is completed / done needs to be removed from pending

## License

HPCIC DevTools is distributed under the terms of the MIT license.

This file was deleted.

8 changes: 8 additions & 0 deletions docs/README.md
@@ -97,6 +97,14 @@ We will eventually have other strategies that allow for more reservations. This
- **Default** - the "workers queue" (as I call it) handles asking fluxion for allocations. This is the main queue.
- **Cleanup** - the "cancel queue" handles canceling reservations and, when pod cancellation is implemented, will handle that as well. It's a different queue (with different workers) so the jobs do not collide.
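
For reference, the two queues are separate River queues, so their workers never pull each other's jobs. A trimmed sketch of that configuration (adapted from `NewQueue` in `queue.go`; the `MaxWorkers` values are illustrative):

```go
// Two independent River queues: "default" for allocation work and "cleanup"
// for cancellations, so the two kinds of jobs never collide.
// (Fragment adapted from NewQueue in queue.go; worker counts are illustrative.)
riverClient, err := river.NewClient(riverpgxv5.New(pool), &river.Config{
	Queues: map[string]river.QueueConfig{
		river.QueueDefault: {MaxWorkers: 100},
		"cleanup":          {MaxWorkers: 100},
	},
	Workers: workers,
})
```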

#### State Diagram

The following overview and diagrams describe the above components and show basic states.

![images/state-overview.png](images/state-overview.png)

![images/state-diagram.png](images/state-diagram.png)


## Notes

Binary file added docs/images/state-diagram.png
Binary file added docs/images/state-overview.png
23 changes: 23 additions & 0 deletions examples/job-unsatisfiable.yaml
@@ -0,0 +1,23 @@
apiVersion: batch/v1
kind: Job
metadata:
name: job
spec:
completions: 10
parallelism: 10
template:
metadata:
labels:
fluxnetes.group-name: job
fluxnetes.group-size: "10"
spec:
schedulerName: fluxnetes
containers:
- name: job
image: busybox
command: [echo, potato]
resources:
requests:
cpu: 20
restartPolicy: Never
backoffLimit: 4
3 changes: 0 additions & 3 deletions examples/job.yaml
@@ -4,9 +4,6 @@ metadata:
name: job
spec:
template:
metadata:
labels:
fluxnetes.group-name: job
spec:
schedulerName: fluxnetes
containers:
11 changes: 10 additions & 1 deletion kubernetes/pkg/fluxnetes/events.go
@@ -1,10 +1,18 @@
package fluxnetes

import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/strategy/workers"

corev1 "k8s.io/api/core/v1"
klog "k8s.io/klog/v2"
)

// Cleanup deletes a pod. It is assumed that it cannot be scheduled
// This means we do not have a flux id to cancel (-1)
func (q Queue) Cleanup(pod *corev1.Pod, podspec, groupName string) error {
return workers.Cleanup(q.Context, podspec, int64(-1), true, groupName)
}

// UpdatePodEvent is called on an update, and the old and new object are presented
func (q *Queue) UpdatePodEvent(oldObj, newObj interface{}) {

@@ -42,7 +50,8 @@ func (q *Queue) DeletePodEvent(podObj interface{}) {
klog.Infof("Received delete event 'Running' for pod %s/%s", pod.Namespace, pod.Name)
case corev1.PodSucceeded:
klog.Infof("Received delete event 'Succeeded' for pod %s/%s", pod.Namespace, pod.Name)
// TODO insert submit cleanup here - need a way to get the fluxId
// TODO insert submit cleanup here - get the fluxid from pending?
// TODO need to put somewhere to remove from pending
// Likely we can keep around the group name and flux id in a database, and get / delete from there.
// err = SubmitCleanup(ctx, pool, pod.Spec.ActiveDeadlineSeconds, job.Args.Podspec, int64(fluxID), true, []string{})
//if err != nil {
4 changes: 3 additions & 1 deletion kubernetes/pkg/fluxnetes/fluxnetes.go
@@ -27,7 +27,9 @@ import (
)

var (
GroupName = "scheduling.x-k8s.io"
CancelledState = "cancelled"
CleanupQueue = "cleanup"
Unsatisfiable = "unsatisfiable"
)

// JobResult serializes a result from Fluxnetes in the scheduler back to metadata
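The `EnqueueStatus` returned by `Enqueue` lives in the `types` package, which this diff does not show. Only `types.Unknown` is visible here, so the following enum is a guess included purely for orientation:

```go
// Hypothetical sketch of the types package - only Unknown is confirmed by this
// diff; the remaining names are guesses at the statuses this PR adds.
package types

type EnqueueStatus int

const (
	// Unknown: we could not determine an outcome (an error occurred).
	Unknown EnqueueStatus = iota
	// PodEnqueueSuccess: the pod was added to the provisional tables.
	PodEnqueueSuccess
	// GroupAlreadyInPending: the group is already in the pending queue and cannot be modified.
	GroupAlreadyInPending
	// Unschedulable: the group can never be satisfied and should be rejected.
	Unschedulable
)
```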
48 changes: 32 additions & 16 deletions kubernetes/pkg/fluxnetes/queries/queries.go
@@ -1,29 +1,45 @@
package queries

// Queries used by the main queue (and shared across strategies sometimes)
const (
GetTimestampQuery = "select created_at from pods_provisional where group_name=$1 limit 1"
GetPodQuery = "select * from pods_provisional where group_name=$1 and namespace=$2 and name=$3"
InsertPodQuery = "insert into pods_provisional (podspec, namespace, name, duration, created_at, group_name, group_size) values ($1, $2, $3, $4, $5, $6, $7)"
CountPodsQuery = "select count(*) from pods_provisional where group_name=$1"
UpdateNodesQuery = "update river_job set args = jsonb_set(args, '{nodes}', to_jsonb($1::text)) where id=$2;"
// Used to get the earliest timestamp for the group
GetTimestampQuery = "select created_at from pods_provisional where group_name=$1 and namespace=$2 limit 1;"

// When we complete a job worker type after a successful MatchAllocate, this is how we send nodes back via an event
UpdateNodesQuery = "update river_job set args = jsonb_set(args, '{nodes}', to_jsonb($1::text)) where id=$2;"

// Reservations
AddReservationQuery = "insert into reservations (group_name, flux_id) values ($1, $2);"
DeleteReservationsQuery = "truncate reservations; delete from reservations;"
GetReservationsQuery = "select (group_name, flux_id) from reservations;"

// This query should achieve the following (but does not work)
// This query should achieve the following
// 1. Select groups for which the size >= the number of pods we've seen
// 2. Then get the group_name, group_size, and podspec for each (this goes to scheduler)
// Ensures we are sorting by the timestamp when they were added (should be DESC I think)
RefreshGroupsQuery = "refresh materialized view groups_size;"
SelectGroupsReadyQuery = "select * from pods_provisional join groups_size on pods_provisional.group_name = groups_size.group_name where group_size >= count order by created_at desc;"
// 2. Then get a representative pod to model the resources for the group
// TODO add back created by and then sort by it desc
SelectGroupsAtSizeQuery = "select group_name, group_size, duration, podspec, namespace from groups_provisional where current_size >= group_size;"
SelectRepresentativePodQuery = `select podspec from pods_provisional where group_name = $1 and namespace = $2;`

// Pending queue - inserted after moving from provisional
InsertIntoPending = "insert into pending_queue (group_name, namespace, group_size) SELECT '%s', '%s', '%d' WHERE NOT EXISTS (SELECT (group_name, namespace) FROM pending_queue WHERE group_name = '%s' and namespace = '%s');"

// We delete from the provisional tables when a group is added to the work queues (and pending queue, above)
DeleteProvisionalGroupsQuery = "delete from groups_provisional where %s;"
DeleteGroupsQuery = "delete from pods_provisional where %s;"

// TODO add created_at back
InsertIntoProvisionalQuery = "insert into pods_provisional (podspec, namespace, name, duration, group_name) select '%s', '%s', '%s', %d, '%s' where not exists (select (group_name, name, namespace) from pods_provisional where group_name = '%s' and namespace = '%s' and name = '%s');"

// Enqueue queries
// 1. Single pods are added to the pods_provisional - this is how we track uniqueness (and eventually will grab all podspecs from here)
// 2. Groups are added to the groups_provisional, and this is where we can easily store a current count
// Note that we set current_size to 1 here, assuming the group is created together with its first pod (so we don't need to increment again)
InsertIntoGroupProvisional = "insert into groups_provisional (group_name, namespace, group_size, duration, podspec, current_size) select '%s', '%s', '%d', '%d', '%s', '1' WHERE NOT EXISTS (SELECT (group_name, namespace) FROM groups_provisional WHERE group_name = '%s' and namespace = '%s');"
IncrementGroupProvisional = "update groups_provisional set current_size = current_size + 1 where group_name = '%s' and namespace = '%s';"

// 3. Then delete all from the table
DeleteGroupsQuery = "delete from pods_provisional where group_name in ('%s');"
// Pending Queue queries
// 3. We always check if a group is in pending before Enqueue, because if so, we aren't allowed to modify / add to the group
IsPendingQuery = "select * from pending_queue where group_name = $1 and namespace = $2;"

// Note that this used to be done with two queries - these are no longer used
SelectGroupsAtSizeQuery = "select group_name from pods_provisional group by group_name, group_size, created_at having group_size >= count(*) order by created_at desc;"
SelectGroupsQuery = "select group_name, group_size, podspec, duration from pods_provisional where group_name in ('%s');"
// We remove from pending to allow another group submission of the same name on cleanup
DeleteFromPendingQuery = "delete from pending_queue where group_name=$1 and namespace=$2;"
)
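
As a usage note, the `$1`/`$2` queries above are passed to pgx with bound arguments, while the `%s`/`%d` queries are expanded with `fmt.Sprintf` before execution. A hedged sketch of both styles (these helper functions are illustrative, not part of the plugin):

```go
// Illustrative only: how the two query styles above might be executed against
// the pgx pool. Neither helper exists in the plugin.
package example

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5/pgxpool"

	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/queries"
)

// groupIsPending uses a parameterized query: $1 and $2 are bound to arguments.
func groupIsPending(ctx context.Context, pool *pgxpool.Pool, groupName, namespace string) (bool, error) {
	rows, err := pool.Query(ctx, queries.IsPendingQuery, groupName, namespace)
	if err != nil {
		return false, err
	}
	defer rows.Close()
	return rows.Next(), nil
}

// insertPending expands the %s/%d placeholders with fmt.Sprintf before running the SQL.
func insertPending(ctx context.Context, pool *pgxpool.Pool, groupName, namespace string, size int64) error {
	query := fmt.Sprintf(queries.InsertIntoPending, groupName, namespace, size, groupName, namespace)
	_, err := pool.Exec(ctx, query)
	return err
}
```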
34 changes: 19 additions & 15 deletions kubernetes/pkg/fluxnetes/queue.go
@@ -21,6 +21,7 @@ import (
groups "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/group"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/queries"
strategies "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/strategy"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/types"
)

const (
@@ -58,7 +59,7 @@

// NewQueue starts a new queue with a river client
func NewQueue(ctx context.Context, handle framework.Handle) (*Queue, error) {
dbPool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
pool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
if err != nil {
return nil, err
}
@@ -71,7 +72,7 @@ func NewQueue(ctx context.Context, handle framework.Handle) (*Queue, error) {

// Each strategy has its own worker type
strategy.AddWorkers(workers)
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
riverClient, err := river.NewClient(riverpgxv5.New(pool), &river.Config{
// Change the verbosity of the logger here
Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
Queues: map[string]river.QueueConfig{
@@ -103,7 +104,7 @@ func NewQueue(ctx context.Context, handle framework.Handle) (*Queue, error) {

queue := Queue{
riverClient: riverClient,
Pool: dbPool,
Pool: pool,
Strategy: strategy,
Context: ctx,
ReservationDepth: depth,
@@ -160,26 +161,26 @@ func (q *Queue) GetInformer() cache.SharedIndexInformer {
// Enqueue a new job to the provisional queue
// 1. Assemble (discover or define) the group
// 2. Add to provisional table
func (q *Queue) Enqueue(pod *corev1.Pod) error {
func (q *Queue) Enqueue(pod *corev1.Pod) (types.EnqueueStatus, error) {

// Get the pod name, duration (seconds) and size, first from labels, then defaults
groupName := groups.GetPodGroupName(pod)
size, err := groups.GetPodGroupSize(pod)
if err != nil {
return err

return types.Unknown, err
}

// Get the creation timestamp for the group
ts, err := q.GetCreationTimestamp(pod, groupName)
if err != nil {
return err
return types.Unknown, err
}

duration, err := groups.GetPodGroupDuration(pod)
if err != nil {
return err
return types.Unknown, err
}

// Log the namespace/name, group name, and size
klog.Infof("Pod %s has Group %s (%d, %d seconds) created at %s", pod.Name, groupName, size, duration, ts)

Expand All @@ -199,18 +200,21 @@ func (q *Queue) Enqueue(pod *corev1.Pod) error {
// This mimics what Kubernetes does. Note that jobs can be sorted
// based on the scheduled at time AND priority.
func (q *Queue) Schedule() error {
// Queue Strategy "Schedule" moves provional to the worker queue
// Queue Strategy "Schedule" moves provisional to the worker queue
// We get them back in a batch to schedule

batch, err := q.Strategy.Schedule(q.Context, q.Pool, q.ReservationDepth)
if err != nil {
return err
}

count, err := q.riverClient.InsertMany(q.Context, batch)
if err != nil {
return err
if len(batch) > 0 {
count, err := q.riverClient.InsertMany(q.Context, batch)
if err != nil {
return err
}
klog.Infof("[Fluxnetes] Schedule inserted %d jobs\n", count)
}
klog.Infof("[Fluxnetes] Schedule inserted %d jobs\n", count)

// Post submit functions
return q.Strategy.PostSubmit(q.Context, q.Pool, q.riverClient)
@@ -223,8 +227,8 @@ func (q *Queue) GetCreationTimestamp(pod *corev1.Pod, groupName string) (metav1.
// First see if we've seen the group before, the creation times are shared across a group
ts := metav1.MicroTime{}

// This query will fail if there are no rows (the podGroup is not known)
row := q.Pool.QueryRow(context.Background(), queries.GetTimestampQuery, groupName)
// This query will fail if there are no rows (the podGroup is not known in the namespace)
row := q.Pool.QueryRow(context.Background(), queries.GetTimestampQuery, groupName, pod.Namespace)
err := row.Scan(&ts)
if err == nil {
klog.Info("Creation timestamp is", ts)
28 changes: 15 additions & 13 deletions kubernetes/pkg/fluxnetes/strategy/easy.go
@@ -15,6 +15,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/queries"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/strategy/provisional"
work "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/strategy/workers"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/types"
)

// Easy with Backfill
@@ -59,7 +60,6 @@ func (s EasyBackfill) Schedule(
reservationDepth int32,
) ([]river.InsertManyParams, error) {

// TODO move logic from here into provisional
pending := provisional.NewProvisionalQueue(pool)

// Is this group ready to be scheduled with the addition of this pod?
@@ -91,7 +91,7 @@ }
}
batch = append(batch, args)
}
return batch, err
return batch, nil
}

// PostSubmit does clearing of reservations
@@ -142,18 +142,20 @@ func (s EasyBackfill) PostSubmit(
}

// Insert the cleanup jobs
count, err := riverClient.InsertMany(ctx, batch)
if err != nil {
return err
}
klog.Infof("[easy] post cleanup (cancel) of %d jobs", count)
if len(batch) > 0 {
count, err := riverClient.InsertMany(ctx, batch)
if err != nil {
return err
}
klog.Infof("[easy] post cleanup (cancel) of %d jobs", count)

// Now cleanup!
dRows, err := pool.Query(ctx, queries.DeleteReservationsQuery)
if err != nil {
return err
// Now cleanup!
dRows, err := pool.Query(ctx, queries.DeleteReservationsQuery)
if err != nil {
return err
}
defer dRows.Close()
}
defer dRows.Close()
return nil
}

@@ -162,7 +164,7 @@ func (s EasyBackfill) Enqueue(
pool *pgxpool.Pool,
pod *corev1.Pod,
group *groups.PodGroup,
) error {
) (types.EnqueueStatus, error) {
pending := provisional.NewProvisionalQueue(pool)
return pending.Enqueue(ctx, pod, group)
}
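
Since the point of this PR is to surface an unschedulable outcome, the caller presumably maps the `EnqueueStatus` onto a scheduler framework status at some point. A hedged sketch of that mapping (the `types.Unschedulable` case is an assumed name; only the `framework.NewStatus` / `framework.Unschedulable` API is standard):

```go
// Illustrative only: translate the queue's EnqueueStatus into a scheduler
// framework status. types.Unschedulable is an assumed constant name.
package fluxnetes

import (
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/types"
)

func statusForEnqueue(status types.EnqueueStatus, err error) *framework.Status {
	if err != nil {
		return framework.AsStatus(err)
	}
	switch status {
	case types.Unschedulable:
		// The group can never be satisfied; tell the scheduler not to keep retrying it against fluxion.
		return framework.NewStatus(framework.Unschedulable, "group is unsatisfiable")
	default:
		// Anything else means the pod/group was accepted into the queue.
		return framework.NewStatus(framework.Success)
	}
}
```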