Skip to content

Commit

Permalink
add consumerConfirmAckFlow method
Browse files Browse the repository at this point in the history
  • Loading branch information
Artemyev Vyacheslav authored and Artemyev Vyacheslav committed Nov 7, 2023
1 parent 4ba6ab0 commit c818bd2
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,57 @@ import mu.KotlinLogging
private val logger = KotlinLogging.logger {}

class ConsumerFlow(
private val amqpChannel: Channel, private val amqpQueue: String, private val prefetchSize: Int
private val amqpChannel: Channel, private val amqpQueue: String
) {

//TODO exception handling
suspend fun consumerAutoAckFlow(): Flow<Delivery> = callbackFlow {
suspend fun consumerAutoAckFlow(prefetchSize: Int): Flow<Delivery> = callbackFlow {
amqpChannel.basicQos(prefetchSize, false)
val tag = amqpChannel.basicConsume(amqpQueue, true, { consumerTag, message ->
logger.debug { "Trying to send a message from the flow consumer to the channel" }
trySendBlocking(message)
logger.debug { "The message was successfully sent to the channel" }
}, { consumerTag ->
logger.info { "Consumer $consumerTag has been cancelled for reasons other than by a call to Channel#basicCancel" }
channel.close()
})
awaitClose {
amqpChannel.basicCancel(tag)
amqpChannel.close()
}
}

//TODO exception handling
suspend fun consumerConfirmAckFlow(): Flow<Delivery> = callbackFlow {
amqpChannel.basicQos(prefetchSize, false)
val tag = amqpChannel.basicConsume(amqpQueue, false, { consumerTag, message ->
trySendBlocking(message)
amqpChannel.basicAck(message.envelope.deliveryTag, false)
}, { consumerTag ->
channel.close()
})
/**
* The consumerConfirmAckFlow function creates a cold Flow that consumes messages from an AMQP queue using a provided amqpChannel.
* The messages are not automatically acknowledged after being received.
* Instead, acknowledgments are manually sent to the AMQP server after the messages are successfully emitted to the downstream flow collector.
*
*/
suspend fun consumerConfirmAckFlow(prefetchSize: Int = 0): Flow<Delivery> = callbackFlow {
if (prefetchSize != 0) {
amqpChannel.basicQos(prefetchSize, false)
}
val deliverCallback: (consumerTag: String, message: Delivery) -> Unit = { _, message ->
try {
logger.debug { "Trying to send a message from the flow consumer to the flow" }
trySendBlocking(message)
logger.debug { "The message was successfully sent to the flow" }
amqpChannel.basicAck(message.envelope.deliveryTag, false)
logger.debug { "The message was successfully acknowledged" }
} catch (e: Exception) {
logger.error(e) { "Caught exception while delivering the message" }
close(e)
}
}
val cancelCallback: (consumerTag: String) -> Unit = { consumerTag -> channel.close() }
val tag = amqpChannel.basicConsume(amqpQueue, false, deliverCallback, cancelCallback)
awaitClose {
amqpChannel.basicCancel(tag)
amqpChannel.close()
try {
logger.debug { "Cancelling consumer#$tag" }
amqpChannel.basicCancel(tag)
} catch (e: Exception) {
logger.error(e) { "Can't cancel consumer#$tag" }
channel.close()
}
}
}

Expand Down
22 changes: 22 additions & 0 deletions src/main/kotlin/com/viartemev/thewhiterabbit/consumer/flow/main.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.viartemev.thewhiterabbit.consumer.flow

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.cancellable
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.coroutines.cancellation.CancellationException

fun main() = runBlocking {
flow {
for (i in 1..5) {
if (i == 5) {
println("Flow is cancelling")
throw CancellationException("Cancelled by choice")
}
emit(i)
delay(100)
}
}.cancellable()
.collect { value -> println(value) }
println("Flow is completed")
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import com.viartemev.thewhiterabbit.queue.declareQueue
import com.viartemev.thewhiterabbit.utils.createMessage
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.flow.cancellable
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
Expand All @@ -24,9 +26,7 @@ class ConsumerFlowTest : AbstractTestContainersTest() {
(1..10).map { createMessage(queue = QUEUE_NAME, body = "1") }
.map { m -> async { publishWithConfirm(m) } }.awaitAll()
}
ConsumerFlow(this, QUEUE_NAME, 2)
.consumerAutoAckFlow()
.take(10)
ConsumerFlow(this, QUEUE_NAME).consumerAutoAckFlow(2).take(10)
.collect { delivery -> println(String(delivery.body)) }
}
}
Expand All @@ -38,13 +38,17 @@ class ConsumerFlowTest : AbstractTestContainersTest() {
connection.confirmChannel {
declareQueue(QueueSpecification(QUEUE_NAME))
publish {
(1..10).map { createMessage(queue = QUEUE_NAME, body = "1") }
(1..10).map { i -> createMessage(queue = QUEUE_NAME, body = i.toString()) }
.map { m -> async { publishWithConfirm(m) } }.awaitAll()
}
ConsumerFlow(this, QUEUE_NAME, 2)
.consumerConfirmAckFlow()
ConsumerFlow(this, QUEUE_NAME)
.consumerConfirmAckFlow(2)
.take(10)
.collect { delivery -> println(String(delivery.body)) }
//.cancellable()
.catch { e -> println("Caught exception: $e") }
.collect { delivery ->
println("Delivery is $delivery")
}
}
}
}
Expand Down

0 comments on commit c818bd2

Please sign in to comment.