diff --git a/src/main/java/com/salesforce/dataloader/action/visitor/AbstractQueryVisitor.java b/src/main/java/com/salesforce/dataloader/action/visitor/AbstractQueryVisitor.java index f875785a..119530e3 100644 --- a/src/main/java/com/salesforce/dataloader/action/visitor/AbstractQueryVisitor.java +++ b/src/main/java/com/salesforce/dataloader/action/visitor/AbstractQueryVisitor.java @@ -33,6 +33,7 @@ import com.salesforce.dataloader.config.Messages; import com.salesforce.dataloader.controller.Controller; import com.salesforce.dataloader.dao.DataWriterInterface; +import com.salesforce.dataloader.dao.csv.CSVFileReader; import com.salesforce.dataloader.exception.DataAccessObjectException; import com.salesforce.dataloader.exception.DataAccessObjectInitializationException; import com.salesforce.dataloader.exception.ExtractException; @@ -41,10 +42,13 @@ import com.salesforce.dataloader.mapping.SOQLMapper; import com.salesforce.dataloader.model.Row; import com.salesforce.dataloader.model.TableHeader; +import com.salesforce.dataloader.model.TableRow; import com.sforce.async.AsyncApiException; import com.sforce.soap.partner.fault.ApiFault; import com.sforce.ws.ConnectionException; +import java.io.File; +import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.nio.charset.StandardCharsets; @@ -89,15 +93,20 @@ public AbstractQueryVisitor(AbstractExtractAction action, Controller controller, public final void visit() throws DataAccessObjectException, OperationException { try { if (getProgressMonitor().isCanceled()) return; - final int size = executeQuery(getSoql()); - if (size == 0) { - getLogger().info(Messages.getMessage(getClass(), "noneReturned")); - } else { - if (getProgressMonitor().isCanceled()) return; - startWriteExtraction(size); - writeExtraction(); - flushResults(); - } + String soql = getSoqlForNextBatch(); + while (soql != null) { + final int size = executeQuery(soql); + if (size == 0) { + getLogger().info(Messages.getMessage(getClass(), "noneReturned")); + } else { + if (getProgressMonitor().isCanceled()) + return; + startWriteExtraction(size); + writeExtraction(); + flushResults(); + } + soql = getSoqlForNextBatch(); + } } catch (final ApiFault e) { throw new ExtractException(e.getExceptionMessage(), e); } catch (final ConnectionException e) { @@ -116,9 +125,70 @@ protected abstract void writeExtraction() throws AsyncApiException, ExtractExcep protected boolean writeStatus() { return getConfig().getBoolean(AppConfig.PROP_ENABLE_EXTRACT_STATUS_OUTPUT); } - - private String getSoql() { - return this.soql; + + public static final int DEFAULT_MAX_SOQL_CHAR_LENGTH = 500; + public static final int MAX_IDLOOKUP_FIELD_LENGTH = 255; + private int daoLastProcessedRow = 0; + private CSVFileReader csvReader = null; + private String inClauseColName = null; + private int numRows = 0; + private boolean isGetSoqlForNextBatchCalled = false; + private int maxSoqlCharLength = DEFAULT_MAX_SOQL_CHAR_LENGTH; + private static final String INFILE = "IN"; + private String getSoqlForNextBatch() throws OperationException { + List inClauseFileAndColumnNameList = parseInClauseForFileAndColumnName(soql); + if (inClauseFileAndColumnNameList.size() == 2) { + inClauseColName = inClauseFileAndColumnNameList.get(1).strip(); + if (inClauseColName.startsWith("'") || inClauseColName.startsWith("\"")) { + inClauseColName = inClauseColName.substring(1, inClauseColName.length()); + } + if (inClauseColName.endsWith("'") || inClauseColName.endsWith("\"")) { + inClauseColName = inClauseColName.substring(0, inClauseColName.length() - 1); + } + if (csvReader == null) { + String filePath = inClauseFileAndColumnNameList.get(0).strip(); + if (filePath.startsWith("'") || filePath.startsWith("\"")) { + filePath = filePath.substring(1, filePath.length()); + } + if (filePath.endsWith("'") || filePath.endsWith("\"")) { + filePath = filePath.substring(0, filePath.length() - 1); + } + try { + csvReader = new CSVFileReader(new File(filePath), controller.getAppConfig(), false, false); + csvReader.open(); + List daoColList = csvReader.getColumnNames(); + boolean columnExists = false; + for (String colName : daoColList) { + if (inClauseColName.equals(colName)) { + columnExists = true; + break; + } + } + if (!columnExists) { + throw new OperationException("Column " + inClauseColName + " not found in the file"); + } + numRows = csvReader.getTotalRows(); + } catch (DataAccessObjectException e) { + throw new OperationException("Error reading file for IN clause", e); + } + } + String batchSoql = null; + try { + batchSoql = constructSoqlFromFile(soql, csvReader, inClauseColName); + } catch (IOException | DataAccessObjectException e) { + throw new OperationException("Error reading file for INFILE clause", e); + } + if (batchSoql == null) { + csvReader.close(); + csvReader = null; + } + return batchSoql; + } else if (!isGetSoqlForNextBatchCalled) { + isGetSoqlForNextBatchCalled = true; + return soql; + } else { + return null; + } } private DataWriterInterface getQueryWriter() { @@ -308,5 +378,70 @@ protected void startWriteExtraction(int size) { protected SOQLMapper getMapper() { return (SOQLMapper)super.getMapper(); } + + public void setMaxSoqlCharLength(int maxSoqlCharLength) { + this.maxSoqlCharLength = maxSoqlCharLength; + } + + private static final String IN_CLAUSE = " IN "; + private String constructSoqlFromFile(String soql, CSVFileReader csvReader, String columnName) throws IOException, DataAccessObjectException { + if (daoLastProcessedRow == numRows) { + return null; + } + String[] soqlParts = soql.toUpperCase().split(IN_CLAUSE); + String[] soqlAfterInClauseParts = soqlParts[1].split("\\)"); + String soqlAfterInClause = ""; + int idxOfSoqlAfterInClause = 0; + if (soqlAfterInClauseParts.length > 1) { + soqlAfterInClause = soqlAfterInClauseParts[1]; + idxOfSoqlAfterInClause = soql.toUpperCase().indexOf(soqlAfterInClause); + soqlAfterInClause = soql.substring(idxOfSoqlAfterInClause); + } + StringBuilder soqlBuilder = new StringBuilder(soql.substring(0, + soql.toUpperCase().indexOf(IN_CLAUSE))); + soqlBuilder.append(IN_CLAUSE + "("); + + boolean firstRowOfCurrentBatch = true; + int soqlLength = soqlBuilder.length() + MAX_IDLOOKUP_FIELD_LENGTH + 4 + soqlAfterInClause.length(); + while (daoLastProcessedRow < numRows + && soqlLength < this.maxSoqlCharLength) { + if (firstRowOfCurrentBatch) { + firstRowOfCurrentBatch = false; + } else { + soqlBuilder.append(","); + } + TableRow row = csvReader.readTableRow(); + soqlBuilder.append("'"); + soqlBuilder.append(row.get(columnName)); + soqlBuilder.append("'"); + soqlLength = soqlBuilder.length() + MAX_IDLOOKUP_FIELD_LENGTH + 4 + soqlAfterInClause.length(); + daoLastProcessedRow++; + } + soqlBuilder.append(") "); + soqlBuilder.append(soqlAfterInClause); + logger.info("Constructed SOQL: " + soqlBuilder.toString()); + return soqlBuilder.toString(); + } + + private static final String IN_CLAUSE_REGEX = "\\s+IN\\s+\\(\\s*\\{\\s*([^}]+)\\s*\\}\\s*,\\s*\\{\\s*([^}]+)\\s*\\}\\s*\\)"; + private List parseInClauseForFileAndColumnName(String input) { + List values = new ArrayList<>(); + Pattern pattern = Pattern.compile(IN_CLAUSE_REGEX); + Matcher matcher = pattern.matcher(input); + + if (matcher.find()) { + String inClause = matcher.group(1); + String[] items = inClause.split(","); + for (String item : items) { + values.add(item.strip()); + } + inClause = matcher.group(2); + items = inClause.split(","); + for (String item : items) { + values.add(item.strip()); + } + } + return values; + } } diff --git a/src/main/java/com/salesforce/dataloader/action/visitor/bulk/BulkApiVisitorUtil.java b/src/main/java/com/salesforce/dataloader/action/visitor/bulk/BulkApiVisitorUtil.java index 84b1dfa8..e0974349 100644 --- a/src/main/java/com/salesforce/dataloader/action/visitor/bulk/BulkApiVisitorUtil.java +++ b/src/main/java/com/salesforce/dataloader/action/visitor/bulk/BulkApiVisitorUtil.java @@ -157,7 +157,7 @@ public void setJobInfo(JobInfo jinfo) { this.jobInfo = jinfo; } - void createJob() throws AsyncApiException { + void createJob(String soql) throws AsyncApiException { JobInfo job = new JobInfo(); final OperationEnum op = this.appConfig.getOperationInfo().getBulkOperationEnum(); job.setOperation(op); @@ -201,7 +201,7 @@ void createJob() throws AsyncApiException { } } if (isBulkV2QueryJob()) { - job.setObject(this.appConfig.getString(AppConfig.PROP_EXTRACT_SOQL)); + job.setObject(soql); logger.info("going to create BulkV2 query job"); } job = this.connection.createJob(job); diff --git a/src/main/java/com/salesforce/dataloader/action/visitor/bulk/BulkLoadVisitor.java b/src/main/java/com/salesforce/dataloader/action/visitor/bulk/BulkLoadVisitor.java index 576c25a8..825f8211 100644 --- a/src/main/java/com/salesforce/dataloader/action/visitor/bulk/BulkLoadVisitor.java +++ b/src/main/java/com/salesforce/dataloader/action/visitor/bulk/BulkLoadVisitor.java @@ -156,7 +156,7 @@ protected BulkApiVisitorUtil getVisitorUtil() { @Override protected void loadBatch() throws DataAccessObjectException, OperationException, BatchSizeLimitException { try { - if (!this.jobUtil.hasJob()) this.jobUtil.createJob(); + if (!this.jobUtil.hasJob()) this.jobUtil.createJob(null); createBatches(); clearArrays(); } catch (final AsyncApiException e) { diff --git a/src/main/java/com/salesforce/dataloader/action/visitor/bulk/BulkV1QueryVisitor.java b/src/main/java/com/salesforce/dataloader/action/visitor/bulk/BulkV1QueryVisitor.java index 73de2b32..0ce8f0a8 100644 --- a/src/main/java/com/salesforce/dataloader/action/visitor/bulk/BulkV1QueryVisitor.java +++ b/src/main/java/com/salesforce/dataloader/action/visitor/bulk/BulkV1QueryVisitor.java @@ -65,7 +65,7 @@ public BulkV1QueryVisitor(AbstractExtractAction action, Controller controller, I protected int executeQuery(String soql) throws AsyncApiException, OperationException { final BulkApiVisitorUtil jobUtil = new BulkApiVisitorUtil(getController(), getProgressMonitor(), getRateCalculator(), false); - jobUtil.createJob(); + jobUtil.createJob(soql); try { jobUtil.createBatch(new ByteArrayInputStream(soql.getBytes(AppConfig.BULK_API_ENCODING))); } catch (final UnsupportedEncodingException e) { diff --git a/src/main/java/com/salesforce/dataloader/action/visitor/bulk/BulkV2QueryVisitor.java b/src/main/java/com/salesforce/dataloader/action/visitor/bulk/BulkV2QueryVisitor.java index fa1dcc7a..636bb0d5 100644 --- a/src/main/java/com/salesforce/dataloader/action/visitor/bulk/BulkV2QueryVisitor.java +++ b/src/main/java/com/salesforce/dataloader/action/visitor/bulk/BulkV2QueryVisitor.java @@ -58,7 +58,7 @@ public BulkV2QueryVisitor(AbstractExtractAction action, Controller controller, I protected int executeQuery(String soql) throws AsyncApiException, OperationException { final BulkApiVisitorUtil jobUtil = new BulkApiVisitorUtil(getController(), getProgressMonitor(), getRateCalculator(), false); - jobUtil.createJob(); + jobUtil.createJob(soql); this.jobId = jobUtil.getJobId(); jobUtil.awaitCompletionAndCloseJob(); return jobUtil.getRecordsProcessed();