Skip to content

Commit

Permalink
Add flow consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
Artemyev Vyacheslav authored and Artemyev Vyacheslav committed Nov 7, 2023
1 parent d20fddb commit 4ba6ab0
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import com.rabbitmq.client.Delivery
import com.viartemev.thewhiterabbit.exception.AcknowledgeException
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import mu.KotlinLogging
Expand All @@ -20,7 +23,7 @@ private val logger = KotlinLogging.logger {}
*
*/
class ConfirmConsumer internal constructor(
private val amqpChannel: Channel, amqpQueue: String, private val prefetchSize: Int
private val amqpChannel: Channel, private val amqpQueue: String, private val prefetchSize: Int
) : Closeable {
private val deliveries = KChannel<Delivery>(prefetchSize)

Expand All @@ -38,7 +41,6 @@ class ConfirmConsumer internal constructor(
}
}, { consumerTag ->
logger.info { "Consumer $consumerTag has been cancelled for reasons other than by a call to Channel#basicCancel" }
//FIXME do we need to cancel the channel?
deliveries.cancel()
})
}
Expand Down Expand Up @@ -85,8 +87,9 @@ class ConfirmConsumer internal constructor(
}

override fun close() {
logger.debug { "closing ConfirmConsumer" }
logger.debug { "Shutting down consumer" }
amqpChannel.basicCancel(consTag)
//FIXME Additional cancellation?
deliveries.cancel()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.viartemev.thewhiterabbit.consumer.flow

import com.rabbitmq.client.Channel
import com.rabbitmq.client.Delivery
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import mu.KotlinLogging

private val logger = KotlinLogging.logger {}

class ConsumerFlow(
private val amqpChannel: Channel, private val amqpQueue: String, private val prefetchSize: Int
) {

//TODO exception handling
suspend fun consumerAutoAckFlow(): Flow<Delivery> = callbackFlow {
amqpChannel.basicQos(prefetchSize, false)
val tag = amqpChannel.basicConsume(amqpQueue, true, { consumerTag, message ->
trySendBlocking(message)
}, { consumerTag ->
channel.close()
})
awaitClose {
amqpChannel.basicCancel(tag)
amqpChannel.close()
}
}

//TODO exception handling
suspend fun consumerConfirmAckFlow(): Flow<Delivery> = callbackFlow {
amqpChannel.basicQos(prefetchSize, false)
val tag = amqpChannel.basicConsume(amqpQueue, false, { consumerTag, message ->
trySendBlocking(message)
amqpChannel.basicAck(message.envelope.deliveryTag, false)
}, { consumerTag ->
channel.close()
})
awaitClose {
amqpChannel.basicCancel(tag)
amqpChannel.close()
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,21 +65,24 @@ class ConfirmConsumerTest : AbstractTestContainersTest() {
val sender = launch {
publish {
while (isActive) {
delay(100)
delay(1000)
publishWithConfirm(message)
}
}
}
consume(QUEUE_NAME, 2) {
consumeMessagesWithConfirm {
println("Consuming message: ${it.body}")
delay(5000)
counter.getAndAdd(String(it.body).toInt())
val consumer = launch {
consume(QUEUE_NAME, 2) {
consumeMessagesWithConfirm {
println("Consuming message: ${it.body}")
delay(1000)
counter.getAndAdd(String(it.body).toInt())
}
}
}
delay(50000)
delay(5000)
println("Shouting down...")
sender.cancel()
consumer.cancel()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.viartemev.thewhiterabbit.consumer.flow

import com.viartemev.thewhiterabbit.AbstractTestContainersTest
import com.viartemev.thewhiterabbit.channel.confirmChannel
import com.viartemev.thewhiterabbit.channel.publish
import com.viartemev.thewhiterabbit.queue.QueueSpecification
import com.viartemev.thewhiterabbit.queue.declareQueue
import com.viartemev.thewhiterabbit.utils.createMessage
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test

class ConsumerFlowTest : AbstractTestContainersTest() {
private val QUEUE_NAME = "test_queue"

@Test
fun testAutoAckFlow(): Unit = runBlocking {
factory.newConnection().use { connection ->
connection.confirmChannel {
declareQueue(QueueSpecification(QUEUE_NAME))
publish {
(1..10).map { createMessage(queue = QUEUE_NAME, body = "1") }
.map { m -> async { publishWithConfirm(m) } }.awaitAll()
}
ConsumerFlow(this, QUEUE_NAME, 2)
.consumerAutoAckFlow()
.take(10)
.collect { delivery -> println(String(delivery.body)) }
}
}
}

@Test
fun testConfirmAckFlow(): Unit = runBlocking {
factory.newConnection().use { connection ->
connection.confirmChannel {
declareQueue(QueueSpecification(QUEUE_NAME))
publish {
(1..10).map { createMessage(queue = QUEUE_NAME, body = "1") }
.map { m -> async { publishWithConfirm(m) } }.awaitAll()
}
ConsumerFlow(this, QUEUE_NAME, 2)
.consumerConfirmAckFlow()
.take(10)
.collect { delivery -> println(String(delivery.body)) }
}
}
}
}
15 changes: 15 additions & 0 deletions src/test/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>

<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{5} - %msg%n</pattern>
</encoder>
</appender>

<logger name="com.viartemev" level="TRACE"/>
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>

</configuration>

0 comments on commit 4ba6ab0

Please sign in to comment.