Skip to content

Commit

Permalink
Add recalling API for C# client
Browse files Browse the repository at this point in the history
  • Loading branch information
tsaitsung-han.tht committed Jan 8, 2025
1 parent d2f9588 commit b784336
Show file tree
Hide file tree
Showing 12 changed files with 177 additions and 4 deletions.
9 changes: 9 additions & 0 deletions csharp/rocketmq-client-csharp/ClientManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,15 @@ public async Task Shutdown()
request, response, metadata);
}

public async Task<RpcInvocation<Proto.RecallMessageRequest, Proto.RecallMessageResponse>>
RecallMessage(Endpoints endpoints, Proto.RecallMessageRequest request, TimeSpan timeout)
{
var metadata = _client.Sign();
var response = await GetRpcClient(endpoints).RecallMessage(metadata, request, timeout);
return new RpcInvocation<Proto.RecallMessageRequest, Proto.RecallMessageResponse>(
request, response, metadata);
}

public async Task<RpcInvocation<Proto.SendMessageRequest, Proto.SendMessageResponse>> SendMessage(
Endpoints endpoints, Proto::SendMessageRequest request, TimeSpan timeout)
{
Expand Down
10 changes: 10 additions & 0 deletions csharp/rocketmq-client-csharp/IClientManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ Task<RpcInvocation<HeartbeatRequest, HeartbeatResponse>> Heartbeat(Endpoints end
/// <returns>Task of response.</returns>
Task<RpcInvocation<NotifyClientTerminationRequest, NotifyClientTerminationResponse>> NotifyClientTermination(
Endpoints endpoints, NotifyClientTerminationRequest request, TimeSpan timeout);

/// <summary>
/// Recall messages.
/// </summary>
/// <param name="endpoints">The target endpoints.</param>
/// <param name="request">gRPC request of recalling messages.</param>
/// <param name="timeout">Request max duration.</param>
/// <returns>Task of response.</returns>
Task<RpcInvocation<RecallMessageRequest, RecallMessageResponse>> RecallMessage(
Endpoints endpoints, RecallMessageRequest request, TimeSpan timeout);

/// <summary>
/// Send message to remote endpoints.
Expand Down
24 changes: 24 additions & 0 deletions csharp/rocketmq-client-csharp/IRecallReceipt.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace Org.Apache.Rocketmq
{
public interface IRecallReceipt
{
string MessageId { get; }
}
}
2 changes: 2 additions & 0 deletions csharp/rocketmq-client-csharp/IRpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ Task<ForwardMessageToDeadLetterQueueResponse> ForwardMessageToDeadLetterQueue(Me
Task<NotifyClientTerminationResponse> NotifyClientTermination(Metadata metadata,
NotifyClientTerminationRequest request, TimeSpan timeout);

Task<RecallMessageResponse> RecallMessage(Metadata metadata, RecallMessageRequest request, TimeSpan timeout);

Task Shutdown();
}
}
1 change: 1 addition & 0 deletions csharp/rocketmq-client-csharp/ISendReceipt.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ namespace Org.Apache.Rocketmq
public interface ISendReceipt
{
string MessageId { get; }
string RecallHandle { get; }
}
}
27 changes: 27 additions & 0 deletions csharp/rocketmq-client-csharp/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,33 @@ internal async Task EndTransaction(Endpoints endpoints, string topic, string mes
StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId);
}

public async Task<IRecallReceipt> RecallMessage(string topic, string recallhandle)
{
var recallReceipt = await RecallMessage0(topic, recallhandle);
return recallReceipt;
}

private async Task<RecallReceipt> RecallMessage0(string topic, string recallhandle)
{
if (State.Running != State)
{
throw new InvalidOperationException("Producer is not running");
}
if (recallhandle == null)
{
throw new InvalidOperationException("Recall handle is invalid");
}
var request = new Proto.RecallMessageRequest
{
Topic = new Proto.Resource { ResourceNamespace = ClientConfig.Namespace, Name = topic },
RecallHandle = recallhandle
};
var invocation =
await ClientManager.RecallMessage(new Endpoints(ClientConfig.Endpoints), request, ClientConfig.RequestTimeout);
StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId);
return new RecallReceipt(invocation.Response.MessageId);
}

public class Builder
{
private ClientConfig _clientConfig;
Expand Down
34 changes: 34 additions & 0 deletions csharp/rocketmq-client-csharp/RecallReceipt.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace Org.Apache.Rocketmq
{
public sealed class RecallReceipt : IRecallReceipt
{
public RecallReceipt(string messageId)
{
MessageId = messageId;
}

public string MessageId { get; }

public override string ToString()
{
return $"{nameof(MessageId)}: {MessageId}";
}
}
}
9 changes: 9 additions & 0 deletions csharp/rocketmq-client-csharp/RpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,5 +189,14 @@ internal static HttpMessageHandler CreateHttpHandler()
var call = _stub.NotifyClientTerminationAsync(request, callOptions);
return await call.ResponseAsync;
}

public async Task<Proto::RecallMessageResponse> RecallMessage(Metadata metadata, Proto.RecallMessageRequest request, TimeSpan timeout)
{
var deadline = DateTime.UtcNow.Add(timeout);
var callOptions = new CallOptions(metadata, deadline);

var call = _stub.RecallMessageAsync(request, callOptions);
return await call.ResponseAsync;
}
}
}
12 changes: 9 additions & 3 deletions csharp/rocketmq-client-csharp/SendReceipt.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,20 @@ namespace Org.Apache.Rocketmq
{
public sealed class SendReceipt : ISendReceipt
{
private SendReceipt(string messageId, string transactionId, MessageQueue messageQueue)
private SendReceipt(string messageId, string transactionId, MessageQueue messageQueue, long offset, string recallHandle)
{
MessageId = messageId;
TransactionId = transactionId;
MessageQueue = messageQueue;
Offset = offset;
RecallHandle = recallHandle;
}

public string MessageId { get; }

public string RecallHandle { get; }

public long Offset { get; }

public string TransactionId { get; }

Expand All @@ -40,7 +46,7 @@ private SendReceipt(string messageId, string transactionId, MessageQueue message

public override string ToString()
{
return $"{nameof(MessageId)}: {MessageId}";
return $"{nameof(MessageId)}: {MessageId}, {nameof(RecallHandle)}: {RecallHandle}";
}

public static IEnumerable<SendReceipt> ProcessSendMessageResponse(MessageQueue mq,
Expand All @@ -58,7 +64,7 @@ public static IEnumerable<SendReceipt> ProcessSendMessageResponse(MessageQueue m

// May throw exception.
StatusChecker.Check(status, invocation.Request, invocation.RequestId);
return invocation.Response.Entries.Select(entry => new SendReceipt(entry.MessageId, entry.TransactionId, mq)).ToList();
return invocation.Response.Entries.Select(entry => new SendReceipt(entry.MessageId, entry.TransactionId, mq, entry.Offset, entry.RecallHandle)).ToList();
}
}
}
9 changes: 9 additions & 0 deletions csharp/tests/ClientManagerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,15 @@ public void TestNotifyClientTermination()
_clientManager.NotifyClientTermination(null, request, TimeSpan.FromSeconds(1));
// Expect no exception thrown.
}

[TestMethod]
public void TestRecallMessage()
{
var request = new RecallMessageRequest();
_clientManager.RecallMessage(FakeEndpoints, request, TimeSpan.FromSeconds(1));
_clientManager.RecallMessage(null, request, TimeSpan.FromSeconds(1));
// Expect no exception thrown.
}

private Client CreateTestClient()
{
Expand Down
42 changes: 42 additions & 0 deletions csharp/tests/ProducerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,48 @@ public async Task TestSendFailureWithTopic()
mockClientManager.Verify(cm => cm.SendMessage(It.IsAny<Endpoints>(),
It.IsAny<Proto.SendMessageRequest>(), It.IsAny<TimeSpan>()), Times.Exactly(maxAttempts));
}

[TestMethod]
public async Task TestRecall()
{
var producer = CreateTestClient();
producer.State = State.Running;
var metadata = producer.Sign();
var recallReceipt = new RecallReceipt(MessageIdGenerator.GetInstance().Next());
var recallMessageResponse = new Proto.RecallMessageResponse
{
Status = new Proto.Status
{
Code = Proto.Code.Ok
},
MessageId = recallReceipt.MessageId
};
var recallMessageInvocation = new RpcInvocation<Proto.RecallMessageRequest, Proto.RecallMessageResponse>(null,
recallMessageResponse, metadata);
var mockClientManager = new Mock<IClientManager>();
producer.SetClientManager(mockClientManager.Object);
mockClientManager.Setup(cm => cm.RecallMessage(It.IsAny<Endpoints>(),
It.IsAny<Proto.RecallMessageRequest>(), It.IsAny<TimeSpan>())).Returns(Task.FromResult(recallMessageInvocation));
await producer.RecallMessage("testTopic", "handle");
mockClientManager.Verify(cm => cm.RecallMessage(It.IsAny<Endpoints>(),
It.IsAny<Proto.RecallMessageRequest>(), It.IsAny<TimeSpan>()), Times.Once);
}

[TestMethod]
[ExpectedException(typeof(ArgumentException))]
public async Task TestRecallFailure()
{
var producer = CreateTestClient();
producer.State = State.Running;
var mockClientManager = new Mock<IClientManager>();
producer.SetClientManager(mockClientManager.Object);
var exception = new ArgumentException();
mockClientManager.Setup(cm => cm.RecallMessage(It.IsAny<Endpoints>(),
It.IsAny<Proto.RecallMessageRequest>(), It.IsAny<TimeSpan>())).Throws(exception);
await producer.RecallMessage("testTopic", "handle");
mockClientManager.Verify(cm => cm.RecallMessage(It.IsAny<Endpoints>(),
It.IsAny<Proto.RecallMessageRequest>(), It.IsAny<TimeSpan>()), Times.Once);
}

private Producer CreateTestClient()
{
Expand Down
2 changes: 1 addition & 1 deletion protos

0 comments on commit b784336

Please sign in to comment.