Skip to content

Commit

Permalink
feat: add action emitNoChangeMetadataAuditEvent & add ingestionMode i…
Browse files Browse the repository at this point in the history
…n mae (#303)

* add action emitNoChangeMetadataAuditEvent & add ingestionMode in mae

* update doc
  • Loading branch information
JiaoMaWHU authored Oct 19, 2023
1 parent 00c2142 commit 26cc629
Show file tree
Hide file tree
Showing 10 changed files with 200 additions and 14 deletions.
20 changes: 12 additions & 8 deletions dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.linkedin.metadata.dao.tracking.BaseTrackingManager;
import com.linkedin.metadata.dao.tracking.TrackingUtils;
import com.linkedin.metadata.dao.utils.ModelUtils;
import com.linkedin.metadata.events.IngestionMode;
import com.linkedin.metadata.events.IngestionTrackingContext;
import com.linkedin.metadata.query.ExtraInfo;
import com.linkedin.metadata.query.IndexCriterion;
Expand Down Expand Up @@ -59,6 +60,8 @@
import lombok.Value;
import lombok.extern.slf4j.Slf4j;

import static com.linkedin.metadata.dao.utils.IngestionUtils.*;


/**
* A base class for all Local DAOs.
Expand Down Expand Up @@ -577,7 +580,8 @@ private <ASPECT extends RecordTemplate> ASPECT unwrapAddResult(URN urn, AddResul
if (_emitAspectSpecificAuditEvent) {
if (_alwaysEmitAspectSpecificAuditEvent || oldValue != newValue) {
if (_trackingProducer != null) {
_trackingProducer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, newValue, auditStamp, trackingContext);
_trackingProducer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, newValue, auditStamp, trackingContext,
IngestionMode.LIVE);
} else {
_producer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, newValue, auditStamp);
}
Expand Down Expand Up @@ -1089,6 +1093,7 @@ public Map<URN, Map<Class<? extends RecordTemplate>, Optional<? extends RecordTe
* @param urns set of urns to backfill
* @return map of urn to their backfilled aspect values
*/
@Deprecated
@Nonnull
public Map<URN, Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>>> backfillWithNewValue(
@Nonnull Set<Class<? extends RecordTemplate>> aspectClasses, @Nonnull Set<URN> urns) {
Expand Down Expand Up @@ -1153,21 +1158,20 @@ private <ASPECT extends RecordTemplate> void backfill(@Nonnull BackfillMode mode
updateLocalIndex(urn, aspect, FIRST_VERSION);
}

ASPECT oldValue = (mode == BackfillMode.MAE_ONLY_WITH_OLD_VALUE_NULL) ? null : aspect;

if (mode == BackfillMode.MAE_ONLY
|| mode == BackfillMode.BACKFILL_ALL
|| mode == BackfillMode.MAE_ONLY_WITH_OLD_VALUE_NULL) {
|| mode == BackfillMode.BACKFILL_INCLUDING_LIVE_INDEX) {
IngestionMode ingestionMode = ALLOWED_INGESTION_BACKFILL_BIMAP.inverse().get(mode);
if (_trackingProducer != null) {
_trackingProducer.produceMetadataAuditEvent(urn, oldValue, aspect);
_trackingProducer.produceMetadataAuditEvent(urn, aspect, aspect);
IngestionTrackingContext trackingContext = new IngestionTrackingContext();
trackingContext.setTrackingId(TrackingUtils.getRandomUUID());
trackingContext.setEmitter("dao_backfill_endpoint");
trackingContext.setEmitTime(System.currentTimeMillis());
_trackingProducer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, aspect, null, trackingContext);
_trackingProducer.produceAspectSpecificMetadataAuditEvent(urn, aspect, aspect, null, trackingContext, ingestionMode);
} else {
_producer.produceMetadataAuditEvent(urn, oldValue, aspect);
_producer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, aspect, null);
_producer.produceMetadataAuditEvent(urn, aspect, aspect);
_producer.produceAspectSpecificMetadataAuditEvent(urn, aspect, aspect, null);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.UnionTemplate;
import com.linkedin.metadata.events.IngestionMode;
import com.linkedin.metadata.events.IngestionTrackingContext;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -26,7 +27,9 @@ public BaseTrackingMetadataEventProducer(@Nonnull Class<SNAPSHOT> snapshotClass,
* @param newValue the value after the update
* @param trackingContext nullable tracking context passed in to be appended to produced MAEv5s
* @param auditStamp {@link AuditStamp} containing version auditing information for the metadata change
* @param ingestionMode {@link IngestionMode} of the change
*/
public abstract <ASPECT extends RecordTemplate> void produceAspectSpecificMetadataAuditEvent(@Nonnull URN urn,
@Nullable ASPECT oldValue, @Nonnull ASPECT newValue, @Nullable AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext);
@Nullable ASPECT oldValue, @Nonnull ASPECT newValue, @Nullable AuditStamp auditStamp,
@Nullable IngestionTrackingContext trackingContext, @Nullable IngestionMode ingestionMode);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.linkedin.metadata.dao.utils;

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.linkedin.metadata.backfill.BackfillMode;
import com.linkedin.metadata.events.IngestionMode;


public class IngestionUtils {

private IngestionUtils() {
//Utils class
}

/**
* This method provides the bidirectional mapping between {@link IngestionMode} and {@link BackfillMode}. Only
* user-allowed ingestion modes are included in the mapping.
*/
public static final BiMap<IngestionMode, BackfillMode> ALLOWED_INGESTION_BACKFILL_BIMAP = createBiMap();
private static BiMap<IngestionMode, BackfillMode> createBiMap() {
BiMap<IngestionMode, BackfillMode> biMap = HashBiMap.create();
biMap.put(IngestionMode.BACKFILL, BackfillMode.BACKFILL_INCLUDING_LIVE_INDEX);
biMap.put(IngestionMode.BOOTSTRAP, BackfillMode.BACKFILL_ALL);
return biMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@ enum BackfillMode {
BACKFILL_ALL

/**
* Backfill only using MAE. Setting the old value in MAE payload as null.
* DO NOT USE, it's deprecated. Backfill only using MAE. Setting the old value in MAE payload as null.
*/
MAE_ONLY_WITH_OLD_VALUE_NULL

/**
* This type is a replacement type with the deprecation of MAE_ONLY_WITH_OLD_VALUE_NULL. It informs the downstream
* consumers that this backfill request should be ingested into the live index as well.
*/
BACKFILL_INCLUDING_LIVE_INDEX
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.linkedin.metadata.dao.retention.TimeBasedRetention;
import com.linkedin.metadata.dao.retention.VersionBasedRetention;
import com.linkedin.metadata.dao.tracking.BaseTrackingManager;
import com.linkedin.metadata.events.IngestionMode;
import com.linkedin.metadata.events.IngestionTrackingContext;
import com.linkedin.metadata.query.ExtraInfo;
import com.linkedin.metadata.query.IndexFilter;
Expand Down Expand Up @@ -333,8 +334,10 @@ public void testMAEv5WithTracking() throws URISyntaxException {

verify(_mockTrackingEventProducer, times(1)).produceMetadataAuditEvent(urn, null, foo);
verify(_mockTrackingEventProducer, times(1)).produceMetadataAuditEvent(urn, foo, foo);
verify(_mockTrackingEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, foo, _dummyAuditStamp, mockTrackingContext);
verify(_mockTrackingEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, foo, foo, _dummyAuditStamp, mockTrackingContext);
verify(_mockTrackingEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null,
foo, _dummyAuditStamp, mockTrackingContext, IngestionMode.LIVE);
verify(_mockTrackingEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, foo,
foo, _dummyAuditStamp, mockTrackingContext, IngestionMode.LIVE);
verifyNoMoreInteractions(_mockTrackingEventProducer);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.linkedin.metadata.dao.utils;

import com.linkedin.metadata.backfill.BackfillMode;
import com.linkedin.metadata.events.IngestionMode;
import org.testng.annotations.Test;

import static com.linkedin.metadata.dao.utils.IngestionUtils.*;
import static org.testng.Assert.*;


public class IngestionUtilsTest {

@Test
public void testAllowedIngestionModeBackfillModeBimap() {
IngestionMode ingestionMode = IngestionMode.BACKFILL;
BackfillMode backfillMode = ALLOWED_INGESTION_BACKFILL_BIMAP.get(ingestionMode);
assertEquals(BackfillMode.BACKFILL_INCLUDING_LIVE_INDEX, backfillMode);

assertEquals(ingestionMode, ALLOWED_INGESTION_BACKFILL_BIMAP.inverse().get(backfillMode));
assertNull(ALLOWED_INGESTION_BACKFILL_BIMAP.get(IngestionMode.LIVE));
assertNull(ALLOWED_INGESTION_BACKFILL_BIMAP.inverse().get(BackfillMode.MAE_ONLY_WITH_OLD_VALUE_NULL));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -913,11 +913,24 @@ public void testBackfillUsingSCSI() {
Urn urn = urns.get(index);
RecordTemplate aspect = aspects.get(urn).get(AspectBar.class);
assertEquals(backfilledAspects.get(urn).get(AspectBar.class).get(), aspect);
verify(_mockProducer, times(1)).produceMetadataAuditEvent(urn, null, aspect);
verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, aspect, null);
verify(_mockProducer, times(0)).produceMetadataAuditEvent(urn, aspect, aspect);
verify(_mockProducer, times(0)).produceAspectSpecificMetadataAuditEvent(urn, aspect, aspect, null);
}
verifyNoMoreInteractions(_mockProducer);
assertEquals(dao.listUrns(indexFilter, null, 3).size(), 3);

// Backfill in BACKFILL_INCLUDING_LIVE_INDEX mode
clearInvocations(_mockProducer);
backfilledAspects =
dao.backfill(BackfillMode.BACKFILL_INCLUDING_LIVE_INDEX, ImmutableSet.of(AspectBar.class), FooUrn.class, null, 3);
for (int index = 0; index < 3; index++) {
Urn urn = urns.get(index);
RecordTemplate aspect = aspects.get(urn).get(AspectBar.class);
assertEquals(backfilledAspects.get(urn).get(AspectBar.class).get(), aspect);
verify(_mockProducer, times(1)).produceMetadataAuditEvent(urn, aspect, aspect);
verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, aspect, aspect, null);
}
verifyNoMoreInteractions(_mockProducer);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.linkedin.metadata.dao.UrnAspectEntry;
import com.linkedin.metadata.dao.tracking.BaseTrackingManager;
import com.linkedin.metadata.dao.utils.ModelUtils;
import com.linkedin.metadata.events.IngestionMode;
import com.linkedin.metadata.events.IngestionTrackingContext;
import com.linkedin.metadata.query.IndexCriterion;
import com.linkedin.metadata.query.IndexCriterionArray;
Expand Down Expand Up @@ -52,6 +53,7 @@
import javax.annotation.Nullable;

import static com.linkedin.metadata.dao.BaseReadDAO.*;
import static com.linkedin.metadata.dao.utils.IngestionUtils.*;
import static com.linkedin.metadata.restli.RestliConstants.*;


Expand Down Expand Up @@ -326,11 +328,32 @@ public Task<BackfillResult> backfill(@ActionParam(PARAM_URNS) @Nonnull String[]
});
}

/**
* An action method for emitting no change MAE messages (oldValue == newValue). This action will add ingestionMode
* in the MAE payload to allow downstream consumers to decide processing strategy. Only BOOTSTRAP and BACKFILL are
* supported ingestion mode, other mode will result in no-op.
*/
@Action(name = ACTION_EMIT_NO_CHANGE_METADATA_AUDIT_EVENT)
@Nonnull
public Task<BackfillResult> emitNoChangeMetadataAuditEvent(@ActionParam(PARAM_URNS) @Nonnull String[] urns,
@ActionParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames,
@ActionParam(PARAM_INGESTION_MODE) @Nonnull IngestionMode ingestionMode) {
BackfillMode backfillMode = ALLOWED_INGESTION_BACKFILL_BIMAP.get(ingestionMode);
if (backfillMode == null) {
return RestliUtils.toTask(BackfillResult::new);
}
return RestliUtils.toTask(() -> {
final Set<URN> urnSet = Arrays.stream(urns).map(urnString -> parseUrnParam(urnString)).collect(Collectors.toSet());
return RestliUtils.buildBackfillResult(getLocalDAO().backfill(backfillMode, parseAspectsParam(aspectNames), urnSet));
});
}

/**
* An action method for emitting MAE backfill messages with new value (old value will be set as null). This action
* should be deprecated once the secondary store is moving away from elastic search, or the standard backfill
* method starts to safely backfill against live index.
*/
@Deprecated
@Action(name = ACTION_BACKFILL_WITH_NEW_VALUE)
@Nonnull
public Task<BackfillResult> backfillWithNewValue(@ActionParam(PARAM_URNS) @Nonnull String[] urns,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ private RestliConstants() { }
public static final String ACTION_BACKFILL_LEGACY = "backfillLegacy";
public static final String ACTION_BROWSE = "browse";
public static final String ACTION_COUNT_AGGREGATE = "countAggregate";
public static final String ACTION_EMIT_NO_CHANGE_METADATA_AUDIT_EVENT = "emitNoChangeMetadataAuditEvent";
public static final String ACTION_GET_BROWSE_PATHS = "getBrowsePaths";
public static final String ACTION_GET_SNAPSHOT = "getSnapshot";
public static final String ACTION_INGEST = "ingest";
Expand All @@ -37,5 +38,6 @@ private RestliConstants() { }
public static final String PARAM_URN = "urn";
public static final String PARAM_URNS = "urns";
public static final String PARAM_MODE = "mode";
public static final String PARAM_INGESTION_MODE = "ingestionMode";
public static final String PARAM_TRACKING_CONTEXT = "trackingContext";
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.linkedin.metadata.dao.internal.BaseGraphWriterDAO;
import com.linkedin.metadata.dao.utils.ModelUtils;
import com.linkedin.metadata.dao.utils.RecordUtils;
import com.linkedin.metadata.events.IngestionMode;
import com.linkedin.metadata.events.IngestionTrackingContext;
import com.linkedin.metadata.query.IndexCriterion;
import com.linkedin.metadata.query.IndexCriterionArray;
Expand Down Expand Up @@ -684,6 +685,88 @@ public void testBackfillWithNewValue() {
assertTrue(backfillResultEntity.getAspects().contains("com.linkedin.testing.AspectBar"));
}

@Test
public void testEmitNoChangeMetadataAuditEvent() {
FooUrn urn1 = makeFooUrn(1);
FooUrn urn2 = makeFooUrn(2);
AspectFoo foo1 = new AspectFoo().setValue("foo1");
AspectBar bar1 = new AspectBar().setValue("bar1");
AspectBar bar2 = new AspectBar().setValue("bar2");
String[] aspects = new String[]{"com.linkedin.testing.AspectFoo", "com.linkedin.testing.AspectBar"};
when(_mockLocalDAO.backfill(BackfillMode.BACKFILL_INCLUDING_LIVE_INDEX, _resource.parseAspectsParam(aspects), ImmutableSet.of(urn1, urn2)))
.thenReturn(
ImmutableMap.of(urn1, ImmutableMap.of(AspectFoo.class, Optional.of(foo1), AspectBar.class, Optional.of(bar1)),
urn2, ImmutableMap.of(AspectBar.class, Optional.of(bar2)))
);

BackfillResult backfillResult =
runAndWait(_resource.emitNoChangeMetadataAuditEvent(new String[]{urn1.toString(), urn2.toString()}, aspects,
IngestionMode.BACKFILL));
assertEquals(backfillResult.getEntities().size(), 2);

// Test first entity
BackfillResultEntity backfillResultEntity = backfillResult.getEntities().get(0);
assertEquals(backfillResultEntity.getUrn(), urn1);
assertEquals(backfillResultEntity.getAspects().size(), 2);
assertTrue(backfillResultEntity.getAspects().contains("com.linkedin.testing.AspectFoo"));
assertTrue(backfillResultEntity.getAspects().contains("com.linkedin.testing.AspectBar"));

// Test second entity
backfillResultEntity = backfillResult.getEntities().get(1);
assertEquals(backfillResultEntity.getUrn(), urn2);
assertEquals(backfillResultEntity.getAspects().size(), 1);
assertTrue(backfillResultEntity.getAspects().contains("com.linkedin.testing.AspectBar"));
}

@Test
public void testEmitNoChangeMetadataAuditEventBootstrap() {
FooUrn urn1 = makeFooUrn(1);
FooUrn urn2 = makeFooUrn(2);
AspectFoo foo1 = new AspectFoo().setValue("foo1");
AspectBar bar1 = new AspectBar().setValue("bar1");
AspectBar bar2 = new AspectBar().setValue("bar2");
String[] aspects = new String[]{"com.linkedin.testing.AspectFoo", "com.linkedin.testing.AspectBar"};
when(_mockLocalDAO.backfill(BackfillMode.BACKFILL_ALL, _resource.parseAspectsParam(aspects), ImmutableSet.of(urn1, urn2)))
.thenReturn(
ImmutableMap.of(urn1, ImmutableMap.of(AspectFoo.class, Optional.of(foo1), AspectBar.class, Optional.of(bar1)),
urn2, ImmutableMap.of(AspectBar.class, Optional.of(bar2)))
);

BackfillResult backfillResult =
runAndWait(_resource.emitNoChangeMetadataAuditEvent(new String[]{urn1.toString(), urn2.toString()}, aspects,
IngestionMode.BOOTSTRAP));
assertEquals(backfillResult.getEntities().size(), 2);

// Test first entity
BackfillResultEntity backfillResultEntity = backfillResult.getEntities().get(0);
assertEquals(backfillResultEntity.getUrn(), urn1);
assertEquals(backfillResultEntity.getAspects().size(), 2);
assertTrue(backfillResultEntity.getAspects().contains("com.linkedin.testing.AspectFoo"));
assertTrue(backfillResultEntity.getAspects().contains("com.linkedin.testing.AspectBar"));

// Test second entity
backfillResultEntity = backfillResult.getEntities().get(1);
assertEquals(backfillResultEntity.getUrn(), urn2);
assertEquals(backfillResultEntity.getAspects().size(), 1);
assertTrue(backfillResultEntity.getAspects().contains("com.linkedin.testing.AspectBar"));
}

@Test
public void testEmitNoChangeMetadataAuditEventNoResult() {
FooUrn urn1 = makeFooUrn(1);
FooUrn urn2 = makeFooUrn(2);
AspectFoo foo1 = new AspectFoo().setValue("foo1");
AspectBar bar1 = new AspectBar().setValue("bar1");
AspectBar bar2 = new AspectBar().setValue("bar2");
String[] aspects = new String[]{"com.linkedin.testing.AspectFoo", "com.linkedin.testing.AspectBar"};

BackfillResult backfillResult =
runAndWait(_resource.emitNoChangeMetadataAuditEvent(new String[]{urn1.toString(), urn2.toString()}, aspects,
IngestionMode.LIVE));
verify(_mockLocalDAO, times(0)).backfill(any(BackfillMode.class), any(Set.class), any(Set.class));
assertFalse(backfillResult.hasEntities());
}

@Test
public void testBackfillRelationshipTables() {
FooUrn fooUrn = makeFooUrn(1);
Expand Down

0 comments on commit 26cc629

Please sign in to comment.