Skip to content

Commit

Permalink
Address review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
sidhdirenge committed Dec 10, 2024
1 parent 738a83e commit a87128e
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import io.cdap.cdap.spi.data.table.StructuredTableSchema;
import io.cdap.cdap.spi.data.table.StructuredTableSpecification;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -83,14 +82,6 @@ public void createOrUpdate(StructuredTableSpecification spec)
createTable(spec);
}

@Override
public void createOrUpdate(List<StructuredTableSpecification> specs)
throws IOException, TableSchemaIncompatibleException {
for (StructuredTableSpecification spec : specs) {
createOrUpdate(spec);
}
}

private void createTable(StructuredTableSpecification spec) throws IOException {
LOG.info("Creating table {} in namespace {}", spec, NamespaceId.SYSTEM);
DatasetAdmin indexTableAdmin = indexTableDefinition.getAdmin(SYSTEM_CONTEXT, indexTableSpec,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,6 @@ public void createOrUpdate(StructuredTableSpecification spec)
createTable(spec);
}

@Override
public void createOrUpdate(List<StructuredTableSpecification> specs)
throws IOException, TableSchemaIncompatibleException {
for (StructuredTableSpecification spec : specs) {
createOrUpdate(spec);
}
}

@Override
public boolean exists(StructuredTableId tableId) throws IOException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,35 +49,37 @@ private StoreDefinition() {
* @param tableAdmin the table admin to create the table
*/
public static void createAllTables(StructuredTableAdmin tableAdmin) throws IOException {
ArtifactStore.create(tableAdmin);
OwnerStore.create(tableAdmin);
NamespaceStore.create(tableAdmin);
SecretStore.create(tableAdmin);
WorkflowStore.create(tableAdmin);
ConfigStore.create(tableAdmin);
PreferencesStore.create(tableAdmin);
ProvisionerStore.create(tableAdmin);
AppMetadataStore.create(tableAdmin);
ProfileStore.create(tableAdmin);
ProgramScheduleStore.create(tableAdmin);
DatasetInstanceStore.create(tableAdmin);
DatasetTypeStore.create(tableAdmin);
LineageStore.create(tableAdmin);
JobQueueStore.create(tableAdmin);
TimeScheduleStore.create(tableAdmin);
RemoteRuntimeStore.create(tableAdmin);
ProgramHeartbeatStore.create(tableAdmin);
LogCheckpointStore.create(tableAdmin);
UsageStore.create(tableAdmin);
FieldLineageStore.create(tableAdmin);
LogFileMetaStore.create(tableAdmin);
CapabilitiesStore.create(tableAdmin);
TetheringStore.create(tableAdmin);
AppStateStore.create(tableAdmin);
CredentialProviderStore.create(tableAdmin);
OperationRunsStore.create(tableAdmin);

// Please ensure createIfNotExists() is not followed by any other create calls.
// Please use register() in case of multiple table creation calls.
// Some structured table admin implementations batch these table creation calls for better performance.
ArtifactStore.register();
OwnerStore.register();
NamespaceStore.register();
SecretStore.register();
WorkflowStore.register();
ConfigStore.register();
PreferencesStore.register();
ProvisionerStore.register();
AppMetadataStore.register();
ProfileStore.register();
ProgramScheduleStore.register();
DatasetInstanceStore.register();
DatasetTypeStore.register();
LineageStore.register();
JobQueueStore.register();
TimeScheduleStore.register();
RemoteRuntimeStore.register();
ProgramHeartbeatStore.register();
LogCheckpointStore.register();
UsageStore.register();
FieldLineageStore.register();
LogFileMetaStore.register();
CapabilitiesStore.register();
TetheringStore.register();
AppStateStore.register();
CredentialProviderStore.register();
OperationRunsStore.register();

// Please ensure createIfNotExists() is not followed by any other register calls.
createIfNotExists(tableAdmin);
}

Expand All @@ -96,6 +98,9 @@ private static void createIfNotExists(StructuredTableAdmin admin) throws IOExcep
}
}

/**
* Adds table to list of tables to be created.
*/
private static void registerTable(StructuredTableSpecification spec) {
if (tableSpecifications == null) {
tableSpecifications = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,22 @@ public void testCreateOrUpdateWithListOfSpecs() throws Exception {
Assert.assertEquals(simpleTableSchema, convertSpecToCompatibleSchema(SIMPLE_TABLE_SPEC));
}

@Test
public void testCreateOrUpdateWithListOfSpecsTwice() throws Exception {
StructuredTableAdmin admin = getStructuredTableAdmin();

// Assert SIMPLE_TABLE Empty
Assert.assertFalse(admin.exists(SIMPLE_TABLE));

admin.createOrUpdate(Collections.singletonList(SIMPLE_TABLE_SPEC));
admin.createOrUpdate(Collections.singletonList(SIMPLE_TABLE_SPEC));
Assert.assertTrue(admin.exists(SIMPLE_TABLE));

// Assert SIMPLE_TABLE schema
StructuredTableSchema simpleTableSchema = admin.getSchema(SIMPLE_TABLE);
Assert.assertEquals(simpleTableSchema, convertSpecToCompatibleSchema(SIMPLE_TABLE_SPEC));
}

@Test
@Override
public void testInconsistentKeyOrderInSchema() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ default void createOrUpdate(StructuredTableSpecification spec)
*/
default void createOrUpdate(List<StructuredTableSpecification> specs)
throws IOException, TableSchemaIncompatibleException {
throw new UnsupportedOperationException("Storage SPI did not implement createOrUpdate.");
for (StructuredTableSpecification spec : specs) {
createOrUpdate(spec);
}
}

/**
Expand Down

0 comments on commit a87128e

Please sign in to comment.