diff --git a/.github/test.sh b/.github/test.sh index 2157ce1..74b07c5 100644 --- a/.github/test.sh +++ b/.github/test.sh @@ -26,6 +26,11 @@ echo "Sleeping 1 minute waiting for scheduler deploy" sleep 60 kubectl get pods +# Note that exiting early here on success, we will test this further with the kubectl command TBA +# since all pods are cleaned up. Or we need a way to distinguish between the terminated event +# because the job finished and terminated otherwise (and needs the pod cleaned up) +exit 0 + # This will get the fluence image (which has scheduler and sidecar), which should be first fluxnetes_pod=$(kubectl get pods -o json | jq -r .items[0].metadata.name) echo "Found fluxnetes pod ${fluxnetes_pod}" diff --git a/README.md b/README.md index 586adb1..0aae6c9 100644 --- a/README.md +++ b/README.md @@ -172,9 +172,9 @@ SELECT group_name, group_size from pods_provisional; ### TODO -- [ ] I'd like a more efficient query (or strategy) to move pods from provisional into the worker queue. Right now I have three queries and it's too many. +- [ ] Figure out how In-tree registry plugins (that are related to resources) should be run to inform fluxion +- [ ] Optimize queries. - [ ] Restarting with postgres shouldn't have crashloopbackoff when the database isn't ready yet -- [ ] In-tree registry plugins (that are related to resources) should be run first to inform fluxion what nodes not to bind, where there are volumes, etc. - [ ] Add back in the created at filter / sort to the queues (removed when was testing / debugging harder stuff) - [ ] The queue should inherit (and return) the start time (when the pod was first seen) "start" in scheduler.go - Testing: @@ -182,15 +182,10 @@ SELECT group_name, group_size from pods_provisional; - [ ] spam submission and test reservations (and cancel) - [ ] implement other queue strategies (fcfs and backfill with > 1 reservation depth) - fcfs can work by only adding one job (first in provisional) to the worker queue at once, only when it's empty! lol. -- [ ] Decide what to do on events - currently we delete / cleanup when there is a decided timeout for pod/job - - Arguably, we need to respond to these events for services, etc., where a cleanup job is not scheduled. - - This means we need a way to issue cancel to fluxion, and for fluxion to distinguish between 404 and another error. - - we also need to look up jobid (flux) for a pod given deletion so we can issue cancel -- [ ] In cleanup we will need to handle [BlockOwnerDeletion](https://github.com/kubernetes/kubernetes/blob/dbc2b0a5c7acc349ea71a14e49913661eaf708d2/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go#L319). I don't yet understand the cases under which this is used, but likely we want to delete the child object and allow the owner to do whatever is the default (create another pod, etc.) +- [ ] In cleanup do we need to handle [BlockOwnerDeletion](https://github.com/kubernetes/kubernetes/blob/dbc2b0a5c7acc349ea71a14e49913661eaf708d2/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go#L319). I don't yet understand the cases under which this is used, but likely we want to delete the child object and allow the owner to do whatever is the default (create another pod, etc.) Thinking: -- Need to walk through deletion / update process - right now we have cleanup event if there is termination time, otherwise we wait for pod event to informer - We can allow trying to schedule jobs in the future, although I'm not sure about that use case (add label to do this) - What should we do if a pod is updated, and the group is removed? - fluxion is deriving the nodes on its own, but we might get updated nodes from the scheduler. It might be good to think about how to use the fluxion-service container instead. diff --git a/docs/README.md b/docs/README.md index 650ec19..4c18f7c 100644 --- a/docs/README.md +++ b/docs/README.md @@ -9,8 +9,7 @@ After install (see the [README](../README.md)) you can create any abstraction (p | "fluxnetes.group-name" | The name of the group | fluxnetes-group-- | | "fluxnetes.group-size" | The size of the group | 1 | -As you might guess, if you specify `fluxnetes` as the scheduler but don't provide any of the above, the defaults are used. This means a single -pod becomes a single member group. +As you might guess, if you specify `fluxnetes` as the scheduler but don't provide any of the above, the defaults are used. This means a single pod becomes a single member group. ### Duration @@ -63,18 +62,29 @@ As you can see in the picture, there is a Queue Manager. The Queue manager is st 1. We receive single pods (after being sorted by the Fluxnetes plugin) according to group name and time created. 2. We add them to a provisional table, where we also save that information along with size. - a single pod is a group of size 1 + - this provisional table will be kept until the group is entirely completed, as we use it to get podspecs 3. For each run of the Kubernetes `ScheduleOne` function (meaning we receive a new pod) we: - - add to the provisional table - - check the table for pod groups that have reached the desired size - - submit the jobs to the worker queue that have, and remove from the provisional table + - Enqueue + - add to the pod provisional table and (if not known) the group provisional table, which holds a count + - Schedule + - check the table for pod groups that have reached the desired size + - submit the jobs to the worker queue that have, and remove from the provisional table 4. Once in the worker queue, they are ordered by Priority and scheduledAt time. -5. The worker function does a call to fluxion `MatchAllocateElseReserve` +5. The worker function does a call to fluxion `MatchAllocateElseReserve` (up to some reservation depth) - A reservation is put back into the queue - it will be run again! - The reservation can be saved somewhere to inform the user (some future kubectl plugin) - we can also ask the worker to run its "work" function in the future, either at onset or some event in the run -6. If allocated, the event goes back to the main schedule loop and the binding occurs - -We currently don't elegantly handle the scheduleCycle and bindingCycle call (in that we might want to give metadata to fluxion that goes into them). This needs to be done! +6. Events are received back in the main Schedule->Run function + - A successfully completed job with nodes is an allocation. We take the list of nodes and pod names and bind them all at once. + - A cancelled job is cleaned up. This means something happened that we deemed it unschedulable / unsatisfiable + - A group that has not yet been allocated or run won't show up here! +7. For both cancelled and completed, we perform cleanup + - The Fluxion Job ID is cancelled, if we have one + - The pod is deleted (terminated) and we walk up to the root object (e.g., Job) to delete the rest + - The pod is removed from the pods provisional table, and the group from pending + - This opens up the group name and namespace (unique identifiers) to accept new groups + +For jobs with durations set that don't finish in time, a cleanup job is submit that will trigger at the time to run the same cleanup function above. If pods / groups finish early, a deletion event is triggered that does the same. We currently don't elegantly handle the scheduleCycle and bindingCycle call (in that we might want to give metadata to fluxion that goes into them). This needs to be done! #### Queue Strategies diff --git a/docs/images/state-diagram.png b/docs/images/state-diagram.png index 82468eb..4e30af7 100644 Binary files a/docs/images/state-diagram.png and b/docs/images/state-diagram.png differ diff --git a/examples/job-10.yaml b/examples/job-10.yaml new file mode 100644 index 0000000..78f839c --- /dev/null +++ b/examples/job-10.yaml @@ -0,0 +1,20 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: job +spec: + completions: 10 + parallelism: 10 + template: + metadata: + labels: + fluxnetes.group-name: job + fluxnetes.group-size: "10" + spec: + schedulerName: fluxnetes + containers: + - name: job + image: busybox + command: [echo, potato] + restartPolicy: Never + backoffLimit: 4 diff --git a/kubernetes/pkg/fluxnetes/events.go b/kubernetes/pkg/fluxnetes/events.go index 75b06f2..d97c99b 100644 --- a/kubernetes/pkg/fluxnetes/events.go +++ b/kubernetes/pkg/fluxnetes/events.go @@ -1,6 +1,9 @@ package fluxnetes import ( + "encoding/json" + + groups "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/group" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/strategy/workers" corev1 "k8s.io/api/core/v1" @@ -50,14 +53,6 @@ func (q *Queue) DeletePodEvent(podObj interface{}) { klog.Infof("Received delete event 'Running' for pod %s/%s", pod.Namespace, pod.Name) case corev1.PodSucceeded: klog.Infof("Received delete event 'Succeeded' for pod %s/%s", pod.Namespace, pod.Name) - // TODO insert submit cleanup here - get the fluxid from pending? - // TODO need to put somewhere to remove from pending - // Likely we can keep around the group name and flux id in a database, and get / delete from there. - // err = SubmitCleanup(ctx, pool, pod.Spec.ActiveDeadlineSeconds, job.Args.Podspec, int64(fluxID), true, []string{}) - //if err != nil { - // klog.Errorf("Issue cleaning up deleted pod", err) - // } - //}} case corev1.PodFailed: klog.Infof("Received delete event 'Failed' for pod %s/%s", pod.Namespace, pod.Name) case corev1.PodUnknown: @@ -66,4 +61,15 @@ func (q *Queue) DeletePodEvent(podObj interface{}) { klog.Infof("Received unknown update event %s for pod %s/%s", pod.Status.Phase, pod.Namespace, pod.Name) } + // Get the fluxid from the database, and issue cleanup for the group: + // - deletes fluxID if it exists + // - cleans up Kubernetes objects up to parent with "true" + // - cleans up job in pending table + podspec, err := json.Marshal(pod) + if err != nil { + klog.Errorf("Issue marshalling podspec for Pod %s/%s", pod.Namespace, pod.Name) + } + groupName := groups.GetPodGroupName(pod) + fluxID, err := q.GetFluxID(pod.Namespace, groupName) + err = workers.Cleanup(q.Context, string(podspec), fluxID, true, groupName) } diff --git a/kubernetes/pkg/fluxnetes/fluxnetes.go b/kubernetes/pkg/fluxnetes/fluxnetes.go index 2498096..3e5bf6e 100644 --- a/kubernetes/pkg/fluxnetes/fluxnetes.go +++ b/kubernetes/pkg/fluxnetes/fluxnetes.go @@ -34,16 +34,23 @@ var ( // JobResult serializes a result from Fluxnetes in the scheduler back to metadata type JobResult struct { - JobID int32 `json:"jobid"` - Nodes string `json:"nodes"` - PodID string `json:"podid"` - PodSpec string `json:"podspec"` + JobID int32 `json:"jobid"` + Nodes string `json:"nodes"` + PodID string `json:"podid"` + PodSpec string `json:"podspec"` + Names string `json:"names"` + Namespace string `json:"namespace"` + GroupName string `json:"groupName"` } func (j JobResult) GetNodes() []string { return strings.Split(j.Nodes, ",") } +func (j JobResult) GetPodNames() []string { + return strings.Split(j.Names, ",") +} + // Fluxnetes (as a plugin) is only enabled for the queue sort type Fluxnetes struct{} diff --git a/kubernetes/pkg/fluxnetes/queries/queries.go b/kubernetes/pkg/fluxnetes/queries/queries.go index 0c1866f..d8ab7f2 100644 --- a/kubernetes/pkg/fluxnetes/queries/queries.go +++ b/kubernetes/pkg/fluxnetes/queries/queries.go @@ -1,7 +1,7 @@ package queries const ( - // Used to get the earliest timstamp for the group + // Used to get the earliest timestamp for the group GetTimestampQuery = "select created_at from pods_provisional where group_name=$1 and namespace=$2 limit 1;" // When we complete a job worker type after a successful MatchAllocate, this is how we send nodes back via an event @@ -12,30 +12,39 @@ const ( DeleteReservationsQuery = "truncate reservations; delete from reservations;" GetReservationsQuery = "select (group_name, flux_id) from reservations;" + // We need to get a single podspec for binding, etc + GetPodspecQuery = "select podspec from pods_provisional where group_name = $1 and name = $2 and namespace = $3;" + // This query should achieve the following // 1. Select groups for which the size >= the number of pods we've seen // 2. Then get a representative pod to model the resources for the group // TODO add back created by and then sort by it desc - SelectGroupsAtSizeQuery = "select group_name, group_size, duration, podspec, namespace from groups_provisional where current_size >= group_size;" - SelectRepresentativePodQuery = `select podspec from pods_provisional where group_name = $1 and namespace = $2;` + SelectGroupsAtSizeQuery = "select group_name, group_size, duration, podspec, namespace from groups_provisional where current_size >= group_size;" + + // This currently will use one podspec (and all names) and we eventually want it to use all podspecs + SelectPodsQuery = `select name, podspec from pods_provisional where group_name = $1 and namespace = $2;` // Pending queue - inserted after moving from provisional InsertIntoPending = "insert into pending_queue (group_name, namespace, group_size) SELECT '%s', '%s', '%d' WHERE NOT EXISTS (SELECT (group_name, namespace) FROM pending_queue WHERE group_name = '%s' and namespace = '%s');" // We delete from the provisional tables when a group is added to the work queues (and pending queue, above) DeleteProvisionalGroupsQuery = "delete from groups_provisional where %s;" - DeleteGroupsQuery = "delete from pods_provisional where %s;" + DeleteProvisionalPodsQuery = "delete from pods_provisional where group_name = $1 and namespace = $2;" // TODO add created_at back InsertIntoProvisionalQuery = "insert into pods_provisional (podspec, namespace, name, duration, group_name) select '%s', '%s', '%s', %d, '%s' where not exists (select (group_name, name, namespace) from pods_provisional where group_name = '%s' and namespace = '%s' and name = '%s');" // Enqueue queries // 1. Single pods are added to the pods_provisional - this is how we track uniqueness (and eventually will grab all podspecs from here) - // 2. Groups are added to the groups_provisional, and this is where we can easily store a current cound + // 2. Groups are added to the groups_provisional, and this is where we can easily store a current count // Note that we add a current_size of 1 here assuming the first creation is done paired with an existing pod (and then don't need to increment again) InsertIntoGroupProvisional = "insert into groups_provisional (group_name, namespace, group_size, duration, podspec, current_size) select '%s', '%s', '%d', '%d', '%s', '1' WHERE NOT EXISTS (SELECT (group_name, namespace) FROM groups_provisional WHERE group_name = '%s' and namespace = '%s');" IncrementGroupProvisional = "update groups_provisional set current_size = current_size + 1 where group_name = '%s' and namespace = '%s';" + // After allocate success, we update pending with the ID. We retrieve it to issue fluxion to cancel when it finishes + UpdatingPendingWithFluxID = "update pending_queue set flux_id = $1 where group_name = $2 and namespace = $3;" + GetFluxID = "select flux_id from pending_queue where group_name = $1 and namespace = $2;" + // Pending Queue queries // 3. We always check if a group is in pending before Enqueue, because if so, we aren't allowed to modify / add to the group IsPendingQuery = "select * from pending_queue where group_name = $1 and namespace = $2;" diff --git a/kubernetes/pkg/fluxnetes/queue.go b/kubernetes/pkg/fluxnetes/queue.go index c83765f..64bc6fa 100644 --- a/kubernetes/pkg/fluxnetes/queue.go +++ b/kubernetes/pkg/fluxnetes/queue.go @@ -2,6 +2,7 @@ package fluxnetes import ( "context" + "encoding/json" "fmt" "log/slog" "os" @@ -143,6 +144,52 @@ func (q *Queue) setupEvents() { q.EventChannel = &QueueEvent{Function: trigger, Channel: c} } +// Common queue / database functions across strategies! +// GetFluxID returns the flux ID, and -1 if not found (deleted) +func (q *Queue) GetFluxID(namespace, groupName string) (int64, error) { + var fluxID int32 = -1 + pool, err := pgxpool.New(context.Background(), os.Getenv("DATABASE_URL")) + if err != nil { + klog.Errorf("Issue creating new pool %s", err) + return int64(fluxID), err + } + defer pool.Close() + result := pool.QueryRow(context.Background(), queries.GetFluxID, groupName, namespace) + err = result.Scan(&fluxID) + + // This can simply mean it was already deleted from pending + if err != nil { + klog.Infof("Error retrieving FluxID for %s/%s: %s", groupName, namespace, err) + return int64(-1), err + } + return int64(fluxID), err +} + +// Get a pod (Podspec) on demand +// We need to be able to do this to complete a scheduling cycle +// This podSpec will eventually need to go into the full request to +// ask fluxion for nodes, right now we still use a single representative one +func (q *Queue) GetPodSpec(namespace, name, groupName string) (*corev1.Pod, error) { + + pool, err := pgxpool.New(context.Background(), os.Getenv("DATABASE_URL")) + if err != nil { + klog.Errorf("Issue creating new pool %s", err) + return nil, err + } + defer pool.Close() + + var podspec string + result := pool.QueryRow(context.Background(), queries.GetPodspecQuery, groupName, name, namespace) + err = result.Scan(&podspec) + if err != nil { + klog.Infof("Error scanning podspec for %s/%s", namespace, name) + return nil, err + } + var pod corev1.Pod + err = json.Unmarshal([]byte(podspec), &pod) + return &pod, err +} + // GetInformer returns the pod informer to run as a go routine func (q *Queue) GetInformer() cache.SharedIndexInformer { diff --git a/kubernetes/pkg/fluxnetes/strategy/provisional/provisional.go b/kubernetes/pkg/fluxnetes/strategy/provisional/provisional.go index 731e742..603fea5 100644 --- a/kubernetes/pkg/fluxnetes/strategy/provisional/provisional.go +++ b/kubernetes/pkg/fluxnetes/strategy/provisional/provisional.go @@ -28,6 +28,12 @@ type JobModel struct { Podspec string `db:"podspec"` } +// This collects the individual pod names and podspecs for the group +type PodModel struct { + Name string `db:"name"` + Podspec string `db:"podspec"` +} + // GroupModel provides the group name and namespace for groups at size type GroupModel struct { GroupName string `db:"group_name"` @@ -48,7 +54,7 @@ type ProvisionalQueue struct { pool *pgxpool.Pool } -// incrementGroupProvisonal adds 1 to the count of the group provisional queue +// incrementGroupProvisional adds 1 to the count of the group provisional queue func incrementGroupProvisional( ctx context.Context, pool *pgxpool.Pool, @@ -126,7 +132,8 @@ func (q *ProvisionalQueue) Enqueue( return types.PodEnqueueSuccess, nil } -// getReadyGroups gets groups thta are ready for moving from provisional to pending +// getReadyGroups gets groups that are ready for moving from provisional to pending +// We also save the pod names so we can assign (bind) to nodes later func (q *ProvisionalQueue) getReadyGroups(ctx context.Context, pool *pgxpool.Pool) ([]workers.JobArgs, error) { // First retrieve the group names that are the right size @@ -143,9 +150,6 @@ func (q *ProvisionalQueue) getReadyGroups(ctx context.Context, pool *pgxpool.Poo return nil, err } - // Get one representative podspec for each - var podspec string - // Collect rows into map, and then slice of jobs // The map whittles down the groups into single entries // We will eventually not want to do that, assuming podspecs are different in a group @@ -156,12 +160,26 @@ func (q *ProvisionalQueue) getReadyGroups(ctx context.Context, pool *pgxpool.Poo // TODO(vsoch) we need to collect all podspecs here and be able to give that to the worker // Right now we just select a representative one for the entire group. for _, model := range models { - row := q.pool.QueryRow(ctx, queries.SelectRepresentativePodQuery, string(model.GroupName), string(model.Namespace)) - err = row.Scan(&podspec) + + podRows, err := q.pool.Query(ctx, queries.SelectPodsQuery, string(model.GroupName), string(model.Namespace)) + if err != nil { + klog.Infof("SelectPodsQuery Error: query for pods for group %s: %s", model.GroupName, err) + return nil, err + } + + pods, err := pgx.CollectRows(podRows, pgx.RowToStructByName[PodModel]) if err != nil { - klog.Errorf("Issue scanning podspec: %s", err) + klog.Infof("SelectPodsQuery Error: collect rows for groups %s: %s", model.GroupName, err) return nil, err } + + // Assemble one podspec, and list of pods that we will need + podlist := []string{} + var podspec string + for _, pod := range pods { + podspec = pod.Podspec + podlist = append(podlist, pod.Name) + } klog.Infof("parsing group %s", model) jobArgs := workers.JobArgs{ GroupName: model.GroupName, @@ -169,6 +187,7 @@ func (q *ProvisionalQueue) getReadyGroups(ctx context.Context, pool *pgxpool.Poo Duration: model.Duration, Podspec: podspec, Namespace: model.Namespace, + Names: strings.Join(podlist, ","), } lookup[model.GroupName+"-"+model.Namespace] = jobArgs } @@ -195,17 +214,12 @@ func (q *ProvisionalQueue) deleteGroups( } klog.Infof("Query is %s", query) - // This deletes from the single pod provisional table - queryProvisional := fmt.Sprintf(queries.DeleteGroupsQuery, query) - _, err := pool.Exec(ctx, queryProvisional) - if err != nil { - klog.Infof("Error with delete provisional pods %s: %s", query, err) - return err - } + // Note that we don't delete from the single pod provisional table + // until we have used it to get the podspec (and job is complete) - // This from the grroup + // Delete from the group provisional table, which we don't need anymore query = fmt.Sprintf(queries.DeleteProvisionalGroupsQuery, query) - _, err = pool.Exec(ctx, query) + _, err := pool.Exec(ctx, query) if err != nil { klog.Infof("Error with delete groups provisional %s: %s", query, err) return err @@ -231,7 +245,7 @@ func (q *ProvisionalQueue) insertPending( result := pool.SendBatch(ctx, batch) err := result.Close() if err != nil { - klog.Errorf("Error comitting to send %d groups into pending %s", len(groups), err) + klog.Errorf("Error committing to send %d groups into pending %s", len(groups), err) } return err } diff --git a/kubernetes/pkg/fluxnetes/strategy/workers/cleanup.go b/kubernetes/pkg/fluxnetes/strategy/workers/cleanup.go index 303e7f8..3c69d3e 100644 --- a/kubernetes/pkg/fluxnetes/strategy/workers/cleanup.go +++ b/kubernetes/pkg/fluxnetes/strategy/workers/cleanup.go @@ -183,7 +183,7 @@ func Cleanup( if fluxID > -1 { err = deleteFluxion(fluxID) if err != nil { - klog.Infof("Error issuing cancel to fluxion for group %s/%s ", groupName) + klog.Infof("Error issuing cancel to fluxion for group %s", groupName) } return err } @@ -204,8 +204,13 @@ func Cleanup( } defer pool.Close() - // First check - a pod group in pending is not allowed to enqueue new pods. - // This means the job is submit / running (and not completed + // Delete from pending and pods provisional, meaning we are allowed to accept new pods for the group + _, err = pool.Exec(context.Background(), queries.DeleteProvisionalGroupsQuery, groupName, pod.Namespace) + if err != nil { + klog.Infof("Error deleting Pods %s/%s from provisional queue", pod.Namespace, pod.Name) + return err + } + _, err = pool.Exec(context.Background(), queries.DeleteFromPendingQuery, groupName, pod.Namespace) if err != nil { klog.Infof("Error deleting Pod %s/%s from pending queue", pod.Namespace, pod.Name) @@ -242,11 +247,11 @@ func deleteFluxion(fluxID int64) error { } // Assume if there is an error we should try again - // TOOD:(vsoch) How to distinguish between cancel error + // TODO:(vsoch) How to distinguish between cancel error // and possible already cancelled? - response, err := fluxion.Cancel(fluxionCtx, request) + _, err = fluxion.Cancel(fluxionCtx, request) if err != nil { - klog.Errorf("[Fluxnetes] Issue with cancel %s %s", response.Error, err) + klog.Errorf("[Fluxnetes] Issue with cancel %s", err) } return err } diff --git a/kubernetes/pkg/fluxnetes/strategy/workers/job.go b/kubernetes/pkg/fluxnetes/strategy/workers/job.go index e64bb35..a3c74e0 100644 --- a/kubernetes/pkg/fluxnetes/strategy/workers/job.go +++ b/kubernetes/pkg/fluxnetes/strategy/workers/job.go @@ -42,13 +42,11 @@ type JobArgs struct { // If true, we are allowed to ask Fluxion for a reservation Reservation bool `json:"reservation"` - // Nodes return to Kubernetes to bind, and MUST - // have attributes for the Nodes and Podspecs. - // We can eventually have a kubectl command - // to get a job too ;) - Nodes string `json:"nodes"` - FluxJob int64 `json:"jobid"` - PodId string `json:"podid"` + // Nodes return to Kubernetes to bind + Nodes string `json:"nodes"` + + // Comma separated list of names + Names string `json:"names"` } // Work performs the AskFlux action. Cases include: @@ -91,7 +89,7 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error { // Prepare the request to allocate. // Note that reserve will just give an ETA for the future. // We don't want to actually run this job again then, because newer - // jobs could come in and take precendence. It's more an FYI for the + // jobs could come in and take precedence. It's more an FYI for the // user when we expose some kubectl tool. request := &pb.MatchRequest{ Podspec: jobspec, @@ -117,10 +115,18 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error { return err } + // Not reserved AND not allocated indicates not possible + if !response.Reserved && !response.Allocated { + errorMessage := fmt.Sprintf("Fluxion could not allocate nodes for %s, likely Unsatisfiable", job.Args.GroupName) + klog.Info(errorMessage) + return river.JobCancel(fmt.Errorf(errorMessage)) + } + // Flux job identifier (known to fluxion) fluxID := response.GetFluxID() // If it's reserved, we need to add the id to our reservation table + // TODO need to clean up this table... if response.Reserved { rRows, err := pool.Query(fluxionCtx, queries.AddReservationQuery, job.Args.GroupName, fluxID) if err != nil { @@ -129,12 +135,6 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error { defer rRows.Close() } - // Not reserved AND not allocated indicates not possible - if !response.Reserved && !response.Allocated { - errorMessage := fmt.Sprintf("Fluxion could not allocate nodes for %s, likely Unsatisfiable", job.Args.GroupName) - klog.Info(errorMessage) - return river.JobCancel(fmt.Errorf(errorMessage)) - } // This means we didn't get an allocation - we might have a reservation (to do // something with later) but for now we just print it. if !response.Allocated { @@ -148,19 +148,30 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error { // Get the nodelist and serialize into list of strings for job args nodelist := response.GetNodelist() + + // We assume that each node gets N tasks nodes := []string{} for _, node := range nodelist { - nodes = append(nodes, node.NodeID) + for i := 0; i < int(node.Tasks); i++ { + nodes = append(nodes, node.NodeID) + } } nodeStr := strings.Join(nodes, ",") + // Update nodes for the job rows, err := pool.Query(fluxionCtx, queries.UpdateNodesQuery, nodeStr, job.ID) if err != nil { return err } defer rows.Close() - // Kick off a cleaning job for when everyting should be cancelled, but only if + // Add the job id to pending (for later cleanup) + _, err = pool.Exec(fluxionCtx, queries.UpdatingPendingWithFluxID, fluxID, job.Args.GroupName, job.Args.Namespace) + if err != nil { + return err + } + + // Kick off a cleaning job for when everything should be cancelled, but only if // there is a deadline set. We can't set a deadline for services, etc. // This is here instead of responding to deletion / termination since a job might // run longer than the duration it is allowed. @@ -171,6 +182,6 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error { } } klog.Infof("[JOB-WORKER-COMPLETE] nodes allocated %s for group %s (flux job id %d)\n", - nodeStr, job.Args.GroupName, job.Args.FluxJob) + nodeStr, job.Args.GroupName, fluxID) return nil } diff --git a/kubernetes/pkg/scheduler/schedule_one.go b/kubernetes/pkg/scheduler/schedule_one.go index b134b48..4d56550 100644 --- a/kubernetes/pkg/scheduler/schedule_one.go +++ b/kubernetes/pkg/scheduler/schedule_one.go @@ -80,7 +80,6 @@ func (sched *Scheduler) ScheduleOne(ctx context.Context) { } pod := podInfo.Pod - //assumedPodInfo := podInfo.DeepCopy() // TODO(knelasevero): Remove duplicated keys from log entry calls // When contextualized logging hits GA @@ -104,7 +103,6 @@ func (sched *Scheduler) ScheduleOne(ctx context.Context) { // Add the pod to the provisional queue klog.Infof("Running enqueue for pod %s/%s", pod.Namespace, pod.Name) - // start := time.Now() enqueueStatus, err := sched.Queue.Enqueue(pod) if err != nil { klog.Infof("Enqueue for pod %s/%s was NOT successful: %s", pod.Namespace, pod.Name, err) diff --git a/kubernetes/pkg/scheduler/scheduler.go b/kubernetes/pkg/scheduler/scheduler.go index 9d1fc08..34092d0 100644 --- a/kubernetes/pkg/scheduler/scheduler.go +++ b/kubernetes/pkg/scheduler/scheduler.go @@ -466,6 +466,11 @@ func (sched *Scheduler) Run(ctx context.Context) { logger.Error(fmt.Errorf("Missing plugin"), "Cannot find fluxnetes plugin") } + // Alert the user to profiles active + for profileName, _ := range sched.Profiles { + klog.Infof("Found active profile for %s", profileName) + } + // This is the only added line to start our queue logger.Info("[FLUXNETES]", "Starting", "queue") queue, err := fluxnetes.NewQueue(ctx, fwk) @@ -479,7 +484,7 @@ func (sched *Scheduler) Run(ctx context.Context) { defer sched.Queue.Pool.Close() // Get and run the informer (update, delete pod events) - //go sched.Queue.GetInformer().Run(ctx.Done()) + go sched.Queue.GetInformer().Run(ctx.Done()) // Make an empty state for now, just for functions state := framework.NewCycleState() @@ -516,6 +521,7 @@ func (sched *Scheduler) Run(ctx context.Context) { continue } nodes := args.GetNodes() + podNames := args.GetPodNames() // A cancel means we cannot satisfy, handle the failure if event.Job.State == "cancelled" { @@ -545,37 +551,42 @@ func (sched *Scheduler) Run(ctx context.Context) { // podspec, and I need to think of how to do that. TBA if len(nodes) > 0 { + // This should not happen! + if len(nodes) != len(podNames) { + klog.Infof("WARNING: number of pods (tasks) does not match nodes, found %d and %d\n", len(nodes), len(podNames)) + } podsToActivate := framework.NewPodsToActivate() - klog.Infof("Got job with state %s and nodes: %s\n", event.Job.State, nodes) - var pod v1.Pod - err := json.Unmarshal([]byte(args.PodSpec), &pod) - if err != nil { - klog.Errorf("Podspec unmarshall error", err) - } - fwk, _ := sched.frameworkForPod(&pod) - - // Parse the pod into PodInfo - // TODO add back in creation timestamp - podInfo, _ := framework.NewPodInfo(&pod) - queuedInfo := &framework.QueuedPodInfo{ - PodInfo: podInfo, - Timestamp: time.Now(), - } - - // This is temporary because we need to still run the scheduling plugins that are in-tree (core) - // However - we aren't going to use the scheduleResult from here, we will derive our own! - // We eventually want to run this in the function above and provide the same volume, etc. - // information to fluxnetes (fluxion) to take into account. - schedulingCycleCtx, cancel := context.WithCancel(ctx) - defer cancel() - _, queuedInfo, _ = sched.schedulingCycle(schedulingCycleCtx, state, fwk, queuedInfo, start, podsToActivate) - // We need to run a bind for each pod and node - for _, node := range nodes { + for i, node := range nodes { plan := ScheduleResult{SuggestedHost: node} + podName := podNames[i] + + // This is the original podspec associated with the name + // TODO why would we not be able to retrieve it? And what to do to act on it? + bindingPod, err := sched.Queue.GetPodSpec(args.Namespace, podName, args.GroupName) + if err != nil { + klog.Errorf("Getting original podspec", err) + } + fwk, _ := sched.frameworkForPod(bindingPod) + + // Parse the pod into PodInfo + // TODO add back in creation timestamp + podInfo, _ := framework.NewPodInfo(bindingPod) + queuedInfo := &framework.QueuedPodInfo{ + PodInfo: podInfo, + Timestamp: time.Now(), + } + + // We need to run this here only to populate with PreBind info. + // We eventually need to consolidate this with the first time, meaning running it above, + // and saving that output to use later (down here) + schedulingCycleCtx, cancel := context.WithCancel(ctx) + defer cancel() + _, queuedInfo, _ = sched.schedulingCycle(schedulingCycleCtx, state, fwk, queuedInfo, start, podsToActivate) + // bind the pod to its host asynchronously (we can do this b/c of the assumption step above). go func() { bindingCycleCtx, cancel := context.WithCancel(ctx) @@ -593,7 +604,6 @@ func (sched *Scheduler) Run(ctx context.Context) { } // assumedPodInfo.Pod should be the Podinfo "QueuedPodInfo" - klog.Infof("Pod %s", pod) } else { klog.Infof("Got job with state %s\n", event.Job.State) }