Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Support restore/rollback sync during conversion (1/2) #569

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,13 @@
@Value
@Builder
public class InternalSnapshot {
public static final String DEFAULT_IDENTIFIER = "";

// The instant of the Snapshot
String version;
// The source table snapshot identifier
// Snapshot ID in Iceberg, version ID in Delta, and instant <timestamp_action> in Hudi
@Builder.Default String sourceIdentifier = DEFAULT_IDENTIFIER;
// Table reference
InternalTable table;
// Data files grouped by partition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,6 @@ public class TableChange {

/** The {@link InternalTable} at the commit time to which this table change belongs. */
InternalTable tableAsOfChange;

String sourceIdentifier;
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class TableSyncMetadata {
/** Property name for the XTABLE metadata in the table metadata/properties */
public static final String XTABLE_METADATA = "XTABLE_METADATA";

public static final String XTABLE_SOURCE_IDENTIFIER = "XTABLE_SOURCE_IDENTIFIER";

Instant lastInstantSynced;
List<Instant> instantsToConsiderForNextSync;
int version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,13 @@ public interface ConversionSource<COMMIT> extends Closeable {
* false.
*/
boolean isIncrementalSyncSafeFrom(Instant instant);

/**
* Extracts the identifier of the provided commit. The identifier is defined as: (1) the snapshot
* ID in Iceberg, (2) the version ID in Delta, and (3) the timestamp in Hudi.
*
* @param commit The provided commit
* @return the string form of the commit identifier
*/
String getCommitIdentifier(COMMIT commit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ public interface ConversionTarget {
* Starts the sync and performs any initialization required
*
* @param table the table that will be synced
* @param sourceIdentifier the source table snapshot identifier for the state being synced
*/
void beginSync(InternalTable table);
void beginSync(InternalTable table, String sourceIdentifier);

/** Completes the sync and performs any cleanup required. */
void completeSync();
Expand All @@ -90,4 +91,7 @@ public interface ConversionTarget {

/** Initializes the client with provided configuration */
void init(TargetTable targetTable, Configuration configuration);

/** Returns the commit identifier from the target table. */
Optional<String> getTargetCommitIdentifier(String sourceIdentifier);
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public Map<String, SyncResult> syncSnapshot(
internalTable,
target -> target.syncFilesForSnapshot(snapshot.getPartitionedDataFiles()),
startTime,
snapshot.getPendingCommits()));
snapshot.getPendingCommits(),
snapshot.getSourceIdentifier()));
} catch (Exception e) {
log.error("Failed to sync snapshot", e);
results.put(
Expand Down Expand Up @@ -121,7 +122,8 @@ public Map<String, List<SyncResult>> syncChanges(
change.getTableAsOfChange(),
target -> target.syncFilesForDiff(change.getFilesDiff()),
startTime,
changes.getPendingCommits()));
changes.getPendingCommits(),
change.getSourceIdentifier()));
} catch (Exception e) {
log.error("Failed to sync table changes", e);
resultsForFormat.add(buildResultForError(SyncMode.INCREMENTAL, startTime, e));
Expand Down Expand Up @@ -149,9 +151,10 @@ private SyncResult getSyncResult(
InternalTable tableState,
SyncFiles fileSyncMethod,
Instant startTime,
List<Instant> pendingCommits) {
List<Instant> pendingCommits,
String sourceIdentifier) {
// initialize the sync
conversionTarget.beginSync(tableState);
conversionTarget.beginSync(tableState, sourceIdentifier);
// sync schema updates
conversionTarget.syncSchema(tableState.getReadSchema());
// sync partition updates
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,13 @@ void syncSnapshotWithFailureForOneFormat() {
.table(startingTableState)
.partitionedDataFiles(fileGroups)
.pendingCommits(pendingCommitInstants)
.sourceIdentifier("1")
.build();
when(mockConversionTarget1.getTableFormat()).thenReturn(TableFormat.ICEBERG);
when(mockConversionTarget2.getTableFormat()).thenReturn(TableFormat.DELTA);
doThrow(new RuntimeException("Failure"))
.when(mockConversionTarget1)
.beginSync(startingTableState);
.beginSync(startingTableState, snapshot.getSourceIdentifier());
Map<String, SyncResult> result =
TableFormatSync.getInstance()
.syncSnapshot(Arrays.asList(mockConversionTarget1, mockConversionTarget2), snapshot);
Expand All @@ -106,7 +107,10 @@ void syncSnapshotWithFailureForOneFormat() {
failureResult.getStatus());

verifyBaseConversionTargetCalls(
mockConversionTarget2, startingTableState, pendingCommitInstants);
mockConversionTarget2,
startingTableState,
pendingCommitInstants,
snapshot.getSourceIdentifier());
verify(mockConversionTarget2).syncFilesForSnapshot(fileGroups);
verify(mockConversionTarget2).completeSync();
verify(mockConversionTarget1, never()).completeSync();
Expand All @@ -124,22 +128,36 @@ void syncChangesWithFailureForOneFormat() {
InternalTable tableState1 = getTableState(1);
DataFilesDiff dataFilesDiff1 = getFilesDiff(1);
TableChange tableChange1 =
TableChange.builder().tableAsOfChange(tableState1).filesDiff(dataFilesDiff1).build();
TableChange.builder()
.tableAsOfChange(tableState1)
.filesDiff(dataFilesDiff1)
.sourceIdentifier("1")
.build();
InternalTable tableState2 = getTableState(2);
DataFilesDiff dataFilesDiff2 = getFilesDiff(2);
TableChange tableChange2 =
TableChange.builder().tableAsOfChange(tableState2).filesDiff(dataFilesDiff2).build();
TableChange.builder()
.tableAsOfChange(tableState2)
.filesDiff(dataFilesDiff2)
.sourceIdentifier("2")
.build();
InternalTable tableState3 = getTableState(3);
DataFilesDiff dataFilesDiff3 = getFilesDiff(3);
TableChange tableChange3 =
TableChange.builder().tableAsOfChange(tableState3).filesDiff(dataFilesDiff3).build();
TableChange.builder()
.tableAsOfChange(tableState3)
.filesDiff(dataFilesDiff3)
.sourceIdentifier("3")
.build();

List<Instant> pendingCommitInstants = Collections.singletonList(Instant.now());
when(mockConversionTarget1.getTableFormat()).thenReturn(TableFormat.ICEBERG);
when(mockConversionTarget2.getTableFormat()).thenReturn(TableFormat.DELTA);
// throw exception on second change and show that first change is still returned for this format
// and other conversionTarget is not affected
doThrow(new RuntimeException("Failure")).when(mockConversionTarget1).beginSync(tableState2);
doThrow(new RuntimeException("Failure"))
.when(mockConversionTarget1)
.beginSync(tableState2, tableChange2.getSourceIdentifier());

List<TableChange> tableChanges = Arrays.asList(tableChange1, tableChange2, tableChange3);
IncrementalTableChanges incrementalTableChanges =
Expand Down Expand Up @@ -194,13 +212,29 @@ void syncChangesWithFailureForOneFormat() {
assertSyncResultTimes(successResults.get(i), start);
}

verifyBaseConversionTargetCalls(mockConversionTarget1, tableState1, pendingCommitInstants);
verifyBaseConversionTargetCalls(
mockConversionTarget1,
tableState1,
pendingCommitInstants,
tableChange1.getSourceIdentifier());
verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff1);
verifyBaseConversionTargetCalls(mockConversionTarget2, tableState1, pendingCommitInstants);
verifyBaseConversionTargetCalls(
mockConversionTarget2,
tableState1,
pendingCommitInstants,
tableChange1.getSourceIdentifier());
verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff1);
verifyBaseConversionTargetCalls(mockConversionTarget2, tableState2, pendingCommitInstants);
verifyBaseConversionTargetCalls(
mockConversionTarget2,
tableState2,
pendingCommitInstants,
tableChange2.getSourceIdentifier());
verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff2);
verifyBaseConversionTargetCalls(mockConversionTarget2, tableState3, pendingCommitInstants);
verifyBaseConversionTargetCalls(
mockConversionTarget2,
tableState3,
pendingCommitInstants,
tableChange3.getSourceIdentifier());
verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff3);
verify(mockConversionTarget1, times(1)).completeSync();
verify(mockConversionTarget2, times(3)).completeSync();
Expand Down Expand Up @@ -280,15 +314,31 @@ void syncChangesWithDifferentFormatsAndMetadata() {
}

// conversionTarget1 syncs table changes 1 and 3
verifyBaseConversionTargetCalls(mockConversionTarget1, tableState1, pendingCommitInstants);
verifyBaseConversionTargetCalls(
mockConversionTarget1,
tableState1,
pendingCommitInstants,
tableChange1.getSourceIdentifier());
verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff1);
verifyBaseConversionTargetCalls(mockConversionTarget1, tableState3, pendingCommitInstants);
verifyBaseConversionTargetCalls(
mockConversionTarget1,
tableState3,
pendingCommitInstants,
tableChange3.getSourceIdentifier());
verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff3);
verify(mockConversionTarget1, times(2)).completeSync();
// conversionTarget2 syncs table changes 2 and 3
verifyBaseConversionTargetCalls(mockConversionTarget2, tableState2, pendingCommitInstants);
verifyBaseConversionTargetCalls(
mockConversionTarget2,
tableState2,
pendingCommitInstants,
tableChange2.getSourceIdentifier());
verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff2);
verifyBaseConversionTargetCalls(mockConversionTarget2, tableState3, pendingCommitInstants);
verifyBaseConversionTargetCalls(
mockConversionTarget2,
tableState3,
pendingCommitInstants,
tableChange3.getSourceIdentifier());
verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff3);
verify(mockConversionTarget2, times(2)).completeSync();
}
Expand All @@ -299,7 +349,11 @@ void syncChangesOneFormatWithNoRequiredChanges() {
InternalTable tableState1 = getTableState(1);
DataFilesDiff dataFilesDiff1 = getFilesDiff(1);
TableChange tableChange1 =
TableChange.builder().tableAsOfChange(tableState1).filesDiff(dataFilesDiff1).build();
TableChange.builder()
.tableAsOfChange(tableState1)
.filesDiff(dataFilesDiff1)
.sourceIdentifier("1")
.build();

List<Instant> pendingCommitInstants = Collections.emptyList();
when(mockConversionTarget1.getTableFormat()).thenReturn(TableFormat.ICEBERG);
Expand Down Expand Up @@ -334,11 +388,15 @@ void syncChangesOneFormatWithNoRequiredChanges() {
assertSyncResultTimes(syncResult, start);
});

verify(mockConversionTarget1, never()).beginSync(any());
verify(mockConversionTarget1, never()).beginSync(any(), any());
verify(mockConversionTarget1, never()).syncFilesForDiff(any());
verify(mockConversionTarget1, never()).completeSync();

verifyBaseConversionTargetCalls(mockConversionTarget2, tableState1, pendingCommitInstants);
verifyBaseConversionTargetCalls(
mockConversionTarget2,
tableState1,
pendingCommitInstants,
tableChange1.getSourceIdentifier());
verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff1);
}

Expand Down Expand Up @@ -379,8 +437,9 @@ private DataFilesDiff getFilesDiff(int id) {
private void verifyBaseConversionTargetCalls(
ConversionTarget mockConversionTarget,
InternalTable startingTableState,
List<Instant> pendingCommitInstants) {
verify(mockConversionTarget).beginSync(startingTableState);
List<Instant> pendingCommitInstants,
String sourceIdentifier) {
verify(mockConversionTarget).beginSync(startingTableState, sourceIdentifier);
verify(mockConversionTarget).syncSchema(startingTableState.getReadSchema());
verify(mockConversionTarget).syncPartitionSpec(startingTableState.getPartitioningFields());
verify(mockConversionTarget)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public InternalSnapshot getCurrentSnapshot() {
return InternalSnapshot.builder()
.table(table)
.partitionedDataFiles(getInternalDataFiles(snapshot, table.getReadSchema()))
.sourceIdentifier(getCommitIdentifier(snapshot.version()))
.build();
}

Expand Down Expand Up @@ -125,7 +126,11 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
}
DataFilesDiff dataFilesDiff =
DataFilesDiff.builder().filesAdded(addedFiles).filesRemoved(removedFiles).build();
return TableChange.builder().tableAsOfChange(tableAtVersion).filesDiff(dataFilesDiff).build();
return TableChange.builder()
.tableAsOfChange(tableAtVersion)
.filesDiff(dataFilesDiff)
.sourceIdentifier(getCommitIdentifier(versionNumber))
.build();
}

@Override
Expand Down Expand Up @@ -158,6 +163,11 @@ public boolean isIncrementalSyncSafeFrom(Instant instant) {
return deltaCommitInstant.equals(instant) || deltaCommitInstant.isBefore(instant);
}

/**
 * Returns the string form of the given commit, produced with {@link String#valueOf(Object)}.
 *
 * <p>Within this class it is called with a snapshot version and with a commit version number to
 * populate the {@code sourceIdentifier} of snapshots and table changes.
 *
 * @param commit the commit (version number) to identify
 * @return the string representation of {@code commit}; NOTE(review): a null commit yields the
 *     literal string "null" rather than an exception, so confirm callers never pass null
 */
@Override
public String getCommitIdentifier(Long commit) {
return String.valueOf(commit);
}

private DeltaIncrementalChangesState getChangesState() {
return deltaIncrementalChangesState.orElseThrow(
() -> new IllegalStateException("DeltaIncrementalChangesState is not initialized"));
Expand Down
Loading