Skip to content

Commit

Permalink
correctly display processing information if bulk v2 has multiple jobs
Browse files Browse the repository at this point in the history
correctly display processing information if bulk v2 has multiple jobs
  • Loading branch information
ashitsalesforce committed Dec 27, 2024
1 parent 20e90a0 commit fd73263
Show file tree
Hide file tree
Showing 16 changed files with 96 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

import org.apache.logging.log4j.Logger;
import com.salesforce.dataloader.util.DLLogManager;

import com.salesforce.dataloader.util.LoadRateCalculator;
import com.salesforce.dataloader.action.progress.ILoaderProgress;
import com.salesforce.dataloader.action.visitor.IVisitor;
import com.salesforce.dataloader.config.AppConfig;
Expand Down Expand Up @@ -89,7 +89,9 @@ protected AbstractAction(Controller controller, ILoaderProgress monitor)
this.successWriter = null;
this.errorWriter = null;
}
this.visitor = createVisitor(true);
// Let the IVisitor instance create LoadRateCalculator instance for the first job by passing
// null as the first argument.
this.visitor = createVisitor(null, true);
int retries = -1;
this.enableRetries = controller.getAppConfig().getBoolean(AppConfig.PROP_ENABLE_RETRIES);
if (this.enableRetries) {
Expand All @@ -107,7 +109,7 @@ protected AbstractAction(Controller controller, ILoaderProgress monitor)
protected abstract void checkDao(DataAccessObject dao) throws DataAccessObjectInitializationException;

/** @return a new IVisitor object to be used by this action */
protected abstract IVisitor createVisitor(boolean isFirstJob);
protected abstract IVisitor createVisitor(LoadRateCalculator rateCalculator, boolean isFirstJob);

/** flushes any remaining records to or from the dao
* @throws BatchSizeLimitException */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.salesforce.dataloader.mapping.LoadMapper;
import com.salesforce.dataloader.model.TableRow;
import com.salesforce.dataloader.util.DAORowUtil;
import com.salesforce.dataloader.util.LoadRateCalculator;
import com.sforce.ws.ConnectionException;

import java.util.List;
Expand All @@ -58,7 +59,7 @@ protected AbstractLoadAction(Controller controller, ILoaderProgress monitor)
}

@Override
protected abstract DAOLoadVisitor createVisitor(boolean isFirstJob);
protected abstract DAOLoadVisitor createVisitor(LoadRateCalculator rateCalculator, boolean isFirstJob);

@Override
protected void checkDao(DataAccessObject dao) throws DataAccessObjectInitializationException {
Expand Down Expand Up @@ -97,8 +98,10 @@ protected boolean visit() throws DataAccessObjectException, ParameterLoadExcepti
// retry the same row again
try {
if (this.getConfig().isBulkV2APIEnabled()) {
// create a new visitor for a new job
setVisitor(this.createVisitor(false));
// BulkV2 completed a job.
// Create a new visitor for a new job. However, it should use
// the LoadRateCalculator instance from the current visitor.
setVisitor(this.createVisitor(this.getVisitor().getLoadRateCalculator(), false));
}
successfulVisit = getVisitor().visit(daoRow);
} catch (BatchSizeLimitException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.salesforce.dataloader.action.visitor.bulk.BulkV2QueryVisitor;
import com.salesforce.dataloader.controller.Controller;
import com.salesforce.dataloader.exception.DataAccessObjectInitializationException;
import com.salesforce.dataloader.util.LoadRateCalculator;

/**
* Bulk api extract action.
Expand All @@ -47,7 +48,7 @@ public BulkExtractAction(Controller controller, ILoaderProgress monitor)
}

@Override
protected IVisitor createVisitor(boolean isFirstJob) {
protected IVisitor createVisitor(LoadRateCalculator rateCalculator, boolean isFirstJob) {
if (getController().getAppConfig().isBulkV2APIEnabled() ) {
return new BulkV2QueryVisitor(this, getController(), getMonitor(), getDao(), getSuccessWriter(), getErrorWriter());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.salesforce.dataloader.controller.Controller;
import com.salesforce.dataloader.exception.DataAccessObjectInitializationException;
import com.salesforce.dataloader.mapping.LoadMapper;
import com.salesforce.dataloader.util.LoadRateCalculator;

/**
* @author Jesper Joergensen, Colin Jarvis
Expand All @@ -49,9 +50,9 @@ public BulkLoadAction(Controller controller, ILoaderProgress monitor)
}

@Override
protected DAOLoadVisitor createVisitor(boolean isFirstJob) {
protected DAOLoadVisitor createVisitor(LoadRateCalculator rateCalculator, boolean isFirstJob) {
if (this.getConfig().isBulkV2APIEnabled()) {
return new BulkV2LoadVisitor(getController(), getMonitor(), getSuccessWriter(), getErrorWriter(), isFirstJob);
return new BulkV2LoadVisitor(getController(), getMonitor(), getSuccessWriter(), getErrorWriter(), rateCalculator, isFirstJob);
}
return new BulkLoadVisitor(getController(), getMonitor(), getSuccessWriter(), getErrorWriter());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.salesforce.dataloader.config.AppConfig;
import com.salesforce.dataloader.controller.Controller;
import com.salesforce.dataloader.exception.DataAccessObjectException;
import com.salesforce.dataloader.util.LoadRateCalculator;

/**
* @author Lexi Viripaeff
Expand All @@ -45,7 +46,7 @@ public DeleteAction(Controller controller, ILoaderProgress monitor) throws DataA
}

@Override
protected DAOLoadVisitor createVisitor(boolean isFirstJob) {
protected DAOLoadVisitor createVisitor(LoadRateCalculator rateCalculator, boolean isFirstJob) {
if (getController().getAppConfig().isRESTAPIEnabled()
&& getController().getAppConfig().getBoolean(AppConfig.PROP_DELETE_WITH_EXTERNALID)) {
return new RESTDeleteVisitor(getController(), getMonitor(), getSuccessWriter(), getErrorWriter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.salesforce.dataloader.action.visitor.partner.PartnerInsertVisitor;
import com.salesforce.dataloader.controller.Controller;
import com.salesforce.dataloader.exception.DataAccessObjectException;
import com.salesforce.dataloader.util.LoadRateCalculator;

/**
* @author Lexi Viripaeff
Expand All @@ -43,7 +44,7 @@ public InsertAction(Controller controller, ILoaderProgress monitor) throws DataA
}

@Override
protected DAOLoadVisitor createVisitor(boolean isFirstJob) {
protected DAOLoadVisitor createVisitor(LoadRateCalculator rateCalculator, boolean isFirstJob) {
return new PartnerInsertVisitor(getController(), getMonitor(), getSuccessWriter(), getErrorWriter());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.salesforce.dataloader.action.visitor.partner.PartnerQueryVisitor;
import com.salesforce.dataloader.controller.Controller;
import com.salesforce.dataloader.exception.DataAccessObjectInitializationException;
import com.salesforce.dataloader.util.LoadRateCalculator;

/**
* @author Lexi Viripaeff
Expand All @@ -44,7 +45,7 @@ public PartnerExtractAction(Controller controller, ILoaderProgress monitor)
}

@Override
protected IQueryVisitor createVisitor(boolean isFirstJob) {
protected IQueryVisitor createVisitor(LoadRateCalculator rateCalculator, boolean isFirstJob) {
return new PartnerQueryVisitor(this, getController(), getMonitor(), getDao(), getSuccessWriter(), getErrorWriter());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.salesforce.dataloader.action.visitor.partner.PartnerQueryAllVisitor;
import com.salesforce.dataloader.controller.Controller;
import com.salesforce.dataloader.exception.DataAccessObjectInitializationException;
import com.salesforce.dataloader.util.LoadRateCalculator;

/**
* Dataloader action which does a partner api query all operation.
Expand All @@ -46,7 +47,7 @@ public PartnerExtractAllAction(Controller controller, ILoaderProgress monitor)
}

@Override
protected IVisitor createVisitor(boolean isFirstJob) {
protected IVisitor createVisitor(LoadRateCalculator rateCalculator, boolean isFirstJob) {
return new PartnerQueryAllVisitor(this, getController(), getMonitor(), getDao(), getSuccessWriter(), getErrorWriter());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.salesforce.dataloader.action.visitor.partner.PartnerUndeleteVisitor;
import com.salesforce.dataloader.controller.Controller;
import com.salesforce.dataloader.exception.DataAccessObjectException;
import com.salesforce.dataloader.util.LoadRateCalculator;

/**
* @author Lexi Viripaeff
Expand All @@ -43,7 +44,7 @@ public UndeleteAction(Controller controller, ILoaderProgress monitor) throws Dat
}

@Override
protected DAOLoadVisitor createVisitor(boolean isFirstJob) {
protected DAOLoadVisitor createVisitor(LoadRateCalculator rateCalculator, boolean isFirstJob) {
return new PartnerUndeleteVisitor(getController(), getMonitor(), getSuccessWriter(), getErrorWriter());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.salesforce.dataloader.action.visitor.rest.RESTUpdateVisitor;
import com.salesforce.dataloader.controller.Controller;
import com.salesforce.dataloader.exception.DataAccessObjectException;
import com.salesforce.dataloader.util.LoadRateCalculator;

/**
* @author Lexi Viripaeff
Expand All @@ -42,7 +43,7 @@ public UpdateAction(Controller controller, ILoaderProgress monitor) throws DataA
}

@Override
protected DAOLoadVisitor createVisitor(boolean isFirstJob) {
protected DAOLoadVisitor createVisitor(LoadRateCalculator rateCalculator, boolean isFirstJob) {
if (getController().getAppConfig().isRESTAPIEnabled()) {
return new RESTUpdateVisitor(getController(), getMonitor(), getSuccessWriter(), getErrorWriter());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.salesforce.dataloader.action.visitor.partner.PartnerUpsertVisitor;
import com.salesforce.dataloader.controller.Controller;
import com.salesforce.dataloader.exception.DataAccessObjectException;
import com.salesforce.dataloader.util.LoadRateCalculator;

/**
* @author Alex Warshavsky
Expand All @@ -43,7 +44,7 @@ public UpsertAction(Controller controller, ILoaderProgress monitor) throws DataA
}

@Override
protected PartnerUpsertVisitor createVisitor(boolean isFirstJob) {
protected PartnerUpsertVisitor createVisitor(LoadRateCalculator rateCalculator, boolean isFirstJob) {
return new PartnerUpsertVisitor(getController(), getMonitor(), getSuccessWriter(), getErrorWriter());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,29 @@ public abstract class AbstractVisitor implements IVisitor {
private DataWriter errorWriter;
private long errors;
private long successes;
private final LoadRateCalculator rateCalculator;
private LoadRateCalculator rateCalculator;

public AbstractVisitor(Controller controller, ILoaderProgress monitor, DataWriter successWriter,
DataWriter errorWriter) {
this(controller, monitor, successWriter, errorWriter, null);
}

public AbstractVisitor(Controller controller, ILoaderProgress monitor, DataWriter successWriter,
DataWriter errorWriter, LoadRateCalculator rateCalculator) {
this.logger = DLLogManager.getLogger(getClass());
this.controller = controller;
this.monitor = monitor;
this.successWriter = successWriter;
this.errorWriter = errorWriter;
this.rateCalculator = new LoadRateCalculator();
if (rateCalculator == null) {
this.rateCalculator = new LoadRateCalculator();
} else {
this.rateCalculator = rateCalculator;
}
}

public LoadRateCalculator getLoadRateCalculator() {
return this.rateCalculator;
}

protected abstract boolean writeStatus();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.apache.commons.beanutils.*;
import org.apache.commons.text.StringEscapeUtils;
import com.salesforce.dataloader.util.DLLogManager;
import com.salesforce.dataloader.util.LoadRateCalculator;

import org.apache.logging.log4j.Logger;

import java.util.regex.Matcher;
Expand Down Expand Up @@ -89,10 +91,15 @@ public abstract class DAOLoadVisitor extends AbstractVisitor implements DAORowVi
// - https://www.geeksforgeeks.org/how-to-validate-html-tag-using-regular-expression/#
private String richTextRegex = AppConfig.DEFAULT_RICHTEXT_REGEX;
private Field[] cachedFieldAttributesForOperation = null;

protected DAOLoadVisitor(Controller controller, ILoaderProgress monitor, DataWriter successWriter,
DataWriter errorWriter) {
super(controller, monitor, successWriter, errorWriter);
this(controller, monitor, successWriter, errorWriter, null);
}

protected DAOLoadVisitor(Controller controller, ILoaderProgress monitor, DataWriter successWriter,
DataWriter errorWriter, LoadRateCalculator rateCalculator) {
super(controller, monitor, successWriter, errorWriter, rateCalculator);

this.columnNames = ((DataReader)controller.getDao()).getColumnNames();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.apache.commons.beanutils.DynaProperty;
import org.apache.logging.log4j.Logger;
import com.salesforce.dataloader.util.DLLogManager;

import com.salesforce.dataloader.util.LoadRateCalculator;
import com.salesforce.dataloader.action.progress.ILoaderProgress;
import com.salesforce.dataloader.action.visitor.DAOLoadVisitor;
import com.salesforce.dataloader.client.DescribeRefObject;
Expand Down Expand Up @@ -79,6 +79,7 @@
import com.sforce.async.BatchInfo;
import com.sforce.async.BatchStateEnum;
import com.sforce.async.CSVReader;
import com.sforce.async.JobStateEnum;

/**
* Visitor for operations using the bulk API client
Expand Down Expand Up @@ -138,7 +139,12 @@ private static class RowResult {

public BulkLoadVisitor(Controller controller, ILoaderProgress monitor, DataWriter successWriter,
DataWriter errorWriter) {
super(controller, monitor, successWriter, errorWriter);
this(controller, monitor, successWriter, errorWriter, null);
}

public BulkLoadVisitor(Controller controller, ILoaderProgress monitor, DataWriter successWriter,
DataWriter errorWriter, LoadRateCalculator rateCalculator) {
super(controller, monitor, successWriter, errorWriter, rateCalculator);
this.isDelete = getController().getAppConfig().getOperationInfo().isDelete();
this.jobUtil = new BulkApiVisitorUtil(getController(), getProgressMonitor(), getRateCalculator());
}
Expand Down Expand Up @@ -417,7 +423,9 @@ private void createBatch(ByteArrayOutputStream os, int numRecords) throws AsyncA
}

protected void closeJob() throws OperationException, DataAccessObjectException {
if (this.jobUtil.hasJob()) {
if (this.jobUtil.hasJob()
&& this.jobUtil.getJobInfo().getState() != JobStateEnum.JobComplete
&& this.jobUtil.getJobInfo().getState() != JobStateEnum.Closed) {
try {
this.jobUtil.awaitCompletionAndCloseJob();
} catch (final AsyncApiException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.salesforce.dataloader.exception.LoadException;
import com.salesforce.dataloader.exception.OperationException;
import com.salesforce.dataloader.util.DLLogManager;
import com.salesforce.dataloader.util.LoadRateCalculator;
import com.sforce.async.AsyncApiException;

public class BulkV2LoadVisitor extends BulkLoadVisitor {
Expand All @@ -52,8 +53,8 @@ public class BulkV2LoadVisitor extends BulkLoadVisitor {
private boolean isFirstJob;

public BulkV2LoadVisitor(Controller controller, ILoaderProgress monitor, DataWriter successWriter,
DataWriter errorWriter, boolean isFirstJob) {
super(controller, monitor, successWriter, errorWriter);
DataWriter errorWriter, LoadRateCalculator rateCalculator, boolean isFirstJob) {
super(controller, monitor, successWriter, errorWriter, rateCalculator);
this.isFirstJob = isFirstJob;
}

Expand Down Expand Up @@ -84,11 +85,13 @@ protected void getResults() throws AsyncApiException, OperationException, DataAc
this.getVisitorUtil().getBulkV2LoadSuccessResults(successWriterFile, !this.isFirstJob);
CSVFileReader csvReader = new CSVFileReader(new File(successWriterFile), appConfig, true, false);
this.setSuccesses(csvReader.getTotalRows());
this.getLoadRateCalculator().setNumSuccessesAcrossCompletedJobs(csvReader.getTotalRows());
csvReader.close();

this.getVisitorUtil().getBulkV2LoadErrorResults(errorWriterFile);
csvReader = new CSVFileReader(new File(errorWriterFile), appConfig, true, false);
this.setErrors(csvReader.getTotalRows());
this.getLoadRateCalculator().setNumErrorsAcrossCompletedJobs(csvReader.getTotalRows());
csvReader.close();
gotUploadResultsFromServer = true;
}
Expand Down
Loading

0 comments on commit fd73263

Please sign in to comment.