From 9fd184490ed9325b619c806ce69e2a10c21dd4cb Mon Sep 17 00:00:00 2001 From: Jorge Aguilera Date: Thu, 27 Oct 2022 14:51:04 +0200 Subject: [PATCH] close consumer once completed Signed-off-by: Jorge Aguilera --- .../main/nextflow/events/kafa/TopicHandler.groovy | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/plugins/nf-kafka/src/main/nextflow/events/kafa/TopicHandler.groovy b/plugins/nf-kafka/src/main/nextflow/events/kafa/TopicHandler.groovy index 05979db..cca105a 100644 --- a/plugins/nf-kafka/src/main/nextflow/events/kafa/TopicHandler.groovy +++ b/plugins/nf-kafka/src/main/nextflow/events/kafa/TopicHandler.groovy @@ -72,7 +72,12 @@ class TopicHandler { TopicHandler perform() { createConsumer() - listening ? runAsync() : consume() + if( listening ) { + runAsync() + }else{ + consume() + closeConsumer() + } return this } @@ -92,6 +97,10 @@ class TopicHandler { consumer } + void closeConsumer(){ + consumer.close() + } + void consume(){ try { final records = consumer.poll(duration) @@ -114,6 +123,8 @@ class TopicHandler { log.trace "Closing $topic kafka thread" }catch(Exception e){ log.error "Exception reading kafka topic $topic",e + }finally{ + closeConsumer() } }) }