Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the plugin functional #1

Open
wants to merge 36 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
4dfa9e3
[build] ci: :construction_worker: add ci
panaxging Jul 5, 2024
ffe0160
[build] test: :white_check_mark: fix failing test
panaxging Jul 5, 2024
1a9e28d
Merge pull request #1 from panaxging/build
panaxging Jul 5, 2024
6a53a02
[release] ci: :construction_worker: upgrade java version
panaxging Jul 5, 2024
53baeca
[release] ci: :construction_worker: add release workflow
panaxging Jul 5, 2024
640e1aa
Revert "[release] ci: :construction_worker: upgrade java version"
panaxging Jul 5, 2024
8d0ab36
[release] ci: :arrow_down: downgrade java version
panaxging Jul 5, 2024
e2a1eef
Merge pull request #2 from panaxging/release
panaxging Jul 5, 2024
bfced3a
[fix_release_workflow] ci: :pushpin: pin java version to 17
panaxging Jul 5, 2024
c77f51c
Merge pull request #3 from panaxging/ci/fix_release_workflow
panaxging Jul 5, 2024
bd20b10
[fix_java_version_env] ci: :green_heart: fix java_version env variable
panaxging Jul 5, 2024
0b6ec00
Merge pull request #4 from panaxging/ci/fix_java_version_env
panaxging Jul 5, 2024
812f9a3
[update_release_system] chore: :see_no_evil: ingnore gradle.properties
panaxging Jul 5, 2024
3488a33
[update_release_system] feat: :zap: improve credentials parsing using…
panaxging Jul 5, 2024
c18eb2c
[update_release_system] ci: :construction_worker: remove build job an…
panaxging Jul 5, 2024
d6ce467
Merge pull request #5 from panaxging/ci/update_release_system
panaxging Jul 5, 2024
61ae9f0
[add_github_token] ci: :green_heart: add GITHUB_TOKEN
panaxging Jul 5, 2024
c6e2edf
Merge pull request #6 from panaxging/ci/add_github_token
panaxging Jul 5, 2024
288a0cb
[BasePlugin_wrapper] fix: :fire: remove override of start and stop me…
panaxging Jul 5, 2024
ec28edc
Merge pull request #7 from panaxging/fix/BasePlugin_wrapper
panaxging Jul 5, 2024
5dae814
[kafka] feat: :arrow_up: upgradre kafka-clients to 3.7.1
panaxging Jul 9, 2024
2fb1daa
Merge pull request #9 from panaxging/upgrade/kafka
panaxging Jul 10, 2024
218d1ed
[reading_topics] fix:
panaxging Jul 10, 2024
b5d8675
[reading_topics] fix: :bug: close the channel after consuming topic
panaxging Jul 10, 2024
c22ab44
[reading_topics] feat: :sparkles: add poll timeout
panaxging Jul 10, 2024
20e6688
[reading_topics] fix: :bug: parse pollTimeout as integer and throw ex…
panaxging Jul 10, 2024
bfae593
[reading_topics] feat: :sparkles: retrieve pollTimeout from kafkaConf
panaxging Jul 10, 2024
90fd445
[reading_topics] docs: :pencil2: fix typo
panaxging Jul 10, 2024
396738a
[reading_topics] docs: :memo: update configuration section
panaxging Jul 10, 2024
686bb5e
[reading_topics] fix: :bug: close the channel
panaxging Jul 10, 2024
e4038a4
Merge pull request #10 from panaxging/fix/reading_topics
panaxging Jul 10, 2024
f960d6a
[watchTopic] fix: :bug: fix watchTopic method implementing "until" ar…
panaxging Jul 10, 2024
9a674ff
[watchTopic] test: :white_check_mark: update and deactivate some tests
panaxging Jul 10, 2024
418b245
[watchTopic] chore: :bookmark: release 0.0.2
panaxging Jul 10, 2024
ad84886
[watchTopic] test: :white_check_mark: deactivate test
panaxging Jul 10, 2024
8752308
Merge pull request #11 from panaxging/fix/watchTopic
panaxging Jul 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
name: build

env:
java_version: 17

on:
push:
branches:
- '*'
tags-ignore:
- '*'
pull_request:
branches:
- '*'
jobs:
build:
name: Build
if: "!contains(github.event.head_commit.message, '[ci skip]')"
runs-on: ubuntu-latest
timeout-minutes: 10
strategy:
fail-fast: false

steps:
- name: Environment
run: env | sort

- name: Checkout
uses: actions/checkout@v1
with:
fetch-depth: 1
submodules: true

- name: Setup Java ${{ env.java_version }}
uses: actions/setup-java@v1
with:
java-version: ${{ env.java_version }}
architecture: x64
distribution: 'temurin'

- name: Compile
run: ./gradlew assemble

- name: Tests
run: ./gradlew check
env:
GRADLE_OPTS: '-Dorg.gradle.daemon=false'
29 changes: 29 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
name: release

env:
java_version: 17

on:
push:
tags:
- "[0-9]+.[0-9]+.[0-9]+"

permissions:
contents: write

jobs:
publish-gpr:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up JDK ${{ env.java_version }} for x64
uses: actions/setup-java@v3
with:
java-version: ${{ env.java_version }}
distribution: "temurin"
architecture: x64

- name: Build and upload artifact and release
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: ./gradlew :plugins:nf-kafka:upload
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
.idea
.nextflow

# Ignore dev secrets
gradle.properties

# Ignore Gradle build output directory
build
work
Expand Down
14 changes: 8 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,27 @@ Make sure to have Nextflow `22.10.0` or later. Add the following snippet to your

```
plugins {
id 'nf-kafa@0.0.1'
id 'nf-kafka@0.0.2'
}
```

## Configuration

The plugin configuration is specified using the `kafka` scope:

| Config option | Description |
|-------------------------------|--- |
| `kafka.url` | The connection url.
| `kafka.group` | The group where the plugin will be attached.
| Config option | Description | Mandatory |
| ----------------------------- | ------------------------------------------------------- | --------- |
| `kafka.url` | The connection url. | yes |
| `kafka.group` | The group where the plugin will be attached. | yes |
| `kafka.pollTimeout` | The time that consumer will wait to consume from topic. | no |

For example:

```
kafka {
url = 'localhost:902'
group = 'group'
pollTimeout = 2500 // ms
}
```

Expand Down Expand Up @@ -124,7 +126,7 @@ services:

```
plugins{
id '[email protected].1'
id '[email protected].2'
}

kafka{
Expand Down
28 changes: 14 additions & 14 deletions plugins/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ plugins {
id "idea"
}

ext.github_organization = 'nextflow-io'
ext.github_username = project.findProperty('github_username') ?: 'pditommaso'
ext.github_owner = project.findProperty('github_owner') ?: System.getenv('GITHUB_REPOSITORY_OWNER')
ext.github_user = project.findProperty('github_user') ?: System.getenv('GITHUB_TRIGGERING_ACTOR')
ext.github_repository = project.findProperty('github_repository') ?: System.getenv('GITHUB_REPOSITORY')
ext.github_access_token = project.findProperty('github_access_token') ?: System.getenv('GITHUB_TOKEN')
ext.github_commit_email = project.findProperty('github_commit_email') ?: '[email protected]'

jar.enabled = false

Expand Down Expand Up @@ -86,7 +86,7 @@ subprojects {
{
"version": "${project.version}",
"date": "${timestamp}",
"url": "https://github.com/${github_organization}/${project.name}/releases/download/${project.version}/${project.name}-${project.version}.zip",
"url": "https://github.com/${github_repository}/releases/download/${project.version}/${project.name}-${project.version}.zip",
"requires": "${metaFromManifest('Plugin-Requires',file('src/resources/META-INF/MANIFEST.MF'))}",
"sha512sum": "${computeSha512(zip)}"
}
Expand Down Expand Up @@ -128,8 +128,8 @@ subprojects {
"$buildDir/libs/${project.name}-${project.version}-meta.json" ]}
release = providers.provider { project.version }
repo = providers.provider { project.name }
owner = github_organization
userName = github_username
owner = github_owner
userName = github_user
authToken = github_access_token
skipExisting = true
}
Expand All @@ -143,11 +143,11 @@ classes.dependsOn subprojects.copyPluginLibs
/*
* Merge and publish the plugins index file
*/
task publishIndex( type: io.nextflow.gradle.tasks.GithubRepositoryPublisher ) {
indexUrl = 'https://github.com/nextflow-io/plugins/main/plugins.json'
repos = ['nf-sqldb']
owner = github_organization
githubUser = github_username
githubEmail = github_commit_email
githubToken = github_access_token
}
// task publishIndex( type: io.nextflow.gradle.tasks.GithubRepositoryPublisher ) {
// indexUrl = 'https://github.com/nextflow-io/plugins/main/plugins.json'
// repos =
// owner =
// githubUser =
// githubEmail =
// githubToken =
// }
2 changes: 1 addition & 1 deletion plugins/nf-kafka/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ dependencies {
compileOnly 'org.slf4j:slf4j-api:1.7.10'
compileOnly 'org.pf4j:pf4j:3.4.1'

implementation 'org.apache.kafka:kafka-clients:2.6.3'
implementation 'org.apache.kafka:kafka-clients:3.7.1'

testImplementation "io.nextflow:nextflow:$nextflowVersion"
testImplementation "org.codehaus.groovy:groovy:3.0.10"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ class ChannelKafkaExtension extends PluginExtensionPoint {
}

@Factory
DataflowWriteChannel fromTopic(String topic, Duration duration=Duration.ofSeconds(1)) {
topicToChannel(topic, duration, false)
DataflowWriteChannel fromTopic(String topic) {
topicToChannel(topic, false)
}

@Factory
DataflowWriteChannel watchTopic(String topic, Duration duration=Duration.ofSeconds(1)) {
topicToChannel(topic, duration, true)
DataflowWriteChannel watchTopic(String topic, Map until) {
topicToChannel(topic, true, until)
}

@Operator
Expand Down Expand Up @@ -91,7 +91,7 @@ class ChannelKafkaExtension extends PluginExtensionPoint {
.publishMessage([key, message])
}

private DataflowWriteChannel topicToChannel(String topic, Duration duration, boolean listening){
private DataflowWriteChannel topicToChannel(String topic, boolean listening, Map until=null){
final channel = CH.create()

final handler = new TopicHandler()
Expand All @@ -100,8 +100,9 @@ class ChannelKafkaExtension extends PluginExtensionPoint {
.withGroup(config.group)
.withTopic(topic)
.withListening(listening)
.withUntil(until)
.withTarget(channel)
.withDuration(duration)
.withDuration(Duration.ofMillis(config.pollTimeout))
if(NF.dsl2) {
session.addIgniter {-> handler.perform() }
}
Expand Down
7 changes: 0 additions & 7 deletions plugins/nf-kafka/src/main/nextflow/events/KafkaPlugin.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,8 @@ class KafkaPlugin extends BasePlugin {
super(wrapper)
}

@Override
void start() {
super.start()
}

@Override
void stop() {
ThreadFactory.instance.shutdownExecutors()
super.stop()
}

}
37 changes: 33 additions & 4 deletions plugins/nf-kafka/src/main/nextflow/events/kafa/KafkaConfig.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ import groovy.transform.ToString
@CompileStatic
class KafkaConfig {

private String url
private String group
private String url // mandatory
private String group // mandatory
private Integer pollTimeout // ms

KafkaConfig(Map map){
def config = map ?: Collections.emptyMap()
url = config.url
group = config.group

url = config.url ?: KafkaConfig.missingConf("url")
group = config.group ?: KafkaConfig.missingConf("group")
pollTimeout = KafkaConfig.parsePollTimeout(config.pollTimeout)
}

String getUrl(){
Expand All @@ -29,5 +32,31 @@ class KafkaConfig {
String getGroup(){
group
}

Integer getpollTimeout(){
pollTimeout
}

static private Integer parsePollTimeout(confPollTimeout) {
Integer pollTimeout = 1000 // default value
if (confPollTimeout) {
pollTimeout = confPollTimeout
.toString()
.toInteger()
}
return pollTimeout
}

static private missingConf(String conf) {
throw new KafkaConfigException(conf);
}

}

class KafkaConfigException extends Exception {
public KafkaConfigException(String conf) {
super(
"the configuration of the $conf is mandatory, please configure it in the scope of the kafka plugin conf"
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package nextflow.events.kafa
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import groovyx.gpars.dataflow.DataflowWriteChannel
import nextflow.Channel
import nextflow.Session
import nextflow.events.KafkaPlugin
import nextflow.util.ThreadPoolBuilder
Expand Down Expand Up @@ -31,6 +32,7 @@ class TopicHandler {
private String topic
private Duration duration
private boolean listening
private Map until
private Session session

KafkaConsumer<String, String> consumer
Expand Down Expand Up @@ -60,6 +62,11 @@ class TopicHandler {
this
}

TopicHandler withUntil(Map until){
this.until = until
this
}

TopicHandler withTarget(DataflowWriteChannel channel) {
this.target = channel
return this
Expand All @@ -77,6 +84,7 @@ class TopicHandler {
}else{
consume()
closeConsumer()
this.target.bind(Channel.STOP)
}
return this
}
Expand Down Expand Up @@ -105,8 +113,15 @@ class TopicHandler {
try {
final records = consumer.poll(duration)
records.each {
target << [ it.key(), it.value()]
Map record = [ key:it.key(), value:it.value() ]
target << record

if (record == this.until) {
Thread.currentThread().interrupt()
target.bind(Channel.STOP)
}
}

}catch(Exception e){
log.error "Exception reading kafka topic $topic",e
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/nf-kafka/src/resources/META-INF/MANIFEST.MF
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ Manifest-Version: 1.0
Plugin-Class: nextflow.events.KafkaPlugin
Plugin-Id: nf-kafka
Plugin-Provider: Seqera Labs
Plugin-Version: 0.0.1
Plugin-Version: 0.0.2
Plugin-Requires: >=22.10.0
Loading