From 2b18777060d421c38a41717ed03238ae3245102a Mon Sep 17 00:00:00 2001 From: Michael Paperin Date: Mon, 9 Oct 2023 13:55:12 -0700 Subject: [PATCH] Added envelop mapping for CorrelationId, added checks to NSB interop unit test --- .../InteropTests/NServiceBus/NServiceBusSpecs.cs | 16 +++++++++++----- .../Internal/RabbitMqEndpoint.NServiceBus.cs | 3 ++- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/src/Transports/RabbitMQ/InteropTests/NServiceBus/NServiceBusSpecs.cs b/src/Transports/RabbitMQ/InteropTests/NServiceBus/NServiceBusSpecs.cs index 0ce2804e0..afe9f407e 100644 --- a/src/Transports/RabbitMQ/InteropTests/NServiceBus/NServiceBusSpecs.cs +++ b/src/Transports/RabbitMQ/InteropTests/NServiceBus/NServiceBusSpecs.cs @@ -23,19 +23,21 @@ public async Task nservicebus_sends_message_to_wolverine() ResponseHandler.Received.Clear(); var id = Guid.NewGuid(); - + var messageId = Guid.NewGuid().ToString(); var session = await theFixture.Wolverine.ExecuteAndWaitAsync(async () => { + var options = new PublishOptions(); + options.SetMessageId(messageId); var sender = theFixture.NServiceBus.Services.GetRequiredService(); - await sender.Publish(new ResponseMessage { Id = id }); + await sender.Publish(new ResponseMessage { Id = id }, options); }, 60000); var envelope = ResponseHandler.Received.FirstOrDefault(); envelope.Message.ShouldBeOfType().Id.ShouldBe(id); envelope.ShouldNotBeNull(); - envelope.CorrelationId.ShouldNotBeNull(); + envelope.CorrelationId.ShouldBe(messageId); envelope.Id.ShouldNotBe(Guid.Empty); envelope.ConversationId.ShouldNotBe(Guid.Empty); } @@ -46,12 +48,15 @@ public async Task nservicebus_sends_interface_to_wolverine_who_only_understands_ ResponseHandler.Received.Clear(); var id = Guid.NewGuid(); - + var messageId = Guid.NewGuid().ToString(); var session = await theFixture.Wolverine.ExecuteAndWaitAsync(async () => { + var options = new SendOptions(); + options.SetMessageId(messageId); + options.SetDestination("wolverine"); var sender = theFixture.NServiceBus.Services.GetRequiredService(); - await sender.Send("wolverine", x => x.Id = id ); + await sender.Send(x => x.Id = id, options); }, 60000); var envelope = ResponseHandler.Received.FirstOrDefault(); @@ -59,6 +64,7 @@ public async Task nservicebus_sends_interface_to_wolverine_who_only_understands_ envelope.ShouldNotBeNull(); envelope.CorrelationId.ShouldNotBeNull(); + envelope.CorrelationId.ShouldBe(messageId); envelope.Id.ShouldNotBe(Guid.Empty); envelope.ConversationId.ShouldNotBe(Guid.Empty); } diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqEndpoint.NServiceBus.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqEndpoint.NServiceBus.cs index 7c685a967..110320e49 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqEndpoint.NServiceBus.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqEndpoint.NServiceBus.cs @@ -14,7 +14,8 @@ public void UseNServiceBusInterop() { m.MapPropertyToHeader(x => x.ConversationId, "NServiceBus.ConversationId"); m.MapPropertyToHeader(x => x.SentAt, "NServiceBus.TimeSent"); - + m.MapPropertyToHeader(x => x.CorrelationId!, "NServiceBus.CorrelationId"); + var replyAddress = new Lazy(() => { var replyEndpoint = (RabbitMqEndpoint)_parent.ReplyEndpoint()!;