diff --git a/.github/workflows/seqera_docs_changelog.yml b/.github/workflows/seqera_docs_changelog.yml new file mode 100644 index 000000000..44236e6e7 --- /dev/null +++ b/.github/workflows/seqera_docs_changelog.yml @@ -0,0 +1,61 @@ +name: Push changelog to Seqera Docs +on: + release: + types: [published] + workflow_dispatch: + inputs: + release_name: + description: 'Release version (e.g. 1.0.0)' + required: true + release_body: + description: 'Release changelog content' + required: true + +jobs: + update-docs: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Clone seqeralabs/docs + run: | + git clone https://github.com/seqeralabs/docs.git seqeralabs-docs + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + - name: Create changelog file + run: | + mkdir -p seqeralabs-docs/changelog/wave + cat << EOF > seqeralabs-docs/changelog/wave/${{ github.event.release.name || inputs.release_name }}.mdx + --- + title: Wave ${{ github.event.release.name || inputs.release_name }} + date: $(date +%Y-%m-%d) + tags: [wave] + --- + + ${{ github.event.release.body || inputs.release_body }} + EOF + + - uses: actions/create-github-app-token@v1 + id: generate-token + with: + app-id: ${{ secrets.DOCS_BOT_APP_ID }} + private-key: ${{ secrets.DOCS_BOT_APP_PRIVATE_KEY }} + owner: seqeralabs + repositories: docs + + - name: Create Pull Request + uses: peter-evans/create-pull-request@v7 + with: + token: ${{ steps.generate-token.outputs.token }} + branch-token: ${{ steps.generate-token.outputs.token }} + path: seqeralabs-docs + commit-message: "Changelog: Wave ${{ github.event.release.name }}" + title: "Changelog: Wave ${{ github.event.release.name }}" + body: | + This PR adds the changelog for Wave ${{ github.event.release.name }} to the Seqera documentation. + + This is an automated PR created from the Wave repository. + branch: changelog-wave-${{ github.event.release.name }} + base: master + delete-branch: true diff --git a/.gitignore b/.gitignore index 41b2ff645..be360a502 100644 --- a/.gitignore +++ b/.gitignore @@ -37,3 +37,6 @@ deployment-url.txt tsp-output/ node_modules/ package-lock.json + +# Seqera Docs clone +seqeralabs-docs \ No newline at end of file diff --git a/VERSION b/VERSION index ace44233b..e34208c93 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.15.1 +1.15.4 diff --git a/build.gradle b/build.gradle index 56c9eb999..b5b734bb5 100644 --- a/build.gradle +++ b/build.gradle @@ -35,9 +35,10 @@ dependencies { compileOnly 'io.micronaut:micronaut-inject-groovy' compileOnly 'io.micronaut:micronaut-http-validation' implementation 'jakarta.persistence:jakarta.persistence-api:3.0.0' - api 'io.seqera:lib-mail:1.2.0' + api 'io.seqera:lib-mail:1.2.1' api 'io.seqera:wave-api:0.14.0' api 'io.seqera:wave-utils:0.15.0' + implementation 'io.seqera:lib-crypto:1.0.0' implementation 'io.micronaut:micronaut-http-client' implementation 'io.micronaut:micronaut-jackson-databind' implementation 'io.micronaut.groovy:micronaut-runtime-groovy' @@ -53,12 +54,11 @@ dependencies { implementation 'dev.failsafe:failsafe:3.1.0' implementation 'io.micronaut.reactor:micronaut-reactor' implementation 'io.micronaut.reactor:micronaut-reactor-http-client' - implementation('io.seqera:tower-crypto:22.4.0-watson') { transitive = false } // to be replaced with 22.4.0 once released implementation 'org.apache.commons:commons-compress:1.27.1' implementation 'org.apache.commons:commons-lang3:3.17.0' - implementation 'io.kubernetes:client-java:19.0.0' - implementation 'io.kubernetes:client-java-api-fluent:18.0.1' - implementation 'com.google.code.gson:gson:2.9.0' + implementation 'io.kubernetes:client-java:21.0.1' + implementation 'io.kubernetes:client-java-api-fluent:21.0.1' + implementation 'com.google.code.gson:gson:2.10.1' implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' implementation 'com.squareup.moshi:moshi:1.15.1' @@ -104,6 +104,7 @@ dependencies { runtimeOnly 'org.bouncycastle:bcpkix-jdk18on:1.78' runtimeOnly 'org.bitbucket.b_c:jose4j:0.9.4' runtimeOnly 'io.netty:netty-bom:4.1.115.Final' + runtimeOnly 'com.google.protobuf:protobuf-java:4.27.5' } application { diff --git a/buildSrc/src/main/groovy/io.seqera.wave.java-library-conventions.gradle b/buildSrc/src/main/groovy/io.seqera.wave.java-library-conventions.gradle index 835199bd4..5b7eb0d1e 100644 --- a/buildSrc/src/main/groovy/io.seqera.wave.java-library-conventions.gradle +++ b/buildSrc/src/main/groovy/io.seqera.wave.java-library-conventions.gradle @@ -41,6 +41,7 @@ java { dependencies { implementation 'org.slf4j:slf4j-api:2.0.16' + implementation 'org.slf4j:slf4j-jdk-platform-logging:2.0.16' testImplementation 'ch.qos.logback:logback-core:1.5.12' testImplementation 'ch.qos.logback:logback-classic:1.5.12' diff --git a/changelog.txt b/changelog.txt index 3aa45f8dd..fc8a65e27 100644 --- a/changelog.txt +++ b/changelog.txt @@ -1,4 +1,20 @@ # Wave changelog +1.15.4 - 27 Nov 2024 +- Add blocking executor to async caches (#759) [86f7d3e3] +- Add ExecutesOn annotation to error controller [a2db2b00] + +1.15.3 - 25 Nov 2024 +- Add Support Redis support for SSL and password (#717) [bf63599d] +- Bump MN 4.7.1 (#741) [203e5dd0] +- Remove double cache builder invocation [994b722c] +- Bump lib-mail version 1.2.1 [4da98a23] +- Bump io.seqera:lib-crypto [36f5b24d] +- Bump gson 2.10.1 [117703b9] +- Bump protobuf-java version 4.27.5 [3ea758b4] +- Bump k8s client to version 21.0.1 (#553) [51788578] + +1.15.2 [skipped] + 1.15.1 - 20 Nov 2024 - Check block existence with object operation (#750) [86ef526c] - Add /v1alpha2/validate-creds (#752) [e24ec62c] diff --git a/configuration.md b/configuration.md index d162a98b5..d119cae16 100644 --- a/configuration.md +++ b/configuration.md @@ -184,6 +184,16 @@ Rate limit configuration controls the limits of anonymous and authenticated user - **`redis.pool.enabled`**: whether to enable the Redis pool. It is set to `true` by default, enabling the use of a connection pool for efficient management of connections to the Redis server. *Optional*. +- **`redis.pool.minIdle`**: Specifies the minimum number of idle connections to maintain in the Redis connection pool. The default value is `0`. This ensures that connections are readily available for use.  *Optional*. + +- **`redis.pool.maxIdle`**: Specifies the maximum number of idle connections to maintain in the Redis connection pool. The default value is `10`.  *Optional*. + +- **`redis.pool.maxTotal`**: Specifies the maximum number of connections that can be maintained in the Redis connection pool. The default value is `50`. This helps to manage resource usage efficiently while supporting high demand.  *Optional*. + +- **`redis.client.timeout`**: Defines the timeout duration (in milliseconds) for Redis client operations. The default value is `5000` (5 seconds).  *Optional*. + +- **`redis.password`**: Specifies the password used to authenticate with the Redis server. This is needed when redis authentication is enabled.  *Optional*. + - **`surreal.default.ns`**: the namespace for the Surreal database. It can be set using `${SURREALDB_NS}` environment variable. *Mandatory*. - **`surreal.default.db`**: the name of the Surreal database. It can be set using`${SURREALDB_DB}` environment variable. This setting defines the target database within the Surreal database system that Wave should interact with. *Mandatory*. diff --git a/gradle.properties b/gradle.properties index 54b62b375..9c2c3bcbb 100644 --- a/gradle.properties +++ b/gradle.properties @@ -16,5 +16,5 @@ # along with this program. If not, see . # -micronautVersion=4.6.3 +micronautVersion=4.7.1 micronautEnvs=dev,h2,mail,aws-ses diff --git a/src/main/groovy/io/seqera/wave/ErrorHandler.groovy b/src/main/groovy/io/seqera/wave/ErrorHandler.groovy index 0d872918b..69a4ab430 100644 --- a/src/main/groovy/io/seqera/wave/ErrorHandler.groovy +++ b/src/main/groovy/io/seqera/wave/ErrorHandler.groovy @@ -24,6 +24,7 @@ import io.micronaut.http.HttpRequest import io.micronaut.http.HttpResponse import io.micronaut.http.HttpResponseFactory import io.micronaut.http.HttpStatus +import io.micronaut.http.exceptions.HttpStatusException import io.micronaut.security.authentication.AuthorizationException import io.seqera.wave.exception.BuildTimeoutException import io.seqera.wave.exception.DockerRegistryException @@ -55,8 +56,9 @@ class ErrorHandler { def HttpResponse handle(HttpRequest httpRequest, Throwable t, Mapper responseFactory) { final errId = LongRndKey.rndHex() final request = httpRequest?.toString() + final knownException = t instanceof WaveException || t instanceof HttpStatusException def msg = t.message - if( t instanceof WaveException && msg ) { + if( knownException && msg ) { // the the error cause if( t.cause ) msg += " - Cause: ${t.cause.message ?: t.cause}".toString() // render the message for logging @@ -81,6 +83,13 @@ class ErrorHandler { log.error(render, t) } + if( t instanceof HttpStatusException ) { + final body = (t.body.isPresent() ? t.body.get() : t.message) as T + return HttpResponse + .status(t.status) + .body(body) + } + if( t instanceof RegistryForwardException ) { // report this error as it has been returned by the target registry return HttpResponse diff --git a/src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy b/src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy index 7ae087db7..a4252c4f3 100644 --- a/src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy @@ -22,6 +22,7 @@ import java.net.http.HttpRequest import java.net.http.HttpResponse import java.time.Duration import java.util.concurrent.CompletionException +import java.util.concurrent.ExecutorService import java.util.concurrent.TimeUnit import com.github.benmanes.caffeine.cache.AsyncLoadingCache @@ -33,6 +34,7 @@ import groovy.transform.CompileStatic import groovy.transform.PackageScope import groovy.transform.ToString import groovy.util.logging.Slf4j +import io.micronaut.scheduling.TaskExecutors import io.seqera.wave.configuration.HttpClientConfig import io.seqera.wave.exception.RegistryForwardException import io.seqera.wave.exception.RegistryUnauthorizedAccessException @@ -40,7 +42,9 @@ import io.seqera.wave.http.HttpClientFactory import io.seqera.wave.util.RegHelper import io.seqera.wave.util.Retryable import io.seqera.wave.util.StringUtils +import jakarta.annotation.PostConstruct import jakarta.inject.Inject +import jakarta.inject.Named import jakarta.inject.Singleton import static io.seqera.wave.WaveDefault.DOCKER_IO import static io.seqera.wave.auth.RegistryUtils.isServerError @@ -64,6 +68,10 @@ class RegistryAuthServiceImpl implements RegistryAuthService { @Inject private RegistryTokenStore tokenStore + @Inject + @Named(TaskExecutors.BLOCKING) + private ExecutorService ioExecutor + @Canonical @ToString(includePackage = false, includeNames = true) static private class CacheKey { @@ -101,16 +109,23 @@ class RegistryAuthServiceImpl implements RegistryAuthService { } // FIXME https://github.com/seqeralabs/wave/issues/747 - private AsyncLoadingCache cacheTokens = Caffeine.newBuilder() - .newBuilder() - .maximumSize(10_000) - .expireAfterAccess(_1_HOUR.toMillis(), TimeUnit.MILLISECONDS) - .buildAsync(loader) + private AsyncLoadingCache cacheTokens @Inject private RegistryLookupService lookupService - @Inject RegistryCredentialsFactory credentialsFactory + @Inject + private RegistryCredentialsFactory credentialsFactory + + @PostConstruct + private void init() { + cacheTokens = Caffeine + .newBuilder() + .maximumSize(10_000) + .expireAfterAccess(_1_HOUR.toMillis(), TimeUnit.MILLISECONDS) + .executor(ioExecutor) + .buildAsync(loader) + } /** * Implements container registry login diff --git a/src/main/groovy/io/seqera/wave/auth/RegistryLookupServiceImpl.groovy b/src/main/groovy/io/seqera/wave/auth/RegistryLookupServiceImpl.groovy index 2b15b8efa..13775d4a4 100644 --- a/src/main/groovy/io/seqera/wave/auth/RegistryLookupServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/auth/RegistryLookupServiceImpl.groovy @@ -21,6 +21,7 @@ package io.seqera.wave.auth import java.net.http.HttpRequest import java.net.http.HttpResponse import java.util.concurrent.CompletionException +import java.util.concurrent.ExecutorService import java.util.concurrent.TimeUnit import com.github.benmanes.caffeine.cache.AsyncLoadingCache @@ -28,11 +29,14 @@ import com.github.benmanes.caffeine.cache.CacheLoader import com.github.benmanes.caffeine.cache.Caffeine import groovy.transform.CompileStatic import groovy.util.logging.Slf4j +import io.micronaut.scheduling.TaskExecutors import io.seqera.wave.configuration.HttpClientConfig import io.seqera.wave.exception.RegistryForwardException import io.seqera.wave.http.HttpClientFactory import io.seqera.wave.util.Retryable +import jakarta.annotation.PostConstruct import jakarta.inject.Inject +import jakarta.inject.Named import jakarta.inject.Singleton import static io.seqera.wave.WaveDefault.DOCKER_IO import static io.seqera.wave.WaveDefault.DOCKER_REGISTRY_1 @@ -55,6 +59,10 @@ class RegistryLookupServiceImpl implements RegistryLookupService { @Inject private RegistryAuthStore store + @Inject + @Named(TaskExecutors.BLOCKING) + private ExecutorService ioExecutor + private CacheLoader loader = new CacheLoader() { @Override RegistryAuth load(URI endpoint) throws Exception { @@ -74,11 +82,17 @@ class RegistryLookupServiceImpl implements RegistryLookupService { } // FIXME https://github.com/seqeralabs/wave/issues/747 - private AsyncLoadingCache cache = Caffeine.newBuilder() + private AsyncLoadingCache cache + + @PostConstruct + void init() { + cache = Caffeine .newBuilder() .maximumSize(10_000) .expireAfterAccess(1, TimeUnit.HOURS) + .executor(ioExecutor) .buildAsync(loader) + } protected RegistryAuth lookup0(URI endpoint) { final httpClient = HttpClientFactory.followRedirectsHttpClient() diff --git a/src/main/groovy/io/seqera/wave/controller/ErrorController.groovy b/src/main/groovy/io/seqera/wave/controller/ErrorController.groovy index bceda35d2..af4ad61b3 100644 --- a/src/main/groovy/io/seqera/wave/controller/ErrorController.groovy +++ b/src/main/groovy/io/seqera/wave/controller/ErrorController.groovy @@ -25,6 +25,8 @@ import io.micronaut.http.HttpResponse import io.micronaut.http.annotation.Controller import io.micronaut.http.annotation.Error import io.micronaut.http.hateoas.JsonError +import io.micronaut.scheduling.TaskExecutors +import io.micronaut.scheduling.annotation.ExecuteOn import io.seqera.wave.ErrorHandler import jakarta.inject.Inject /** @@ -35,9 +37,11 @@ import jakarta.inject.Inject @Slf4j @CompileStatic @Controller('/error') +@ExecuteOn(TaskExecutors.BLOCKING) class ErrorController { - @Inject ErrorHandler handler + @Inject + private ErrorHandler handler @Error(global = true) HttpResponse handleException(HttpRequest request, Throwable exception) { diff --git a/src/main/groovy/io/seqera/wave/model/ContainerCoordinates.groovy b/src/main/groovy/io/seqera/wave/model/ContainerCoordinates.groovy index 7dbd76fd3..0a0d30f1a 100644 --- a/src/main/groovy/io/seqera/wave/model/ContainerCoordinates.groovy +++ b/src/main/groovy/io/seqera/wave/model/ContainerCoordinates.groovy @@ -53,7 +53,8 @@ class ContainerCoordinates implements ContainerPath { static ContainerCoordinates parse(String path) { if( !path ) throw new IllegalArgumentException("Container image name is not provided") - + if( path.contains(' ') ) + throw new IllegalArgumentException("Invalid container name - offending image: '$path'") final scheme = StringUtils.getUrlProtocol(path) if( scheme ) { if( scheme!='oras') throw new IllegalArgumentException("Invalid container scheme: '$scheme' - offending image: '$path'") diff --git a/src/main/groovy/io/seqera/wave/ratelimit/impl/SpillWayStorageFactory.groovy b/src/main/groovy/io/seqera/wave/ratelimit/impl/SpillWayStorageFactory.groovy index cc6f9d036..337e5656b 100644 --- a/src/main/groovy/io/seqera/wave/ratelimit/impl/SpillWayStorageFactory.groovy +++ b/src/main/groovy/io/seqera/wave/ratelimit/impl/SpillWayStorageFactory.groovy @@ -26,6 +26,7 @@ import groovy.util.logging.Slf4j import io.micronaut.context.annotation.Factory import io.micronaut.context.annotation.Requires import io.seqera.wave.configuration.RedisConfig +import jakarta.inject.Inject import jakarta.inject.Singleton import jakarta.validation.constraints.NotNull import redis.clients.jedis.JedisPool @@ -51,9 +52,7 @@ class SpillWayStorageFactory { @Singleton @Requires(property = 'redis.uri') - LimitUsageStorage redisStorage(@NotNull RedisConfig redisConfig){ - log.info "Using redis $redisConfig.uri as storage for rate limit" - def jedisPool = new JedisPool(redisConfig.uri) - return RedisStorage.builder().withJedisPool(jedisPool).build() + LimitUsageStorage redisStorage(JedisPool pool){ + return RedisStorage.builder().withJedisPool(pool).build() } } diff --git a/src/main/groovy/io/seqera/wave/redis/RedisFactory.groovy b/src/main/groovy/io/seqera/wave/redis/RedisFactory.groovy index c11ff2e85..8729f1336 100644 --- a/src/main/groovy/io/seqera/wave/redis/RedisFactory.groovy +++ b/src/main/groovy/io/seqera/wave/redis/RedisFactory.groovy @@ -23,9 +23,14 @@ import groovy.util.logging.Slf4j import io.micronaut.context.annotation.Factory import io.micronaut.context.annotation.Requires import io.micronaut.context.annotation.Value +import io.micronaut.core.annotation.Nullable import jakarta.inject.Singleton +import redis.clients.jedis.DefaultJedisClientConfig +import redis.clients.jedis.JedisClientConfig import redis.clients.jedis.JedisPool import redis.clients.jedis.JedisPoolConfig +import redis.clients.jedis.exceptions.InvalidURIException +import redis.clients.jedis.util.JedisURIHelper /** * Redis connection pool factory * @@ -39,17 +44,41 @@ class RedisFactory { @Singleton JedisPool createRedisPool( - @Value('${redis.uri}') String uri, + @Value('${redis.uri}') String connection, @Value('${redis.pool.minIdle:0}') int minIdle, @Value('${redis.pool.maxIdle:10}') int maxIdle, - @Value('${redis.pool.maxTotal:50}') int maxTotal + @Value('${redis.pool.maxTotal:50}') int maxTotal, + @Value('${redis.client.timeout:5000}') int timeout, + @Nullable @Value('${redis.password}') String password ) { - log.info "Using redis $uri as storage for rate limit - pool minIdle: ${minIdle}; maxIdle: ${maxIdle}; maxTotal: ${maxTotal}" + log.info "Using redis ${connection} as storage for rate limit - pool minIdle: ${minIdle}; maxIdle: ${maxIdle}; maxTotal: ${maxTotal}; timeout: ${timeout}" + + final uri = URI.create(connection) + // pool config final config = new JedisPoolConfig() config.setMinIdle(minIdle) config.setMaxIdle(maxIdle) config.setMaxTotal(maxTotal) - return new JedisPool(config, URI.create(uri)) + // client config + final clientConfig = clientConfig(uri, password, timeout) + // create the jedis pool + return new JedisPool(config, JedisURIHelper.getHostAndPort(uri), clientConfig) + } + + protected JedisClientConfig clientConfig(URI uri, String password, int timeout) { + if (!JedisURIHelper.isValid(uri)) { + throw new InvalidURIException("Invalid Redis connection URI: ${uri}") + } + + return DefaultJedisClientConfig.builder().connectionTimeoutMillis(timeout) + .socketTimeoutMillis(timeout) + .blockingSocketTimeoutMillis(timeout) + .user(JedisURIHelper.getUser(uri)) + .password(password?:JedisURIHelper.getPassword(uri)) + .database(JedisURIHelper.getDBIndex(uri)) + .protocol(JedisURIHelper.getRedisProtocol(uri)) + .ssl(JedisURIHelper.isRedisSSLScheme(uri)) + .build() } } diff --git a/src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy b/src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy index 30f9d0e04..3db479f4b 100644 --- a/src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy +++ b/src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy @@ -18,6 +18,7 @@ package io.seqera.wave.service.aws +import java.util.concurrent.ExecutorService import java.util.concurrent.TimeUnit import java.util.regex.Pattern @@ -27,7 +28,11 @@ import com.github.benmanes.caffeine.cache.Caffeine import groovy.transform.Canonical import groovy.transform.CompileStatic import groovy.util.logging.Slf4j +import io.micronaut.scheduling.TaskExecutors import io.seqera.wave.util.StringUtils +import jakarta.annotation.PostConstruct +import jakarta.inject.Inject +import jakarta.inject.Named import jakarta.inject.Singleton import software.amazon.awssdk.auth.credentials.AwsBasicCredentials import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider @@ -73,13 +78,22 @@ class AwsEcrService { } } - // FIXME https://github.com/seqeralabs/wave/issues/747 - private AsyncLoadingCache cache = Caffeine.newBuilder() - .newBuilder() - .maximumSize(10_000) - .expireAfterWrite(3, TimeUnit.HOURS) - .buildAsync(loader) + @Inject + @Named(TaskExecutors.BLOCKING) + private ExecutorService ioExecutor + // FIXME https://github.com/seqeralabs/wave/issues/747 + private AsyncLoadingCache cache + + @PostConstruct + private void init() { + cache = Caffeine + .newBuilder() + .maximumSize(10_000) + .expireAfterWrite(3, TimeUnit.HOURS) + .executor(ioExecutor) + .buildAsync(loader) + } private EcrClient ecrClient(String accessKey, String secretKey, String region) { EcrClient.builder() diff --git a/src/main/groovy/io/seqera/wave/service/data/future/AbstractFutureStore.groovy b/src/main/groovy/io/seqera/wave/service/data/future/AbstractFutureStore.groovy index 130efe56d..f81dd27fa 100644 --- a/src/main/groovy/io/seqera/wave/service/data/future/AbstractFutureStore.groovy +++ b/src/main/groovy/io/seqera/wave/service/data/future/AbstractFutureStore.groovy @@ -26,6 +26,7 @@ import java.util.concurrent.TimeoutException import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import io.micronaut.context.annotation.Value +import io.micronaut.scheduling.TaskExecutors import io.seqera.wave.encoder.EncodingStrategy import jakarta.inject.Inject import jakarta.inject.Named @@ -55,7 +56,7 @@ abstract class AbstractFutureStore implements FutureStore { private volatile Duration pollInterval @Inject - @Named('future-store-executor') + @Named(TaskExecutors.BLOCKING) private ExecutorService executor AbstractFutureStore(FutureHash store, EncodingStrategy encodingStrategy) { diff --git a/src/main/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueue.groovy b/src/main/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueue.groovy index 8c2d3fa2c..feb9f2508 100644 --- a/src/main/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueue.groovy +++ b/src/main/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueue.groovy @@ -20,6 +20,7 @@ package io.seqera.wave.service.data.queue import java.time.Duration import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ExecutorService import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger @@ -61,21 +62,27 @@ abstract class AbstractMessageQueue implements Runnable { final private String name0 // FIXME https://github.com/seqeralabs/wave/issues/747 - final private AsyncCache closedClients = Caffeine.newBuilder() - .newBuilder() - .expireAfterWrite(10, TimeUnit.MINUTES) - .buildAsync() + final private AsyncCache closedClients - AbstractMessageQueue(MessageQueue broker) { + AbstractMessageQueue(MessageQueue broker, ExecutorService ioExecutor) { final type = TypeHelper.getGenericType(this, 0) this.encoder = new MoshiEncodeStrategy(type) {} this.broker = broker + this.closedClients = createCache(ioExecutor) this.name0 = name() + '-thread-' + count.getAndIncrement() this.thread = new Thread(this, name0) this.thread.setDaemon(true) this.thread.start() } + private AsyncCache createCache(ExecutorService ioExecutor) { + Caffeine + .newBuilder() + .executor(ioExecutor) + .expireAfterWrite(10, TimeUnit.MINUTES) + .buildAsync() + } + protected abstract String name() protected abstract Duration pollInterval() diff --git a/src/main/groovy/io/seqera/wave/service/job/JobManager.groovy b/src/main/groovy/io/seqera/wave/service/job/JobManager.groovy index e10cde8d4..16a3c52e7 100644 --- a/src/main/groovy/io/seqera/wave/service/job/JobManager.groovy +++ b/src/main/groovy/io/seqera/wave/service/job/JobManager.groovy @@ -20,6 +20,7 @@ package io.seqera.wave.service.job import java.time.Duration import java.time.Instant +import java.util.concurrent.ExecutorService import com.github.benmanes.caffeine.cache.AsyncCache import com.github.benmanes.caffeine.cache.Cache @@ -27,8 +28,11 @@ import com.github.benmanes.caffeine.cache.Caffeine import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import io.micronaut.context.annotation.Context +import io.micronaut.scheduling.TaskExecutors import jakarta.annotation.PostConstruct import jakarta.inject.Inject +import jakarta.inject.Named + /** * Implement the logic to handle Blob cache transfer (uploads) * @@ -51,6 +55,10 @@ class JobManager { @Inject private JobConfig config + @Inject + @Named(TaskExecutors.BLOCKING) + private ExecutorService ioExecutor + // FIXME https://github.com/seqeralabs/wave/issues/747 private AsyncCache debounceCache @@ -60,10 +68,21 @@ class JobManager { debounceCache = Caffeine .newBuilder() .expireAfterWrite(config.graceInterval.multipliedBy(2)) + .executor(ioExecutor) .buildAsync() queue.addConsumer((job)-> processJob(job)) } + /** + * Process a job entry aorrding the state modelled by the {@link JobSpec} object. + * + * @param jobSpec + * A {@link JobSpec} object representing the job to be processed + * @return + * {@code true} to signal the process has been processed successfully and it should + * be removed from the underlying queue, or {@code false} if the job execution has + * not yet completed. + */ protected boolean processJob(JobSpec jobSpec) { try { return processJob0(jobSpec) diff --git a/src/main/groovy/io/seqera/wave/service/k8s/K8sServiceImpl.groovy b/src/main/groovy/io/seqera/wave/service/k8s/K8sServiceImpl.groovy index dc5a01d28..cfb9bcdd2 100644 --- a/src/main/groovy/io/seqera/wave/service/k8s/K8sServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/service/k8s/K8sServiceImpl.groovy @@ -26,7 +26,6 @@ import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import io.kubernetes.client.custom.Quantity import io.kubernetes.client.openapi.models.V1ContainerBuilder -import io.kubernetes.client.openapi.models.V1DeleteOptions import io.kubernetes.client.openapi.models.V1EnvVar import io.kubernetes.client.openapi.models.V1HostPathVolumeSource import io.kubernetes.client.openapi.models.V1Job @@ -150,7 +149,8 @@ class K8sServiceImpl implements K8sService { JobStatus getJobStatus(String name) { final job = k8sClient .batchV1Api() - .readNamespacedJob(name, namespace, null) + .readNamespacedJob(name, namespace) + .execute() if( !job ) { log.warn "K8s job=$name - unknown" return null @@ -184,7 +184,8 @@ class K8sServiceImpl implements K8sService { V1Pod getPod(String name) { return k8sClient .coreV1Api() - .readNamespacedPod(name, namespace, null) + .readNamespacedPod(name, namespace) + .execute() } /** @@ -361,7 +362,8 @@ class K8sServiceImpl implements K8sService { void deletePod(String name) { k8sClient .coreV1Api() - .deleteNamespacedPod(name, namespace, (String)null, (String)null, (Integer)null, (Boolean)null, (String)null, (V1DeleteOptions)null) + .deleteNamespacedPod(name, namespace) + .execute() } @Deprecated @@ -396,7 +398,6 @@ class K8sServiceImpl implements K8sService { .withRestartPolicy("Never") .addAllToVolumes(volumes) - final requests = new V1ResourceRequirements() if( scanConfig.requestsCpu ) requests.putRequestsItem('cpu', new Quantity(scanConfig.requestsCpu)) @@ -436,7 +437,8 @@ class K8sServiceImpl implements K8sService { return k8sClient .batchV1Api() - .createNamespacedJob(namespace, spec, null, null, null,null) + .createNamespacedJob(namespace, spec) + .execute() } V1Job createTransferJobSpec(String name, String containerImage, List args, BlobCacheConfig blobConfig) { @@ -499,7 +501,8 @@ class K8sServiceImpl implements K8sService { final spec = buildJobSpec(name, containerImage, args, workDir, creds, timeout, nodeSelector, platform) return k8sClient .batchV1Api() - .createNamespacedJob(namespace, spec, null, null, null,null) + .createNamespacedJob(namespace, spec) + .execute() } V1Job buildJobSpec(String name, String containerImage, List args, Path workDir, Path credsFile, Duration timeout, Map nodeSelector, ContainerPlatform platform) { @@ -597,7 +600,8 @@ class K8sServiceImpl implements K8sService { final spec = scanJobSpec(name, containerImage, args, workDir, creds, scanConfig) return k8sClient .batchV1Api() - .createNamespacedJob(namespace, spec, null, null, null,null) + .createNamespacedJob(namespace, spec) + .execute() } V1Job scanJobSpec(String name, String containerImage, List args, Path workDir, Path credsFile, ScanConfig scanConfig) { @@ -664,7 +668,8 @@ class K8sServiceImpl implements K8sService { final spec = mirrorJobSpec(name, containerImage, args, workDir, creds, config) return k8sClient .batchV1Api() - .createNamespacedJob(namespace, spec, null, null, null,null) + .createNamespacedJob(namespace, spec) + .execute() } V1Job mirrorJobSpec(String name, String containerImage, List args, Path workDir, Path credsFile, MirrorConfig config) { @@ -736,7 +741,9 @@ class K8sServiceImpl implements K8sService { void deleteJob(String name) { k8sClient .batchV1Api() - .deleteNamespacedJob(name, namespace, null, null, null, null,"Foreground", null) + .deleteNamespacedJob(name, namespace) + .propagationPolicy("Foreground") + .execute() } @Override @@ -744,7 +751,9 @@ class K8sServiceImpl implements K8sService { // list all pods for the given job final allPods = k8sClient .coreV1Api() - .listNamespacedPod(namespace, null, null, null, null, "job-name=${jobName}", null, null, null, null, null, null) + .listNamespacedPod(namespace) + .labelSelector("job-name=${jobName}") + .execute() if( !allPods || !allPods.items ) return null diff --git a/src/main/groovy/io/seqera/wave/service/logs/BuildLogServiceImpl.groovy b/src/main/groovy/io/seqera/wave/service/logs/BuildLogServiceImpl.groovy index 8be138dd1..5fadd46c9 100644 --- a/src/main/groovy/io/seqera/wave/service/logs/BuildLogServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/service/logs/BuildLogServiceImpl.groovy @@ -80,7 +80,7 @@ class BuildLogServiceImpl implements BuildLogService { @Inject @Named(TaskExecutors.IO) - private volatile ExecutorService ioExecutor + private ExecutorService ioExecutor @PostConstruct private void init() { diff --git a/src/main/groovy/io/seqera/wave/service/pairing/socket/PairingOutboundQueue.groovy b/src/main/groovy/io/seqera/wave/service/pairing/socket/PairingOutboundQueue.groovy index 889af4871..61518a451 100644 --- a/src/main/groovy/io/seqera/wave/service/pairing/socket/PairingOutboundQueue.groovy +++ b/src/main/groovy/io/seqera/wave/service/pairing/socket/PairingOutboundQueue.groovy @@ -19,13 +19,16 @@ package io.seqera.wave.service.pairing.socket import java.time.Duration +import java.util.concurrent.ExecutorService import javax.annotation.PreDestroy import groovy.transform.CompileStatic import io.micronaut.context.annotation.Value +import io.micronaut.scheduling.TaskExecutors import io.seqera.wave.service.data.queue.AbstractMessageQueue import io.seqera.wave.service.data.queue.MessageQueue import io.seqera.wave.service.pairing.socket.msg.PairingMessage +import jakarta.inject.Named import jakarta.inject.Singleton /** * Implement a distributed queue for Wave pairing messages @@ -40,9 +43,10 @@ class PairingOutboundQueue extends AbstractMessageQueue { PairingOutboundQueue( MessageQueue broker, - @Value('${wave.pairing.channel.awaitTimeout:100ms}') Duration pollInterval + @Value('${wave.pairing.channel.awaitTimeout:100ms}') Duration pollInterval, + @Named(TaskExecutors.BLOCKING) ExecutorService ioExecutor ) { - super(broker) + super(broker, ioExecutor) this.pollInterval = pollInterval } diff --git a/src/main/groovy/io/seqera/wave/service/validation/ValidationServiceImpl.groovy b/src/main/groovy/io/seqera/wave/service/validation/ValidationServiceImpl.groovy index e87a51e4c..225cc925e 100644 --- a/src/main/groovy/io/seqera/wave/service/validation/ValidationServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/service/validation/ValidationServiceImpl.groovy @@ -68,14 +68,14 @@ class ValidationServiceImpl implements ValidationService { // check does not start with a protocol prefix final prot = StringUtils.getUrlProtocol(name) if( prot ) { - return "Invalid container repository name — offending value: $name" + return "Invalid container repository name — offending value: '$name'" } try { ContainerCoordinates.parse(name) } catch (IllegalArgumentException e) { - return "Invalid container image name — offending value: $name" + return "Invalid container image name — offending value: '$name'" } return null } diff --git a/src/main/groovy/io/seqera/wave/tower/client/connector/TowerConnector.groovy b/src/main/groovy/io/seqera/wave/tower/client/connector/TowerConnector.groovy index 27e12d87f..666c8325e 100644 --- a/src/main/groovy/io/seqera/wave/tower/client/connector/TowerConnector.groovy +++ b/src/main/groovy/io/seqera/wave/tower/client/connector/TowerConnector.groovy @@ -48,6 +48,7 @@ import io.seqera.wave.tower.client.TowerClient import io.seqera.wave.util.ExponentialAttempt import io.seqera.wave.util.JacksonHelper import io.seqera.wave.util.RegHelper +import jakarta.annotation.PostConstruct import jakarta.inject.Inject import jakarta.inject.Named import static io.seqera.wave.util.LongRndKey.rndHex @@ -92,10 +93,16 @@ abstract class TowerConnector { } } - private AsyncLoadingCache> refreshCache = Caffeine - .newBuilder() - .expireAfterWrite(1, TimeUnit.MINUTES) - .buildAsync(loader) + private AsyncLoadingCache> refreshCache + + @PostConstruct + void init() { + refreshCache = Caffeine + .newBuilder() + .expireAfterWrite(1, TimeUnit.MINUTES) + .executor(ioExecutor) + .buildAsync(loader) + } /** Only for testing - do not use */ Cache> refreshCache0() { diff --git a/src/main/groovy/io/seqera/wave/util/Retryable.groovy b/src/main/groovy/io/seqera/wave/util/Retryable.groovy index db4a6f9eb..703d33900 100644 --- a/src/main/groovy/io/seqera/wave/util/Retryable.groovy +++ b/src/main/groovy/io/seqera/wave/util/Retryable.groovy @@ -24,6 +24,7 @@ import java.util.function.Consumer import java.util.function.Predicate import dev.failsafe.Failsafe +import dev.failsafe.FailsafeException import dev.failsafe.RetryPolicy import dev.failsafe.RetryPolicyBuilder import dev.failsafe.event.EventListener @@ -137,7 +138,11 @@ class Retryable { R apply(CheckedSupplier action) { final policy = retryPolicy() - return Failsafe.with(policy).get(action) + try { + return Failsafe.with(policy).get(action) + } catch (FailsafeException e) { + throw e.cause + } } static Retryable of(Config config) { diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index bd030f589..d7c23ecbc 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -37,23 +37,11 @@ micronaut: enabled: true # http client configuration # https://docs.micronaut.io/latest/guide/configurationreference.html#io.micronaut.http.client.DefaultHttpClientConfiguration - executors: - stream-executor: - type: FIXED - number-of-threads: 16 - future-store-executor: - type : FIXED - number-of-threads : 32 - netty: - event-loops: - stream-pool: - executor: 'stream-executor' http: services: stream-client: read-timeout: '30s' read-idle-timeout: '120s' - event-loop-group: 'stream-pool' security: enabled: true intercept-url-map: @@ -99,12 +87,5 @@ jackson: logger: levels: io.seqera: "DEBUG" - io.micronaut.retry.intercept.RecoveryInterceptor: "OFF" -# ^^^^^^^^^^^^^^^^^^^^^ -# Disable logs of `RecoveryInterceptor`, as they have been found to be noisy. -# Declarative `io.micronaut.http.client.annotation.@Client`s are annotated with `io.micronaut.retry.annotation@Recoverable` -# and throw an exception on every error response by default. `RecoveryInterceptor` ends up logging those exceptions -# even if they are handled and no actual recovery/fallback logic gets to take place. -# TODO remove once the project is updated to Micronaut 4.x, as @Client won't be annotated with @Recoverable anymore -# See https://github.com/micronaut-projects/micronaut-core/issues/3719; https://github.com/micronaut-projects/micronaut-core/pull/8235 + com.github.benmanes.caffeine.cache.LocalAsyncCache: "ERROR" ... diff --git a/src/test/groovy/io/seqera/wave/controller/ContainerControllerTest.groovy b/src/test/groovy/io/seqera/wave/controller/ContainerControllerTest.groovy index d582316b8..a3e266086 100644 --- a/src/test/groovy/io/seqera/wave/controller/ContainerControllerTest.groovy +++ b/src/test/groovy/io/seqera/wave/controller/ContainerControllerTest.groovy @@ -453,13 +453,13 @@ class ContainerControllerTest extends Specification { controller.validateContainerRequest(new SubmitContainerTokenRequest(containerImage: 'http://docker.io/foo:latest')) then: err = thrown(BadRequestException) - err.message == 'Invalid container repository name — offending value: http://docker.io/foo:latest' + err.message == "Invalid container repository name — offending value: 'http://docker.io/foo:latest'" when: controller.validateContainerRequest(new SubmitContainerTokenRequest(containerImage: 'http:docker.io/foo:latest')) then: err = thrown(BadRequestException) - err.message == 'Invalid container image name — offending value: http:docker.io/foo:latest' + err.message == "Invalid container image name — offending value: 'http:docker.io/foo:latest'" } diff --git a/src/test/groovy/io/seqera/wave/redis/RedisFactoryTest.groovy b/src/test/groovy/io/seqera/wave/redis/RedisFactoryTest.groovy new file mode 100644 index 000000000..cbed6fb6f --- /dev/null +++ b/src/test/groovy/io/seqera/wave/redis/RedisFactoryTest.groovy @@ -0,0 +1,62 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.redis + +import spock.lang.Specification + +import redis.clients.jedis.exceptions.InvalidURIException +/** + * + * @author Munish Chouhan + */ +class RedisFactoryTest extends Specification { + def 'should create redis pool with valid URI'() { + given: + def factory = new RedisFactory() + + when: + def pool = factory.createRedisPool(URI_STRING, MIN_IDLE, MAX_IDLE, MAX_TOTAL, TIMEOUT, 'password') + + then: + pool != null + + where: + URI_STRING | MIN_IDLE | MAX_IDLE | MAX_TOTAL | TIMEOUT + 'redis://localhost:6379' | 0 | 10 | 50 | 5000 + 'rediss://localhost:6379'| 1 | 5 | 20 | 3000 + } + + def 'should throw exception for invalid URI'() { + given: + def factory = new RedisFactory() + + when: + factory.createRedisPool(URI_STRING, MIN_IDLE, MAX_IDLE, MAX_TOTAL, TIMEOUT, null) + + then: + def e = thrown(InvalidURIException) + e.message.contains("Invalid Redis connection URI: $URI_STRING") + + where: + URI_STRING | MIN_IDLE | MAX_IDLE | MAX_TOTAL | TIMEOUT + 'redis://localhost' | 0 | 10 | 50 | 5000 + 'localhost:6379' | 1 | 5 | 20 | 3000 + } + +} diff --git a/src/test/groovy/io/seqera/wave/service/aws/AwsEcrServiceTest.groovy b/src/test/groovy/io/seqera/wave/service/aws/AwsEcrServiceTest.groovy index 20ce856ab..6f51810dd 100644 --- a/src/test/groovy/io/seqera/wave/service/aws/AwsEcrServiceTest.groovy +++ b/src/test/groovy/io/seqera/wave/service/aws/AwsEcrServiceTest.groovy @@ -18,24 +18,27 @@ package io.seqera.wave.service.aws - import spock.lang.Requires import spock.lang.Specification - +import io.micronaut.test.extensions.spock.annotation.MicronautTest +import jakarta.inject.Inject /** * * @author Paolo Di Tommaso */ +@MicronautTest class AwsEcrServiceTest extends Specification { + @Inject + AwsEcrService provider + @Requires({System.getenv('AWS_ACCESS_KEY_ID') && System.getenv('AWS_SECRET_ACCESS_KEY')}) def 'should get registry token' () { given: def accessKey = System.getenv('AWS_ACCESS_KEY_ID') def secretKey = System.getenv('AWS_SECRET_ACCESS_KEY') def REGION = 'eu-west-1' - def provider = new AwsEcrService() when: def creds = provider.getLoginToken(accessKey, secretKey, REGION, false).tokenize(":") @@ -49,9 +52,8 @@ class AwsEcrServiceTest extends Specification { thrown(Exception) } + @Requires({System.getenv('AWS_ACCESS_KEY_ID') && System.getenv('AWS_SECRET_ACCESS_KEY')}) def 'should check registry info' () { - given: - def provider = new AwsEcrService() expect: provider.getEcrHostInfo(null) == null provider.getEcrHostInfo('foo') == null @@ -59,7 +61,6 @@ class AwsEcrServiceTest extends Specification { provider.getEcrHostInfo('195996028523.dkr.ecr.eu-west-1.amazonaws.com/foo') == new AwsEcrService.AwsEcrHostInfo('195996028523', 'eu-west-1') and: provider.getEcrHostInfo('public.ecr.aws') == new AwsEcrService.AwsEcrHostInfo(null, 'us-east-1') - } def 'should check ecr registry' () { diff --git a/src/test/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueueLocalTest.groovy b/src/test/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueueLocalTest.groovy index 8923f6cf3..11a0fac6a 100644 --- a/src/test/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueueLocalTest.groovy +++ b/src/test/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueueLocalTest.groovy @@ -22,13 +22,17 @@ import spock.lang.Specification import java.time.Duration import java.util.concurrent.CompletableFuture +import java.util.concurrent.ExecutorService import java.util.concurrent.TimeUnit +import io.micronaut.scheduling.TaskExecutors import io.micronaut.test.extensions.spock.annotation.MicronautTest import io.seqera.wave.service.pairing.socket.PairingOutboundQueue import io.seqera.wave.service.pairing.socket.msg.PairingHeartbeat import io.seqera.wave.service.pairing.socket.msg.PairingMessage import jakarta.inject.Inject +import jakarta.inject.Named + /** * Test class {@link AbstractMessageQueue} using a {@link io.seqera.wave.service.data.queue.impl.LocalMessageQueue} * @@ -40,9 +44,12 @@ class AbstractMessageQueueLocalTest extends Specification { @Inject private MessageQueue broker + @Named(TaskExecutors.BLOCKING) + private ExecutorService ioExecutor + def 'should send and consume a request'() { given: - def queue = new PairingOutboundQueue(broker, Duration.ofMillis(100)) + def queue = new PairingOutboundQueue(broker, Duration.ofMillis(100), ioExecutor) when: def result = new CompletableFuture() @@ -58,7 +65,7 @@ class AbstractMessageQueueLocalTest extends Specification { def 'should validate '() { given: - def queue = new PairingOutboundQueue(broker, Duration.ofMillis(100)) + def queue = new PairingOutboundQueue(broker, Duration.ofMillis(100), ioExecutor) expect: queue.targetKey('foo') == 'pairing-outbound-queue/v1:foo' @@ -68,6 +75,4 @@ class AbstractMessageQueueLocalTest extends Specification { queue.close() } - - } diff --git a/src/test/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueueRedisTest.groovy b/src/test/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueueRedisTest.groovy index ad77e1d11..0353fa8bb 100644 --- a/src/test/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueueRedisTest.groovy +++ b/src/test/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueueRedisTest.groovy @@ -23,6 +23,7 @@ import spock.lang.Specification import java.time.Duration import java.util.concurrent.CompletableFuture +import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import io.micronaut.context.ApplicationContext @@ -54,8 +55,9 @@ class AbstractMessageQueueRedisTest extends Specification implements RedisTestCo def 'should send and consume a request'() { given: + def executor = Executors.newCachedThreadPool() def broker = context.getBean(RedisMessageQueue) - def queue = new PairingOutboundQueue(broker, Duration.ofMillis(100)) + def queue = new PairingOutboundQueue(broker, Duration.ofMillis(100), executor) and: def result = new CompletableFuture() when: @@ -72,11 +74,12 @@ class AbstractMessageQueueRedisTest extends Specification implements RedisTestCo def 'should send and consume a request across instances'() { given: + def executor = Executors.newCachedThreadPool() def broker1 = context.getBean(RedisMessageQueue) - def queue1 = new PairingOutboundQueue(broker1, Duration.ofMillis(100)) + def queue1 = new PairingOutboundQueue(broker1, Duration.ofMillis(100), executor) and: def broker2 = context.getBean(RedisMessageQueue) - def queue2 = new PairingOutboundQueue(broker2, Duration.ofMillis(100)) + def queue2 = new PairingOutboundQueue(broker2, Duration.ofMillis(100), executor) and: def result = new CompletableFuture() diff --git a/src/test/groovy/io/seqera/wave/service/k8s/K8sServiceImplTest.groovy b/src/test/groovy/io/seqera/wave/service/k8s/K8sServiceImplTest.groovy index b288b93f5..5519e76de 100644 --- a/src/test/groovy/io/seqera/wave/service/k8s/K8sServiceImplTest.groovy +++ b/src/test/groovy/io/seqera/wave/service/k8s/K8sServiceImplTest.groovy @@ -516,11 +516,16 @@ class K8sServiceImplTest extends Specification { def "getLatestPodForJob should return the latest pod when multiple pods are present"() { given: def jobName = "test-job" + def namespace = "test-ns" def pod1 = new V1Pod().metadata(new V1ObjectMeta().creationTimestamp(OffsetDateTime.now().minusDays(1))) def pod2 = new V1Pod().metadata(new V1ObjectMeta().creationTimestamp(OffsetDateTime.now())) def allPods = new V1PodList().items(Arrays.asList(pod1, pod2)) def api = Mock(CoreV1Api) - api.listNamespacedPod(_, _, _, _, _, "job-name=${jobName}", _, _, _, _, _, _) >> allPods + def podRequest2 = Mock(CoreV1Api. APIlistNamespacedPodRequest) + podRequest2.execute() >> allPods + def podRequest1 = Mock(CoreV1Api. APIlistNamespacedPodRequest) + podRequest1.labelSelector("job-name=${jobName}") >> podRequest2 + api.listNamespacedPod(namespace) >> podRequest1 def k8sClient = new K8sClient() { @Override ApiClient apiClient() { @@ -531,7 +536,7 @@ class K8sServiceImplTest extends Specification { } } and: - def k8sService = new K8sServiceImpl(k8sClient: k8sClient) + def k8sService = new K8sServiceImpl(k8sClient: k8sClient, namespace: namespace) when: def latestPod = k8sService.getLatestPodForJob(jobName) @@ -543,8 +548,13 @@ class K8sServiceImplTest extends Specification { def "getLatestPodForJob should return null when no pod is present"() { given: def jobName = "test-job" + def namespace = "test-ns" def api = Mock(CoreV1Api) - api.listNamespacedPod(_, _, _, _, _, "job-name=${jobName}", _, _, _, _, _, _) >> null + def podRequest2 = Mock(CoreV1Api. APIlistNamespacedPodRequest) + podRequest2.execute() >> null + def podRequest1 = Mock(CoreV1Api. APIlistNamespacedPodRequest) + podRequest1.labelSelector("job-name=${jobName}") >> podRequest2 + api.listNamespacedPod(namespace) >> podRequest1 def k8sClient = new K8sClient() { @Override ApiClient apiClient() { @@ -555,7 +565,7 @@ class K8sServiceImplTest extends Specification { } } and: - def k8sService = new K8sServiceImpl(k8sClient: k8sClient) + def k8sService = new K8sServiceImpl(k8sClient: k8sClient, namespace: namespace) when: def latestPod = k8sService.getLatestPodForJob(jobName) @@ -829,7 +839,7 @@ class K8sServiceImplTest extends Specification { job.spec.backoffLimit == 3 job.spec.template.spec.containers[0].image == containerImage job.spec.template.spec.containers[0].args == args - job.spec.template.spec.containers[0].resources.requests == null + job.spec.template.spec.containers[0].resources.requests == [:] job.spec.template.spec.containers[0].env == [new V1EnvVar().name('REGISTRY_AUTH_FILE').value('/tmp/config.json')] and: job.spec.template.spec.containers[0].volumeMounts.size() == 2 @@ -889,7 +899,7 @@ class K8sServiceImplTest extends Specification { job.spec.backoffLimit == 3 job.spec.template.spec.containers[0].image == containerImage job.spec.template.spec.containers[0].args == args - job.spec.template.spec.containers[0].resources.requests == null + job.spec.template.spec.containers[0].resources.requests == [:] job.spec.template.spec.volumes.size() == 1 job.spec.template.spec.volumes[0].persistentVolumeClaim.claimName == 'bar' job.spec.template.spec.restartPolicy == 'Never' @@ -976,11 +986,14 @@ class K8sServiceImplTest extends Specification { def api = Mock(BatchV1Api) def client = Mock(K8sClient) { batchV1Api()>>api } def service = Spy(new K8sServiceImpl(namespace:NS, k8sClient: client)) + def jobRequest = Mock(BatchV1Api. APIreadNamespacedJobRequest) when: def status = service.getJobStatus(NAME) + then: - 1 * api.readNamespacedJob(NAME, NS, null) >> JOB + jobRequest.execute() >> JOB + 1 * api.readNamespacedJob(NAME, NS) >> jobRequest and: status == EXPECTED diff --git a/src/test/groovy/io/seqera/wave/service/validation/ValidationServiceTest.groovy b/src/test/groovy/io/seqera/wave/service/validation/ValidationServiceTest.groovy index 4ed710744..32b6c35ad 100644 --- a/src/test/groovy/io/seqera/wave/service/validation/ValidationServiceTest.groovy +++ b/src/test/groovy/io/seqera/wave/service/validation/ValidationServiceTest.groovy @@ -80,9 +80,12 @@ class ValidationServiceTest extends Specification { 'quay.io:80/foo:latest' | null 'localhost:8000/foo:latest' | null and: - 'docker:quay.io/foo:latest' | 'Invalid container image name — offending value: docker:quay.io/foo:latest' - 'http://quay.io/foo:latest' | 'Invalid container repository name — offending value: http://quay.io/foo:latest' - 'http://quay.io/foo:latest' | 'Invalid container repository name — offending value: http://quay.io/foo:latest' + 'docker:quay.io/foo:latest' | "Invalid container image name — offending value: 'docker:quay.io/foo:latest'" + 'http://quay.io/foo:latest' | "Invalid container repository name — offending value: 'http://quay.io/foo:latest'" + 'http://quay.io/foo:latest' | "Invalid container repository name — offending value: 'http://quay.io/foo:latest'" + 'ubuntu: latest' | "Invalid container image name — offending value: 'ubuntu: latest'" + 'ubuntu:latest ' | "Invalid container image name — offending value: 'ubuntu:latest '" + ' ' | "Invalid container image name — offending value: ' '" } @Unroll diff --git a/src/test/groovy/io/seqera/wave/util/RetryableTest.groovy b/src/test/groovy/io/seqera/wave/util/RetryableTest.groovy index 70c2fd6d7..3828dc8c1 100644 --- a/src/test/groovy/io/seqera/wave/util/RetryableTest.groovy +++ b/src/test/groovy/io/seqera/wave/util/RetryableTest.groovy @@ -22,9 +22,7 @@ import spock.lang.Specification import java.time.Duration -import dev.failsafe.FailsafeException import groovy.util.logging.Slf4j - /** * * @author Paolo Di Tommaso @@ -65,8 +63,8 @@ class RetryableTest extends Specification { when: retryable.apply(()-> {throw new IOException("Oops failed!")}) then: - def e = thrown(FailsafeException) - e.cause instanceof IOException + def e = thrown(IOException) + e.message == 'Oops failed!' } def 'should validate config' () {