Skip to content

Commit

Permalink
Can register Marten or Postgresql integration with NpgsqlDataSource. C…
Browse files Browse the repository at this point in the history
…loses GH-691. Closes GH-747
  • Loading branch information
jeremydmiller committed Mar 13, 2024
1 parent 81bb1ce commit 004f916
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public async Task InitializeAsync()
// database by tenant
o.MultiTenantedDatabasesWithMasterDatabaseTable(Servers.PostgresConnectionString, "tenants");
})
.IntegrateWithWolverine("mt", masterDatabaseConnectionString:Servers.PostgresConnectionString)
.IntegrateWithWolverine("mt")

// All detected changes will be applied to all
// the configured tenant databases on startup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,22 @@ public static class WolverineOptionsMartenExtensions
/// </summary>
/// <param name="expression"></param>
/// <param name="schemaName">Optionally move the Wolverine envelope storage to a separate schema</param>
/// <param name="masterDataSource">
/// In the case of Marten using a database per tenant, you may wish to
/// explicitly determine the master database for Wolverine where Wolverine will store node and envelope information.
/// This does not have to be one of the tenant databases
/// Wolverine will try to use the master database from the Marten configuration when possible
/// </param>
/// <param name="masterDatabaseConnectionString">
/// In the case of Marten using a database per tenant, you may wish to
/// explicitly determine the master database for Wolverine where Wolverine will store node and envelope information.
/// This does not have to be one of the tenant databases
/// Wolverine will try to use the master database from the Marten configuration when possible
/// </param>
/// <returns></returns>
public static MartenServiceCollectionExtensions.MartenConfigurationExpression IntegrateWithWolverine(
this MartenServiceCollectionExtensions.MartenConfigurationExpression expression, string? schemaName = null,
string? masterDatabaseConnectionString = null)
string? masterDatabaseConnectionString = null, NpgsqlDataSource? masterDataSource = null)
{
if (schemaName.IsNotEmpty() && schemaName != schemaName.ToLowerInvariant())
{
Expand All @@ -58,7 +65,7 @@ public static MartenServiceCollectionExtensions.MartenConfigurationExpression In
return BuildSinglePostgresqlMessageStore(schemaName, store, runtime, logger);
}

return BuildMultiTenantedMessageDatabase(schemaName, masterDatabaseConnectionString, store, runtime, s);
return BuildMultiTenantedMessageDatabase(schemaName, masterDatabaseConnectionString, masterDataSource, store, runtime, s);
});

expression.Services.AddSingleton<IDatabaseSource, MartenMessageDatabaseDiscovery>();
Expand Down Expand Up @@ -110,21 +117,17 @@ internal static NpgsqlDataSource findMasterDataSource(DocumentStore store, IWolv
}

internal static IMessageStore BuildMultiTenantedMessageDatabase(string schemaName,
string? masterDatabaseConnectionString, DocumentStore store, IWolverineRuntime runtime,
string? masterDatabaseConnectionString, NpgsqlDataSource? masterDataSource, DocumentStore store,
IWolverineRuntime runtime,
IServiceProvider serviceProvider)
{
if (masterDatabaseConnectionString.IsEmpty())
{
throw new ArgumentOutOfRangeException(nameof(masterDatabaseConnectionString),
"Must specify a master Wolverine database connection string in the case of using Marten multi-tenancy with multiple databases");
}

var masterSettings = new DatabaseSettings
{
ConnectionString = masterDatabaseConnectionString,
SchemaName = schemaName,
IsMaster = true,
CommandQueuesEnabled = true
CommandQueuesEnabled = true,
DataSource = masterDataSource
};

var dataSource = findMasterDataSource(store, runtime, masterSettings, serviceProvider);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,28 @@ public static void PersistMessagesWithPostgresql(this WolverineOptions options,
o.Settings.ScheduledJobLockId = $"{schemaName ?? "public"}:scheduled-jobs".GetDeterministicHashCode();
});
}

/// <summary>
/// Register Postgresql backed message persistence to a known connection string
/// </summary>
/// <param name="options"></param>
/// <param name="dataSource"></param>
/// <param name="schemaName">Optional schema name for the Wolverine envelope storage</param>
public static void PersistMessagesWithPostgresql(this WolverineOptions options, NpgsqlDataSource dataSource,
string? schemaName = null)
{
if (schemaName.IsNotEmpty() && schemaName != schemaName.ToLowerInvariant())
{
throw new ArgumentOutOfRangeException(nameof(schemaName),
"The schema name must be in all lower case characters");
}

options.Include<PostgresqlBackedPersistence>(o =>
{
o.Settings.SchemaName = schemaName ?? "public";
o.Settings.DataSource = dataSource;

o.Settings.ScheduledJobLockId = $"{schemaName ?? "public"}:scheduled-jobs".GetDeterministicHashCode();
});
}
}
9 changes: 9 additions & 0 deletions src/Wolverine/Runtime/WolverineRuntime.HostService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,15 @@ public async Task StopAsync(CancellationToken cancellationToken)
await _endpoints.DrainAsync();

DurabilitySettings.Cancel();

try
{
// Do this to release pooled connections in Npgsql just in case
await Storage.DisposeAsync();
}
catch (Exception)
{
}
}

private void startInMemoryScheduledJobs()
Expand Down

0 comments on commit 004f916

Please sign in to comment.