From d4a2261524021b04107661c1e228642584701933 Mon Sep 17 00:00:00 2001 From: Zihan Li Date: Mon, 21 Oct 2024 12:36:48 -0700 Subject: [PATCH] add ingestion params into CallbackRoutingClientAPI (#453) Co-authored-by: Zihan Li --- .../com/linkedin/metadata/dao/BaseLocalDAO.java | 11 +++++++---- .../dao/ingestion/AspectCallbackRoutingClient.java | 13 +++++++++++++ .../com/linkedin/metadata/dao/BaseLocalDAOTest.java | 4 ++-- 3 files changed, 22 insertions(+), 6 deletions(-) diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java b/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java index 5fe1a3463..6438a40dc 100644 --- a/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java @@ -643,7 +643,7 @@ private AddResult aspectUpdateHelper(URN } // this will skip the pre/in update callbacks if (!isRawUpdate) { - AspectUpdateResult result = aspectCallbackHelper(urn, newValue, oldValue); + AspectUpdateResult result = aspectCallbackHelper(urn, newValue, oldValue, updateTuple.getIngestionParams()); newValue = (ASPECT) result.getUpdatedAspect(); // skip the normal ingestion to the DAO if (result.isSkipProcessing()) { @@ -901,7 +901,8 @@ public ASPECT add(@Nonnull URN urn, @Nonnull ASP /** * Same as above {@link #add(Urn, RecordTemplate, AuditStamp)} but with tracking context. - * Note: If you update the lambda function (ignored - newValue), make sure to update {@link #aspectCallbackHelper(Urn, RecordTemplate, Optional)} as well + * Note: If you update the lambda function (ignored - newValue), + * make sure to update {@link #aspectCallbackHelper(Urn, RecordTemplate, Optional, IngestionParams)}as well * to avoid any inconsistency between the lambda function and the add method. */ @Nonnull @@ -1718,13 +1719,15 @@ protected ASPECT updatePreIngestionLambdas(@Nonn * @param urn the urn of the asset * @param newAspectValue the new aspect value * @param oldAspectValue the old aspect value + * @param ingestionParams the ingestionparams of the current update * @return AspectUpdateResult which contains updated aspect value */ - protected AspectUpdateResult aspectCallbackHelper(URN urn, ASPECT newAspectValue, Optional oldAspectValue) { + protected AspectUpdateResult aspectCallbackHelper(URN urn, ASPECT newAspectValue, + Optional oldAspectValue, IngestionParams ingestionParams) { if (_aspectCallbackRegistry != null && _aspectCallbackRegistry.isRegistered( newAspectValue.getClass())) { AspectCallbackRoutingClient client = _aspectCallbackRegistry.getAspectCallbackRoutingClient(newAspectValue.getClass()); - AspectCallbackResponse aspectCallbackResponse = client.routeAspectCallback(urn, newAspectValue, oldAspectValue); + AspectCallbackResponse aspectCallbackResponse = client.routeAspectCallback(urn, newAspectValue, oldAspectValue, ingestionParams); ASPECT updatedAspect = (ASPECT) aspectCallbackResponse.getUpdatedAspect(); log.info("Aspect callback routing completed in BaseLocalDao, urn: {}, updated aspect: {}", urn, updatedAspect); return new AspectUpdateResult(updatedAspect, client.isSkipProcessing()); diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/AspectCallbackRoutingClient.java b/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/AspectCallbackRoutingClient.java index ba0cc9a08..5da7999ce 100644 --- a/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/AspectCallbackRoutingClient.java +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/AspectCallbackRoutingClient.java @@ -2,6 +2,7 @@ import com.linkedin.common.urn.Urn; import com.linkedin.data.template.RecordTemplate; +import com.linkedin.metadata.internal.IngestionParams; import java.util.Optional; @@ -18,6 +19,18 @@ public interface AspectCallbackRoutingClient { */ AspectCallbackResponse routeAspectCallback(Urn urn, ASPECT newAspectValue, Optional existingAspectValue); + /** + * A method that routes the updates request to the appropriate custom API. + * @param urn the urn of the asset + * @param newAspectValue the aspect to be updated + * @param existingAspectValue the existing aspect value + * @param ingestionParams the ingestionParams of current update + * @return AspectCallbackResponse containing the updated aspect + */ + default AspectCallbackResponse routeAspectCallback(Urn urn, ASPECT newAspectValue, + Optional existingAspectValue, IngestionParams ingestionParams) { + return routeAspectCallback(urn, newAspectValue, existingAspectValue); + } /** * A method that returns whether to skip processing further ingestion. * @return true if the ingestion should be skipped, false otherwise diff --git a/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java b/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java index a8e371374..99caea11d 100644 --- a/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java +++ b/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java @@ -667,7 +667,7 @@ public void testAspectCallbackHelperFromFooToBar() throws URISyntaxException { AspectCallbackRegistry aspectCallbackRegistry = new AspectCallbackRegistry(aspectCallbackMap); _dummyLocalDAO.setAspectCallbackRegistry(aspectCallbackRegistry); - BaseLocalDAO.AspectUpdateResult result = _dummyLocalDAO.aspectCallbackHelper(urn, foo, null); + BaseLocalDAO.AspectUpdateResult result = _dummyLocalDAO.aspectCallbackHelper(urn, foo, Optional.empty(), null); AspectFoo newAspect = (AspectFoo) result.getUpdatedAspect(); assertEquals(newAspect, bar); } @@ -705,7 +705,7 @@ public void testAspectCallbackHelperWithUnregisteredAspect() throws URISyntaxExc _dummyLocalDAO.setAspectCallbackRegistry(aspectCallbackRegistry); // Call the add method - BaseLocalDAO.AspectUpdateResult result = _dummyLocalDAO.aspectCallbackHelper(urn, foo, null); + BaseLocalDAO.AspectUpdateResult result = _dummyLocalDAO.aspectCallbackHelper(urn, foo, Optional.empty(), null); // Verify that the result is the same as the input aspect since it's not registered assertEquals(result.getUpdatedAspect(), foo);