From 5e894b247bdcf22e94a5f37d94b48ebac6bf3f8d Mon Sep 17 00:00:00 2001 From: Artemyev Vyacheslav Date: Tue, 7 Nov 2023 19:16:07 +0500 Subject: [PATCH] Refactoring --- .../consumer/ConfirmConsumer.kt | 91 ++++++++----------- 1 file changed, 39 insertions(+), 52 deletions(-) diff --git a/src/main/kotlin/com/viartemev/thewhiterabbit/consumer/ConfirmConsumer.kt b/src/main/kotlin/com/viartemev/thewhiterabbit/consumer/ConfirmConsumer.kt index dc40b3d..249b18c 100644 --- a/src/main/kotlin/com/viartemev/thewhiterabbit/consumer/ConfirmConsumer.kt +++ b/src/main/kotlin/com/viartemev/thewhiterabbit/consumer/ConfirmConsumer.kt @@ -2,28 +2,20 @@ package com.viartemev.thewhiterabbit.consumer import com.rabbitmq.client.Channel import com.rabbitmq.client.Delivery -import com.viartemev.thewhiterabbit.exception.AcknowledgeException -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.channels.ClosedReceiveChannelException -import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.trySendBlocking import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.flow.* -import kotlinx.coroutines.flow.flatMapMerge import kotlinx.coroutines.isActive import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Semaphore import mu.KotlinLogging import java.io.Closeable -import java.io.IOException import kotlinx.coroutines.channels.Channel as KChannel private val logger = KotlinLogging.logger {} -/** - * - */ class ConfirmConsumer internal constructor( - private val amqpChannel: Channel, private val amqpQueue: String, private val prefetchSize: Int + private val amqpChannel: Channel, amqpQueue: String, private val prefetchSize: Int ) : Closeable { private val deliveries = KChannel(prefetchSize) @@ -31,65 +23,60 @@ class ConfirmConsumer internal constructor( init { amqpChannel.basicQos(prefetchSize, false) - consTag = amqpChannel.basicConsume(amqpQueue, false, { consumerTag, message -> - try { - logger.debug { "Trying to send a message from the consumer to the channel" } - deliveries.trySendBlocking(message) - logger.debug { "The message was successfully sent to the channel" } - } catch (e: Exception) { - logger.error(e) { "Can't send a message. Consumer $consumerTag has been cancelled" } - } - }, { consumerTag -> - logger.info { "Consumer $consumerTag has been cancelled for reasons other than by a call to Channel#basicCancel" } - deliveries.cancel() - }) + consTag = amqpChannel.basicConsume(amqpQueue, false, ::handleDelivery, ::handleCancel) } - suspend fun consumeMessageWithConfirm(handler: suspend (Delivery) -> Unit) { + private fun handleDelivery(consumerTag: String, message: Delivery) { try { - logger.debug { "Trying to receive a message from the channel" } - val delivery = deliveries.receive() - logger.debug { "The message was received from the channel" } - val deliveryTag = delivery.envelope.deliveryTag - handler(delivery) - try { - //TODO with context? - amqpChannel.basicAck(deliveryTag, false) - //TODO fix exception handling - } catch (e: IOException) { - val errorMessage = "Can't ack a message with deliveryTag: $deliveryTag" - logger.error { errorMessage } - throw AcknowledgeException(errorMessage) - } - //TODO exception handling + logger.debug { "Trying to send a message from the consumer to the channel" } + deliveries.trySendBlocking(message) + logger.debug { "The message was successfully sent to the channel" } } catch (e: Exception) { - when (e) { - is ClosedReceiveChannelException -> throw CancellationException() - else -> throw e - } + logger.error(e) { "Can't send a message. Consumer $consumerTag has been cancelled" } } } - //TODO context + dispatcher - //TODO test cancellation & exception handling + private fun handleCancel(consumerTag: String) { + logger.info { "Consumer $consumerTag has been cancelled for reasons other than by a call to Channel#basicCancel" } + deliveries.close() + } + + suspend fun consumeMessageWithConfirm(handler: suspend (Delivery) -> Unit) = coroutineScope { + logger.debug { "Trying to receive a message from the channel" } + val delivery = deliveries.receive() + logger.debug { "The message was received from the channel" } + val deliveryTag = delivery.envelope.deliveryTag + try { + handler(delivery) + amqpChannel.basicAck(deliveryTag, false) + } catch (e: Exception) { + val errorMessage = "Can't ack a message with deliveryTag: $deliveryTag" + logger.error(e) { errorMessage } + cancel(errorMessage, e) + } + } + suspend fun consumeMessagesWithConfirm(handler: suspend (Delivery) -> Unit) = coroutineScope { - val channel = KChannel(prefetchSize) + val semaphore = Semaphore(prefetchSize) while (isActive) { - channel.send(Unit) + semaphore.acquire() launch { try { consumeMessageWithConfirm(handler) } finally { - channel.receive() + semaphore.release() } } } } override fun close() { - logger.debug { "Shutting down consumer" } - amqpChannel.basicCancel(consTag) - //FIXME Additional cancellation? - deliveries.cancel() + try { + logger.debug { "Cancelling consumer#$consTag" } + amqpChannel.basicCancel(consTag) + } catch (e: Exception) { + logger.error(e) { "Can't cancel consumer#$consTag" } + deliveries.close() + } } }