diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitterImpl.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitterImpl.java index 8912355c..a6263711 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitterImpl.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitterImpl.java @@ -149,7 +149,7 @@ private Map committedOffsets(AdminFactory adminFactory, St } private void throwExceptionIfCoordinatorIsTerminated() { - if (isLeader && coordinatorThread.isTerminated()) { + if (isLeader && coordinatorThread != null && coordinatorThread.isTerminated()) { throw new IllegalStateException("Coordinator unexpectedly terminated"); } } @@ -245,7 +245,7 @@ private void close(T closeable) { @Override public void close() throws IOException { close(commitRequestListener); - if (isLeader) { + if (isLeader && coordinatorThread != null) { coordinatorThread.terminate(); } close(producer);