diff --git a/build/build.cs b/build/build.cs index e3963579a..12a298f06 100644 --- a/build/build.cs +++ b/build/build.cs @@ -144,6 +144,7 @@ private static void Main(string[] args) "./src/Transports/Azure/Wolverine.AzureServiceBus", "./src/Transports/AWS/Wolverine.AmazonSqs", "./src/Transports/MQTT/Wolverine.MQTT", + "./src/Transports/Kafka/Wolverine.Kafka", "./src/Persistence/Wolverine.RDBMS", "./src/Persistence/Wolverine.Postgresql", "./src/Persistence/Wolverine.EntityFrameworkCore", diff --git a/docker-compose.yml b/docker-compose.yml index 2fc1a52ee..10f8c4219 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,9 @@ version: '3' + +networks: + app-tier: + driver: bridge + services: postgresql: image: "postgres:latest" @@ -43,4 +48,26 @@ services: - DOCKER_HOST=unix:///var/run/docker.sock volumes: - "${LOCALSTACK_VOLUME_DIR:-./volume}:/var/lib/localstack" - - "/var/run/docker.sock:/var/run/docker.sock" \ No newline at end of file + - "/var/run/docker.sock:/var/run/docker.sock" + + zookeeper: + image: confluentinc/cp-zookeeper:latest + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + ports: + - 22181:2181 + + kafka: + image: confluentinc/cp-kafka:latest + depends_on: + - zookeeper + ports: + - 29092:29092 + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 \ No newline at end of file diff --git a/src/Testing/TestingSupport/Compliance/TransportCompliance.cs b/src/Testing/TestingSupport/Compliance/TransportCompliance.cs index af193b932..156a58a6c 100644 --- a/src/Testing/TestingSupport/Compliance/TransportCompliance.cs +++ b/src/Testing/TestingSupport/Compliance/TransportCompliance.cs @@ -209,7 +209,7 @@ public virtual async Task can_apply_requeue_mechanics() var session = await theSender.TrackActivity(Fixture.DefaultTimeout) .AlsoTrack(theReceiver) .DoNotAssertOnExceptionsDetected() - .Timeout(60.Seconds()) + .Timeout(30.Seconds()) .ExecuteAndWaitAsync(c => c.EndpointFor(theOutboundAddress).SendAsync(new Message2())); session.FindSingleTrackedMessageOfType(MessageEventType.MessageSucceeded) diff --git a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/SqsSenderProtocol.cs b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/SqsSenderProtocol.cs index 151888be9..25584ad46 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/SqsSenderProtocol.cs +++ b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/SqsSenderProtocol.cs @@ -8,7 +8,7 @@ namespace Wolverine.AmazonSqs.Internal; -internal class SqsSenderProtocol : ISenderProtocol +internal class SqsSenderProtocol :ISenderProtocol { private readonly ILogger _logger; private readonly AmazonSqsQueue _queue; diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaTransportTests.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaTransportTests.cs new file mode 100644 index 000000000..f8dafa436 --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaTransportTests.cs @@ -0,0 +1,35 @@ +using Shouldly; +using Wolverine.Configuration; +using Wolverine.Kafka.Internals; + +namespace Wolverine.Kafka.Tests; + +public class KafkaTransportTests +{ + [Theory] + [InlineData("kafka://topic/one", "one")] + [InlineData("kafka://topic/one.two", "one.two")] + [InlineData("kafka://topic/one.two/", "one.two")] + [InlineData("kafka://topic/one.two.three", "one.two.three")] + public void get_topic_name_from_uri(string uriString, string expected) + { + KafkaTopic.TopicNameForUri(new Uri(uriString)) + .ShouldBe(expected); + } + + [Fact] + public void build_uri_for_endpoint() + { + var transport = new KafkaTransport(); + new KafkaTopic(transport, "one.two", EndpointRole.Application) + .Uri.ShouldBe(new Uri("Kafka://topic/one.two")); + } + + [Fact] + public void endpoint_name_is_topic_name() + { + var transport = new KafkaTransport(); + new KafkaTopic(transport, "one.two", EndpointRole.Application) + .EndpointName.ShouldBe("one.two"); + } +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/NoParallelization.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/NoParallelization.cs new file mode 100644 index 000000000..b1fa88422 --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/NoParallelization.cs @@ -0,0 +1,3 @@ +using Xunit; + +[assembly: CollectionBehavior(CollectionBehavior.CollectionPerAssembly)] \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/StatefulResourceSmokeTests.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/StatefulResourceSmokeTests.cs new file mode 100644 index 000000000..339329bba --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/StatefulResourceSmokeTests.cs @@ -0,0 +1,138 @@ +using JasperFx.Core; +using Microsoft.Extensions.Hosting; +using Oakton; +using Shouldly; + +namespace Wolverine.Kafka.Tests; + +public class StatefulResourceSmokeTests +{ + private IHostBuilder ConfigureBuilder(bool autoProvision, int starting = 1) + { + return Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + if (autoProvision) + { + opts.UseKafka("localhost:29092").AutoProvision(); + } + else + { + opts.UseKafka("localhost:29092");; + } + + opts.PublishMessage() + .ToKafkaTopic("sr" + starting++); + + opts.PublishMessage() + .ToKafkaTopic("sr" + starting++); + + opts.PublishMessage() + .ToKafkaTopic("sr" + starting++); + + opts.PublishMessage() + .ToKafkaTopic("sr" + starting++); + }); + } + + [Fact] + public async Task run_setup() + { + var result = await ConfigureBuilder(false) + .RunOaktonCommands(new[] { "resources", "setup" }); + result.ShouldBe(0); + } + + + [Fact] + public async Task statistics() + { + (await ConfigureBuilder(false) + .RunOaktonCommands(new[] { "resources", "setup" })).ShouldBe(0); + + var result = await ConfigureBuilder(false) + .RunOaktonCommands(new[] { "resources", "statistics" }); + + result.ShouldBe(0); + } + + [Fact] + public async Task check_positive() + { + (await ConfigureBuilder(false) + .RunOaktonCommands(new[] { "resources", "setup" })).ShouldBe(0); + + var result = await ConfigureBuilder(false) + .RunOaktonCommands(new[] { "resources", "check" }); + + result.ShouldBe(0); + } + + // [Fact] + // public async Task check_negative() + // { + // var result = await ConfigureBuilder(false, 10) + // .RunOaktonCommands(new[] { "resources", "check" }); + // + // result.ShouldBe(1); + // } + + [Fact] + public async Task clear_state() + { + (await ConfigureBuilder(false, 20) + .RunOaktonCommands(new[] { "resources", "setup" })).ShouldBe(0); + + (await ConfigureBuilder(false, 20) + .RunOaktonCommands(new[] { "resources", "clear" })).ShouldBe(0); + } + + [Fact] + public async Task teardown() + { + (await ConfigureBuilder(false, 30) + .RunOaktonCommands(new[] { "resources", "setup" })).ShouldBe(0); + + (await ConfigureBuilder(false, 30) + .RunOaktonCommands(new[] { "resources", "teardown" })).ShouldBe(0); + } +} + +public class SRMessage1 +{ +} + +public class SRMessage2 +{ +} + +public class SRMessage3 +{ +} + +public class SRMessage4 +{ +} + +public class SRMessageHandlers +{ + public Task Handle(SRMessage1 message) + { + return Task.Delay(100.Milliseconds()); + } + + public Task Handle(SRMessage2 message) + { + return Task.Delay(100.Milliseconds()); + } + + public Task Handle(SRMessage3 message) + { + return Task.Delay(100.Milliseconds()); + } + + public Task Handle(SRMessage4 message) + { + return Task.Delay(100.Milliseconds()); + } +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/UnitTest1.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/UnitTest1.cs deleted file mode 100644 index cecb10a27..000000000 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/UnitTest1.cs +++ /dev/null @@ -1,9 +0,0 @@ -namespace Wolverine.Kafka.Tests; - -public class UnitTest1 -{ - [Fact] - public void Test1() - { - } -} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_async.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_async.cs new file mode 100644 index 000000000..36ea8a1be --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_async.cs @@ -0,0 +1,85 @@ +using System.Diagnostics; +using JasperFx.Core; +using Microsoft.Extensions.Hosting; +using Oakton.Resources; +using Shouldly; +using Wolverine.Tracking; +using Xunit.Abstractions; + +namespace Wolverine.Kafka.Tests; + +public class broadcast_to_topic_async : IAsyncLifetime +{ + private readonly ITestOutputHelper _output; + private IHost _sender; + private IHost _receiver; + + public broadcast_to_topic_async(ITestOutputHelper output) + { + _output = output; + } + + public async Task InitializeAsync() + { + + _sender = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseKafka("localhost:29092").AutoProvision(); + opts.Policies.DisableConventionalLocalRouting(); + + opts.Services.AddResourceSetupOnStartup(); + }).StartAsync(); + + _receiver = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseKafka("localhost:29092").AutoProvision(); + opts.ListenToKafkaTopic("incoming.one"); + + opts.Services.AddResourceSetupOnStartup(); + }).StartAsync(); + + } + + [Fact] + public async Task broadcast() + { + var session = await _sender.TrackActivity() + .AlsoTrack(_receiver) + .Timeout(30.Seconds()) + .ExecuteAndWaitAsync(m => m.BroadcastToTopicAsync("incoming.one", new ColorMessage("blue"))); + + var received = session.Received.SingleMessage(); + received.Color.ShouldBe("blue"); + } + + + public async Task DisposeAsync() + { + await _sender.StopAsync(); + await _receiver.StopAsync(); + } +} + +public class ColorMessage +{ + public ColorMessage() + { + } + + public ColorMessage(string color) + { + Color = color; + } + + public string Color { get; set; } +} + +public static class ColorMessageHandler +{ + public static void Handle(ColorMessage message) + { + Debug.WriteLine("Got " + message.Color); + } +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_rules.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_rules.cs new file mode 100644 index 000000000..05688d5d7 --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_rules.cs @@ -0,0 +1,99 @@ +using System.Diagnostics; +using JasperFx.Core; +using Microsoft.Extensions.Hosting; +using Oakton.Resources; +using Shouldly; +using Wolverine.Attributes; +using Wolverine.Tracking; +using Xunit.Abstractions; + +namespace Wolverine.Kafka.Tests; + +public class broadcast_to_topic_rules : IAsyncLifetime +{ + private readonly ITestOutputHelper _output; + private IHost _sender; + private IHost _receiver; + + public broadcast_to_topic_rules(ITestOutputHelper output) + { + _output = output; + } + + public async Task InitializeAsync() + { + _sender = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseKafka("localhost:29092").AutoProvision(); + opts.Policies.DisableConventionalLocalRouting(); + + opts.PublishAllMessages().ToKafkaTopics(); + + opts.ServiceName = "sender"; + + opts.Services.AddResourceSetupOnStartup(); + }).StartAsync(); + + _receiver = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseKafka("localhost:29092").AutoProvision(); + opts.ListenToKafkaTopic("red"); + opts.ListenToKafkaTopic("green"); + opts.ListenToKafkaTopic("blue"); + opts.ListenToKafkaTopic("purple"); + + opts.ServiceName = "receiver"; + + opts.Services.AddResourceSetupOnStartup(); + }).StartAsync(); + + } + + [Fact] + public async Task route_by_derived_topics_1() + { + var session = await _sender + .TrackActivity() + .AlsoTrack(_receiver) + .Timeout(30.Seconds()) + .WaitForMessageToBeReceivedAt(_receiver) + .PublishMessageAndWaitAsync(new RedMessage("one")); + + session.Received.SingleEnvelope() + .Destination.ShouldBe(new Uri("kafka://topic/red")); + } + + [Fact] + public async Task route_by_derived_topics_2() + { + var session = await _sender + .TrackActivity() + .AlsoTrack(_receiver) + .Timeout(30.Seconds()) + .WaitForMessageToBeReceivedAt(_receiver) + .PublishMessageAndWaitAsync(new GreenMessage("one")); + + session.Received.SingleEnvelope() + .Destination.ShouldBe(new Uri("kafka://topic/green")); + } + + public async Task DisposeAsync() + { + await _sender.StopAsync(); + await _receiver.StopAsync(); + } +} + +[Topic("red")] +public record RedMessage(string Name); + +[Topic("green")] +public record GreenMessage(string Name); + +public static class ColoredMessageHandler +{ + public static void Handle(RedMessage m) => Debug.WriteLine("Got red " + m.Name); + public static void Handle(GreenMessage m) => Debug.WriteLine("Got green " + m.Name); +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/compliance.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/compliance.cs new file mode 100644 index 000000000..c674d152f --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/compliance.cs @@ -0,0 +1,110 @@ +using Microsoft.Extensions.DependencyInjection; +using Oakton.Resources; +using Shouldly; +using TestingSupport; +using TestingSupport.Compliance; +using Wolverine.Runtime; + +namespace Wolverine.Kafka.Tests; + + +public class BufferedComplianceFixture : TransportComplianceFixture, IAsyncLifetime +{ + + + + public BufferedComplianceFixture() : base(new Uri("kafka://topic/receiver"), 120) + { + + } + + public async Task InitializeAsync() + { + var receiverTopic = "buffered.receiver"; + var senderTopic = "buffered.sender"; + + OutboundAddress = new Uri("kafka://topic/" + receiverTopic); + + await SenderIs(opts => + { + opts.UseKafka("localhost:29092"); + + opts.ListenToKafkaTopic(senderTopic); + + opts.PublishAllMessages().ToKafkaTopic(receiverTopic).BufferedInMemory(); + + opts.Services.AddResourceSetupOnStartup(); + }); + + await ReceiverIs(opts => + { + opts.UseKafka("localhost:29092"); + + opts.ListenToKafkaTopic(receiverTopic).Named("receiver").BufferedInMemory(); + + opts.Services.AddResourceSetupOnStartup(); + }); + } + + public new Task DisposeAsync() + { + return Task.CompletedTask; + } +} + +[Collection("acceptance")] +public class BufferedSendingAndReceivingCompliance : TransportCompliance +{ + +} + + +public class InlineComplianceFixture : TransportComplianceFixture, IAsyncLifetime +{ + + public static int Number = 0; + + public InlineComplianceFixture() : base(new Uri("kafka://topic/receiver"), 120) + { + + } + + public async Task InitializeAsync() + { + var receiverTopic = "receiver.inline"; + var senderTopic = "sender.inline"; + + OutboundAddress = new Uri("kafka://topic/" + receiverTopic); + + await SenderIs(opts => + { + opts.UseKafka("localhost:29092").AutoProvision(); + + opts.ListenToKafkaTopic(senderTopic).UseForReplies(); + + opts.PublishAllMessages().ToKafkaTopic(receiverTopic).SendInline(); + + opts.Services.AddResourceSetupOnStartup(); + }); + + await ReceiverIs(opts => + { + opts.UseKafka("localhost:29092").AutoProvision(); + + opts.ListenToKafkaTopic(receiverTopic).Named("receiver").ProcessInline(); + + opts.Services.AddResourceSetupOnStartup(); + }); + } + + public new Task DisposeAsync() + { + return Task.CompletedTask; + } +} + +[Collection("acceptance")] +public class InlineSendingAndReceivingCompliance : TransportCompliance +{ + +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka/IKafkaEnvelopeMapper.cs b/src/Transports/Kafka/Wolverine.Kafka/IKafkaEnvelopeMapper.cs new file mode 100644 index 000000000..a68f8fdd9 --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka/IKafkaEnvelopeMapper.cs @@ -0,0 +1,9 @@ +using Confluent.Kafka; +using Wolverine.Transports; + +namespace Wolverine.Kafka; + +public interface IKafkaEnvelopeMapper : IEnvelopeMapper, Message> +{ + +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka/Internals/InlineKafkaSender.cs b/src/Transports/Kafka/Wolverine.Kafka/Internals/InlineKafkaSender.cs new file mode 100644 index 000000000..b486781e3 --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka/Internals/InlineKafkaSender.cs @@ -0,0 +1,29 @@ +using Confluent.Kafka; +using Wolverine.Transports.Sending; + +namespace Wolverine.Kafka.Internals; + +public class InlineKafkaSender : ISender +{ + private readonly KafkaTopic _topic; + + public InlineKafkaSender(KafkaTopic topic) + { + _topic = topic; + Destination = topic.Uri; + } + + public bool SupportsNativeScheduledSend => false; + public Uri Destination { get; } + public async Task PingAsync() + { + throw new NotImplementedException(); + } + + public async ValueTask SendAsync(Envelope envelope) + { + var message = _topic.Mapper.CreateMessage(envelope); + using var producer = new ProducerBuilder(_topic.Parent.ProducerConfig).Build(); + await producer.ProduceAsync(envelope.TopicName ?? _topic.TopicName, message); + } +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaEnvelopeMapper.cs b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaEnvelopeMapper.cs new file mode 100644 index 000000000..4ef937b34 --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaEnvelopeMapper.cs @@ -0,0 +1,30 @@ +using System.Text; +using Confluent.Kafka; +using Wolverine.Configuration; +using Wolverine.Transports; + +namespace Wolverine.Kafka.Internals; + +internal class KafkaEnvelopeMapper : EnvelopeMapper, Message>, IKafkaEnvelopeMapper +{ + public KafkaEnvelopeMapper(Endpoint endpoint) : base(endpoint) + { + } + + protected override void writeOutgoingHeader(Message outgoing, string key, string value) + { + outgoing.Headers.Add(key, Encoding.Default.GetBytes(value)); + } + + protected override bool tryReadIncomingHeader(Message incoming, string key, out string value) + { + if (incoming.Headers.TryGetLastBytes(key, out var bytes)) + { + value = Encoding.Default.GetString(bytes); + return true; + } + + value = default!; + return false; + } +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs new file mode 100644 index 000000000..fd4c1ace7 --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs @@ -0,0 +1,86 @@ +using Confluent.Kafka; +using JasperFx.Core; +using Wolverine.Transports; + +namespace Wolverine.Kafka.Internals; + +internal class KafkaListener : IListener, IDisposable +{ + private readonly IConsumer _consumer; + private CancellationTokenSource _cancellation = new(); + private readonly Task _runner; + private readonly ConsumerConfig _config; + private readonly IReceiver _receiver; + + public KafkaListener(KafkaTopic topic, ConsumerConfig config, IReceiver receiver) + { + Address = topic.Uri; + _consumer = new ConsumerBuilder(config).Build(); + var mapper = topic.Mapper; + + _config = config; + _receiver = receiver; + + _runner = Task.Run(async () => + { + _consumer.Subscribe(topic.TopicName); + try + { + while (!_cancellation.IsCancellationRequested) + { + // TODO -- watch that this isn't EnableAutoCommit = false + // TODO -- wrap try catch around this + var result = _consumer.Consume(_cancellation.Token); + var message = result.Message; + + var envelope = mapper.CreateEnvelope(result.Topic, message); + + await receiver.ReceivedAsync(this, envelope); + } + } + catch (OperationCanceledException) + { + // Shutting down + } + finally + { + _consumer.Close(); + } + }, _cancellation.Token); + } + + public ValueTask CompleteAsync(Envelope envelope) + { + if (_config.EnableAutoCommit != null) + { + _consumer.Commit(); + } + + return ValueTask.CompletedTask; + } + + public ValueTask DeferAsync(Envelope envelope) + { + // Really just a retry + return _receiver.ReceivedAsync(this, envelope); + } + + public ValueTask DisposeAsync() + { + Dispose(); + return ValueTask.CompletedTask; + } + + public Uri Address { get; } + public async ValueTask StopAsync() + { + _cancellation.Cancel(); + await _runner; + } + + public void Dispose() + { + _consumer.SafeDispose(); + _runner.Dispose(); + } +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs new file mode 100644 index 000000000..526b6ebff --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs @@ -0,0 +1,58 @@ +using Confluent.Kafka; +using JasperFx.Core; +using Wolverine.Configuration; +using Wolverine.Runtime; +using Wolverine.Runtime.Routing; +using Wolverine.Transports; + +namespace Wolverine.Kafka.Internals; + +public class KafkaTransport : BrokerTransport +{ + public Cache Topics { get; } + + public ProducerConfig ProducerConfig { get; } = new(); + public ConsumerConfig ConsumerConfig { get; } = new(); + + public AdminClientConfig AdminClientConfig { get; } = new(); + + public KafkaTransport() : base("kafka", "Kafka Topics") + { + Topics = new Cache(topicName => new KafkaTopic(this, topicName, EndpointRole.Application)); + } + + protected override IEnumerable endpoints() + { + return Topics; + } + + protected override KafkaTopic findEndpointByUri(Uri uri) + { + var topicName = KafkaTopic.TopicNameForUri(uri); + return Topics[topicName]; + } + + + protected override void tryBuildSystemEndpoints(IWolverineRuntime runtime) + { + ConsumerConfig.GroupId ??= runtime.Options.ServiceName; + ProducerConfig.ClientId ??= runtime.Options.ServiceName; + + var topics = Topics[KafkaTopic.WolverineTopicsName]; + topics.RoutingType = RoutingMode.ByTopic; + topics.OutgoingRules.Add( + new TopicRoutingRule()); // t + } + + public override ValueTask ConnectAsync(IWolverineRuntime runtime) + { + foreach (var endpoint in Topics) endpoint.Compile(runtime); + + return ValueTask.CompletedTask; + } + + public override IEnumerable DiagnosticColumns() + { + yield break; + } +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaListenerConfiguration.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaListenerConfiguration.cs new file mode 100644 index 000000000..2407cd407 --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaListenerConfiguration.cs @@ -0,0 +1,16 @@ +using Wolverine.Configuration; + +namespace Wolverine.Kafka; + +public class KafkaListenerConfiguration : ListenerConfiguration +{ + public KafkaListenerConfiguration(KafkaTopic endpoint) : base(endpoint) + { + } + + public KafkaListenerConfiguration(Func source) : base(source) + { + } + + +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaSenderProtocol.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaSenderProtocol.cs new file mode 100644 index 000000000..2768e395a --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaSenderProtocol.cs @@ -0,0 +1,32 @@ +using Confluent.Kafka; +using Wolverine.Transports; +using Wolverine.Transports.Sending; + +namespace Wolverine.Kafka; + +public class KafkaSenderProtocol : ISenderProtocol +{ + private readonly KafkaTopic _topic; + + public KafkaSenderProtocol(KafkaTopic topic) + { + _topic = topic; + } + + public async Task SendBatchAsync(ISenderCallback callback, OutgoingMessageBatch batch) + { + using var producer = new ProducerBuilder(_topic.Parent.ProducerConfig).Build(); + + var tasks = new List(); + foreach (var envelope in batch.Messages) + { + // TODO -- separate try/catch here! + + var message = _topic.Mapper.CreateMessage(envelope); + var task = producer.ProduceAsync(envelope.TopicName ?? _topic.TopicName, message); + tasks.Add(task); + } + + await Task.WhenAll(tasks); + } +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaSubscriberConfiguration.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaSubscriberConfiguration.cs new file mode 100644 index 000000000..b9216a16a --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaSubscriberConfiguration.cs @@ -0,0 +1,11 @@ +using Wolverine.Configuration; + +namespace Wolverine.Kafka; + +public class KafkaSubscriberConfiguration : SubscriberConfiguration +{ + internal KafkaSubscriberConfiguration(KafkaTopic endpoint) : base(endpoint) + { + } + +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs new file mode 100644 index 000000000..09810710b --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs @@ -0,0 +1,108 @@ +using Confluent.Kafka; +using Confluent.Kafka.Admin; +using Microsoft.Extensions.Logging; +using Wolverine.Configuration; +using Wolverine.Kafka.Internals; +using Wolverine.Runtime; +using Wolverine.Transports; +using Wolverine.Transports.Sending; + +namespace Wolverine.Kafka; + +public class KafkaTopic : Endpoint, IBrokerEndpoint +{ + // Strictly an identifier for the endpoint + public const string WolverineTopicsName = "wolverine.topics"; + + public KafkaTransport Parent { get; } + + public KafkaTopic(KafkaTransport parent, string topicName, EndpointRole role) : base(new Uri("kafka://topic/" + topicName), role) + { + Parent = parent; + EndpointName = topicName; + TopicName = topicName; + + Mapper = new KafkaEnvelopeMapper(this); + } + + public override bool AutoStartSendingAgent() + { + return true; + } + + public string TopicName { get; } + + public IKafkaEnvelopeMapper Mapper { get; set; } + + public static string TopicNameForUri(Uri uri) + { + return uri.Segments.Last().Trim('/'); + } + + public override ValueTask BuildListenerAsync(IWolverineRuntime runtime, IReceiver receiver) + { + var listener = new KafkaListener(this, Parent.ConsumerConfig, receiver); + return ValueTask.FromResult((IListener)listener); + } + + protected override ISender CreateSender(IWolverineRuntime runtime) + { + return Mode == EndpointMode.Inline + ? new InlineKafkaSender(this) + : new BatchedSender(this, new KafkaSenderProtocol(this), runtime.Cancellation, + runtime.LoggerFactory.CreateLogger()); + } + + public async ValueTask CheckAsync() + { + if (TopicName == WolverineTopicsName) return true; // don't care, this is just a marker + try + { + using var client = new ProducerBuilder(Parent.ProducerConfig).Build(); + await client.ProduceAsync(TopicName, new Message + { + Key = "ping", + Value = "ping" + }); + + + return true; + } + catch (Exception) + { + return false; + } + } + + public async ValueTask TeardownAsync(ILogger logger) + { + if (TopicName == WolverineTopicsName) return; // don't care, this is just a marker + using var client = new AdminClientBuilder(Parent.AdminClientConfig).Build(); + await client.DeleteTopicsAsync(new string[] { TopicName }); + } + + public async ValueTask SetupAsync(ILogger logger) + { + if (TopicName == WolverineTopicsName) return; // don't care, this is just a marker + + using var client = new AdminClientBuilder(Parent.AdminClientConfig).Build(); + + try + { + await client.CreateTopicsAsync(new[] + { + new TopicSpecification + { + Name = TopicName + } + }); + + logger.LogInformation("Created Kafka topic {Topic}", TopicName); + } + catch (CreateTopicsException e) + { + if (e.Message.Contains("already exists.")) return; + throw; + } + } +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExpression.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExpression.cs new file mode 100644 index 000000000..6d0de953e --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExpression.cs @@ -0,0 +1,72 @@ +using Confluent.Kafka; +using Wolverine.Kafka.Internals; + +namespace Wolverine.Kafka; + +public class KafkaTransportExpression +{ + private readonly KafkaTransport _transport; + + internal KafkaTransportExpression(KafkaTransport transport) + { + _transport = transport; + } + + /// + /// Configure both the producer and consumer config of the underlying Kafka client + /// + /// + /// + public KafkaTransportExpression ConfigureClient(Action configure) + { + configure(_transport.ConsumerConfig); + configure(_transport.ProducerConfig); + configure(_transport.AdminClientConfig); + + return this; + } + + /// + /// Configure the Kafka message producers within the Wolverine transport + /// + /// + /// + public KafkaTransportExpression ConfigureProducers(Action configure) + { + configure(_transport.ProducerConfig); + return this; + } + + /// + /// Configure the Kafka message consumers within the Wolverine transport + /// + /// + /// + public KafkaTransportExpression ConfigureConsumers(Action configure) + { + configure(_transport.ConsumerConfig); + return this; + } + + /// + /// Create newly used Kafka topics on endpoint activation if the topic is missing + /// + /// + /// + public KafkaTransportExpression AutoProvision(Action? configure = null) + { + _transport.AutoProvision = true; + configure?.Invoke(_transport.AdminClientConfig); + return this; + } + + /// + /// Deletes and rebuilds topics on application startup + /// + /// + public KafkaTransportExpression DeleteExistingTopicsOnStartup() + { + _transport.AutoPurgeAllQueues = true; + return this; + } +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs new file mode 100644 index 000000000..51f0328a8 --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs @@ -0,0 +1,128 @@ +using System.Text; +using Confluent.Kafka; +using JasperFx.Core.Reflection; +using Wolverine.Configuration; +using Wolverine.Kafka.Internals; + +namespace Wolverine.Kafka; + +public static class KafkaTransportExtensions +{ + /// + /// Quick access to the Kafka Transport within this application. + /// This is for advanced usage + /// + /// + /// + internal static KafkaTransport KafkaTransport(this WolverineOptions endpoints) + { + var transports = endpoints.As().Transports; + + return transports.GetOrCreate(); + } + + /// + /// Add a connection to an Kafka broker within this application + /// + /// + /// + /// + public static KafkaTransportExpression UseKafka(this WolverineOptions options, string bootstrapServers) + { + var transport = options.KafkaTransport(); + transport.ConsumerConfig.BootstrapServers = bootstrapServers; + transport.ProducerConfig.BootstrapServers = bootstrapServers; + transport.AdminClientConfig.BootstrapServers = bootstrapServers; + + return new KafkaTransportExpression(transport); + } + + + /// + /// Listen for incoming messages at the designated Kafka topic name + /// + /// + /// The name of the Rabbit MQ queue + /// + /// Optional configuration for this Rabbit Mq queue if being initialized by Wolverine + /// + public static KafkaListenerConfiguration ListenToKafkaTopic(this WolverineOptions endpoints, string topicName) + { + var transport = endpoints.KafkaTransport(); + + var endpoint = transport.Topics[topicName]; + endpoint.EndpointName = topicName; + endpoint.IsListener = true; + + return new KafkaListenerConfiguration(endpoint); + } + + /// + /// Publish messages to an Kafka topic + /// + /// + /// + /// + public static KafkaSubscriberConfiguration ToKafkaTopic(this IPublishToExpression publishing, string topicName) + { + var transports = publishing.As().Parent.Transports; + var transport = transports.GetOrCreate(); + + var topic = transport.Topics[topicName]; + + // This is necessary unfortunately to hook up the subscription rules + publishing.To(topic.Uri); + + return new KafkaSubscriberConfiguration(topic); + } + + /// + /// Publish messages to Kafka topics based on Wolverine's rules for deriving topic + /// names from a message type + /// + /// + /// + /// + public static KafkaSubscriberConfiguration ToKafkaTopics(this IPublishToExpression publishing) + { + var transports = publishing.As().Parent.Transports; + var transport = transports.GetOrCreate(); + + var topic = transport.Topics[KafkaTopic.WolverineTopicsName]; + + // This is necessary unfortunately to hook up the subscription rules + publishing.To(topic.Uri); + + return new KafkaSubscriberConfiguration(topic); + } + + internal static Envelope CreateEnvelope(this IKafkaEnvelopeMapper mapper, string topicName, Message message) + { + var envelope = new Envelope + { + GroupId = message.Key, + Data = Encoding.Default.GetBytes(message.Value), + TopicName = topicName + }; + + message.Headers ??= new Headers(); // prevent NRE + + mapper.MapIncomingToEnvelope(envelope, message); + + return envelope; + } + + internal static Message CreateMessage(this IKafkaEnvelopeMapper mapper, Envelope envelope) + { + var message = new Message + { + Key = envelope.GroupId, + Value = Encoding.Default.GetString(envelope.Data), + Headers = new Headers() + }; + + mapper.MapEnvelopeToOutgoing(envelope, message); + + return message; + } +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka/Wolverine.Kafka.csproj b/src/Transports/Kafka/Wolverine.Kafka/Wolverine.Kafka.csproj index 9e54942fc..fbcde2f34 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/Wolverine.Kafka.csproj +++ b/src/Transports/Kafka/Wolverine.Kafka/Wolverine.Kafka.csproj @@ -1,7 +1,16 @@ - + Kafka Transport for Wolverine Messaging Systems + portable + WolverineFx.Kafka + false + false + false + false + false + enable + Wolverine.Kafka diff --git a/src/Transports/MQTT/Wolverine.MQTT.Tests/MqttTransportTests.cs b/src/Transports/MQTT/Wolverine.MQTT.Tests/MqttTransportTests.cs index 8ff4bc311..49aa0b55a 100644 --- a/src/Transports/MQTT/Wolverine.MQTT.Tests/MqttTransportTests.cs +++ b/src/Transports/MQTT/Wolverine.MQTT.Tests/MqttTransportTests.cs @@ -24,4 +24,12 @@ public void build_uri_for_endpoint() new MqttTopic("one/two", transport, EndpointRole.Application) .Uri.ShouldBe(new Uri("mqtt://topic/one/two")); } + + [Fact] + public void endpoint_name_is_topic_name() + { + var transport = new MqttTransport(); + new MqttTopic("one/two", transport, EndpointRole.Application) + .EndpointName.ShouldBe("one/two"); + } } \ No newline at end of file diff --git a/src/Transports/MQTT/Wolverine.MQTT/MqttTopic.cs b/src/Transports/MQTT/Wolverine.MQTT/MqttTopic.cs index 20fc5c488..594daeb75 100644 --- a/src/Transports/MQTT/Wolverine.MQTT/MqttTopic.cs +++ b/src/Transports/MQTT/Wolverine.MQTT/MqttTopic.cs @@ -24,6 +24,8 @@ public MqttTopic(string topicName, MqttTransport parent, EndpointRole role) : ba TopicName = topicName.Trim('/'); Parent = parent; + EndpointName = topicName; + EnvelopeMapper = new MqttEnvelopeMapper(this); Mode = EndpointMode.BufferedInMemory; } diff --git a/src/Wolverine/Runtime/RemoteInvocation/FailureAcknowledgement.cs b/src/Wolverine/Runtime/RemoteInvocation/FailureAcknowledgement.cs index 3b63e7bb0..42938471c 100644 --- a/src/Wolverine/Runtime/RemoteInvocation/FailureAcknowledgement.cs +++ b/src/Wolverine/Runtime/RemoteInvocation/FailureAcknowledgement.cs @@ -1,8 +1,15 @@  +using System.Diagnostics; + namespace Wolverine.Runtime.RemoteInvocation; public class FailureAcknowledgement { + public FailureAcknowledgement() + { + Debug.WriteLine("Why?"); + } + public Guid RequestId { get; init; } public string Message { get; init; } = null!; diff --git a/src/Wolverine/Transports/BrokerResource.cs b/src/Wolverine/Transports/BrokerResource.cs index 3770a6c2c..07776cbc5 100644 --- a/src/Wolverine/Transports/BrokerResource.cs +++ b/src/Wolverine/Transports/BrokerResource.cs @@ -95,6 +95,8 @@ public async Task DetermineStatus(CancellationToken token) }; var columns = _transport.DiagnosticColumns().ToArray(); + if (!columns.Any()) return table; + foreach (var column in columns) table.AddColumn(new TableColumn(column.Header) { Alignment = column.Alignment });