diff --git a/plugins/nf-kafka/src/main/nextflow/events/kafa/TopicHandler.groovy b/plugins/nf-kafka/src/main/nextflow/events/kafa/TopicHandler.groovy index 05979db..cca105a 100644 --- a/plugins/nf-kafka/src/main/nextflow/events/kafa/TopicHandler.groovy +++ b/plugins/nf-kafka/src/main/nextflow/events/kafa/TopicHandler.groovy @@ -72,7 +72,12 @@ class TopicHandler { TopicHandler perform() { createConsumer() - listening ? runAsync() : consume() + if( listening ) { + runAsync() + }else{ + consume() + closeConsumer() + } return this } @@ -92,6 +97,10 @@ class TopicHandler { consumer } + void closeConsumer(){ + consumer.close() + } + void consume(){ try { final records = consumer.poll(duration) @@ -114,6 +123,8 @@ class TopicHandler { log.trace "Closing $topic kafka thread" }catch(Exception e){ log.error "Exception reading kafka topic $topic",e + }finally{ + closeConsumer() } }) }