diff --git a/README.md b/README.md index 4f8a34c..a2b6cdf 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ Make sure to have Nextflow `22.10.0` or later. Add the following snippet to your ``` plugins { - id 'nf-kafa@0.0.1' + id 'nf-kafka@0.0.1' } ``` @@ -25,10 +25,11 @@ plugins { The plugin configuration is specified using the `kafka` scope: -| Config option | Description | -|-------------------------------|--- | -| `kafka.url` | The connection url. -| `kafka.group` | The group where the plugin will be attached. +| Config option | Description | Mandatory | +| ----------------------------- | ------------------------------------------------------- | --------- | +| `kafka.url` | The connection url. | yes | +| `kafka.group` | The group where the plugin will be attached. | yes | +| `kafka.pollTimeout` | The time that consumer will wait to consume from topic. | no | For example: @@ -36,6 +37,7 @@ For example: kafka { url = 'localhost:902' group = 'group' + pollTimeout = 2500 // ms } ``` diff --git a/plugins/nf-kafka/src/main/nextflow/events/ChannelKafkaExtension.groovy b/plugins/nf-kafka/src/main/nextflow/events/ChannelKafkaExtension.groovy index b4613db..469f7b3 100644 --- a/plugins/nf-kafka/src/main/nextflow/events/ChannelKafkaExtension.groovy +++ b/plugins/nf-kafka/src/main/nextflow/events/ChannelKafkaExtension.groovy @@ -55,13 +55,13 @@ class ChannelKafkaExtension extends PluginExtensionPoint { } @Factory - DataflowWriteChannel fromTopic(String topic, Duration duration=Duration.ofSeconds(1)) { - topicToChannel(topic, duration, false) + DataflowWriteChannel fromTopic(String topic) { + topicToChannel(topic, false) } @Factory - DataflowWriteChannel watchTopic(String topic, Duration duration=Duration.ofSeconds(1)) { - topicToChannel(topic, duration, true) + DataflowWriteChannel watchTopic(String topic) { + topicToChannel(topic, true) } @Operator @@ -91,7 +91,7 @@ class ChannelKafkaExtension extends PluginExtensionPoint { .publishMessage([key, message]) } - private DataflowWriteChannel topicToChannel(String topic, Duration duration, boolean listening){ + private DataflowWriteChannel topicToChannel(String topic, boolean listening){ final channel = CH.create() final handler = new TopicHandler() @@ -101,7 +101,7 @@ class ChannelKafkaExtension extends PluginExtensionPoint { .withTopic(topic) .withListening(listening) .withTarget(channel) - .withDuration(duration) + .withDuration(Duration.ofMillis(config.pollTimeout)) if(NF.dsl2) { session.addIgniter {-> handler.perform() } } diff --git a/plugins/nf-kafka/src/main/nextflow/events/KafkaPlugin.groovy b/plugins/nf-kafka/src/main/nextflow/events/KafkaPlugin.groovy index 16ead15..b52274e 100644 --- a/plugins/nf-kafka/src/main/nextflow/events/KafkaPlugin.groovy +++ b/plugins/nf-kafka/src/main/nextflow/events/KafkaPlugin.groovy @@ -33,4 +33,9 @@ class KafkaPlugin extends BasePlugin { KafkaPlugin(PluginWrapper wrapper) { super(wrapper) } + + @Override + void stop() { + ThreadFactory.instance.shutdownExecutors() + } } diff --git a/plugins/nf-kafka/src/main/nextflow/events/kafa/KafkaConfig.groovy b/plugins/nf-kafka/src/main/nextflow/events/kafa/KafkaConfig.groovy index f9662ac..3712573 100644 --- a/plugins/nf-kafka/src/main/nextflow/events/kafa/KafkaConfig.groovy +++ b/plugins/nf-kafka/src/main/nextflow/events/kafa/KafkaConfig.groovy @@ -13,13 +13,16 @@ import groovy.transform.ToString @CompileStatic class KafkaConfig { - private String url - private String group + private String url // mandatory + private String group // mandatory + private Integer pollTimeout // ms KafkaConfig(Map map){ def config = map ?: Collections.emptyMap() - url = config.url - group = config.group + + url = config.url ?: KafkaConfig.missingConf("url") + group = config.group ?: KafkaConfig.missingConf("group") + pollTimeout = KafkaConfig.parsePollTimeout(config.pollTimeout) } String getUrl(){ @@ -29,5 +32,31 @@ class KafkaConfig { String getGroup(){ group } + + Integer getpollTimeout(){ + pollTimeout + } + + static private Integer parsePollTimeout(confPollTimeout) { + Integer pollTimeout = 1000 // default value + if (confPollTimeout) { + pollTimeout = confPollTimeout + .toString() + .toInteger() + } + return pollTimeout + } + + static private missingConf(String conf) { + throw new KafkaConfigException(conf); + } } + +class KafkaConfigException extends Exception { + public KafkaConfigException(String conf) { + super( + "the configuration of the $conf is mandatory, please configure it in the scope of the kafka plugin conf" + ); + } +} \ No newline at end of file diff --git a/plugins/nf-kafka/src/main/nextflow/events/kafa/TopicHandler.groovy b/plugins/nf-kafka/src/main/nextflow/events/kafa/TopicHandler.groovy index cca105a..f2b0f20 100644 --- a/plugins/nf-kafka/src/main/nextflow/events/kafa/TopicHandler.groovy +++ b/plugins/nf-kafka/src/main/nextflow/events/kafa/TopicHandler.groovy @@ -3,6 +3,7 @@ package nextflow.events.kafa import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import groovyx.gpars.dataflow.DataflowWriteChannel +import nextflow.Channel import nextflow.Session import nextflow.events.KafkaPlugin import nextflow.util.ThreadPoolBuilder @@ -77,6 +78,7 @@ class TopicHandler { }else{ consume() closeConsumer() + this.target.bind(Channel.STOP) } return this }