Skip to content

Commit

Permalink
Add blocking executor to async caches (#759)
Browse files Browse the repository at this point in the history

Signed-off-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
pditommaso authored Nov 26, 2024
1 parent a2db2b0 commit 86f7d3e
Show file tree
Hide file tree
Showing 13 changed files with 119 additions and 42 deletions.
27 changes: 21 additions & 6 deletions src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.time.Duration
import java.util.concurrent.CompletionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit

import com.github.benmanes.caffeine.cache.AsyncLoadingCache
Expand All @@ -33,14 +34,17 @@ import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.transform.ToString
import groovy.util.logging.Slf4j
import io.micronaut.scheduling.TaskExecutors
import io.seqera.wave.configuration.HttpClientConfig
import io.seqera.wave.exception.RegistryForwardException
import io.seqera.wave.exception.RegistryUnauthorizedAccessException
import io.seqera.wave.http.HttpClientFactory
import io.seqera.wave.util.RegHelper
import io.seqera.wave.util.Retryable
import io.seqera.wave.util.StringUtils
import jakarta.annotation.PostConstruct
import jakarta.inject.Inject
import jakarta.inject.Named
import jakarta.inject.Singleton
import static io.seqera.wave.WaveDefault.DOCKER_IO
import static io.seqera.wave.auth.RegistryUtils.isServerError
Expand All @@ -64,6 +68,10 @@ class RegistryAuthServiceImpl implements RegistryAuthService {
@Inject
private RegistryTokenStore tokenStore

@Inject
@Named(TaskExecutors.BLOCKING)
private ExecutorService ioExecutor

@Canonical
@ToString(includePackage = false, includeNames = true)
static private class CacheKey {
Expand Down Expand Up @@ -101,16 +109,23 @@ class RegistryAuthServiceImpl implements RegistryAuthService {
}

// FIXME https://github.com/seqeralabs/wave/issues/747
private AsyncLoadingCache<CacheKey, String> cacheTokens = Caffeine
.newBuilder()
.maximumSize(10_000)
.expireAfterAccess(_1_HOUR.toMillis(), TimeUnit.MILLISECONDS)
.buildAsync(loader)
private AsyncLoadingCache<CacheKey, String> cacheTokens

@Inject
private RegistryLookupService lookupService

@Inject RegistryCredentialsFactory credentialsFactory
@Inject
private RegistryCredentialsFactory credentialsFactory

@PostConstruct
private void init() {
cacheTokens = Caffeine
.newBuilder()
.maximumSize(10_000)
.expireAfterAccess(_1_HOUR.toMillis(), TimeUnit.MILLISECONDS)
.executor(ioExecutor)
.buildAsync(loader)
}

/**
* Implements container registry login
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,22 @@ package io.seqera.wave.auth
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.util.concurrent.CompletionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit

import com.github.benmanes.caffeine.cache.AsyncLoadingCache
import com.github.benmanes.caffeine.cache.CacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.micronaut.scheduling.TaskExecutors
import io.seqera.wave.configuration.HttpClientConfig
import io.seqera.wave.exception.RegistryForwardException
import io.seqera.wave.http.HttpClientFactory
import io.seqera.wave.util.Retryable
import jakarta.annotation.PostConstruct
import jakarta.inject.Inject
import jakarta.inject.Named
import jakarta.inject.Singleton
import static io.seqera.wave.WaveDefault.DOCKER_IO
import static io.seqera.wave.WaveDefault.DOCKER_REGISTRY_1
Expand All @@ -55,6 +59,10 @@ class RegistryLookupServiceImpl implements RegistryLookupService {
@Inject
private RegistryAuthStore store

@Inject
@Named(TaskExecutors.BLOCKING)
private ExecutorService ioExecutor

private CacheLoader<URI, RegistryAuth> loader = new CacheLoader<URI, RegistryAuth>() {
@Override
RegistryAuth load(URI endpoint) throws Exception {
Expand All @@ -74,11 +82,17 @@ class RegistryLookupServiceImpl implements RegistryLookupService {
}

// FIXME https://github.com/seqeralabs/wave/issues/747
private AsyncLoadingCache<URI, RegistryAuth> cache = Caffeine
private AsyncLoadingCache<URI, RegistryAuth> cache

@PostConstruct
void init() {
cache = Caffeine
.newBuilder()
.maximumSize(10_000)
.expireAfterAccess(1, TimeUnit.HOURS)
.executor(ioExecutor)
.buildAsync(loader)
}

protected RegistryAuth lookup0(URI endpoint) {
final httpClient = HttpClientFactory.followRedirectsHttpClient()
Expand Down
26 changes: 20 additions & 6 deletions src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package io.seqera.wave.service.aws

import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
import java.util.regex.Pattern

Expand All @@ -27,7 +28,11 @@ import com.github.benmanes.caffeine.cache.Caffeine
import groovy.transform.Canonical
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.micronaut.scheduling.TaskExecutors
import io.seqera.wave.util.StringUtils
import jakarta.annotation.PostConstruct
import jakarta.inject.Inject
import jakarta.inject.Named
import jakarta.inject.Singleton
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
Expand Down Expand Up @@ -73,13 +78,22 @@ class AwsEcrService {
}
}

// FIXME https://github.com/seqeralabs/wave/issues/747
private AsyncLoadingCache<AwsCreds, String> cache = Caffeine
.newBuilder()
.maximumSize(10_000)
.expireAfterWrite(3, TimeUnit.HOURS)
.buildAsync(loader)
@Inject
@Named(TaskExecutors.BLOCKING)
private ExecutorService ioExecutor

// FIXME https://github.com/seqeralabs/wave/issues/747
private AsyncLoadingCache<AwsCreds, String> cache

@PostConstruct
private void init() {
cache = Caffeine
.newBuilder()
.maximumSize(10_000)
.expireAfterWrite(3, TimeUnit.HOURS)
.executor(ioExecutor)
.buildAsync(loader)
}

private EcrClient ecrClient(String accessKey, String secretKey, String region) {
EcrClient.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import java.util.concurrent.TimeoutException
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.micronaut.context.annotation.Value
import io.micronaut.scheduling.TaskExecutors
import io.seqera.wave.encoder.EncodingStrategy
import jakarta.inject.Inject
import jakarta.inject.Named
Expand Down Expand Up @@ -55,7 +56,7 @@ abstract class AbstractFutureStore<V> implements FutureStore<String,V> {
private volatile Duration pollInterval

@Inject
@Named('future-store-executor')
@Named(TaskExecutors.BLOCKING)
private ExecutorService executor

AbstractFutureStore(FutureHash<String> store, EncodingStrategy<V> encodingStrategy) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package io.seqera.wave.service.data.queue

import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

Expand Down Expand Up @@ -61,21 +62,27 @@ abstract class AbstractMessageQueue<M> implements Runnable {
final private String name0

// FIXME https://github.com/seqeralabs/wave/issues/747
final private AsyncCache<String,Boolean> closedClients = Caffeine
.newBuilder()
.expireAfterWrite(10, TimeUnit.MINUTES)
.buildAsync()
final private AsyncCache<String,Boolean> closedClients

AbstractMessageQueue(MessageQueue<String> broker) {
AbstractMessageQueue(MessageQueue<String> broker, ExecutorService ioExecutor) {
final type = TypeHelper.getGenericType(this, 0)
this.encoder = new MoshiEncodeStrategy<M>(type) {}
this.broker = broker
this.closedClients = createCache(ioExecutor)
this.name0 = name() + '-thread-' + count.getAndIncrement()
this.thread = new Thread(this, name0)
this.thread.setDaemon(true)
this.thread.start()
}

private AsyncCache<String,Boolean> createCache(ExecutorService ioExecutor) {
Caffeine
.newBuilder()
.executor(ioExecutor)
.expireAfterWrite(10, TimeUnit.MINUTES)
.buildAsync()
}

protected abstract String name()

protected abstract Duration pollInterval()
Expand Down
9 changes: 9 additions & 0 deletions src/main/groovy/io/seqera/wave/service/job/JobManager.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@ package io.seqera.wave.service.job

import java.time.Duration
import java.time.Instant
import java.util.concurrent.ExecutorService

import com.github.benmanes.caffeine.cache.AsyncCache
import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.micronaut.context.annotation.Context
import io.micronaut.scheduling.TaskExecutors
import jakarta.annotation.PostConstruct
import jakarta.inject.Inject
import jakarta.inject.Named

/**
* Implement the logic to handle Blob cache transfer (uploads)
*
Expand All @@ -51,6 +55,10 @@ class JobManager {
@Inject
private JobConfig config

@Inject
@Named(TaskExecutors.BLOCKING)
private ExecutorService ioExecutor

// FIXME https://github.com/seqeralabs/wave/issues/747
private AsyncCache<String,Instant> debounceCache

Expand All @@ -60,6 +68,7 @@ class JobManager {
debounceCache = Caffeine
.newBuilder()
.expireAfterWrite(config.graceInterval.multipliedBy(2))
.executor(ioExecutor)
.buildAsync()
queue.addConsumer((job)-> processJob(job))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class BuildLogServiceImpl implements BuildLogService {

@Inject
@Named(TaskExecutors.IO)
private volatile ExecutorService ioExecutor
private ExecutorService ioExecutor

@PostConstruct
private void init() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@
package io.seqera.wave.service.pairing.socket

import java.time.Duration
import java.util.concurrent.ExecutorService
import javax.annotation.PreDestroy

import groovy.transform.CompileStatic
import io.micronaut.context.annotation.Value
import io.micronaut.scheduling.TaskExecutors
import io.seqera.wave.service.data.queue.AbstractMessageQueue
import io.seqera.wave.service.data.queue.MessageQueue
import io.seqera.wave.service.pairing.socket.msg.PairingMessage
import jakarta.inject.Named
import jakarta.inject.Singleton
/**
* Implement a distributed queue for Wave pairing messages
Expand All @@ -40,9 +43,10 @@ class PairingOutboundQueue extends AbstractMessageQueue<PairingMessage> {

PairingOutboundQueue(
MessageQueue<String> broker,
@Value('${wave.pairing.channel.awaitTimeout:100ms}') Duration pollInterval
@Value('${wave.pairing.channel.awaitTimeout:100ms}') Duration pollInterval,
@Named(TaskExecutors.BLOCKING) ExecutorService ioExecutor
) {
super(broker)
super(broker, ioExecutor)
this.pollInterval = pollInterval
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import io.seqera.wave.tower.client.TowerClient
import io.seqera.wave.util.ExponentialAttempt
import io.seqera.wave.util.JacksonHelper
import io.seqera.wave.util.RegHelper
import jakarta.annotation.PostConstruct
import jakarta.inject.Inject
import jakarta.inject.Named
import static io.seqera.wave.util.LongRndKey.rndHex
Expand Down Expand Up @@ -92,10 +93,16 @@ abstract class TowerConnector {
}
}

private AsyncLoadingCache<JwtRefreshParams, CompletableFuture<JwtAuth>> refreshCache = Caffeine
.newBuilder()
.expireAfterWrite(1, TimeUnit.MINUTES)
.buildAsync(loader)
private AsyncLoadingCache<JwtRefreshParams, CompletableFuture<JwtAuth>> refreshCache

@PostConstruct
void init() {
refreshCache = Caffeine
.newBuilder()
.expireAfterWrite(1, TimeUnit.MINUTES)
.executor(ioExecutor)
.buildAsync(loader)
}

/** Only for testing - do not use */
Cache<JwtRefreshParams, CompletableFuture<JwtAuth>> refreshCache0() {
Expand Down
3 changes: 0 additions & 3 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ micronaut:
stream-executor:
type: FIXED
number-of-threads: 16
future-store-executor:
type : FIXED
number-of-threads : 32
netty:
event-loops:
stream-pool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,27 @@

package io.seqera.wave.service.aws


import spock.lang.Requires
import spock.lang.Specification


import io.micronaut.test.extensions.spock.annotation.MicronautTest
import jakarta.inject.Inject
/**
*
* @author Paolo Di Tommaso <[email protected]>
*/
@MicronautTest
class AwsEcrServiceTest extends Specification {

@Inject
AwsEcrService provider

@Requires({System.getenv('AWS_ACCESS_KEY_ID') && System.getenv('AWS_SECRET_ACCESS_KEY')})
def 'should get registry token' () {
given:
def accessKey = System.getenv('AWS_ACCESS_KEY_ID')
def secretKey = System.getenv('AWS_SECRET_ACCESS_KEY')
def REGION = 'eu-west-1'
def provider = new AwsEcrService()

when:
def creds = provider.getLoginToken(accessKey, secretKey, REGION, false).tokenize(":")
Expand All @@ -49,17 +52,15 @@ class AwsEcrServiceTest extends Specification {
thrown(Exception)
}

@Requires({System.getenv('AWS_ACCESS_KEY_ID') && System.getenv('AWS_SECRET_ACCESS_KEY')})
def 'should check registry info' () {
given:
def provider = new AwsEcrService()
expect:
provider.getEcrHostInfo(null) == null
provider.getEcrHostInfo('foo') == null
provider.getEcrHostInfo('195996028523.dkr.ecr.eu-west-1.amazonaws.com') == new AwsEcrService.AwsEcrHostInfo('195996028523', 'eu-west-1')
provider.getEcrHostInfo('195996028523.dkr.ecr.eu-west-1.amazonaws.com/foo') == new AwsEcrService.AwsEcrHostInfo('195996028523', 'eu-west-1')
and:
provider.getEcrHostInfo('public.ecr.aws') == new AwsEcrService.AwsEcrHostInfo(null, 'us-east-1')

}

def 'should check ecr registry' () {
Expand Down
Loading

0 comments on commit 86f7d3e

Please sign in to comment.