Skip to content

Commit

Permalink
improve README
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Aguilera <[email protected]>
  • Loading branch information
jorgeaguileraseqera committed Oct 27, 2022
1 parent 325dcd7 commit 5b31131
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 3 deletions.
169 changes: 167 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Events plugin for Nextflow
# Kafka plugin for Nextflow

This plugin provides an extension to implement built-in support for events systems as Kafka and manipulation in Nextflow scripts.
This plugin provides an extension to implement built-in support for Kafka systems and manipulation in Nextflow scripts.

It provides the ability to create a Nextflow channel to listening from topics as send message.

Expand All @@ -11,3 +11,168 @@ The current version provides out-of-the-box support for the following systems:
NOTE: THIS IS A PREVIEW TECHNOLOGY, FEATURES AND CONFIGURATION SETTINGS CAN CHANGE IN FUTURE RELEASES.


## Get started

Make sure to have Nextflow `22.10.0` or later. Add the following snippet to your `nextflow.config` file.

```
plugins {
id '[email protected]'
}
```

## Configuration

The plugin configuration is specified using the `kafka` scope:

| Config option | Description |
|-------------------------------|--- |
| `kafka.url` | The connection url.
| `kafka.group` | The group where the plugin will be attached.

For example:

```
kafka {
url = 'localhost:902'
group = 'group'
}
```

## Available operations

This plugin adds to the Nextflow DSL the following extensions

### fromTopic

The `fromTopic` factory method allows for performing a query against the topic specified and creating a Nextflow channel emitting
a tuple for each record in the corresponding group with the key and value. For example:

```
include { fromTopic } from 'plugin/nf-kafka'
ch = channel.fromTopic('my-topic')
```

### watchTopic

The `watchTopic` factory method allows to watch against the topic specified and creating a Nextflow channel emitting
a tuple for each record in the corresponding group. For example:

```
include { watchTopic } from 'plugin/nf-kafka'
ch = channel.watchTopic('my-topic')
.subscribe { println "new message received ${it[0]} = ${it[1]}" }
.until{ it[1] == 'done' }
```

### writeMessage

The `writeMessage` operator allows to send a message (with and optional key) to a topic

```
include { writeMessage; watchTopic } from 'plugin/nf-kafka'
process listener{
input: val(msg)
output: stdout
script:
writeMessage('my-topic', 'Hi folks')
"echo ${msg[1]}"
}
workflow{
listener( Channel.of("msg") )
}
```

You need to provide the topic and the message plus an optional key

## Develop

1. build the plugin `./gradlew copyPluginZip`

2. create a `docker-compose.yml` and run `docker-compose up -d` to have a local instance running at localhost:29092

```
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```

3. create a Nextflow project and configure it

```
plugins{
id '[email protected]'
}
kafka{
url='localhost:29092'
group='group'
}
```

4.- create a run a listener pipeline

```
include { watchTopic } from 'plugin/nf-kafka'
process listener1{
input: val(msg)
output: stdout
script:
"echo '${msg[1]}'"
}
workflow{
chn1 = channel.watchTopic("test").until{ it[1]=='done' }
listener1(chn1) | view
}
```

NXF_PLUGINS_DIR=/<the-path-to-the-project>/build/plugins/ nextflow run ./kafka-listener.nf

5.- create a run (in another shell) a producer

```
include { publishTopic } from 'plugin/nf-kafka'
workflow{
Channel.of(
"Hi folks",
"Another message",
"done"
).publishTopic("test")
}
```

NXF_PLUGINS_DIR=/<the-path-to-the-project>/build/plugins/ nextflow run ./kafka-producer.nf

If all goes well, producer will send 3 messages to the topic and listener will print them and exit

2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

rootProject.name = 'nf-events'
rootProject.name = 'nf-kafka'
include 'plugins'
include('plugins:nf-kafka')

0 comments on commit 5b31131

Please sign in to comment.