Skip to content

Commit

Permalink
read ids from csv file's column for IN clause of extract operation
Browse files Browse the repository at this point in the history
Based on https://ideas.salesforce.com/s/idea/a0B8W00000GdWDxUAN/data-loader-export-based-on-csv-input-file

a SOQL IN clause followed by ({<file name>}, {<column name>}) when specified will be used to construct the SOQL with IN clause based on values specified in the column <column name> of file <file name>.
  • Loading branch information
ashitsalesforce committed Jan 17, 2025
1 parent 70f04fa commit b086863
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.salesforce.dataloader.config.Messages;
import com.salesforce.dataloader.controller.Controller;
import com.salesforce.dataloader.dao.DataWriterInterface;
import com.salesforce.dataloader.dao.csv.CSVFileReader;
import com.salesforce.dataloader.exception.DataAccessObjectException;
import com.salesforce.dataloader.exception.DataAccessObjectInitializationException;
import com.salesforce.dataloader.exception.ExtractException;
Expand All @@ -41,10 +42,13 @@
import com.salesforce.dataloader.mapping.SOQLMapper;
import com.salesforce.dataloader.model.Row;
import com.salesforce.dataloader.model.TableHeader;
import com.salesforce.dataloader.model.TableRow;
import com.sforce.async.AsyncApiException;
import com.sforce.soap.partner.fault.ApiFault;
import com.sforce.ws.ConnectionException;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -89,15 +93,20 @@ public AbstractQueryVisitor(AbstractExtractAction action, Controller controller,
public final void visit() throws DataAccessObjectException, OperationException {
try {
if (getProgressMonitor().isCanceled()) return;
final int size = executeQuery(getSoql());
if (size == 0) {
getLogger().info(Messages.getMessage(getClass(), "noneReturned"));
} else {
if (getProgressMonitor().isCanceled()) return;
startWriteExtraction(size);
writeExtraction();
flushResults();
}
String soql = getSoqlForNextBatch();
while (soql != null) {
final int size = executeQuery(soql);
if (size == 0) {
getLogger().info(Messages.getMessage(getClass(), "noneReturned"));
} else {
if (getProgressMonitor().isCanceled())
return;
startWriteExtraction(size);
writeExtraction();
flushResults();
}
soql = getSoqlForNextBatch();
}
} catch (final ApiFault e) {
throw new ExtractException(e.getExceptionMessage(), e);
} catch (final ConnectionException e) {
Expand All @@ -116,9 +125,70 @@ protected abstract void writeExtraction() throws AsyncApiException, ExtractExcep
protected boolean writeStatus() {
return getConfig().getBoolean(AppConfig.PROP_ENABLE_EXTRACT_STATUS_OUTPUT);
}

private String getSoql() {
return this.soql;

public static final int DEFAULT_MAX_SOQL_CHAR_LENGTH = 500;
public static final int MAX_IDLOOKUP_FIELD_LENGTH = 255;
private int daoLastProcessedRow = 0;
private CSVFileReader csvReader = null;
private String inClauseColName = null;
private int numRows = 0;
private boolean isGetSoqlForNextBatchCalled = false;
private int maxSoqlCharLength = DEFAULT_MAX_SOQL_CHAR_LENGTH;
private static final String INFILE = "IN";
private String getSoqlForNextBatch() throws OperationException {
List<String> inClauseFileAndColumnNameList = parseInClauseForFileAndColumnName(soql);
if (inClauseFileAndColumnNameList.size() == 2) {
inClauseColName = inClauseFileAndColumnNameList.get(1).strip();
if (inClauseColName.startsWith("'") || inClauseColName.startsWith("\"")) {
inClauseColName = inClauseColName.substring(1, inClauseColName.length());
}
if (inClauseColName.endsWith("'") || inClauseColName.endsWith("\"")) {
inClauseColName = inClauseColName.substring(0, inClauseColName.length() - 1);
}
if (csvReader == null) {
String filePath = inClauseFileAndColumnNameList.get(0).strip();
if (filePath.startsWith("'") || filePath.startsWith("\"")) {
filePath = filePath.substring(1, filePath.length());
}
if (filePath.endsWith("'") || filePath.endsWith("\"")) {
filePath = filePath.substring(0, filePath.length() - 1);
}
try {
csvReader = new CSVFileReader(new File(filePath), controller.getAppConfig(), false, false);
csvReader.open();
List<String> daoColList = csvReader.getColumnNames();
boolean columnExists = false;
for (String colName : daoColList) {
if (inClauseColName.equals(colName)) {
columnExists = true;
break;
}
}
if (!columnExists) {
throw new OperationException("Column " + inClauseColName + " not found in the file");
}
numRows = csvReader.getTotalRows();
} catch (DataAccessObjectException e) {
throw new OperationException("Error reading file for IN clause", e);
}
}
String batchSoql = null;
try {
batchSoql = constructSoqlFromFile(soql, csvReader, inClauseColName);
} catch (IOException | DataAccessObjectException e) {
throw new OperationException("Error reading file for INFILE clause", e);
}
if (batchSoql == null) {
csvReader.close();
csvReader = null;
}
return batchSoql;
} else if (!isGetSoqlForNextBatchCalled) {
isGetSoqlForNextBatchCalled = true;
return soql;
} else {
return null;
}
}

private DataWriterInterface getQueryWriter() {
Expand Down Expand Up @@ -308,5 +378,70 @@ protected void startWriteExtraction(int size) {
protected SOQLMapper getMapper() {
return (SOQLMapper)super.getMapper();
}

public void setMaxSoqlCharLength(int maxSoqlCharLength) {
this.maxSoqlCharLength = maxSoqlCharLength;
}

private static final String IN_CLAUSE = " IN ";
private String constructSoqlFromFile(String soql, CSVFileReader csvReader, String columnName) throws IOException, DataAccessObjectException {
if (daoLastProcessedRow == numRows) {
return null;
}
String[] soqlParts = soql.toUpperCase().split(IN_CLAUSE);
String[] soqlAfterInClauseParts = soqlParts[1].split("\\)");
String soqlAfterInClause = "";
int idxOfSoqlAfterInClause = 0;
if (soqlAfterInClauseParts.length > 1) {
soqlAfterInClause = soqlAfterInClauseParts[1];
idxOfSoqlAfterInClause = soql.toUpperCase().indexOf(soqlAfterInClause);
soqlAfterInClause = soql.substring(idxOfSoqlAfterInClause);
}
StringBuilder soqlBuilder = new StringBuilder(soql.substring(0,
soql.toUpperCase().indexOf(IN_CLAUSE)));
soqlBuilder.append(IN_CLAUSE + "(");

boolean firstRowOfCurrentBatch = true;
int soqlLength = soqlBuilder.length() + MAX_IDLOOKUP_FIELD_LENGTH + 4 + soqlAfterInClause.length();
while (daoLastProcessedRow < numRows
&& soqlLength < this.maxSoqlCharLength) {
if (firstRowOfCurrentBatch) {
firstRowOfCurrentBatch = false;
} else {
soqlBuilder.append(",");
}
TableRow row = csvReader.readTableRow();
soqlBuilder.append("'");
soqlBuilder.append(row.get(columnName));
soqlBuilder.append("'");
soqlLength = soqlBuilder.length() + MAX_IDLOOKUP_FIELD_LENGTH + 4 + soqlAfterInClause.length();
daoLastProcessedRow++;
}
soqlBuilder.append(") ");
soqlBuilder.append(soqlAfterInClause);
logger.info("Constructed SOQL: " + soqlBuilder.toString());
return soqlBuilder.toString();
}

private static final String IN_CLAUSE_REGEX = "\\s+IN\\s+\\(\\s*\\{\\s*([^}]+)\\s*\\}\\s*,\\s*\\{\\s*([^}]+)\\s*\\}\\s*\\)";
private List<String> parseInClauseForFileAndColumnName(String input) {
List<String> values = new ArrayList<>();
Pattern pattern = Pattern.compile(IN_CLAUSE_REGEX);
Matcher matcher = pattern.matcher(input);

if (matcher.find()) {
String inClause = matcher.group(1);
String[] items = inClause.split(",");
for (String item : items) {
values.add(item.strip());
}
inClause = matcher.group(2);
items = inClause.split(",");
for (String item : items) {
values.add(item.strip());
}
}
return values;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public void setJobInfo(JobInfo jinfo) {
this.jobInfo = jinfo;
}

void createJob() throws AsyncApiException {
void createJob(String soql) throws AsyncApiException {
JobInfo job = new JobInfo();
final OperationEnum op = this.appConfig.getOperationInfo().getBulkOperationEnum();
job.setOperation(op);
Expand Down Expand Up @@ -201,7 +201,7 @@ void createJob() throws AsyncApiException {
}
}
if (isBulkV2QueryJob()) {
job.setObject(this.appConfig.getString(AppConfig.PROP_EXTRACT_SOQL));
job.setObject(soql);
logger.info("going to create BulkV2 query job");
}
job = this.connection.createJob(job);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ protected BulkApiVisitorUtil getVisitorUtil() {
@Override
protected void loadBatch() throws DataAccessObjectException, OperationException, BatchSizeLimitException {
try {
if (!this.jobUtil.hasJob()) this.jobUtil.createJob();
if (!this.jobUtil.hasJob()) this.jobUtil.createJob(null);
createBatches();
clearArrays();
} catch (final AsyncApiException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public BulkV1QueryVisitor(AbstractExtractAction action, Controller controller, I
protected int executeQuery(String soql) throws AsyncApiException, OperationException {
final BulkApiVisitorUtil jobUtil = new BulkApiVisitorUtil(getController(), getProgressMonitor(),
getRateCalculator(), false);
jobUtil.createJob();
jobUtil.createJob(soql);
try {
jobUtil.createBatch(new ByteArrayInputStream(soql.getBytes(AppConfig.BULK_API_ENCODING)));
} catch (final UnsupportedEncodingException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public BulkV2QueryVisitor(AbstractExtractAction action, Controller controller, I
protected int executeQuery(String soql) throws AsyncApiException, OperationException {
final BulkApiVisitorUtil jobUtil = new BulkApiVisitorUtil(getController(), getProgressMonitor(),
getRateCalculator(), false);
jobUtil.createJob();
jobUtil.createJob(soql);
this.jobId = jobUtil.getJobId();
jobUtil.awaitCompletionAndCloseJob();
return jobUtil.getRecordsProcessed();
Expand Down

0 comments on commit b086863

Please sign in to comment.