Skip to content

Commit

Permalink
Revert "chore: Make async tracker importer use scheduler (#15597)" (#…
Browse files Browse the repository at this point in the history
…15682)

This reverts commit 5cd719c.
  • Loading branch information
enricocolasante authored Nov 14, 2023
1 parent 5cd719c commit bc2befc
Show file tree
Hide file tree
Showing 119 changed files with 1,843 additions and 1,343 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ public boolean isUsingNotifications() {
|| this == DATAVALUE_IMPORT
|| this == COMPLETE_DATA_SET_REGISTRATION_IMPORT
|| this == METADATA_IMPORT
|| this == TRACKER_IMPORT_JOB
|| this == GEOJSON_IMPORT;
}

Expand All @@ -232,7 +231,7 @@ public boolean isUsingErrorNotification() {
* the ready jobs per type is attempted to start in a single loop cycle
*/
public boolean isUsingContinuousExecution() {
return this == METADATA_IMPORT || this == TRACKER_IMPORT_JOB;
return this == METADATA_IMPORT;
}

public boolean hasJobParameters() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

import static java.lang.Math.max;
import static java.util.stream.Collectors.toSet;
import static org.springframework.transaction.annotation.Propagation.REQUIRES_NEW;

import java.util.List;
import java.util.Set;
Expand All @@ -46,7 +45,6 @@
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;

/**
* @author Jan Bernitt
Expand Down Expand Up @@ -244,7 +242,6 @@ public Stream<JobConfiguration> getDueJobConfigurations(boolean includeWaiting)
}

@Override
@Transactional(propagation = REQUIRES_NEW)
public boolean tryExecuteNow(@Nonnull String jobId) {
// language=SQL
String sql =
Expand Down
8 changes: 8 additions & 0 deletions dhis-2/dhis-services/dhis-service-tracker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@
<groupId>org.hisp.dhis.rules</groupId>
<artifactId>rule-engine</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
Expand All @@ -85,6 +89,10 @@
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-core</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,35 @@
*/
package org.hisp.dhis.tracker.imports;

import static org.hisp.dhis.tracker.imports.report.TimingsStats.COMMIT_OPS;
import static org.hisp.dhis.tracker.imports.report.TimingsStats.PREHEAT_OPS;
import static org.hisp.dhis.tracker.imports.report.TimingsStats.PREPROCESS_OPS;
import static org.hisp.dhis.tracker.imports.report.TimingsStats.PROGRAMRULE_OPS;
import static org.hisp.dhis.tracker.imports.report.TimingsStats.TOTAL_OPS;
import static org.hisp.dhis.tracker.imports.report.TimingsStats.VALIDATE_PROGRAMRULE_OPS;
import static org.hisp.dhis.tracker.imports.report.TimingsStats.VALIDATION_OPS;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.hisp.dhis.scheduling.JobProgress;
import org.hisp.dhis.system.notification.NotificationLevel;
import org.hisp.dhis.system.notification.Notifier;
import org.hisp.dhis.tracker.TrackerType;
import org.hisp.dhis.tracker.imports.bundle.TrackerBundle;
import org.hisp.dhis.tracker.imports.bundle.TrackerBundleService;
import org.hisp.dhis.tracker.imports.domain.TrackerObjects;
import org.hisp.dhis.tracker.imports.job.TrackerSideEffectDataBundle;
import org.hisp.dhis.tracker.imports.preprocess.TrackerPreprocessService;
import org.hisp.dhis.tracker.imports.report.ImportReport;
import org.hisp.dhis.tracker.imports.report.PersistenceReport;
import org.hisp.dhis.tracker.imports.report.Status;
import org.hisp.dhis.tracker.imports.report.TimingsStats;
import org.hisp.dhis.tracker.imports.report.TrackerTypeReport;
import org.hisp.dhis.tracker.imports.report.ValidationReport;
import org.hisp.dhis.tracker.imports.validation.ValidationResult;
Expand All @@ -68,78 +78,144 @@ public class DefaultTrackerImportService implements TrackerImportService {

@Nonnull private final TrackerUserService trackerUserService;

/* Import is not meant to be annotated with @Transactional.
* PreHeat and Commit phases are separated transactions, other
* phases do not need to be in a transaction. */
@Nonnull private final Notifier notifier;

@Override
public ImportReport importTracker(
TrackerImportParams params, TrackerObjects trackerObjects, JobProgress jobProgress) {
public ImportReport importTracker(TrackerImportParams params) {
User user = trackerUserService.getUser(params.getUserId());
params.setUser(user);

jobProgress.startingStage("Running PreHeat");
TrackerBundle trackerBundle =
jobProgress.runStage(() -> trackerBundleService.create(params, trackerObjects, user));
TimingsStats opsTimer = new TimingsStats();

jobProgress.startingStage("Calculating Payload Size");
Map<TrackerType, Integer> bundleSize =
jobProgress.runStage(() -> calculatePayloadSize(trackerBundle));
startImport(params);

jobProgress.startingStage("Running PreProcess");
jobProgress.runStage(() -> trackerPreprocessService.preprocess(trackerBundle));
try {
TrackerBundle trackerBundle = preHeat(params, opsTimer);

jobProgress.startingStage("Running Validation");
ValidationResult validationResult = jobProgress.runStage(() -> validateBundle(trackerBundle));
Map<TrackerType, Integer> bundleSize = calculatePayloadSize(trackerBundle);

ValidationReport validationReport = ValidationReport.fromResult(validationResult);
preProcess(opsTimer, trackerBundle);

if (!trackerBundle.isSkipRuleEngine() && !params.getImportStrategy().isDelete()) {
jobProgress.startingStage("Running Rule Engine");
jobProgress.runStage(() -> trackerBundleService.runRuleEngine(trackerBundle));

jobProgress.startingStage("Running Rule Engine Validation");
ValidationResult result =
jobProgress.runStage(() -> validationService.validateRuleEngine(trackerBundle));
trackerBundle.setTrackedEntities(result.getTrackedEntities());
trackerBundle.setEnrollments(result.getEnrollments());
trackerBundle.setEvents(result.getEvents());
trackerBundle.setRelationships(result.getRelationships());

validationReport = ValidationReport.merge(validationResult, result);
}
ValidationReport validationReport = validate(params, opsTimer, trackerBundle);

if (exitOnError(validationReport, params)) {
return buildReportAndNotify(params, validationReport, opsTimer, bundleSize);
}

PersistenceReport persistenceReport = commit(params, opsTimer, trackerBundle);

postCommit(trackerBundle);

ImportReport importReport =
ImportReport.withImportCompleted(
Status.OK, persistenceReport, validationReport, opsTimer.stopTimer(), bundleSize);

if (exitOnError(validationReport, params)) {
return ImportReport.withValidationErrors(
validationReport, bundleSize.values().stream().mapToInt(Integer::intValue).sum());
endImport(params, importReport);

return importReport;
} catch (Exception e) {
log.error("Exception thrown during import.", e);

ImportReport report =
ImportReport.withError(
"Exception:" + e.getMessage(), ValidationReport.emptyReport(), opsTimer.stopTimer());

endImportWithError(params, report, e);

return report;
}
}

private TrackerBundle preHeat(TrackerImportParams params, TimingsStats opsTimer) {
TrackerBundle trackerBundle = opsTimer.exec(PREHEAT_OPS, () -> preheatBundle(params));

notifyOps(params, PREHEAT_OPS, opsTimer);

jobProgress.startingStage("Commit Transaction");
PersistenceReport persistenceReport = jobProgress.runStage(() -> commit(params, trackerBundle));
return trackerBundle;
}

private void preProcess(TimingsStats opsTimer, TrackerBundle trackerBundle) {
opsTimer.execVoid(PREPROCESS_OPS, () -> preProcessBundle(trackerBundle));
}

private ValidationReport validate(
TrackerImportParams params, TimingsStats opsTimer, TrackerBundle trackerBundle) {
ValidationResult validationResult =
opsTimer.exec(VALIDATION_OPS, () -> validateBundle(params, trackerBundle, opsTimer));

jobProgress.startingStage("PostCommit");
jobProgress.runStage(() -> trackerBundleService.postCommit(trackerBundle));
if (!trackerBundle.isSkipRuleEngine() && !params.getImportStrategy().isDelete()) {
ValidationResult ruleEnginevalidationResult = execRuleEngine(params, opsTimer, trackerBundle);

return ValidationReport.merge(validationResult, ruleEnginevalidationResult);
}

return ImportReport.withImportCompleted(
Status.OK, persistenceReport, validationReport, bundleSize);
return ValidationReport.fromResult(validationResult);
}

private PersistenceReport commit(TrackerImportParams params, TrackerBundle trackerBundle) {
private PersistenceReport commit(
TrackerImportParams params, TimingsStats opsTimer, TrackerBundle trackerBundle) {
PersistenceReport persistenceReport;
if (TrackerImportStrategy.DELETE == params.getImportStrategy()) {
return deleteBundle(trackerBundle);
persistenceReport = opsTimer.exec(COMMIT_OPS, () -> deleteBundle(trackerBundle));
} else {
return commitBundle(trackerBundle);
persistenceReport = opsTimer.exec(COMMIT_OPS, () -> commitBundle(trackerBundle));
}

notifyOps(params, COMMIT_OPS, opsTimer);
return persistenceReport;
}

private void postCommit(TrackerBundle trackerBundle) {
trackerBundleService.postCommit(trackerBundle);
}

protected ValidationResult validateBundle(TrackerBundle bundle) {
protected ValidationResult validateBundle(
TrackerImportParams params, TrackerBundle bundle, TimingsStats opsTimer) {
ValidationResult result = validationService.validate(bundle);
bundle.setTrackedEntities(result.getTrackedEntities());
bundle.setEnrollments(result.getEnrollments());
bundle.setEvents(result.getEvents());
bundle.setRelationships(result.getRelationships());

notifyOps(params, VALIDATION_OPS, opsTimer);

return result;
}

private ValidationResult execRuleEngine(
TrackerImportParams params, TimingsStats opsTimer, TrackerBundle bundle) {
opsTimer.execVoid(PROGRAMRULE_OPS, () -> trackerBundleService.runRuleEngine(bundle));

notifyOps(params, PROGRAMRULE_OPS, opsTimer);

ValidationResult result =
opsTimer.exec(VALIDATE_PROGRAMRULE_OPS, () -> validationService.validateRuleEngine(bundle));
bundle.setTrackedEntities(result.getTrackedEntities());
bundle.setEnrollments(result.getEnrollments());
bundle.setEvents(result.getEvents());
bundle.setRelationships(result.getRelationships());

notifyOps(params, VALIDATE_PROGRAMRULE_OPS, opsTimer);

return result;
}

private ImportReport buildReportAndNotify(
TrackerImportParams params,
ValidationReport validationReport,
TimingsStats opsTimer,
Map<TrackerType, Integer> bundleSize) {
ImportReport importReport =
ImportReport.withValidationErrors(
validationReport,
opsTimer.stopTimer(),
bundleSize.values().stream().mapToInt(Integer::intValue).sum());

endImport(params, importReport);

return importReport;
}

private boolean exitOnError(ValidationReport validationReport, TrackerImportParams params) {
return validationReport.hasErrors() && params.getAtomicMode() == AtomicMode.ALL;
}
Expand All @@ -152,6 +228,14 @@ private Map<TrackerType, Integer> calculatePayloadSize(TrackerBundle bundle) {
TrackerType.RELATIONSHIP, bundle.getRelationships().size());
}

protected TrackerBundle preheatBundle(TrackerImportParams params) {
return trackerBundleService.create(params);
}

protected void preProcessBundle(TrackerBundle bundle) {
trackerPreprocessService.preprocess(bundle);
}

protected PersistenceReport commitBundle(TrackerBundle trackerBundle) {
PersistenceReport persistenceReport = trackerBundleService.commit(trackerBundle);

Expand All @@ -160,7 +244,7 @@ protected PersistenceReport commitBundle(TrackerBundle trackerBundle) {
Stream.of(TrackerType.ENROLLMENT, TrackerType.EVENT)
.map(trackerType -> safelyGetSideEffectsDataBundles(persistenceReport, trackerType))
.flatMap(Collection::stream)
.toList();
.collect(Collectors.toList());

trackerBundleService.handleTrackerSideEffects(sideEffectDataBundles);
}
Expand All @@ -181,6 +265,48 @@ protected PersistenceReport deleteBundle(TrackerBundle trackerBundle) {
return trackerBundleService.delete(trackerBundle);
}

private void startImport(TrackerImportParams params) {
if (null != params.getJobConfiguration()) {
notifier.notify(params.getJobConfiguration(), params.userStartInfo() + " Import:Start");
}
}

private void notifyOps(TrackerImportParams params, String validationOps, TimingsStats opsTimer) {
if (null != params.getJobConfiguration()) {
notifier.update(
params.getJobConfiguration(),
NotificationLevel.DEBUG,
params
+ validationOps
+ " completed in "
+ opsTimer.get(validationOps)
+ " Import:"
+ validationOps);
}
}

private void endImport(TrackerImportParams params, ImportReport importReport) {
if (null != params.getJobConfiguration()) {
notifier.update(
params.getJobConfiguration(),
params + " finished in " + importReport.getTimingsStats().get(TOTAL_OPS) + " Import:Done",
true);

notifier.addJobSummary(params.getJobConfiguration(), importReport, ImportReport.class);
}
}

private void endImportWithError(
TrackerImportParams params, ImportReport importReport, Exception e) {
notifier.update(
params.getJobConfiguration(),
NotificationLevel.ERROR,
params + " failed with exception: " + e.getMessage() + " Import:Error",
true);

notifier.addJobSummary(params.getJobConfiguration(), importReport, ImportReport.class);
}

/**
* Clone the TrackerImportReport and filters out validation data based on the provided {@link
* PersistenceReport}.
Expand All @@ -202,10 +328,11 @@ public ImportReport buildImportReport(
if (originalValidationReport != null) {
validationReport.addErrors(originalValidationReport.getErrors());
}
if (originalValidationReport != null
&& (TrackerBundleReportMode.WARNINGS == reportMode
|| TrackerBundleReportMode.FULL == reportMode)) {
if (originalValidationReport != null && TrackerBundleReportMode.WARNINGS == reportMode) {
validationReport.addWarnings(originalValidationReport.getWarnings());
} else if (originalValidationReport != null && TrackerBundleReportMode.FULL == reportMode) {
validationReport.addWarnings(originalValidationReport.getWarnings());
importReportBuilder.timingsStats(originalImportReport.getTimingsStats());
}

importReportBuilder.validationReport(validationReport);
Expand Down
Loading

0 comments on commit bc2befc

Please sign in to comment.