Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved ConnectionHandler #183

Merged
merged 2 commits into from
Sep 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Tutorials/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,9 @@ await AnsiConsole.Live(table)

});
}
private static async ValueTask<List<MessageId>> PublishMessages(string topic, int count, string message, PulsarClient client)
private static async ValueTask<List<MessageIdAdv>> PublishMessages(string topic, int count, string message, PulsarClient client)
{
var keys = new List<MessageId>();
var keys = new List<MessageIdAdv>();
var builder = new ProducerConfigBuilder<byte[]>()
.Topic(topic);
var producer = await client.NewProducerAsync(builder);
Expand Down
2 changes: 1 addition & 1 deletion src/SharpPulsar.Test.API/BatchMessageIdTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void EqualsTest()
var batchMsgId2 = new BatchMessageId(1, 1, 1, 1);
var batchMsgId3 = new BatchMessageId(0, 0, 0, 1);
var batchMsgId4 = new BatchMessageId(0, 0, 0, -1);
var msgId = new MessageId(0, 0, 0);
var msgId = new MessageIdAdv(0, 0, 0);

Assert.True(batchMsgId1.Equals(batchMsgId1));
Assert.False(batchMsgId1.Equals(batchMsgId2));
Expand Down
42 changes: 21 additions & 21 deletions src/SharpPulsar.Test.API/MessageIdCompareToTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public class MessageIdCompareToTest
[Fact]
public virtual void TestEqual()
{
var MessageId1 = new MessageId(123L, 345L, 567);
var MessageId2 = new MessageId(123L, 345L, 567);
var MessageId1 = new MessageIdAdv(123L, 345L, 567);
var MessageId2 = new MessageIdAdv(123L, 345L, 567);

var batchMessageId1 = new BatchMessageId(234L, 345L, 456, 567);
var batchMessageId2 = new BatchMessageId(234L, 345L, 456, 567);
Expand All @@ -43,10 +43,10 @@ public virtual void TestEqual()
[Fact]
public virtual void TestGreaterThan()
{
var MessageId1 = new MessageId(124L, 345L, 567);
var MessageId2 = new MessageId(123L, 345L, 567);
var MessageId3 = new MessageId(123L, 344L, 567);
var MessageId4 = new MessageId(123L, 344L, 566);
var MessageId1 = new MessageIdAdv(124L, 345L, 567);
var MessageId2 = new MessageIdAdv(123L, 345L, 567);
var MessageId3 = new MessageIdAdv(123L, 344L, 567);
var MessageId4 = new MessageIdAdv(123L, 344L, 566);

var batchMessageId1 = new BatchMessageId(235L, 345L, 456, 567);
var batchMessageId2 = new BatchMessageId(234L, 346L, 456, 567);
Expand Down Expand Up @@ -75,10 +75,10 @@ public virtual void TestGreaterThan()
[Fact]
public virtual void TestLessThan()
{
var MessageId1 = new MessageId(124L, 345L, 567);
var MessageId2 = new MessageId(123L, 345L, 567);
var MessageId3 = new MessageId(123L, 344L, 567);
var MessageId4 = new MessageId(123L, 344L, 566);
var MessageId1 = new MessageIdAdv(124L, 345L, 567);
var MessageId2 = new MessageIdAdv(123L, 345L, 567);
var MessageId3 = new MessageIdAdv(123L, 344L, 567);
var MessageId4 = new MessageIdAdv(123L, 344L, 566);

var batchMessageId1 = new BatchMessageId(235L, 345L, 456, 567);
var batchMessageId2 = new BatchMessageId(234L, 346L, 456, 567);
Expand Down Expand Up @@ -107,7 +107,7 @@ public virtual void TestLessThan()
[Fact]
public virtual void TestCompareDifferentType()
{
var MessageId = new MessageId(123L, 345L, 567);
var MessageId = new MessageIdAdv(123L, 345L, 567);
var batchMessageId1 = new BatchMessageId(123L, 345L, 566, 789);
var batchMessageId2 = new BatchMessageId(123L, 345L, 567, 789);
var batchMessageId3 = new BatchMessageId(MessageId);
Expand All @@ -121,7 +121,7 @@ public virtual void TestCompareDifferentType()
[Fact]
public virtual void CompareToSymmetricTest()
{
var simpleMessageId = new MessageId(123L, 345L, 567);
var simpleMessageId = new MessageIdAdv(123L, 345L, 567);
// batchIndex is -1 if message is non-batched message and has the batchIndex for a batch message
var batchMessageId1 = new BatchMessageId(123L, 345L, 567, -1);
var batchMessageId2 = new BatchMessageId(123L, 345L, 567, 1);
Expand All @@ -140,7 +140,7 @@ public virtual void CompareToSymmetricTest()
[Fact]
public virtual void TestMessageIdCompareToTopicMessageId()
{
var MessageId = new MessageId(123L, 345L, 567);
var MessageId = new MessageIdAdv(123L, 345L, 567);
var topicMessageId1 = new TopicMessageId("test-topic-partition-0", "test-topic", new BatchMessageId(123L, 345L, 566, 789));
var topicMessageId2 = new TopicMessageId("test-topic-partition-0", "test-topic", new BatchMessageId(123L, 345L, 567, 789));
var topicMessageId3 = new TopicMessageId("test-topic-partition-0", "test-topic", new BatchMessageId(MessageId));
Expand All @@ -157,8 +157,8 @@ public virtual void TestBatchMessageIdCompareToTopicMessageId()
var MessageId1 = new BatchMessageId(123L, 345L, 567, 789);
var MessageId2 = new BatchMessageId(123L, 345L, 567, 0);
var MessageId3 = new BatchMessageId(123L, 345L, 567, -1);
var topicMessageId1 = new TopicMessageId("test-topic-partition-0", "test-topic", new MessageId(123L, 345L, 566));
var topicMessageId2 = new TopicMessageId("test-topic-partition-0", "test-topic", new MessageId(123L, 345L, 567));
var topicMessageId1 = new TopicMessageId("test-topic-partition-0", "test-topic", new MessageIdAdv(123L, 345L, 566));
var topicMessageId2 = new TopicMessageId("test-topic-partition-0", "test-topic", new MessageIdAdv(123L, 345L, 567));
Assert.True(MessageId1.CompareTo(topicMessageId1) > 0, "Expected to be greater than");
Assert.True(MessageId1.CompareTo(topicMessageId2) > 0, "Expected to be greater than");
Assert.True(MessageId2.CompareTo(topicMessageId2) > 0, "Expected to be greater than");
Expand Down Expand Up @@ -187,9 +187,9 @@ public virtual void TestMultiMessageIdEqual()

// 1 item
var topic1 = "topicName1";
var MessageId1 = new MessageId(123L, 345L, 567);
var MessageId2 = new MessageId(123L, 345L, 567);
var MessageId3 = new MessageId(345L, 456L, 567);
var MessageId1 = new MessageIdAdv(123L, 345L, 567);
var MessageId2 = new MessageIdAdv(123L, 345L, 567);
var MessageId3 = new MessageIdAdv(345L, 456L, 567);

var item1 = new MultiMessageId(new Dictionary<string, IMessageId> { { topic1, MessageId1 } });
var item2 = new MultiMessageId(new Dictionary<string, IMessageId> { { topic1, MessageId2 } });
Expand Down Expand Up @@ -263,9 +263,9 @@ public virtual void TestMultiMessageIdCompareto()

// 1 item
var topic1 = "topicName1";
var MessageId1 = new MessageId(123L, 345L, 567);
var MessageId2 = new MessageId(123L, 345L, 567);
var MessageId3 = new MessageId(345L, 456L, 567);
var MessageId1 = new MessageIdAdv(123L, 345L, 567);
var MessageId2 = new MessageIdAdv(123L, 345L, 567);
var MessageId3 = new MessageIdAdv(345L, 456L, 567);

var item1 = new MultiMessageId(new Dictionary<string, IMessageId> { { topic1, MessageId1 } });
var item2 = new MultiMessageId(new Dictionary<string, IMessageId> { { topic1, MessageId2 } });
Expand Down
2 changes: 1 addition & 1 deletion src/SharpPulsar.Test.API/MessageIdUtilsTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class MessageIdUtilsTest
[InlineData(39, 5)]
public void TestId(long ledger, long entry)
{
var id = new MessageId(ledger, entry, -1);
var id = new MessageIdAdv(ledger, entry, -1);
var offset = MessageIdUtils.GetOffset(id);
var id1 = MessageIdUtils.GetMessageId(offset);
Assert.Equal(id, id1);
Expand Down
4 changes: 2 additions & 2 deletions src/SharpPulsar.Test.API/MessageTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public virtual void TestTopicMessageImplReplicatedInfo()
builder.ReplicatedFrom = from;
var payload = ReadOnlySequence<byte>.Empty;
var msg = Message<byte[]>.Create(builder, payload, ISchema<int>.Bytes);
msg.SetMessageId(new MessageId(-1, -1, -1));
msg.SetMessageId(new MessageIdAdv(-1, -1, -1));
var topicMessage = new TopicMessage<byte[]>(topicName, topicName, msg, null);

Assert.True(topicMessage.Replicated);
Expand All @@ -210,7 +210,7 @@ public virtual void TestTopicMessageImplNoReplicatedInfo()
var builder = new MessageMetadata();
var payload = ReadOnlySequence<byte>.Empty;
var msg = Message<byte[]>.Create(builder, payload, ISchema<int>.Bytes);
msg.SetMessageId(new MessageId(-1, -1, -1));
msg.SetMessageId(new MessageIdAdv(-1, -1, -1));
var topicMessage = new TopicMessage<byte[]>(topicName, topicName, msg, null);

Assert.False(topicMessage.Replicated);
Expand Down
37 changes: 19 additions & 18 deletions src/SharpPulsar.Test/AcknowledgementsGroupingTrackerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ public async Task TestAckTracker()
var unack = _client.ActorSystem.ActorOf(UnAckedChunckedMessageIdSequenceMap.Prop());
var tracker = _client.ActorSystem.ActorOf(PersistentAcknowledgmentsGroupingTracker<byte[]>.Prop(unack, consumer.ConsumerActor, consumer.ConsumerActor/*dummy*/, 1, consumer.ConsumerActor, conf));

var msg1 = new MessageId(5, 1, 0);
var msg2 = new MessageId(5, 2, 0);
var msg3 = new MessageId(5, 3, 0);
var msg4 = new MessageId(5, 4, 0);
var msg5 = new MessageId(5, 5, 0);
var msg6 = new MessageId(5, 6, 0);
var msg1 = new MessageIdAdv(5, 1, 0);
var msg2 = new MessageIdAdv(5, 2, 0);
var msg3 = new MessageIdAdv(5, 3, 0);
var msg4 = new MessageIdAdv(5, 4, 0);
var msg5 = new MessageIdAdv(5, 5, 0);
var msg6 = new MessageIdAdv(5, 6, 0);
var isDuplicate = await tracker.Ask<bool>(new IsDuplicate(msg1));
Assert.False(isDuplicate);
tracker.Tell(new AddAcknowledgment(msg1, CommandAck.AckType.Individual, new Dictionary<string, long>()));
Expand Down Expand Up @@ -134,17 +134,18 @@ public async Task TestAckTracker()
public async Task TestImmediateAckingTracker()
{

var builder = new ConsumerConfigBuilder<byte[]>();
builder.AcknowledgmentGroupTime(TimeSpan.Zero);
builder.Topic($"TestAckTracker-{Guid.NewGuid()}");
builder.SubscriptionName($"TestAckTracker-sub-{Guid.NewGuid()}");
var builder = new ConsumerConfigBuilder<byte[]>()
.AcknowledgmentGroupTime(TimeSpan.Zero)
.IsAckReceiptEnabled(false)
.Topic($"TestAckTracker-{Guid.NewGuid()}")
.SubscriptionName($"TestAckTracker-sub-{Guid.NewGuid()}");
var conf = builder.ConsumerConfigurationData;
var consumer = await _client.NewConsumerAsync(builder);
var unack = _client.ActorSystem.ActorOf(UnAckedChunckedMessageIdSequenceMap.Prop());
var tracker = _client.ActorSystem.ActorOf(PersistentAcknowledgmentsGroupingTracker<byte[]>.Prop(unack, consumer.ConsumerActor, consumer.ConsumerActor/*dummy*/, 1, consumer.ConsumerActor, conf));

var msg1 = new MessageId(5, 1, 0);
var msg2 = new MessageId(5, 2, 0);
var msg1 = new MessageIdAdv(5, 1, 0);
var msg2 = new MessageIdAdv(5, 2, 0);

var isDuplicate = await tracker.Ask<bool>(new IsDuplicate(msg1));
Assert.False(isDuplicate);
Expand Down Expand Up @@ -178,12 +179,12 @@ public async Task TestAckTrackerMultiAck()
var unack = _client.ActorSystem.ActorOf(UnAckedChunckedMessageIdSequenceMap.Prop());
var tracker = _client.ActorSystem.ActorOf(PersistentAcknowledgmentsGroupingTracker<byte[]>.Prop(unack, consumer.ConsumerActor, consumer.ConsumerActor/*dummy*/, 1, consumer.ConsumerActor, conf));

var msg1 = new MessageId(5, 1, 0);
var msg2 = new MessageId(5, 2, 0);
var msg3 = new MessageId(5, 3, 0);
var msg4 = new MessageId(5, 4, 0);
var msg5 = new MessageId(5, 5, 0);
var msg6 = new MessageId(5, 6, 0);
var msg1 = new MessageIdAdv(5, 1, 0);
var msg2 = new MessageIdAdv(5, 2, 0);
var msg3 = new MessageIdAdv(5, 3, 0);
var msg4 = new MessageIdAdv(5, 4, 0);
var msg5 = new MessageIdAdv(5, 5, 0);
var msg6 = new MessageIdAdv(5, 6, 0);

var isDuplicate = await tracker.Ask<bool>(new IsDuplicate(msg1));
Assert.False(isDuplicate);
Expand Down
4 changes: 2 additions & 2 deletions src/SharpPulsar.Test/ConsumerRedeliveryTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,14 @@ public async Task TestOrderedRedelivery(bool ackReceiptEnabled)
await consumer1.RedeliverUnacknowledgedMessagesAsync(messageIds);
_output.WriteLine($"MessageIds: [{messageIds.Count}]");
//await Task.Delay(1000);
MessageId lastMsgId = null;
MessageIdAdv lastMsgId = null;
var count = 1;
for (var i = 0; i < totalMsgs / 2; i++)
{
var message = (Message<byte[]>)await consumer1.ReceiveAsync(TimeSpan.FromMicroseconds(5000));
if (message != null)
{
var msgId = (MessageId)message.MessageId;
var msgId = (MessageIdAdv)message.MessageId;
if (lastMsgId != null)
{
Assert.True(lastMsgId.LedgerId <= msgId.LedgerId, "lastMsgId: " + lastMsgId + " -- msgId: " + msgId);
Expand Down
6 changes: 3 additions & 3 deletions src/SharpPulsar.Test/EventSourceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -172,16 +172,16 @@ public virtual async Task ReaderSourceTaggedTest()

Assert.True(receivedCount > 0);
}
private async Task<ISet<MessageId>> PublishMessages(string topic, int count)
private async Task<ISet<MessageIdAdv>> PublishMessages(string topic, int count)
{
var ids = new HashSet<MessageId>();
var ids = new HashSet<MessageIdAdv>();
var builder = new ProducerConfigBuilder<DataOpEx>()
.Topic(topic);
var producer = await _client.NewProducerAsync(AvroSchema<DataOpEx>.Of(typeof(DataOpEx)), builder);
for (var i = 0; i < count; i++)
{
var key = "key" + i;
MessageId id = null;
MessageIdAdv id = null;
if (i % 2 == 0)
id = await producer.NewMessage().Key(key).Property("twitter", "mestical").Value(new DataOpEx { Text = "my-event-message-" + i, EventTime = DateTimeHelper.CurrentUnixTimeMillis() }).SendAsync();
else
Expand Down
4 changes: 2 additions & 2 deletions src/SharpPulsar.Test/MultiTopicsConsumerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ public async Task TestMultiTopicConsumer()
await consumer.CloseAsync();//.ConfigureAwait(false); https://xunit.net/xunit.analyzers/rules/xUnit1030
}

private async Task<List<MessageId>> PublishMessages(string topic, int count, string message)
private async Task<List<MessageIdAdv>> PublishMessages(string topic, int count, string message)
{
var keys = new List<MessageId>();
var keys = new List<MessageIdAdv>();
var builder = new ProducerConfigBuilder<byte[]>()
.Topic(topic);
var producer = await _client.NewProducerAsync(builder);
Expand Down
5 changes: 3 additions & 2 deletions src/SharpPulsar.Test/NegativeAcksTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public async Task TestAddAndRemove()
var size = await tracker.Ask<long>(Size.Instance);//.ConfigureAwait(false); https://xunit.net/xunit.analyzers/rules/xUnit1030
Assert.Equal(0, size);

var mid = new MessageId(1L, 1L, -1);
var mid = new MessageIdAdv(1L, 1L, -1);
var added = await tracker.Ask<bool>(new Add(mid));//.ConfigureAwait(false); https://xunit.net/xunit.analyzers/rules/xUnit1030
Assert.True(added);
added = await tracker.Ask<bool>(new Add(mid));//.ConfigureAwait(false); https://xunit.net/xunit.analyzers/rules/xUnit1030
Expand Down Expand Up @@ -172,7 +172,8 @@ private async Task TestNegativeAcks(bool batching, bool usePartition, CommandSub
// There should be no more messages
//Assert.Null(nu);
await producer.CloseAsync();//.ConfigureAwait(false); https://xunit.net/xunit.analyzers/rules/xUnit1030
await consumer.CloseAsync();//.ConfigureAwait(false); https://xunit.net/xunit.analyzers/rules/xUnit1030
await consumer.UnsubscribeAsync();//.ConfigureAwait(false); https://xunit.net/xunit.analyzers/rules/xUnit1030
await consumer.CloseAsync();
}
public async Task InitializeAsync()
{
Expand Down
2 changes: 1 addition & 1 deletion src/SharpPulsar.Test/ZeroQueueSizeTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public async Task TestPauseAndResume()
//await Task.Delay(10000);
//Assert.True(latch.Value.Wait(TimeSpan.FromSeconds(2)), "Timed out waiting for message listener acks");

//await consumer.UnsubscribeAsync();//;//.ConfigureAwait(false); https://xunit.net/xunit.analyzers/rules/xUnit1030 https://xunit.net/xunit.analyzers/rules/xUnit1030
await consumer.UnsubscribeAsync();//;//.ConfigureAwait(false); https://xunit.net/xunit.analyzers/rules/xUnit1030 https://xunit.net/xunit.analyzers/rules/xUnit1030
await producer.CloseAsync();//;//.ConfigureAwait(false); https://xunit.net/xunit.analyzers/rules/xUnit1030 https://xunit.net/xunit.analyzers/rules/xUnit1030
}
public async Task InitializeAsync()
Expand Down
4 changes: 2 additions & 2 deletions src/SharpPulsar/Batch/BatchMessageContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,9 @@ public override ProducerActor<T>.OpSendMsg<T> CreateOpSendMsg()
}
private ReadOnlySequence<byte> SendMessage(long producerId, long sequenceId, int numMessages, IMessageId messageId, MessageMetadata msgMetadata, byte[] compressedPayload)
{
if (messageId is MessageId)
if (messageId is MessageIdAdv)
{
return Commands.NewSend(producerId, sequenceId, numMessages, ChecksumType.Crc32C, ((MessageId)messageId).LedgerId, ((MessageId)messageId).EntryId, msgMetadata, compressedPayload);
return Commands.NewSend(producerId, sequenceId, numMessages, ChecksumType.Crc32C, ((MessageIdAdv)messageId).LedgerId, ((MessageIdAdv)messageId).EntryId, msgMetadata, compressedPayload);
}
else
{
Expand Down
10 changes: 5 additions & 5 deletions src/SharpPulsar/Batch/BatchMessageId.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/// </summary>
namespace SharpPulsar.Batch
{
public class BatchMessageId : MessageId
public class BatchMessageId : MessageIdAdv
{
private const int NoBatch = -1;
public int BatchIndex { get; }
Expand Down Expand Up @@ -68,7 +68,7 @@ public virtual int CompareTo(object o)
return Compare(other);
}

if (o is MessageId id)
if (o is MessageIdAdv id)
{
int res = base.CompareTo(id);
if (res == 0 && BatchIndex > NoBatch)
Expand Down Expand Up @@ -100,7 +100,7 @@ public override bool Equals(object obj)
return LedgerId == other1.LedgerId && EntryId == other1.EntryId && PartitionIndex == other1.PartitionIndex && BatchIndex == other1.BatchIndex && BatchSize == other1.BatchSize;
}

if (obj is MessageId other)
if (obj is MessageIdAdv other)
{
return LedgerId == other.LedgerId && EntryId == other.EntryId && PartitionIndex == other.PartitionIndex && BatchIndex == NoBatch;
}
Expand Down Expand Up @@ -136,9 +136,9 @@ public virtual bool AckCumulative(int batchsize)

public virtual int BatchSize => Acker.BatchSize;

public virtual MessageId PrevBatchMessageId()
public virtual MessageIdAdv PrevBatchMessageId()
{
return new MessageId(LedgerId, EntryId - 1, PartitionIndex);
return new MessageIdAdv(LedgerId, EntryId - 1, PartitionIndex);
}

private int Compare(BatchMessageId m)
Expand Down
Loading
Loading