From 745f3913a7fc439bcf0605916831c2563b7c35ad Mon Sep 17 00:00:00 2001 From: "Ross A. Baker" Date: Fri, 2 Jul 2021 18:02:30 -0400 Subject: [PATCH 1/2] Higher level VaultClient --- .../main/scala/com/banno/vault/Vault.scala | 24 ++++++----- .../scala/com/banno/vault/VaultClient.scala | 40 +++++++++++++++++++ 2 files changed, 54 insertions(+), 10 deletions(-) create mode 100644 core/src/main/scala/com/banno/vault/VaultClient.scala diff --git a/core/src/main/scala/com/banno/vault/Vault.scala b/core/src/main/scala/com/banno/vault/Vault.scala index 148528ed..ae91ee6d 100644 --- a/core/src/main/scala/com/banno/vault/Vault.scala +++ b/core/src/main/scala/com/banno/vault/Vault.scala @@ -219,16 +219,9 @@ object Vault { (roleId: String, secretPath: String, duration: FiniteDuration, waitInterval: FiniteDuration): Stream[F, A] = Stream.eval(login(client, vaultUri)(roleId)).flatMap(token => keepLoginAndSecretLeased[F, A](client, vaultUri)(token, secretPath, duration, waitInterval)) - /** - * This function logs into the Vault server given by the vaultUri, to obtain a loginToken. - * It then also provides a Stream that continuously renews the token when it is about to finish. - * - keeps the token constantly renewed - * - Upon termination of the Stream (from the using application) revokes the token. - * However, any error on revoking the token is ignored. - */ - def keepLoginRenewed[F[_]](client: Client[F], vaultUri: Uri) + def tokenStream[F[_]](client: Client[F], vaultUri: Uri) (token: VaultToken, tokenLeaseExtension: FiniteDuration) - (implicit T: Temporal[F]): Stream[F, String] = { + (implicit T: Temporal[F]): Stream[F, VaultToken] = { def renewOnDuration(token: VaultToken): F[VaultToken] = { val waitInterval: Long = @@ -248,10 +241,21 @@ object Vault { def cleanup(token: VaultToken): F[Unit] = revokeSelfToken(client, vaultUri)(token).handleError(_ => ()) Stream.bracket(token.pure[F])(cleanup).flatMap { token => - Stream.emit(token.clientToken).concurrently(keep(token)) + Stream.emit(token).concurrently(keep(token)) } } + /** + * This function logs into the Vault server given by the vaultUri, to obtain a loginToken. + * It then also provides a Stream that continuously renews the token when it is about to finish. + * - keeps the token constantly renewed + * - Upon termination of the Stream (from the using application) revokes the token. + * However, any error on revoking the token is ignored. + */ + def keepLoginRenewed[F[_]](client: Client[F], vaultUri: Uri) + (token: VaultToken, tokenLeaseExtension: FiniteDuration)(implicit T: Temporal[F]): Stream[F, String] = + tokenStream(client, vaultUri)(token, tokenLeaseExtension).map(_.clientToken) + def loginAndKeep[F[_]: Async](client: Client[F], vaultUri: Uri) (roleId: String, tokenLeaseExtension: FiniteDuration): Stream[F, String] = Stream.eval(login(client, vaultUri)(roleId)).flatMap(token => keepLoginRenewed[F](client, vaultUri)(token, tokenLeaseExtension)) diff --git a/core/src/main/scala/com/banno/vault/VaultClient.scala b/core/src/main/scala/com/banno/vault/VaultClient.scala new file mode 100644 index 00000000..592537e6 --- /dev/null +++ b/core/src/main/scala/com/banno/vault/VaultClient.scala @@ -0,0 +1,40 @@ +package com.banno.vault + +import cats.effect.kernel.{Concurrent, Ref, Resource, Temporal} +import cats.effect.syntax.all._ +import com.banno.vault.models._ +import io.circe.Decoder +import org.http4s.Uri +import org.http4s.client.Client +import scala.concurrent.duration.FiniteDuration + +trait VaultClient[F[_]] { + def readSecret[A: Decoder](secretPath: String): F[VaultSecret[A]] +} + +object VaultClient { + def apply[F[_]](implicit ev: VaultClient[F]): VaultClient[F] = ev + + def login[F[_]](client: Client[F], vaultUri: Uri, roleId: String, tokenLeaseExtension: FiniteDuration) + (implicit F: Temporal[F]): Resource[F, VaultClient[F]] = { + + def startRenewalStream(ref: Ref[F, VaultToken], token: VaultToken) = + Vault.tokenStream(client, vaultUri)(token, tokenLeaseExtension) + .evalMap(newToken => ref.set(newToken)) + .compile + .drain + .start + + for { + token <- Resource.make(Vault.login[F](client, vaultUri)(roleId))(Vault.revokeSelfToken(client, vaultUri)) + tokenRef <- Resource.eval(Ref.of(token)) + _ <- Resource.make(startRenewalStream(tokenRef, token))(_.cancel) + } yield impl(client, vaultUri, token) + } + + private def impl[F[_]: Concurrent](client: Client[F], vaultUri: Uri, token: VaultToken): VaultClient[F] = + new VaultClient[F] { + def readSecret[A: Decoder](secretPath: String): F[VaultSecret[A]] = + Vault.readSecret(client, vaultUri)(token.clientToken, secretPath) + } +} From fe6ffcc6f4c3b28900dc5128edfa5d1e738b68e0 Mon Sep 17 00:00:00 2001 From: "Ross A. Baker" Date: Fri, 2 Jul 2021 18:08:11 -0400 Subject: [PATCH 2/2] Except, like, use the refreshed token --- core/src/main/scala/com/banno/vault/VaultClient.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/com/banno/vault/VaultClient.scala b/core/src/main/scala/com/banno/vault/VaultClient.scala index 592537e6..ddb179f8 100644 --- a/core/src/main/scala/com/banno/vault/VaultClient.scala +++ b/core/src/main/scala/com/banno/vault/VaultClient.scala @@ -32,9 +32,10 @@ object VaultClient { } yield impl(client, vaultUri, token) } - private def impl[F[_]: Concurrent](client: Client[F], vaultUri: Uri, token: VaultToken): VaultClient[F] = + private def impl[F[_]: Concurrent](client: Client[F], vaultUri: Uri, tokenRef: Ref[F, VaultToken]): VaultClient[F] = new VaultClient[F] { def readSecret[A: Decoder](secretPath: String): F[VaultSecret[A]] = - Vault.readSecret(client, vaultUri)(token.clientToken, secretPath) + tokenRef.get.flatMap(token => + Vault.readSecret(client, vaultUri)(token.clientToken, secretPath)) } }