From 94fd060f274d6899ff8088993239f2d1404532d5 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Wed, 18 Oct 2023 08:23:38 -0500 Subject: [PATCH] Initial Kafka Messaging Transport. Closes GH-390 Barebones KafkaTransport Roughed in more of the Kafka transport pieces Roughed in Kafka compliance tests All the compliance tests for kafka are working tests for the stateful resource model in kafka Basics of the Kafka transport are done Last organzation and XML API comments on Kafka code --- build/build.cs | 1 + docker-compose.yml | 29 +++- .../Compliance/TransportCompliance.cs | 2 +- .../Internal/SqsSenderProtocol.cs | 2 +- .../KafkaTransportTests.cs | 35 +++++ .../NoParallelization.cs | 3 + .../StatefulResourceSmokeTests.cs | 138 ++++++++++++++++++ .../Kafka/Wolverine.Kafka.Tests/UnitTest1.cs | 9 -- .../broadcast_to_topic_async.cs | 85 +++++++++++ .../broadcast_to_topic_rules.cs | 99 +++++++++++++ .../Kafka/Wolverine.Kafka.Tests/compliance.cs | 110 ++++++++++++++ .../Wolverine.Kafka/IKafkaEnvelopeMapper.cs | 9 ++ .../Internals/InlineKafkaSender.cs | 29 ++++ .../Internals/KafkaEnvelopeMapper.cs | 30 ++++ .../Internals/KafkaListener.cs | 86 +++++++++++ .../Internals/KafkaTransport.cs | 58 ++++++++ .../KafkaListenerConfiguration.cs | 16 ++ .../Wolverine.Kafka/KafkaSenderProtocol.cs | 32 ++++ .../KafkaSubscriberConfiguration.cs | 11 ++ .../Kafka/Wolverine.Kafka/KafkaTopic.cs | 108 ++++++++++++++ .../KafkaTransportExpression.cs | 72 +++++++++ .../KafkaTransportExtensions.cs | 128 ++++++++++++++++ .../Wolverine.Kafka/Wolverine.Kafka.csproj | 11 +- .../MqttTransportTests.cs | 8 + .../MQTT/Wolverine.MQTT/MqttTopic.cs | 2 + .../FailureAcknowledgement.cs | 7 + src/Wolverine/Transports/BrokerResource.cs | 2 + 27 files changed, 1109 insertions(+), 13 deletions(-) create mode 100644 src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaTransportTests.cs create mode 100644 src/Transports/Kafka/Wolverine.Kafka.Tests/NoParallelization.cs create mode 100644 src/Transports/Kafka/Wolverine.Kafka.Tests/StatefulResourceSmokeTests.cs delete mode 100644 src/Transports/Kafka/Wolverine.Kafka.Tests/UnitTest1.cs create mode 100644 src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_async.cs create mode 100644 src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_rules.cs create mode 100644 src/Transports/Kafka/Wolverine.Kafka.Tests/compliance.cs create mode 100644 src/Transports/Kafka/Wolverine.Kafka/IKafkaEnvelopeMapper.cs create mode 100644 src/Transports/Kafka/Wolverine.Kafka/Internals/InlineKafkaSender.cs create mode 100644 src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaEnvelopeMapper.cs create mode 100644 src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs create mode 100644 src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs create mode 100644 src/Transports/Kafka/Wolverine.Kafka/KafkaListenerConfiguration.cs create mode 100644 src/Transports/Kafka/Wolverine.Kafka/KafkaSenderProtocol.cs create mode 100644 src/Transports/Kafka/Wolverine.Kafka/KafkaSubscriberConfiguration.cs create mode 100644 src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs create mode 100644 src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExpression.cs create mode 100644 src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs diff --git a/build/build.cs b/build/build.cs index e3963579a..12a298f06 100644 --- a/build/build.cs +++ b/build/build.cs @@ -144,6 +144,7 @@ private static void Main(string[] args) "./src/Transports/Azure/Wolverine.AzureServiceBus", "./src/Transports/AWS/Wolverine.AmazonSqs", "./src/Transports/MQTT/Wolverine.MQTT", + "./src/Transports/Kafka/Wolverine.Kafka", "./src/Persistence/Wolverine.RDBMS", "./src/Persistence/Wolverine.Postgresql", "./src/Persistence/Wolverine.EntityFrameworkCore", diff --git a/docker-compose.yml b/docker-compose.yml index 2fc1a52ee..10f8c4219 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,9 @@ version: '3' + +networks: + app-tier: + driver: bridge + services: postgresql: image: "postgres:latest" @@ -43,4 +48,26 @@ services: - DOCKER_HOST=unix:///var/run/docker.sock volumes: - "${LOCALSTACK_VOLUME_DIR:-./volume}:/var/lib/localstack" - - "/var/run/docker.sock:/var/run/docker.sock" \ No newline at end of file + - "/var/run/docker.sock:/var/run/docker.sock" + + zookeeper: + image: confluentinc/cp-zookeeper:latest + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + ports: + - 22181:2181 + + kafka: + image: confluentinc/cp-kafka:latest + depends_on: + - zookeeper + ports: + - 29092:29092 + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 \ No newline at end of file diff --git a/src/Testing/TestingSupport/Compliance/TransportCompliance.cs b/src/Testing/TestingSupport/Compliance/TransportCompliance.cs index af193b932..156a58a6c 100644 --- a/src/Testing/TestingSupport/Compliance/TransportCompliance.cs +++ b/src/Testing/TestingSupport/Compliance/TransportCompliance.cs @@ -209,7 +209,7 @@ public virtual async Task can_apply_requeue_mechanics() var session = await theSender.TrackActivity(Fixture.DefaultTimeout) .AlsoTrack(theReceiver) .DoNotAssertOnExceptionsDetected() - .Timeout(60.Seconds()) + .Timeout(30.Seconds()) .ExecuteAndWaitAsync(c => c.EndpointFor(theOutboundAddress).SendAsync(new Message2())); session.FindSingleTrackedMessageOfType(MessageEventType.MessageSucceeded) diff --git a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/SqsSenderProtocol.cs b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/SqsSenderProtocol.cs index 151888be9..25584ad46 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/SqsSenderProtocol.cs +++ b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/SqsSenderProtocol.cs @@ -8,7 +8,7 @@ namespace Wolverine.AmazonSqs.Internal; -internal class SqsSenderProtocol : ISenderProtocol +internal class SqsSenderProtocol :ISenderProtocol { private readonly ILogger _logger; private readonly AmazonSqsQueue _queue; diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaTransportTests.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaTransportTests.cs new file mode 100644 index 000000000..f8dafa436 --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaTransportTests.cs @@ -0,0 +1,35 @@ +using Shouldly; +using Wolverine.Configuration; +using Wolverine.Kafka.Internals; + +namespace Wolverine.Kafka.Tests; + +public class KafkaTransportTests +{ + [Theory] + [InlineData("kafka://topic/one", "one")] + [InlineData("kafka://topic/one.two", "one.two")] + [InlineData("kafka://topic/one.two/", "one.two")] + [InlineData("kafka://topic/one.two.three", "one.two.three")] + public void get_topic_name_from_uri(string uriString, string expected) + { + KafkaTopic.TopicNameForUri(new Uri(uriString)) + .ShouldBe(expected); + } + + [Fact] + public void build_uri_for_endpoint() + { + var transport = new KafkaTransport(); + new KafkaTopic(transport, "one.two", EndpointRole.Application) + .Uri.ShouldBe(new Uri("Kafka://topic/one.two")); + } + + [Fact] + public void endpoint_name_is_topic_name() + { + var transport = new KafkaTransport(); + new KafkaTopic(transport, "one.two", EndpointRole.Application) + .EndpointName.ShouldBe("one.two"); + } +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/NoParallelization.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/NoParallelization.cs new file mode 100644 index 000000000..b1fa88422 --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/NoParallelization.cs @@ -0,0 +1,3 @@ +using Xunit; + +[assembly: CollectionBehavior(CollectionBehavior.CollectionPerAssembly)] \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/StatefulResourceSmokeTests.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/StatefulResourceSmokeTests.cs new file mode 100644 index 000000000..339329bba --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/StatefulResourceSmokeTests.cs @@ -0,0 +1,138 @@ +using JasperFx.Core; +using Microsoft.Extensions.Hosting; +using Oakton; +using Shouldly; + +namespace Wolverine.Kafka.Tests; + +public class StatefulResourceSmokeTests +{ + private IHostBuilder ConfigureBuilder(bool autoProvision, int starting = 1) + { + return Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + if (autoProvision) + { + opts.UseKafka("localhost:29092").AutoProvision(); + } + else + { + opts.UseKafka("localhost:29092");; + } + + opts.PublishMessage() + .ToKafkaTopic("sr" + starting++); + + opts.PublishMessage() + .ToKafkaTopic("sr" + starting++); + + opts.PublishMessage() + .ToKafkaTopic("sr" + starting++); + + opts.PublishMessage() + .ToKafkaTopic("sr" + starting++); + }); + } + + [Fact] + public async Task run_setup() + { + var result = await ConfigureBuilder(false) + .RunOaktonCommands(new[] { "resources", "setup" }); + result.ShouldBe(0); + } + + + [Fact] + public async Task statistics() + { + (await ConfigureBuilder(false) + .RunOaktonCommands(new[] { "resources", "setup" })).ShouldBe(0); + + var result = await ConfigureBuilder(false) + .RunOaktonCommands(new[] { "resources", "statistics" }); + + result.ShouldBe(0); + } + + [Fact] + public async Task check_positive() + { + (await ConfigureBuilder(false) + .RunOaktonCommands(new[] { "resources", "setup" })).ShouldBe(0); + + var result = await ConfigureBuilder(false) + .RunOaktonCommands(new[] { "resources", "check" }); + + result.ShouldBe(0); + } + + // [Fact] + // public async Task check_negative() + // { + // var result = await ConfigureBuilder(false, 10) + // .RunOaktonCommands(new[] { "resources", "check" }); + // + // result.ShouldBe(1); + // } + + [Fact] + public async Task clear_state() + { + (await ConfigureBuilder(false, 20) + .RunOaktonCommands(new[] { "resources", "setup" })).ShouldBe(0); + + (await ConfigureBuilder(false, 20) + .RunOaktonCommands(new[] { "resources", "clear" })).ShouldBe(0); + } + + [Fact] + public async Task teardown() + { + (await ConfigureBuilder(false, 30) + .RunOaktonCommands(new[] { "resources", "setup" })).ShouldBe(0); + + (await ConfigureBuilder(false, 30) + .RunOaktonCommands(new[] { "resources", "teardown" })).ShouldBe(0); + } +} + +public class SRMessage1 +{ +} + +public class SRMessage2 +{ +} + +public class SRMessage3 +{ +} + +public class SRMessage4 +{ +} + +public class SRMessageHandlers +{ + public Task Handle(SRMessage1 message) + { + return Task.Delay(100.Milliseconds()); + } + + public Task Handle(SRMessage2 message) + { + return Task.Delay(100.Milliseconds()); + } + + public Task Handle(SRMessage3 message) + { + return Task.Delay(100.Milliseconds()); + } + + public Task Handle(SRMessage4 message) + { + return Task.Delay(100.Milliseconds()); + } +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/UnitTest1.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/UnitTest1.cs deleted file mode 100644 index cecb10a27..000000000 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/UnitTest1.cs +++ /dev/null @@ -1,9 +0,0 @@ -namespace Wolverine.Kafka.Tests; - -public class UnitTest1 -{ - [Fact] - public void Test1() - { - } -} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_async.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_async.cs new file mode 100644 index 000000000..36ea8a1be --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_async.cs @@ -0,0 +1,85 @@ +using System.Diagnostics; +using JasperFx.Core; +using Microsoft.Extensions.Hosting; +using Oakton.Resources; +using Shouldly; +using Wolverine.Tracking; +using Xunit.Abstractions; + +namespace Wolverine.Kafka.Tests; + +public class broadcast_to_topic_async : IAsyncLifetime +{ + private readonly ITestOutputHelper _output; + private IHost _sender; + private IHost _receiver; + + public broadcast_to_topic_async(ITestOutputHelper output) + { + _output = output; + } + + public async Task InitializeAsync() + { + + _sender = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseKafka("localhost:29092").AutoProvision(); + opts.Policies.DisableConventionalLocalRouting(); + + opts.Services.AddResourceSetupOnStartup(); + }).StartAsync(); + + _receiver = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseKafka("localhost:29092").AutoProvision(); + opts.ListenToKafkaTopic("incoming.one"); + + opts.Services.AddResourceSetupOnStartup(); + }).StartAsync(); + + } + + [Fact] + public async Task broadcast() + { + var session = await _sender.TrackActivity() + .AlsoTrack(_receiver) + .Timeout(30.Seconds()) + .ExecuteAndWaitAsync(m => m.BroadcastToTopicAsync("incoming.one", new ColorMessage("blue"))); + + var received = session.Received.SingleMessage(); + received.Color.ShouldBe("blue"); + } + + + public async Task DisposeAsync() + { + await _sender.StopAsync(); + await _receiver.StopAsync(); + } +} + +public class ColorMessage +{ + public ColorMessage() + { + } + + public ColorMessage(string color) + { + Color = color; + } + + public string Color { get; set; } +} + +public static class ColorMessageHandler +{ + public static void Handle(ColorMessage message) + { + Debug.WriteLine("Got " + message.Color); + } +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_rules.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_rules.cs new file mode 100644 index 000000000..05688d5d7 --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_rules.cs @@ -0,0 +1,99 @@ +using System.Diagnostics; +using JasperFx.Core; +using Microsoft.Extensions.Hosting; +using Oakton.Resources; +using Shouldly; +using Wolverine.Attributes; +using Wolverine.Tracking; +using Xunit.Abstractions; + +namespace Wolverine.Kafka.Tests; + +public class broadcast_to_topic_rules : IAsyncLifetime +{ + private readonly ITestOutputHelper _output; + private IHost _sender; + private IHost _receiver; + + public broadcast_to_topic_rules(ITestOutputHelper output) + { + _output = output; + } + + public async Task InitializeAsync() + { + _sender = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseKafka("localhost:29092").AutoProvision(); + opts.Policies.DisableConventionalLocalRouting(); + + opts.PublishAllMessages().ToKafkaTopics(); + + opts.ServiceName = "sender"; + + opts.Services.AddResourceSetupOnStartup(); + }).StartAsync(); + + _receiver = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseKafka("localhost:29092").AutoProvision(); + opts.ListenToKafkaTopic("red"); + opts.ListenToKafkaTopic("green"); + opts.ListenToKafkaTopic("blue"); + opts.ListenToKafkaTopic("purple"); + + opts.ServiceName = "receiver"; + + opts.Services.AddResourceSetupOnStartup(); + }).StartAsync(); + + } + + [Fact] + public async Task route_by_derived_topics_1() + { + var session = await _sender + .TrackActivity() + .AlsoTrack(_receiver) + .Timeout(30.Seconds()) + .WaitForMessageToBeReceivedAt(_receiver) + .PublishMessageAndWaitAsync(new RedMessage("one")); + + session.Received.SingleEnvelope() + .Destination.ShouldBe(new Uri("kafka://topic/red")); + } + + [Fact] + public async Task route_by_derived_topics_2() + { + var session = await _sender + .TrackActivity() + .AlsoTrack(_receiver) + .Timeout(30.Seconds()) + .WaitForMessageToBeReceivedAt(_receiver) + .PublishMessageAndWaitAsync(new GreenMessage("one")); + + session.Received.SingleEnvelope() + .Destination.ShouldBe(new Uri("kafka://topic/green")); + } + + public async Task DisposeAsync() + { + await _sender.StopAsync(); + await _receiver.StopAsync(); + } +} + +[Topic("red")] +public record RedMessage(string Name); + +[Topic("green")] +public record GreenMessage(string Name); + +public static class ColoredMessageHandler +{ + public static void Handle(RedMessage m) => Debug.WriteLine("Got red " + m.Name); + public static void Handle(GreenMessage m) => Debug.WriteLine("Got green " + m.Name); +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/compliance.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/compliance.cs new file mode 100644 index 000000000..c674d152f --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/compliance.cs @@ -0,0 +1,110 @@ +using Microsoft.Extensions.DependencyInjection; +using Oakton.Resources; +using Shouldly; +using TestingSupport; +using TestingSupport.Compliance; +using Wolverine.Runtime; + +namespace Wolverine.Kafka.Tests; + + +public class BufferedComplianceFixture : TransportComplianceFixture, IAsyncLifetime +{ + + + + public BufferedComplianceFixture() : base(new Uri("kafka://topic/receiver"), 120) + { + + } + + public async Task InitializeAsync() + { + var receiverTopic = "buffered.receiver"; + var senderTopic = "buffered.sender"; + + OutboundAddress = new Uri("kafka://topic/" + receiverTopic); + + await SenderIs(opts => + { + opts.UseKafka("localhost:29092"); + + opts.ListenToKafkaTopic(senderTopic); + + opts.PublishAllMessages().ToKafkaTopic(receiverTopic).BufferedInMemory(); + + opts.Services.AddResourceSetupOnStartup(); + }); + + await ReceiverIs(opts => + { + opts.UseKafka("localhost:29092"); + + opts.ListenToKafkaTopic(receiverTopic).Named("receiver").BufferedInMemory(); + + opts.Services.AddResourceSetupOnStartup(); + }); + } + + public new Task DisposeAsync() + { + return Task.CompletedTask; + } +} + +[Collection("acceptance")] +public class BufferedSendingAndReceivingCompliance : TransportCompliance +{ + +} + + +public class InlineComplianceFixture : TransportComplianceFixture, IAsyncLifetime +{ + + public static int Number = 0; + + public InlineComplianceFixture() : base(new Uri("kafka://topic/receiver"), 120) + { + + } + + public async Task InitializeAsync() + { + var receiverTopic = "receiver.inline"; + var senderTopic = "sender.inline"; + + OutboundAddress = new Uri("kafka://topic/" + receiverTopic); + + await SenderIs(opts => + { + opts.UseKafka("localhost:29092").AutoProvision(); + + opts.ListenToKafkaTopic(senderTopic).UseForReplies(); + + opts.PublishAllMessages().ToKafkaTopic(receiverTopic).SendInline(); + + opts.Services.AddResourceSetupOnStartup(); + }); + + await ReceiverIs(opts => + { + opts.UseKafka("localhost:29092").AutoProvision(); + + opts.ListenToKafkaTopic(receiverTopic).Named("receiver").ProcessInline(); + + opts.Services.AddResourceSetupOnStartup(); + }); + } + + public new Task DisposeAsync() + { + return Task.CompletedTask; + } +} + +[Collection("acceptance")] +public class InlineSendingAndReceivingCompliance : TransportCompliance +{ + +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka/IKafkaEnvelopeMapper.cs b/src/Transports/Kafka/Wolverine.Kafka/IKafkaEnvelopeMapper.cs new file mode 100644 index 000000000..a68f8fdd9 --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka/IKafkaEnvelopeMapper.cs @@ -0,0 +1,9 @@ +using Confluent.Kafka; +using Wolverine.Transports; + +namespace Wolverine.Kafka; + +public interface IKafkaEnvelopeMapper : IEnvelopeMapper, Message> +{ + +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka/Internals/InlineKafkaSender.cs b/src/Transports/Kafka/Wolverine.Kafka/Internals/InlineKafkaSender.cs new file mode 100644 index 000000000..b486781e3 --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka/Internals/InlineKafkaSender.cs @@ -0,0 +1,29 @@ +using Confluent.Kafka; +using Wolverine.Transports.Sending; + +namespace Wolverine.Kafka.Internals; + +public class InlineKafkaSender : ISender +{ + private readonly KafkaTopic _topic; + + public InlineKafkaSender(KafkaTopic topic) + { + _topic = topic; + Destination = topic.Uri; + } + + public bool SupportsNativeScheduledSend => false; + public Uri Destination { get; } + public async Task PingAsync() + { + throw new NotImplementedException(); + } + + public async ValueTask SendAsync(Envelope envelope) + { + var message = _topic.Mapper.CreateMessage(envelope); + using var producer = new ProducerBuilder(_topic.Parent.ProducerConfig).Build(); + await producer.ProduceAsync(envelope.TopicName ?? _topic.TopicName, message); + } +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaEnvelopeMapper.cs b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaEnvelopeMapper.cs new file mode 100644 index 000000000..4ef937b34 --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaEnvelopeMapper.cs @@ -0,0 +1,30 @@ +using System.Text; +using Confluent.Kafka; +using Wolverine.Configuration; +using Wolverine.Transports; + +namespace Wolverine.Kafka.Internals; + +internal class KafkaEnvelopeMapper : EnvelopeMapper, Message>, IKafkaEnvelopeMapper +{ + public KafkaEnvelopeMapper(Endpoint endpoint) : base(endpoint) + { + } + + protected override void writeOutgoingHeader(Message outgoing, string key, string value) + { + outgoing.Headers.Add(key, Encoding.Default.GetBytes(value)); + } + + protected override bool tryReadIncomingHeader(Message incoming, string key, out string value) + { + if (incoming.Headers.TryGetLastBytes(key, out var bytes)) + { + value = Encoding.Default.GetString(bytes); + return true; + } + + value = default!; + return false; + } +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs new file mode 100644 index 000000000..fd4c1ace7 --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs @@ -0,0 +1,86 @@ +using Confluent.Kafka; +using JasperFx.Core; +using Wolverine.Transports; + +namespace Wolverine.Kafka.Internals; + +internal class KafkaListener : IListener, IDisposable +{ + private readonly IConsumer _consumer; + private CancellationTokenSource _cancellation = new(); + private readonly Task _runner; + private readonly ConsumerConfig _config; + private readonly IReceiver _receiver; + + public KafkaListener(KafkaTopic topic, ConsumerConfig config, IReceiver receiver) + { + Address = topic.Uri; + _consumer = new ConsumerBuilder(config).Build(); + var mapper = topic.Mapper; + + _config = config; + _receiver = receiver; + + _runner = Task.Run(async () => + { + _consumer.Subscribe(topic.TopicName); + try + { + while (!_cancellation.IsCancellationRequested) + { + // TODO -- watch that this isn't EnableAutoCommit = false + // TODO -- wrap try catch around this + var result = _consumer.Consume(_cancellation.Token); + var message = result.Message; + + var envelope = mapper.CreateEnvelope(result.Topic, message); + + await receiver.ReceivedAsync(this, envelope); + } + } + catch (OperationCanceledException) + { + // Shutting down + } + finally + { + _consumer.Close(); + } + }, _cancellation.Token); + } + + public ValueTask CompleteAsync(Envelope envelope) + { + if (_config.EnableAutoCommit != null) + { + _consumer.Commit(); + } + + return ValueTask.CompletedTask; + } + + public ValueTask DeferAsync(Envelope envelope) + { + // Really just a retry + return _receiver.ReceivedAsync(this, envelope); + } + + public ValueTask DisposeAsync() + { + Dispose(); + return ValueTask.CompletedTask; + } + + public Uri Address { get; } + public async ValueTask StopAsync() + { + _cancellation.Cancel(); + await _runner; + } + + public void Dispose() + { + _consumer.SafeDispose(); + _runner.Dispose(); + } +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs new file mode 100644 index 000000000..526b6ebff --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs @@ -0,0 +1,58 @@ +using Confluent.Kafka; +using JasperFx.Core; +using Wolverine.Configuration; +using Wolverine.Runtime; +using Wolverine.Runtime.Routing; +using Wolverine.Transports; + +namespace Wolverine.Kafka.Internals; + +public class KafkaTransport : BrokerTransport +{ + public Cache Topics { get; } + + public ProducerConfig ProducerConfig { get; } = new(); + public ConsumerConfig ConsumerConfig { get; } = new(); + + public AdminClientConfig AdminClientConfig { get; } = new(); + + public KafkaTransport() : base("kafka", "Kafka Topics") + { + Topics = new Cache(topicName => new KafkaTopic(this, topicName, EndpointRole.Application)); + } + + protected override IEnumerable endpoints() + { + return Topics; + } + + protected override KafkaTopic findEndpointByUri(Uri uri) + { + var topicName = KafkaTopic.TopicNameForUri(uri); + return Topics[topicName]; + } + + + protected override void tryBuildSystemEndpoints(IWolverineRuntime runtime) + { + ConsumerConfig.GroupId ??= runtime.Options.ServiceName; + ProducerConfig.ClientId ??= runtime.Options.ServiceName; + + var topics = Topics[KafkaTopic.WolverineTopicsName]; + topics.RoutingType = RoutingMode.ByTopic; + topics.OutgoingRules.Add( + new TopicRoutingRule()); // t + } + + public override ValueTask ConnectAsync(IWolverineRuntime runtime) + { + foreach (var endpoint in Topics) endpoint.Compile(runtime); + + return ValueTask.CompletedTask; + } + + public override IEnumerable DiagnosticColumns() + { + yield break; + } +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaListenerConfiguration.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaListenerConfiguration.cs new file mode 100644 index 000000000..2407cd407 --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaListenerConfiguration.cs @@ -0,0 +1,16 @@ +using Wolverine.Configuration; + +namespace Wolverine.Kafka; + +public class KafkaListenerConfiguration : ListenerConfiguration +{ + public KafkaListenerConfiguration(KafkaTopic endpoint) : base(endpoint) + { + } + + public KafkaListenerConfiguration(Func source) : base(source) + { + } + + +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaSenderProtocol.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaSenderProtocol.cs new file mode 100644 index 000000000..2768e395a --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaSenderProtocol.cs @@ -0,0 +1,32 @@ +using Confluent.Kafka; +using Wolverine.Transports; +using Wolverine.Transports.Sending; + +namespace Wolverine.Kafka; + +public class KafkaSenderProtocol : ISenderProtocol +{ + private readonly KafkaTopic _topic; + + public KafkaSenderProtocol(KafkaTopic topic) + { + _topic = topic; + } + + public async Task SendBatchAsync(ISenderCallback callback, OutgoingMessageBatch batch) + { + using var producer = new ProducerBuilder(_topic.Parent.ProducerConfig).Build(); + + var tasks = new List(); + foreach (var envelope in batch.Messages) + { + // TODO -- separate try/catch here! + + var message = _topic.Mapper.CreateMessage(envelope); + var task = producer.ProduceAsync(envelope.TopicName ?? _topic.TopicName, message); + tasks.Add(task); + } + + await Task.WhenAll(tasks); + } +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaSubscriberConfiguration.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaSubscriberConfiguration.cs new file mode 100644 index 000000000..b9216a16a --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaSubscriberConfiguration.cs @@ -0,0 +1,11 @@ +using Wolverine.Configuration; + +namespace Wolverine.Kafka; + +public class KafkaSubscriberConfiguration : SubscriberConfiguration +{ + internal KafkaSubscriberConfiguration(KafkaTopic endpoint) : base(endpoint) + { + } + +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs new file mode 100644 index 000000000..09810710b --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs @@ -0,0 +1,108 @@ +using Confluent.Kafka; +using Confluent.Kafka.Admin; +using Microsoft.Extensions.Logging; +using Wolverine.Configuration; +using Wolverine.Kafka.Internals; +using Wolverine.Runtime; +using Wolverine.Transports; +using Wolverine.Transports.Sending; + +namespace Wolverine.Kafka; + +public class KafkaTopic : Endpoint, IBrokerEndpoint +{ + // Strictly an identifier for the endpoint + public const string WolverineTopicsName = "wolverine.topics"; + + public KafkaTransport Parent { get; } + + public KafkaTopic(KafkaTransport parent, string topicName, EndpointRole role) : base(new Uri("kafka://topic/" + topicName), role) + { + Parent = parent; + EndpointName = topicName; + TopicName = topicName; + + Mapper = new KafkaEnvelopeMapper(this); + } + + public override bool AutoStartSendingAgent() + { + return true; + } + + public string TopicName { get; } + + public IKafkaEnvelopeMapper Mapper { get; set; } + + public static string TopicNameForUri(Uri uri) + { + return uri.Segments.Last().Trim('/'); + } + + public override ValueTask BuildListenerAsync(IWolverineRuntime runtime, IReceiver receiver) + { + var listener = new KafkaListener(this, Parent.ConsumerConfig, receiver); + return ValueTask.FromResult((IListener)listener); + } + + protected override ISender CreateSender(IWolverineRuntime runtime) + { + return Mode == EndpointMode.Inline + ? new InlineKafkaSender(this) + : new BatchedSender(this, new KafkaSenderProtocol(this), runtime.Cancellation, + runtime.LoggerFactory.CreateLogger()); + } + + public async ValueTask CheckAsync() + { + if (TopicName == WolverineTopicsName) return true; // don't care, this is just a marker + try + { + using var client = new ProducerBuilder(Parent.ProducerConfig).Build(); + await client.ProduceAsync(TopicName, new Message + { + Key = "ping", + Value = "ping" + }); + + + return true; + } + catch (Exception) + { + return false; + } + } + + public async ValueTask TeardownAsync(ILogger logger) + { + if (TopicName == WolverineTopicsName) return; // don't care, this is just a marker + using var client = new AdminClientBuilder(Parent.AdminClientConfig).Build(); + await client.DeleteTopicsAsync(new string[] { TopicName }); + } + + public async ValueTask SetupAsync(ILogger logger) + { + if (TopicName == WolverineTopicsName) return; // don't care, this is just a marker + + using var client = new AdminClientBuilder(Parent.AdminClientConfig).Build(); + + try + { + await client.CreateTopicsAsync(new[] + { + new TopicSpecification + { + Name = TopicName + } + }); + + logger.LogInformation("Created Kafka topic {Topic}", TopicName); + } + catch (CreateTopicsException e) + { + if (e.Message.Contains("already exists.")) return; + throw; + } + } +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExpression.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExpression.cs new file mode 100644 index 000000000..6d0de953e --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExpression.cs @@ -0,0 +1,72 @@ +using Confluent.Kafka; +using Wolverine.Kafka.Internals; + +namespace Wolverine.Kafka; + +public class KafkaTransportExpression +{ + private readonly KafkaTransport _transport; + + internal KafkaTransportExpression(KafkaTransport transport) + { + _transport = transport; + } + + /// + /// Configure both the producer and consumer config of the underlying Kafka client + /// + /// + /// + public KafkaTransportExpression ConfigureClient(Action configure) + { + configure(_transport.ConsumerConfig); + configure(_transport.ProducerConfig); + configure(_transport.AdminClientConfig); + + return this; + } + + /// + /// Configure the Kafka message producers within the Wolverine transport + /// + /// + /// + public KafkaTransportExpression ConfigureProducers(Action configure) + { + configure(_transport.ProducerConfig); + return this; + } + + /// + /// Configure the Kafka message consumers within the Wolverine transport + /// + /// + /// + public KafkaTransportExpression ConfigureConsumers(Action configure) + { + configure(_transport.ConsumerConfig); + return this; + } + + /// + /// Create newly used Kafka topics on endpoint activation if the topic is missing + /// + /// + /// + public KafkaTransportExpression AutoProvision(Action? configure = null) + { + _transport.AutoProvision = true; + configure?.Invoke(_transport.AdminClientConfig); + return this; + } + + /// + /// Deletes and rebuilds topics on application startup + /// + /// + public KafkaTransportExpression DeleteExistingTopicsOnStartup() + { + _transport.AutoPurgeAllQueues = true; + return this; + } +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs new file mode 100644 index 000000000..51f0328a8 --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs @@ -0,0 +1,128 @@ +using System.Text; +using Confluent.Kafka; +using JasperFx.Core.Reflection; +using Wolverine.Configuration; +using Wolverine.Kafka.Internals; + +namespace Wolverine.Kafka; + +public static class KafkaTransportExtensions +{ + /// + /// Quick access to the Kafka Transport within this application. + /// This is for advanced usage + /// + /// + /// + internal static KafkaTransport KafkaTransport(this WolverineOptions endpoints) + { + var transports = endpoints.As().Transports; + + return transports.GetOrCreate(); + } + + /// + /// Add a connection to an Kafka broker within this application + /// + /// + /// + /// + public static KafkaTransportExpression UseKafka(this WolverineOptions options, string bootstrapServers) + { + var transport = options.KafkaTransport(); + transport.ConsumerConfig.BootstrapServers = bootstrapServers; + transport.ProducerConfig.BootstrapServers = bootstrapServers; + transport.AdminClientConfig.BootstrapServers = bootstrapServers; + + return new KafkaTransportExpression(transport); + } + + + /// + /// Listen for incoming messages at the designated Kafka topic name + /// + /// + /// The name of the Rabbit MQ queue + /// + /// Optional configuration for this Rabbit Mq queue if being initialized by Wolverine + /// + public static KafkaListenerConfiguration ListenToKafkaTopic(this WolverineOptions endpoints, string topicName) + { + var transport = endpoints.KafkaTransport(); + + var endpoint = transport.Topics[topicName]; + endpoint.EndpointName = topicName; + endpoint.IsListener = true; + + return new KafkaListenerConfiguration(endpoint); + } + + /// + /// Publish messages to an Kafka topic + /// + /// + /// + /// + public static KafkaSubscriberConfiguration ToKafkaTopic(this IPublishToExpression publishing, string topicName) + { + var transports = publishing.As().Parent.Transports; + var transport = transports.GetOrCreate(); + + var topic = transport.Topics[topicName]; + + // This is necessary unfortunately to hook up the subscription rules + publishing.To(topic.Uri); + + return new KafkaSubscriberConfiguration(topic); + } + + /// + /// Publish messages to Kafka topics based on Wolverine's rules for deriving topic + /// names from a message type + /// + /// + /// + /// + public static KafkaSubscriberConfiguration ToKafkaTopics(this IPublishToExpression publishing) + { + var transports = publishing.As().Parent.Transports; + var transport = transports.GetOrCreate(); + + var topic = transport.Topics[KafkaTopic.WolverineTopicsName]; + + // This is necessary unfortunately to hook up the subscription rules + publishing.To(topic.Uri); + + return new KafkaSubscriberConfiguration(topic); + } + + internal static Envelope CreateEnvelope(this IKafkaEnvelopeMapper mapper, string topicName, Message message) + { + var envelope = new Envelope + { + GroupId = message.Key, + Data = Encoding.Default.GetBytes(message.Value), + TopicName = topicName + }; + + message.Headers ??= new Headers(); // prevent NRE + + mapper.MapIncomingToEnvelope(envelope, message); + + return envelope; + } + + internal static Message CreateMessage(this IKafkaEnvelopeMapper mapper, Envelope envelope) + { + var message = new Message + { + Key = envelope.GroupId, + Value = Encoding.Default.GetString(envelope.Data), + Headers = new Headers() + }; + + mapper.MapEnvelopeToOutgoing(envelope, message); + + return message; + } +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka/Wolverine.Kafka.csproj b/src/Transports/Kafka/Wolverine.Kafka/Wolverine.Kafka.csproj index 9e54942fc..fbcde2f34 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/Wolverine.Kafka.csproj +++ b/src/Transports/Kafka/Wolverine.Kafka/Wolverine.Kafka.csproj @@ -1,7 +1,16 @@ - + Kafka Transport for Wolverine Messaging Systems + portable + WolverineFx.Kafka + false + false + false + false + false + enable + Wolverine.Kafka diff --git a/src/Transports/MQTT/Wolverine.MQTT.Tests/MqttTransportTests.cs b/src/Transports/MQTT/Wolverine.MQTT.Tests/MqttTransportTests.cs index 8ff4bc311..49aa0b55a 100644 --- a/src/Transports/MQTT/Wolverine.MQTT.Tests/MqttTransportTests.cs +++ b/src/Transports/MQTT/Wolverine.MQTT.Tests/MqttTransportTests.cs @@ -24,4 +24,12 @@ public void build_uri_for_endpoint() new MqttTopic("one/two", transport, EndpointRole.Application) .Uri.ShouldBe(new Uri("mqtt://topic/one/two")); } + + [Fact] + public void endpoint_name_is_topic_name() + { + var transport = new MqttTransport(); + new MqttTopic("one/two", transport, EndpointRole.Application) + .EndpointName.ShouldBe("one/two"); + } } \ No newline at end of file diff --git a/src/Transports/MQTT/Wolverine.MQTT/MqttTopic.cs b/src/Transports/MQTT/Wolverine.MQTT/MqttTopic.cs index 20fc5c488..594daeb75 100644 --- a/src/Transports/MQTT/Wolverine.MQTT/MqttTopic.cs +++ b/src/Transports/MQTT/Wolverine.MQTT/MqttTopic.cs @@ -24,6 +24,8 @@ public MqttTopic(string topicName, MqttTransport parent, EndpointRole role) : ba TopicName = topicName.Trim('/'); Parent = parent; + EndpointName = topicName; + EnvelopeMapper = new MqttEnvelopeMapper(this); Mode = EndpointMode.BufferedInMemory; } diff --git a/src/Wolverine/Runtime/RemoteInvocation/FailureAcknowledgement.cs b/src/Wolverine/Runtime/RemoteInvocation/FailureAcknowledgement.cs index 3b63e7bb0..42938471c 100644 --- a/src/Wolverine/Runtime/RemoteInvocation/FailureAcknowledgement.cs +++ b/src/Wolverine/Runtime/RemoteInvocation/FailureAcknowledgement.cs @@ -1,8 +1,15 @@  +using System.Diagnostics; + namespace Wolverine.Runtime.RemoteInvocation; public class FailureAcknowledgement { + public FailureAcknowledgement() + { + Debug.WriteLine("Why?"); + } + public Guid RequestId { get; init; } public string Message { get; init; } = null!; diff --git a/src/Wolverine/Transports/BrokerResource.cs b/src/Wolverine/Transports/BrokerResource.cs index 3770a6c2c..07776cbc5 100644 --- a/src/Wolverine/Transports/BrokerResource.cs +++ b/src/Wolverine/Transports/BrokerResource.cs @@ -95,6 +95,8 @@ public async Task DetermineStatus(CancellationToken token) }; var columns = _transport.DiagnosticColumns().ToArray(); + if (!columns.Any()) return table; + foreach (var column in columns) table.AddColumn(new TableColumn(column.Header) { Alignment = column.Alignment });