Skip to content

Commit

Permalink
fix(batchGet): synchronize batchGet() behavior with get() (#233)
Browse files Browse the repository at this point in the history
  • Loading branch information
jsdonn authored Feb 1, 2023
1 parent d098d8f commit 2d613e6
Show file tree
Hide file tree
Showing 2 changed files with 209 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import com.linkedin.metadata.query.MapMetadata;
import com.linkedin.parseq.Task;
import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.common.HttpStatus;
import com.linkedin.restli.server.BatchResult;
import com.linkedin.restli.server.CollectionResult;
import com.linkedin.restli.server.PagingContext;
import com.linkedin.restli.server.RestLiServiceException;
import com.linkedin.restli.server.annotations.Action;
import com.linkedin.restli.server.annotations.ActionParam;
import com.linkedin.restli.server.annotations.Finder;
Expand All @@ -36,6 +39,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -157,21 +161,58 @@ public Task<VALUE> get(@Nonnull KEY id,
}

/**
* Similar to {@link #get(Object, String[])} but for multiple entities.
* Similar to {@link #get(Object, String[])} but for multiple entities. This method is deprecated in favor of
* {@link #batchGetWithErrors}. This method has incorrect behavior when dealing with keys which don't exist
* in the database (<a href="https://github.com/linkedin/datahub-gma/issues/136">Issue #136</a>). The latter method
* properly returns a BatchResult which includes a map of errors in addition to the successful batch results.
*/
@RestMethod.BatchGet
@Deprecated
@Nonnull
public Task<Map<KEY, VALUE>> batchGet(
@Nonnull Set<KEY> ids,
@QueryParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames) {
// TODO: discuss and sync with get()'s intended behavior (check if urn exists). https://github.com/linkedin/datahub-gma/issues/136
return RestliUtils.toTask(() -> {
final Map<URN, KEY> urnMap =
ids.stream().collect(Collectors.toMap(id -> toUrn(id), Function.identity()));
ids.stream().collect(Collectors.toMap(this::toUrn, Function.identity()));
return getInternal(urnMap.keySet(), parseAspectsParam(aspectNames)).entrySet()
.stream()
.collect(
Collectors.toMap(e -> urnMap.get(e.getKey()), e -> e.getValue()));
Collectors.toMap(e -> urnMap.get(e.getKey()), Map.Entry::getValue));
});
}

/**
* Similar to {@link #get(Object, String[])} but for multiple entities. Compared to the deprecated {@link #batchGet}
* method, this method properly returns a BatchResult which includes a map of errors in addition to the successful
* batch results.
*/
@RestMethod.BatchGet
@Nonnull
public Task<BatchResult<KEY, VALUE>> batchGetWithErrors(
@Nonnull Set<KEY> ids,
@QueryParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames) {
return RestliUtils.toTask(() -> {
final Map<KEY, RestLiServiceException> errors = new HashMap<>();
final Map<KEY, HttpStatus> statuses = new HashMap<>();
final Map<URN, KEY> urnMap =
ids.stream().collect(Collectors.toMap(this::toUrn, Function.identity()));
final Map<URN, VALUE> batchResult = getInternal(urnMap.keySet(), parseAspectsParam(aspectNames));
batchResult.entrySet().removeIf(entry -> {
if (!entry.getValue().data().isEmpty()) {
// don't remove if there is a non-empty value associated with the key
statuses.put(urnMap.get(entry.getKey()), HttpStatus.S_200_OK);
return false;
}
// if this key's value is empty, then this key doesn't exist in the db.
// mark this key with 404 and remove the entry from the map
errors.put(urnMap.get(entry.getKey()), new RestLiServiceException(HttpStatus.S_404_NOT_FOUND));
statuses.put(urnMap.get(entry.getKey()), HttpStatus.S_404_NOT_FOUND);
return true;
});
return new BatchResult<>(
batchResult.entrySet().stream().collect(Collectors.toMap(e -> urnMap.get(e.getKey()), Map.Entry::getValue)),
statuses, errors);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.common.HttpStatus;
import com.linkedin.restli.server.BatchResult;
import com.linkedin.restli.server.CollectionResult;
import com.linkedin.restli.server.PagingContext;
import com.linkedin.restli.server.ResourceContext;
Expand Down Expand Up @@ -184,11 +185,7 @@ public void testGetUrnNotFound() {
public void testGetWithEmptyAspects() {
FooUrn urn = makeFooUrn(1234);

AspectKey<FooUrn, AspectFoo> aspect1Key = new AspectKey<>(AspectFoo.class, urn, LATEST_VERSION);
AspectKey<FooUrn, AspectBar> aspect2Key = new AspectKey<>(AspectBar.class, urn, LATEST_VERSION);

when(_mockLocalDAO.exists(urn)).thenReturn(true);
when(_mockLocalDAO.get(new HashSet<>(Arrays.asList(aspect1Key, aspect2Key)))).thenReturn(Collections.emptyMap());

try {
EntityValue value = runAndWait(_resource.get(makeResourceKey(urn), new String[0]));
Expand Down Expand Up @@ -277,6 +274,169 @@ public void testBatchGetSpecificAspect() {
verifyNoMoreInteractions(_mockLocalDAO);
}

@Test
public void testBatchGetWithErrorsUrnsNotFound() {
FooUrn urn1 = makeFooUrn(1);
FooUrn urn2 = makeFooUrn(2);
String[] aspectNames = {ModelUtils.getAspectName(AspectFoo.class)};

AspectKey<FooUrn, AspectFoo> aspectFooKey1 = new AspectKey<>(AspectFoo.class, urn1, LATEST_VERSION);
AspectKey<FooUrn, AspectFoo> aspectFooKey2 = new AspectKey<>(AspectFoo.class, urn1, LATEST_VERSION);

when(_mockLocalDAO.get(ImmutableSet.of(aspectFooKey1, aspectFooKey2)))
.thenReturn(Collections.emptyMap());

BatchResult<ComplexResourceKey<EntityKey, EmptyRecord>, EntityValue> result =
runAndWait(_resource.batchGetWithErrors(ImmutableSet.of(makeResourceKey(urn1), makeResourceKey(urn2)), aspectNames));

// convert BatchResult<ComplexResourceKey<EntityKey, EmptyRecord>, EntityValue> to BatchResult<EntityKey, EntityValue>
BatchResult<EntityKey, EntityValue> batchResultMap = convertBatchResult(result);

// ensure there are 2 404s in the form of HttpStatus
Map<EntityKey, HttpStatus> statuses = batchResultMap.getStatuses();
assertEquals(statuses.size(), 2);
assertEquals(statuses.get(makeKey(1)), HttpStatus.S_404_NOT_FOUND);
assertEquals(statuses.get(makeKey(2)), HttpStatus.S_404_NOT_FOUND);

// ensure there are 2 404s in the form of RestLiServiceException
Map<EntityKey, RestLiServiceException> errors = batchResultMap.getErrors();
assertEquals(errors.size(), 2);
assertEquals(errors.get(makeKey(1)).getStatus(), HttpStatus.S_404_NOT_FOUND);
assertEquals(errors.get(makeKey(2)).getStatus(), HttpStatus.S_404_NOT_FOUND);

// ensure the urns that don't exist are not in the result data map
assertEquals(batchResultMap.size(), 0);
}

@Test
public void testBatchGetWithErrorsWithEmptyAspects() {
FooUrn urn1 = makeFooUrn(1);
FooUrn urn2 = makeFooUrn(2);

AspectKey<FooUrn, AspectFoo> aspectFooKey1 = new AspectKey<>(AspectFoo.class, urn1, LATEST_VERSION);
AspectKey<FooUrn, AspectBar> aspectBarKey1 = new AspectKey<>(AspectBar.class, urn1, LATEST_VERSION);
AspectKey<FooUrn, AspectFooBar> aspectFooBarKey1 = new AspectKey<>(AspectFooBar.class, urn1, LATEST_VERSION);
AspectKey<FooUrn, AspectAttributes> aspectAttKey1 = new AspectKey<>(AspectAttributes.class, urn1, LATEST_VERSION);
AspectKey<FooUrn, AspectFoo> aspectFooKey2 = new AspectKey<>(AspectFoo.class, urn2, LATEST_VERSION);
AspectKey<FooUrn, AspectBar> aspectBarKey2 = new AspectKey<>(AspectBar.class, urn2, LATEST_VERSION);
AspectKey<FooUrn, AspectFooBar> aspectFooBarKey2 = new AspectKey<>(AspectFooBar.class, urn2, LATEST_VERSION);
AspectKey<FooUrn, AspectAttributes> aspectAttKey2 = new AspectKey<>(AspectAttributes.class, urn2, LATEST_VERSION);

when(_mockLocalDAO.get(ImmutableSet.of(aspectFooBarKey1, aspectFooBarKey2, aspectFooKey1, aspectBarKey1, aspectFooKey2,
aspectBarKey2, aspectAttKey1, aspectAttKey2)))
.thenReturn(Collections.emptyMap());

BatchResult<ComplexResourceKey<EntityKey, EmptyRecord>, EntityValue> result =
runAndWait(_resource.batchGetWithErrors(ImmutableSet.of(makeResourceKey(urn1), makeResourceKey(urn2)), new String[0]));

// convert BatchResult<ComplexResourceKey<EntityKey, EmptyRecord>, EntityValue> to BatchResult<EntityKey, EntityValue>
BatchResult<EntityKey, EntityValue> batchResultMap = convertBatchResult(result);

// ensure there are 2 404s in the form of HttpStatus
Map<EntityKey, HttpStatus> statuses = batchResultMap.getStatuses();
assertEquals(statuses.size(), 2);
assertEquals(statuses.get(makeKey(1)), HttpStatus.S_404_NOT_FOUND);
assertEquals(statuses.get(makeKey(2)), HttpStatus.S_404_NOT_FOUND);

// ensure there are 2 404s in the form of RestLiServiceException
Map<EntityKey, RestLiServiceException> errors = batchResultMap.getErrors();
assertEquals(errors.size(), 2);
assertEquals(errors.get(makeKey(1)).getStatus(), HttpStatus.S_404_NOT_FOUND);
assertEquals(errors.get(makeKey(2)).getStatus(), HttpStatus.S_404_NOT_FOUND);

// ensure the urns that don't exist are not in the result data map
assertEquals(batchResultMap.size(), 0);
}

@Test
public void testBatchGetWithErrorsSpecificAspectsPartialSuccess() {
FooUrn urn1 = makeFooUrn(1);
FooUrn urn2 = makeFooUrn(2);
AspectFoo foo = new AspectFoo().setValue("foo");
AspectBar bar = new AspectBar().setValue("bar");
String[] aspectNames = {AspectFoo.class.getCanonicalName(), AspectBar.class.getCanonicalName()};

AspectKey<FooUrn, AspectFoo> aspectFooKey1 = new AspectKey<>(AspectFoo.class, urn1, LATEST_VERSION);
AspectKey<FooUrn, AspectBar> aspectBarKey1 = new AspectKey<>(AspectBar.class, urn1, LATEST_VERSION);
AspectKey<FooUrn, AspectFoo> aspectFooKey2 = new AspectKey<>(AspectFoo.class, urn2, LATEST_VERSION);
AspectKey<FooUrn, AspectBar> aspectBarKey2 = new AspectKey<>(AspectBar.class, urn2, LATEST_VERSION);

when(_mockLocalDAO.get(ImmutableSet.of(aspectFooKey1, aspectBarKey1, aspectFooKey2, aspectBarKey2)))
.thenReturn(ImmutableMap.of(aspectFooKey1, Optional.of(foo), aspectBarKey2, Optional.of(bar)));

BatchResult<ComplexResourceKey<EntityKey, EmptyRecord>, EntityValue> result =
runAndWait(_resource.batchGetWithErrors(ImmutableSet.of(makeResourceKey(urn1), makeResourceKey(urn2)), aspectNames));

// convert BatchResult<ComplexResourceKey<EntityKey, EmptyRecord>, EntityValue> to BatchResult<EntityKey, EntityValue>
BatchResult<EntityKey, EntityValue> batchResultMap = convertBatchResult(result);

// ensure there are 2 200s and 0 404s in the form of HttpStatus
Map<EntityKey, HttpStatus> statuses = batchResultMap.getStatuses();
assertEquals(statuses.size(), 2);
assertEquals(statuses.get(makeKey(1)), HttpStatus.S_200_OK);
assertEquals(statuses.get(makeKey(2)), HttpStatus.S_200_OK);

// ensure there are 0 404s in the form of RestLiServiceException
Map<EntityKey, RestLiServiceException> errors = batchResultMap.getErrors();
assertEquals(errors.size(), 0);

// ensure there are 2 results in the result data map
assertEquals(batchResultMap.size(), 2);
assertEquals(batchResultMap.get(makeKey(1)).getFoo(), foo);
assertFalse(batchResultMap.get(makeKey(1)).hasBar());
assertEquals(batchResultMap.get(makeKey(2)).getBar(), bar);
assertFalse(batchResultMap.get(makeKey(2)).hasFoo());
}

@Test
public void testBatchGetWithErrorsUrnsPartialSuccess() {
FooUrn urn1 = makeFooUrn(1);
FooUrn urn2 = makeFooUrn(2);
AspectFoo foo = new AspectFoo().setValue("foo");
String[] aspectNames = {AspectFoo.class.getCanonicalName(), AspectBar.class.getCanonicalName()};

AspectKey<FooUrn, AspectFoo> aspectFooKey1 = new AspectKey<>(AspectFoo.class, urn1, LATEST_VERSION);
AspectKey<FooUrn, AspectBar> aspectBarKey1 = new AspectKey<>(AspectBar.class, urn1, LATEST_VERSION);
AspectKey<FooUrn, AspectFoo> aspectFooKey2 = new AspectKey<>(AspectFoo.class, urn2, LATEST_VERSION);
AspectKey<FooUrn, AspectBar> aspectBarKey2 = new AspectKey<>(AspectBar.class, urn2, LATEST_VERSION);

when(_mockLocalDAO.get(ImmutableSet.of(aspectFooKey1, aspectBarKey1, aspectFooKey2, aspectBarKey2)))
.thenReturn(ImmutableMap.of(aspectFooKey1, Optional.of(foo), aspectBarKey2, Optional.empty()));

BatchResult<ComplexResourceKey<EntityKey, EmptyRecord>, EntityValue> result =
runAndWait(_resource.batchGetWithErrors(ImmutableSet.of(makeResourceKey(urn1), makeResourceKey(urn2)), aspectNames));

// convert BatchResult<ComplexResourceKey<EntityKey, EmptyRecord>, EntityValue> to BatchResult<EntityKey, EntityValue>
BatchResult<EntityKey, EntityValue> batchResultMap = convertBatchResult(result);

// ensure there is 1 200 (urn1) and 1 404 (urn2) in the form of HttpStatus
Map<EntityKey, HttpStatus> statuses = batchResultMap.getStatuses();
assertEquals(statuses.size(), 2);
assertEquals(statuses.get(makeKey(1)), HttpStatus.S_200_OK);
assertEquals(statuses.get(makeKey(2)), HttpStatus.S_404_NOT_FOUND);

// ensure there is 1 404 in the form of RestLiServiceException (urn2)
Map<EntityKey, RestLiServiceException> errors = batchResultMap.getErrors();
assertEquals(errors.size(), 1);
assertEquals(errors.get(makeKey(2)).getStatus(), HttpStatus.S_404_NOT_FOUND);

// ensure there is 1 result in the result data map (urn1)
assertEquals(batchResultMap.size(), 1);
assertEquals(batchResultMap.get(makeKey(1)).getFoo(), foo);
assertFalse(batchResultMap.get(makeKey(1)).hasBar());
}

// convert BatchResult<ComplexResourceKey<EntityKey, EmptyRecord>, EntityValue> to BatchResult<EntityKey, EntityValue>
private BatchResult<EntityKey, EntityValue> convertBatchResult(BatchResult<ComplexResourceKey<EntityKey, EmptyRecord>, EntityValue> result) {
Map<EntityKey, EntityValue> dataMap =
result.entrySet().stream().collect(Collectors.toMap(e -> e.getKey().getKey(), Map.Entry::getValue));
Map<EntityKey, HttpStatus> statusMap =
result.getStatuses().entrySet().stream().collect(Collectors.toMap(e -> e.getKey().getKey(), Map.Entry::getValue));
Map<EntityKey, RestLiServiceException> errorMap =
result.getErrors().entrySet().stream().collect(Collectors.toMap(e -> e.getKey().getKey(), Map.Entry::getValue));
return new BatchResult<>(dataMap, statusMap, errorMap);
}

@Test
public void testIngest() {
FooUrn urn = makeFooUrn(1);
Expand Down

0 comments on commit 2d613e6

Please sign in to comment.