From e0282f86536956ed95d0758ac9ef7aacdaa1d048 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Sat, 20 Apr 2024 00:24:55 +0800 Subject: [PATCH] Add support for setting `replicateSubscriptionState` for the subscription (#261) * Add support for setting `replicateSubscriptionState` for the subscription * Forward port 8080 for tests --- src/Pulsar.Client/Api/Configuration.fs | 2 ++ src/Pulsar.Client/Api/ConsumerBuilder.fs | 5 ++++ src/Pulsar.Client/Common/Commands.fs | 6 ++-- src/Pulsar.Client/Internal/ConsumerImpl.fs | 2 +- tests/IntegrationTests/Basic.fs | 31 +++++++++++++++++++++ tests/UnitTests/Common/CommandTests.fs | 7 +++-- tests/compose/standalone/docker-compose.yml | 1 + 7 files changed, 48 insertions(+), 6 deletions(-) diff --git a/src/Pulsar.Client/Api/Configuration.fs b/src/Pulsar.Client/Api/Configuration.fs index b8c1782a..3c7cf295 100644 --- a/src/Pulsar.Client/Api/Configuration.fs +++ b/src/Pulsar.Client/Api/Configuration.fs @@ -78,6 +78,7 @@ type ConsumerConfiguration<'T> = MaxPendingChunkedMessage: int AutoAckOldestChunkedMessageOnQueueFull: bool ExpireTimeOfIncompleteChunkedMessage: TimeSpan + ReplicateSubscriptionState: bool } member this.SingleTopic with get() = this.Topics |> Seq.head static member Default = @@ -112,6 +113,7 @@ type ConsumerConfiguration<'T> = MaxPendingChunkedMessage = 10 AutoAckOldestChunkedMessageOnQueueFull = false ExpireTimeOfIncompleteChunkedMessage = TimeSpan.FromSeconds(60.0) + ReplicateSubscriptionState = false } type ProducerConfiguration = diff --git a/src/Pulsar.Client/Api/ConsumerBuilder.fs b/src/Pulsar.Client/Api/ConsumerBuilder.fs index 6f2b40b2..1034e0aa 100644 --- a/src/Pulsar.Client/Api/ConsumerBuilder.fs +++ b/src/Pulsar.Client/Api/ConsumerBuilder.fs @@ -261,6 +261,11 @@ type ConsumerBuilder<'T> private (createConsumerAsync, createProducerAsync, conf { config with ConsumerCryptoFailureAction = action } |> this.With + + member this.ReplicateSubscriptionState replicateSubscriptionState = + { config with + ReplicateSubscriptionState = replicateSubscriptionState } + |> this.With member this.SubscribeAsync(): Task> = createConsumerAsync(verify config, schema, consumerInterceptors) diff --git a/src/Pulsar.Client/Common/Commands.fs b/src/Pulsar.Client/Common/Commands.fs index 917d76fd..31d2ad28 100644 --- a/src/Pulsar.Client/Common/Commands.fs +++ b/src/Pulsar.Client/Common/Commands.fs @@ -256,7 +256,8 @@ let newGetTopicsOfNamespaceRequest (ns : NamespaceName) (requestId : RequestId) let newSubscribe (topicName: CompleteTopicName) (subscription: SubscriptionName) (consumerId: ConsumerId) (requestId: RequestId) (consumerName: string) (subscriptionType: SubscriptionType) (subscriptionInitialPosition: SubscriptionInitialPosition) (readCompacted: bool) (startMessageId: MessageIdData) (durable: bool) (startMessageRollbackDuration: TimeSpan) - (createTopicIfDoesNotExist: bool) (keySharedPolicy: KeySharedPolicy option) (schemaInfo: SchemaInfo) (priorityLevel: PriorityLevel) = + (createTopicIfDoesNotExist: bool) (keySharedPolicy: KeySharedPolicy option) (schemaInfo: SchemaInfo) (priorityLevel: PriorityLevel) + (replicateSubscriptionState: bool)= let schema = getProtoSchema schemaInfo let subType = match subscriptionType with @@ -272,7 +273,8 @@ let newSubscribe (topicName: CompleteTopicName) (subscription: SubscriptionName) | _ -> failwith "Unknown initialPosition type" let request = CommandSubscribe(Topic = %topicName, Subscription = %subscription, subType = subType, ConsumerId = %consumerId, ConsumerName = consumerName, RequestId = %requestId, initialPosition = initialPosition, ReadCompacted = readCompacted, - StartMessageId = startMessageId, Durable = durable, ForceTopicCreation = createTopicIfDoesNotExist, PriorityLevel = %priorityLevel) + StartMessageId = startMessageId, Durable = durable, ForceTopicCreation = createTopicIfDoesNotExist, PriorityLevel = %priorityLevel, + ReplicateSubscriptionState = replicateSubscriptionState) match keySharedPolicy with | Some keySharedPolicy -> let meta = KeySharedMeta() diff --git a/src/Pulsar.Client/Internal/ConsumerImpl.fs b/src/Pulsar.Client/Internal/ConsumerImpl.fs index 580ebe9b..8e98761a 100644 --- a/src/Pulsar.Client/Internal/ConsumerImpl.fs +++ b/src/Pulsar.Client/Internal/ConsumerImpl.fs @@ -844,7 +844,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien consumerId requestId consumerName consumerConfig.SubscriptionType consumerConfig.SubscriptionInitialPosition consumerConfig.ReadCompacted msgIdData isDurable startMessageRollbackDuration createTopicIfDoesNotExist consumerConfig.KeySharedPolicy - schema.SchemaInfo consumerConfig.PriorityLevel + schema.SchemaInfo consumerConfig.PriorityLevel consumerConfig.ReplicateSubscriptionState try let! response = clientCnx.SendAndWaitForReply requestId payload response |> PulsarResponseType.GetEmpty diff --git a/tests/IntegrationTests/Basic.fs b/tests/IntegrationTests/Basic.fs index e42b9ba9..37330523 100644 --- a/tests/IntegrationTests/Basic.fs +++ b/tests/IntegrationTests/Basic.fs @@ -1,6 +1,8 @@ module Pulsar.Client.IntegrationTests.Basic open System +open System.Net.Http +open System.Text.Json open System.Threading open System.Diagnostics @@ -349,6 +351,35 @@ let tests = Log.Debug("Finished 'Scheduled message should be delivered at requested time'") } + + testTask "Create the replicated subscription should be successful" { + Log.Debug("Started 'Create the replicated subscription should be successful'") + let topicName = "public/default/topic-" + Guid.NewGuid().ToString("N") + let consumerName = "replicated-consumer" + let client = getClient() + let! (_ : IConsumer) = + client.NewConsumer() + .Topic(topicName) + .ConsumerName(consumerName) + .SubscriptionName("replicate") + .SubscriptionType(SubscriptionType.Shared) + .ReplicateSubscriptionState(true) + .SubscribeAsync() + + do! Task.Delay 1000 + let getJsonAsync (url: string) = + async { + use httpClient = new HttpClient() + let! response = httpClient.GetStringAsync(url) |> Async.AwaitTask + return JsonDocument.Parse(response) + } + + let url = "http://localhost:8080/admin/v2/persistent/" + topicName + "/stats" + let json = Async.RunSynchronously (getJsonAsync url) + let isReplicated = json.RootElement.GetProperty("subscriptions").GetProperty("replicate").GetProperty("isReplicated").GetBoolean() + Expect.isTrue "" isReplicated + Log.Debug("Finished 'Create the replicated subscription should be successful'") + } #if !NOTLS // Before running this test set 'maxMessageSize' for broker and 'nettyMaxFrameSizeBytes' for bookkeeper diff --git a/tests/UnitTests/Common/CommandTests.fs b/tests/UnitTests/Common/CommandTests.fs index 77b2d150..b75b421c 100644 --- a/tests/UnitTests/Common/CommandTests.fs +++ b/tests/UnitTests/Common/CommandTests.fs @@ -156,15 +156,16 @@ module CommandsTests = let totalSize, commandSize, command = serializeDeserializeSimpleCommand (newSubscribe topicName %"test-subscription" consumerId requestId consumerName - SubscriptionType.Exclusive SubscriptionInitialPosition.Earliest false null true TimeSpan.Zero true None (Schema.BYTES().SchemaInfo) priorityLevel) + SubscriptionType.Exclusive SubscriptionInitialPosition.Earliest false null true TimeSpan.Zero true None (Schema.BYTES().SchemaInfo) priorityLevel false) - totalSize |> Expect.equal "" 70 - commandSize |> Expect.equal "" 66 + totalSize |> Expect.equal "" 72 + commandSize |> Expect.equal "" 68 command.``type`` |> Expect.equal "" CommandType.Subscribe command.Subscribe.Topic |> Expect.equal "" %topicName command.Subscribe.RequestId |> Expect.equal "" %requestId command.Subscribe.ConsumerId |> Expect.equal "" %consumerId command.Subscribe.ConsumerName |> Expect.equal "" %consumerName + command.Subscribe.ReplicateSubscriptionState |> Expect.equal "" %false } test "newFlow should return correct frame" { diff --git a/tests/compose/standalone/docker-compose.yml b/tests/compose/standalone/docker-compose.yml index 1cc07d08..9a9cf828 100644 --- a/tests/compose/standalone/docker-compose.yml +++ b/tests/compose/standalone/docker-compose.yml @@ -10,6 +10,7 @@ services: ports: - "6650:6650" - "2181:2181" + - "8080:8080" command: > bash -c "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone"