Skip to content

Commit

Permalink
Improved JobWithTimeOut()
Browse files Browse the repository at this point in the history
  • Loading branch information
dgruber committed Oct 11, 2023
1 parent 285f6e0 commit addfdb6
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 62 deletions.
129 changes: 69 additions & 60 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"reflect"
"runtime"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -98,7 +99,8 @@ func (j *Job) State() drmaa2interface.JobState {
j.begin(j.ctx, "State()")
task := j.lastJob()
// drmaa1 dictates caching
if task != nil && task.waitForEndStateCollectedJobInfo && task.jobinfoError == nil {
if task != nil && task.waitForEndStateCollectedJobInfo &&
task.jobinfoError == nil {
return task.jobinfo.State
}
job, jobArray, err := j.jobCheck()
Expand Down Expand Up @@ -235,7 +237,8 @@ func (j *Job) RunT(t drmaa2interface.JobTemplate) *Job {
// of commands at any given time. If set to 1 it forces sequential execution.
// If not required it should be set to the total amount of tasks specified.
func (j *Job) RunArray(begin, end, step, maxParallel int, cmd string, args ...string) *Job {
j.begin(j.ctx, fmt.Sprintf("RunArray(%d, %d, %d, %d, %s, %v)", begin, end, step, maxParallel, cmd, args))
j.begin(j.ctx, fmt.Sprintf("RunArray(%d, %d, %d, %d, %s, %v)",
begin, end, step, maxParallel, cmd, args))
if err := j.checkCtx(); err != nil {
j.lastError = err
return j
Expand All @@ -245,29 +248,33 @@ func (j *Job) RunArray(begin, end, step, maxParallel int, cmd string, args ...st
j.lastError = err
jobTemplate, copyErr := copystructure.Copy(jt)
if copyErr != nil {
j.tasklist = append(j.tasklist, &task{jobArray: job, isJobArray: true, submitError: err,
template: jobTemplate.(drmaa2interface.JobTemplate)})
j.tasklist = append(j.tasklist, &task{jobArray: job, isJobArray: true,
submitError: err,
template: jobTemplate.(drmaa2interface.JobTemplate)})
j.errorf(j.ctx, "could not copy job template: %v", copyErr)
return j
}
j.tasklist = append(j.tasklist, &task{jobArray: job, isJobArray: true, submitError: err,
template: jobTemplate.(drmaa2interface.JobTemplate)})
j.tasklist = append(j.tasklist, &task{jobArray: job, isJobArray: true,
submitError: err,
template: jobTemplate.(drmaa2interface.JobTemplate)})
return j
}

// RunArrayT executes the job defined in a JobTemplate multiple times. See also
// RunArray().
//
// The submission error (if any) is stored as the last error of the job and in
// the task which is appended to the task list. A copy of the job template is
// stored in the task; if the template cannot be copied the problem is logged
// and the template passed in by the caller is stored instead.
func (j *Job) RunArrayT(begin, end, step, maxParallel int, jt drmaa2interface.JobTemplate) *Job {
	j.begin(j.ctx, fmt.Sprintf("RunArrayT(%d, %d, %d, %d, %v)",
		begin, end, step, maxParallel, jt))
	if err := j.checkCtx(); err != nil {
		j.lastError = err
		return j
	}
	job, err := j.wfl.js.RunBulkJobs(jt, begin, end, step, maxParallel)
	j.lastError = err

	// An unchecked type assertion on a failed copy (nil result) would panic,
	// hence fall back to the caller's template when copying is not possible.
	template := jt
	copied, copyErr := copystructure.Copy(jt)
	if copyErr != nil {
		j.errorf(j.ctx, "could not copy job template: %v", copyErr)
	} else if jobTemplate, ok := copied.(drmaa2interface.JobTemplate); ok {
		template = jobTemplate
	}

	j.tasklist = append(j.tasklist, &task{
		jobArray:    job,
		isJobArray:  true,
		submitError: err,
		template:    template,
	})
	return j
}

Expand Down Expand Up @@ -565,80 +572,78 @@ func (j *Job) After(d time.Duration) *Job {
return j
}

func wait(task *task) {
func wait(task *task, timeout time.Duration) error {
if task.terminated {
return
return nil
}
if task.job == nil {
if task.jobArray == nil {
return
return nil
}
task.terminationError = waitArrayJobTerminated(task.jobArray)
task.terminationError = waitArrayJobTerminated(task.jobArray, timeout)
task.terminated = true
// TODO cache job info
return
if task.terminationError != nil &&
strings.Contains(task.terminationError.Error(), "timeout") {
return errors.New("timeout")
}
return nil
}
task.terminationError = task.job.WaitTerminated(timeout)
state := task.job.GetState()
if state == drmaa2interface.Done ||
state == drmaa2interface.Failed {
task.terminated = true
// cache the jobinfo
task.jobinfo, task.jobinfoError = task.job.GetJobInfo()
task.waitForEndStateCollectedJobInfo = true
return nil
}
task.terminationError = task.job.WaitTerminated(drmaa2interface.InfiniteTime)
task.terminated = true
// cache the jobinfo
task.jobinfo, task.jobinfoError = task.job.GetJobInfo()
task.waitForEndStateCollectedJobInfo = true
return errors.New("timeout")
}

// Wait until the most recent task is finished. In case of a job array it waits
// for all tasks of the array.
func (j *Job) Wait() *Job {
j.infof(j.ctx, "Wait()")
// WaitWithTimeout waits until the most recent task is finished. In case of a
// job array it waits for all tasks of the array. It returns either when the
// task is finished or the timeout is reached. In case of an timeout an error is
// set which can be retrieved with LastError().
func (j *Job) WaitWithTimeout(timeout time.Duration) *Job {
j.infof(j.ctx, "WaitWithTimeout()")
j.lastError = nil
if task := j.lastJob(); task != nil {
if task.job != nil {
j.infof(j.ctx, fmt.Sprintf("Wait() for job %s", task.job.GetID()))
j.infof(j.ctx, fmt.Sprintf("WaitWithTimeout() for job %s",
task.job.GetID()))
} else if task.jobArray != nil {
j.infof(j.ctx, fmt.Sprintf("Wait() for job array %s",
j.infof(j.ctx, fmt.Sprintf("WaitWithTimeout() for job array %s",
task.jobArray.GetID()))
}
// check if we waited already (drmaa1 allows only one API call for job info)
if task.waitForEndStateCollectedJobInfo {
return j
}
wait(task)
err := wait(task, timeout)
if err != nil {
j.errorf(
j.ctx,
"WaitWithTimeout() has timed out",
)
j.lastError = err
}
} else {
j.errorf(
j.ctx,
"Wait() has no task to wait for",
"WaitForTimeout() has no task to wait for",
)
j.lastError = errors.New("task not available")
}
return j
}

// WaitWithTimeout waits until the most recent task is finished. In case of a
// job array it waits for all tasks of the array. It returns either when the
// task is finished or the timeout is reached. In case of an timeout an error is
// set which can be retrieved with LastError().
func (j *Job) WaitWithTimeout(timeout time.Duration) *Job {
j.infof(j.ctx, "WaitWithTimeout()")
j.lastError = nil
finished := make(chan bool)

go func() {
j.Wait()
finished <- true
}()

select {
case <-finished:
// all good
case <-time.After(timeout):
j.errorf(
j.ctx,
"WaitWithTimeout() timeout after %s",
timeout.String(),
)
j.lastError = fmt.Errorf("WaitWithTimeout() timeout after %s",
timeout.String())
}
return j
// Wait until the most recent task is finished. In case of a job array it waits
// for all tasks of the array.
func (j *Job) Wait() *Job {
j.infof(j.ctx, "Wait()")
return j.WaitWithTimeout(drmaa2interface.InfiniteTime)
}

// Retry waits until the last task in chain (not for the previous ones) is finished.
Expand Down Expand Up @@ -666,7 +671,7 @@ func (j *Job) Synchronize() *Job {
continue
}
j.infof(j.ctx, fmt.Sprintf("Synchronize() wait for job %s", task.job.GetID()))
wait(task)
wait(task, drmaa2interface.InfiniteTime)
}
return j
}
Expand All @@ -680,7 +685,7 @@ func (j *Job) ListAllFailed() []drmaa2interface.Job {
if task.job == nil {
continue
}
wait(task)
wait(task, drmaa2interface.InfiniteTime)
if task.job.GetState() == drmaa2interface.Failed {
failed = append(failed, task.job)
}
Expand Down Expand Up @@ -792,17 +797,20 @@ func (j *Job) RetryAnyFailed(amount int) *Job {
j.begin(j.ctx, fmt.Sprintf("RetryAnyFailed(%d)", amount))
for i := 0; i < amount || amount == -1; i++ {
for _, task := range j.tasklist {
wait(task)
wait(task, drmaa2interface.InfiniteTime)
if task.job != nil && task.job.GetState() == drmaa2interface.Failed {
failedJobID := task.job.GetID()
replaceTask(j, task)
j.warningf(j.ctx, "RetryAnyFailed(%d)): Task %s failed. Retry task (%s).",
j.warningf(j.ctx,
"RetryAnyFailed(%d)): Task %s failed. Retry task (%s).",
amount, failedJobID, task.job.GetID())
}
if task.jobArray != nil {
for _, job := range task.jobArray.GetJobs() {
if job.GetState() == drmaa2interface.Failed {
j.warningf(j.ctx, "cannot retry failed job array task %s\n", job.GetID())
j.warningf(j.ctx,
"cannot retry failed job array task %s\n",
job.GetID())
}
}
}
Expand Down Expand Up @@ -1081,7 +1089,8 @@ func (j *Job) OutputError() string {
if j.wfl.ctx.SMType != DefaultSessionManager &&
j.wfl.ctx.SMType != DockerSessionManager &&
j.wfl.ctx.SMType != KubernetesSessionManager {
j.errorf(j.ctx, "OutputError(): not supported for backend %s", j.wfl.ctx.SMType)
j.errorf(j.ctx, "OutputError(): not supported for backend %s",
j.wfl.ctx.SMType)
j.lastError = errors.New("ouput not supported for this backend")
return ""
}
Expand Down
14 changes: 12 additions & 2 deletions job_help.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"text/template"
"time"

"github.com/dgruber/drmaa2interface"
"github.com/dgruber/wfl/pkg/matrix"
Expand Down Expand Up @@ -151,13 +152,22 @@ func jobArrayState(jobArray drmaa2interface.ArrayJob, wait bool) drmaa2interface
return jobArrayState
}

func waitArrayJobTerminated(jobArray drmaa2interface.ArrayJob) error {
// waitArrayJobTerminated waits for all jobs in the array to be terminated
// drmaa2interface.InfiniteTime can be used for waiting forever
func waitArrayJobTerminated(jobArray drmaa2interface.ArrayJob, timeout time.Duration) error {
var lastErr error
start := time.Now()
for _, job := range jobArray.GetJobs() {
err := job.WaitTerminated(drmaa2interface.InfiniteTime)
err := job.WaitTerminated(timeout)
if err != nil {
lastErr = err
}
if timeout != drmaa2interface.InfiniteTime {
timeout = timeout - time.Since(start)
if timeout < 0 {
return fmt.Errorf("timeout reached")
}
}
}
return lastErr
}
Expand Down
9 changes: 9 additions & 0 deletions job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,15 @@ var _ = Describe("Job", func() {
Ω(job.Success()).Should(BeTrue())
})

// Checks that WaitWithTimeout() gives up on a job array which needs longer
// than the timeout. The array is submitted with (1, 10, 1, 1, "sleep", "0.1")
// (presumably begin, end, step, maxParallel, command and argument, as for
// RunArray() - TODO confirm) and is waited for with a 200ms timeout.
It("should run a bunch of jobs and timeout while waiting", func() {
start := time.Now()
job := flow.RunArrayJob(1, 10, 1, 1, "sleep", "0.1").WaitWithTimeout(time.Millisecond * 200)
stop := time.Now()
// the timeout must be reported through LastError()
Ω(job.LastError()).ShouldNot(BeNil())
Ω(job.LastError().Error()).Should(ContainSubstring("timeout"))
// submitting and waiting together must return within 300ms of the start
// (the 200ms timeout plus 100ms of slack)
Ω(stop).Should(BeTemporally("<=", start.Add(time.Millisecond*300)))
})

It("should run a bunch of failing jobs", func() {
job := flow.RunArrayJob(1, 10, 1, 5, "/bin/bash", "-c", "exit 77").Wait()
Ω(job.State().String()).Should(Equal(drmaa2interface.Failed.String()))
Expand Down

0 comments on commit addfdb6

Please sign in to comment.