Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix-a-thon] [Not-For-Review] Client bindings and request flows for controller APIs createStore, getStoresInCluster and QueryJobStatus #1250

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@
import com.linkedin.venice.exceptions.ErrorType;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceHttpException;
import com.linkedin.venice.grpc.ControllerGrpcTransport;
import com.linkedin.venice.grpc.GrpcControllerRoute;
import com.linkedin.venice.grpc.GrpcConverters;
import com.linkedin.venice.helix.VeniceJsonSerializer;
import com.linkedin.venice.meta.VeniceUserStoreType;
import com.linkedin.venice.meta.Version;
Expand All @@ -98,6 +101,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
Expand All @@ -121,6 +125,8 @@ public class ControllerClient implements Closeable {
private String leaderControllerUrl;
private final List<String> controllerDiscoveryUrls;

private final boolean useGrpc;

public ControllerClient(String clusterName, String discoveryUrls) {
this(clusterName, discoveryUrls, Optional.empty());
}
Expand All @@ -140,6 +146,7 @@ public ControllerClient(String clusterName, String discoveryUrls, Optional<SSLFa
if (this.controllerDiscoveryUrls.isEmpty()) {
throw new VeniceException("Controller discovery url list is empty");
}
this.useGrpc = false;
}

/**
Expand Down Expand Up @@ -1421,6 +1428,41 @@ private <T extends ControllerResponse> T request(
int timeoutMs,
int maxAttempts,
byte[] data) {
if (useGrpc) {
return fireRequestUsingGrpc(route, params, responseType, timeoutMs, maxAttempts, data);
} else {
return fireRequestUsingHttp(route, params, responseType, timeoutMs, maxAttempts, data);
}
}

private <T extends ControllerResponse> T fireRequestUsingGrpc(
ControllerRoute route,
QueryParams params,
Class<T> responseType,
int timeoutMs,
int maxAttempts,
byte[] data) {
GrpcControllerRoute grpcControllerRoute = GrpcConverters.mapControllerRouteToGrpcControllerRoute(route);
try (ControllerGrpcTransport transport = new ControllerGrpcTransport(sslFactory)) {
try {
CompletionStage<T> responseFuture =
transport.request(getLeaderControllerUrl(), params, responseType, grpcControllerRoute);
return responseFuture.toCompletableFuture().join();
} catch (Exception e) {
throw new VeniceException("Encountered error when getting response from server", e);
}
} catch (Exception e) {
throw new VeniceException("Encountered error when fetching response from grpc server", e);
}
}

private <T extends ControllerResponse> T fireRequestUsingHttp(
ControllerRoute route,
QueryParams params,
Class<T> responseType,
int timeoutMs,
int maxAttempts,
byte[] data) {
Exception lastException = null;
boolean logErrorMessage = true;
try (ControllerTransport transport = new ControllerTransport(sslFactory)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package com.linkedin.venice.grpc;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.controllerapi.QueryParams;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.protocols.CreateStoreGrpcRequest;
import com.linkedin.venice.protocols.GetStoresInClusterGrpcRequest;
import com.linkedin.venice.protocols.QueryJobStatusGrpcRequest;
import com.linkedin.venice.protocols.VeniceControllerGrpcServiceGrpc;
import com.linkedin.venice.security.SSLFactory;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.grpc.ChannelCredentials;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import io.grpc.TlsChannelCredentials;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;


public class ControllerGrpcTransport implements AutoCloseable {
private static final int PORT = 1234;
private static final String GRPC_ADDRESS_FORMAT = "%s:%s";
private final VeniceConcurrentHashMap<String, ManagedChannel> serverGrpcChannels;
private final ChannelCredentials channelCredentials;
private final VeniceConcurrentHashMap<ManagedChannel, VeniceControllerGrpcServiceGrpc.VeniceControllerGrpcServiceStub> stubCache;

public ControllerGrpcTransport(Optional<SSLFactory> sslFactory) {
this.stubCache = new VeniceConcurrentHashMap<>();
this.serverGrpcChannels = new VeniceConcurrentHashMap<>();
this.channelCredentials = buildChannelCredentials(sslFactory);
}

public <ResT> CompletionStage<ResT> request(
String serverUrl,
QueryParams params,
Class<ResT> responseType,
GrpcControllerRoute route) {

VeniceControllerGrpcServiceGrpc.VeniceControllerGrpcServiceStub stub = getOrCreateStub(serverUrl);
CompletableFuture<ResT> valueFuture = new CompletableFuture<>();

if (GrpcControllerRoute.CREATE_STORE.equals(route)) {
stub.createStore(
(CreateStoreGrpcRequest) GrpcConverters.getRequestConverter(route.getRequestType()).convert(params),
buildStreamObserver(valueFuture, responseType, route));
} else if (GrpcControllerRoute.GET_STORES_IN_CLUSTER.equals(route)) {
stub.getStoresInCluster(
(GetStoresInClusterGrpcRequest) GrpcConverters.getRequestConverter(route.getRequestType()).convert(params),
buildStreamObserver(valueFuture, responseType, route));
} else if (GrpcControllerRoute.QUERY_JOB_STATUS.equals(route)) {
stub.getJobStatus(
(QueryJobStatusGrpcRequest) GrpcConverters.getRequestConverter(route.getRequestType()).convert(params),
buildStreamObserver(valueFuture, responseType, route));
} else {
throw new VeniceException("Unknown gRPC route; Failing the request");
}

return valueFuture;
}

@VisibleForTesting
<T, ResT> ControllerGrpcObserver<T, ResT> buildStreamObserver(
CompletableFuture<ResT> future,
Class<ResT> httpResponseType,
GrpcControllerRoute route) {
return new ControllerGrpcObserver<>(future, httpResponseType, route);
}

@Override
public void close() throws IOException {
for (Map.Entry<String, ManagedChannel> entry: serverGrpcChannels.entrySet()) {
entry.getValue().shutdown();
}
}

@VisibleForTesting
ChannelCredentials buildChannelCredentials(Optional<SSLFactory> sslFactory) {
SSLFactory factory = sslFactory.orElse(null);

if (factory == null) {
return InsecureChannelCredentials.create();
}

try {
TlsChannelCredentials.Builder tlsBuilder = TlsChannelCredentials.newBuilder()
.keyManager(GrpcUtils.getKeyManagers(factory))
.trustManager(GrpcUtils.getTrustManagers(factory));
return tlsBuilder.build();
} catch (Exception e) {
throw new VeniceClientException(
"Failed to initialize SSL channel credentials for Venice gRPC Transport Client",
e);
}
}

VeniceControllerGrpcServiceGrpc.VeniceControllerGrpcServiceStub getOrCreateStub(String serverAddress) {
String grpcAddress = getGrpcAddressFromServerAddress(serverAddress);

ManagedChannel channel = serverGrpcChannels
.computeIfAbsent(serverAddress, k -> Grpc.newChannelBuilder(grpcAddress, channelCredentials).build());

return stubCache.computeIfAbsent(channel, VeniceControllerGrpcServiceGrpc::newStub);
}

@VisibleForTesting
String getGrpcAddressFromServerAddress(String serverAddress) {
String[] serverAddressParts = serverAddress.split(":");
Preconditions.checkState(serverAddressParts.length == 2, "Invalid server address");

return String.format(GRPC_ADDRESS_FORMAT, serverAddressParts[0], PORT);
}

static class ControllerGrpcObserver<ResT, HttpResT> implements StreamObserver<ResT> {
private final CompletableFuture<HttpResT> responseFuture;
private final Class<HttpResT> httpResponseType;

private final GrpcControllerRoute route;

public ControllerGrpcObserver(
CompletableFuture<HttpResT> future,
Class<HttpResT> httpResponseType,
GrpcControllerRoute route) {
this.httpResponseType = httpResponseType;
this.responseFuture = future;
this.route = route;
}

@Override
public void onNext(ResT value) {
if (!responseFuture.isDone()) {

@SuppressWarnings("Unchecked")
HttpResT result = ((GrpcToHttpResponseConverter<ResT, HttpResT>) GrpcConverters
.getResponseConverter(route.getResponseType(), httpResponseType)).convert(value);

responseFuture.complete(result);
}
}

@Override
public void onError(Throwable t) {
responseFuture.completeExceptionally(t);
}

@Override
public void onCompleted() {
// do nothing
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.linkedin.venice.grpc;

import com.google.protobuf.GeneratedMessageV3;
import com.linkedin.venice.protocols.CreateStoreGrpcRequest;
import com.linkedin.venice.protocols.CreateStoreGrpcResponse;
import com.linkedin.venice.protocols.GetStoresInClusterGrpcRequest;
import com.linkedin.venice.protocols.GetStoresInClusterGrpcResponse;
import com.linkedin.venice.protocols.QueryJobStatusGrpcRequest;
import com.linkedin.venice.protocols.QueryJobStatusGrpcResponse;


public enum GrpcControllerRoute {
CREATE_STORE(CreateStoreGrpcRequest.class, CreateStoreGrpcResponse.class),
GET_STORES_IN_CLUSTER(GetStoresInClusterGrpcRequest.class, GetStoresInClusterGrpcResponse.class),
QUERY_JOB_STATUS(QueryJobStatusGrpcRequest.class, QueryJobStatusGrpcResponse.class);

private final Class<? extends GeneratedMessageV3> requestType;
private final Class<? extends GeneratedMessageV3> responseType;

GrpcControllerRoute(Class<? extends GeneratedMessageV3> reqT, Class<? extends GeneratedMessageV3> resT) {
this.requestType = reqT;
this.responseType = resT;
}

public Class<? extends GeneratedMessageV3> getRequestType() {
return this.requestType;
}

public Class<? extends GeneratedMessageV3> getResponseType() {
return this.responseType;
}
}
Loading
Loading