From 77d7b489612e62f4a23b5e3f87c4ff3924cd5c2c Mon Sep 17 00:00:00 2001 From: Kevin Pullin Date: Fri, 3 Dec 2021 15:08:43 -0800 Subject: [PATCH] Recompute TTL values on each `get` TTL values must be recomputed on each `get` action instead of being fixed to a constant value for the lifetime of the cached object. Prior to this change, the constant TTL value `X` would cause kafka-connect to continually reschedule connector restarts `X` milliseconds in the future, effectively ensuring that the connector never actually restarts. With this change, the reschedule actions have a timestamp that shrinks until the TTL is reached. Before (note contant restart time of ~30 mins): ``` 2021-12-03 20:19:65,634 INFO || Scheduling a restart of connector debezium_infra in 1799900 ms [org.apache.kafka.connect.runtime.WorkerConfigTransformer] 2021-12-03 20:18:25,143 INFO || Scheduling a restart of connector debezium_infra in 1799900 ms [org.apache.kafka.connect.runtime.WorkerConfigTransformer] ``` After (note restart time decreases ~40 seconds and the log messages are ~40 seconds apart): ``` 2021-12-03 21:09:24,228 INFO || Scheduling a restart of connector debezium_infra in 2063263 ms [org.apache.kafka.connect.runtime.WorkerConfigTransformer] 2021-12-03 21:08:45,858 INFO || Scheduling a restart of connector debezium_infra in 2101945 ms [org.apache.kafka.connect.runtime.WorkerConfigTransformer] ``` Fix build issues with Azure tests Set the TTL to `None` if the duration is `0` --- .../io/lenses/connect/secrets/package.scala | 11 ++---- .../providers/AzureSecretProvider.scala | 37 ++++++++----------- .../providers/AzureSecretProviderTest.scala | 7 ++-- 3 files changed, 21 insertions(+), 34 deletions(-) diff --git a/secret-provider/src/main/scala/io/lenses/connect/secrets/package.scala b/secret-provider/src/main/scala/io/lenses/connect/secrets/package.scala index 78583d5..1ff40c5 100644 --- a/secret-provider/src/main/scala/io/lenses/connect/secrets/package.scala +++ b/secret-provider/src/main/scala/io/lenses/connect/secrets/package.scala @@ -7,7 +7,6 @@ package io.lenses.connect.secrets import com.typesafe.scalalogging.StrictLogging -import org.apache.kafka.common.config.ConfigData import org.apache.kafka.connect.errors.ConnectException import java.io.File @@ -15,7 +14,6 @@ import java.io.FileOutputStream import java.time.OffsetDateTime import java.util.Base64 import scala.collection.mutable -import scala.jdk.CollectionConverters._ import scala.util.Failure import scala.util.Success import scala.util.Try @@ -150,7 +148,7 @@ package object connect extends StrictLogging { //calculate the min expiry for secrets and return the configData and expiry def getSecretsAndExpiry( secrets: Map[String, (String, Option[OffsetDateTime])], - ): (Option[OffsetDateTime], ConfigData) = { + ): (Option[OffsetDateTime], Map[String, String]) = { val expiryList = mutable.ListBuffer.empty[OffsetDateTime] val data = secrets @@ -159,15 +157,12 @@ package object connect extends StrictLogging { expiry.foreach(e => expiryList.append(e)) (key, value) }) - .asJava if (expiryList.isEmpty) { - (None, new ConfigData(data)) + (None, data) } else { val minExpiry = expiryList.min - val ttl = minExpiry.toInstant.toEpochMilli - OffsetDateTime.now.toInstant - .toEpochMilli - (Some(minExpiry), new ConfigData(data, ttl)) + (Some(minExpiry), data) } } diff --git a/secret-provider/src/main/scala/io/lenses/connect/secrets/providers/AzureSecretProvider.scala b/secret-provider/src/main/scala/io/lenses/connect/secrets/providers/AzureSecretProvider.scala index c9a071a..8a45be5 100644 --- a/secret-provider/src/main/scala/io/lenses/connect/secrets/providers/AzureSecretProvider.scala +++ b/secret-provider/src/main/scala/io/lenses/connect/secrets/providers/AzureSecretProvider.scala @@ -6,6 +6,8 @@ package io.lenses.connect.secrets.providers +import java.time.Duration + import com.azure.core.credential.TokenCredential import com.azure.security.keyvault.secrets.SecretClient import com.azure.security.keyvault.secrets.SecretClientBuilder @@ -25,7 +27,7 @@ class AzureSecretProvider() extends ConfigProvider with AzureHelper { private var rootDir: String = _ private var credentials: Option[TokenCredential] = None val clientMap: mutable.Map[String, SecretClient] = mutable.Map.empty - val cache = mutable.Map.empty[String, (Option[OffsetDateTime], ConfigData)] + val cache = mutable.Map.empty[String, (Option[OffsetDateTime], Map[String, String])] // configure the vault client override def configure(configs: util.Map[String, _]): Unit = { @@ -62,32 +64,19 @@ class AzureSecretProvider() extends ConfigProvider with AzureHelper { clientMap += (keyVaultUrl -> client) + val now = OffsetDateTime.now() + val (expiry, data) = cache.get(keyVaultUrl) match { case Some((expiresAt, data)) => // we have all the keys and are before the expiry - val now = OffsetDateTime.now() if ( - keys.asScala.subsetOf(data.data().asScala.keySet) && (expiresAt + keys.asScala.subsetOf(data.keySet) && expiresAt .getOrElse(now.plusSeconds(1)) - .isAfter(now)) + .isAfter(now) ) { logger.info("Fetching secrets from cache") - ( - expiresAt, - new ConfigData( - data - .data() - .asScala - .view - .filter { - case (k, _) => keys.contains(k) - } - .toMap - .asJava, - data.ttl(), - ), - ) + (expiresAt, data.view.filterKeys(k => keys.contains(k)).toMap) } else { // missing some or expired so reload getSecretsAndExpiry(getSecrets(client, keys.asScala.toSet)) @@ -97,9 +86,13 @@ class AzureSecretProvider() extends ConfigProvider with AzureHelper { getSecretsAndExpiry(getSecrets(client, keys.asScala.toSet)) } - expiry.foreach(exp => logger.info(s"Min expiry for TTL set to [${exp.toString}]")) - cache += (keyVaultUrl -> (expiry, data)) - data + var ttl = 0L + expiry.foreach { exp => + ttl = Duration.between(now, exp).toMillis + logger.info(s"Min expiry for TTL set to [${exp.toString}]") + } + cache.put(keyVaultUrl, (expiry, data)) + new ConfigData(data.asJava, ttl) } override def close(): Unit = {} diff --git a/secret-provider/src/test/scala/io/lenses/connect/secrets/providers/AzureSecretProviderTest.scala b/secret-provider/src/test/scala/io/lenses/connect/secrets/providers/AzureSecretProviderTest.scala index 490ab20..c21050a 100644 --- a/secret-provider/src/test/scala/io/lenses/connect/secrets/providers/AzureSecretProviderTest.scala +++ b/secret-provider/src/test/scala/io/lenses/connect/secrets/providers/AzureSecretProviderTest.scala @@ -15,7 +15,6 @@ import io.lenses.connect.secrets.config.AzureProviderSettings import io.lenses.connect.secrets.connect import io.lenses.connect.secrets.connect.AuthMode import org.apache.kafka.common.config.provider.ConfigProvider -import org.apache.kafka.common.config.ConfigData import org.apache.kafka.common.config.ConfigTransformer import org.apache.kafka.connect.errors.ConnectException import org.mockito.Mockito.when @@ -236,7 +235,7 @@ class AzureSecretProviderTest extends AnyWordSpec with Matchers with MockitoSuga // poke in the mocked client provider.clientMap += (s"https://$secretPath" -> client) val now = OffsetDateTime.now().plusMinutes(10) - val cachedData = new ConfigData(Map(secretKey -> secretPath).asJava) + val cachedData = Map(secretKey -> secretPath) val cached = (Some(now), cachedData) // add to cache @@ -283,7 +282,7 @@ class AzureSecretProviderTest extends AnyWordSpec with Matchers with MockitoSuga provider.clientMap += (vaultUrl -> client) //put expiry of cache 1 second behind val now = OffsetDateTime.now().minusSeconds(1) - val cachedData = new ConfigData(Map(secretKey -> secretPath).asJava) + val cachedData = Map(secretKey -> secretPath) val cached = (Some(now), cachedData) // add to cache @@ -332,7 +331,7 @@ class AzureSecretProviderTest extends AnyWordSpec with Matchers with MockitoSuga provider.clientMap += (vaultUrl -> client) //put expiry of cache 1 second behind val now = OffsetDateTime.now() - val cachedData = new ConfigData(Map("old-key" -> secretPath).asJava) + val cachedData = Map("old-key" -> secretPath) val cached = (Some(now), cachedData) // add to cache