Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Artemyev Vyacheslav authored and Artemyev Vyacheslav committed Nov 7, 2023
1 parent 2ea8a77 commit 5e894b2
Showing 1 changed file with 39 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,94 +2,81 @@ package com.viartemev.thewhiterabbit.consumer

import com.rabbitmq.client.Channel
import com.rabbitmq.client.Delivery
import com.viartemev.thewhiterabbit.exception.AcknowledgeException
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore
import mu.KotlinLogging
import java.io.Closeable
import java.io.IOException
import kotlinx.coroutines.channels.Channel as KChannel

private val logger = KotlinLogging.logger {}

/**
*
*/
class ConfirmConsumer internal constructor(
private val amqpChannel: Channel, private val amqpQueue: String, private val prefetchSize: Int
private val amqpChannel: Channel, amqpQueue: String, private val prefetchSize: Int
) : Closeable {
private val deliveries = KChannel<Delivery>(prefetchSize)

private val consTag: String

init {
amqpChannel.basicQos(prefetchSize, false)
consTag = amqpChannel.basicConsume(amqpQueue, false, { consumerTag, message ->
try {
logger.debug { "Trying to send a message from the consumer to the channel" }
deliveries.trySendBlocking(message)
logger.debug { "The message was successfully sent to the channel" }
} catch (e: Exception) {
logger.error(e) { "Can't send a message. Consumer $consumerTag has been cancelled" }
}
}, { consumerTag ->
logger.info { "Consumer $consumerTag has been cancelled for reasons other than by a call to Channel#basicCancel" }
deliveries.cancel()
})
consTag = amqpChannel.basicConsume(amqpQueue, false, ::handleDelivery, ::handleCancel)
}

suspend fun consumeMessageWithConfirm(handler: suspend (Delivery) -> Unit) {
private fun handleDelivery(consumerTag: String, message: Delivery) {
try {
logger.debug { "Trying to receive a message from the channel" }
val delivery = deliveries.receive()
logger.debug { "The message was received from the channel" }
val deliveryTag = delivery.envelope.deliveryTag
handler(delivery)
try {
//TODO with context?
amqpChannel.basicAck(deliveryTag, false)
//TODO fix exception handling
} catch (e: IOException) {
val errorMessage = "Can't ack a message with deliveryTag: $deliveryTag"
logger.error { errorMessage }
throw AcknowledgeException(errorMessage)
}
//TODO exception handling
logger.debug { "Trying to send a message from the consumer to the channel" }
deliveries.trySendBlocking(message)
logger.debug { "The message was successfully sent to the channel" }
} catch (e: Exception) {
when (e) {
is ClosedReceiveChannelException -> throw CancellationException()
else -> throw e
}
logger.error(e) { "Can't send a message. Consumer $consumerTag has been cancelled" }
}
}

//TODO context + dispatcher
//TODO test cancellation & exception handling
private fun handleCancel(consumerTag: String) {
logger.info { "Consumer $consumerTag has been cancelled for reasons other than by a call to Channel#basicCancel" }
deliveries.close()
}

suspend fun consumeMessageWithConfirm(handler: suspend (Delivery) -> Unit) = coroutineScope {
logger.debug { "Trying to receive a message from the channel" }
val delivery = deliveries.receive()
logger.debug { "The message was received from the channel" }
val deliveryTag = delivery.envelope.deliveryTag
try {
handler(delivery)
amqpChannel.basicAck(deliveryTag, false)
} catch (e: Exception) {
val errorMessage = "Can't ack a message with deliveryTag: $deliveryTag"
logger.error(e) { errorMessage }
cancel(errorMessage, e)
}
}

suspend fun consumeMessagesWithConfirm(handler: suspend (Delivery) -> Unit) = coroutineScope {
val channel = KChannel<Unit>(prefetchSize)
val semaphore = Semaphore(prefetchSize)
while (isActive) {
channel.send(Unit)
semaphore.acquire()
launch {
try {
consumeMessageWithConfirm(handler)
} finally {
channel.receive()
semaphore.release()
}
}
}
}

override fun close() {
logger.debug { "Shutting down consumer" }
amqpChannel.basicCancel(consTag)
//FIXME Additional cancellation?
deliveries.cancel()
try {
logger.debug { "Cancelling consumer#$consTag" }
amqpChannel.basicCancel(consTag)
} catch (e: Exception) {
logger.error(e) { "Can't cancel consumer#$consTag" }
deliveries.close()
}
}
}

0 comments on commit 5e894b2

Please sign in to comment.