Skip to content

Commit

Permalink
BigTable: simplify configuration of clients (#7639)
Browse files Browse the repository at this point in the history
  • Loading branch information
adutra authored Oct 18, 2023
1 parent 424e637 commit 68a7d63
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 189 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,58 +18,27 @@
import io.quarkus.runtime.annotations.StaticInitSafe;
import io.smallrye.config.ConfigMapping;
import io.smallrye.config.WithDefault;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import org.projectnessie.versioned.storage.bigtable.BigTableClientsConfig;

@StaticInitSafe
@ConfigMapping(prefix = "nessie.version.store.persist.bigtable")
public interface QuarkusBigTableConfig {
public interface QuarkusBigTableConfig extends BigTableClientsConfig {

@WithDefault("nessie")
@Override
String instanceId();

Optional<String> appProfileId();

Optional<String> quotaProjectId();

Optional<String> endpoint();

Optional<String> mtlsEndpoint();
@WithDefault("8086")
@Override
int emulatorPort();

Optional<String> emulatorHost();
@WithDefault("true")
@Override
boolean enableTelemetry();

Optional<String> tablePrefix();

Map<String, String> jwtAudienceMapping();

Optional<Duration> maxRetryDelay();

OptionalInt maxAttempts();

Optional<Duration> initialRpcTimeout();

Optional<Duration> totalTimeout();

Optional<Duration> initialRetryDelay();

OptionalInt minChannelCount();

OptionalInt maxChannelCount();

OptionalInt initialChannelCount();

OptionalInt minRpcsPerChannel();

OptionalInt maxRpcsPerChannel();

@WithDefault("false")
boolean noTableAdminClient();

@WithDefault("8086")
int emulatorPort();

@WithDefault("true")
boolean enableTelemetry();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,21 @@
package org.projectnessie.quarkus.providers.storage;

import static org.projectnessie.quarkus.config.VersionStoreConfig.VersionStoreType.BIGTABLE;
import static org.projectnessie.versioned.storage.bigtable.BigTableBackendFactory.configureDataClient;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.ChannelPoolSettings;
import com.google.api.gax.rpc.PermissionDeniedException;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import io.quarkiverse.googlecloudservices.common.GcpBootstrapConfiguration;
import io.quarkiverse.googlecloudservices.common.GcpConfigHolder;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import java.util.Optional;
import org.projectnessie.quarkus.config.QuarkusBigTableConfig;
import org.projectnessie.quarkus.providers.versionstore.StoreType;
import org.projectnessie.versioned.storage.bigtable.BigTableBackendConfig;
import org.projectnessie.versioned.storage.bigtable.BigTableBackendFactory;
import org.projectnessie.versioned.storage.bigtable.BigTableClientsFactory;
import org.projectnessie.versioned.storage.common.persist.Backend;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -98,80 +93,17 @@ public Backend buildBackend() {
}

try {
BigtableDataSettings.Builder dataSettings =
(bigTableConfig.emulatorHost().isPresent()
? BigtableDataSettings.newBuilderForEmulator(
bigTableConfig.emulatorHost().get(), bigTableConfig.emulatorPort())
.setCredentialsProvider(NoCredentialsProvider.create())
: BigtableDataSettings.newBuilder().setCredentialsProvider(credentialsProvider));
dataSettings.setProjectId(projectId).setInstanceId(bigTableConfig.instanceId());
bigTableConfig.appProfileId().ifPresent(dataSettings::setAppProfileId);
bigTableConfig.mtlsEndpoint().ifPresent(dataSettings.stubSettings()::setMtlsEndpoint);
bigTableConfig.quotaProjectId().ifPresent(dataSettings.stubSettings()::setQuotaProjectId);
bigTableConfig.endpoint().ifPresent(dataSettings.stubSettings()::setEndpoint);
if (!bigTableConfig.jwtAudienceMapping().isEmpty()) {
dataSettings.stubSettings().setJwtAudienceMapping(bigTableConfig.jwtAudienceMapping());
}

ChannelPoolSettings defaultPoolSettings = ChannelPoolSettings.builder().build();

ChannelPoolSettings poolSettings =
ChannelPoolSettings.builder()
.setMinChannelCount(
bigTableConfig.minChannelCount().orElse(defaultPoolSettings.getMinChannelCount()))
.setMaxChannelCount(
bigTableConfig.maxChannelCount().orElse(defaultPoolSettings.getMaxChannelCount()))
.setInitialChannelCount(
bigTableConfig
.initialChannelCount()
.orElse(defaultPoolSettings.getInitialChannelCount()))
.setMinRpcsPerChannel(
bigTableConfig
.minRpcsPerChannel()
.orElse(defaultPoolSettings.getMinRpcsPerChannel()))
.setMaxRpcsPerChannel(
bigTableConfig
.maxRpcsPerChannel()
.orElse(defaultPoolSettings.getMaxRpcsPerChannel()))
.setPreemptiveRefreshEnabled(true)
.build();

configureDataClient(
dataSettings,
Optional.of(poolSettings),
bigTableConfig.totalTimeout(),
bigTableConfig.maxAttempts(),
bigTableConfig.maxRetryDelay(),
bigTableConfig.initialRpcTimeout(),
bigTableConfig.initialRetryDelay());

if (bigTableConfig.enableTelemetry()) {
BigtableDataSettings.enableOpenCensusStats();
BigtableDataSettings.enableGfeOpenCensusStats();
}

LOGGER.info("Creating Google BigTable data client...");
BigtableDataClient dataClient = BigtableDataClient.create(dataSettings.build());
BigtableDataClient dataClient =
BigTableClientsFactory.createDataClient(projectId, bigTableConfig, credentialsProvider);

BigtableTableAdminClient tableAdminClient = null;
if (bigTableConfig.noTableAdminClient()) {
LOGGER.info("Google BigTable table admin client creation disabled.");
} else {

BigtableTableAdminSettings.Builder adminSettings =
bigTableConfig.emulatorHost().isPresent()
? BigtableTableAdminSettings.newBuilderForEmulator(
bigTableConfig.emulatorHost().get(), bigTableConfig.emulatorPort())
.setCredentialsProvider(NoCredentialsProvider.create())
: BigtableTableAdminSettings.newBuilder()
.setCredentialsProvider(credentialsProvider);
adminSettings.setProjectId(projectId).setInstanceId(bigTableConfig.instanceId());
bigTableConfig.mtlsEndpoint().ifPresent(adminSettings.stubSettings()::setMtlsEndpoint);
bigTableConfig.quotaProjectId().ifPresent(adminSettings.stubSettings()::setQuotaProjectId);
bigTableConfig.endpoint().ifPresent(adminSettings.stubSettings()::setEndpoint);

LOGGER.info("Creating Google BigTable table admin client...");
tableAdminClient = BigtableTableAdminClient.create(adminSettings.build());
tableAdminClient =
BigTableClientsFactory.createTableAdminClient(
projectId, bigTableConfig, credentialsProvider);

// Check whether the admin client actually works (Google cloud API access could be
// disabled). If not, we cannot even check whether tables need to be created, if necessary.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package org.projectnessie.versioned.storage.bigtable;

import static org.projectnessie.versioned.storage.bigtable.BigTableBackendFactory.configureDataClient;
import static org.projectnessie.versioned.storage.bigtable.BigTableClientsFactory.applyCommonDataClientSettings;

import com.google.api.gax.core.NoCredentialsProvider;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
Expand All @@ -24,8 +24,6 @@
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Optional;
import java.util.OptionalInt;
import org.projectnessie.versioned.storage.common.persist.Backend;
import org.projectnessie.versioned.storage.testextension.BackendTestFactory;

Expand Down Expand Up @@ -61,14 +59,7 @@ BigtableDataClient buildNewDataClient() {
.setInstanceId(instanceId)
.setCredentialsProvider(NoCredentialsProvider.create());

configureDataClient(
settings,
Optional.empty(),
Optional.empty(),
OptionalInt.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
applyCommonDataClientSettings(settings);

return BigtableDataClient.create(settings.build());
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,6 @@
*/
package org.projectnessie.versioned.storage.bigtable;

import com.google.api.gax.grpc.ChannelPoolSettings;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import org.projectnessie.versioned.storage.common.persist.Backend;
import org.projectnessie.versioned.storage.common.persist.BackendFactory;
Expand All @@ -44,6 +34,8 @@ public String name() {
@Nonnull
@jakarta.annotation.Nonnull
public BigTableBackendConfig newConfigInstance() {
// Note: this method should not be called and will throw because dataClient is not set.
// BigTableBackendConfig instances cannot be constructed using this method.
return BigTableBackendConfig.builder().build();
}

Expand All @@ -53,58 +45,4 @@ public BigTableBackendConfig newConfigInstance() {
public Backend buildBackend(@Nonnull @jakarta.annotation.Nonnull BigTableBackendConfig config) {
return new BigTableBackend(config, false);
}

public static void configureDataClient(
BigtableDataSettings.Builder settings,
Optional<ChannelPoolSettings> channelPoolSettings,
Optional<Duration> totalRpcTimeout,
OptionalInt maxAttempts,
Optional<Duration> maxRetryDelay,
Optional<Duration> initialRpcTimeout,
Optional<Duration> initialRetryDelay) {
EnhancedBigtableStubSettings.Builder stubSettings = settings.stubSettings();
for (RetrySettings.Builder retrySettings :
List.of(
stubSettings.readRowSettings().retrySettings(),
stubSettings.readRowsSettings().retrySettings(),
stubSettings.bulkReadRowsSettings().retrySettings(),
stubSettings.mutateRowSettings().retrySettings(),
stubSettings.bulkMutateRowsSettings().retrySettings(),
stubSettings.readChangeStreamSettings().retrySettings())) {
configureDuration(totalRpcTimeout, retrySettings::setTotalTimeout);
configureDuration(initialRpcTimeout, retrySettings::setInitialRpcTimeout);
configureDuration(initialRetryDelay, retrySettings::setInitialRetryDelay);
configureDuration(maxRetryDelay, retrySettings::setMaxRetryDelay);
maxAttempts.ifPresent(retrySettings::setMaxAttempts);
}

channelPoolSettings.ifPresent(
poolSettings -> {
InstantiatingGrpcChannelProvider transportChannelProvider =
(InstantiatingGrpcChannelProvider) stubSettings.getTransportChannelProvider();
InstantiatingGrpcChannelProvider.Builder transportChannelProviderBuilder =
transportChannelProvider.toBuilder();
stubSettings.setTransportChannelProvider(
transportChannelProviderBuilder.setChannelPoolSettings(poolSettings).build());
});

stubSettings
.bulkMutateRowsSettings()
.setBatchingSettings(
stubSettings.bulkMutateRowsSettings().getBatchingSettings().toBuilder()
.setElementCountThreshold((long) BigTableConstants.MAX_BULK_MUTATIONS)
.build());

stubSettings
.bulkReadRowsSettings()
.setBatchingSettings(
stubSettings.bulkReadRowsSettings().getBatchingSettings().toBuilder()
.setElementCountThreshold((long) BigTableConstants.MAX_BULK_READS)
.build());
}

private static void configureDuration(
Optional<Duration> config, Consumer<org.threeten.bp.Duration> configurable) {
config.map(Duration::toMillis).map(org.threeten.bp.Duration::ofMillis).ifPresent(configurable);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright (C) 2023 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.versioned.storage.bigtable;

import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;

/**
* Settings used to create and configure BigTable clients (data and table admin).
*
* @see BigTableClientsFactory
*/
public interface BigTableClientsConfig {

default String instanceId() {
return "nessie";
}

Optional<String> appProfileId();

Optional<String> quotaProjectId();

Optional<String> endpoint();

Optional<String> mtlsEndpoint();

Optional<String> emulatorHost();

default int emulatorPort() {
return 8086;
}

Map<String, String> jwtAudienceMapping();

Optional<Duration> maxRetryDelay();

OptionalInt maxAttempts();

Optional<Duration> initialRpcTimeout();

Optional<Duration> totalTimeout();

Optional<Duration> initialRetryDelay();

OptionalInt minChannelCount();

OptionalInt maxChannelCount();

OptionalInt initialChannelCount();

OptionalInt minRpcsPerChannel();

OptionalInt maxRpcsPerChannel();

default boolean enableTelemetry() {
return true;
}
}
Loading

0 comments on commit 68a7d63

Please sign in to comment.