Skip to content

Commit

Permalink
Persist: introduce type-safe bulk-fetch functions (#8763)
Browse files Browse the repository at this point in the history
This change introduces `Persist.fetchTypedObjs()` and `Persist.fetchTypedObjsIfExist()` as bulk-couterparts to `Persist.fetchTypedObj()`.

Also refactor the code paths to let the untyped functions use the typed functions to unify/keep common code paths.
  • Loading branch information
snazy authored Jun 11, 2024
1 parent dcb5cf7 commit 7071399
Show file tree
Hide file tree
Showing 21 changed files with 458 additions and 331 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -178,23 +179,6 @@ public void upsertObjs(@Nonnull @javax.annotation.Nonnull Obj[] objs)
}
}

@Override
@Nonnull
@javax.annotation.Nonnull
public Obj fetchObj(@Nonnull @javax.annotation.Nonnull ObjId id) throws ObjNotFoundException {
readLock();
try {
Obj r = pendingObj(id);
if (r != null) {
return r;
}
} finally {
readUnlock();
}

return delegate().fetchObj(id);
}

private Obj pendingObj(ObjId id) {
Obj r = pendingUpserts.get(id);
if (r == null) {
Expand All @@ -207,13 +191,13 @@ private Obj pendingObj(ObjId id) {
@Nonnull
@javax.annotation.Nonnull
public <T extends Obj> T fetchTypedObj(
@Nonnull @javax.annotation.Nonnull ObjId id, ObjType type, Class<T> typeClass)
@Nonnull @javax.annotation.Nonnull ObjId id, ObjType type, @Nonnull Class<T> typeClass)
throws ObjNotFoundException {
readLock();
try {
Obj r = pendingObj(id);
if (r != null) {
if (!r.type().equals(type)) {
if (type != null && !r.type().equals(type)) {
throw new ObjNotFoundException(id);
}
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -246,23 +230,24 @@ public ObjType fetchObjType(@Nonnull @javax.annotation.Nonnull ObjId id)
@Override
@Nonnull
@javax.annotation.Nonnull
public Obj[] fetchObjs(@Nonnull @javax.annotation.Nonnull ObjId[] ids)
throws ObjNotFoundException {
public <T extends Obj> T[] fetchTypedObjs(
@Nonnull ObjId[] ids, ObjType type, @Nonnull Class<T> typeClass) throws ObjNotFoundException {

ObjId[] backendIds = null;
Obj[] r = new Obj[ids.length];
@SuppressWarnings("unchecked")
T[] r = (T[]) Array.newInstance(typeClass, ids.length);

backendIds = fetchObjsPre(ids, r, backendIds);

if (backendIds == null) {
return r;
}

Obj[] backendResult = delegate().fetchObjs(backendIds);
T[] backendResult = delegate().fetchTypedObjs(backendIds, type, typeClass);
return fetchObjsPost(backendResult, r);
}

private ObjId[] fetchObjsPre(ObjId[] ids, Obj[] r, ObjId[] backendIds) {
private <T extends Obj> ObjId[] fetchObjsPre(ObjId[] ids, T[] r, ObjId[] backendIds) {
readLock();
try {
for (int i = 0; i < ids.length; i++) {
Expand All @@ -272,7 +257,9 @@ private ObjId[] fetchObjsPre(ObjId[] ids, Obj[] r, ObjId[] backendIds) {
}
Obj o = pendingObj(id);
if (o != null) {
r[i] = o;
@SuppressWarnings("unchecked")
T typed = (T) o;
r[i] = typed;
} else {
if (backendIds == null) {
backendIds = new ObjId[ids.length];
Expand All @@ -286,9 +273,9 @@ private ObjId[] fetchObjsPre(ObjId[] ids, Obj[] r, ObjId[] backendIds) {
return backendIds;
}

private static Obj[] fetchObjsPost(Obj[] backendResult, Obj[] r) {
private static <T extends Obj> T[] fetchObjsPost(T[] backendResult, T[] r) {
for (int i = 0; i < backendResult.length; i++) {
Obj o = backendResult[i];
T o = backendResult[i];
if (o != null) {
r[i] = o;
}
Expand All @@ -299,18 +286,20 @@ private static Obj[] fetchObjsPost(Obj[] backendResult, Obj[] r) {
@Override
@Nonnull
@javax.annotation.Nonnull
public Obj[] fetchObjsIfExist(@Nonnull @javax.annotation.Nonnull ObjId[] ids) {
public <T extends Obj> T[] fetchTypedObjsIfExist(
@Nonnull ObjId[] ids, ObjType type, @Nonnull Class<T> typeClass) {

ObjId[] backendIds = null;
Obj[] r = new Obj[ids.length];
@SuppressWarnings("unchecked")
T[] r = (T[]) Array.newInstance(typeClass, ids.length);

backendIds = fetchObjsPre(ids, r, backendIds);

if (backendIds == null) {
return r;
}

Obj[] backendResult = delegate().fetchObjsIfExist(backendIds);
T[] backendResult = delegate().fetchTypedObjsIfExist(backendIds, type, typeClass);
return fetchObjsPost(backendResult, r);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import jakarta.annotation.Nonnull;
import java.lang.reflect.Array;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -305,13 +305,20 @@ private static Reference referenceFromRow(Row row) {

@Override
@Nonnull
public Obj fetchObj(@Nonnull ObjId id) throws ObjNotFoundException {
public <T extends Obj> T fetchTypedObj(
@Nonnull ObjId id, ObjType type, @Nonnull Class<T> typeClass) throws ObjNotFoundException {
try {
ByteString key = dbKey(id);

Row row = backend.client().readRow(backend.tableObjsId, key);
if (row != null) {
return objFromRow(row);
Obj obj = objFromRow(row);
if (type != null && !type.equals(obj.type())) {
throw new ObjNotFoundException(id);
}
@SuppressWarnings("unchecked")
T r = (T) obj;
return r;
}
throw new ObjNotFoundException(id);
} catch (ApiException e) {
Expand All @@ -320,54 +327,23 @@ public Obj fetchObj(@Nonnull ObjId id) throws ObjNotFoundException {
}

@Override
@Nonnull
public <T extends Obj> T fetchTypedObj(@Nonnull ObjId id, ObjType type, Class<T> typeClass)
throws ObjNotFoundException {
Obj obj = fetchObj(id);
if (!obj.type().equals(type)) {
throw new ObjNotFoundException(id);
}
@SuppressWarnings("unchecked")
T r = (T) obj;
return r;
}

@Override
@Nonnull
public ObjType fetchObjType(@Nonnull ObjId id) throws ObjNotFoundException {
return fetchObj(id).type();
}

@Override
@Nonnull
public Obj[] fetchObjs(@Nonnull ObjId[] ids) throws ObjNotFoundException {
public <T extends Obj> T[] fetchTypedObjsIfExist(
@Nonnull ObjId[] ids, ObjType type, @Nonnull Class<T> typeClass) {
try {
Obj[] r = new Obj[ids.length];
List<ObjId> notFound = new ArrayList<>();
bulkFetch(backend.tableObjsId, ids, r, this::dbKey, this::objFromRow, notFound::add);

if (!notFound.isEmpty()) {
throw new ObjNotFoundException(notFound);
}
@SuppressWarnings("unchecked")
T[] r = (T[]) Array.newInstance(typeClass, ids.length);

return r;
} catch (ExecutionException | TimeoutException e) {
throw new RuntimeException(e);
} catch (ApiException e) {
throw apiException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}

@Override
@Nonnull
public Obj[] fetchObjsIfExist(@Nonnull ObjId[] ids) {
try {
Obj[] r = new Obj[ids.length];
bulkFetch(backend.tableObjsId, ids, r, this::dbKey, this::objFromRow, x -> {});

if (type != null) {
for (int i = 0; i < r.length; i++) {
Obj o = r[i];
if (o != null && !type.equals(o.type())) {
r[i] = null;
}
}
}

return r;
} catch (ExecutionException | TimeoutException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import static org.projectnessie.versioned.storage.cache.CacheBackend.NOT_FOUND_OBJ_SENTINEL;

import jakarta.annotation.Nonnull;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.projectnessie.versioned.storage.common.config.StoreConfig;
import org.projectnessie.versioned.storage.common.exceptions.ObjNotFoundException;
Expand Down Expand Up @@ -129,6 +132,53 @@ public Obj[] fetchObjs(@Nonnull ObjId[] ids) throws ObjNotFoundException {
return fetchObjsPost(backendIds, backendResult, r, null);
}

@Nonnull
@Override
public <T extends Obj> T[] fetchTypedObjs(
@Nonnull ObjId[] ids, ObjType type, @Nonnull Class<T> typeClass) throws ObjNotFoundException {
@SuppressWarnings("unchecked")
T[] r = (T[]) Array.newInstance(typeClass, ids.length);

ObjId[] backendIds = fetchObjsPre(ids, r, type, typeClass);

if (backendIds != null) {
T[] backendResult = persist.fetchTypedObjsIfExist(backendIds, type, typeClass);
r = fetchObjsPost(backendIds, backendResult, r, type);
}

List<ObjId> notFound = null;
for (int i = 0; i < ids.length; i++) {
ObjId id = ids[i];
if (r[i] == null && id != null) {
if (notFound == null) {
notFound = new ArrayList<>();
}
notFound.add(id);
}
}
if (notFound != null) {
throw new ObjNotFoundException(notFound);
}

return r;
}

@Override
public <T extends Obj> T[] fetchTypedObjsIfExist(
@Nonnull ObjId[] ids, ObjType type, @Nonnull Class<T> typeClass) {
@SuppressWarnings("unchecked")
T[] r = (T[]) Array.newInstance(typeClass, ids.length);

ObjId[] backendIds = fetchObjsPre(ids, r, type, typeClass);

if (backendIds == null) {
return r;
}

T[] backendResult = persist.fetchTypedObjsIfExist(backendIds, type, typeClass);
return fetchObjsPost(backendIds, backendResult, r, type);
}

private <T extends Obj> ObjId[] fetchObjsPre(
ObjId[] ids, T[] r, ObjType type, @Nonnull Class<T> typeClass) {
ObjId[] backendIds = null;
Expand Down
Loading

0 comments on commit 7071399

Please sign in to comment.