diff --git a/src/main/kotlin/no/nav/fo/veilarbvarsel/config/kafka/KafkaConsumerWrapper.kt b/src/main/kotlin/no/nav/fo/veilarbvarsel/config/kafka/KafkaConsumerWrapper.kt index 869ec0f..1993cc1 100644 --- a/src/main/kotlin/no/nav/fo/veilarbvarsel/config/kafka/KafkaConsumerWrapper.kt +++ b/src/main/kotlin/no/nav/fo/veilarbvarsel/config/kafka/KafkaConsumerWrapper.kt @@ -1,9 +1,7 @@ package no.nav.fo.veilarbvarsel.config.kafka -import no.nav.common.kafka.consumer.KafkaConsumerClient import no.nav.common.kafka.consumer.TopicConsumer import no.nav.common.kafka.consumer.util.ConsumerUtils.jsonConsumer -import no.nav.common.kafka.consumer.util.KafkaConsumerClientBuilder import no.nav.common.kafka.util.KafkaPropertiesPreset.onPremDefaultConsumerProperties import no.nav.common.utils.NaisUtils.getCredentials import no.nav.fo.veilarbvarsel.config.KafkaEnvironment @@ -27,28 +25,19 @@ abstract class KafkaConsumerWrapper( private val props = Properties() - val consumerClient: KafkaConsumerClient + //val consumerClient: KafkaConsumerClient private var shutdown = false private var running = false init { - val credentials = getCredentials("service_user") - - consumerClient = KafkaConsumerClientBuilder.builder() - .withProps(onPremDefaultConsumerProperties(CONSUMER_GROUP_ID, env.bootstrapServers, credentials)) - .withConsumers(topicConsumers()) - .build() + //val credentials = getCredentials("service_user") - -// props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = env.bootstrapServers -// props["group.id"] = systemUser -// props["key.deserializer"] = StringDeserializer::class.java -// props["value.deserializer"] = KafkaEventDeserializer::class.java -// props["max.poll.records"] = 1 -// props["max.partition.fetch.bytes"] = 1048576 / 2 -// props["auto.offset.reset"] = "earliest" +// consumerClient = KafkaConsumerClientBuilder.builder() +// .withProps(onPremDefaultConsumerProperties(CONSUMER_GROUP_ID, env.bootstrapServers, credentials)) +// .withConsumers(topicConsumers()) +// .build() } abstract fun handle(data: V) @@ -66,46 +55,52 @@ abstract class KafkaConsumerWrapper( override fun run() { logger.info("Starting Kafka Consumer on topics $topics") - consumerClient.start() + //consumerClient.start() running = true -// val credentials = getCredentials("service_user") -// val consumer = KafkaConsumer(onPremDefaultConsumerProperties(CONSUMER_GROUP_ID, env.bootstrapServers, credentials)).apply { -// subscribe(listOf(topics)) -// } -// -// consumer.use { -// try { -// while (!shutdown) { -// val records = consumer.poll(Duration.ofMillis(5000)) -// -// logger.info("Getting records from $topics. size: ${records.count()}") -// -// records.iterator().forEach { -// handle(it.value()) -// } -// } -// } catch (e: Exception) { -// logger.error("Got exception", e) -// } -// -// logger.info("Outside while loop?") -// } -// -// logger.info("End of run. $shutdown") -// consumer.close() + val credentials = getCredentials("service_user") + val consumer = KafkaConsumer( + onPremDefaultConsumerProperties( + CONSUMER_GROUP_ID, + env.bootstrapServers, + credentials + ) + ).apply { + subscribe(listOf(topics)) + } + + consumer.use { + try { + while (!shutdown) { + val records = consumer.poll(Duration.ofMillis(5000)) + + logger.info("Getting records from $topics. size: ${records.count()}") + + records.iterator().forEach { + handle(it.value()) + } + } + } catch (e: Exception) { + logger.error("Got exception", e) + } + + logger.info("Outside while loop?") + } + + logger.info("End of run. $shutdown") + consumer.close() running = false } override fun close() { logger.info("Closing Kafka Consumer on topics $topics...") - consumerClient.stop() + //consumerClient.stop() -// shutdown = true -// while (running) { -// Thread.sleep(100) -// } + shutdown = true + while (running) { + Thread.sleep(100) + } logger.info("Kafka Consumer on topics $topics closed!") } diff --git a/src/main/kotlin/no/nav/fo/veilarbvarsel/main.kt b/src/main/kotlin/no/nav/fo/veilarbvarsel/main.kt index f6e82b8..95f8a9d 100644 --- a/src/main/kotlin/no/nav/fo/veilarbvarsel/main.kt +++ b/src/main/kotlin/no/nav/fo/veilarbvarsel/main.kt @@ -32,9 +32,9 @@ fun Application.mainModule(appContext: ApplicationContext = ApplicationContext() } } -// install(BackgroundJob.BackgroundJobFeature("Events Consumer")) { -// job = appContext.eventConsumer -// } + install(BackgroundJob.BackgroundJobFeature("Events Consumer")) { + job = appContext.eventConsumer + } - appContext.eventConsumer.run() + //appContext.eventConsumer.run() }