diff --git a/csharp/rocketmq-client-csharp/ClientManager.cs b/csharp/rocketmq-client-csharp/ClientManager.cs index e42a29da..b061b5c9 100644 --- a/csharp/rocketmq-client-csharp/ClientManager.cs +++ b/csharp/rocketmq-client-csharp/ClientManager.cs @@ -120,6 +120,15 @@ public async Task Shutdown() request, response, metadata); } + public async Task> + RecallMessage(Endpoints endpoints, Proto.RecallMessageRequest request, TimeSpan timeout) + { + var metadata = _client.Sign(); + var response = await GetRpcClient(endpoints).RecallMessage(metadata, request, timeout); + return new RpcInvocation( + request, response, metadata); + } + public async Task> SendMessage( Endpoints endpoints, Proto::SendMessageRequest request, TimeSpan timeout) { diff --git a/csharp/rocketmq-client-csharp/IClientManager.cs b/csharp/rocketmq-client-csharp/IClientManager.cs index 743df9fe..ac9108dc 100644 --- a/csharp/rocketmq-client-csharp/IClientManager.cs +++ b/csharp/rocketmq-client-csharp/IClientManager.cs @@ -61,6 +61,16 @@ Task> Heartbeat(Endpoints end /// Task of response. Task> NotifyClientTermination( Endpoints endpoints, NotifyClientTerminationRequest request, TimeSpan timeout); + + /// + /// Recall messages. + /// + /// The target endpoints. + /// gRPC request of recalling messages. + /// Request max duration. + /// Task of response. + Task> RecallMessage( + Endpoints endpoints, RecallMessageRequest request, TimeSpan timeout); /// /// Send message to remote endpoints. diff --git a/csharp/rocketmq-client-csharp/IRecallReceipt.cs b/csharp/rocketmq-client-csharp/IRecallReceipt.cs new file mode 100644 index 00000000..8291cd66 --- /dev/null +++ b/csharp/rocketmq-client-csharp/IRecallReceipt.cs @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Org.Apache.Rocketmq +{ + public interface IRecallReceipt + { + string MessageId { get; } + } +} \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/IRpcClient.cs b/csharp/rocketmq-client-csharp/IRpcClient.cs index 8145ea18..eb369c2d 100644 --- a/csharp/rocketmq-client-csharp/IRpcClient.cs +++ b/csharp/rocketmq-client-csharp/IRpcClient.cs @@ -52,6 +52,8 @@ Task ForwardMessageToDeadLetterQueue(Me Task NotifyClientTermination(Metadata metadata, NotifyClientTerminationRequest request, TimeSpan timeout); + Task RecallMessage(Metadata metadata, RecallMessageRequest request, TimeSpan timeout); + Task Shutdown(); } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/ISendReceipt.cs b/csharp/rocketmq-client-csharp/ISendReceipt.cs index f1004b5b..eeba4e03 100644 --- a/csharp/rocketmq-client-csharp/ISendReceipt.cs +++ b/csharp/rocketmq-client-csharp/ISendReceipt.cs @@ -20,5 +20,6 @@ namespace Org.Apache.Rocketmq public interface ISendReceipt { string MessageId { get; } + string RecallHandle { get; } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs index 24f1a0ac..4f92303d 100644 --- a/csharp/rocketmq-client-csharp/Producer.cs +++ b/csharp/rocketmq-client-csharp/Producer.cs @@ -350,6 +350,33 @@ internal async Task EndTransaction(Endpoints endpoints, string topic, string mes StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId); } + public async Task RecallMessage(string topic, string recallhandle) + { + var recallReceipt = await RecallMessage0(topic, recallhandle); + return recallReceipt; + } + + private async Task RecallMessage0(string topic, string recallhandle) + { + if (State.Running != State) + { + throw new InvalidOperationException("Producer is not running"); + } + if (recallhandle == null) + { + throw new InvalidOperationException("Recall handle is invalid"); + } + var request = new Proto.RecallMessageRequest + { + Topic = new Proto.Resource { ResourceNamespace = ClientConfig.Namespace, Name = topic }, + RecallHandle = recallhandle + }; + var invocation = + await ClientManager.RecallMessage(new Endpoints(ClientConfig.Endpoints), request, ClientConfig.RequestTimeout); + StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId); + return new RecallReceipt(invocation.Response.MessageId); + } + public class Builder { private ClientConfig _clientConfig; diff --git a/csharp/rocketmq-client-csharp/RecallReceipt.cs b/csharp/rocketmq-client-csharp/RecallReceipt.cs new file mode 100644 index 00000000..80cf120c --- /dev/null +++ b/csharp/rocketmq-client-csharp/RecallReceipt.cs @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Org.Apache.Rocketmq +{ + public sealed class RecallReceipt : IRecallReceipt + { + public RecallReceipt(string messageId) + { + MessageId = messageId; + } + + public string MessageId { get; } + + public override string ToString() + { + return $"{nameof(MessageId)}: {MessageId}"; + } + } +} \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/RpcClient.cs b/csharp/rocketmq-client-csharp/RpcClient.cs index eeff96e5..346ead28 100644 --- a/csharp/rocketmq-client-csharp/RpcClient.cs +++ b/csharp/rocketmq-client-csharp/RpcClient.cs @@ -189,5 +189,14 @@ internal static HttpMessageHandler CreateHttpHandler() var call = _stub.NotifyClientTerminationAsync(request, callOptions); return await call.ResponseAsync; } + + public async Task RecallMessage(Metadata metadata, Proto.RecallMessageRequest request, TimeSpan timeout) + { + var deadline = DateTime.UtcNow.Add(timeout); + var callOptions = new CallOptions(metadata, deadline); + + var call = _stub.RecallMessageAsync(request, callOptions); + return await call.ResponseAsync; + } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/SendReceipt.cs b/csharp/rocketmq-client-csharp/SendReceipt.cs index c9fe8014..06567b49 100644 --- a/csharp/rocketmq-client-csharp/SendReceipt.cs +++ b/csharp/rocketmq-client-csharp/SendReceipt.cs @@ -23,14 +23,20 @@ namespace Org.Apache.Rocketmq { public sealed class SendReceipt : ISendReceipt { - private SendReceipt(string messageId, string transactionId, MessageQueue messageQueue) + private SendReceipt(string messageId, string transactionId, MessageQueue messageQueue, long offset, string recallHandle) { MessageId = messageId; TransactionId = transactionId; MessageQueue = messageQueue; + Offset = offset; + RecallHandle = recallHandle; } public string MessageId { get; } + + public string RecallHandle { get; } + + public long Offset { get; } public string TransactionId { get; } @@ -40,7 +46,7 @@ private SendReceipt(string messageId, string transactionId, MessageQueue message public override string ToString() { - return $"{nameof(MessageId)}: {MessageId}"; + return $"{nameof(MessageId)}: {MessageId}, {nameof(RecallHandle)}: {RecallHandle}"; } public static IEnumerable ProcessSendMessageResponse(MessageQueue mq, @@ -58,7 +64,7 @@ public static IEnumerable ProcessSendMessageResponse(MessageQueue m // May throw exception. StatusChecker.Check(status, invocation.Request, invocation.RequestId); - return invocation.Response.Entries.Select(entry => new SendReceipt(entry.MessageId, entry.TransactionId, mq)).ToList(); + return invocation.Response.Entries.Select(entry => new SendReceipt(entry.MessageId, entry.TransactionId, mq, entry.Offset, entry.RecallHandle)).ToList(); } } } \ No newline at end of file diff --git a/csharp/tests/ClientManagerTest.cs b/csharp/tests/ClientManagerTest.cs index 5e4e7eef..e1b07fb7 100644 --- a/csharp/tests/ClientManagerTest.cs +++ b/csharp/tests/ClientManagerTest.cs @@ -120,6 +120,15 @@ public void TestNotifyClientTermination() _clientManager.NotifyClientTermination(null, request, TimeSpan.FromSeconds(1)); // Expect no exception thrown. } + + [TestMethod] + public void TestRecallMessage() + { + var request = new RecallMessageRequest(); + _clientManager.RecallMessage(FakeEndpoints, request, TimeSpan.FromSeconds(1)); + _clientManager.RecallMessage(null, request, TimeSpan.FromSeconds(1)); + // Expect no exception thrown. + } private Client CreateTestClient() { diff --git a/csharp/tests/ProducerTest.cs b/csharp/tests/ProducerTest.cs index ce0cca15..bc38208e 100644 --- a/csharp/tests/ProducerTest.cs +++ b/csharp/tests/ProducerTest.cs @@ -95,6 +95,48 @@ public async Task TestSendFailureWithTopic() mockClientManager.Verify(cm => cm.SendMessage(It.IsAny(), It.IsAny(), It.IsAny()), Times.Exactly(maxAttempts)); } + + [TestMethod] + public async Task TestRecall() + { + var producer = CreateTestClient(); + producer.State = State.Running; + var metadata = producer.Sign(); + var recallReceipt = new RecallReceipt(MessageIdGenerator.GetInstance().Next()); + var recallMessageResponse = new Proto.RecallMessageResponse + { + Status = new Proto.Status + { + Code = Proto.Code.Ok + }, + MessageId = recallReceipt.MessageId + }; + var recallMessageInvocation = new RpcInvocation(null, + recallMessageResponse, metadata); + var mockClientManager = new Mock(); + producer.SetClientManager(mockClientManager.Object); + mockClientManager.Setup(cm => cm.RecallMessage(It.IsAny(), + It.IsAny(), It.IsAny())).Returns(Task.FromResult(recallMessageInvocation)); + await producer.RecallMessage("testTopic", "handle"); + mockClientManager.Verify(cm => cm.RecallMessage(It.IsAny(), + It.IsAny(), It.IsAny()), Times.Once); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public async Task TestRecallFailure() + { + var producer = CreateTestClient(); + producer.State = State.Running; + var mockClientManager = new Mock(); + producer.SetClientManager(mockClientManager.Object); + var exception = new ArgumentException(); + mockClientManager.Setup(cm => cm.RecallMessage(It.IsAny(), + It.IsAny(), It.IsAny())).Throws(exception); + await producer.RecallMessage("testTopic", "handle"); + mockClientManager.Verify(cm => cm.RecallMessage(It.IsAny(), + It.IsAny(), It.IsAny()), Times.Once); + } private Producer CreateTestClient() { diff --git a/protos b/protos index 70eb1eff..df8f85cd 160000 --- a/protos +++ b/protos @@ -1 +1 @@ -Subproject commit 70eb1effd4b415678fc7214cedd3a289b0f926b9 +Subproject commit df8f85cdfa850b204adea777076c2d3d8fb0c3fc