-
Notifications
You must be signed in to change notification settings - Fork 86
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[controller] Client side foundations and refactor to support gRPC transport for controller APIs #1259
base: main
Are you sure you want to change the base?
[controller] Client side foundations and refactor to support gRPC transport for controller APIs #1259
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
package com.linkedin.venice.controller.converters; | ||
|
||
import static com.linkedin.venice.controllerapi.ControllerApiConstants.*; | ||
|
||
import com.linkedin.venice.controller.requests.ControllerHttpRequest; | ||
import com.linkedin.venice.controller.requests.CreateStoreRequest; | ||
import com.linkedin.venice.controllerapi.NewStoreResponse; | ||
import com.linkedin.venice.controllerapi.QueryParams; | ||
import com.linkedin.venice.protocols.CreateStoreGrpcRequest; | ||
import com.linkedin.venice.protocols.CreateStoreGrpcResponse; | ||
import java.util.Optional; | ||
|
||
|
||
/** | ||
* A utility class to hold all the conversion logic from {@link com.linkedin.venice.controller.requests.ControllerRequest} | ||
* to transport specific request and transport specific responses back to {@link com.linkedin.venice.controllerapi.ControllerResponse}. | ||
* The utility methods are registered with {@link GrpcConvertersRegistry} and {@link HttpConvertersRegistry} for the | ||
* runtime to perform appropriate conversions. Refer to the documentation of the above converter registries for more | ||
* information. | ||
*/ | ||
public final class ConverterUtil { | ||
|
||
// store related converters | ||
public static CreateStoreGrpcRequest convertCreateStoreToGrpcRequest(CreateStoreRequest request) { | ||
CreateStoreGrpcRequest.Builder builder = CreateStoreGrpcRequest.newBuilder() | ||
.setStoreName(request.getStoreName()) | ||
.setKeySchema(request.getKeySchema()) | ||
.setValueSchema(request.getValueSchema()) | ||
.setIsSystemStore(request.isSystemStore()); | ||
|
||
Optional.ofNullable(request.getOwner()).ifPresent(builder::setOwner); | ||
Optional.ofNullable(request.getAccessPermissions()).ifPresent(builder::setAccessPermission); | ||
|
||
return builder.build(); | ||
} | ||
|
||
public static ControllerHttpRequest convertCreateStoreToHttpRequest(CreateStoreRequest request) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And similarly, the function to convert into a |
||
QueryParams params = new QueryParams(); | ||
|
||
params.add(NAME, request.getStoreName()) | ||
.add(OWNER, request.getOwner()) | ||
.add(KEY_SCHEMA, request.getKeySchema()) | ||
.add(VALUE_SCHEMA, request.getValueSchema()) | ||
.add(IS_SYSTEM_STORE, request.isSystemStore()) | ||
.add(CLUSTER, request.getClusterName()); | ||
|
||
Optional.ofNullable(request.getAccessPermissions()).ifPresent(perm -> params.add(ACCESS_PERMISSION, perm)); | ||
|
||
return ControllerHttpRequest.newBuilder().setParam(params).build(); | ||
} | ||
|
||
public static NewStoreResponse convertCreateStoreGrpcResponse(CreateStoreGrpcResponse grpcResponse) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And similarly to the idea I mentioned in my last review... can this become a function of |
||
NewStoreResponse response = new NewStoreResponse(); | ||
response.setOwner(grpcResponse.getOwner()); | ||
return response; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
package com.linkedin.venice.controller.converters; | ||
|
||
import com.linkedin.venice.controller.requests.ControllerRequest; | ||
import com.linkedin.venice.controller.requests.CreateStoreRequest; | ||
import com.linkedin.venice.controllerapi.ControllerResponse; | ||
import com.linkedin.venice.protocols.CreateStoreGrpcResponse; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
|
||
/** | ||
* A registry to hold all the converters for {@link ControllerRequest} and {@link ControllerResponse} to and back | ||
* to gRPC specific request and responses. New addition of converters need to be registered below under the | ||
* {@link #initialize()}. e.g., | ||
* <pre> | ||
* // Register sample request converter | ||
* registerRequestConverter(NewApiRequest.class, ConverterUtil::convertNewApiToGrpcRequest) | ||
* | ||
* // Register sample response converter | ||
* registerResponseConverter(NewApiResponse.class, ConverterUtil::converterNewApiGrpcResponse) | ||
* </pre> | ||
*/ | ||
public class GrpcConvertersRegistry { | ||
private final Map<Class<?>, RequestConverter<? extends ControllerRequest, ?>> requestRegistry = new HashMap<>(); | ||
private final Map<Class<?>, ResponseConverter<?, ? extends ControllerResponse>> responseRegistry = new HashMap<>(); | ||
|
||
public GrpcConvertersRegistry() { | ||
initialize(); | ||
} | ||
|
||
public void initialize() { | ||
registerRequestConverter(CreateStoreRequest.class, ConverterUtil::convertCreateStoreToGrpcRequest); | ||
registerResponseConverter(CreateStoreGrpcResponse.class, ConverterUtil::convertCreateStoreGrpcResponse); | ||
} | ||
|
||
private <T extends ControllerRequest, R> void registerRequestConverter( | ||
Class<T> requestType, | ||
RequestConverter<T, R> converter) { | ||
requestRegistry.computeIfAbsent(requestType, k -> converter); | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
public <T extends ControllerRequest, R> RequestConverter<T, R> getRequestConverter(Class<T> requestType) { | ||
return (RequestConverter<T, R>) requestRegistry.get(requestType); | ||
} | ||
|
||
private <T, R extends ControllerResponse> void registerResponseConverter( | ||
Class<T> requestType, | ||
ResponseConverter<T, R> converter) { | ||
responseRegistry.computeIfAbsent(requestType, k -> converter); | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
public <T, R extends ControllerResponse> ResponseConverter<T, R> getResponseConverter(Class<T> requestType) { | ||
return (ResponseConverter<T, R>) responseRegistry.get(requestType); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
package com.linkedin.venice.controller.converters; | ||
|
||
import com.linkedin.venice.controller.requests.ControllerHttpRequest; | ||
import com.linkedin.venice.controller.requests.ControllerRequest; | ||
import com.linkedin.venice.controller.requests.CreateStoreRequest; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
|
||
/** | ||
* A registry to hold all the converters for {@link ControllerRequest} to Http specific request. | ||
* New addition of converters need to be registered below under the | ||
* {@link #initialize()}. e.g., | ||
* <pre> | ||
* // Register sample request converter | ||
* registerRequestConverter(NewApiRequest.class, ConverterUtil::convertNewApiToHttpRequest) | ||
* </pre> | ||
*/ | ||
public class HttpConvertersRegistry { | ||
private final Map<Class<?>, RequestConverter<? extends ControllerRequest, ControllerHttpRequest>> requestRegistry = | ||
new HashMap<>(); | ||
Comment on lines
+20
to
+21
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Our current response classes are in a much better shape due to its decoupling from the transport details. It was only our request that had a strong coupling the underlying transport to begin with and hence the need to introduce a request abstraction that is purely driven and interacted by the controller client. As a result, the current http flow already returns this response abstraction defined by the controller ( Additionally, I didn't want to refactor the deserialization handling piece within the Let me know if that answers your question. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess I just don't understand why we need it for one but not the other...? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that is because, we decided to use a separate response data model for gRPC and We decided as a team to follow gRPC convention and make the new request, response strongly typed. |
||
|
||
public HttpConvertersRegistry() { | ||
initialize(); | ||
} | ||
|
||
public void initialize() { | ||
registerRequestConverter(CreateStoreRequest.class, ConverterUtil::convertCreateStoreToHttpRequest); | ||
} | ||
|
||
private <T extends ControllerRequest> void registerRequestConverter( | ||
Class<T> requestType, | ||
RequestConverter<T, ControllerHttpRequest> converter) { | ||
requestRegistry.computeIfAbsent(requestType, k -> converter); | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
public <T extends ControllerRequest> RequestConverter<T, ControllerHttpRequest> getRequestConverter( | ||
Class<T> requestType) { | ||
return (RequestConverter<T, ControllerHttpRequest>) requestRegistry.get(requestType); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package com.linkedin.venice.controller.converters; | ||
|
||
import com.linkedin.venice.controller.requests.ControllerRequest; | ||
|
||
|
||
/** | ||
* A general interface to represent request converters that operates on {@Link T} to produce a transport specific | ||
* request {@link D} | ||
* @param <T> Source controller request type | ||
* @param <D> Transport specific request type | ||
Comment on lines
+9
to
+10
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. T -> The controller request type (e.g., I did use some of elaborative naming for generic types in few places in the code where I saw no ambiguity. I thought a bit and then wasn't fully convinced if Given this converter interface is not a general converter interface rather specific to the controller requests, I found the above choices too generic and hence resorted write the parameter java document to provide some clarity. Do we have any convention on naming generic types within our code base? For e.g., Can we have something like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah i see; |
||
*/ | ||
@FunctionalInterface | ||
public interface RequestConverter<T extends ControllerRequest, D> { | ||
D convert(T request); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
package com.linkedin.venice.controller.converters; | ||
|
||
import com.linkedin.venice.controllerapi.ControllerResponse; | ||
|
||
|
||
/** | ||
* A general interface to represent response converters that operates on transport specific response {@Link T} to produce | ||
* a controller specific responesn {@link D} | ||
* @param <T> Source transport specific response type | ||
* @param <D> Controller response type | ||
mynameborat marked this conversation as resolved.
Show resolved
Hide resolved
|
||
*/ | ||
public interface ResponseConverter<T, D extends ControllerResponse> { | ||
D convert(T response); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
package com.linkedin.venice.controller.requests; | ||
|
||
import com.linkedin.venice.controllerapi.QueryParams; | ||
import com.linkedin.venice.utils.Time; | ||
|
||
|
||
/** | ||
* A general class to represent Http specific transport request that embodies all the Http requests | ||
* send to the controller as part of controller APIs. This is a container class defined to help with the | ||
* refactoring of ControllerClient to become transport protocol agnostic. | ||
*/ | ||
public class ControllerHttpRequest { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the relationship between this class and There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Originally, I thought about having some coupling between
However, the above spills some of the transport details into the model and hence chose against it; |
||
private static final int DEFAULT_REQUEST_TIMEOUT_MS = 600 * Time.MS_PER_SECOND; | ||
private static final int DEFAULT_MAX_ATTEMPTS = 10; | ||
private final QueryParams params; | ||
private final byte[] data; | ||
|
||
private final int timeoutMs; | ||
|
||
private final int maxRetries; | ||
|
||
public ControllerHttpRequest(QueryParams params, byte[] data, int timeoutMs, int maxRetries) { | ||
this.params = params; | ||
this.data = data; | ||
this.timeoutMs = timeoutMs; | ||
this.maxRetries = maxRetries; | ||
} | ||
|
||
public QueryParams getParams() { | ||
return params; | ||
} | ||
|
||
public byte[] getData() { | ||
return data; | ||
} | ||
|
||
public int getTimeoutMs() { | ||
return timeoutMs; | ||
} | ||
|
||
public int getMaxRetries() { | ||
return maxRetries; | ||
} | ||
|
||
public static Builder newBuilder() { | ||
return new Builder(); | ||
} | ||
|
||
public static class Builder { | ||
private QueryParams params; | ||
private byte[] data; | ||
private int timeoutMs = DEFAULT_REQUEST_TIMEOUT_MS; | ||
|
||
private int maxRetries = DEFAULT_MAX_ATTEMPTS; | ||
|
||
public Builder setParam(QueryParams params) { | ||
this.params = params; | ||
return this; | ||
} | ||
|
||
public Builder setData(byte[] data) { | ||
this.data = data; | ||
return this; | ||
} | ||
|
||
public Builder setTimeoutMs(int timeoutMs) { | ||
this.timeoutMs = timeoutMs; | ||
return this; | ||
} | ||
|
||
public Builder setMaxRetries(int retries) { | ||
this.maxRetries = retries; | ||
return this; | ||
} | ||
|
||
public ControllerHttpRequest build() { | ||
return new ControllerHttpRequest(params, data, timeoutMs, maxRetries); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
package com.linkedin.venice.controller.requests; | ||
|
||
import com.linkedin.venice.controllerapi.ControllerRoute; | ||
|
||
|
||
/** | ||
* A data model to represent controller requests that are exposed through CLIs and Rest Spec to end users. Authors | ||
* of new controller APIs will need to extend this class and plug in appropropriate fields needed for the new API. | ||
* | ||
* Additionally, authors need to define converters for the new APIs to be able to work with newer transport agnostic | ||
* request model. Refer to {@link com.linkedin.venice.controller.converters.GrpcConvertersRegistry} and | ||
* {@link com.linkedin.venice.controller.converters.HttpConvertersRegistry} to register converters for new request types | ||
* | ||
* For routing, the newer APIs need to define routes in {@link ControllerRoute} and | ||
* {@link com.linkedin.venice.controller.transport.GrpcRoute} for the {@link com.linkedin.venice.controllerapi.ControllerClient} | ||
* to dispatch requests in transport agnostic model. | ||
*/ | ||
public abstract class ControllerRequest { | ||
private static final long DEFAULT_TIMEOUT_MS = 30000L; // 10 seconds | ||
private static final int DEFAULT_MAX_RETRIES = 5; | ||
private final ControllerRoute route; | ||
private final String clusterName; | ||
|
||
private final long timeoutMs; | ||
|
||
private final int maxRetries; | ||
|
||
public ControllerRequest(String clusterName, ControllerRoute route) { | ||
this(clusterName, route, DEFAULT_TIMEOUT_MS, DEFAULT_MAX_RETRIES); | ||
} | ||
|
||
public ControllerRequest(String clusterName, ControllerRoute route, long timeoutMs, int maxRetries) { | ||
this.clusterName = clusterName; | ||
this.route = route; | ||
this.timeoutMs = timeoutMs; | ||
this.maxRetries = maxRetries; | ||
} | ||
|
||
public String getClusterName() { | ||
return clusterName; | ||
} | ||
|
||
public ControllerRoute getRoute() { | ||
return route; | ||
} | ||
|
||
public long getTimeoutMs() { | ||
return timeoutMs; | ||
} | ||
|
||
public int getMaxRetries() { | ||
return maxRetries; | ||
} | ||
|
||
static abstract class Builder<T extends Builder<?>> { | ||
String clusterName; | ||
|
||
public T setClusterName(String clusterName) { | ||
this.clusterName = clusterName; | ||
return getThis(); | ||
} | ||
|
||
abstract T getThis(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this code be a function of
CreateStoreRequest
? Perhaps the parent classControllerRequest
can becomeControllerRequest<GRPC>
and define apublic abstract GRPC getGRPC()
API. ThenCreateStoreRequest extends ControllerRequest<CreateStoreGrpcRequest>
? Would that eliminate the need for the whole registry?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This goes back to my earlier comment - #1259 (comment)
Controller models (responses and requests) taking dependencies on the underlying transport through methods like
toGrpc
andfromGrpc
would make the consumers of these models exposed to some underlying details (although we could control the access modifier) this also potential brings in the dependencies of the underlying transport dependencies for consumers of models.Hence the choice to invert the dependency and have these concrete transport implementations depend on the models of controller abstraction and the conversion residing outside these models.