From 0f6339fd8506cb3d7b62d4600b7e7ad2d486d211 Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Tue, 19 Dec 2023 15:48:03 +0100 Subject: [PATCH 1/5] [FIXES] `ClientSelectedRole`, `ISchema`, `SqlSourceMethod`, `ReaderSourceMethod`, `UnAckedMessageTracker`, `ConsumerActor`, `UnAckedMessageRedeliveryTracker`, `PartitionedProducerActor`, `SchemaType` --- src/SharpPulsar.Trino/Trino/ClientSelectedRole.cs | 2 +- src/SharpPulsar/ConsumerActor.cs | 12 ++++-------- src/SharpPulsar/Events/ReaderSourceMethod.cs | 4 ++-- src/SharpPulsar/Events/SqlSourceMethod.cs | 4 ++-- src/SharpPulsar/Interfaces/ISchema.cs | 6 +++--- src/SharpPulsar/PartitionedProducerActor.cs | 2 +- src/SharpPulsar/Shared/SchemaType.cs | 6 ++++-- .../Tracker/UnAckedMessageRedeliveryTracker.cs | 9 ++------- src/SharpPulsar/Tracker/UnAckedMessageTracker.cs | 2 +- 9 files changed, 20 insertions(+), 27 deletions(-) diff --git a/src/SharpPulsar.Trino/Trino/ClientSelectedRole.cs b/src/SharpPulsar.Trino/Trino/ClientSelectedRole.cs index 0ee28bc2c..432b5b125 100644 --- a/src/SharpPulsar.Trino/Trino/ClientSelectedRole.cs +++ b/src/SharpPulsar.Trino/Trino/ClientSelectedRole.cs @@ -32,7 +32,7 @@ public class ClientSelectedRole private readonly string _role; public ClientSelectedRole(Type type, string role) { - if (type == null) + if (type == Type.None) throw new NullReferenceException("type is null"); _role = Condition.RequireNonNull(role, "Role", "role is null"); if (type == Type.Role && !string.IsNullOrWhiteSpace(role)) diff --git a/src/SharpPulsar/ConsumerActor.cs b/src/SharpPulsar/ConsumerActor.cs index ee9c3650f..73773828b 100644 --- a/src/SharpPulsar/ConsumerActor.cs +++ b/src/SharpPulsar/ConsumerActor.cs @@ -2795,7 +2795,7 @@ private bool SeekCheckState(string seekBy) { if (State.ConnectionState == HandlerState.State.Closing || State.ConnectionState == HandlerState.State.Closed) { - throw new PulsarClientException.AlreadyClosedException($"The consumer {ConsumerName} was already closed when seeking the subscription {Subscription} of the topic {_topicName} to {seekBy}"); + throw new AlreadyClosedException($"The consumer {ConsumerName} was already closed when seeking the subscription {Subscription} of the topic {_topicName} to {seekBy}"); } @@ -2909,7 +2909,7 @@ private async ValueTask InternalGetLastMessageIdAsync( if (State.ConnectionState == HandlerState.State.Closing || State.ConnectionState == HandlerState.State.Closed) { - throw new PulsarClientException.AlreadyClosedException($"The consumer {ConsumerName} was already closed when the subscription {Subscription} of the topic {_topicName} getting the last message id"); + throw new AlreadyClosedException($"The consumer {ConsumerName} was already closed when the subscription {Subscription} of the topic {_topicName} getting the last message id"); } var opTimeoutMs = _clientConfigurationData.OperationTimeout; @@ -2943,12 +2943,8 @@ private async ValueTask InternalGetLastMessageIdAsync( { var result = await cnx.Ask(payload); IMessageId lastMessageId; - MessageId markDeletePosition = null; - if (result.MarkDeletePosition != null) - { - markDeletePosition = new MessageId(result.MarkDeletePosition.LedgerId, result.MarkDeletePosition.EntryId, -1); - } - _log.Info($"[{Topic}][{Subscription}] Successfully getLastMessageId {result.LedgerId}:{result.EntryId}"); + var markDeletePosition = new MessageId(result.MarkDeletePosition.LedgerId, result.MarkDeletePosition.EntryId, -1); + _log.Info($"[{Topic}][{Subscription}] Successfully getLastMessageId {result.LedgerId}:{result.EntryId}"); if (result.BatchIndex < 0) { lastMessageId = new MessageId(result.LedgerId, result.EntryId, result.Partition); diff --git a/src/SharpPulsar/Events/ReaderSourceMethod.cs b/src/SharpPulsar/Events/ReaderSourceMethod.cs index f7e63b165..32465bb5c 100644 --- a/src/SharpPulsar/Events/ReaderSourceMethod.cs +++ b/src/SharpPulsar/Events/ReaderSourceMethod.cs @@ -66,7 +66,7 @@ public ReaderSource CurrentEvents() public ReaderSource TaggedEvents(Tag tag) { - if (tag == null) + if (tag == new Tag(tag.Key, tag.Value)) throw new ArgumentException("Tag is null"); var actorName = Regex.Replace(_topic, @"[^\w\d]", ""); @@ -77,7 +77,7 @@ public ReaderSource TaggedEvents(Tag tag) } public ReaderSource CurrentTaggedEvents(Tag tag) { - if (tag == null) + if (tag == new Tag(tag.Key, tag.Value)) throw new ArgumentException("Tag is null"); var actorName = Regex.Replace(_topic, @"[^\w\d]", ""); diff --git a/src/SharpPulsar/Events/SqlSourceMethod.cs b/src/SharpPulsar/Events/SqlSourceMethod.cs index 14acd2ace..9eea672b4 100644 --- a/src/SharpPulsar/Events/SqlSourceMethod.cs +++ b/src/SharpPulsar/Events/SqlSourceMethod.cs @@ -48,7 +48,7 @@ public SqlSource CurrentEvents() public SqlSource CurrentTaggedEvents(Tag tag) { - if (tag == null) + if (tag == new Tag(tag.Key, tag.Value)) throw new ArgumentException("Tag is null"); var buffer = new BufferBlock(); @@ -71,7 +71,7 @@ public SqlSource Events() public SqlSource TaggedEvents(Tag tag) { - if (tag == null) + if (tag == new Tag(tag.Key, tag.Value)) throw new ArgumentException("Tag is null"); var buffer = new BufferBlock(); diff --git a/src/SharpPulsar/Interfaces/ISchema.cs b/src/SharpPulsar/Interfaces/ISchema.cs index e9951f3d2..ff6e5a80a 100644 --- a/src/SharpPulsar/Interfaces/ISchema.cs +++ b/src/SharpPulsar/Interfaces/ISchema.cs @@ -237,9 +237,9 @@ static ISchema Avro(ISchemaDefinition schemaDefinition) /// /// the POJO class to be used to extract the JSON schema /// a Schema instance - static ISchema Json(Type pojo) + static ISchema Json(Type pojo) { - return DefaultImplementation.NewJsonSchema(ISchemaDefinition.Builder().WithPojo(pojo).Build()); + return DefaultImplementation.NewJsonSchema(ISchemaDefinition.Builder().WithPojo(pojo).Build()); } /// @@ -247,7 +247,7 @@ static ISchema Json(Type pojo) /// /// the definition of the schema /// a Schema instance - static ISchema Json(ISchemaDefinition schemaDefinition) + static ISchema Json(ISchemaDefinition schemaDefinition) { return DefaultImplementation.NewJsonSchema(schemaDefinition); } diff --git a/src/SharpPulsar/PartitionedProducerActor.cs b/src/SharpPulsar/PartitionedProducerActor.cs index 0e59dfe7e..54c9a5e40 100644 --- a/src/SharpPulsar/PartitionedProducerActor.cs +++ b/src/SharpPulsar/PartitionedProducerActor.cs @@ -478,7 +478,7 @@ internal void Run() } // if last auto update not completed yet, do nothing. - if (_partitionsAutoUpdateFuture == null || _partitionsAutoUpdateFuture.IsCompleted) + if (_partitionsAutoUpdateFuture.IsCompleted) { _partitionsAutoUpdateFuture = _topicsPartitionChangedListener.OnTopicsExtended(new List { Topic }); } diff --git a/src/SharpPulsar/Shared/SchemaType.cs b/src/SharpPulsar/Shared/SchemaType.cs index 0bc9cfef4..729ddaaf0 100644 --- a/src/SharpPulsar/Shared/SchemaType.cs +++ b/src/SharpPulsar/Shared/SchemaType.cs @@ -171,7 +171,8 @@ public sealed class SchemaType private static readonly List _valueList = new List(); - static SchemaType() + [System.Obsolete] + static SchemaType() { _valueList.Add(NONE); _valueList.Add(STRING); @@ -254,7 +255,8 @@ public int Value } } - public static SchemaType ValueOf(int Value) + [System.Obsolete] + public static SchemaType ValueOf(int Value) { switch (Value) { diff --git a/src/SharpPulsar/Tracker/UnAckedMessageRedeliveryTracker.cs b/src/SharpPulsar/Tracker/UnAckedMessageRedeliveryTracker.cs index 6898d9827..8e984fb44 100644 --- a/src/SharpPulsar/Tracker/UnAckedMessageRedeliveryTracker.cs +++ b/src/SharpPulsar/Tracker/UnAckedMessageRedeliveryTracker.cs @@ -238,13 +238,8 @@ internal override bool Remove(IMessageId messageId) var messageIdWrapper = _unackMessageIdWrapper.ValueOf(messageId); try { - var removed = false; - var exist = RedeliveryMessageIdPartitionMap.Remove(messageIdWrapper); - if (exist != null) - { - removed = exist; - } - return removed || AckTimeoutMessages.Remove(messageId) != null; + var removed = RedeliveryMessageIdPartitionMap.Remove(messageIdWrapper); + return removed || AckTimeoutMessages.Remove(messageId); } finally { diff --git a/src/SharpPulsar/Tracker/UnAckedMessageTracker.cs b/src/SharpPulsar/Tracker/UnAckedMessageTracker.cs index e98782c9a..a68d58953 100644 --- a/src/SharpPulsar/Tracker/UnAckedMessageTracker.cs +++ b/src/SharpPulsar/Tracker/UnAckedMessageTracker.cs @@ -164,7 +164,7 @@ internal async ValueTask AddChunkedMessageIdsAndRemoveFromSequenceMap(IMessageId if (messageId is MessageId) { var chunkedMsgIds = await Unack.Ask(new UnAckedChunckedMessageIdSequenceMapCmd(UnAckedCommand.Get, new List { messageId })); ; - if (chunkedMsgIds != null && chunkedMsgIds.MessageIds.Length > 0) + if (chunkedMsgIds.MessageIds.Length > 0) { MessageIds = messageIds.Select(m => m).Concat(chunkedMsgIds.MessageIds.Select(c => c)).ToHashSet(); } From 15d676ba1f11202c87da6e4a5242a918b171d6c5 Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Tue, 19 Dec 2023 16:30:05 +0100 Subject: [PATCH 2/5] [FIX] `Tag is null` --- src/SharpPulsar/Events/ReaderSourceMethod.cs | 4 ++-- src/SharpPulsar/Events/SqlSourceMethod.cs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/SharpPulsar/Events/ReaderSourceMethod.cs b/src/SharpPulsar/Events/ReaderSourceMethod.cs index 32465bb5c..0eb6ad936 100644 --- a/src/SharpPulsar/Events/ReaderSourceMethod.cs +++ b/src/SharpPulsar/Events/ReaderSourceMethod.cs @@ -66,7 +66,7 @@ public ReaderSource CurrentEvents() public ReaderSource TaggedEvents(Tag tag) { - if (tag == new Tag(tag.Key, tag.Value)) + if (tag.Key == null && tag.Value == null) throw new ArgumentException("Tag is null"); var actorName = Regex.Replace(_topic, @"[^\w\d]", ""); @@ -77,7 +77,7 @@ public ReaderSource TaggedEvents(Tag tag) } public ReaderSource CurrentTaggedEvents(Tag tag) { - if (tag == new Tag(tag.Key, tag.Value)) + if (tag.Key == null && tag.Value == null) throw new ArgumentException("Tag is null"); var actorName = Regex.Replace(_topic, @"[^\w\d]", ""); diff --git a/src/SharpPulsar/Events/SqlSourceMethod.cs b/src/SharpPulsar/Events/SqlSourceMethod.cs index 9eea672b4..bb63736a0 100644 --- a/src/SharpPulsar/Events/SqlSourceMethod.cs +++ b/src/SharpPulsar/Events/SqlSourceMethod.cs @@ -48,7 +48,7 @@ public SqlSource CurrentEvents() public SqlSource CurrentTaggedEvents(Tag tag) { - if (tag == new Tag(tag.Key, tag.Value)) + if (tag.Key == null && tag.Value == null) throw new ArgumentException("Tag is null"); var buffer = new BufferBlock(); @@ -71,7 +71,7 @@ public SqlSource Events() public SqlSource TaggedEvents(Tag tag) { - if (tag == new Tag(tag.Key, tag.Value)) + if (tag.Key == null && tag.Value == null) throw new ArgumentException("Tag is null"); var buffer = new BufferBlock(); From 5973f715a1d7b2a550a1ca12fe4cde5a5b2af1aa Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Tue, 19 Dec 2023 16:47:24 +0100 Subject: [PATCH 3/5] [FIX] `chunkedMsgIds != null` --- src/SharpPulsar/Tracker/UnAckedMessageTracker.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/SharpPulsar/Tracker/UnAckedMessageTracker.cs b/src/SharpPulsar/Tracker/UnAckedMessageTracker.cs index a68d58953..4031b9bf6 100644 --- a/src/SharpPulsar/Tracker/UnAckedMessageTracker.cs +++ b/src/SharpPulsar/Tracker/UnAckedMessageTracker.cs @@ -163,10 +163,10 @@ internal async ValueTask AddChunkedMessageIdsAndRemoveFromSequenceMap(IMessageId { if (messageId is MessageId) { - var chunkedMsgIds = await Unack.Ask(new UnAckedChunckedMessageIdSequenceMapCmd(UnAckedCommand.Get, new List { messageId })); ; - if (chunkedMsgIds.MessageIds.Length > 0) + UnAckedChunckedMessageIdSequenceMapCmdResponse? chunkedMsgIds = await Unack.Ask(new UnAckedChunckedMessageIdSequenceMapCmd(UnAckedCommand.Get, new List { messageId })); ; + if (chunkedMsgIds != null && chunkedMsgIds?.MessageIds.Length > 0) { - MessageIds = messageIds.Select(m => m).Concat(chunkedMsgIds.MessageIds.Select(c => c)).ToHashSet(); + MessageIds = messageIds.Select(m => m).Concat(chunkedMsgIds?.MessageIds.Select(c => c)).ToHashSet(); } Unack.Tell(new UnAckedChunckedMessageIdSequenceMapCmd(UnAckedCommand.Remove, new List { messageId })); } From ede7994e85517731a6b0746904a7ceb0deab1927 Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Wed, 20 Dec 2023 15:23:23 +0100 Subject: [PATCH 4/5] more fixes --- .../Schema/AvroSchemaTest.cs | 40 +++++++++--------- src/SharpPulsar.Test.API/SchemaTestUtils.cs | 42 +++++++++---------- src/SharpPulsar.Test.Token/TokenTests.cs | 2 +- .../PulsarFixture.cs | 22 +++++----- .../PulsarOAuthFixture.cs | 25 +++++------ .../PulsarTokenFixture.cs | 24 +++++------ src/SharpPulsar/ClientCnx.cs | 3 +- src/SharpPulsar/ConsumerActor.cs | 2 +- .../Exceptions/PulsarClientException.cs | 3 -- src/SharpPulsar/MessageRouterBase.cs | 2 +- .../ControlledClusterFailoverActor.cs | 10 +++-- src/SharpPulsar/Shared/SchemaType.cs | 2 +- .../SocketImpl/SocketClientActor.cs | 3 -- src/SharpPulsar/Table/TableViewActor.cs | 1 - .../UnAckedMessageRedeliveryTracker.cs | 4 +- .../TransactionImpl/TransactionActor.cs | 2 +- 16 files changed, 92 insertions(+), 95 deletions(-) diff --git a/src/SharpPulsar.Test.API/Schema/AvroSchemaTest.cs b/src/SharpPulsar.Test.API/Schema/AvroSchemaTest.cs index 647ae7abb..6a9706de5 100644 --- a/src/SharpPulsar.Test.API/Schema/AvroSchemaTest.cs +++ b/src/SharpPulsar.Test.API/Schema/AvroSchemaTest.cs @@ -33,15 +33,15 @@ public class AvroSchemaTest { private class DefaultStruct { - public int Field1 { get; set; } - public string Field2 { get; set; } + public int? Field1 { get; set; } + public string? Field2 { get; set; } public long? Field3 { get; set; } } private class StructWithAnnotations { - public int Field1 { get; set; } - public string Field2 { get; set; } + public int? Field1 { get; set; } + public string? Field2 { get; set; } public long? Field3 { get; set; } } @@ -53,10 +53,11 @@ private class SchemaLogicalType : IEquatable public long TimeMillis { get; set; } public long TimestampMicros { get; set; } public long TimeMicros { get; set; } - public bool Equals(SchemaLogicalType other) + public bool Equals(SchemaLogicalType? other) { - if (Decimal == other.Decimal && Date == other.Date && TimestampMillis == other.TimestampMillis - && TimeMillis == other.TimeMillis && TimestampMicros == other.TimestampMicros && TimeMicros == other.TimeMicros) + if (Decimal == other?.Decimal && Date == other?.Date && TimestampMillis == other?.TimestampMillis + && TimeMillis == other?.TimeMillis && TimestampMicros == other?.TimestampMicros + && TimeMicros == other?.TimeMicros) return true; return false; } @@ -151,9 +152,9 @@ public class Bar : IEquatable { public bool Field1 { get; set; } - public bool Equals(Bar other) + public bool Equals(Bar? other) { - if (Field1 == other.Field1) + if (Field1 == other?.Field1) return true; return false; } @@ -162,19 +163,20 @@ public bool Equals(Bar other) public class Foo : IEquatable { public Color Color { get; set; } - public string Field1 { get; set; } - public string Field2 { get; set; } - public string Field3 { get; set; } - public Bar Field4 { get; set; } - public string Field5 { get; set; } + public string? Field1 { get; set; } + public string? Field2 { get; set; } + public string? Field3 { get; set; } + public Bar? Field4 { get; set; } + public string? Field5 { get; set; } - public bool Equals(Foo other) + public bool Equals(Foo? other) { - if (Field1 == other.Field1 && Field2 == other.Field2 && Field3 == other.Field3 - && Field4?.Field1 == other.Field4?.Field1 && Field5 == other.Field5) + if (Field1 == other?.Field1 && Field2 == other?.Field2 && Field3 == other?.Field3 + && Field4?.Field1 == other?.Field4?.Field1 && Field5 == other?.Field5) return true; return false; } + } [Serializable] public enum Color @@ -189,10 +191,10 @@ public class LogicalMessage : ISpecificRecord public DateTime CreatedTime { get; set; } public AvroDecimal Size { get; set; } - public string DayOfWeek { get; set; } + public required string DayOfWeek { get; set; } [Ignore] - public Avro.Schema Schema { get; set; } + public Avro.Schema? Schema { get; set; } public object Get(int fieldPos) { diff --git a/src/SharpPulsar.Test.API/SchemaTestUtils.cs b/src/SharpPulsar.Test.API/SchemaTestUtils.cs index 8d639faa2..6adf8e694 100644 --- a/src/SharpPulsar.Test.API/SchemaTestUtils.cs +++ b/src/SharpPulsar.Test.API/SchemaTestUtils.cs @@ -29,19 +29,19 @@ public class SchemaTestUtils public class Foo : IEquatable { - public string Field1 { get; set; } - public string Field2 { get; set; } + public string? Field1 { get; set; } + public string? Field2 { get; set; } public int Field3 { get; set; } - public Bar Field4 { get; set; } + public Bar? Field4 { get; set; } public Color Color { get; set; } - public string FieldUnableNull { get; set; } - public bool Equals(Foo other) + public string? FieldUnableNull { get; set; } + public bool Equals(Foo? other) { - if (Field1 == other.Field1 && Field2 == other.Field2 && Field3 == other.Field3 - && Field4?.Field1 == other.Field4?.Field1 && Color == other.Color && FieldUnableNull == other.FieldUnableNull) + if (Field1 == other?.Field1 && Field2 == other?.Field2 && Field3 == other?.Field3 + && Field4?.Field1 == other?.Field4?.Field1 && Color == other?.Color && FieldUnableNull == other?.FieldUnableNull) return true; return false; } @@ -50,11 +50,11 @@ public bool Equals(Foo other) [Serializable] public class FooV2 : IEquatable { - public string Field1 { get; set; } + public string? Field1 { get; set; } public int Field3 { get; set; } - public bool Equals(FooV2 other) + public bool Equals(FooV2? other) { - return Field1 == other.Field1 && Field3 == other.Field3; + return Field1 == other?.Field1 && Field3 == other?.Field3; } } @@ -64,9 +64,9 @@ public class Bar : IEquatable { public bool Field1 { get; set; } - public bool Equals(Bar other) + public bool Equals(Bar? other) { - if (Field1 == other.Field1) + if (Field1 == other?.Field1) return true; return false; } @@ -75,25 +75,25 @@ public bool Equals(Bar other) public class NestedBar { public bool Field1 { get; set; } - public Bar Nested { get; set; } + public Bar? Nested { get; set; } } public class NestedBarList { public bool Field1 { get; set; } - public IList List { get; set; } + public IList? List { get; set; } } public class DerivedFoo : Foo, IEquatable { - public string Field5 { get; set; } + public string? Field5 { get; set; } public int Field6 { get; set; } - public Foo Foo { get; set; } - public bool Equals(DerivedFoo other) + public Foo? Foo { get; set; } + public bool Equals(DerivedFoo? other) { - return Field5 == other.Field5 && Field6 == other.Field6 && Foo == other.Foo; + return Field5 == other?.Field5 && Field6 == other?.Field6 && Foo == other.Foo; } } @@ -105,10 +105,10 @@ public enum Color public class DerivedDerivedFoo : DerivedFoo { - public string Field7 { get; set; } + public string? Field7 { get; set; } public int Field8 { get; set; } - public DerivedFoo DerivedFoo { get; set; } - public Foo Foo2 { get; set; } + public DerivedFoo? DerivedFoo { get; set; } + public Foo? Foo2 { get; set; } } public const string SchemaAvroNotAllowNull = @"{""type"":""record"",""name"":""Foo"",""namespace"":""org.apache.pulsar.client.impl.schema.SchemaTestUtils"",""fields"":[{""name"":""field1"",""type"":[""null"",""string""]," + @"""default"":null},{""name"":""field2"",""type"":[""null"",""string""],""default"":null},{""name"":""field3"",""type"":""int""},{""name"":""field4"",""type"":[""null"",{""type"":" + @"""record"",""name"":""Bar"",""fields"":[{""name"":""field1"",""type"":""boolean""}]}],""default"":null},{""name"":""color"",""type"":[""null"",{""type"":""enum"",""name"":""Color""," + @"""symbols"":[""RED"",""BLUE""]}],""default"":null},{""name"":""fieldUnableNull"",""type"":""string"",""default"":""defaultValue""}]}"; diff --git a/src/SharpPulsar.Test.Token/TokenTests.cs b/src/SharpPulsar.Test.Token/TokenTests.cs index e46a59704..4acdc02cf 100644 --- a/src/SharpPulsar.Test.Token/TokenTests.cs +++ b/src/SharpPulsar.Test.Token/TokenTests.cs @@ -17,7 +17,7 @@ public class TokenTests public TokenTests(ITestOutputHelper output, PulsarTokenFixture fixture) { _output = output; - _client = fixture.PulsarSystem.NewClient(fixture.ConfigBuilder).AsTask().GetAwaiter().GetResult(); + _client = fixture.PulsarSystem?.NewClient(fixture.ConfigBuilder).AsTask().GetAwaiter().GetResult()!; _topic = $"persistent://public/default/token-{Guid.NewGuid()}"; } diff --git a/src/SharpPulsar.TestContainer/PulsarFixture.cs b/src/SharpPulsar.TestContainer/PulsarFixture.cs index 4e370c1d3..2edf6a359 100644 --- a/src/SharpPulsar.TestContainer/PulsarFixture.cs +++ b/src/SharpPulsar.TestContainer/PulsarFixture.cs @@ -8,13 +8,13 @@ namespace SharpPulsar.TestContainer { public class PulsarFixture //: IAsyncLifetime { - public PulsarSystem System; + public PulsarSystem? System; private readonly IConfiguration _configuration; - public PulsarClientConfigBuilder ConfigBuilder; - public ClientConfigurationData ClientConfigurationData; + public PulsarClientConfigBuilder? ConfigBuilder; + public ClientConfigurationData? ClientConfigurationData; public PulsarFixture() { - var path = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location); + var path = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location)!; _configuration = GetIConfigurationRoot(path); SetupSystem(); } @@ -37,13 +37,13 @@ public virtual void SetupSystem(string? service = null, string? web = null) var authPluginClassName = clienConfigSetting.GetSection("authPluginClassName").Value; var authParamsString = clienConfigSetting.GetSection("authParamsString").Value; var authCertPath = clienConfigSetting.GetSection("authCertPath").Value; - var connectionsPerBroker = int.Parse(clienConfigSetting.GetSection("connections-per-broker").Value); - var statsInterval = TimeSpan.Parse(clienConfigSetting.GetSection("stats-interval").Value); - var operationTime = int.Parse(clienConfigSetting.GetSection("operationTime").Value); - var allowTlsInsecureConnection = bool.Parse(clienConfigSetting.GetSection("allowTlsInsecureConnection").Value); - var enableTls = bool.Parse(clienConfigSetting.GetSection("enableTls").Value); - var enableTxn = bool.Parse(clienConfigSetting.GetSection("enableTransaction").Value); - var dedicatedConnection = bool.Parse(clienConfigSetting.GetSection("userDedicatedConnection").Value); + var connectionsPerBroker = int.Parse(clienConfigSetting.GetSection("connections-per-broker").Value!); + var statsInterval = TimeSpan.Parse(clienConfigSetting.GetSection("stats-interval").Value!); + var operationTime = int.Parse(clienConfigSetting.GetSection("operationTime").Value!); + var allowTlsInsecureConnection = bool.Parse(clienConfigSetting.GetSection("allowTlsInsecureConnection").Value!); + var enableTls = bool.Parse(clienConfigSetting.GetSection("enableTls").Value!); + var enableTxn = bool.Parse(clienConfigSetting.GetSection("enableTransaction").Value!); + var dedicatedConnection = bool.Parse(clienConfigSetting.GetSection("userDedicatedConnection").Value!); client.EnableTransaction(enableTxn); diff --git a/src/SharpPulsar.TestContainer/PulsarOAuthFixture.cs b/src/SharpPulsar.TestContainer/PulsarOAuthFixture.cs index f9a87c47d..a831ab906 100644 --- a/src/SharpPulsar.TestContainer/PulsarOAuthFixture.cs +++ b/src/SharpPulsar.TestContainer/PulsarOAuthFixture.cs @@ -1,5 +1,6 @@ using System.Reflection; using System.Security.Cryptography.X509Certificates; +using Ductus.FluentDocker.Commands; using Microsoft.Extensions.Configuration; using SharpPulsar.Auth.OAuth2; using SharpPulsar.Builder; @@ -11,14 +12,14 @@ namespace SharpPulsar.TestContainer { public class PulsarOAuthFixture : IAsyncLifetime { - public PulsarClient Client; - public PulsarSystem PulsarSystem; + public PulsarClient? Client; + public PulsarSystem? PulsarSystem; private readonly IConfiguration _configuration; - public ClientConfigurationData ClientConfigurationData; + public ClientConfigurationData? ClientConfigurationData; public PulsarOAuthFixture() { - var path = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location); + var path = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location)!; _configuration = GetIConfigurationRoot(path); Container = BuildContainer(); } @@ -62,13 +63,13 @@ public virtual async ValueTask SetupSystem(string? service = null, string? web = var authPluginClassName = clienConfigSetting.GetSection("authPluginClassName").Value; var authParamsString = clienConfigSetting.GetSection("authParamsString").Value; var authCertPath = clienConfigSetting.GetSection("authCertPath").Value; - var connectionsPerBroker = int.Parse(clienConfigSetting.GetSection("connections-per-broker").Value); - var statsInterval = TimeSpan.Parse(clienConfigSetting.GetSection("stats-interval").Value); - var operationTime = int.Parse(clienConfigSetting.GetSection("operationTime").Value); - var allowTlsInsecureConnection = bool.Parse(clienConfigSetting.GetSection("allowTlsInsecureConnection").Value); - var enableTls = bool.Parse(clienConfigSetting.GetSection("enableTls").Value); - var enableTxn = bool.Parse(clienConfigSetting.GetSection("enableTransaction").Value); - var dedicatedConnection = bool.Parse(clienConfigSetting.GetSection("userDedicatedConnection").Value); + var connectionsPerBroker = int.Parse(clienConfigSetting.GetSection("connections-per-broker").Value!); + var statsInterval = TimeSpan.Parse(clienConfigSetting.GetSection("stats-interval").Value!); + var operationTime = int.Parse(clienConfigSetting.GetSection("operationTime").Value!); + var allowTlsInsecureConnection = bool.Parse(clienConfigSetting.GetSection("allowTlsInsecureConnection").Value!); + var enableTls = bool.Parse(clienConfigSetting.GetSection("enableTls").Value!); + var enableTxn = bool.Parse(clienConfigSetting.GetSection("enableTransaction").Value!); + var dedicatedConnection = bool.Parse(clienConfigSetting.GetSection("userDedicatedConnection").Value!); client.EnableTransaction(enableTxn); @@ -137,7 +138,7 @@ private string GetConfigFilePath() { var configFolderName = "Oauth2Files"; var privateKeyFileName = "o-r7y4o-eabanonu.json"; - var startup = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location); + var startup = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location)!; var indexOfConfigDir = startup.IndexOf(startup, StringComparison.Ordinal); var examplesFolder = startup.Substring(0, startup.Length - indexOfConfigDir); var configFolder = Path.Combine(examplesFolder, configFolderName); diff --git a/src/SharpPulsar.TestContainer/PulsarTokenFixture.cs b/src/SharpPulsar.TestContainer/PulsarTokenFixture.cs index 3ef937f4c..32ceadcd1 100644 --- a/src/SharpPulsar.TestContainer/PulsarTokenFixture.cs +++ b/src/SharpPulsar.TestContainer/PulsarTokenFixture.cs @@ -18,16 +18,16 @@ public class PulsarTokenFixture : IAsyncLifetime private const string SecretKeyPath = "/pulsar/secret.key"; private const string UserName = "test-user"; private const int Port = 6650; - public PulsarSystem PulsarSystem; - public ClientConfigurationData ClientConfigurationData; - public PulsarClientConfigBuilder ConfigBuilder; - public string Token; + public PulsarSystem? PulsarSystem; + public ClientConfigurationData? ClientConfigurationData; + public PulsarClientConfigBuilder? ConfigBuilder; + public string? Token; private readonly IConfiguration _configuration; private readonly IMessageSink _messageSink; private readonly IContainerService _cluster; public PulsarTokenFixture(IMessageSink messageSink) { - var path = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location); + var path = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location)!; _configuration = GetIConfigurationRoot(path); _messageSink = messageSink; @@ -97,13 +97,13 @@ public virtual void SetupSystem() var authPluginClassName = clienConfigSetting.GetSection("authPluginClassName").Value; var authParamsString = clienConfigSetting.GetSection("authParamsString").Value; var authCertPath = clienConfigSetting.GetSection("authCertPath").Value; - var connectionsPerBroker = int.Parse(clienConfigSetting.GetSection("connections-per-broker").Value); - var statsInterval = TimeSpan.Parse(clienConfigSetting.GetSection("stats-interval").Value); - var operationTime = int.Parse(clienConfigSetting.GetSection("operationTime").Value); - var allowTlsInsecureConnection = bool.Parse(clienConfigSetting.GetSection("allowTlsInsecureConnection").Value); - var enableTls = bool.Parse(clienConfigSetting.GetSection("enableTls").Value); - var enableTxn = bool.Parse(clienConfigSetting.GetSection("enableTransaction").Value); - var dedicatedConnection = bool.Parse(clienConfigSetting.GetSection("userDedicatedConnection").Value); + var connectionsPerBroker = int.Parse(clienConfigSetting.GetSection("connections-per-broker").Value!); + var statsInterval = TimeSpan.Parse(clienConfigSetting.GetSection("stats-interval").Value!); + var operationTime = int.Parse(clienConfigSetting.GetSection("operationTime").Value!); + var allowTlsInsecureConnection = bool.Parse(clienConfigSetting.GetSection("allowTlsInsecureConnection").Value!); + var enableTls = bool.Parse(clienConfigSetting.GetSection("enableTls").Value!); + var enableTxn = bool.Parse(clienConfigSetting.GetSection("enableTransaction").Value!); + var dedicatedConnection = bool.Parse(clienConfigSetting.GetSection("userDedicatedConnection").Value!); client.EnableTransaction(enableTxn); diff --git a/src/SharpPulsar/ClientCnx.cs b/src/SharpPulsar/ClientCnx.cs index 5b6de4ed7..46070b08d 100644 --- a/src/SharpPulsar/ClientCnx.cs +++ b/src/SharpPulsar/ClientCnx.cs @@ -67,10 +67,9 @@ internal sealed class ClientCnx : ReceiveActor, IWithUnboundedStash private ICancelable _timeoutTask; - private readonly ICancelable _sendPing; + private readonly ICancelable _sendPing = default; private readonly IActorRef _parent; private readonly IScheduler _scheduler; - private IDisposable _subscriber; // Added for mutual authentication. private IAuthenticationDataProvider _authenticationDataProvider; diff --git a/src/SharpPulsar/ConsumerActor.cs b/src/SharpPulsar/ConsumerActor.cs index 73773828b..888c6e657 100644 --- a/src/SharpPulsar/ConsumerActor.cs +++ b/src/SharpPulsar/ConsumerActor.cs @@ -116,7 +116,7 @@ internal class ConsumerActor : ConsumerActorBase private readonly bool _readCompacted; private readonly bool _resetIncludeHead; - private readonly bool _poolMessages = false; + //private readonly bool _poolMessages = false; private readonly ActorSystem _actorSystem; diff --git a/src/SharpPulsar/Exceptions/PulsarClientException.cs b/src/SharpPulsar/Exceptions/PulsarClientException.cs index 82996d579..2a4c9a299 100644 --- a/src/SharpPulsar/Exceptions/PulsarClientException.cs +++ b/src/SharpPulsar/Exceptions/PulsarClientException.cs @@ -1266,9 +1266,6 @@ public RuntimeException(string message, Exception innerException) : base(message { } - protected RuntimeException(SerializationInfo info, StreamingContext context) : base(info, context) - { - } } } } \ No newline at end of file diff --git a/src/SharpPulsar/MessageRouterBase.cs b/src/SharpPulsar/MessageRouterBase.cs index 7a4adcb96..07e27cb50 100644 --- a/src/SharpPulsar/MessageRouterBase.cs +++ b/src/SharpPulsar/MessageRouterBase.cs @@ -10,7 +10,7 @@ internal abstract class MessageRouterBase : IMessageRouter public abstract int ChoosePartition(IMessage msg, TopicMetadata metadata); public abstract int ChoosePartition(IMessage msg); - protected internal readonly IHash Hash; + protected internal readonly IHash Hash = default; internal MessageRouterBase(HashingScheme hashingScheme) { diff --git a/src/SharpPulsar/ServiceProvider/ControlledClusterFailoverActor.cs b/src/SharpPulsar/ServiceProvider/ControlledClusterFailoverActor.cs index 444121d08..58ef99fd3 100644 --- a/src/SharpPulsar/ServiceProvider/ControlledClusterFailoverActor.cs +++ b/src/SharpPulsar/ServiceProvider/ControlledClusterFailoverActor.cs @@ -77,6 +77,7 @@ private HttpClient BuildHttpClient() var handler = new HttpClientHandler { AllowAutoRedirect = true, + UseProxy = true, MaxAutomaticRedirections = DefaultMaxRedirects }; var client = new HttpClient(handler) @@ -119,6 +120,7 @@ public virtual void Initialize(PulsarClient client) pulsarClient.UpdateTlsTrustCertsFilePath(tlsTrustCertsFilePath); } pulsarClient.UpdateServiceUrl(ServiceUrl); + pulsarClient.ReloadLookUp(); _currentPulsarServiceUrl = ServiceUrl; currentControlledConfiguration = controlledConfiguration; } @@ -154,11 +156,11 @@ protected internal virtual ControlledConfiguration FetchControlledConfiguration( protected internal class ControlledConfiguration { - internal string ServiceUrl; - internal string TlsTrustCertsFilePath; + internal string ServiceUrl = default; + internal string TlsTrustCertsFilePath = default; - internal string AuthPluginClassName; - internal string AuthParamsString; + internal string AuthPluginClassName = default; + internal string AuthParamsString = default; public virtual string ToJson() { diff --git a/src/SharpPulsar/Shared/SchemaType.cs b/src/SharpPulsar/Shared/SchemaType.cs index 729ddaaf0..7e249ecbc 100644 --- a/src/SharpPulsar/Shared/SchemaType.cs +++ b/src/SharpPulsar/Shared/SchemaType.cs @@ -255,7 +255,7 @@ public int Value } } - [System.Obsolete] + public static SchemaType ValueOf(int Value) { switch (Value) diff --git a/src/SharpPulsar/SocketImpl/SocketClientActor.cs b/src/SharpPulsar/SocketImpl/SocketClientActor.cs index a40c52a52..154a7bb18 100644 --- a/src/SharpPulsar/SocketImpl/SocketClientActor.cs +++ b/src/SharpPulsar/SocketImpl/SocketClientActor.cs @@ -45,9 +45,6 @@ internal sealed class SocketClientActor : ReceiveActor private ChunkingPipeline _pipeline; - public event Action OnConnect; - public event Action OnDisconnect; - private PipeReader _pipeReader; private PipeWriter _pipeWriter; diff --git a/src/SharpPulsar/Table/TableViewActor.cs b/src/SharpPulsar/Table/TableViewActor.cs index db7460036..1240fe4e2 100644 --- a/src/SharpPulsar/Table/TableViewActor.cs +++ b/src/SharpPulsar/Table/TableViewActor.cs @@ -49,7 +49,6 @@ internal class TableViewActor : ReceiveActor private readonly IList> _listeners; private IUntypedActorContext _context; private ILoggingAdapter _log; - private ICancelable _partitionChecker; private readonly IActorRef _self; private IActorRef _replyTo; private bool _isPersistentTopic; diff --git a/src/SharpPulsar/Tracker/UnAckedMessageRedeliveryTracker.cs b/src/SharpPulsar/Tracker/UnAckedMessageRedeliveryTracker.cs index 8e984fb44..c1e47d488 100644 --- a/src/SharpPulsar/Tracker/UnAckedMessageRedeliveryTracker.cs +++ b/src/SharpPulsar/Tracker/UnAckedMessageRedeliveryTracker.cs @@ -30,11 +30,11 @@ namespace SharpPulsar.Tracker internal class UnAckedMessageRedeliveryTracker : UnAckedMessageTracker { - private readonly ILoggingAdapter _log; + private readonly ILoggingAdapter _log = default; private ICancelable _timeout; protected internal readonly Dictionary> RedeliveryMessageIdPartitionMap; protected internal readonly ArrayDeque> RedeliveryTimePartitions; - private readonly IScheduler _scheduler; + private readonly IScheduler _scheduler = default; protected internal readonly Dictionary AckTimeoutMessages; private readonly IRedeliveryBackoff _ackTimeoutRedeliveryBackoff; private UnackMessageIdWrapper _unackMessageIdWrapper; diff --git a/src/SharpPulsar/TransactionImpl/TransactionActor.cs b/src/SharpPulsar/TransactionImpl/TransactionActor.cs index 0c9f79112..9446762c9 100644 --- a/src/SharpPulsar/TransactionImpl/TransactionActor.cs +++ b/src/SharpPulsar/TransactionImpl/TransactionActor.cs @@ -58,7 +58,7 @@ internal class TransactionActor : ReceiveActor, IWithUnboundedStash private readonly Dictionary> _registerSubscriptionMap; private IActorRef _tcClient; //TransactionCoordinatorClientImpl private readonly List _sendList; - private ICancelable _timeout = null; + //private ICancelable _timeout = null; public IStash Stash { get; set; } private readonly TxnID _txnId; private TaskCompletionSource _opFuture; From 36c7ee789f7e1793dea66623dd27ee9d3bbc3e39 Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Wed, 20 Dec 2023 18:11:56 +0100 Subject: [PATCH 5/5] more fixes --- .../BatchMessageIdTest.cs | 4 ++-- src/SharpPulsar.Test.API/CipherHelper.cs | 4 ++-- src/SharpPulsar.Test.API/RangeTest.cs | 4 +++- src/SharpPulsar/BackoffBuilder.cs | 8 +++---- src/SharpPulsar/ConsumerActorBase.cs | 8 +++---- src/SharpPulsar/MessageId.cs | 4 ++-- src/SharpPulsar/ProducerActor.cs | 2 +- .../Schema/LongSchemaVersionResponse.cs | 2 +- src/SharpPulsar/SharpPulsar.csproj | 21 ++++++++++++------- .../TransactionImpl/TransactionActor.cs | 1 - 10 files changed, 33 insertions(+), 25 deletions(-) diff --git a/src/SharpPulsar.Test.API/BatchMessageIdTest.cs b/src/SharpPulsar.Test.API/BatchMessageIdTest.cs index 587d5189a..ca639709c 100644 --- a/src/SharpPulsar.Test.API/BatchMessageIdTest.cs +++ b/src/SharpPulsar.Test.API/BatchMessageIdTest.cs @@ -79,7 +79,7 @@ public void DeserializationTest() try { - var d = JsonSerializer.Serialize(batchMsgId, new JsonSerializerOptions { IgnoreNullValues = false }); + var d = JsonSerializer.Serialize(batchMsgId, new JsonSerializerOptions { }); //Assert.fail("Shouldn't be deserialized"); } catch @@ -93,7 +93,7 @@ public void DeserializationTest() try { - var d = JsonSerializer.Serialize(batchMsgIdToDeserialize, new JsonSerializerOptions { IgnoreNullValues = false }); + var d = JsonSerializer.Serialize(batchMsgIdToDeserialize, new JsonSerializerOptions { }); } catch diff --git a/src/SharpPulsar.Test.API/CipherHelper.cs b/src/SharpPulsar.Test.API/CipherHelper.cs index 6e51b57ef..e7fd5abae 100644 --- a/src/SharpPulsar.Test.API/CipherHelper.cs +++ b/src/SharpPulsar.Test.API/CipherHelper.cs @@ -18,9 +18,9 @@ public CipherHelper(ITestOutputHelper output) public void EncryDecrpt() { var t = "Hello Word!"; - var keyGen = new AesManaged(); + var keyGen = Aes.Create(); var rand = new SecureRandom(); - var ivLen = 256; + //var ivLen = 256; //var iv = 16; var tag = 256; //keyGen.KeySize = tag; diff --git a/src/SharpPulsar.Test.API/RangeTest.cs b/src/SharpPulsar.Test.API/RangeTest.cs index 58127d1e7..d5ece4cb8 100644 --- a/src/SharpPulsar.Test.API/RangeTest.cs +++ b/src/SharpPulsar.Test.API/RangeTest.cs @@ -72,7 +72,9 @@ public void TestInvalid() { var ex = Assert.Throws(() => Range.Of(0, -5)); _output.WriteLine(ex.Message); - Assert.True(ex.Message.Equals("Range end must >= range start.")); + var t = ex.Message.Equals("Range end must >= range start."); + + Assert.True(t); } } diff --git a/src/SharpPulsar/BackoffBuilder.cs b/src/SharpPulsar/BackoffBuilder.cs index dfc81e0dc..2154a6e27 100644 --- a/src/SharpPulsar/BackoffBuilder.cs +++ b/src/SharpPulsar/BackoffBuilder.cs @@ -4,8 +4,8 @@ namespace SharpPulsar { public class BackoffBuilder { - private readonly long _backoffIntervalNanos; - private readonly long _maxBackoffIntervalNanos; + //private readonly long _backoffIntervalNanos; + //private readonly long _maxBackoffIntervalNanos; private TimeSpan _initial; private TimeSpan _max; private readonly DateTimeOffset _clock; @@ -16,8 +16,8 @@ internal BackoffBuilder() _max = TimeSpan.Zero; _mandatoryStop = TimeSpan.Zero; _clock = DateTimeOffset.Now; - _backoffIntervalNanos = 0; - _maxBackoffIntervalNanos = 0; + //_backoffIntervalNanos = 0; + //_maxBackoffIntervalNanos = 0; } public virtual BackoffBuilder SetInitialTime(TimeSpan initial) diff --git a/src/SharpPulsar/ConsumerActorBase.cs b/src/SharpPulsar/ConsumerActorBase.cs index d8684f909..aa6ffed41 100644 --- a/src/SharpPulsar/ConsumerActorBase.cs +++ b/src/SharpPulsar/ConsumerActorBase.cs @@ -270,9 +270,9 @@ protected internal virtual async Task ReconsumeLater(IMessage message, IDicti { ValidateMessageId(message); } - catch (PulsarClientException e) + catch (PulsarClientException) { - throw e; + throw; } await DoReconsumeLater(message, AckType.Individual, customProperties, delayTime).Task; } @@ -284,9 +284,9 @@ protected internal virtual void ReconsumeLater(IMessages messages, TimeSpan d { ValidateMessageId(message); } - catch (PulsarClientException e) + catch (PulsarClientException) { - throw e; + throw; } } try diff --git a/src/SharpPulsar/MessageId.cs b/src/SharpPulsar/MessageId.cs index d14cd3cbe..6319f571f 100644 --- a/src/SharpPulsar/MessageId.cs +++ b/src/SharpPulsar/MessageId.cs @@ -166,9 +166,9 @@ public static MessageId ConvertToMessageId(IMessageId messageId) { return (MessageId)IMessageId.FromByteArray(messageId.ToByteArray()); } - catch (IOException e) + catch (IOException) { - throw e; + throw; } } } diff --git a/src/SharpPulsar/ProducerActor.cs b/src/SharpPulsar/ProducerActor.cs index 76f3c7663..e4982bcd6 100644 --- a/src/SharpPulsar/ProducerActor.cs +++ b/src/SharpPulsar/ProducerActor.cs @@ -1209,7 +1209,7 @@ private byte[] EncryptMessage(MessageMetadata msgMetadata, byte[] compressedPayl _log.Warning($"[{Topic}] [{_producerName}] Failed to encrypt message {e}. Proceeding with publishing unencrypted message"); return compressedPayload; } - throw e; + throw; } return encryptedPayload; } diff --git a/src/SharpPulsar/Protocol/Schema/LongSchemaVersionResponse.cs b/src/SharpPulsar/Protocol/Schema/LongSchemaVersionResponse.cs index 4c29b9188..a4dad83f9 100644 --- a/src/SharpPulsar/Protocol/Schema/LongSchemaVersionResponse.cs +++ b/src/SharpPulsar/Protocol/Schema/LongSchemaVersionResponse.cs @@ -23,7 +23,7 @@ namespace SharpPulsar.Protocol.Schema /// public class LongSchemaVersionResponse { - internal long? Version; + internal long? Version { get; set; } } } \ No newline at end of file diff --git a/src/SharpPulsar/SharpPulsar.csproj b/src/SharpPulsar/SharpPulsar.csproj index 2253648d6..1f63e8b78 100644 --- a/src/SharpPulsar/SharpPulsar.csproj +++ b/src/SharpPulsar/SharpPulsar.csproj @@ -12,18 +12,18 @@ - + - + - - + + - + all @@ -31,8 +31,8 @@ - - + + @@ -76,6 +76,13 @@ + + + + + + + diff --git a/src/SharpPulsar/TransactionImpl/TransactionActor.cs b/src/SharpPulsar/TransactionImpl/TransactionActor.cs index 9446762c9..aa1002f55 100644 --- a/src/SharpPulsar/TransactionImpl/TransactionActor.cs +++ b/src/SharpPulsar/TransactionImpl/TransactionActor.cs @@ -61,7 +61,6 @@ internal class TransactionActor : ReceiveActor, IWithUnboundedStash //private ICancelable _timeout = null; public IStash Stash { get; set; } private readonly TxnID _txnId; - private TaskCompletionSource _opFuture; public TransactionActor(IActorRef client, long transactionTimeoutMs, long txnIdLeastBits, long txnIdMostBits) { _self = Self;