Skip to content

Commit

Permalink
Add blocking executor to async caches
Browse files Browse the repository at this point in the history
Signed-off-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
pditommaso committed Nov 26, 2024
1 parent 6322000 commit 7475f66
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 25 deletions.
26 changes: 19 additions & 7 deletions src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.time.Duration
import java.util.concurrent.CompletionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit

import com.github.benmanes.caffeine.cache.AsyncLoadingCache
Expand All @@ -33,6 +34,7 @@ import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.transform.ToString
import groovy.util.logging.Slf4j
import io.micronaut.scheduling.TaskExecutors
import io.seqera.wave.configuration.HttpClientConfig
import io.seqera.wave.exception.RegistryForwardException
import io.seqera.wave.exception.RegistryUnauthorizedAccessException
Expand All @@ -41,6 +43,7 @@ import io.seqera.wave.util.RegHelper
import io.seqera.wave.util.Retryable
import io.seqera.wave.util.StringUtils
import jakarta.inject.Inject
import jakarta.inject.Named
import jakarta.inject.Singleton
import static io.seqera.wave.WaveDefault.DOCKER_IO
import static io.seqera.wave.auth.RegistryUtils.isServerError
Expand All @@ -64,6 +67,10 @@ class RegistryAuthServiceImpl implements RegistryAuthService {
@Inject
private RegistryTokenStore tokenStore

@Inject
@Named(TaskExecutors.BLOCKING)
private volatile ExecutorService ioExecutor

@Canonical
@ToString(includePackage = false, includeNames = true)
static private class CacheKey {
Expand Down Expand Up @@ -101,16 +108,21 @@ class RegistryAuthServiceImpl implements RegistryAuthService {
}

// FIXME https://github.com/seqeralabs/wave/issues/747
private AsyncLoadingCache<CacheKey, String> cacheTokens = Caffeine
.newBuilder()
.maximumSize(10_000)
.expireAfterAccess(_1_HOUR.toMillis(), TimeUnit.MILLISECONDS)
.buildAsync(loader)

private AsyncLoadingCache<CacheKey, String> cacheTokens
@Inject
private RegistryLookupService lookupService

@Inject RegistryCredentialsFactory credentialsFactory
@Inject
private RegistryCredentialsFactory credentialsFactory

private void init() {
cacheTokens = Caffeine
.newBuilder()
.maximumSize(10_000)
.expireAfterAccess(_1_HOUR.toMillis(), TimeUnit.MILLISECONDS)
.executor(ioExecutor)
.buildAsync(loader)
}

/**
* Implements container registry login
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,22 @@ package io.seqera.wave.auth
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.util.concurrent.CompletionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit

import com.github.benmanes.caffeine.cache.AsyncLoadingCache
import com.github.benmanes.caffeine.cache.CacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.micronaut.scheduling.TaskExecutors
import io.seqera.wave.configuration.HttpClientConfig
import io.seqera.wave.exception.RegistryForwardException
import io.seqera.wave.http.HttpClientFactory
import io.seqera.wave.util.Retryable
import jakarta.annotation.PostConstruct
import jakarta.inject.Inject
import jakarta.inject.Named
import jakarta.inject.Singleton
import static io.seqera.wave.WaveDefault.DOCKER_IO
import static io.seqera.wave.WaveDefault.DOCKER_REGISTRY_1
Expand All @@ -55,6 +59,10 @@ class RegistryLookupServiceImpl implements RegistryLookupService {
@Inject
private RegistryAuthStore store

@Inject
@Named(TaskExecutors.BLOCKING)
private volatile ExecutorService ioExecutor

private CacheLoader<URI, RegistryAuth> loader = new CacheLoader<URI, RegistryAuth>() {
@Override
RegistryAuth load(URI endpoint) throws Exception {
Expand All @@ -74,11 +82,17 @@ class RegistryLookupServiceImpl implements RegistryLookupService {
}

// FIXME https://github.com/seqeralabs/wave/issues/747
private AsyncLoadingCache<URI, RegistryAuth> cache = Caffeine
private AsyncLoadingCache<URI, RegistryAuth> cache

@PostConstruct
void init() {
cache = Caffeine
.newBuilder()
.maximumSize(10_000)
.expireAfterAccess(1, TimeUnit.HOURS)
.executor(ioExecutor)
.buildAsync(loader)
}

protected RegistryAuth lookup0(URI endpoint) {
final httpClient = HttpClientFactory.followRedirectsHttpClient()
Expand Down
26 changes: 20 additions & 6 deletions src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package io.seqera.wave.service.aws

import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
import java.util.regex.Pattern

Expand All @@ -27,7 +28,11 @@ import com.github.benmanes.caffeine.cache.Caffeine
import groovy.transform.Canonical
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.micronaut.scheduling.TaskExecutors
import io.seqera.wave.util.StringUtils
import jakarta.annotation.PostConstruct
import jakarta.inject.Inject
import jakarta.inject.Named
import jakarta.inject.Singleton
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
Expand Down Expand Up @@ -73,13 +78,22 @@ class AwsEcrService {
}
}

// FIXME https://github.com/seqeralabs/wave/issues/747
private AsyncLoadingCache<AwsCreds, String> cache = Caffeine
.newBuilder()
.maximumSize(10_000)
.expireAfterWrite(3, TimeUnit.HOURS)
.buildAsync(loader)
@Inject
@Named(TaskExecutors.BLOCKING)
private volatile ExecutorService ioExecutor

// FIXME https://github.com/seqeralabs/wave/issues/747
private AsyncLoadingCache<AwsCreds, String> cache

@PostConstruct
private void init() {
cache = Caffeine
.newBuilder()
.maximumSize(10_000)
.expireAfterWrite(3, TimeUnit.HOURS)
.executor(ioExecutor)
.buildAsync(loader)
}

private EcrClient ecrClient(String accessKey, String secretKey, String region) {
EcrClient.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package io.seqera.wave.service.data.queue

import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

Expand Down Expand Up @@ -61,21 +62,27 @@ abstract class AbstractMessageQueue<M> implements Runnable {
final private String name0

// FIXME https://github.com/seqeralabs/wave/issues/747
final private AsyncCache<String,Boolean> closedClients = Caffeine
.newBuilder()
.expireAfterWrite(10, TimeUnit.MINUTES)
.buildAsync()
final private AsyncCache<String,Boolean> closedClients

AbstractMessageQueue(MessageQueue<String> broker) {
AbstractMessageQueue(MessageQueue<String> broker, ExecutorService ioExecutor) {
final type = TypeHelper.getGenericType(this, 0)
this.encoder = new MoshiEncodeStrategy<M>(type) {}
this.broker = broker
this.closedClients = createCache(ioExecutor)
this.name0 = name() + '-thread-' + count.getAndIncrement()
this.thread = new Thread(this, name0)
this.thread.setDaemon(true)
this.thread.start()
}

private AsyncCache<String,Boolean> createCache(ExecutorService ioExecutor) {
Caffeine
.newBuilder()
.executor(ioExecutor)
.expireAfterWrite(10, TimeUnit.MINUTES)
.buildAsync()
}

protected abstract String name()

protected abstract Duration pollInterval()
Expand Down
9 changes: 9 additions & 0 deletions src/main/groovy/io/seqera/wave/service/job/JobManager.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@ package io.seqera.wave.service.job

import java.time.Duration
import java.time.Instant
import java.util.concurrent.ExecutorService

import com.github.benmanes.caffeine.cache.AsyncCache
import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.micronaut.context.annotation.Context
import io.micronaut.scheduling.TaskExecutors
import jakarta.annotation.PostConstruct
import jakarta.inject.Inject
import jakarta.inject.Named

/**
* Implement the logic to handle Blob cache transfer (uploads)
*
Expand All @@ -51,6 +55,10 @@ class JobManager {
@Inject
private JobConfig config

@Inject
@Named(TaskExecutors.BLOCKING)
private volatile ExecutorService ioExecutor

// FIXME https://github.com/seqeralabs/wave/issues/747
private AsyncCache<String,Instant> debounceCache

Expand All @@ -60,6 +68,7 @@ class JobManager {
debounceCache = Caffeine
.newBuilder()
.expireAfterWrite(config.graceInterval.multipliedBy(2))
.executor(ioExecutor)
.buildAsync()
queue.addConsumer((job)-> processJob(job))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@
package io.seqera.wave.service.pairing.socket

import java.time.Duration
import java.util.concurrent.ExecutorService
import javax.annotation.PreDestroy

import groovy.transform.CompileStatic
import io.micronaut.context.annotation.Value
import io.micronaut.scheduling.TaskExecutors
import io.seqera.wave.service.data.queue.AbstractMessageQueue
import io.seqera.wave.service.data.queue.MessageQueue
import io.seqera.wave.service.pairing.socket.msg.PairingMessage
import jakarta.inject.Named
import jakarta.inject.Singleton
/**
* Implement a distributed queue for Wave pairing messages
Expand All @@ -40,9 +43,10 @@ class PairingOutboundQueue extends AbstractMessageQueue<PairingMessage> {

PairingOutboundQueue(
MessageQueue<String> broker,
@Value('${wave.pairing.channel.awaitTimeout:100ms}') Duration pollInterval
@Value('${wave.pairing.channel.awaitTimeout:100ms}') Duration pollInterval,
@Named(TaskExecutors.BLOCKING) ExecutorService ioExecutor
) {
super(broker)
super(broker, ioExecutor)
this.pollInterval = pollInterval
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import io.seqera.wave.tower.client.TowerClient
import io.seqera.wave.util.ExponentialAttempt
import io.seqera.wave.util.JacksonHelper
import io.seqera.wave.util.RegHelper
import jakarta.annotation.PostConstruct
import jakarta.inject.Inject
import jakarta.inject.Named
import static io.seqera.wave.util.LongRndKey.rndHex
Expand Down Expand Up @@ -92,10 +93,16 @@ abstract class TowerConnector {
}
}

private AsyncLoadingCache<JwtRefreshParams, CompletableFuture<JwtAuth>> refreshCache = Caffeine
.newBuilder()
.expireAfterWrite(1, TimeUnit.MINUTES)
.buildAsync(loader)
private AsyncLoadingCache<JwtRefreshParams, CompletableFuture<JwtAuth>> refreshCache

@PostConstruct
void init() {
refreshCache = Caffeine
.newBuilder()
.expireAfterWrite(1, TimeUnit.MINUTES)
.executor(getIoExecutor())
.buildAsync(loader)
}

/** Only for testing - do not use */
Cache<JwtRefreshParams, CompletableFuture<JwtAuth>> refreshCache0() {
Expand Down

0 comments on commit 7475f66

Please sign in to comment.