Skip to content

Commit

Permalink
#46 Attempt to reduce jvm crashes on macOS
Browse files Browse the repository at this point in the history
  • Loading branch information
patschuh committed Jun 7, 2024
1 parent 604bd84 commit fc3771c
Showing 1 changed file with 34 additions and 12 deletions.
46 changes: 34 additions & 12 deletions src/main/java/at/esque/kafka/Controller.java
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ private void getTopicsForCluster() {
} finally {
stopWatch.stop();
LOGGER.info("Finished getting topics for cluster [{}]", stopWatch);
backGroundTaskHolder.backgroundTaskStopped();
Platform.runLater(() -> backGroundTaskHolder.backgroundTaskStopped());
}
}

Expand Down Expand Up @@ -825,8 +825,10 @@ private void getOldestMessages(TopicMessageTypeConfig topic, Map<String, String>
}
try {
Map<Integer, AtomicLong> messagesConsumed = new HashMap<>();
Platform.runLater(() -> backGroundTaskHolder.setBackGroundTaskDescription("preparing consumer..."));
backGroundTaskHolder.setIsInProgress(true);
Platform.runLater(() -> {
backGroundTaskHolder.setBackGroundTaskDescription("preparing consumer...");
backGroundTaskHolder.setIsInProgress(true);
});
subscribeOrAssignToSelectedPartition(topic, consumerId);
Map<TopicPartition, Long> minOffsets = consumerHandler.getMinOffsets(consumerId);
Map<TopicPartition, Long> maxOffsets = consumerHandler.getMaxOffsets(consumerId);
Expand All @@ -853,7 +855,15 @@ private ObservableList<KafkaMessage> getAndClearBaseList(PinTab tab) {
return null;
}
ObservableList<KafkaMessage> baseList = ((MessagesTabContent) tab.getContent()).getMessageTableView().getBaseList();
baseList.clear();
Platform.runLater(baseList::clear);
while (!baseList.isEmpty()) {
LOGGER.info("Waiting for baseList to be cleared by FX thread");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return baseList;
}

Expand Down Expand Up @@ -900,8 +910,11 @@ private void getNewestMessages(TopicMessageTypeConfig topic, Map<String, String>
UUID consumerId = tempconsumerId;
try {
Map<Integer, AtomicLong> messagesConsumed = new HashMap<>();
Platform.runLater(() -> backGroundTaskHolder.setBackGroundTaskDescription("preparing consumer..."));
backGroundTaskHolder.setIsInProgress(true);
Platform.runLater(() -> {
backGroundTaskHolder.setBackGroundTaskDescription("preparing consumer...");
backGroundTaskHolder.setIsInProgress(true);
}
);
subscribeOrAssignToSelectedPartition(topic, consumerId);
Map<TopicPartition, Long> minOffsets = consumerHandler.getMinOffsets(consumerId);
Map<TopicPartition, Long> maxOffsets = consumerHandler.getMaxOffsets(consumerId);
Expand Down Expand Up @@ -963,7 +976,7 @@ private <KT, VT> void getMessagesContinuously(TopicMessageTypeConfig topic, Map<
runInDaemonThread(() -> {
try {
AtomicLong messagesConsumed = new AtomicLong(0);
backGroundTaskHolder.setIsInProgress(true);
Platform.runLater(() -> backGroundTaskHolder.setIsInProgress(true));
subscribeOrAssignToSelectedPartition(topic, consumerId);
consumerHandler.seekToOffset(consumerId, -2);
PinTab tab = getActiveTabOrAddNew(topic, false);
Expand Down Expand Up @@ -995,7 +1008,7 @@ private <KT, VT> void trace(TopicMessageTypeConfig topic, Map<String, String> co
return;
}
try {
backGroundTaskHolder.setIsInProgress(true);
Platform.runLater(() -> backGroundTaskHolder.setIsInProgress(true));
List<TopicPartition> topicPatitions;
if (fasttracePartition != null) {
topicPatitions = new ArrayList<>(Collections.singletonList(new TopicPartition(selectedTopic(), fasttracePartition)));
Expand Down Expand Up @@ -1135,8 +1148,11 @@ private void getMessagesFromSpecificOffset(TopicMessageTypeConfig topic, Map<Str
try {
Map<Integer, AtomicLong> messagesConsumed = new HashMap<>();
long specifiedOffset = Long.parseLong(specificOffsetTextField.getText());
Platform.runLater(() -> backGroundTaskHolder.setBackGroundTaskDescription("preparing consumer..."));
backGroundTaskHolder.setIsInProgress(true);
Platform.runLater(() -> {
backGroundTaskHolder.setBackGroundTaskDescription("preparing consumer...");
backGroundTaskHolder.setIsInProgress(true);
}
);
subscribeOrAssignToSelectedPartition(topic, consumerId);
Map<TopicPartition, Long> minOffsets = consumerHandler.getMinOffsets(consumerId);
Map<TopicPartition, Long> maxOffsets = consumerHandler.getMaxOffsets(consumerId);
Expand Down Expand Up @@ -1172,8 +1188,11 @@ private void getMessagesStartingFromInstant(TopicMessageTypeConfig topic, Map<St
}
try {
Map<Integer, AtomicLong> messagesConsumed = new HashMap<>();
Platform.runLater(() -> backGroundTaskHolder.setBackGroundTaskDescription("preparing consumer..."));
backGroundTaskHolder.setIsInProgress(true);
Platform.runLater(() -> {
backGroundTaskHolder.setBackGroundTaskDescription("preparing consumer...");
backGroundTaskHolder.setIsInProgress(true);
}
);
subscribeOrAssignToSelectedPartition(topic, consumerId);
Map<TopicPartition, Long> minOffsets = consumerHandler.getMinOffsets(consumerId);
Map<TopicPartition, Long> maxOffsets = consumerHandler.getMaxOffsets(consumerId);
Expand Down Expand Up @@ -1332,6 +1351,9 @@ private void runInDaemonThread(Runnable runnable) {
Thread daemonThread = new Thread(runnable);
daemonThread.setDaemon(true);
daemonThread.start();
daemonThread.setUncaughtExceptionHandler((t, e) -> {
Platform.runLater(() -> ErrorAlert.show(e, controlledStage));
});
}

// Experimental Area
Expand Down

0 comments on commit fc3771c

Please sign in to comment.