Skip to content

Commit

Permalink
feat: job scheduler - run jobs continuously [DHIS2-16004] (#15466)
Browse files Browse the repository at this point in the history
* feat: job scheduler - run jobs continuously [DHIS2-16004]

* fix: don't restrict list returned by service to 1 entry per type [DHIS2-16004]
  • Loading branch information
jbee authored Oct 25, 2023
1 parent 027e318 commit b565b84
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,11 @@ String create(JobConfiguration config, MimeType contentType, InputStream content
* Get all job configurations that should start within the next n seconds.
*
* @param dueInNextSeconds number of seconds from now the job should start
* @param limitToNext1 true, to only return a single config per {@link JobType}, false to return
* all due jobs
* @param includeWaiting true to also list jobs that cannot run because another job of the same
* type is already running
* @return only jobs that should start soon within the given number of seconds
*/
List<JobConfiguration> getDueJobConfigurations(
int dueInNextSeconds, boolean limitToNext1, boolean includeWaiting);
List<JobConfiguration> getDueJobConfigurations(int dueInNextSeconds, boolean includeWaiting);

/**
* Finds stale jobs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ static Defaults dailyRandomBetween3and5(String uid, String name) {
this.defaults = defaults;
}

/**
* @return true, if {@link JobProgress} events should be forwarded to the {@link
* org.hisp.dhis.system.notification.Notifier} API, otherwise false
*/
public boolean isUsingNotifications() {
return this == RESOURCE_TABLE
|| this == SEND_SCHEDULED_MESSAGE
Expand All @@ -204,6 +208,10 @@ public boolean isUsingNotifications() {
|| this == GEOJSON_IMPORT;
}

/**
* @return true, when an error notification should be sent by email in case the job execution
* fails, otherwise false
*/
public boolean isUsingErrorNotification() {
return this == ANALYTICS_TABLE
|| this == VALIDATION_RESULTS_NOTIFICATION
Expand All @@ -216,6 +224,15 @@ public boolean isUsingErrorNotification() {
|| this == METADATA_IMPORT;
}

/**
 * Tells whether jobs of this type are executed continuously.
 *
 * @return true, if the job scheduler workers should execute all known ready jobs of this type
 *     as soon as possible; false, if only the oldest ready job of the type is attempted to
 *     start in a single loop cycle
 */
public boolean isUsingContinuousExecution() {
  // currently metadata import is the only type that opts in
  return METADATA_IMPORT == this;
}

/**
 * @return true, if {@code jobParameters} is non-null for this type, otherwise false
 */
public boolean hasJobParameters() {
  return !(jobParameters == null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -234,7 +233,7 @@ public List<JobConfiguration> getJobConfigurations(JobType type) {
@Override
@Transactional(readOnly = true)
public List<JobConfiguration> getDueJobConfigurations(
int dueInNextSeconds, boolean limitToNext1, boolean includeWaiting) {
int dueInNextSeconds, boolean includeWaiting) {
Instant now = Instant.now();
Instant endOfWindow = now.plusSeconds(dueInNextSeconds);
Duration maxCronDelay =
Expand All @@ -243,16 +242,7 @@ public List<JobConfiguration> getDueJobConfigurations(
jobConfigurationStore
.getDueJobConfigurations(includeWaiting)
.filter(c -> c.isDueBetween(now, endOfWindow, maxCronDelay));
if (!limitToNext1) return dueJobs.toList();
Set<JobType> types = EnumSet.noneOf(JobType.class);
return dueJobs
.filter(
config -> {
if (types.contains(config.getJobType())) return false;
types.add(config.getJobType());
return true;
})
.toList();
return dueJobs.toList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void assureAsLeader(int ttlSeconds) {
/**
 * Lists the job configurations that are due within the given number of seconds by delegating to
 * the job configuration service.
 *
 * @param dueInNextSeconds number of seconds from now within which a job has to be due
 * @return all due job configurations; the list is not limited to one entry per job type
 */
@Override
@Transactional(readOnly = true)
public List<JobConfiguration> getDueJobConfigurations(int dueInNextSeconds) {
  // includeWaiting = false: waiting jobs are not requested from the service
  return jobConfigurationService.getDueJobConfigurations(dueInNextSeconds, false);
}

@Override
Expand All @@ -121,6 +121,13 @@ public JobConfiguration getNextInQueue(String queue, int fromPosition) {
return jobConfigurationStore.getNextInQueue(queue, fromPosition);
}

/**
 * Looks up a single job configuration by its UID using the job configuration store.
 *
 * @param jobId UID of the job configuration
 * @return the result of the store lookup, which may be null
 */
@CheckForNull
@Override
@Transactional(readOnly = true)
public JobConfiguration getJobConfiguration(String jobId) {
  JobConfiguration config = jobConfigurationStore.getByUid(jobId);
  return config;
}

@Override
@Transactional
public boolean tryRun(@Nonnull String jobId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,8 @@
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -92,6 +88,7 @@ public class JobScheduler implements Runnable, JobRunner {
private final JobSchedulerLoopService service;
private final SystemSettingManager systemSettings;
private final ExecutorService workers = Executors.newCachedThreadPool();
private final Map<JobType, Queue<String>> continuousJobsByType = new ConcurrentHashMap<>();

public void start() {
long loopTimeMs = LOOP_SECONDS * 1000L;
Expand Down Expand Up @@ -121,21 +118,67 @@ public void run() {
service.getDueJobConfigurations(LOOP_SECONDS).stream()
.collect(groupingBy(JobConfiguration::getJobType));
// only attempt to start one job per type per loop invocation, except for types using
// continuous execution, for which all ready jobs are handed over
readyByType.forEach((type, jobs) -> runIfDue(now, jobs.get(0)));
readyByType.forEach((type, jobs) -> runIfDue(now, type, jobs));
}
} catch (Exception ex) {
log.error("Exceptions thrown in scheduler loop", ex);
// this needs to be caught otherwise the scheduling would end
}
}

/**
 * Starts the ready jobs of a single type.
 *
 * <p>For types without continuous execution only the first job in the list is attempted. For
 * types with continuous execution the IDs of all given jobs are appended to the queue of the
 * type and a worker is submitted to drain that queue.
 *
 * @param now the current time of the scheduler loop cycle
 * @param type the type shared by all given jobs
 * @param jobs the ready jobs of the given type
 */
private void runIfDue(Instant now, JobType type, List<JobConfiguration> jobs) {
  if (type.isUsingContinuousExecution()) {
    Queue<String> pending =
        continuousJobsByType.computeIfAbsent(type, key -> new ConcurrentLinkedQueue<>());
    // an empty (or newly created) queue is taken as the sign that no worker is draining it;
    // this has to be checked before the new IDs are appended
    boolean needsWorker = pending.isEmpty();
    for (JobConfiguration job : jobs) {
      String uid = job.getUid();
      // skip IDs that are already waiting in the queue
      if (!pending.contains(uid)) {
        pending.add(uid);
      }
    }
    if (needsWorker) {
      // more than one worker per type should be avoided, but if it does happen
      // it is harmless as all workers of a type pull from the same queue
      workers.submit(() -> runContinuous(type));
    }
  } else {
    runIfDue(now, jobs.get(0));
  }
}

/**
 * Worker loop for a job type that uses continuous execution: polls job IDs from the queue of the
 * type and runs each job that still exists and has status {@code SCHEDULED}, one after the other,
 * until the queue is empty.
 *
 * @param type the job type whose queue should be drained
 */
private void runContinuous(JobType type) {
  Queue<String> jobIds = continuousJobsByType.get(type);
  if (jobIds == null) {
    // another worker of this type finished and removed the queue after this worker was
    // submitted but before it started; there is nothing to drain, and jobs that are still due
    // are queued again by a later scheduler loop cycle
    return;
  }
  try {
    String jobId = jobIds.poll();
    while (jobId != null) {
      // reload the configuration as the job might have been changed, started or deleted
      // since its ID was queued
      JobConfiguration config = service.getJobConfiguration(jobId);
      if (config != null && config.getJobStatus() == JobStatus.SCHEDULED) {
        Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
        // NOTE(review): dueTime(...) yields null when the job is not due yet, so null may be
        // passed on here - confirm runDueJob tolerates a null due time
        Instant dueTime = dueTime(now, config);
        runDueJob(config, dueTime);
      }
      jobId = jobIds.poll();
    }
  } finally {
    // needs to be done so that we never have a queue without a worker by accident;
    // only the queue this worker was draining is removed so that a newer queue
    // that belongs to another worker is not discarded
    continuousJobsByType.remove(type, jobIds);
  }
}

/**
 * Submits the given job to the worker pool when it is due at the given time.
 *
 * @param now the current time of the scheduler loop cycle
 * @param config the job to start if it is due
 */
private void runIfDue(Instant now, JobConfiguration config) {
  Instant dueTime = dueTime(now, config);
  if (dueTime == null) {
    // not due (yet), nothing to start
    return;
  }
  workers.submit(() -> runDueJob(config, dueTime));
}

/**
 * Computes the time the given job is due, without starting it.
 *
 * @param now the time to compare the next execution time against
 * @param config the job to check
 * @return the next execution time of the job if there is one and it is not after {@code now},
 *     otherwise null
 */
private Instant dueTime(Instant now, JobConfiguration config) {
  Duration maxCronDelay =
      Duration.ofHours(systemSettings.getIntSetting(SettingKey.JOBS_MAX_CRON_DELAY_HOURS));
  Instant dueTime = config.nextExecutionTime(now, maxCronDelay);
  // this method only computes the due time; submitting the job is left to the callers,
  // otherwise a due job would be started more than once
  return dueTime != null && !dueTime.isAfter(now) ? dueTime : null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ public interface JobSchedulerLoopService {
*/
List<JobConfiguration> getDueJobConfigurations(int dueInNextSeconds);

/**
 * Fetches a single job configuration by its ID.
 *
 * @param jobId the ID of the job configuration to fetch
 * @return the job configuration, or null (presumably when no job with the given ID exists -
 *     TODO confirm against the implementation)
 */
@CheckForNull
JobConfiguration getJobConfiguration(String jobId);

/**
 * Fetches a job configuration from a named queue.
 *
 * <p>NOTE(review): presumably returns the job that follows the given position within the queue
 * - confirm against the implementation.
 *
 * @param queue name of the queue
 * @param fromPosition position within the queue the lookup is based on
 * @return the job configuration found, or null
 */
@CheckForNull
JobConfiguration getNextInQueue(String queue, int fromPosition);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class JobConfigurationController extends AbstractCrudController<JobConfig
public List<JobConfiguration> getDueJobConfigurations(
    @RequestParam int seconds,
    @RequestParam(required = false, defaultValue = "false") boolean includeWaiting) {
  // returns all jobs due within the given number of seconds; the service no longer takes
  // a flag to limit the result to one job per type
  return jobConfigurationService.getDueJobConfigurations(seconds, includeWaiting);
}

@GetMapping("/stale")
Expand Down

0 comments on commit b565b84

Please sign in to comment.