Skip to content

Commit

Permalink
Add Latch for consumers (#18655)
Browse files Browse the repository at this point in the history
(cherry picked from commit 8c6de37)
  • Loading branch information
mohityadav766 committed Nov 15, 2024
1 parent 64a29d1 commit 67c3ed7
Showing 1 changed file with 15 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -322,12 +322,15 @@ private void performReindex(JobExecutionContext jobExecutionContext) throws Inte
private void processEntityReindex(JobExecutionContext jobExecutionContext)
throws InterruptedException {
int numConsumers = jobData.getConsumerThreads();
CountDownLatch producerLatch = new CountDownLatch(getTotalLatchCount(jobData.getEntities()));
int latchCount = getTotalLatchCount(jobData.getEntities());
CountDownLatch producerLatch = new CountDownLatch(latchCount);
CountDownLatch consumerLatch = new CountDownLatch(latchCount + numConsumers);
submitProducerTask(producerLatch);
submitConsumerTask(jobExecutionContext);
submitConsumerTask(jobExecutionContext, consumerLatch);

producerLatch.await();
sendPoisonPills(numConsumers);
consumerLatch.await();
}

private void submitProducerTask(CountDownLatch producerLatch) {
Expand Down Expand Up @@ -368,12 +371,12 @@ private void submitProducerTask(CountDownLatch producerLatch) {
}
}

private void submitConsumerTask(JobExecutionContext jobExecutionContext) {
private void submitConsumerTask(JobExecutionContext jobExecutionContext, CountDownLatch latch) {
for (int i = 0; i < jobData.getConsumerThreads(); i++) {
consumerExecutor.submit(
() -> {
try {
consumeTasks(jobExecutionContext);
consumeTasks(jobExecutionContext, latch);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("Consumer thread interrupted.");
Expand All @@ -382,7 +385,8 @@ private void submitConsumerTask(JobExecutionContext jobExecutionContext) {
}
}

private void consumeTasks(JobExecutionContext jobExecutionContext) throws InterruptedException {
private void consumeTasks(JobExecutionContext jobExecutionContext, CountDownLatch latch)
throws InterruptedException {
while (true) {
IndexingTask<?> task = taskQueue.take();
LOG.info(
Expand All @@ -391,9 +395,10 @@ private void consumeTasks(JobExecutionContext jobExecutionContext) throws Interr
task.currentEntityOffset());
if (task == IndexingTask.POISON_PILL) {
LOG.debug("Received POISON_PILL. Consumer thread terminating.");
latch.countDown();
break;
}
processTask(task, jobExecutionContext);
processTask(task, jobExecutionContext, latch);
}
}

Expand Down Expand Up @@ -589,7 +594,8 @@ public void stopJob() {
shutdownExecutor(consumerExecutor, "ConsumerExecutor", 60, TimeUnit.SECONDS);
}

private void processTask(IndexingTask<?> task, JobExecutionContext jobExecutionContext) {
private void processTask(
IndexingTask<?> task, JobExecutionContext jobExecutionContext, CountDownLatch latch) {
String entityType = task.entityType();
ResultList<?> entities = task.entities();
Map<String, Object> contextData = new HashMap<>();
Expand Down Expand Up @@ -632,7 +638,7 @@ private void processTask(IndexingTask<?> task, JobExecutionContext jobExecutionC

} catch (Exception e) {
synchronized (jobDataLock) {
jobData.setStatus(EventPublisherJob.Status.FAILED);
jobData.setStatus(EventPublisherJob.Status.ACTIVE_ERROR);
jobData.setFailure(
new IndexingError()
.withErrorSource(IndexingError.ErrorSource.JOB)
Expand All @@ -646,6 +652,7 @@ private void processTask(IndexingTask<?> task, JobExecutionContext jobExecutionC
LOG.error("Unexpected error during processing task for entity {}", entityType, e);
} finally {
sendUpdates(jobExecutionContext);
latch.countDown();
}
}

Expand Down

0 comments on commit 67c3ed7

Please sign in to comment.