From 0d7727e1e1bb31cfc89ab641b472eeb9809ad2e6 Mon Sep 17 00:00:00 2001 From: Justin Donn Date: Thu, 19 Sep 2024 15:09:39 -0700 Subject: [PATCH] feat(dual-write): add new ingestSkipPreUpdates resource endpoints (#430) * feat(dual-write): add new ingestSkipPreIngestionUpdates resource endpoints * refactor ingest* methods to address comments * add javadocs for new method * add javadocs for new method typo * update method names to rawIngest and rawAdd * update some comments --- .../linkedin/metadata/dao/BaseLocalDAO.java | 4 +- .../restli/BaseAspectRoutingResource.java | 192 +++++++++++------- .../metadata/restli/BaseEntityResource.java | 94 ++++++++- .../metadata/restli/RestliConstants.java | 2 + .../restli/BaseAspectRoutingResourceTest.java | 16 +- .../restli/BaseEntityResourceTest.java | 18 ++ 6 files changed, 246 insertions(+), 80 deletions(-) diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java b/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java index 10024409b..3988e6f5c 100644 --- a/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java @@ -853,7 +853,7 @@ public ASPECT add(@Nonnull URN urn, @Nonnull ASP @Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext, @Nullable IngestionParams ingestionParams) { ASPECT updatedAspect = preUpdateRouting(urn, newValue); - return addSkipPreIngestionUpdates(urn, updatedAspect, auditStamp, trackingContext, ingestionParams); + return rawAdd(urn, updatedAspect, auditStamp, trackingContext, ingestionParams); } /** @@ -862,7 +862,7 @@ public ASPECT add(@Nonnull URN urn, @Nonnull ASP * Please use the regular add method linked above. */ @Nonnull - public ASPECT addSkipPreIngestionUpdates(@Nonnull URN urn, @Nonnull ASPECT newValue, + public ASPECT rawAdd(@Nonnull URN urn, @Nonnull ASPECT newValue, @Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext, @Nullable IngestionParams ingestionParams) { final IngestionParams nonNullIngestionParams = diff --git a/restli-resources/src/main/java/com/linkedin/metadata/restli/BaseAspectRoutingResource.java b/restli-resources/src/main/java/com/linkedin/metadata/restli/BaseAspectRoutingResource.java index a5a298ebf..18f814536 100644 --- a/restli-resources/src/main/java/com/linkedin/metadata/restli/BaseAspectRoutingResource.java +++ b/restli-resources/src/main/java/com/linkedin/metadata/restli/BaseAspectRoutingResource.java @@ -299,6 +299,15 @@ private Task backfillWithNewValue(@ActionParam(PARAM_URNS) @Nonn }); } + /** + * Internal ingest method for snapshots. First execute any pre-ingestion updates. Then, route any aspects which have a registered routing + * GMS client to the respective GMS for ingestion. Finally, continue to save the aspect locally as well (i.e. dual write) + * @param snapshot snapshot to process + * @param aspectsToIgnore aspects to ignore + * @param trackingContext context for tracking ingestion health + * @param ingestionParams optional ingestion parameters + * @return Restli Task for metadata ingestion + */ @Nonnull @Override protected Task ingestInternal(@Nonnull SNAPSHOT snapshot, @@ -308,46 +317,44 @@ protected Task ingestInternal(@Nonnull SNAPSHOT snapshot, return RestliUtils.toTask(() -> { final URN urn = (URN) ModelUtils.getUrnFromSnapshot(snapshot); final AuditStamp auditStamp = getAuditor().requestAuditStamp(getContext().getRawRequestContext()); - ModelUtils.getAspectsFromSnapshot(snapshot).forEach(aspect -> { - if (!aspectsToIgnore.contains(aspect.getClass())) { - if (getAspectRoutingGmsClientManager().hasRegistered(aspect.getClass())) { - try { - // get the updated aspect if there is a preupdate routing lambda registered - RestliPreUpdateAspectRegistry registry = getLocalDAO().getRestliPreUpdateAspectRegistry(); - if (registry != null && registry.isRegistered(aspect.getClass())) { - log.info(String.format("Executing registered pre-update routing lambda for aspect class %s.", aspect.getClass())); - aspect = preUpdateRouting(urn, aspect, registry); - log.info("PreUpdateRouting completed in ingestInternal, urn: {}, updated aspect: {}", urn, aspect); - // Get the fqcn of the aspect class - String aspectFQCN = aspect.getClass().getCanonicalName(); - //TODO: META-21112: Remove this check after adding annotations at model level; to handle SKIP/PROCEED for preUpdateRouting - if (SKIP_INGESTION_FOR_ASPECTS.contains(aspectFQCN)) { - log.info("Skip ingestion in ingestInternal for urn: {}, aspectFQCN: {}", urn, aspectFQCN); - return; - } - } - if (trackingContext != null) { - getAspectRoutingGmsClientManager().getRoutingGmsClient(aspect.getClass()).ingestWithTracking(urn, aspect, trackingContext, ingestionParams); - } else { - getAspectRoutingGmsClientManager().getRoutingGmsClient(aspect.getClass()).ingest(urn, aspect); - } - // since we already called any pre-update lambdas earlier, call a simple version of BaseLocalDAO::add - // which skips pre-update lambdas. - getLocalDAO().addSkipPreIngestionUpdates(urn, aspect, auditStamp, trackingContext, ingestionParams); - } catch (Exception exception) { - log.error( - String.format("Couldn't ingest routing aspect %s for %s", aspect.getClass().getSimpleName(), urn), - exception); - } - } else { - getLocalDAO().add(urn, aspect, auditStamp, trackingContext, ingestionParams); - } - } - }); + ModelUtils.getAspectsFromSnapshot(snapshot).forEach(aspect -> + ingestAspect(aspectsToIgnore, urn, aspect, trackingContext, ingestionParams, auditStamp, false)); + return null; + }); + } + + /** + * Raw internal ingest method for snapshots which skips any pre-, intra-, or post-processing. Route any aspects which + * have a registered routing GMS client to the respective GMS for ingestion. Finally, continue to save the aspect + * locally as well (i.e. dual write) + * @param snapshot snapshot to process + * @param aspectsToIgnore aspects to ignore + * @param trackingContext context for tracking ingestion health + * @param ingestionParams optional ingestion parameters + * @return Restli Task for metadata ingestion + */ + @Nonnull + protected Task rawIngestInternal(@Nonnull SNAPSHOT snapshot, + @Nonnull Set> aspectsToIgnore, @Nullable IngestionTrackingContext trackingContext, + @Nullable IngestionParams ingestionParams) { + // TODO: META-18950: add trackingContext to BaseAspectRoutingResource. currently the param is unused. + return RestliUtils.toTask(() -> { + final URN urn = (URN) ModelUtils.getUrnFromSnapshot(snapshot); + final AuditStamp auditStamp = getAuditor().requestAuditStamp(getContext().getRawRequestContext()); + ModelUtils.getAspectsFromSnapshot(snapshot).forEach(aspect -> + ingestAspect(aspectsToIgnore, urn, aspect, trackingContext, ingestionParams, auditStamp, true)); return null; }); } + /** + * Internal ingest method for assets. First execute any pre-ingestion updates. Then, route any aspects which have a registered routing + * GMS client to the respective GMS for ingestion. Finally, continue to save the aspect locally as well (i.e. dual write) + * @param asset asset to process + * @param aspectsToIgnore aspects to ignore + * @param ingestionParams optional ingestion parameters + * @return Restli Task for metadata ingestion + */ @Nonnull @Override protected Task ingestInternalAsset(@Nonnull ASSET asset, @@ -359,43 +366,90 @@ protected Task ingestInternalAsset(@Nonnull ASSET asset, final AuditStamp auditStamp = getAuditor().requestAuditStamp(getContext().getRawRequestContext()); IngestionTrackingContext trackingContext = ingestionParams != null ? ingestionParams.getIngestionTrackingContext() : null; - ModelUtils.getAspectsFromAsset(asset).forEach(aspect -> { - if (!aspectsToIgnore.contains(aspect.getClass())) { - if (getAspectRoutingGmsClientManager().hasRegistered(aspect.getClass())) { - try { - // get the updated aspect if there is a preupdate routing lambda registered - RestliPreUpdateAspectRegistry registry = getLocalDAO().getRestliPreUpdateAspectRegistry(); - if (registry != null && registry.isRegistered(aspect.getClass())) { - log.info(String.format("Executing registered pre-update routing lambda for aspect class %s.", aspect.getClass())); - aspect = preUpdateRouting(urn, aspect, registry); - log.info("PreUpdateRouting completed in ingestInternalAsset, urn: {}, updated aspect: {}", urn, aspect); - // Get the fqcn of the aspect class - String aspectFQCN = aspect.getClass().getCanonicalName(); - //TODO: META-21112: Remove this check after adding annotations at model level; to handle SKIP/PROCEED for preUpdateRouting - if (SKIP_INGESTION_FOR_ASPECTS.contains(aspectFQCN)) { - log.info("Skip ingestion in ingestInternalAsset for urn: {}, aspectFQCN: {}", urn, aspectFQCN); - return; - } - } - if (trackingContext != null) { - getAspectRoutingGmsClientManager().getRoutingGmsClient(aspect.getClass()) - .ingestWithTracking(urn, aspect, trackingContext, ingestionParams); - } else { - getAspectRoutingGmsClientManager().getRoutingGmsClient(aspect.getClass()).ingest(urn, aspect); - } - // since we already called any pre-update lambdas earlier, call a simple version of BaseLocalDAO::add - // which skips pre-update lambdas. - getLocalDAO().addSkipPreIngestionUpdates(urn, aspect, auditStamp, trackingContext, ingestionParams); - } catch (Exception exception) { - log.error("Couldn't ingest routing aspect {} for {}", aspect.getClass().getSimpleName(), urn, exception); + ModelUtils.getAspectsFromAsset(asset).forEach(aspect -> + ingestAspect(aspectsToIgnore, urn, aspect, trackingContext, ingestionParams, auditStamp, false)); + return null; + }); + } + + /** + * Raw internal ingest method for assets which skips any pre-, intra-, or post-processing. Route any aspects which + * have a registered routing GMS client to the respective GMS for ingestion. Finally, continue to save the aspect + * locally as well (i.e. dual write) + * @param asset asset to process + * @param aspectsToIgnore aspects to ignore + * @param ingestionParams optional ingestion parameters + * @return Restli Task for metadata ingestion + */ + @Nonnull + protected Task rawIngestInternalAsset(@Nonnull ASSET asset, + @Nonnull Set> aspectsToIgnore, + @Nullable IngestionParams ingestionParams) { + // TODO: META-18950: add trackingContext to BaseAspectRoutingResource. currently the param is unused. + return RestliUtils.toTask(() -> { + final URN urn = (URN) ModelUtils.getUrnFromAsset(asset); + final AuditStamp auditStamp = getAuditor().requestAuditStamp(getContext().getRawRequestContext()); + IngestionTrackingContext trackingContext = + ingestionParams != null ? ingestionParams.getIngestionTrackingContext() : null; + ModelUtils.getAspectsFromAsset(asset).forEach(aspect -> + ingestAspect(aspectsToIgnore, urn, aspect, trackingContext, ingestionParams, auditStamp, true)); + return null; + }); + } + + /** + * Helper function to ingest an aspect either via routing or locally (or both). There is a flag that can be toggled + * to indicate whether to execute pre-, intra-, or post-ingestion updates if they exist. + * @param aspectsToIgnore set of aspect classes to ignore, if any + * @param urn urn associated with the aspect to ingest + * @param aspect aspect to ingest + * @param trackingContext context for tracking ingestion health + * @param ingestionParams optional ingestion parameters + * @param auditStamp audit information of the update + * @param skipExtraProcessing flag to indicate whether to execute pre-, intra-, or post-ingestion updates + */ + private void ingestAspect(Set> aspectsToIgnore, Urn urn, RecordTemplate aspect, + IngestionTrackingContext trackingContext, IngestionParams ingestionParams, AuditStamp auditStamp, + boolean skipExtraProcessing) { + if (!aspectsToIgnore.contains(aspect.getClass())) { + if (getAspectRoutingGmsClientManager().hasRegistered(aspect.getClass())) { + try { + // get the updated aspect if there is a preupdate routing lambda registered + RestliPreUpdateAspectRegistry registry = getLocalDAO().getRestliPreUpdateAspectRegistry(); + if (!skipExtraProcessing && registry != null && registry.isRegistered(aspect.getClass())) { + log.info(String.format("Executing registered pre-update routing lambda for aspect class %s.", aspect.getClass())); + aspect = preUpdateRouting((URN) urn, aspect, registry); + log.info("PreUpdateRouting completed in ingestInternalAsset, urn: {}, updated aspect: {}", urn, aspect); + // Get the fqcn of the aspect class + String aspectFQCN = aspect.getClass().getCanonicalName(); + //TODO: META-21112: Remove this check after adding annotations at model level; to handle SKIP/PROCEED for preUpdateRouting + if (SKIP_INGESTION_FOR_ASPECTS.contains(aspectFQCN)) { + log.info("Skip ingestion in ingestInternalAsset for urn: {}, aspectFQCN: {}", urn, aspectFQCN); + return; } + } + if (trackingContext != null) { + getAspectRoutingGmsClientManager().getRoutingGmsClient(aspect.getClass()) + .ingestWithTracking(urn, aspect, trackingContext, ingestionParams); } else { - getLocalDAO().add(urn, aspect, auditStamp, trackingContext, ingestionParams); + getAspectRoutingGmsClientManager().getRoutingGmsClient(aspect.getClass()).ingest(urn, aspect); } + // here, always call a simple version of BaseLocalDAO::add which skips pre-update lambdas regardless of + // the value of param skipExtraProcessing since any pre-update lambdas would have already been executed + // in the code above. + getLocalDAO().rawAdd((URN) urn, aspect, auditStamp, trackingContext, ingestionParams); + } catch (Exception exception) { + log.error("Couldn't ingest routing aspect {} for {}", aspect.getClass().getSimpleName(), urn, exception); } - }); - return null; - }); + } else { + if (skipExtraProcessing) { + // call a simple version of BaseLocalDAO::add which skips pre-update lambdas. + getLocalDAO().rawAdd((URN) urn, aspect, auditStamp, trackingContext, ingestionParams); + } else { + getLocalDAO().add((URN) urn, aspect, auditStamp, trackingContext, ingestionParams); + } + } + } } /** diff --git a/restli-resources/src/main/java/com/linkedin/metadata/restli/BaseEntityResource.java b/restli-resources/src/main/java/com/linkedin/metadata/restli/BaseEntityResource.java index 5ac8cf543..da5f3154e 100644 --- a/restli-resources/src/main/java/com/linkedin/metadata/restli/BaseEntityResource.java +++ b/restli-resources/src/main/java/com/linkedin/metadata/restli/BaseEntityResource.java @@ -331,7 +331,23 @@ public Task ingestWithTracking(@ActionParam(PARAM_SNAPSHOT) @Nonnull SNAPS } /** - * An action method for automated ingestion pipeline. + * Deprecated to use {@link #rawIngestAsset(RecordTemplate, IngestionParams)} instead. + * Same as {@link #ingestWithTracking(RecordTemplate, IngestionTrackingContext, IngestionParams)} but skips any pre-ingestion updates. + * @param snapshot Snapshot of the metadata change to be ingested + * @param trackingContext {@link IngestionTrackingContext} to 1) track DAO-level metrics and 2) to pass on to MAE emission + * @return ingest task + */ + @Deprecated + @Action(name = ACTION_RAW_INGEST) + @Nonnull + public Task rawIngest(@ActionParam(PARAM_SNAPSHOT) @Nonnull SNAPSHOT snapshot, + @ActionParam(PARAM_TRACKING_CONTEXT) @Nonnull IngestionTrackingContext trackingContext, + @Optional @ActionParam(PARAM_INGESTION_PARAMS) IngestionParams ingestionParams) { + return rawIngestInternal(snapshot, Collections.emptySet(), trackingContext, ingestionParams); + } + + /** + * An action method for automated ingestion pipeline, also called high-level write. * @param asset Asset of the metadata change to be ingested * @return ingest task */ @@ -342,6 +358,26 @@ public Task ingestAsset(@ActionParam(PARAM_ASSET) @Nonnull ASSET asset, return ingestInternalAsset(asset, Collections.emptySet(), ingestionParams); } + /** + * An action method for automated ingestion pipeline which skips any pre-ingestion updates, also called low-level write. + * @param asset Asset of the metadata change to be ingested + * @return ingest task + */ + @Action(name = ACTION_RAW_INGEST_ASSET) + @Nonnull + public Task rawIngestAsset(@ActionParam(PARAM_ASSET) @Nonnull ASSET asset, + @Optional @ActionParam(PARAM_INGESTION_PARAMS) IngestionParams ingestionParams) { + return rawIngestAssetInternal(asset, Collections.emptySet(), ingestionParams); + } + + /** + * Internal ingest method for snapshots. First execute any pre-ingestion updates. Then, save the aspect locally. + * @param snapshot snapshot to process + * @param aspectsToIgnore aspects to ignore + * @param trackingContext context for tracking ingestion health + * @param ingestionParams optional ingestion parameters + * @return Restli Task for metadata ingestion + */ @Nonnull protected Task ingestInternal(@Nonnull SNAPSHOT snapshot, @Nonnull Set> aspectsToIgnore, @Nullable IngestionTrackingContext trackingContext, @@ -358,6 +394,37 @@ protected Task ingestInternal(@Nonnull SNAPSHOT snapshot, }); } + /** + * Raw internal ingest method for snapshots which skips any pre-, intra-, or post-processing. Save the aspect locally. + * @param snapshot snapshot to process + * @param aspectsToIgnore aspects to ignore + * @param trackingContext context for tracking ingestion health + * @param ingestionParams optional ingestion parameters + * @return Restli Task for metadata ingestion + */ + @Nonnull + protected Task rawIngestInternal(@Nonnull SNAPSHOT snapshot, + @Nonnull Set> aspectsToIgnore, @Nullable IngestionTrackingContext trackingContext, + @Nullable IngestionParams ingestionParams) { + return RestliUtils.toTask(() -> { + final URN urn = (URN) ModelUtils.getUrnFromSnapshot(snapshot); + final AuditStamp auditStamp = getAuditor().requestAuditStamp(getContext().getRawRequestContext()); + ModelUtils.getAspectsFromSnapshot(snapshot).stream().forEach(aspect -> { + if (!aspectsToIgnore.contains(aspect.getClass())) { + getLocalDAO().rawAdd(urn, aspect, auditStamp, trackingContext, ingestionParams); + } + }); + return null; + }); + } + + /** + * Internal ingest method for assets. First execute any pre-ingestion updates. Then, save the aspect locally. + * @param asset asset to process + * @param aspectsToIgnore aspects to ignore + * @param ingestionParams optional ingestion parameters + * @return Restli Task for metadata ingestion + */ @Nonnull protected Task ingestInternalAsset(@Nonnull ASSET asset, @Nonnull Set> aspectsToIgnore, @@ -376,6 +443,31 @@ protected Task ingestInternalAsset(@Nonnull ASSET asset, }); } + /** + * Raw internal ingest method for assets which skips any pre-, intra-, or post-processing. Save the aspect locally. + * @param asset asset to process + * @param aspectsToIgnore aspects to ignore + * @param ingestionParams optional ingestion parameters + * @return Restli Task for metadata ingestion + */ + @Nonnull + protected Task rawIngestAssetInternal(@Nonnull ASSET asset, + @Nonnull Set> aspectsToIgnore, + @Nullable IngestionParams ingestionParams) { + return RestliUtils.toTask(() -> { + final URN urn = (URN) ModelUtils.getUrnFromAsset(asset); + final AuditStamp auditStamp = getAuditor().requestAuditStamp(getContext().getRawRequestContext()); + IngestionTrackingContext ingestionTrackingContext = + ingestionParams != null ? ingestionParams.getIngestionTrackingContext() : null; + ModelUtils.getAspectsFromAsset(asset).stream().forEach(aspect -> { + if (!aspectsToIgnore.contains(aspect.getClass())) { + getLocalDAO().rawAdd(urn, aspect, auditStamp, ingestionTrackingContext, ingestionParams); + } + }); + return null; + }); + } + /** * Deprecated to use {@link #getAsset(String, String[])} instead. * An action method for getting a snapshot of aspects for an entity. diff --git a/restli-resources/src/main/java/com/linkedin/metadata/restli/RestliConstants.java b/restli-resources/src/main/java/com/linkedin/metadata/restli/RestliConstants.java index 1126b5a95..42622c5d2 100644 --- a/restli-resources/src/main/java/com/linkedin/metadata/restli/RestliConstants.java +++ b/restli-resources/src/main/java/com/linkedin/metadata/restli/RestliConstants.java @@ -27,6 +27,8 @@ private RestliConstants() { } public static final String ACTION_INGEST_ASSET = "ingestAsset"; public static final String ACTION_INGEST_WITH_TRACKING = "ingestWithTracking"; public static final String ACTION_QUERY = "query"; + public static final String ACTION_RAW_INGEST = "rawIngest"; + public static final String ACTION_RAW_INGEST_ASSET = "rawIngestAsset"; public static final String ACTION_LIST_URNS_FROM_INDEX = "listUrnsFromIndex"; public static final String ACTION_LIST_URNS = "listUrns"; public static final String PARAM_INPUT = "input"; diff --git a/restli-resources/src/test/java/com/linkedin/metadata/restli/BaseAspectRoutingResourceTest.java b/restli-resources/src/test/java/com/linkedin/metadata/restli/BaseAspectRoutingResourceTest.java index a196389ee..e47dd8de4 100644 --- a/restli-resources/src/test/java/com/linkedin/metadata/restli/BaseAspectRoutingResourceTest.java +++ b/restli-resources/src/test/java/com/linkedin/metadata/restli/BaseAspectRoutingResourceTest.java @@ -306,7 +306,7 @@ public void testIngestWithRoutingAspect() { verify(_mockAspectFooGmsClient, times(1)).ingest(eq(urn), eq(foo)); verify(_mockAspectAttributeGmsClient, times(1)).ingest(eq(urn), eq(attributes)); verify(_mockLocalDAO, times(2)).getRestliPreUpdateAspectRegistry(); - verify(_mockLocalDAO, times(1)).addSkipPreIngestionUpdates(eq(urn), eq(foo), any(), any(), any()); + verify(_mockLocalDAO, times(1)).rawAdd(eq(urn), eq(foo), any(), any(), any()); } @Test @@ -326,7 +326,7 @@ public void testIngestWithTrackingWithRoutingAspect() { verify(_mockAspectFooGmsClient, times(1)).ingestWithTracking(eq(urn), eq(foo), eq(trackingContext), eq(null)); verify(_mockAspectAttributeGmsClient, times(1)).ingestWithTracking(eq(urn), eq(attributes), eq(trackingContext), eq(null)); verify(_mockLocalDAO, times(2)).getRestliPreUpdateAspectRegistry(); - verify(_mockLocalDAO, times(1)).addSkipPreIngestionUpdates(eq(urn), eq(foo), any(), any(), any()); + verify(_mockLocalDAO, times(1)).rawAdd(eq(urn), eq(foo), any(), any(), any()); } @Test @@ -354,7 +354,7 @@ public void testIngestWithOnlyRoutingAspect() { runAndWait(_resource.ingest(snapshot)); verify(_mockLocalDAO, times(2)).getRestliPreUpdateAspectRegistry(); - verify(_mockLocalDAO, times(1)).addSkipPreIngestionUpdates(eq(urn), eq(foo), any(), any(), any()); + verify(_mockLocalDAO, times(1)).rawAdd(eq(urn), eq(foo), any(), any(), any()); // verify(_mockGmsClient, times(1)).ingest(eq(urn), eq(foo)); verify(_mockAspectFooGmsClient, times(1)).ingest(eq(urn), eq(foo)); verify(_mockAspectAttributeGmsClient, times(1)).ingest(eq(urn), eq(attributes)); @@ -570,9 +570,9 @@ public void testPreUpdateRoutingWithRegisteredAspect() { AspectFoo foobar = new AspectFoo().setValue("foobar"); // dual write pt1: ensure the ingestion request is forwarded to the routed GMS. verify(_mockAspectFooGmsClient, times(1)).ingest(eq(urn), eq(foobar)); - // dual write pt2: ensure local write using addSkipPreIngestionUpdates() and not add(). + // dual write pt2: ensure local write using rawAdd() and not add(). verify(_mockLocalDAO, times(0)).add(any(), any(), any(), any(), any()); - verify(_mockLocalDAO, times(1)).addSkipPreIngestionUpdates(eq(urn), eq(foobar), any(), any(), any()); + verify(_mockLocalDAO, times(1)).rawAdd(eq(urn), eq(foobar), any(), any(), any()); } @Test @@ -589,9 +589,9 @@ public void testPreUpdateRoutingWithNonRegisteredPreUpdateAspect() { // expected: the aspect value remains unchanged and the aspect is dual-written. // dual write pt1: ensure the ingestion request is forwarded to the routed GMS. verify(_mockAspectFooGmsClient, times(1)).ingest(eq(urn), eq(foo)); - // dual write pt2: ensure local write using addSkipPreIngestionUpdates() and not add(). + // dual write pt2: ensure local write using rawAdd() and not add(). verify(_mockLocalDAO, times(0)).add(any(), any(), any(), any(), any()); - verify(_mockLocalDAO, times(1)).addSkipPreIngestionUpdates(eq(urn), eq(foo), any(), any(), any()); + verify(_mockLocalDAO, times(1)).rawAdd(eq(urn), eq(foo), any(), any(), any()); } @Test @@ -679,7 +679,7 @@ public void testPreUpdateRoutingWithSkipIngestionNoPreLambda() throws NoSuchFiel // Should check for pre lambda verify(_mockLocalDAO, times(1)).getRestliPreUpdateAspectRegistry(); // Should continue to dual-write into local DAO - verify(_mockLocalDAO, times(1)).addSkipPreIngestionUpdates(eq(urn), eq(foo), any(), any(), any()); + verify(_mockLocalDAO, times(1)).rawAdd(eq(urn), eq(foo), any(), any(), any()); verifyNoMoreInteractions(_mockLocalDAO); } } diff --git a/restli-resources/src/test/java/com/linkedin/metadata/restli/BaseEntityResourceTest.java b/restli-resources/src/test/java/com/linkedin/metadata/restli/BaseEntityResourceTest.java index f3971903e..44f0400fd 100644 --- a/restli-resources/src/test/java/com/linkedin/metadata/restli/BaseEntityResourceTest.java +++ b/restli-resources/src/test/java/com/linkedin/metadata/restli/BaseEntityResourceTest.java @@ -1509,4 +1509,22 @@ public void testGetAsset() { assertEquals(asset.getAspectFooBar(), fooBar); assertEquals(asset.getAspectAttributes(), attributes); } + + @Test + public void testRawIngestSkipPreIngestionUpdates() { + FooUrn urn = makeFooUrn(1); + AspectFoo foo = new AspectFoo().setValue("foo"); + AspectBar bar = new AspectBar().setValue("bar"); + List aspects = Arrays.asList(ModelUtils.newAspectUnion(EntityAspectUnion.class, foo), + ModelUtils.newAspectUnion(EntityAspectUnion.class, bar)); + EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects); + + runAndWait(_resource.rawIngest(snapshot, new IngestionTrackingContext(), null)); + + verify(_mockLocalDAO, times(0)).add(eq(urn), eq(foo), any(), any(), eq(null)); + verify(_mockLocalDAO, times(1)).rawAdd(eq(urn), eq(foo), any(), any(), eq(null)); + verify(_mockLocalDAO, times(0)).add(eq(urn), eq(bar), any(), any(), eq(null)); + verify(_mockLocalDAO, times(1)).rawAdd(eq(urn), eq(bar), any(), any(), eq(null)); + verifyNoMoreInteractions(_mockLocalDAO); + } }