Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Support restore/rollback sync during conversion (1/2) #569

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from

Conversation

danielhumanmod
Copy link
Contributor

@danielhumanmod danielhumanmod commented Oct 27, 2024

Important Read

What is the purpose of the pull request

Previously, if a rollback/restore occurred in the source table, XTable would reflect it as file changes (added or deleted) in the target table. In this PR, we aim to improve this by issuing a rollback command in the target tables, ensuring more consistent histories between the source and target. This approach is also more efficient, as it allows us to restore directly to a specific version/snapshot instead of computing a large diff against the table’s current state.

Brief change log

This is the first part of this enhancement (1/2), focusing primarily on supporting commit-level information for all target table formats and the ability to locate certain target commit with given source identifier.

  1. Add a source identifier in target transaction
  • snapshot ID in Iceberg, version ID in Delta, and instant timestamp in Hudi
  1. Locate target commit with given source identifier

Blank diagram

Additional Info

Fallback scenarios

Fallback will happen when a rollback or restore is detected in the source table, but the corresponding commit is not found in the target table. We will still leverage the rollback information from the source, but this round of sync will be treated as file changes in the target table, following the previous behavior.

Here’s an example:

Iceberg (Source)          Delta (Target)  
┌────────────┐      ┌─────────────────────┐
│ Snapshot 0 │ ◀  ▶ │ Version 0 (Synced)  │  
│ Snapshot 1 │ ◀  ▶ │ Version 1 (Synced)  │  
│ Snapshot 2 │ ◀  ▶ │ Version 2 (Synced)  │  
│ Snapshot 3 │ ◀  ▶ │ Version 3 (Synced)  │  
│ Snapshot 4 │      │                     │  
│ Snapshot 5 │ ◀  ▶ │ Version 4 (Synced)  │
└────────────┘      └─────────────────────┘  
  • If rollback to Snapshot 0, 1, 2, 3, 5, can find a corresponding commit in target
  • If rollback to snapshot 4, can not find a corresponding commit in target. This makes it impossible to issue a rollback command, but we can still make use of the info of source snapshot to issue a file changes

In this case, we can not guarantee complete metadata consistency between the source and target, but it helps reduce some computation.

Verify this pull request

This pull request is already covered by existing tests, all existing tests should pass

@danielhumanmod danielhumanmod marked this pull request as draft October 27, 2024 23:22
@danielhumanmod danielhumanmod changed the title [Enhancement] Support restore/rollback sync during conversion (1/3) [Enhancement] Support restore/rollback sync during conversion (1/2) Oct 28, 2024
@@ -47,9 +49,20 @@ public IncrementalTableChanges extractTableChanges(
commitsBacklog.getCommitsToProcess().stream()
.map(conversionSource::getTableChangeForCommit)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we'll want the identifier on the commit level, right?

Copy link
Contributor Author

@danielhumanmod danielhumanmod Nov 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we'll want the identifier on the commit level, right?

Thanks for the response @the-other-tim-brown.

Yes, ideally, every commit in source table should directly map to one in target table. However, based on my understanding of how XTable works, this isn’t guaranteed. Instead, the mapping (Source -> Target) is more like a N:1 mapping, which means:

  • Every commit in the target table has a corresponding mapping in the source table.
  • Not every commit in the source table has a one-to-one mapping in the target table.

The reason is, between each sync(), there could be multiple changes on source, and all these changes will sync as only one commit in target, just like this example

Iceberg (Source)          Delta (Target)  
┌────────────┐      ┌─────────────────────┐
│ Snapshot 0 │ ◀  ▶ │ Version 0 (Synced)  │  (can map to snapshot 0)
│ Snapshot 1 │      │                     │  
│ Snapshot 2 │      │                     │  
│ Snapshot 3 │      │                     │  
│ Snapshot 4 │      │                     │  
│ Snapshot 5 │ ◀  ▶ │ Version 1 (Synced)  │ (can map to snapshot 5)
└────────────┘      └─────────────────────┘  

Given this, I’ve chosen to use the information from the latest commit in the source table as the source identifier.
But my understanding might be wrong, appreciate if there is any feedback or suggestion :)

Copy link
Contributor

@the-other-tim-brown the-other-tim-brown Nov 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is true for snapshot sync but with incremental sync, there are multiple commits all synced to the target as their own commits. One thing we should confirm is whether we are able to track at a per commit level in each target. I am unsure if the metadata history is tracked in Iceberg and Delta. It is tracked in the Hudi target

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out—it’s an important detail! I’ll investigate further into the ability to track commit-level information in Iceberg and Delta. I will get back to you once I figure it out

@danielhumanmod
Copy link
Contributor Author

danielhumanmod commented Nov 20, 2024

Hi @the-other-tim-brown, based on my investigation, both Iceberg and Delta support storing commit-level information, but we might need to adjust our current code. Here’s a summary of the findings:

  • Iceberg
    • Can use the snapshot summary to store snapshot-level information.
    • However, we currently store sync metadata in properties, which is table-level information.
  • Delta
    • Can use commit tags to store commit-level information.
    • However, we currently store sync metadata in configurations, which is table-level information.

To align with these capabilities, some code adjustments may be needed for both Iceberg and Delta. I’ll start working on a proof of concept to explore this, and will get back to you once it’s completed.

@the-other-tim-brown
Copy link
Contributor

@danielhumanmod Thanks for looking into it. I think being able to store this at a commit level better aligns with my our intentions so it would be great to fix this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants