Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix reading from topics #10

Merged
merged 8 commits into from
Jul 10, 2024
12 changes: 7 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,27 @@ Make sure to have Nextflow `22.10.0` or later. Add the following snippet to your

```
plugins {
id 'nf-kafa@0.0.1'
id 'nf-kafka@0.0.1'
}
```

## Configuration

The plugin configuration is specified using the `kafka` scope:

| Config option | Description |
|-------------------------------|--- |
| `kafka.url` | The connection url.
| `kafka.group` | The group where the plugin will be attached.
| Config option | Description | Mandatory |
| ----------------------------- | ------------------------------------------------------- | --------- |
| `kafka.url` | The connection url. | yes |
| `kafka.group` | The group where the plugin will be attached. | yes |
| `kafka.pollTimeout` | The time that consumer will wait to consume from topic. | no |

For example:

```
kafka {
url = 'localhost:902'
group = 'group'
pollTimeout = 2500 // ms
}
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ class ChannelKafkaExtension extends PluginExtensionPoint {
}

@Factory
DataflowWriteChannel fromTopic(String topic, Duration duration=Duration.ofSeconds(1)) {
topicToChannel(topic, duration, false)
DataflowWriteChannel fromTopic(String topic) {
topicToChannel(topic, false)
}

@Factory
DataflowWriteChannel watchTopic(String topic, Duration duration=Duration.ofSeconds(1)) {
topicToChannel(topic, duration, true)
DataflowWriteChannel watchTopic(String topic) {
topicToChannel(topic, true)
}

@Operator
Expand Down Expand Up @@ -91,7 +91,7 @@ class ChannelKafkaExtension extends PluginExtensionPoint {
.publishMessage([key, message])
}

private DataflowWriteChannel topicToChannel(String topic, Duration duration, boolean listening){
private DataflowWriteChannel topicToChannel(String topic, boolean listening){
final channel = CH.create()

final handler = new TopicHandler()
Expand All @@ -101,7 +101,7 @@ class ChannelKafkaExtension extends PluginExtensionPoint {
.withTopic(topic)
.withListening(listening)
.withTarget(channel)
.withDuration(duration)
.withDuration(Duration.ofMillis(config.pollTimeout))
if(NF.dsl2) {
session.addIgniter {-> handler.perform() }
}
Expand Down
5 changes: 5 additions & 0 deletions plugins/nf-kafka/src/main/nextflow/events/KafkaPlugin.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,9 @@ class KafkaPlugin extends BasePlugin {
KafkaPlugin(PluginWrapper wrapper) {
super(wrapper)
}

@Override
void stop() {
ThreadFactory.instance.shutdownExecutors()
}
}
37 changes: 33 additions & 4 deletions plugins/nf-kafka/src/main/nextflow/events/kafa/KafkaConfig.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ import groovy.transform.ToString
@CompileStatic
class KafkaConfig {

private String url
private String group
private String url // mandatory
private String group // mandatory
private Integer pollTimeout // ms

KafkaConfig(Map map){
def config = map ?: Collections.emptyMap()
url = config.url
group = config.group

url = config.url ?: KafkaConfig.missingConf("url")
group = config.group ?: KafkaConfig.missingConf("group")
pollTimeout = KafkaConfig.parsePollTimeout(config.pollTimeout)
}

String getUrl(){
Expand All @@ -29,5 +32,31 @@ class KafkaConfig {
String getGroup(){
group
}

Integer getpollTimeout(){
pollTimeout
}

static private Integer parsePollTimeout(confPollTimeout) {
Integer pollTimeout = 1000 // default value
if (confPollTimeout) {
pollTimeout = confPollTimeout
.toString()
.toInteger()
}
return pollTimeout
}

static private missingConf(String conf) {
throw new KafkaConfigException(conf);
}

}

class KafkaConfigException extends Exception {
public KafkaConfigException(String conf) {
super(
"the configuration of the $conf is mandatory, please configure it in the scope of the kafka plugin conf"
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package nextflow.events.kafa
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import groovyx.gpars.dataflow.DataflowWriteChannel
import nextflow.Channel
import nextflow.Session
import nextflow.events.KafkaPlugin
import nextflow.util.ThreadPoolBuilder
Expand Down Expand Up @@ -77,6 +78,7 @@ class TopicHandler {
}else{
consume()
closeConsumer()
this.target.bind(Channel.STOP)
}
return this
}
Expand Down
Loading