diff --git a/cdap-common/src/main/resources/cdap-default.xml b/cdap-common/src/main/resources/cdap-default.xml
index ca06f7488aa6..dc184e459a4b 100644
--- a/cdap-common/src/main/resources/cdap-default.xml
+++ b/cdap-common/src/main/resources/cdap-default.xml
@@ -409,7 +409,8 @@
data.tx.snapshot.codecs
co.cask.cdap.data2.transaction.snapshot.SnapshotCodecV1,
- co.cask.cdap.data2.transaction.snapshot.SnapshotCodecV2
+ co.cask.cdap.data2.transaction.snapshot.SnapshotCodecV2,
+ co.cask.tephra.snapshot.SnapshotCodecV3
Specifies the class names of all supported transaction state codecs
diff --git a/cdap-data-fabric/src/main/java/co/cask/cdap/data2/transaction/coprocessor/DefaultTransactionStateCache.java b/cdap-data-fabric/src/main/java/co/cask/cdap/data2/transaction/coprocessor/DefaultTransactionStateCache.java
index 1a975f18f1a4..321941e9f4c8 100644
--- a/cdap-data-fabric/src/main/java/co/cask/cdap/data2/transaction/coprocessor/DefaultTransactionStateCache.java
+++ b/cdap-data-fabric/src/main/java/co/cask/cdap/data2/transaction/coprocessor/DefaultTransactionStateCache.java
@@ -22,6 +22,7 @@
import co.cask.cdap.data2.transaction.snapshot.SnapshotCodecV2;
import co.cask.cdap.data2.util.hbase.ConfigurationTable;
import co.cask.tephra.coprocessor.TransactionStateCache;
+import co.cask.tephra.snapshot.SnapshotCodecV3;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -38,6 +39,7 @@ public class DefaultTransactionStateCache extends TransactionStateCache {
// DO NOT REMOVE
private static final SnapshotCodecV1 codecV1 = null;
private static final SnapshotCodecV2 codecV2 = null;
+ private static final SnapshotCodecV3 codecV3 = null;
private String sysConfigTablePrefix;
private ConfigurationTable configTable;
diff --git a/cdap-data-fabric/src/main/java/co/cask/cdap/data2/transaction/snapshot/SnapshotCodecV1.java b/cdap-data-fabric/src/main/java/co/cask/cdap/data2/transaction/snapshot/SnapshotCodecV1.java
index cbdedd2223b5..29791780f326 100644
--- a/cdap-data-fabric/src/main/java/co/cask/cdap/data2/transaction/snapshot/SnapshotCodecV1.java
+++ b/cdap-data-fabric/src/main/java/co/cask/cdap/data2/transaction/snapshot/SnapshotCodecV1.java
@@ -28,7 +28,9 @@
/**
* Handles serialization/deserialization of a {@link co.cask.tephra.persist.TransactionSnapshot} and
* its elements to {@code byte[]}.
+ * @deprecated Replaced by use of {@code co.cask.tephra.snapshot.SnapshotCodecV3}.
*/
+@Deprecated
public class SnapshotCodecV1 extends AbstractSnapshotCodec {
public static final int VERSION = 1;
diff --git a/cdap-data-fabric/src/main/java/co/cask/cdap/data2/transaction/snapshot/SnapshotCodecV2.java b/cdap-data-fabric/src/main/java/co/cask/cdap/data2/transaction/snapshot/SnapshotCodecV2.java
index 0620e408b104..c7362b0ef80b 100644
--- a/cdap-data-fabric/src/main/java/co/cask/cdap/data2/transaction/snapshot/SnapshotCodecV2.java
+++ b/cdap-data-fabric/src/main/java/co/cask/cdap/data2/transaction/snapshot/SnapshotCodecV2.java
@@ -28,7 +28,10 @@
/**
* Handles serialization/deserialization of a {@link co.cask.tephra.persist.TransactionSnapshot}
* and its elements to {@code byte[]}.
+ *
+ * @deprecated Replaced by use of {@code co.cask.tephra.snapshot.SnapshotCodecV3}.
*/
+@Deprecated
public class SnapshotCodecV2 extends AbstractSnapshotCodec {
public static final int VERSION = 2;
diff --git a/cdap-data-fabric/src/test/java/co/cask/cdap/data2/transaction/snapshot/SnapshotCodecCompatibilityTest.java b/cdap-data-fabric/src/test/java/co/cask/cdap/data2/transaction/snapshot/SnapshotCodecCompatibilityTest.java
index e36c4761a834..5f3b0c4ea4be 100644
--- a/cdap-data-fabric/src/test/java/co/cask/cdap/data2/transaction/snapshot/SnapshotCodecCompatibilityTest.java
+++ b/cdap-data-fabric/src/test/java/co/cask/cdap/data2/transaction/snapshot/SnapshotCodecCompatibilityTest.java
@@ -17,31 +17,50 @@
package co.cask.cdap.data2.transaction.snapshot;
import co.cask.cdap.api.common.Bytes;
+import co.cask.cdap.common.conf.CConfiguration;
+import co.cask.cdap.common.conf.CConfigurationUtil;
import co.cask.tephra.ChangeId;
import co.cask.tephra.TransactionManager;
+import co.cask.tephra.TransactionType;
import co.cask.tephra.TxConstants;
import co.cask.tephra.persist.TransactionSnapshot;
+import co.cask.tephra.persist.TransactionStateStorage;
+import co.cask.tephra.runtime.ConfigModule;
+import co.cask.tephra.runtime.DiscoveryModules;
+import co.cask.tephra.runtime.TransactionModules;
import co.cask.tephra.snapshot.SnapshotCodecProvider;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
/**
* Tests snapshot codecs.
*/
public class SnapshotCodecCompatibilityTest {
+ @ClassRule
+ public static TemporaryFolder tmpDir = new TemporaryFolder();
@Test
public void testV1CodecV2Compat() throws Exception {
@@ -87,4 +106,130 @@ public void testV1CodecV2Compat() throws Exception {
assertEquals(snapshot, decoded);
}
+
+ /**
+ * In-progress LONG transactions written with DefaultSnapshotCodec will not have the type serialized as part of
+ * the data. Since these transactions also contain a non-negative expiration, we need to ensure we reset the type
+ * correctly when the snapshot is loaded.
+ */
+ @Test
+ public void testV2ToTephraV3Compatibility() throws Exception {
+ long now = System.currentTimeMillis();
+ long nowWritePointer = now * TxConstants.MAX_TX_PER_MS;
+ /*
+ * Snapshot consisting of transactions at:
+ */
+ long tInvalid = nowWritePointer - 5; // t1 - invalid
+ long readPtr = nowWritePointer - 4; // t2 - here and earlier committed
+ long tLong = nowWritePointer - 3; // t3 - in-progress LONG
+ long tCommitted = nowWritePointer - 2; // t4 - committed, changeset (r1, r2)
+ long tShort = nowWritePointer - 1; // t5 - in-progress SHORT, canCommit called, changeset (r3, r4)
+
+ TreeMap inProgress = Maps.newTreeMap(ImmutableSortedMap.of(
+ tLong, new TransactionManager.InProgressTx(readPtr,
+ TransactionManager.getTxExpirationFromWritePointer(tLong, TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT),
+ TransactionType.LONG),
+ tShort, new TransactionManager.InProgressTx(readPtr, now + 1000, TransactionType.SHORT)));
+
+ TransactionSnapshot snapshot = new TransactionSnapshot(now, readPtr, nowWritePointer,
+ Lists.newArrayList(tInvalid), // invalid
+ inProgress,
+ ImmutableMap.>of(
+ tShort, Sets.newHashSet(new ChangeId(new byte[]{'r', '3'}), new ChangeId(new byte[]{'r', '4'}))),
+ ImmutableMap.>of(
+ tCommitted, Sets.newHashSet(new ChangeId(new byte[]{'r', '1'}), new ChangeId(new byte[]{'r', '2'}))));
+
+ Configuration conf1 = new Configuration();
+ conf1.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV2.class.getName());
+ SnapshotCodecProvider provider1 = new SnapshotCodecProvider(conf1);
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ try {
+ provider1.encode(out, snapshot);
+ } finally {
+ out.close();
+ }
+
+ TransactionSnapshot snapshot2 = provider1.decode(new ByteArrayInputStream(out.toByteArray()));
+ assertEquals(snapshot.getReadPointer(), snapshot2.getReadPointer());
+ assertEquals(snapshot.getWritePointer(), snapshot2.getWritePointer());
+ assertEquals(snapshot.getInvalid(), snapshot2.getInvalid());
+ // in-progress transactions will have missing types
+ assertNotEquals(snapshot.getInProgress(), snapshot2.getInProgress());
+ assertEquals(snapshot.getCommittingChangeSets(), snapshot2.getCommittingChangeSets());
+ assertEquals(snapshot.getCommittedChangeSets(), snapshot2.getCommittedChangeSets());
+
+ // after fixing in-progress, full snapshot should match
+ Map fixedInProgress = TransactionManager.txnBackwardsCompatCheck(
+ TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT, 10000L, snapshot2.getInProgress());
+ assertEquals(snapshot.getInProgress(), fixedInProgress);
+ assertEquals(snapshot, snapshot2);
+ }
+
+ /**
+ * Test full stack serialization for a TransactionManager migrating from DefaultSnapshotCodec to SnapshotCodecV3.
+ */
+ @Test
+ public void testV2ToTephraV3Migration() throws Exception {
+ File testDir = tmpDir.newFolder("testV2ToTephraV3Migration");
+ Configuration conf = new Configuration();
+ conf.setStrings(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES,
+ SnapshotCodecV1.class.getName(), SnapshotCodecV2.class.getName());
+ conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath());
+
+ Injector injector = Guice.createInjector(new ConfigModule(conf),
+ new DiscoveryModules().getSingleNodeModules(), new TransactionModules().getSingleNodeModules());
+
+ TransactionManager txManager = injector.getInstance(TransactionManager.class);
+ txManager.startAndWait();
+
+ txManager.startLong();
+
+ // shutdown to force a snapshot
+ txManager.stopAndWait();
+
+ TransactionStateStorage txStorage = injector.getInstance(TransactionStateStorage.class);
+ txStorage.startAndWait();
+
+ // confirm that the in-progress entry is missing a type
+ TransactionSnapshot snapshot = txStorage.getLatestSnapshot();
+ assertNotNull(snapshot);
+ assertEquals(1, snapshot.getInProgress().size());
+ Map.Entry entry =
+ snapshot.getInProgress().entrySet().iterator().next();
+ assertNull(entry.getValue().getType());
+
+
+ // start a new Tx manager to test fixup
+ Configuration conf2 = new Configuration();
+ // make sure we work with the default CDAP conf for snapshot codecs
+ CConfiguration cconf = CConfiguration.create();
+ CConfigurationUtil.copyTxProperties(cconf, conf2);
+ // override snapshot dir
+ conf2.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath());
+
+ Injector injector2 = Guice.createInjector(new ConfigModule(conf2),
+ new DiscoveryModules().getSingleNodeModules(), new TransactionModules().getSingleNodeModules());
+
+ TransactionManager txManager2 = injector2.getInstance(TransactionManager.class);
+ txManager2.startAndWait();
+
+ // state should be recovered
+ TransactionSnapshot snapshot2 = txManager2.getCurrentState();
+ assertEquals(1, snapshot2.getInProgress().size());
+ Map.Entry inProgressTx =
+ snapshot2.getInProgress().entrySet().iterator().next();
+ assertEquals(TransactionType.LONG, inProgressTx.getValue().getType());
+
+ // save a new snapshot
+ txManager2.stopAndWait();
+
+ TransactionStateStorage txStorage2 = injector2.getInstance(TransactionStateStorage.class);
+ txStorage2.startAndWait();
+
+ TransactionSnapshot snapshot3 = txStorage2.getLatestSnapshot();
+ // full snapshot should have deserialized correctly without any fixups
+ assertEquals(snapshot2.getInProgress(), snapshot3.getInProgress());
+ assertEquals(snapshot2, snapshot3);
+ }
}
diff --git a/pom.xml b/pom.xml
index a258bdacc9e5..67422e6d3e8a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -126,7 +126,7 @@
3.0.1
1.2.1
1.7.5
- 0.4.1
+ 0.4.2
0.9.0
0.5.0-incubating
2.3.6