Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Catalog] Remove unused parameters/code #8666

Merged
merged 2 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,32 +37,23 @@ public interface CatalogService {
* @param reqParams Parameters holding the Nessie reference specification, snapshot format and
* more.
* @param key content key of the table or view
* @param catalogUriResolver produces URIs for related entities, like Iceberg manifest lists or
* manifest files.
* @param expectedType The expected content-type.
* @return The response is either a response object or callback to produce the result. The latter
* is useful to return results that are quite big, for example Iceberg manifest lists or
* manifest files.
*/
CompletionStage<SnapshotResponse> retrieveSnapshot(
SnapshotReqParams reqParams,
ContentKey key,
CatalogUriResolver catalogUriResolver,
Content.Type expectedType)
SnapshotReqParams reqParams, ContentKey key, Content.Type expectedType)
throws NessieNotFoundException;

Stream<Supplier<CompletionStage<SnapshotResponse>>> retrieveSnapshots(
SnapshotReqParams reqParams,
List<ContentKey> keys,
CatalogUriResolver catalogUriResolver,
Consumer<Reference> effectiveReferenceConsumer)
throws NessieNotFoundException;

CompletionStage<Stream<SnapshotResponse>> commit(
ParsedReference reference,
CatalogCommit commit,
SnapshotReqParams reqParams,
CatalogUriResolver catalogUriResolver)
ParsedReference reference, CatalogCommit commit, SnapshotReqParams reqParams)
throws BaseNessieClientServerException;

interface CatalogUriResolver {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import org.projectnessie.catalog.service.api.CatalogCommit;
import org.projectnessie.catalog.service.api.CatalogEntityAlreadyExistsException;
import org.projectnessie.catalog.service.api.CatalogService;
import org.projectnessie.catalog.service.api.SnapshotFormat;
import org.projectnessie.catalog.service.api.SnapshotReqParams;
import org.projectnessie.catalog.service.api.SnapshotResponse;
import org.projectnessie.catalog.service.config.CatalogConfig;
Expand Down Expand Up @@ -122,7 +121,6 @@ public class CatalogServiceImpl implements CatalogService {
public Stream<Supplier<CompletionStage<SnapshotResponse>>> retrieveSnapshots(
SnapshotReqParams reqParams,
List<ContentKey> keys,
CatalogUriResolver catalogUriResolver,
Consumer<Reference> effectiveReferenceConsumer)
throws NessieNotFoundException {
ParsedReference reference = reqParams.ref();
Expand Down Expand Up @@ -164,23 +162,17 @@ public Stream<Supplier<CompletionStage<SnapshotResponse>>> retrieveSnapshots(
reference.hashWithRelativeSpec(),
key);
CompletionStage<NessieEntitySnapshot<?>> snapshotStage =
icebergStuff.retrieveIcebergSnapshot(
snapshotId, c.getContent(), reqParams.snapshotFormat());
icebergStuff.retrieveIcebergSnapshot(snapshotId, c.getContent());
return snapshotStage.thenApply(
snapshot ->
snapshotResponse(
key, reqParams, catalogUriResolver, snapshot, effectiveReference));
snapshot -> snapshotResponse(key, reqParams, snapshot, effectiveReference));
};
})
.filter(Objects::nonNull);
}

@Override
public CompletionStage<SnapshotResponse> retrieveSnapshot(
SnapshotReqParams reqParams,
ContentKey key,
CatalogUriResolver catalogUriResolver,
Content.Type expectedType)
SnapshotReqParams reqParams, ContentKey key, Content.Type expectedType)
throws NessieNotFoundException {

ParsedReference reference = reqParams.ref();
Expand All @@ -207,26 +199,24 @@ public CompletionStage<SnapshotResponse> retrieveSnapshot(

CompletionStage<NessieEntitySnapshot<?>> snapshotStage =
new IcebergStuff(objectIO, persist, tasksService, executor)
.retrieveIcebergSnapshot(snapshotId, content, reqParams.snapshotFormat());
.retrieveIcebergSnapshot(snapshotId, content);

return snapshotStage.thenApply(
snapshot ->
snapshotResponse(key, reqParams, catalogUriResolver, snapshot, effectiveReference));
snapshot -> snapshotResponse(key, reqParams, snapshot, effectiveReference));
}

private SnapshotResponse snapshotResponse(
ContentKey key,
SnapshotReqParams reqParams,
CatalogUriResolver catalogUriResolver,
NessieEntitySnapshot<?> snapshot,
Reference effectiveReference) {
if (snapshot instanceof NessieTableSnapshot) {
return snapshotTableResponse(
key, reqParams, catalogUriResolver, (NessieTableSnapshot) snapshot, effectiveReference);
key, reqParams, (NessieTableSnapshot) snapshot, effectiveReference);
}
if (snapshot instanceof NessieViewSnapshot) {
return snapshotViewResponse(
key, reqParams, catalogUriResolver, (NessieViewSnapshot) snapshot, effectiveReference);
key, reqParams, (NessieViewSnapshot) snapshot, effectiveReference);
}
throw new IllegalArgumentException(
"Unsupported snapshot type " + snapshot.getClass().getSimpleName());
Expand All @@ -235,7 +225,6 @@ private SnapshotResponse snapshotResponse(
private SnapshotResponse snapshotTableResponse(
ContentKey key,
SnapshotReqParams reqParams,
CatalogUriResolver catalogUriResolver,
NessieTableSnapshot snapshot,
Reference effectiveReference) {
Object result;
Expand Down Expand Up @@ -282,7 +271,6 @@ private SnapshotResponse snapshotTableResponse(
private SnapshotResponse snapshotViewResponse(
ContentKey key,
SnapshotReqParams reqParams,
CatalogUriResolver catalogUriResolver,
NessieViewSnapshot snapshot,
Reference effectiveReference) {
Object result;
Expand Down Expand Up @@ -328,10 +316,7 @@ private SnapshotResponse snapshotViewResponse(

@Override
public CompletionStage<Stream<SnapshotResponse>> commit(
ParsedReference reference,
CatalogCommit commit,
SnapshotReqParams reqParams,
CatalogUriResolver catalogUriResolver)
ParsedReference reference, CatalogCommit commit, SnapshotReqParams reqParams)
throws BaseNessieClientServerException {

GetContentBuilder contentRequest =
Expand Down Expand Up @@ -421,7 +406,6 @@ public CompletionStage<Stream<SnapshotResponse>> commit(
snapshotResponse(
singleTableUpdate.key,
reqParams,
catalogUriResolver,
singleTableUpdate.snapshot,
commitResponse.getTargetBranch()));
} catch (Exception e) {
Expand Down Expand Up @@ -642,14 +626,14 @@ private CompletionStage<NessieTableSnapshot> loadExistingTableSnapshot(Content c
throws NessieContentNotFoundException {
ObjId snapshotId = snapshotIdFromContent(content);
return new IcebergStuff(objectIO, persist, tasksService, executor)
.retrieveIcebergSnapshot(snapshotId, content, SnapshotFormat.NESSIE_SNAPSHOT);
.retrieveIcebergSnapshot(snapshotId, content);
}

private CompletionStage<NessieViewSnapshot> loadExistingViewSnapshot(Content content)
throws NessieContentNotFoundException {
ObjId snapshotId = snapshotIdFromContent(content);
return new IcebergStuff(objectIO, persist, tasksService, executor)
.retrieveIcebergSnapshot(snapshotId, content, SnapshotFormat.NESSIE_SNAPSHOT);
.retrieveIcebergSnapshot(snapshotId, content);
}

private IcebergTableMetadata storeTableSnapshot(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.projectnessie.catalog.model.snapshot.NessieEntitySnapshot;
import org.projectnessie.catalog.model.snapshot.NessieTableSnapshot;
import org.projectnessie.catalog.model.snapshot.NessieViewSnapshot;
import org.projectnessie.catalog.service.api.SnapshotFormat;
import org.projectnessie.model.Content;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.model.IcebergView;
Expand Down Expand Up @@ -59,15 +58,15 @@ public IcebergStuff(
* imported from the data lake into the Nessie Data Catalog's database.
*/
public <S extends NessieEntitySnapshot<?>> CompletionStage<S> retrieveIcebergSnapshot(
ObjId snapshotId, Content content, SnapshotFormat format) {
ObjId snapshotId, Content content) {
EntitySnapshotTaskRequest snapshotTaskRequest =
entitySnapshotTaskRequest(snapshotId, content, null, persist, objectIO, executor);
return triggerIcebergSnapshot(format, snapshotTaskRequest);
return triggerIcebergSnapshot(snapshotTaskRequest);
}

@SuppressWarnings("unchecked")
private <S extends NessieEntitySnapshot<?>> CompletionStage<S> triggerIcebergSnapshot(
SnapshotFormat format, EntitySnapshotTaskRequest snapshotTaskRequest) {
EntitySnapshotTaskRequest snapshotTaskRequest) {
// TODO Handle hash-collision - when entity-snapshot refers to a different(!) snapshot
return tasksService
.forPersist(persist)
Expand All @@ -91,7 +90,7 @@ public <S extends NessieEntitySnapshot<?>> CompletionStage<S> storeSnapshot(
EntitySnapshotTaskRequest snapshotTaskRequest =
entitySnapshotTaskRequest(
nessieIdToObjId(snapshot.id()), content, snapshot, persist, objectIO, executor);
return triggerIcebergSnapshot(SnapshotFormat.ICEBERG_TABLE_METADATA, snapshotTaskRequest);
return triggerIcebergSnapshot(snapshotTaskRequest);
}

/** Fetch requested metadata from the database, the snapshot already exists. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.projectnessie.catalog.files.local.LocalObjectIO;
import org.projectnessie.catalog.formats.iceberg.fixtures.IcebergGenerateFixtures;
import org.projectnessie.catalog.model.snapshot.NessieTableSnapshot;
import org.projectnessie.catalog.service.api.SnapshotFormat;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.nessie.tasks.async.pool.JavaPoolTasksAsync;
import org.projectnessie.nessie.tasks.service.TasksServiceConfig;
Expand Down Expand Up @@ -115,8 +114,7 @@ public void icebergTableImports(
IcebergTable.of(icebergTableMetadata, 1, 1, 1, 1, randomUUID().toString());

CompletionStage<NessieTableSnapshot> stage =
icebergStuff.retrieveIcebergSnapshot(
snapshotId, icebergTable, SnapshotFormat.NESSIE_SNAPSHOT);
icebergStuff.retrieveIcebergSnapshot(snapshotId, icebergTable);
NessieTableSnapshot snapshot = stage.toCompletableFuture().get(1, TimeUnit.MINUTES);
soft.assertThat(snapshot).isNotNull();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,8 @@ Uni<Response> snapshotBased(
Uni<SnapshotResponse> snapshotResponse(
ContentKey key, SnapshotReqParams snapshotReqParams, Content.Type expectedType)
throws NessieNotFoundException {
CatalogService.CatalogUriResolver catalogUriResolver = new CatalogUriResolverImpl(uriInfo);

return Uni.createFrom()
.completionStage(
catalogService.retrieveSnapshot(
snapshotReqParams, key, catalogUriResolver, expectedType));
.completionStage(catalogService.retrieveSnapshot(snapshotReqParams, key, expectedType));
}

private static Response snapshotToResponse(SnapshotResponse snapshot) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.projectnessie.catalog.formats.iceberg.rest.IcebergS3SignRequest;
import org.projectnessie.catalog.formats.iceberg.rest.IcebergS3SignResponse;
import org.projectnessie.catalog.service.api.CatalogCommit;
import org.projectnessie.catalog.service.api.CatalogService;
import org.projectnessie.catalog.service.api.SnapshotReqParams;
import org.projectnessie.catalog.service.rest.IcebergErrorMapper.IcebergEntityKind;

Expand Down Expand Up @@ -172,12 +171,10 @@ public Uni<Void> commitTransaction(

SnapshotReqParams reqParams = SnapshotReqParams.forSnapshotHttpReq(ref, "iceberg", null);

CatalogService.CatalogUriResolver catalogUriResolver = new CatalogUriResolverImpl(uriInfo);

// Although we don't return anything, need to make sure that the commit operation starts and all
// results are consumed.
return Uni.createFrom()
.completionStage(catalogService.commit(ref, commit.build(), reqParams, catalogUriResolver))
.completionStage(catalogService.commit(ref, commit.build(), reqParams))
.map(stream -> stream.reduce(null, (ident, snap) -> ident, (i1, i2) -> i1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@
import org.projectnessie.catalog.model.snapshot.NessieTableSnapshot;
import org.projectnessie.catalog.service.api.CatalogCommit;
import org.projectnessie.catalog.service.api.CatalogEntityAlreadyExistsException;
import org.projectnessie.catalog.service.api.CatalogService;
import org.projectnessie.catalog.service.api.SnapshotReqParams;
import org.projectnessie.catalog.service.api.SnapshotResponse;
import org.projectnessie.catalog.service.config.WarehouseConfig;
Expand Down Expand Up @@ -518,11 +517,8 @@ Uni<SnapshotResponse> createOrUpdateTable(
SnapshotReqParams reqParams =
SnapshotReqParams.forSnapshotHttpReq(tableRef.reference(), "iceberg", null);

CatalogService.CatalogUriResolver catalogUriResolver = new CatalogUriResolverImpl(uriInfo);

return Uni.createFrom()
.completionStage(
catalogService.commit(tableRef.reference(), commit, reqParams, catalogUriResolver))
.completionStage(catalogService.commit(tableRef.reference(), commit, reqParams))
.map(Stream::findFirst)
.map(Optional::orElseThrow);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import org.projectnessie.catalog.formats.iceberg.rest.IcebergUpdateRequirement;
import org.projectnessie.catalog.service.api.CatalogCommit;
import org.projectnessie.catalog.service.api.CatalogEntityAlreadyExistsException;
import org.projectnessie.catalog.service.api.CatalogService;
import org.projectnessie.catalog.service.api.SnapshotReqParams;
import org.projectnessie.catalog.service.api.SnapshotResponse;
import org.projectnessie.catalog.service.config.WarehouseConfig;
Expand Down Expand Up @@ -316,11 +315,8 @@ Uni<SnapshotResponse> createOrUpdateView(
SnapshotReqParams reqParams =
SnapshotReqParams.forSnapshotHttpReq(tableRef.reference(), "iceberg", null);

CatalogService.CatalogUriResolver catalogUriResolver = new CatalogUriResolverImpl(uriInfo);

return Uni.createFrom()
.completionStage(
catalogService.commit(tableRef.reference(), commit, reqParams, catalogUriResolver))
.completionStage(catalogService.commit(tableRef.reference(), commit, reqParams))
.map(Stream::findFirst)
.map(Optional::orElseThrow);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.projectnessie.catalog.files.api.SigningRequest;
import org.projectnessie.catalog.files.api.SigningResponse;
import org.projectnessie.catalog.service.api.CatalogCommit;
import org.projectnessie.catalog.service.api.CatalogService;
import org.projectnessie.catalog.service.api.SnapshotReqParams;
import org.projectnessie.catalog.service.api.SnapshotResponse;
import org.projectnessie.error.BaseNessieClientServerException;
Expand All @@ -73,17 +72,14 @@ public Multi<Object> tableSnapshots(
throws NessieNotFoundException {
SnapshotReqParams reqParams = forSnapshotHttpReq(parseRefPathString(ref), format, specVersion);

CatalogService.CatalogUriResolver catalogUriResolver = new CatalogUriResolverImpl(uriInfo);

AtomicReference<Reference> effectiveReference = new AtomicReference<>();

// The order of the returned items does not necessarily match the order of the requested items,
// Nessie's getContents() does neither.

// This operation can block --> @Blocking
Stream<Supplier<CompletionStage<SnapshotResponse>>> snapshots =
catalogService.retrieveSnapshots(
reqParams, keys, catalogUriResolver, effectiveReference::set);
catalogService.retrieveSnapshots(reqParams, keys, effectiveReference::set);

Multi<Object> multi =
Multi.createFrom()
Expand Down Expand Up @@ -133,10 +129,8 @@ public Uni<Response> commit(

SnapshotReqParams reqParams = forSnapshotHttpReq(reference, format, specVersion);

CatalogService.CatalogUriResolver catalogUriResolver = new CatalogUriResolverImpl(uriInfo);

return Uni.createFrom()
.completionStage(catalogService.commit(reference, commit, reqParams, catalogUriResolver))
.completionStage(catalogService.commit(reference, commit, reqParams))
.map(v -> Response.ok().build());
}

Expand Down
Loading