diff --git a/.github/workflows/java_coverage.yml b/.github/workflows/java_coverage.yml index 0ad1e25ed..cd9289d77 100644 --- a/.github/workflows/java_coverage.yml +++ b/.github/workflows/java_coverage.yml @@ -30,4 +30,5 @@ jobs: with: flags: java fail_ci_if_error: true + token: e7eb01be-398b-4f7f-a73e-dc35c428cb50 verbose: true diff --git a/README-CN.md b/README-CN.md index 25131b36d..5ae40d3ed 100644 --- a/README-CN.md +++ b/README-CN.md @@ -17,15 +17,16 @@ * 可用 - ✅ * 进行中 - 🚧 -| 特性 | Java | C/C++ | C# | Golang | Rust | Python | Node.js | PHP | -| ---------------------------------------------- | :---: | :---: | :---: | :----: | :---: | :----: | :-----: | :---: | -| Producer with standard messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | -| Producer with FIFO messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | -| Producer with timed/delay messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | -| Producer with transactional messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | -| Simple consumer | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | -| Push consumer with concurrent message listener | ✅ | ✅ | 🚧 | 🚧 | ✅ | 🚧 | 🚧 | 🚧 | -| Push consumer with FIFO message listener | ✅ | ✅ | 🚧 | 🚧 | ✅ | 🚧 | 🚧 | 🚧 | +| 特性 | Java | C/C++ | C# | Golang | Rust | Python | Node.js | PHP | +|------------------------------------------------| :---: |:------:|:-----:|:------:|:----:|:------:|:-------:| :---: | +| Producer with standard messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | +| Producer with FIFO messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | +| Producer with timed/delay messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | +| Producer with transactional messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | +| Producer with recalling timed/delay messages | ✅ | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | +| Simple consumer | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | +| Push consumer with concurrent message listener | ✅ | ✅ | 🚧 | 🚧 | ✅ | 🚧 | 🚧 | 🚧 | +| Push consumer with FIFO message listener | ✅ | ✅ | 🚧 | 🚧 | ✅ | 🚧 | 🚧 | 🚧 | ## 先决条件和构建 diff --git a/README.md b/README.md index 0772a1726..937d9c314 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ Provide cloud-native and robust solutions for Java, C++, C#, Golang, Rust and al | Producer with FIFO messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | | Producer with timed/delay messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | | Producer with transactional messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | +| Producer with recalling timed/delay messages | ✅ | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | | Simple consumer | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | | Push consumer with concurrent message listener | ✅ | ✅ | 🚧 | 🚧 | ✅ | 🚧 | 🚧 | 🚧 | | Push consumer with FIFO message listener | ✅ | ✅ | 🚧 | 🚧 | ✅ | 🚧 | 🚧 | 🚧 | diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/Producer.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/Producer.java index 71a3a57bb..37d127254 100644 --- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/Producer.java +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/Producer.java @@ -76,6 +76,31 @@ public interface Producer extends Closeable { */ Transaction beginTransaction() throws ClientException; + /** + * Recall message synchronously, only delay message is supported for now. + * + *
{@code
+     * SendReceipt receipt = producer.send(message);
+     * String recallHandle = receipt.getRecallHandle();
+     * }
+ * + * @param topic the topic of the operation + * @param recallHandle the handle to identify a message to recall + * @return the returned receipt, or throw exception if response status is not OK. + */ + RecallReceipt recallMessage(String topic, String recallHandle) throws ClientException; + + /** + * Recall message asynchronously. + * + *

This method returns immediately, the result is included in the {@link CompletableFuture}; + * + * @param topic the topic of the operation + * @param recallHandle the handle to identify a message to recall + * @return a future that indicates the receipt + */ + CompletableFuture recallMessageAsync(String topic, String recallHandle); + /** * Closes the producer and releases all related resources. * diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/RecallReceipt.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/RecallReceipt.java new file mode 100644 index 000000000..7ebeb54b4 --- /dev/null +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/RecallReceipt.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.apis.producer; + +import org.apache.rocketmq.client.apis.message.MessageId; + +public interface RecallReceipt { + MessageId getMessageId(); +} diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/SendReceipt.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/SendReceipt.java index dc96f55fa..794da0872 100644 --- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/SendReceipt.java +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/SendReceipt.java @@ -24,4 +24,7 @@ */ public interface SendReceipt { MessageId getMessageId(); + + // Unique handle to identify a message to recall, only delay message is supported for now + String getRecallHandle(); } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java index ce7d2e1b0..e9ff20763 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java @@ -33,6 +33,8 @@ import apache.rocketmq.v2.QueryAssignmentResponse; import apache.rocketmq.v2.QueryRouteRequest; import apache.rocketmq.v2.QueryRouteResponse; +import apache.rocketmq.v2.RecallMessageRequest; +import apache.rocketmq.v2.RecallMessageResponse; import apache.rocketmq.v2.ReceiveMessageRequest; import apache.rocketmq.v2.ReceiveMessageResponse; import apache.rocketmq.v2.SendMessageRequest; @@ -170,6 +172,16 @@ public abstract RpcFuture endTran public abstract RpcFuture notifyClientTermination(Endpoints endpoints, NotifyClientTerminationRequest request, Duration duration); + /** + * recall message asynchronously, the method ensures no throwable. + * @param endpoints request endpoints. + * @param request recall message request. + * @param duration request max duration. + * @return invocation of response future. + */ + public abstract RpcFuture recallMessage(Endpoints endpoints, + RecallMessageRequest request, Duration duration); + /** * Establish telemetry session stream to server. * diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java index 7630fefba..76cfb617b 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java @@ -33,6 +33,8 @@ import apache.rocketmq.v2.QueryAssignmentResponse; import apache.rocketmq.v2.QueryRouteRequest; import apache.rocketmq.v2.QueryRouteResponse; +import apache.rocketmq.v2.RecallMessageRequest; +import apache.rocketmq.v2.RecallMessageResponse; import apache.rocketmq.v2.ReceiveMessageRequest; import apache.rocketmq.v2.ReceiveMessageResponse; import apache.rocketmq.v2.SendMessageRequest; @@ -344,6 +346,21 @@ public RpcFuture endTransaction(E } } + @Override + public RpcFuture recallMessage(Endpoints endpoints, + RecallMessageRequest request, Duration duration) { + try { + final Metadata metadata = client.sign(); + final Context context = new Context(endpoints, metadata); + final RpcClient rpcClient = getRpcClient(endpoints); + final ListenableFuture future = + rpcClient.recallMessage(metadata, request, asyncWorker, duration); + return new RpcFuture<>(context, request, future); + } catch (Throwable t) { + return new RpcFuture<>(t); + } + } + @Override public StreamObserver telemetry(Endpoints endpoints, Duration duration, StreamObserver responseObserver) throws ClientException { diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java index a8021770b..1945639b3 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java @@ -23,6 +23,8 @@ import apache.rocketmq.v2.EndTransactionResponse; import apache.rocketmq.v2.HeartbeatRequest; import apache.rocketmq.v2.NotifyClientTerminationRequest; +import apache.rocketmq.v2.RecallMessageRequest; +import apache.rocketmq.v2.RecallMessageResponse; import apache.rocketmq.v2.RecoverOrphanedTransactionCommand; import apache.rocketmq.v2.SendMessageRequest; import apache.rocketmq.v2.SendMessageResponse; @@ -50,16 +52,19 @@ import java.util.function.Function; import java.util.stream.Collectors; import net.javacrumbs.futureconverter.java8guava.FutureConverter; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.message.MessageId; import org.apache.rocketmq.client.apis.producer.Producer; +import org.apache.rocketmq.client.apis.producer.RecallReceipt; import org.apache.rocketmq.client.apis.producer.SendReceipt; import org.apache.rocketmq.client.apis.producer.Transaction; import org.apache.rocketmq.client.apis.producer.TransactionChecker; import org.apache.rocketmq.client.apis.producer.TransactionResolution; import org.apache.rocketmq.client.java.exception.InternalErrorException; +import org.apache.rocketmq.client.java.exception.StatusChecker; import org.apache.rocketmq.client.java.exception.TooManyRequestsException; import org.apache.rocketmq.client.java.hook.MessageHookPoints; import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus; @@ -250,6 +255,18 @@ public Transaction beginTransaction() { return new TransactionImpl(this); } + @Override + public RecallReceipt recallMessage(String topic, String recallHandle) throws ClientException { + final ListenableFuture future = recallMessage0(topic, recallHandle); + return handleClientFuture(future); + } + + @Override + public CompletableFuture recallMessageAsync(String topic, String recallHandle) { + final ListenableFuture future = recallMessage0(topic, recallHandle); + return FutureConverter.toCompletableFuture(future); + } + @Override public void close() { this.stopAsync().awaitTerminated(); @@ -561,4 +578,32 @@ private ListenableFuture getPublishingLoadBalancer(final return Futures.transform(getRouteData(topic), topicRouteData -> updatePublishingLoadBalancer(topic, topicRouteData), MoreExecutors.directExecutor()); } + + ListenableFuture recallMessage0(String topic, String recallHandle) { + if (!this.isRunning()) { + final IllegalStateException e = new IllegalStateException("Producer is not running now"); + log.error("Unable to recall message because producer is not running, state={}, clientId={}", + this.state(), clientId); + return Futures.immediateFailedFuture(e); + } + if (StringUtils.isEmpty(recallHandle)) { + return Futures.immediateFailedFuture(new IllegalArgumentException("recall handle is invalid")); + } + final RecallMessageRequest request = RecallMessageRequest.newBuilder() + .setTopic(apache.rocketmq.v2.Resource.newBuilder() + .setResourceNamespace(clientConfiguration.getNamespace()) + .setName(topic) + .build()) + .setRecallHandle(recallHandle) + .build(); + final Duration requestTimeout = clientConfiguration.getRequestTimeout(); + final RpcFuture future = + this.getClientManager().recallMessage(endpoints, request, requestTimeout); + + return Futures.transformAsync(future, response -> { + final Status status = response.getStatus(); + StatusChecker.check(status, future); + return Futures.immediateFuture(new RecallReceiptImpl(response.getMessageId())); + }, MoreExecutors.directExecutor()); + } } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/RecallReceiptImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/RecallReceiptImpl.java new file mode 100644 index 000000000..b6ba0ed6b --- /dev/null +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/RecallReceiptImpl.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.java.impl.producer; + +import com.google.common.base.MoreObjects; +import org.apache.rocketmq.client.apis.message.MessageId; +import org.apache.rocketmq.client.apis.producer.RecallReceipt; +import org.apache.rocketmq.client.java.message.MessageIdCodec; + +public class RecallReceiptImpl implements RecallReceipt { + private final MessageId messageId; + + public RecallReceiptImpl(String messageIdStr) { + messageId = MessageIdCodec.getInstance().decode(messageIdStr); + } + + @Override + public MessageId getMessageId() { + return messageId; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("messageId", messageId) + .toString(); + } +} diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java index 5f9f403f7..ed2fbf79c 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java @@ -39,14 +39,17 @@ public class SendReceiptImpl implements SendReceipt { private final MessageId messageId; private final String transactionId; + private final String recallHandle; private final MessageQueueImpl messageQueue; private final long offset; - private SendReceiptImpl(MessageId messageId, String transactionId, MessageQueueImpl messageQueue, long offset) { + private SendReceiptImpl(MessageId messageId, String transactionId, + MessageQueueImpl messageQueue, long offset, String recallHandle) { this.messageId = messageId; this.transactionId = transactionId; this.messageQueue = messageQueue; this.offset = offset; + this.recallHandle = recallHandle; } @Override @@ -54,6 +57,11 @@ public MessageId getMessageId() { return messageId; } + @Override + public String getRecallHandle() { + return recallHandle; + } + public MessageQueueImpl getMessageQueue() { return messageQueue; } @@ -87,7 +95,8 @@ public static List processResponseInvocation(MessageQueueImpl m final MessageId messageId = MessageIdCodec.getInstance().decode(entry.getMessageId()); final String transactionId = entry.getTransactionId(); final long offset = entry.getOffset(); - final SendReceiptImpl impl = new SendReceiptImpl(messageId, transactionId, mq, offset); + final String recallHandle = entry.getRecallHandle(); + final SendReceiptImpl impl = new SendReceiptImpl(messageId, transactionId, mq, offset, recallHandle); sendReceipts.add(impl); } return sendReceipts; @@ -97,6 +106,7 @@ public static List processResponseInvocation(MessageQueueImpl m public String toString() { return MoreObjects.toStringHelper(this) .add("messageId", messageId) + .add("recallHandle", recallHandle) .toString(); } } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java index 4c5e8b76b..c555082f8 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java @@ -33,6 +33,8 @@ import apache.rocketmq.v2.QueryAssignmentResponse; import apache.rocketmq.v2.QueryRouteRequest; import apache.rocketmq.v2.QueryRouteResponse; +import apache.rocketmq.v2.RecallMessageRequest; +import apache.rocketmq.v2.RecallMessageResponse; import apache.rocketmq.v2.ReceiveMessageRequest; import apache.rocketmq.v2.ReceiveMessageResponse; import apache.rocketmq.v2.SendMessageRequest; @@ -186,6 +188,18 @@ ListenableFuture endTransaction(Metadata metadata, ListenableFuture notifyClientTermination(Metadata metadata, NotifyClientTerminationRequest request, Executor executor, Duration duration); + /** + * Recall message asynchronously. + * + * @param metadata gRPC request header metadata. + * @param request recall message request + * @param executor gRPC asynchronous executor. + * @param duration request max duration. + * @return invocation of response future. + */ + ListenableFuture recallMessage(Metadata metadata, + RecallMessageRequest request, Executor executor, Duration duration); + /** * Start a streaming request and get the request observer. * diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java index a40cd3989..77c2184ed 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java @@ -34,6 +34,8 @@ import apache.rocketmq.v2.QueryAssignmentResponse; import apache.rocketmq.v2.QueryRouteRequest; import apache.rocketmq.v2.QueryRouteResponse; +import apache.rocketmq.v2.RecallMessageRequest; +import apache.rocketmq.v2.RecallMessageResponse; import apache.rocketmq.v2.ReceiveMessageRequest; import apache.rocketmq.v2.ReceiveMessageResponse; import apache.rocketmq.v2.SendMessageRequest; @@ -212,6 +214,14 @@ public ListenableFuture notifyClientTermination .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).notifyClientTermination(request); } + @Override + public ListenableFuture recallMessage(Metadata metadata, + RecallMessageRequest request, Executor executor, Duration duration) { + this.activityNanoTime = System.nanoTime(); + return futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor) + .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).recallMessage(request); + } + @Override public StreamObserver telemetry(Metadata metadata, Executor executor, Duration duration, StreamObserver responseObserver) { diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java index 46d82339a..b6ac2ec36 100644 --- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java +++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java @@ -25,6 +25,7 @@ import apache.rocketmq.v2.NotifyClientTerminationRequest; import apache.rocketmq.v2.QueryAssignmentRequest; import apache.rocketmq.v2.QueryRouteRequest; +import apache.rocketmq.v2.RecallMessageRequest; import apache.rocketmq.v2.ReceiveMessageRequest; import apache.rocketmq.v2.SendMessageRequest; import io.grpc.Metadata; @@ -135,4 +136,12 @@ public void testNotifyClientTermination() { CLIENT_MANAGER.notifyClientTermination(null, request, Duration.ofSeconds(1)); // Expect no exception thrown. } + + @Test + public void testRecallMessage() { + RecallMessageRequest request = RecallMessageRequest.newBuilder().build(); + CLIENT_MANAGER.recallMessage(fakeEndpoints(), request, Duration.ofSeconds(1)); + CLIENT_MANAGER.recallMessage(null, request, Duration.ofSeconds(1)); + // Expect no exception thrown. + } } \ No newline at end of file diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java index 60bb0fefd..20533a8c7 100644 --- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java +++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java @@ -24,10 +24,13 @@ import static org.mockito.Mockito.verify; import apache.rocketmq.v2.Broker; +import apache.rocketmq.v2.Code; import apache.rocketmq.v2.MessageQueue; import apache.rocketmq.v2.MessageType; import apache.rocketmq.v2.Permission; +import apache.rocketmq.v2.RecallMessageResponse; import apache.rocketmq.v2.Resource; +import apache.rocketmq.v2.Status; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Service; import java.util.ArrayList; @@ -38,10 +41,15 @@ import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.message.Message; +import org.apache.rocketmq.client.java.exception.InternalErrorException; +import org.apache.rocketmq.client.java.impl.ClientManagerImpl; +import org.apache.rocketmq.client.java.message.MessageIdCodec; import org.apache.rocketmq.client.java.route.Endpoints; import org.apache.rocketmq.client.java.route.MessageQueueImpl; import org.apache.rocketmq.client.java.route.TopicRouteData; +import org.apache.rocketmq.client.java.rpc.RpcFuture; import org.apache.rocketmq.client.java.tool.TestBase; +import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mockito; @@ -109,4 +117,37 @@ public void testSendFailureWithTopic() throws ClientException { verify(producer, times(maxAttempts)).send0(any(Endpoints.class), anyList(), any(MessageQueueImpl.class)); producer.close(); } + + @Test + public void testRecall() throws Exception { + final ProducerImpl producer = createProducerWithTopic(FAKE_TOPIC_0); + final String messageId = MessageIdCodec.getInstance().nextMessageId().toString(); + final RecallReceiptImpl recallReceiptImpl = new RecallReceiptImpl(messageId); + Mockito.doReturn(Futures.immediateFuture(recallReceiptImpl)).when(producer).recallMessage0(any(), any()); + producer.recallMessage(FAKE_TOPIC_0, "handle"); + verify(producer, times(1)).recallMessage0(any(), any()); + producer.close(); + } + + @Test + public void testRecallFailure() { + RecallMessageResponse response = RecallMessageResponse.newBuilder() + .setMessageId("") + .setStatus(Status.newBuilder().setCode(Code.INTERNAL_ERROR).build()) + .build(); + final ClientManagerImpl clientManager = mock(ClientManagerImpl.class); + final ProducerImpl producer = createProducerWithTopic(FAKE_TOPIC_0); + Mockito.doReturn(clientManager).when(producer).getClientManager(); + Mockito.doReturn(new RpcFuture<>(fakeRpcContext(), null, Futures.immediateFuture(response))) + .when(clientManager).recallMessage(any(), any(), any()); + + Assert.assertThrows(InternalErrorException.class, () -> { + producer.recallMessage(FAKE_TOPIC_0, "handle"); + }); + producer.recallMessageAsync(FAKE_TOPIC_0, "handle").whenComplete((r, t) -> { + Assert.assertTrue(t instanceof InternalErrorException); + }); + verify(producer, times(2)).recallMessage0(any(), any()); + producer.close(); + } } \ No newline at end of file