Skip to content

Commit

Permalink
Fix possible thread leak
Browse files Browse the repository at this point in the history
  • Loading branch information
IAM20 authored and Myeonghyeon-Lee committed Sep 1, 2020
1 parent 8d1263c commit 2cca953
Showing 1 changed file with 13 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public <R> Flux<R> queryFlux(
BlockingQueue<FluxItem<R>> queue = new LinkedBlockingQueue<>(queueSize);
AtomicBoolean isClosed = new AtomicBoolean(false);

return generateFluxFromQueue(queue, isClosed)
return generateFluxFromQueue(queue, bufferTimeout, isClosed)
.doOnCancel(() -> isClosed.set(true))
.doFirst(() -> scheduler.schedule(() -> {
try {
Expand All @@ -201,9 +201,13 @@ public void processRow(ResultSet resultSet, int rowNum) throws SQLException {
} catch (UncategorizedSQLException e) {
/* Sort of db timeout. */
isClosed.set(true);
insertToBlockingQueue(queue, endItem(), isClosed, bufferTimeout);
logger.error("Failed to generate flux.", e);
return;
} catch (Exception e) {
isClosed.set(true);
insertToBlockingQueue(queue, endItem(), isClosed, bufferTimeout);
logger.error("Failed to generate flux.", e);
throw e; /* To propagate exception to subscriber */
}

Expand All @@ -223,7 +227,7 @@ private <R> void insertToBlockingQueue(
if (!queue.offer(item, bufferTimeout, TimeUnit.MILLISECONDS)) {
/* Close the flux. */
isClosed.set(true);
throw new TimeoutException("Can't insert into blocking queue.");
throw new TimeoutException("Cannot insert into blocking queue.");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand All @@ -234,7 +238,7 @@ private <R> void insertToBlockingQueue(
}

private <R> Flux<R> generateFluxFromQueue(
BlockingQueue<FluxItem<R>> queue, AtomicBoolean isClosed) {
BlockingQueue<FluxItem<R>> queue, long bufferTimeout, AtomicBoolean isClosed) {

return Flux.generate(sink -> {
if (isClosed.get()) {
Expand All @@ -243,7 +247,12 @@ private <R> Flux<R> generateFluxFromQueue(
"Database Connection is closed."));
}
try {
FluxItem<R> row = queue.take();
FluxItem<R> row = queue.poll(bufferTimeout, TimeUnit.MILLISECONDS);
if (row == null) {
sink.error(new DataAccessResourceFailureException(
"Cannot take element from blocking queue."));
return;
}
if (row.isEnd()) {
sink.complete();
return;
Expand Down

0 comments on commit 2cca953

Please sign in to comment.