From a6590658940d78e325edb5c338cf456df12e3c07 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Wed, 18 Dec 2024 12:21:42 -0600 Subject: [PATCH] Addressed an error with PostgreSQL transport configuration w/o an explicit message store schema configuration in IntegrateWitWolverine(). Closes GH-1175 --- .../Bugs/Bug_1175_schema_name_with_queues.cs | 94 +++++++++++++++++++ .../Wolverine.Marten/MartenIntegration.cs | 2 +- 2 files changed, 95 insertions(+), 1 deletion(-) create mode 100644 src/Persistence/MartenTests/Bugs/Bug_1175_schema_name_with_queues.cs diff --git a/src/Persistence/MartenTests/Bugs/Bug_1175_schema_name_with_queues.cs b/src/Persistence/MartenTests/Bugs/Bug_1175_schema_name_with_queues.cs new file mode 100644 index 000000000..9f9bc27c5 --- /dev/null +++ b/src/Persistence/MartenTests/Bugs/Bug_1175_schema_name_with_queues.cs @@ -0,0 +1,94 @@ +using System.Diagnostics; +using IntegrationTests; +using JasperFx.Core; +using Marten; +using Marten.Storage; +using Microsoft.Extensions.Hosting; +using Oakton.Resources; +using Shouldly; +using Weasel.Core; +using Wolverine; +using Wolverine.Configuration; +using Wolverine.Marten; +using Wolverine.Postgresql; +using Wolverine.Tracking; + +namespace MartenTests.Bugs; + +public class Bug_1175_schema_name_with_queues +{ + [Fact] + public async Task send_messages_with_postgresql_queueing() + { + using var sender = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.ServiceName = "Service"; + + opts.ListenToPostgresqlQueue("response").MaximumParallelMessages(14, ProcessingOrder.UnOrdered); + opts.PublishMessage().ToPostgresqlQueue("request"); + + opts.Services.AddMarten(opt => + { + opt.Connection(Servers.PostgresConnectionString); + opt.Events.TenancyStyle = TenancyStyle.Conjoined; + }) + .UseLightweightSessions() + .IntegrateWithWolverine(options => + { + options.AutoCreate = AutoCreate.CreateOrUpdate; + options.MessageStorageSchemaName = "sender"; + }); + + opts.Services.AddResourceSetupOnStartup(); + + }).StartAsync(); + + using var listener = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.ServiceName = "Listener"; + + opts.ListenToPostgresqlQueue("request").MaximumParallelMessages(14, ProcessingOrder.UnOrdered); + opts.PublishMessage().ToPostgresqlQueue("response"); + + opts.Services.AddMarten(opt => + { + opt.Connection(Servers.PostgresConnectionString); + opt.Events.TenancyStyle = TenancyStyle.Conjoined; + }) + .UseLightweightSessions() + .IntegrateWithWolverine(options => + { + options.AutoCreate = AutoCreate.CreateOrUpdate; + options.MessageStorageSchemaName = "listener"; + }); + + opts.Services.AddResourceSetupOnStartup(); + + }).StartAsync(); + + var tracked = await sender.TrackActivity().AlsoTrack(listener).SendMessageAndWaitAsync(new ColorRequest("red")); + tracked.Received.SingleMessage().Color.ShouldBe("red"); + tracked.Received.SingleEnvelope() + .Destination.ShouldBe(new Uri("postgresql://response/")); + + } +} + +public record ColorRequest(string Color); +public record ColorResponse(string Color); + +public static class ColorRequestHandler +{ + public static async Task Handle(ColorRequest request) + { + await Task.Delay(Random.Shared.Next(0, 500).Milliseconds()); + return new ColorResponse(request.Color); + } +} + +public static class ColorResponseHandler +{ + public static void Handle(ColorResponse response) => Debug.WriteLine("Got color response for " + response.Color); +} diff --git a/src/Persistence/Wolverine.Marten/MartenIntegration.cs b/src/Persistence/Wolverine.Marten/MartenIntegration.cs index 96c8ef957..f976c1625 100644 --- a/src/Persistence/Wolverine.Marten/MartenIntegration.cs +++ b/src/Persistence/Wolverine.Marten/MartenIntegration.cs @@ -57,7 +57,7 @@ public void Configure(WolverineOptions options) var transport = options.Transports.GetOrCreate(); transport.TransportSchemaName = TransportSchemaName; - transport.MessageStorageSchemaName = MessageStorageSchemaName; + transport.MessageStorageSchemaName = MessageStorageSchemaName ?? "public"; } ///