Skip to content

Commit

Permalink
Improvements to the Rabbit MQ tests, upgraded to latest Rabbit MQ client
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed Aug 20, 2024
1 parent 61fd4ba commit a5ae83a
Show file tree
Hide file tree
Showing 15 changed files with 72 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,31 @@ namespace Wolverine.RabbitMQ.Tests.ConventionalRouting;

public static class ConventionalRoutingTestDefaults
{
public static bool RoutingMessageOnly(Type type) => type == typeof(RoutedMessage);
public static bool RoutingMessageOnly(Type type) => type == typeof(ConventionallyRoutedMessage);
}


public abstract class ConventionalRoutingContext : IDisposable
{
private IHost _host;

internal bool DisableListenerDiscovery { get; set; }

internal IWolverineRuntime theRuntime
{
get
{
if (_host == null)
{
_host = WolverineHost.For(opts =>
opts.UseRabbitMq().UseConventionalRouting().AutoProvision().AutoPurgeOnStartup());
{
opts.UseRabbitMq().UseConventionalRouting().AutoProvision().AutoPurgeOnStartup();

if (DisableListenerDiscovery)
{
opts.Discovery.DisableConventionalDiscovery();
}
});
}

return _host.Services.GetRequiredService<IWolverineRuntime>();
Expand Down Expand Up @@ -56,6 +65,11 @@ internal void ConfigureConventions(Action<RabbitMqMessageRoutingConvention> conf
{
_host = WolverineHost.For(opts =>
{
if (DisableListenerDiscovery)
{
opts.Discovery.DisableConventionalDiscovery();
}

opts.UseRabbitMq().UseConventionalRouting(configure).AutoProvision().AutoPurgeOnStartup();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
namespace Wolverine.RabbitMQ.Tests.ConventionalRouting;

[MessageIdentity("routed")]
public class RoutedMessage;
public class ConventionallyRoutedMessage;
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
namespace Wolverine.RabbitMQ.Tests.ConventionalRouting;
using Wolverine.Attributes;

namespace Wolverine.RabbitMQ.Tests.ConventionalRouting;

[WolverineIgnore]
public record HeadersMessage;
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ namespace Wolverine.RabbitMQ.Tests.ConventionalRouting;

public class RoutedMessageHandler
{
public void Handle(RoutedMessage message)
public void Handle(ConventionallyRoutedMessage message)
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ public void disable_sender_with_lambda()
[Fact]
public void exclude_types()
{
ConfigureConventions(c => { c.ExcludeTypes(t => t == typeof(PublishedMessage)); });
ConfigureConventions(c =>
{
c.ExcludeTypes(t => t == typeof(PublishedMessage) || t == typeof(HeadersMessage));
});

AssertNoRoutes<PublishedMessage>();

Expand Down Expand Up @@ -85,10 +88,10 @@ public void disable_listener_by_lambda()
{
ConfigureConventions(c =>
{
c.IncludeTypes(t => t == typeof(RoutedMessage));
c.IncludeTypes(t => t == typeof(ConventionallyRoutedMessage));
c.QueueNameForListener(t =>
{
if (t == typeof(RoutedMessage))
if (t == typeof(ConventionallyRoutedMessage))
{
return null; // should not be routed
}
Expand All @@ -110,7 +113,7 @@ public void configure_listener()
{
ConfigureConventions(c =>
{
c.IncludeTypes(t => t == typeof(RoutedMessage));
c.IncludeTypes(t => t == typeof(ConventionallyRoutedMessage));
c.ConfigureListeners((x, _) => { x.ListenerCount(6); });
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ public async Task send_from_one_node_to_another_all_with_conventional_routing()
var session = await _sender.TrackActivity()
.AlsoTrack(_receiver)
.IncludeExternalTransports()
.SendMessageAndWaitAsync(new RoutedMessage());
.SendMessageAndWaitAsync(new ConventionallyRoutedMessage());

var received = session
.AllRecordsInOrder()
.Where(x => x.Envelope.Message?.GetType() == typeof(RoutedMessage))
.Where(x => x.Envelope.Message?.GetType() == typeof(ConventionallyRoutedMessage))
.Single(x => x.MessageEventType == MessageEventType.Received);

received
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,43 +14,47 @@ public class end_to_end_with_conventional_routing_custom_exchange : IDisposable

public end_to_end_with_conventional_routing_custom_exchange()
{
_sender = WolverineHost.For(opts =>
_receiver = WolverineHost.For(opts =>
{
opts.UseRabbitMq().UseConventionalRouting(conventions =>
{
conventions.ExchangeNameForSending(type => type.Name + "_headers");
conventions.IncludeTypes(x => x == typeof(HeadersMessage));
conventions.ConfigureSending((x, c) =>
conventions.ConfigureListeners((x, c) =>
{
if (c.MessageType == typeof(HeadersMessage))
{
x.ExchangeType(ExchangeType.Headers);
x.BindToExchange<HeadersMessage>(ExchangeType.Headers, arguments: new Dictionary<string, object>()
{
{"tenant-id", "tenant-id"}
});
}
});
})
})
.AutoProvision().AutoPurgeOnStartup();
opts.DisableConventionalDiscovery();
opts.ServiceName = "Sender";
opts.ServiceName = "Receiver";
});

_receiver = WolverineHost.For(opts =>
_sender = WolverineHost.For(opts =>
{
opts.UseRabbitMq().UseConventionalRouting(conventions =>
{
conventions.IncludeTypes(x => x == typeof(HeadersMessage));
conventions.ConfigureListeners((x, c) =>
{
if (c.MessageType == typeof(HeadersMessage))
conventions.ExchangeNameForSending(type => type.Name + "_headers");
conventions.IncludeTypes(x => x == typeof(HeadersMessage));
conventions.ConfigureSending((x, c) =>
{
x.BindToExchange<HeadersMessage>(ExchangeType.Headers, arguments: new Dictionary<string, object>()
if (c.MessageType == typeof(HeadersMessage))
{
{"tenant-id", "tenant-id"}
});
}
});
x.ExchangeType(ExchangeType.Headers);
}
});
})
.AutoProvision().AutoPurgeOnStartup();
opts.ServiceName = "Receiver";
opts.DisableConventionalDiscovery();
opts.ServiceName = "Sender";
});


}

public void Dispose()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ public async Task send_from_one_node_to_another_all_with_conventional_routing()
.AlsoTrack(_receiver)
.IncludeExternalTransports()
.Timeout(30.Seconds())
.SendMessageAndWaitAsync(new RoutedMessage());
.SendMessageAndWaitAsync(new ConventionallyRoutedMessage());

var received = session
.AllRecordsInOrder()
.Where(x => x.Envelope.Message?.GetType() == typeof(RoutedMessage))
.Where(x => x.Envelope.Message?.GetType() == typeof(ConventionallyRoutedMessage))
.Single(x => x.MessageEventType == MessageEventType.Received);

received
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ public class when_discovering_a_sender_with_all_defaults : ConventionalRoutingCo
private readonly MessageRoute theRoute;
public when_discovering_a_sender_with_all_defaults()
{
DisableListenerDiscovery = true;
ConfigureConventions(x=> x.IncludeTypes(ConventionalRoutingTestDefaults.RoutingMessageOnly));
theRoute = PublishingRoutesFor<PublishedMessage>().Single() as MessageRoute;
theRoute = PublishingRoutesFor<ConventionallyRoutedMessage>().Single() as MessageRoute;
}

[Fact]
Expand All @@ -28,7 +29,7 @@ public void should_have_exactly_one_route()
public void routed_to_rabbit_mq_exchange()
{
var endpoint = theRoute.Sender.Endpoint.ShouldBeOfType<RabbitMqExchange>();
endpoint.ExchangeName.ShouldBe(typeof(PublishedMessage).ToMessageTypeName());
endpoint.ExchangeName.ShouldBe(typeof(ConventionallyRoutedMessage).ToMessageTypeName());
}

[Fact]
Expand All @@ -42,7 +43,7 @@ public void endpoint_mode_is_inline_by_default()
public async Task has_declared_exchange()
{
// The rabbit object construction is lazy, so force it to happen
await new MessageBus(theRuntime).SendAsync(new PublishedMessage());
await new MessageBus(theRuntime).SendAsync(new ConventionallyRoutedMessage());

var endpoint = theRoute.Sender.Endpoint.ShouldBeOfType<RabbitMqExchange>();
theTransport.Exchanges.Contains(endpoint.ExchangeName).ShouldBeTrue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void MapEnvelopeToOutgoing(Envelope envelope, IBasicProperties outgoing)
}
}

public void MapIncomingToEnvelope(Envelope envelope, ReadOnlyBasicProperties incoming)
public void MapIncomingToEnvelope(Envelope envelope, IReadOnlyBasicProperties incoming)
{
envelope.CorrelationId = incoming.CorrelationId;
envelope.ContentType = "application/json";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ void WriteReplyToAddress(Envelope e, IBasicProperties props)
props.Headers["NServiceBus.ReplyToAddress"] = replyAddress.Value;
}

void ReadReplyUri(Envelope e, ReadOnlyBasicProperties props)
void ReadReplyUri(Envelope e, IReadOnlyBasicProperties props)
{
if (props.Headers.TryGetValue("NServiceBus.ReplyToAddress", out var raw))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ namespace Wolverine.RabbitMQ.Internal;
/// Rabbit MQ IBasicProperties object. Custom implementations of this can be used
/// to create interoperability with non-Wolverine applications through Rabbit MQ
/// </summary>
public interface IRabbitMqEnvelopeMapper : IEnvelopeMapper<ReadOnlyBasicProperties, IBasicProperties>;
public interface IRabbitMqEnvelopeMapper : IEnvelopeMapper<IReadOnlyBasicProperties, IBasicProperties>;

internal class RabbitMqEnvelopeMapper : EnvelopeMapper<ReadOnlyBasicProperties, IBasicProperties>, IRabbitMqEnvelopeMapper
internal class RabbitMqEnvelopeMapper : EnvelopeMapper<IReadOnlyBasicProperties, IBasicProperties>, IRabbitMqEnvelopeMapper
{
public RabbitMqEnvelopeMapper(Endpoint endpoint, IWolverineRuntime runtime) : base(endpoint)
{
Expand All @@ -23,7 +23,7 @@ public RabbitMqEnvelopeMapper(Endpoint endpoint, IWolverineRuntime runtime) : ba
MapProperty(x => x.ContentType!, (e, p) => e.ContentType = p.ContentType,
(e, p) => p.ContentType = e.ContentType);

Action<Envelope, ReadOnlyBasicProperties> readId = (e, props) =>
Action<Envelope, IReadOnlyBasicProperties> readId = (e, props) =>
{
if (Guid.TryParse(props.MessageId, out var id))
{
Expand Down Expand Up @@ -58,7 +58,7 @@ protected override void writeOutgoingHeader(IBasicProperties outgoing, string ke
}

// TODO -- this needs to be open for customizations. See the NServiceBus interop
protected override bool tryReadIncomingHeader(ReadOnlyBasicProperties incoming, string key, out string? value)
protected override bool tryReadIncomingHeader(IReadOnlyBasicProperties incoming, string key, out string? value)
{
if (incoming.Headers == null)
{
Expand All @@ -76,7 +76,7 @@ protected override bool tryReadIncomingHeader(ReadOnlyBasicProperties incoming,
return false;
}

protected override void writeIncomingHeaders(ReadOnlyBasicProperties incoming, Envelope envelope)
protected override void writeIncomingHeaders(IReadOnlyBasicProperties incoming, Envelope envelope)
{
if (incoming.Headers == null) return;
foreach (var pair in incoming.Headers)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ public RabbitMqTransport() : this(ProtocolName)
private void configureDefaults(ConnectionFactory factory)
{
factory.AutomaticRecoveryEnabled = true;
factory.DispatchConsumersAsync = true;
factory.ClientProvidedName ??= "Wolverine";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ internal class WorkerQueueMessageConsumer : AsyncDefaultBasicConsumer, IDisposab
private readonly CancellationToken _cancellation;
private readonly RabbitMqListener _listener;
private readonly ILogger _logger;
private readonly IEnvelopeMapper<ReadOnlyBasicProperties, IBasicProperties> _mapper;
private readonly IRabbitMqEnvelopeMapper _mapper;
private readonly IReceiver _workerQueue;
private bool _latched;

public WorkerQueueMessageConsumer(IChannel channel, IReceiver workerQueue, ILogger logger,
RabbitMqListener listener,
IEnvelopeMapper<ReadOnlyBasicProperties, IBasicProperties> mapper, Uri address, CancellationToken cancellation) : base(channel)
IRabbitMqEnvelopeMapper mapper, Uri address, CancellationToken cancellation) : base(channel)
{
_workerQueue = workerQueue;
_logger = logger;
Expand All @@ -31,14 +31,14 @@ public void Dispose()
_latched = true;
}

public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
public override Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange,
string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
return HandleBasicDeliverImpl(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
}

public async Task HandleBasicDeliverImpl(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
if (_latched || _cancellation.IsCancellationRequested)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<ProjectReference Include="..\..\..\Wolverine\Wolverine.csproj"/>
</ItemGroup>
<ItemGroup>
<PackageReference Include="RabbitMQ.Client" Version="7.0.0-rc.3" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0-rc.8" />
<PackageReference Update="Microsoft.SourceLink.GitHub" Version="8.0.0" />
</ItemGroup>

Expand Down

0 comments on commit a5ae83a

Please sign in to comment.