Skip to content

Commit

Permalink
Refactored RPC Routing for grpc implementation (#447)
Browse files Browse the repository at this point in the history
* Refactored

* Fixed the refactoring issues in the code

* Fixed import

* Added unit tests

* Addressed comments

* Addressed comments

* Cleaned code

* Changed to constructor

---------

Co-authored-by: Rakhi Agrawal <[email protected]>
  • Loading branch information
rakhiagr and Rakhi Agrawal authored Oct 11, 2024
1 parent ecfff54 commit a4f22cc
Show file tree
Hide file tree
Showing 16 changed files with 245 additions and 233 deletions.
43 changes: 22 additions & 21 deletions dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.linkedin.metadata.dao;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Message;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.DataMap;
Expand All @@ -25,8 +24,10 @@
import com.linkedin.metadata.dao.exception.ModelValidationException;
import com.linkedin.metadata.dao.ingestion.BaseLambdaFunction;
import com.linkedin.metadata.dao.ingestion.LambdaFunctionRegistry;
import com.linkedin.metadata.dao.ingestion.RestliPreUpdateAspectRegistry;
import com.linkedin.metadata.dao.ingestion.RestliCompliantPreUpdateRoutingClient;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingAccessor;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateAspectRegistry;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateResponse;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingClient;
import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer;
import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer;
import com.linkedin.metadata.dao.retention.IndefiniteRetention;
Expand Down Expand Up @@ -182,7 +183,7 @@ public static class AspectUpdateLambda<ASPECT extends RecordTemplate> {
protected UrnPathExtractor<URN> _urnPathExtractor;

private LambdaFunctionRegistry _lambdaFunctionRegistry;
private RestliPreUpdateAspectRegistry _restliPreUpdateAspectRegistry = null;
private PreUpdateAspectRegistry _preUpdateAspectRegistry = null;

// Maps an aspect class to the corresponding retention policy
private final Map<Class<? extends RecordTemplate>, Retention> _aspectRetentionMap = new HashMap<>();
Expand Down Expand Up @@ -402,16 +403,16 @@ public void setLambdaFunctionRegistry(@Nullable LambdaFunctionRegistry lambdaFun
/**
* Set pre ingestion aspect registry.
*/
public void setRestliPreUpdateAspectRegistry(
@Nullable RestliPreUpdateAspectRegistry restliPreUpdateAspectRegistry) {
_restliPreUpdateAspectRegistry = restliPreUpdateAspectRegistry;
public void setPreUpdateAspectRegistry(
@Nullable PreUpdateAspectRegistry preUpdateAspectRegistry) {
_preUpdateAspectRegistry = preUpdateAspectRegistry;
}

/**
* Get pre ingestion aspect registry.
*/
public RestliPreUpdateAspectRegistry getRestliPreUpdateAspectRegistry() {
return _restliPreUpdateAspectRegistry;
public PreUpdateAspectRegistry getPreUpdateAspectRegistry() {
return _preUpdateAspectRegistry;
}


Expand Down Expand Up @@ -1659,20 +1660,20 @@ protected <ASPECT extends RecordTemplate> ASPECT updatePreIngestionLambdas(@Nonn
/**
* This method routes the update request to the appropriate custom API for pre-ingestion processing.
* @param urn the urn of the asset
* @param newValue the new aspect value
* @param newAspect the new aspect value
* @return the updated aspect
*/
protected <ASPECT extends RecordTemplate> ASPECT preUpdateRouting(URN urn, ASPECT newValue) {
if (_restliPreUpdateAspectRegistry != null && _restliPreUpdateAspectRegistry.isRegistered(
newValue.getClass())) {
RestliCompliantPreUpdateRoutingClient client =
_restliPreUpdateAspectRegistry.getPreUpdateRoutingClient(newValue);
Message updatedAspect =
client.routingLambda(client.convertUrnToMessage(urn), client.convertAspectToMessage(newValue));
RecordTemplate convertedAspect = client.convertAspectToRecordTemplate(updatedAspect);
log.info("PreUpdateRouting completed in BaseLocalDao, urn: {}, updated aspect: {}", urn, convertedAspect);
return (ASPECT) convertedAspect;
protected <ASPECT extends RecordTemplate> ASPECT preUpdateRouting(URN urn, ASPECT newAspect) {
if (_preUpdateAspectRegistry != null && _preUpdateAspectRegistry.isRegistered(
newAspect.getClass())) {
PreUpdateRoutingAccessor preUpdateRoutingAccessor = _preUpdateAspectRegistry.getPreUpdateRoutingAccessor(newAspect.getClass());
PreUpdateRoutingClient client =
preUpdateRoutingAccessor.getPreUpdateClient();
PreUpdateResponse preUpdateResponse = client.preUpdate(urn, newAspect);
ASPECT updatedAspect = (ASPECT) preUpdateResponse.getUpdatedAspect();
log.info("PreUpdateRouting completed in BaseLocalDao, urn: {}, updated aspect: {}", urn, updatedAspect);
return (ASPECT) updatedAspect;
}
return newValue;
return newAspect;
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.linkedin.data.template.RecordTemplate;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;


/**
* A registry which maintains mapping of aspects and their PreUpdateRoutingClient.
*/
@Slf4j
public class PreUpdateAspectRegistry {

private final Map<Class<? extends RecordTemplate>, PreUpdateRoutingAccessor> _preUpdateLambdaMap;

/**
* Constructor to register pre-update routing accessors for multiple aspects at once.
* @param preUpdateMap map containing aspect classes and their corresponding accessors
*/
public PreUpdateAspectRegistry(@Nonnull Map<Class<? extends RecordTemplate>, PreUpdateRoutingAccessor> preUpdateMap) {
_preUpdateLambdaMap = new HashMap<>(preUpdateMap);
log.info("Registered pre-update routing accessors for aspects: {}", _preUpdateLambdaMap.keySet());
}

/**
* Get Pre Update Routing Accessor for an aspect class.
* @param aspectClass the class of the aspect to retrieve the accessor for
* @return PreUpdateRoutingAccessor for the given aspect class, or null if not found
*/
public <ASPECT extends RecordTemplate> PreUpdateRoutingAccessor getPreUpdateRoutingAccessor(
@Nonnull Class<ASPECT> aspectClass) {
return _preUpdateLambdaMap.get(aspectClass);
}

/**
* Check if Pre Update Routing Accessor is registered for an aspect.
*/
public <ASPECT extends RecordTemplate> boolean isRegistered(@Nonnull final Class<ASPECT> aspectClass) {
return _preUpdateLambdaMap.containsKey(aspectClass);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.linkedin.data.template.RecordTemplate;
import lombok.Data;

/**
* Response of pre-update process that includes the updated aspect.
*/
@Data
public class PreUpdateResponse<ASPECT extends RecordTemplate> {
private final ASPECT updatedAspect;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.linkedin.data.template.RecordTemplate;
import lombok.Data;


@Data
public class PreUpdateRoutingAccessor {

public PreUpdateRoutingClient<? extends RecordTemplate> preUpdateClient;

public enum RoutingAction {
PROCEED, SKIP
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
package com.linkedin.metadata.dao.ingestion;
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.google.protobuf.Message;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;


/**
* An interface that defines methods to route update requests to the appropriate custom APIs for pre-ingestion process.
*/

public interface PreUpdateRoutingClient<ASPECT extends Message> {
public interface PreUpdateRoutingClient<ASPECT extends RecordTemplate> {
/**
* A method that routes the update request to the appropriate custom API.
* @param urn the urn of the asset
* @param aspect the aspect to be updated
* @return the updated aspect
*/
ASPECT routingLambda(Message urn, ASPECT aspect);
PreUpdateResponse<ASPECT> preUpdate(Urn urn, ASPECT aspect);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import com.linkedin.data.template.UnionTemplate;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.ingestion.SampleLambdaFunctionRegistryImpl;
import com.linkedin.metadata.dao.ingestion.SamplePreUpdateAspectRegistryImpl;
import com.linkedin.metadata.dao.ingestion.SamplePreUpdateRoutingClient;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingAccessor;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateAspectRegistry;
import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer;
import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer;
import com.linkedin.metadata.dao.retention.TimeBasedRetention;
Expand All @@ -31,6 +33,7 @@
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -658,7 +661,16 @@ public void testPreUpdateRoutingFromFooToBar() throws URISyntaxException {
FooUrn urn = new FooUrn(1);
AspectFoo foo = new AspectFoo().setValue("foo");
AspectFoo bar = new AspectFoo().setValue("bar");
_dummyLocalDAO.setRestliPreUpdateAspectRegistry(new SamplePreUpdateAspectRegistryImpl());

PreUpdateRoutingAccessor preUpdateRoutingAccessor = new PreUpdateRoutingAccessor();
preUpdateRoutingAccessor.setPreUpdateClient(new SamplePreUpdateRoutingClient());

Map<Class<? extends RecordTemplate>, PreUpdateRoutingAccessor> preUpdateMap = new HashMap<>();
preUpdateMap.put(AspectFoo.class, preUpdateRoutingAccessor);

PreUpdateAspectRegistry preUpdateAspectRegistry = new PreUpdateAspectRegistry(preUpdateMap);
_dummyLocalDAO.setPreUpdateAspectRegistry(preUpdateAspectRegistry);

AspectFoo result = _dummyLocalDAO.preUpdateRouting(urn, foo);
assertEquals(result, bar);
}
Expand All @@ -669,7 +681,15 @@ public void testMAEEmissionForPreUpdateRouting() throws URISyntaxException {
AspectFoo foo = new AspectFoo().setValue("foo");
AspectFoo bar = new AspectFoo().setValue("bar");
_dummyLocalDAO.setAlwaysEmitAuditEvent(true);
_dummyLocalDAO.setRestliPreUpdateAspectRegistry(new SamplePreUpdateAspectRegistryImpl());
PreUpdateRoutingAccessor preUpdateRoutingAccessor = new PreUpdateRoutingAccessor();
preUpdateRoutingAccessor.setPreUpdateClient(new SamplePreUpdateRoutingClient());

Map<Class<? extends RecordTemplate>, PreUpdateRoutingAccessor> preUpdateMap = new HashMap<>();
preUpdateMap.put(AspectFoo.class, preUpdateRoutingAccessor);

PreUpdateAspectRegistry preUpdateAspectRegistry = new PreUpdateAspectRegistry(preUpdateMap);

_dummyLocalDAO.setPreUpdateAspectRegistry(preUpdateAspectRegistry);
expectGetLatest(urn, AspectFoo.class,
Arrays.asList(makeAspectEntry(null, null), makeAspectEntry(foo, _dummyAuditStamp)));

Expand All @@ -687,7 +707,14 @@ public void testPreUpdateRoutingWithUnregisteredAspect() throws URISyntaxExcepti
AspectBar foo = new AspectBar().setValue("foo");

// Inject RestliPreIngestionAspectRegistry with no registered aspect
_dummyLocalDAO.setRestliPreUpdateAspectRegistry(new SamplePreUpdateAspectRegistryImpl());
PreUpdateRoutingAccessor preUpdateRoutingAccessor = new PreUpdateRoutingAccessor();
preUpdateRoutingAccessor.setPreUpdateClient(new SamplePreUpdateRoutingClient());

Map<Class<? extends RecordTemplate>, PreUpdateRoutingAccessor> preUpdateMap = new HashMap<>();
preUpdateMap.put(AspectFoo.class, preUpdateRoutingAccessor);

PreUpdateAspectRegistry preUpdateAspectRegistry = new PreUpdateAspectRegistry(preUpdateMap);
_dummyLocalDAO.setPreUpdateAspectRegistry(preUpdateAspectRegistry);

// Call the add method
AspectBar result = _dummyLocalDAO.preUpdateRouting(urn, foo);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,40 +1,18 @@
package com.linkedin.metadata.dao.ingestion;

import com.google.protobuf.Any;
import com.google.protobuf.Message;
import com.google.protobuf.StringValue;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateResponse;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingClient;
import com.linkedin.testing.AspectFoo;


public class SamplePreUpdateRoutingClient implements RestliCompliantPreUpdateRoutingClient {
@Override
public Message routingLambda(Message urn, Message aspect) {
// For testing, change the aspect value to "bar"
return Any.pack(StringValue.of("bar"));
}

@Override
public Message convertUrnToMessage(Urn urn) {
// Directly wrap the URN string into a Protobuf message for testing
return Any.pack(StringValue.of(urn.toString()));
}

@Override
public Message convertAspectToMessage(RecordTemplate pegasusAspect) {
// For testing, convert AspectFoo to a TestMessageProtos.AspectMessage
// Assuming the aspect has a `value` field and its string representation can be used for now
String aspectString = pegasusAspect.toString(); // Extracting the aspect as a string (e.g., {value=foo})

// Wrap the aspect string into a simple Protobuf message for testing
return Any.pack(StringValue.of(aspectString));
}
public class SamplePreUpdateRoutingClient implements PreUpdateRoutingClient {

@Override
public RecordTemplate convertAspectToRecordTemplate(Message messageAspect) {
// For testing, convert TestMessageProtos.AspectMessage back to AspectFoo
// Create a new RecordTemplate (AspectFoo in this case) and set the value field
return new AspectFoo().setValue("bar");
public PreUpdateResponse preUpdate(Urn urn, RecordTemplate recordTemplate) {
AspectFoo aspectFoo = (AspectFoo) recordTemplate;
aspectFoo.setValue("bar");
return new PreUpdateResponse(aspectFoo);
}
}
Loading

0 comments on commit a4f22cc

Please sign in to comment.