Skip to content

Commit

Permalink
Auth Manager API part 3: OAuth2 Manager
Browse files Browse the repository at this point in the history
  • Loading branch information
adutra committed Dec 23, 2024
1 parent cd187c5 commit b88d99d
Show file tree
Hide file tree
Showing 10 changed files with 1,214 additions and 11 deletions.
47 changes: 43 additions & 4 deletions core/src/main/java/org/apache/iceberg/rest/auth/AuthConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.iceberg.rest.ResourcePaths;
import org.apache.iceberg.util.PropertyUtil;
import org.immutables.value.Value;

/**
* The purpose of this class is to hold configuration options for {@link
* org.apache.iceberg.rest.auth.OAuth2Util.AuthSession}.
* The purpose of this class is to hold OAuth configuration options for {@link
* OAuth2Util.AuthSession}.
*/
@Value.Style(redactedMask = "****")
@SuppressWarnings("ImmutablesStyle")
@Value.Immutable
@SuppressWarnings({"ImmutablesStyle", "SafeLoggingPropagation"})
public interface AuthConfig {
@Nullable
@Value.Redacted
Expand All @@ -47,7 +48,7 @@ default String scope() {
return OAuth2Properties.CATALOG_SCOPE;
}

@Value.Lazy
@Value.Default
@Nullable
default Long expiresAtMillis() {
return OAuth2Util.expiresAtMillis(token());
Expand All @@ -69,4 +70,42 @@ default String oauth2ServerUri() {
static ImmutableAuthConfig.Builder builder() {
return ImmutableAuthConfig.builder();
}

static AuthConfig fromProperties(Map<String, String> properties) {
return builder()
.credential(properties.get(OAuth2Properties.CREDENTIAL))
.token(properties.get(OAuth2Properties.TOKEN))
.scope(properties.getOrDefault(OAuth2Properties.SCOPE, OAuth2Properties.CATALOG_SCOPE))
.oauth2ServerUri(
properties.getOrDefault(OAuth2Properties.OAUTH2_SERVER_URI, ResourcePaths.tokens()))
.optionalOAuthParams(OAuth2Util.buildOptionalParam(properties))
.keepRefreshed(
PropertyUtil.propertyAsBoolean(
properties,
OAuth2Properties.TOKEN_REFRESH_ENABLED,
OAuth2Properties.TOKEN_REFRESH_ENABLED_DEFAULT))
.expiresAtMillis(expiresAtMillis(properties))
.build();
}

private static Long expiresAtMillis(Map<String, String> props) {
Long expiresAtMillis = null;

if (props.containsKey(OAuth2Properties.TOKEN)) {
expiresAtMillis = OAuth2Util.expiresAtMillis(props.get(OAuth2Properties.TOKEN));
}

if (expiresAtMillis == null) {
if (props.containsKey(OAuth2Properties.TOKEN_EXPIRES_IN_MS)) {
long millis =
PropertyUtil.propertyAsLong(
props,
OAuth2Properties.TOKEN_EXPIRES_IN_MS,
OAuth2Properties.TOKEN_EXPIRES_IN_MS_DEFAULT);
expiresAtMillis = System.currentTimeMillis() + millis;
}
}

return expiresAtMillis;
}
}
22 changes: 20 additions & 2 deletions core/src/main/java/org/apache/iceberg/rest/auth/AuthManagers.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,23 @@ public class AuthManagers {
private AuthManagers() {}

public static AuthManager loadAuthManager(String name, Map<String, String> properties) {
String authType =
properties.getOrDefault(AuthProperties.AUTH_TYPE, AuthProperties.AUTH_TYPE_NONE);
String authType = properties.get(AuthProperties.AUTH_TYPE);
if (authType == null) {
boolean hasCredential = properties.containsKey(OAuth2Properties.CREDENTIAL);
boolean hasToken = properties.containsKey(OAuth2Properties.TOKEN);
if (hasCredential || hasToken) {
LOG.warn(
"Inferring {}={} since property {} was provided. "
+ "Please explicitly set {} to avoid this warning.",
AuthProperties.AUTH_TYPE,
AuthProperties.AUTH_TYPE_OAUTH2,
hasCredential ? OAuth2Properties.CREDENTIAL : OAuth2Properties.TOKEN,
AuthProperties.AUTH_TYPE);
authType = AuthProperties.AUTH_TYPE_OAUTH2;
} else {
authType = AuthProperties.AUTH_TYPE_NONE;
}
}

String impl;
switch (authType.toLowerCase(Locale.ROOT)) {
Expand All @@ -42,6 +57,9 @@ public static AuthManager loadAuthManager(String name, Map<String, String> prope
case AuthProperties.AUTH_TYPE_BASIC:
impl = AuthProperties.AUTH_MANAGER_IMPL_BASIC;
break;
case AuthProperties.AUTH_TYPE_OAUTH2:
impl = AuthProperties.AUTH_MANAGER_IMPL_OAUTH2;
break;
default:
impl = authType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@ private AuthProperties() {}

public static final String AUTH_TYPE_NONE = "none";
public static final String AUTH_TYPE_BASIC = "basic";
public static final String AUTH_TYPE_OAUTH2 = "oauth2";

public static final String AUTH_MANAGER_IMPL_NONE =
"org.apache.iceberg.rest.auth.NoopAuthManager";
public static final String AUTH_MANAGER_IMPL_BASIC =
"org.apache.iceberg.rest.auth.BasicAuthManager";
public static final String AUTH_MANAGER_IMPL_OAUTH2 =
"org.apache.iceberg.rest.auth.OAuth2Manager";

public static final String BASIC_USERNAME = "rest.auth.basic.username";
public static final String BASIC_PASSWORD = "rest.auth.basic.password";
Expand Down
127 changes: 127 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/auth/AuthSessionCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.rest.auth;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;
import java.util.function.LongSupplier;
import javax.annotation.Nullable;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;

/** A cache for {@link AuthSession} instances. */
public class AuthSessionCache implements AutoCloseable {

private final Duration sessionTimeout;
private final Executor executor;
private final LongSupplier nanoTimeSupplier;

private volatile Cache<String, AuthSession> sessionCache;

/**
* Creates a new cache with the given session timeout, and with default executor and nano time
* supplier for eviction tasks.
*
* @param sessionTimeout the session timeout. Sessions will become eligible for eviction after
* this duration of inactivity.
*/
public AuthSessionCache(Duration sessionTimeout) {
this(sessionTimeout, null, null);
}

/**
* Creates a new cache with the given session timeout, executor, and nano time supplier. This
* method is useful for testing mostly.
*
* @param sessionTimeout the session timeout. Sessions will become eligible for eviction after
* this duration of inactivity.
* @param executor the executor to use for eviction tasks; if null, the cache will use the
* {@linkplain ForkJoinPool#commonPool() common pool}. The executor will not be closed when
* this cache is closed.
* @param nanoTimeSupplier the supplier for nano time; if null, the cache will use {@link
* System#nanoTime()}.
*/
public AuthSessionCache(
Duration sessionTimeout,
@Nullable Executor executor,
@Nullable LongSupplier nanoTimeSupplier) {
this.sessionTimeout = sessionTimeout;
this.executor = executor;
this.nanoTimeSupplier = nanoTimeSupplier;
}

/**
* Returns a cached session for the given key, loading it with the given loader if it is not
* already cached.
*
* @param key the key to use for the session.
* @param loader the loader to use to load the session if it is not already cached.
* @param <T> the type of the session.
* @return the cached session.
*/
@SuppressWarnings("unchecked")
public <T extends AuthSession> T cachedSession(String key, Function<String, T> loader) {
return (T) sessionCache().get(key, loader);
}

@Override
public void close() {
Cache<String, AuthSession> cache = sessionCache;
this.sessionCache = null;
if (cache != null) {
cache.invalidateAll();
cache.cleanUp();
}
}

@VisibleForTesting
Cache<String, AuthSession> sessionCache() {
if (sessionCache == null) {
synchronized (this) {
if (sessionCache == null) {
this.sessionCache = newSessionCache(sessionTimeout, executor, nanoTimeSupplier);
}
}
}
return sessionCache;
}

private static Cache<String, AuthSession> newSessionCache(
Duration sessionTimeout, Executor executor, LongSupplier nanoTimeSupplier) {
Caffeine<String, AuthSession> builder =
Caffeine.newBuilder()
.expireAfterAccess(sessionTimeout)
.removalListener(
(id, auth, cause) -> {
if (auth != null) {
auth.close();
}
});
if (executor != null) {
builder.executor(executor);
}
if (nanoTimeSupplier != null) {
builder.ticker(nanoTimeSupplier::getAsLong);
}
return builder.build();
}
}
Loading

0 comments on commit b88d99d

Please sign in to comment.