From d5ff52e8cb0c8eace959a02a97a55ae3e22dd974 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Wed, 18 Dec 2024 11:02:19 -0600 Subject: [PATCH] Addressing the ability to fully customize the external message table option --- .../Transport/external_message_tables.cs | 27 +++++++++++++++++++ .../Transport/external_message_tables.cs | 27 +++++++++++++++++++ .../PostgresqlMessageStore.cs | 4 +-- .../Persistence/SqlServerMessageStore.cs | 4 +-- 4 files changed, 58 insertions(+), 4 deletions(-) diff --git a/src/Persistence/PostgresqlTests/Transport/external_message_tables.cs b/src/Persistence/PostgresqlTests/Transport/external_message_tables.cs index 70fdc598..626d500a 100644 --- a/src/Persistence/PostgresqlTests/Transport/external_message_tables.cs +++ b/src/Persistence/PostgresqlTests/Transport/external_message_tables.cs @@ -150,6 +150,33 @@ public async Task end_to_end_default_variable_message_types() var envelope = tracked.Received.SingleEnvelope(); envelope.Destination.ShouldBe(new Uri("external-table://external.incoming1/")); } + + [Fact] + public async Task end_to_end_default_variable_message_types_customize_table_in_every_possible_way() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UsePostgresqlPersistenceAndTransport(Servers.PostgresConnectionString, "external"); + + opts.ListenForMessagesFromExternalDatabaseTable("external", "incoming1", table => + { + table.IdColumnName = "pk"; + table.TimestampColumnName = "added"; + table.JsonBodyColumnName = "message_body"; + table.MessageTypeColumnName = "message_kind"; + + table.PollingInterval = 1.Seconds(); + }); + + }).StartAsync(); + + var tracked = await host.TrackActivity().Timeout(1.Minutes()).WaitForMessageToBeReceivedAt(host).ExecuteAndWaitAsync( + _ => host.SendMessageThroughExternalTable("external.incoming1", new Message2())); + + var envelope = tracked.Received.SingleEnvelope(); + envelope.Destination.ShouldBe(new Uri("external-table://external.incoming1/")); + } } diff --git a/src/Persistence/SqlServerTests/Transport/external_message_tables.cs b/src/Persistence/SqlServerTests/Transport/external_message_tables.cs index 9c4b9659..cbaf3dec 100644 --- a/src/Persistence/SqlServerTests/Transport/external_message_tables.cs +++ b/src/Persistence/SqlServerTests/Transport/external_message_tables.cs @@ -149,6 +149,33 @@ public async Task end_to_end_default_variable_message_types() envelope.Destination.ShouldBe(new Uri("external-table://outgoing.incoming1/")); } + [Fact] + public async Task end_to_end_default_variable_message_types_customize_table_in_every_possible_way() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseSqlServerPersistenceAndTransport(Servers.SqlServerConnectionString, "outside"); + + opts.ListenForMessagesFromExternalDatabaseTable("outside", "incoming1", table => + { + table.IdColumnName = "pk"; + table.TimestampColumnName = "added"; + table.JsonBodyColumnName = "message_body"; + table.MessageTypeColumnName = "message_kind"; + + table.PollingInterval = 1.Seconds(); + }); + + }).StartAsync(); + + var tracked = await host.TrackActivity().Timeout(1.Minutes()).WaitForMessageToBeReceivedAt(host).ExecuteAndWaitAsync( + _ => host.SendMessageThroughExternalTable("outside.incoming1", new Message2())); + + var envelope = tracked.Received.SingleEnvelope(); + envelope.Destination.ShouldBe(new Uri("external-table://outside.incoming1/")); + } + } public static class Message1Handler diff --git a/src/Persistence/Wolverine.Postgresql/PostgresqlMessageStore.cs b/src/Persistence/Wolverine.Postgresql/PostgresqlMessageStore.cs index 05905548..d8c76254 100644 --- a/src/Persistence/Wolverine.Postgresql/PostgresqlMessageStore.cs +++ b/src/Persistence/Wolverine.Postgresql/PostgresqlMessageStore.cs @@ -116,12 +116,12 @@ public override ISchemaObject AddExternalMessageTable(ExternalMessageTable defin table.AddColumn(definition.JsonBodyColumnName, "jsonb").NotNull(); if (definition.TimestampColumnName.IsNotEmpty()) { - table.AddColumn("timestamp").DefaultValueByExpression("((now() at time zone 'utc'))"); + table.AddColumn(definition.TimestampColumnName).DefaultValueByExpression("((now() at time zone 'utc'))"); } if (definition.MessageTypeColumnName.IsNotEmpty()) { - table.AddColumn("message_type"); + table.AddColumn(definition.MessageTypeColumnName); } return table; diff --git a/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs b/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs index 387f7516..9091afea 100644 --- a/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs +++ b/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs @@ -285,12 +285,12 @@ public override ISchemaObject AddExternalMessageTable(ExternalMessageTable defin table.AddColumn(definition.JsonBodyColumnName, "varbinary(max)").NotNull(); if (definition.TimestampColumnName.IsNotEmpty()) { - table.AddColumn("timestamp").DefaultValueByExpression("SYSDATETIMEOFFSET()"); + table.AddColumn(definition.TimestampColumnName).DefaultValueByExpression("SYSDATETIMEOFFSET()"); } if (definition.MessageTypeColumnName.IsNotEmpty()) { - table.AddColumn("message_type", "varchar(250)"); + table.AddColumn(definition.MessageTypeColumnName, "varchar(250)"); } return table;