Skip to content

Commit

Permalink
[Part 1][Persist message id] Adjust interface to persist message id (#…
Browse files Browse the repository at this point in the history
…308)

* Persist message id into entity table

* revert

* use UUID

* fix style

* fix style

* fix style

---------

Co-authored-by: Jesse Jia <[email protected]>
  • Loading branch information
zhixuanjia and Jesse Jia authored Oct 29, 2023
1 parent d7394b5 commit 1d73a34
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 61 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.metadata.dao;

import com.linkedin.avro2pegasus.events.UUID;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
Expand Down Expand Up @@ -91,7 +92,7 @@ public void ensureSchemaUpToDate() {
@Override
@Transactional
public <ASPECT extends RecordTemplate> int add(@Nonnull URN urn, @Nullable ASPECT newValue, @Nonnull Class<ASPECT> aspectClass,
@Nonnull AuditStamp auditStamp) {
@Nonnull AuditStamp auditStamp, @Nullable UUID messageId) {

final long timestamp = auditStamp.hasTime() ? auditStamp.getTime() : System.currentTimeMillis();
final String actor = auditStamp.hasActor() ? auditStamp.getActor().toString() : DEFAULT_ACTOR;
Expand Down Expand Up @@ -282,9 +283,6 @@ public <ASPECT extends RecordTemplate> ListResult<ASPECT> list(@Nonnull Class<AS
}
}




@Nonnull
@Override
public <ASPECT extends RecordTemplate> ListResult<ASPECT> list(@Nonnull Class<ASPECT> aspectClass, int start,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.metadata.dao;

import com.google.common.annotations.VisibleForTesting;
import com.linkedin.avro2pegasus.events.UUID;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.schema.DataSchema;
Expand Down Expand Up @@ -630,7 +631,7 @@ public <ASPECT extends RecordTemplate> void updateEntityTables(@Nonnull URN urn,
return null; // unused
}
AuditStamp auditStamp = makeAuditStamp(result);
_localAccess.add(urn, toRecordTemplate(aspectClass, result).orElse(null), aspectClass, auditStamp);
_localAccess.add(urn, toRecordTemplate(aspectClass, result).orElse(null), aspectClass, auditStamp, null);
return null; // unused
}, 1);
}
Expand Down Expand Up @@ -760,7 +761,8 @@ protected <ASPECT extends RecordTemplate> void updateWithOptimisticLocking(@Nonn
if (_schemaConfig == SchemaConfig.NEW_SCHEMA_ONLY || _schemaConfig == SchemaConfig.DUAL_SCHEMA) {
// ensure atomicity by running old schema update + new schema update in a transaction
numOfUpdatedRows = runInTransactionWithRetry(() -> {
_localAccess.add(urn, (ASPECT) value, aspectClass, newAuditStamp);
UUID messageId = trackingContext != null ? trackingContext.getTrackingId() : null;
_localAccess.add(urn, (ASPECT) value, aspectClass, newAuditStamp, messageId);
return _server.execute(update);
}, 1);
} else {
Expand All @@ -782,7 +784,8 @@ protected <ASPECT extends RecordTemplate> void insert(@Nonnull URN urn, @Nullabl
if (_schemaConfig != SchemaConfig.OLD_SCHEMA_ONLY && version == LATEST_VERSION) {
// insert() could be called when updating log table (moving current versions into new history version)
// the metadata entity tables shouldn't been updated.
_localAccess.add(urn, (ASPECT) value, aspectClass, auditStamp);
UUID messageId = trackingContext != null ? trackingContext.getTrackingId() : null;
_localAccess.add(urn, (ASPECT) value, aspectClass, auditStamp, messageId);
}

if (_changeLogEnabled) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.metadata.dao;

import com.linkedin.avro2pegasus.events.UUID;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
Expand Down Expand Up @@ -31,7 +32,7 @@ public interface IEbeanLocalAccess<URN extends Urn> {
* @return number of rows inserted or updated
*/
<ASPECT extends RecordTemplate> int add(@Nonnull URN urn, @Nullable ASPECT newValue, @Nonnull Class<ASPECT> aspectClass,
@Nonnull AuditStamp auditStamp);
@Nonnull AuditStamp auditStamp, @Nullable UUID messageId);

/**
* Upsert relationships to the local relationship table(s).
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.linkedin.metadata.dao;

import com.google.common.io.Resources;
import com.linkedin.avro2pegasus.events.UUID;
import com.linkedin.common.AuditStamp;
import com.linkedin.data.ByteString;
import com.linkedin.metadata.dao.localrelationship.SampleLocalRelationshipRegistryImpl;
import com.linkedin.metadata.dao.scsi.EmptyPathExtractor;
import com.linkedin.metadata.dao.utils.BarUrnPathExtractor;
Expand Down Expand Up @@ -55,6 +57,7 @@ public class EbeanLocalAccessTest {
private static IEbeanLocalAccess<BurgerUrn> _ebeanLocalAccessBurger;
private static long _now;
private static final LocalRelationshipFilter EMPTY_FILTER = new LocalRelationshipFilter().setCriteria(new LocalRelationshipCriterionArray());
private static final byte[] UUID = {0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1};

@BeforeClass
public void init() {
Expand All @@ -81,7 +84,7 @@ public void setupTest() throws IOException {
AspectFoo aspectFoo = new AspectFoo();
aspectFoo.setValue(String.valueOf(i));
AuditStamp auditStamp = makeAuditStamp("foo", System.currentTimeMillis());
_ebeanLocalAccessFoo.add(fooUrn, aspectFoo, AspectFoo.class, auditStamp);
_ebeanLocalAccessFoo.add(fooUrn, aspectFoo, AspectFoo.class, auditStamp, new UUID(ByteString.copy(UUID)));
}
}

Expand Down Expand Up @@ -266,7 +269,7 @@ public void testCountAggregate() {
AspectFoo aspectFoo = new AspectFoo();
aspectFoo.setValue(String.valueOf(25));
AuditStamp auditStamp = makeAuditStamp("foo", System.currentTimeMillis());
_ebeanLocalAccessFoo.add(fooUrn, aspectFoo, AspectFoo.class, auditStamp);
_ebeanLocalAccessFoo.add(fooUrn, aspectFoo, AspectFoo.class, auditStamp, new UUID(ByteString.copy(UUID)));
countMap = _ebeanLocalAccessFoo.countAggregate(indexFilter, indexGroupByCriterion);

// Expect: there are 2 counts for value 25
Expand All @@ -280,7 +283,7 @@ public void testEscapeSpecialCharInUrn() {

// Single quote is a special char in SQL.
BurgerUrn johnsBurgerUrn1 = makeBurgerUrn("urn:li:burger:John's burger");
_ebeanLocalAccessBurger.add(johnsBurgerUrn1, aspectFoo, AspectFoo.class, auditStamp);
_ebeanLocalAccessBurger.add(johnsBurgerUrn1, aspectFoo, AspectFoo.class, auditStamp, new UUID(ByteString.copy(UUID)));

AspectKey aspectKey1 = new AspectKey(AspectFoo.class, johnsBurgerUrn1, 0L);
List<EbeanMetadataAspect> ebeanMetadataAspectList = _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey1), 1, 0, false);
Expand All @@ -289,7 +292,7 @@ public void testEscapeSpecialCharInUrn() {

// Double quote is a special char in SQL.
BurgerUrn johnsBurgerUrn2 = makeBurgerUrn("urn:li:burger:John\"s burger");
_ebeanLocalAccessBurger.add(johnsBurgerUrn2, aspectFoo, AspectFoo.class, auditStamp);
_ebeanLocalAccessBurger.add(johnsBurgerUrn2, aspectFoo, AspectFoo.class, auditStamp, new UUID(ByteString.copy(UUID)));

AspectKey aspectKey2 = new AspectKey(AspectFoo.class, johnsBurgerUrn2, 0L);
ebeanMetadataAspectList = _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey2), 1, 0, false);
Expand All @@ -298,7 +301,7 @@ public void testEscapeSpecialCharInUrn() {

// Backslash is a special char in SQL.
BurgerUrn johnsBurgerUrn3 = makeBurgerUrn("urn:li:burger:John\\s burger");
_ebeanLocalAccessBurger.add(johnsBurgerUrn3, aspectFoo, AspectFoo.class, auditStamp);
_ebeanLocalAccessBurger.add(johnsBurgerUrn3, aspectFoo, AspectFoo.class, auditStamp, new UUID(ByteString.copy(UUID)));

AspectKey aspectKey3 = new AspectKey(AspectFoo.class, johnsBurgerUrn3, 0L);
ebeanMetadataAspectList = _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey3), 1, 0, false);
Expand All @@ -315,10 +318,10 @@ public void testAddWithLocalRelationshipBuilder() throws URISyntaxException {
AspectFooBar aspectFooBar = new AspectFooBar().setBars(new BarUrnArray(barUrn1, barUrn2, barUrn3));
AuditStamp auditStamp = makeAuditStamp("foo", System.currentTimeMillis());

_ebeanLocalAccessFoo.add(fooUrn, aspectFooBar, AspectFooBar.class, auditStamp);
_ebeanLocalAccessBar.add(barUrn1, new AspectFoo().setValue("1"), AspectFoo.class, auditStamp);
_ebeanLocalAccessBar.add(barUrn2, new AspectFoo().setValue("2"), AspectFoo.class, auditStamp);
_ebeanLocalAccessBar.add(barUrn3, new AspectFoo().setValue("3"), AspectFoo.class, auditStamp);
_ebeanLocalAccessFoo.add(fooUrn, aspectFooBar, AspectFooBar.class, auditStamp, new UUID(ByteString.copy(UUID)));
_ebeanLocalAccessBar.add(barUrn1, new AspectFoo().setValue("1"), AspectFoo.class, auditStamp, new UUID(ByteString.copy(UUID)));
_ebeanLocalAccessBar.add(barUrn2, new AspectFoo().setValue("2"), AspectFoo.class, auditStamp, new UUID(ByteString.copy(UUID)));
_ebeanLocalAccessBar.add(barUrn3, new AspectFoo().setValue("3"), AspectFoo.class, auditStamp, new UUID(ByteString.copy(UUID)));

// Verify local relationships and entity are added.
EbeanLocalRelationshipQueryDAO ebeanLocalRelationshipQueryDAO = new EbeanLocalRelationshipQueryDAO(_server);
Expand All @@ -345,7 +348,7 @@ public void testAtomicityWithLocalRelationshipBuilder() throws URISyntaxExceptio
AuditStamp auditStamp = makeAuditStamp("foo", System.currentTimeMillis());

try {
_ebeanLocalAccessFoo.add(makeFooUrn(1), aspectFooBar, AspectFooBar.class, auditStamp);
_ebeanLocalAccessFoo.add(makeFooUrn(1), aspectFooBar, AspectFooBar.class, auditStamp, new UUID(ByteString.copy(UUID)));
} catch (Exception exception) {
// Verify no relationship is added.
List<SqlRow> relationships = _server.createSqlQuery("SELECT * FROM metadata_relationship_belongsto").findList();
Expand All @@ -357,7 +360,7 @@ public void testAtomicityWithLocalRelationshipBuilder() throws URISyntaxExceptio
public void testUrnExtraction() {
FooUrn urn1 = makeFooUrn(1);
AspectFoo foo1 = new AspectFoo().setValue("foo");
_ebeanLocalAccessFoo.add(urn1, foo1, AspectFoo.class, makeAuditStamp("actor", _now));
_ebeanLocalAccessFoo.add(urn1, foo1, AspectFoo.class, makeAuditStamp("actor", _now), new UUID(ByteString.copy(UUID)));

// get content of virtual column
List<SqlRow> results = _server.createSqlQuery("SELECT i_urn$fooId as id FROM metadata_entity_foo").findList();
Expand All @@ -379,10 +382,10 @@ public void testAddRelationships() throws URISyntaxException {

// Turn off local relationship ingestion first, to fill only the entity tables.
_ebeanLocalAccessFoo.setLocalRelationshipBuilderRegistry(null);
_ebeanLocalAccessFoo.add(fooUrn, aspectFooBar, AspectFooBar.class, auditStamp);
_ebeanLocalAccessBar.add(barUrn1, new AspectFoo().setValue("1"), AspectFoo.class, auditStamp);
_ebeanLocalAccessBar.add(barUrn2, new AspectFoo().setValue("2"), AspectFoo.class, auditStamp);
_ebeanLocalAccessBar.add(barUrn3, new AspectFoo().setValue("3"), AspectFoo.class, auditStamp);
_ebeanLocalAccessFoo.add(fooUrn, aspectFooBar, AspectFooBar.class, auditStamp, new UUID(ByteString.copy(UUID)));
_ebeanLocalAccessBar.add(barUrn1, new AspectFoo().setValue("1"), AspectFoo.class, auditStamp, new UUID(ByteString.copy(UUID)));
_ebeanLocalAccessBar.add(barUrn2, new AspectFoo().setValue("2"), AspectFoo.class, auditStamp, new UUID(ByteString.copy(UUID)));
_ebeanLocalAccessBar.add(barUrn3, new AspectFoo().setValue("3"), AspectFoo.class, auditStamp, new UUID(ByteString.copy(UUID)));

// Verify that NO local relationships were added
EbeanLocalRelationshipQueryDAO ebeanLocalRelationshipQueryDAO = new EbeanLocalRelationshipQueryDAO(_server);
Expand Down Expand Up @@ -440,7 +443,7 @@ public void testFindLatestMetadataAspect() throws URISyntaxException {
@Test
public void testGetAspectNoSoftDeleteCheck() {
FooUrn fooUrn = makeFooUrn(0);
_ebeanLocalAccessFoo.add(fooUrn, null, AspectFoo.class, makeAuditStamp("foo", System.currentTimeMillis()));
_ebeanLocalAccessFoo.add(fooUrn, null, AspectFoo.class, makeAuditStamp("foo", System.currentTimeMillis()), new UUID(ByteString.copy(UUID)));
AspectKey<FooUrn, AspectFoo> aspectKey = new AspectKey(AspectFoo.class, fooUrn, 0L);
List<EbeanMetadataAspect> ebeanMetadataAspectList =
_ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey), 1000, 0, false);
Expand Down
Loading

0 comments on commit 1d73a34

Please sign in to comment.