diff --git a/src/main/kotlin/com/viartemev/thewhiterabbit/consumer/flow/ConsumerFlow.kt b/src/main/kotlin/com/viartemev/thewhiterabbit/consumer/flow/ConsumerFlow.kt index 0667897..64fa0b2 100644 --- a/src/main/kotlin/com/viartemev/thewhiterabbit/consumer/flow/ConsumerFlow.kt +++ b/src/main/kotlin/com/viartemev/thewhiterabbit/consumer/flow/ConsumerFlow.kt @@ -11,35 +11,57 @@ import mu.KotlinLogging private val logger = KotlinLogging.logger {} class ConsumerFlow( - private val amqpChannel: Channel, private val amqpQueue: String, private val prefetchSize: Int + private val amqpChannel: Channel, private val amqpQueue: String ) { //TODO exception handling - suspend fun consumerAutoAckFlow(): Flow = callbackFlow { + suspend fun consumerAutoAckFlow(prefetchSize: Int): Flow = callbackFlow { amqpChannel.basicQos(prefetchSize, false) val tag = amqpChannel.basicConsume(amqpQueue, true, { consumerTag, message -> + logger.debug { "Trying to send a message from the flow consumer to the channel" } trySendBlocking(message) + logger.debug { "The message was successfully sent to the channel" } }, { consumerTag -> + logger.info { "Consumer $consumerTag has been cancelled for reasons other than by a call to Channel#basicCancel" } channel.close() }) awaitClose { amqpChannel.basicCancel(tag) - amqpChannel.close() } } - //TODO exception handling - suspend fun consumerConfirmAckFlow(): Flow = callbackFlow { - amqpChannel.basicQos(prefetchSize, false) - val tag = amqpChannel.basicConsume(amqpQueue, false, { consumerTag, message -> - trySendBlocking(message) - amqpChannel.basicAck(message.envelope.deliveryTag, false) - }, { consumerTag -> - channel.close() - }) + /** + * The consumerConfirmAckFlow function creates a cold Flow that consumes messages from an AMQP queue using a provided amqpChannel. + * The messages are not automatically acknowledged after being received. + * Instead, acknowledgments are manually sent to the AMQP server after the messages are successfully emitted to the downstream flow collector. + * + */ + suspend fun consumerConfirmAckFlow(prefetchSize: Int = 0): Flow = callbackFlow { + if (prefetchSize != 0) { + amqpChannel.basicQos(prefetchSize, false) + } + val deliverCallback: (consumerTag: String, message: Delivery) -> Unit = { _, message -> + try { + logger.debug { "Trying to send a message from the flow consumer to the flow" } + trySendBlocking(message) + logger.debug { "The message was successfully sent to the flow" } + amqpChannel.basicAck(message.envelope.deliveryTag, false) + logger.debug { "The message was successfully acknowledged" } + } catch (e: Exception) { + logger.error(e) { "Caught exception while delivering the message" } + close(e) + } + } + val cancelCallback: (consumerTag: String) -> Unit = { consumerTag -> channel.close() } + val tag = amqpChannel.basicConsume(amqpQueue, false, deliverCallback, cancelCallback) awaitClose { - amqpChannel.basicCancel(tag) - amqpChannel.close() + try { + logger.debug { "Cancelling consumer#$tag" } + amqpChannel.basicCancel(tag) + } catch (e: Exception) { + logger.error(e) { "Can't cancel consumer#$tag" } + channel.close() + } } } diff --git a/src/main/kotlin/com/viartemev/thewhiterabbit/consumer/flow/main.kt b/src/main/kotlin/com/viartemev/thewhiterabbit/consumer/flow/main.kt new file mode 100644 index 0000000..a07ced2 --- /dev/null +++ b/src/main/kotlin/com/viartemev/thewhiterabbit/consumer/flow/main.kt @@ -0,0 +1,22 @@ +package com.viartemev.thewhiterabbit.consumer.flow + +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.cancellable +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.runBlocking +import kotlin.coroutines.cancellation.CancellationException + +fun main() = runBlocking { + flow { + for (i in 1..5) { + if (i == 5) { + println("Flow is cancelling") + throw CancellationException("Cancelled by choice") + } + emit(i) + delay(100) + } + }.cancellable() + .collect { value -> println(value) } + println("Flow is completed") +} diff --git a/src/test/kotlin/com/viartemev/thewhiterabbit/consumer/flow/ConsumerFlowTest.kt b/src/test/kotlin/com/viartemev/thewhiterabbit/consumer/flow/ConsumerFlowTest.kt index bd12855..5181737 100644 --- a/src/test/kotlin/com/viartemev/thewhiterabbit/consumer/flow/ConsumerFlowTest.kt +++ b/src/test/kotlin/com/viartemev/thewhiterabbit/consumer/flow/ConsumerFlowTest.kt @@ -8,6 +8,8 @@ import com.viartemev.thewhiterabbit.queue.declareQueue import com.viartemev.thewhiterabbit.utils.createMessage import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.flow.cancellable +import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.take import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test @@ -24,9 +26,7 @@ class ConsumerFlowTest : AbstractTestContainersTest() { (1..10).map { createMessage(queue = QUEUE_NAME, body = "1") } .map { m -> async { publishWithConfirm(m) } }.awaitAll() } - ConsumerFlow(this, QUEUE_NAME, 2) - .consumerAutoAckFlow() - .take(10) + ConsumerFlow(this, QUEUE_NAME).consumerAutoAckFlow(2).take(10) .collect { delivery -> println(String(delivery.body)) } } } @@ -38,13 +38,17 @@ class ConsumerFlowTest : AbstractTestContainersTest() { connection.confirmChannel { declareQueue(QueueSpecification(QUEUE_NAME)) publish { - (1..10).map { createMessage(queue = QUEUE_NAME, body = "1") } + (1..10).map { i -> createMessage(queue = QUEUE_NAME, body = i.toString()) } .map { m -> async { publishWithConfirm(m) } }.awaitAll() } - ConsumerFlow(this, QUEUE_NAME, 2) - .consumerConfirmAckFlow() + ConsumerFlow(this, QUEUE_NAME) + .consumerConfirmAckFlow(2) .take(10) - .collect { delivery -> println(String(delivery.body)) } + //.cancellable() + .catch { e -> println("Caught exception: $e") } + .collect { delivery -> + println("Delivery is $delivery") + } } } }