diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUpgradeTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUpgradeTest.java index f953fa4095726..21527a5f3ae88 100644 --- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUpgradeTest.java +++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUpgradeTest.java @@ -20,13 +20,13 @@ import org.apache.flink.FlinkVersion; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerMatchers; +import org.apache.flink.api.common.typeutils.TypeSerializerConditions; import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase; import org.apache.flink.api.java.typeutils.runtime.WritableSerializerUpgradeTest.WritableName; import org.apache.hadoop.io.Writable; -import org.hamcrest.Matcher; +import org.assertj.core.api.Condition; import java.io.DataInput; import java.io.DataOutput; @@ -35,8 +35,6 @@ import java.util.Collection; import java.util.Objects; -import static org.hamcrest.Matchers.is; - /** A {@link TypeSerializerUpgradeTestBase} for {@link WritableSerializer}. */ class WritableSerializerUpgradeTest extends TypeSerializerUpgradeTestBase { @@ -129,16 +127,16 @@ public TypeSerializer createUpgradedSerializer() { } @Override - public Matcher testDataMatcher() { + public Condition testDataCondition() { WritableName writable = new WritableName(); writable.setName("flink"); - return is(writable); + return new Condition<>(writable::equals, "writable is " + writable); } @Override - public Matcher> schemaCompatibilityMatcher( - FlinkVersion version) { - return TypeSerializerMatchers.isCompatibleAsIs(); + public Condition> + schemaCompatibilityCondition(FlinkVersion version) { + return TypeSerializerConditions.isCompatibleAsIs(); } } } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java index 492c911ba52a5..d035df43a64a4 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java @@ -263,7 +263,7 @@ private void testLoadingSchedulerTypeFromConfiguration( configFromConfiguration.configure( configuration, Thread.currentThread().getContextClassLoader()); - assertThat(configFromConfiguration.getSchedulerType()).contains(schedulerType); + assertThat(configFromConfiguration.getSchedulerType()).hasValue(schedulerType); } @Test diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java index 4f311d40fe69a..b3e6ed68c5ede 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java @@ -38,7 +38,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Fail.fail; -public class DelimitedInputFormatSamplingTest { +class DelimitedInputFormatSamplingTest { private static final String TEST_DATA1 = "123456789\n" @@ -77,7 +77,7 @@ public class DelimitedInputFormatSamplingTest { // ======================================================================== @BeforeAll - public static void initialize() { + static void initialize() { try { testTempFolder = TempDirUtils.newFolder(tempDir); // make sure we do 4 samples diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java index 8e9f62ca3c9cc..4d6a1371a3504 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java @@ -53,13 +53,13 @@ class DelimitedInputFormatTest { // -------------------------------------------------------------------------------------------- @BeforeEach - public void setup() { + void setup() { format = new MyTextInputFormat(); this.format.setFilePath(new Path("file:///some/file/that/will/not/be/read")); } @AfterEach - public void shutdown() throws Exception { + void shutdown() throws Exception { if (this.format != null) { this.format.close(); } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java index 051e6c87a1199..815981c2a18e6 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java @@ -45,13 +45,13 @@ class EnumerateNestedFilesTest { private DummyFileInputFormat format; @BeforeEach - public void setup() { + void setup() { this.config = new Configuration(); format = new DummyFileInputFormat(); } @AfterEach - public void setdown() throws Exception { + void setdown() throws Exception { if (this.format != null) { this.format.close(); } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java index 84a9bcfdd5529..265f1d4c0f130 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java @@ -53,13 +53,13 @@ class GenericCsvInputFormatTest { // -------------------------------------------------------------------------------------------- @BeforeEach - public void setup() { + void setup() { format = new TestCsvInputFormat(); format.setFilePath("file:///some/file/that/will/not/be/read"); } @AfterEach - public void setdown() throws Exception { + void setdown() throws Exception { if (this.format != null) { this.format.close(); } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/ExpressionKeysTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/ExpressionKeysTest.java index 2c8bd032b11ef..7a37624fd313c 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/ExpressionKeysTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/ExpressionKeysTest.java @@ -42,7 +42,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; @SuppressWarnings("unused") -public class ExpressionKeysTest { +class ExpressionKeysTest { @Test void testBasicType() { diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/OrderingTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/OrderingTest.java index 28e81fc0c3e7c..5a48a207e3dc3 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/OrderingTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/OrderingTest.java @@ -22,7 +22,7 @@ import static org.assertj.core.api.Assertions.assertThat; -public class OrderingTest { +class OrderingTest { @Test void testNewOrdering() { diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java index 2d8de569f3eb6..c6fb4988c1ebf 100755 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java @@ -141,12 +141,12 @@ void testMerge() { ResourceSpec rs3 = rs1.merge(rs2); assertThat(rs3.getCpuCores()).isEqualTo(new CPUResource(2.0)); assertThat(rs3.getTaskHeapMemory().getMebiBytes()).isEqualTo(200); - assertThat(rs3.getExtendedResource(EXTERNAL_RESOURCE_NAME).get()) - .isEqualTo(new ExternalResource(EXTERNAL_RESOURCE_NAME, 1.1)); + assertThat(rs3.getExtendedResource(EXTERNAL_RESOURCE_NAME)) + .hasValue(new ExternalResource(EXTERNAL_RESOURCE_NAME, 1.1)); ResourceSpec rs4 = rs1.merge(rs3); - assertThat(rs4.getExtendedResource(EXTERNAL_RESOURCE_NAME).get()) - .isEqualTo(new ExternalResource(EXTERNAL_RESOURCE_NAME, 2.2)); + assertThat(rs4.getExtendedResource(EXTERNAL_RESOURCE_NAME)) + .hasValue(new ExternalResource(EXTERNAL_RESOURCE_NAME, 2.2)); } @Test @@ -228,8 +228,8 @@ void testSubtract() { final ResourceSpec subtracted = rs1.subtract(rs2); assertThat(subtracted.getCpuCores()).isEqualTo(new CPUResource(0.8)); assertThat(subtracted.getTaskHeapMemory().getMebiBytes()).isZero(); - assertThat(subtracted.getExtendedResource(EXTERNAL_RESOURCE_NAME).get()) - .isEqualTo(new ExternalResource(EXTERNAL_RESOURCE_NAME, 0.6)); + assertThat(subtracted.getExtendedResource(EXTERNAL_RESOURCE_NAME)) + .contains(new ExternalResource(EXTERNAL_RESOURCE_NAME, 0.6)); } @Test diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/SlotSharingGroupTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/SlotSharingGroupTest.java index 64d89229b5c7a..3b5756337661e 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/SlotSharingGroupTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/SlotSharingGroupTest.java @@ -45,10 +45,10 @@ void testBuildSlotSharingGroupWithSpecificResource() { .build(); assertThat(slotSharingGroup.getName()).isEqualTo(name); - assertThat(slotSharingGroup.getCpuCores()).contains(1.0); - assertThat(slotSharingGroup.getTaskHeapMemory()).contains(heap); - assertThat(slotSharingGroup.getTaskOffHeapMemory()).contains(offHeap); - assertThat(slotSharingGroup.getManagedMemory()).contains(managed); + assertThat(slotSharingGroup.getCpuCores()).hasValue(1.0); + assertThat(slotSharingGroup.getTaskHeapMemory()).hasValue(heap); + assertThat(slotSharingGroup.getTaskOffHeapMemory()).hasValue(offHeap); + assertThat(slotSharingGroup.getManagedMemory()).hasValue(managed); assertThat(slotSharingGroup.getExternalResources()) .isEqualTo(Collections.singletonMap("gpu", 1.0)); } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java index 41d1db230aa9d..6e7b7fdee6d02 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java @@ -42,7 +42,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** The test for flat map operator. */ -public class FlatMapOperatorCollectionTest implements Serializable { +class FlatMapOperatorCollectionTest implements Serializable { @Test void testExecuteOnCollection() throws Exception { diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java index 3ca873ca97528..7dcbaf41ae115 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java @@ -45,7 +45,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** The test for inner join operator. */ -public class InnerJoinOperatorBaseTest implements Serializable { +class InnerJoinOperatorBaseTest implements Serializable { @Test void testJoinPlain() throws Exception { diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java index 36ac10c48814e..91067617ecfe8 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java @@ -44,7 +44,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** The test for map operator. */ -public class MapOperatorTest implements java.io.Serializable { +class MapOperatorTest implements java.io.Serializable { @Test void testMapPlain() throws Exception { diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java index 7b5e3dae8cc79..9e89130c26804 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java @@ -48,7 +48,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; /** The test for outer join operator. */ -public class OuterJoinOperatorBaseTest implements Serializable { +class OuterJoinOperatorBaseTest implements Serializable { private MockRichFlatJoinFunction joiner; @@ -61,7 +61,7 @@ public class OuterJoinOperatorBaseTest implements Serializable { @SuppressWarnings({"rawtypes", "unchecked"}) @BeforeEach - public void setup() { + void setup() { joiner = new MockRichFlatJoinFunction(); baseOperator = diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java index d68ade163d40b..82b60ec4d8536 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java @@ -42,7 +42,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** The test for partition map operator. */ -public class PartitionMapOperatorTest implements java.io.Serializable { +class PartitionMapOperatorTest implements java.io.Serializable { @Test void testMapPartitionWithRuntimeContext() throws Exception { diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/SlotSharingGroupUtilsTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/SlotSharingGroupUtilsTest.java index 2401930d551b6..29fd5cb777a23 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/SlotSharingGroupUtilsTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/SlotSharingGroupUtilsTest.java @@ -27,7 +27,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link SlotSharingGroupUtils}. */ -public class SlotSharingGroupUtilsTest { +class SlotSharingGroupUtilsTest { @Test void testCovertToResourceSpec() { final ExternalResource gpu = new ExternalResource("gpu", 1); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/serialization/SerializerConfigImplTest.java b/flink-core/src/test/java/org/apache/flink/api/common/serialization/SerializerConfigImplTest.java index b087ee3facada..5c20d35dea11c 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/serialization/SerializerConfigImplTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/serialization/SerializerConfigImplTest.java @@ -58,8 +58,8 @@ void testReadingDefaultConfig() { @Test void testDoubleTypeRegistration() { SerializerConfig config = new SerializerConfigImpl(); - List> types = Arrays.>asList(Double.class, Integer.class, Double.class); - List> expectedTypes = Arrays.>asList(Double.class, Integer.class); + List> types = Arrays.asList(Double.class, Integer.class, Double.class); + List> expectedTypes = Arrays.asList(Double.class, Integer.class); for (Class tpe : types) { config.registerKryoType(tpe); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java index bac9f7c5fb1ca..22b852bd75988 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java @@ -31,7 +31,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** Tests for the {@link ValueStateDescriptor}. */ -public class ValueStateDescriptorTest { +class ValueStateDescriptorTest { @Test void testHashCodeEquals() throws Exception { diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUpgradeTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUpgradeTest.java index 5eea533a9dec1..5e9526fbed08f 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUpgradeTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUpgradeTest.java @@ -26,14 +26,14 @@ import org.apache.flink.api.java.typeutils.runtime.EitherSerializer; import org.apache.flink.types.Either; -import org.hamcrest.Matcher; +import org.assertj.core.api.Condition; import org.junit.jupiter.api.Test; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; -import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; -import static org.hamcrest.Matchers.is; +import static org.assertj.core.api.Assertions.assertThat; /** A {@link TypeSerializerUpgradeTestBase} for {@link GenericArraySerializer}. */ class CompositeTypeSerializerUpgradeTest extends TypeSerializerUpgradeTestBase { @@ -90,14 +90,16 @@ public TypeSerializer> createUpgradedSerializer() { } @Override - public Matcher> testDataMatcher() { - return is(new Either.Left<>("ApacheFlink")); + public Condition> testDataCondition() { + return new Condition<>( + value -> new Either.Left<>("ApacheFlink").equals(value), + "value is Either.Left(\"ApacheFlink\")"); } @Override - public Matcher>> - schemaCompatibilityMatcher(FlinkVersion version) { - return TypeSerializerMatchers.isCompatibleAsIs(); + public Condition>> + schemaCompatibilityCondition(FlinkVersion version) { + return TypeSerializerConditions.isCompatibleAsIs(); } } @@ -134,15 +136,16 @@ public TypeSerializer createUpgradedSerializer() { } @Override - public Matcher testDataMatcher() { + public Condition testDataCondition() { String[] data = {"Apache", "Flink"}; - return is(data); + return new Condition<>( + value -> Arrays.equals(data, value), "data is " + Arrays.toString(data)); } @Override - public Matcher> schemaCompatibilityMatcher( + public Condition> schemaCompatibilityCondition( FlinkVersion version) { - return TypeSerializerMatchers.isCompatibleAsIs(); + return TypeSerializerConditions.isCompatibleAsIs(); } } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtilTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtilTest.java index bf5b3583b5c87..b688f1ac0594e 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtilTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtilTest.java @@ -29,7 +29,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; /** Tests for the {@link CompositeTypeSerializerUtil}. */ class CompositeTypeSerializerUtilTest { @@ -62,11 +61,11 @@ void testCompatibleAsIsIntermediateCompatibilityResult() { assertThat(intermediateCompatibilityResult.isCompatibleAsIs()).isTrue(); assertThat(intermediateCompatibilityResult.getFinalResult().isCompatibleAsIs()).isTrue(); - assertArrayEquals( - Arrays.stream(newSerializerSnapshots) - .map(TypeSerializerSnapshot::restoreSerializer) - .toArray(), - intermediateCompatibilityResult.getNestedSerializers()); + assertThat(intermediateCompatibilityResult.getNestedSerializers()) + .containsExactly( + Arrays.stream(newSerializerSnapshots) + .map(TypeSerializerSnapshot::restoreSerializer) + .toArray(TypeSerializer[]::new)); } @Test diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConditions.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConditions.java new file mode 100644 index 0000000000000..c33f7526afea9 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConditions.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.assertj.core.api.Condition; + +import java.util.function.Predicate; + +/** + * A Collection of useful {@link Condition}s for {@link TypeSerializer} and {@link + * TypeSerializerSchemaCompatibility}. + */ +public final class TypeSerializerConditions { + + private TypeSerializerConditions() {} + + public static Condition> isCompatibleAsIs() { + return new Condition<>( + TypeSerializerSchemaCompatibility::isCompatibleAsIs, + "type serializer schema that is a compatible as is"); + } + + public static Condition> isIncompatible() { + return new Condition<>( + TypeSerializerSchemaCompatibility::isIncompatible, + "type serializer schema that is incompatible"); + } + + public static Condition> isCompatibleAfterMigration() { + return new Condition<>( + TypeSerializerSchemaCompatibility::isCompatibleAfterMigration, + "type serializer schema that is compatible after migration"); + } + + public static + Condition> + isCompatibleWithReconfiguredSerializer() { + return new Condition<>( + TypeSerializerSchemaCompatibility::isCompatibleWithReconfiguredSerializer, + "type serializer schema that is compatible with a reconfigured serializer"); + } + + public static + Condition> isCompatibleWithReconfiguredSerializer( + Predicate> reconfiguredSerializerMatcher) { + return new Condition<>( + compatibility -> + compatibility.isCompatibleWithReconfiguredSerializer() + && reconfiguredSerializerMatcher.test( + compatibility.getReconfiguredSerializer()), + "type serializer schema that is compatible with a reconfigured serializer matching " + + reconfiguredSerializerMatcher); + } + + public static Condition> hasSameCompatibilityAs( + TypeSerializerSchemaCompatibility expectedCompatibility) { + return new Condition<>( + actual -> + actual.isCompatibleAsIs() == expectedCompatibility.isCompatibleAsIs() + && actual.isIncompatible() == expectedCompatibility.isIncompatible() + && actual.isCompatibleAfterMigration() + == expectedCompatibility.isCompatibleAfterMigration() + && actual.isCompatibleWithReconfiguredSerializer() + == expectedCompatibility + .isCompatibleWithReconfiguredSerializer(), + "same compatibility as " + expectedCompatibility); + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerMatchers.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerMatchers.java deleted file mode 100644 index 841d8ccc3cd0a..0000000000000 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerMatchers.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.common.typeutils; - -import org.hamcrest.CoreMatchers; -import org.hamcrest.Description; -import org.hamcrest.Matcher; -import org.hamcrest.TypeSafeDiagnosingMatcher; -import org.hamcrest.TypeSafeMatcher; - -import java.util.function.Predicate; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * A Collection of useful {@link Matcher}s for {@link TypeSerializer} and {@link - * TypeSerializerSchemaCompatibility}. - */ -public final class TypeSerializerMatchers { - - private TypeSerializerMatchers() {} - - // ------------------------------------------------------------------------------------------------------------- - // Matcher Factories - // ------------------------------------------------------------------------------------------------------------- - - /** - * Matches {@code compatibleAsIs} {@link TypeSerializerSchemaCompatibility}. - * - * @param element type - * @return a {@code Matcher} that matches {@code compatibleAsIs} {@link - * TypeSerializerSchemaCompatibility}. - */ - public static Matcher> isCompatibleAsIs() { - return propertyMatcher( - TypeSerializerSchemaCompatibility::isCompatibleAsIs, - "type serializer schema that is a compatible as is"); - } - - /** - * Matches {@code isIncompatible} {@link TypeSerializerSchemaCompatibility}. - * - * @param element type - * @return a {@code Matcher} that matches {@code isIncompatible} {@link - * TypeSerializerSchemaCompatibility}. - */ - public static Matcher> isIncompatible() { - return propertyMatcher( - TypeSerializerSchemaCompatibility::isIncompatible, - "type serializer schema that is incompatible"); - } - - /** - * Matches {@code isCompatibleAfterMigration} {@link TypeSerializerSchemaCompatibility}. - * - * @param element type - * @return a {@code Matcher} that matches {@code isCompatibleAfterMigration} {@link - * TypeSerializerSchemaCompatibility}. - */ - public static Matcher> isCompatibleAfterMigration() { - return propertyMatcher( - TypeSerializerSchemaCompatibility::isCompatibleAfterMigration, - "type serializer schema that is compatible after migration"); - } - - /** - * Matches {@code isCompatibleWithReconfiguredSerializer} {@link - * TypeSerializerSchemaCompatibility}. - * - * @param element type - * @return a {@code Matcher} that matches {@code isCompatibleWithReconfiguredSerializer} {@link - * TypeSerializerSchemaCompatibility}. - */ - public static - Matcher> isCompatibleWithReconfiguredSerializer() { - @SuppressWarnings("unchecked") - Matcher> anything = - (Matcher>) (Matcher) CoreMatchers.anything(); - - return new CompatibleAfterReconfiguration<>(anything); - } - - /** - * Matches {@code isCompatibleWithReconfiguredSerializer} {@link - * TypeSerializerSchemaCompatibility}. - * - * @param reconfiguredSerializerMatcher matches the reconfigured serializer. - * @param element type - * @return a {@code Matcher} that matches {@code isCompatibleWithReconfiguredSerializer} {@link - * TypeSerializerSchemaCompatibility}. - */ - public static - Matcher> isCompatibleWithReconfiguredSerializer( - Matcher> reconfiguredSerializerMatcher) { - - return new CompatibleAfterReconfiguration<>(reconfiguredSerializerMatcher); - } - - /** - * Matches if the expected {@code TypeSerializerSchemaCompatibility} has the same compatibility - * as {@code expectedCompatibility}. - * - * @param expectedCompatibility the compatibility to match to. - * @param element type. - * @return a {@code Matcher} that matches if it has the same compatibility as {@code - * expectedCompatibility}. - */ - public static Matcher> hasSameCompatibilityAs( - TypeSerializerSchemaCompatibility expectedCompatibility) { - - return new SchemaCompatibilitySameAs<>(expectedCompatibility); - } - - // ------------------------------------------------------------------------------------------------------------- - // Helpers - // ------------------------------------------------------------------------------------------------------------- - - private static Matcher propertyMatcher( - Predicate predicate, String matcherDescription) { - return new TypeSafeMatcher() { - - @Override - protected boolean matchesSafely(T item) { - return predicate.test(item); - } - - @Override - public void describeTo(Description description) { - description.appendText(matcherDescription); - } - }; - } - - // ------------------------------------------------------------------------------------------------------------- - // Matchers - // ------------------------------------------------------------------------------------------------------------- - - private static final class CompatibleAfterReconfiguration - extends TypeSafeDiagnosingMatcher> { - - private final Matcher> reconfiguredSerializerMatcher; - - private CompatibleAfterReconfiguration( - Matcher> reconfiguredSerializerMatcher) { - this.reconfiguredSerializerMatcher = checkNotNull(reconfiguredSerializerMatcher); - } - - @Override - protected boolean matchesSafely( - TypeSerializerSchemaCompatibility item, Description mismatchDescription) { - if (!item.isCompatibleWithReconfiguredSerializer()) { - mismatchDescription.appendText( - "serializer schema is not compatible with a reconfigured serializer"); - return false; - } - TypeSerializer reconfiguredSerializer = item.getReconfiguredSerializer(); - if (!reconfiguredSerializerMatcher.matches(reconfiguredSerializer)) { - reconfiguredSerializerMatcher.describeMismatch( - reconfiguredSerializer, mismatchDescription); - return false; - } - return true; - } - - @Override - public void describeTo(Description description) { - description - .appendText("type serializer schema that is compatible after reconfiguration,") - .appendText("with a reconfigured serializer matching ") - .appendDescriptionOf(reconfiguredSerializerMatcher); - } - } - - private static class SchemaCompatibilitySameAs - extends TypeSafeMatcher> { - - private final TypeSerializerSchemaCompatibility expectedCompatibility; - - private SchemaCompatibilitySameAs( - TypeSerializerSchemaCompatibility expectedCompatibility) { - this.expectedCompatibility = checkNotNull(expectedCompatibility); - } - - @Override - protected boolean matchesSafely( - TypeSerializerSchemaCompatibility testResultCompatibility) { - if (expectedCompatibility.isCompatibleAsIs()) { - return testResultCompatibility.isCompatibleAsIs(); - } else if (expectedCompatibility.isIncompatible()) { - return testResultCompatibility.isIncompatible(); - } else if (expectedCompatibility.isCompatibleAfterMigration()) { - return testResultCompatibility.isCompatibleAfterMigration(); - } else if (expectedCompatibility.isCompatibleWithReconfiguredSerializer()) { - return testResultCompatibility.isCompatibleWithReconfiguredSerializer(); - } - return false; - } - - @Override - public void describeTo(Description description) { - description.appendText("same compatibility as ").appendValue(expectedCompatibility); - } - } -} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotTest.java index ba77f0e4bb13e..d16eb88681308 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotTest.java @@ -27,7 +27,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for {@link TypeSerializerSnapshot} */ -public class TypeSerializerSnapshotTest { +class TypeSerializerSnapshotTest { @Test void testIllegalSchemaCompatibility() { diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java index d304c9c0419a7..e9a644e3cf377 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java @@ -25,8 +25,7 @@ import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.test.util.MigrationTest; -import org.assertj.core.api.HamcrestCondition; -import org.hamcrest.Matcher; +import org.assertj.core.api.Condition; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -43,7 +42,6 @@ import static org.apache.flink.util.Preconditions.checkState; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assumptions.assumeThat; -import static org.hamcrest.CoreMatchers.not; /** A test base for testing {@link TypeSerializer} upgrades. */ @TestInstance(TestInstance.Lifecycle.PER_CLASS) @@ -98,15 +96,15 @@ public interface UpgradeVerifier { /** Creates a post-upgrade {@link TypeSerializer}. */ TypeSerializer createUpgradedSerializer(); - /** Returns a {@link Matcher} for asserting the deserialized test data. */ - Matcher testDataMatcher(); + /** Returns a {@link Condition} for asserting the deserialized test data. */ + Condition testDataCondition(); /** - * Returns a {@link Matcher} for comparing the {@link TypeSerializerSchemaCompatibility} + * Returns a {@link Condition} for comparing the {@link TypeSerializerSchemaCompatibility} * that the serializer upgrade produced with an expected {@link * TypeSerializerSchemaCompatibility}. */ - Matcher> schemaCompatibilityMatcher( + Condition> schemaCompatibilityCondition( FlinkVersion version); } @@ -176,19 +174,19 @@ public TypeSerializer createUpgradedSerializer() { } @Override - public Matcher testDataMatcher() { + public Condition testDataCondition() { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(verifierClassloader)) { - return delegateVerifier.testDataMatcher(); + return delegateVerifier.testDataCondition(); } } @Override - public Matcher> - schemaCompatibilityMatcher(FlinkVersion version) { + public Condition> + schemaCompatibilityCondition(FlinkVersion version) { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(verifierClassloader)) { - return delegateVerifier.schemaCompatibilityMatcher(version); + return delegateVerifier.schemaCompatibilityCondition(version); } } } @@ -275,14 +273,14 @@ void restoreSerializerIsValid( throws Exception { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(testSpecification.verifier.verifierClassloader)) { - assumeThat(TypeSerializerSchemaCompatibility.incompatible()) + assumeThat( + testSpecification + .verifier + .schemaCompatibilityCondition(testSpecification.flinkVersion) + .matches(TypeSerializerSchemaCompatibility.incompatible())) .as( "This test only applies for test specifications that verify an upgraded serializer that is not incompatible.") - .is( - HamcrestCondition.matching( - not( - testSpecification.verifier.schemaCompatibilityMatcher( - testSpecification.flinkVersion)))); + .isFalse(); TypeSerializerSnapshot restoredSerializerSnapshot = snapshotUnderTest(testSpecification); @@ -292,7 +290,7 @@ void restoreSerializerIsValid( assertSerializerIsValid( restoredSerializer, dataUnderTest(testSpecification), - testSpecification.verifier.testDataMatcher()); + testSpecification.verifier.testDataCondition()); } } @@ -315,9 +313,8 @@ void upgradedSerializerHasExpectedSchemaCompatibility( assertThat(upgradeCompatibility) .is( - HamcrestCondition.matching( - testSpecification.verifier.schemaCompatibilityMatcher( - testSpecification.flinkVersion))); + (testSpecification.verifier.schemaCompatibilityCondition( + testSpecification.flinkVersion))); } } @@ -341,9 +338,7 @@ void upgradedSerializerIsValidAfterMigration( assumeThat(upgradeCompatibility) .as( "This test only applies for test specifications that verify an upgraded serializer that requires migration to be compatible.") - .is( - HamcrestCondition.matching( - TypeSerializerMatchers.isCompatibleAfterMigration())); + .is(TypeSerializerConditions.isCompatibleAfterMigration()); // migrate the previous data schema, TypeSerializer restoreSerializer = @@ -353,11 +348,13 @@ void upgradedSerializerIsValidAfterMigration( dataUnderTest(testSpecification), restoreSerializer, upgradedSerializer, - testSpecification.verifier.testDataMatcher()); + testSpecification.verifier.testDataCondition()); // .. and then assert that the upgraded serializer is valid with the migrated data assertSerializerIsValid( - upgradedSerializer, migratedData, testSpecification.verifier.testDataMatcher()); + upgradedSerializer, + migratedData, + testSpecification.verifier.testDataCondition()); } } @@ -380,17 +377,14 @@ void upgradedSerializerIsValidAfterReconfiguration( assumeThat(upgradeCompatibility) .as( "This test only applies for test specifications that verify an upgraded serializer that requires reconfiguration to be compatible.") - .is( - HamcrestCondition.matching( - TypeSerializerMatchers - .isCompatibleWithReconfiguredSerializer())); + .is(TypeSerializerConditions.isCompatibleWithReconfiguredSerializer()); TypeSerializer reconfiguredUpgradedSerializer = upgradeCompatibility.getReconfiguredSerializer(); assertSerializerIsValid( reconfiguredUpgradedSerializer, dataUnderTest(testSpecification), - testSpecification.verifier.testDataMatcher()); + testSpecification.verifier.testDataCondition()); } } @@ -413,12 +407,12 @@ void upgradedSerializerIsValidWhenCompatibleAsIs( assumeThat(upgradeCompatibility) .as( "This test only applies for test specifications that verify an upgraded serializer that is compatible as is.") - .is(HamcrestCondition.matching(TypeSerializerMatchers.isCompatibleAsIs())); + .is((TypeSerializerConditions.isCompatibleAsIs())); assertSerializerIsValid( upgradedSerializer, dataUnderTest(testSpecification), - testSpecification.verifier.testDataMatcher()); + testSpecification.verifier.testDataCondition()); } } @@ -439,7 +433,7 @@ void upgradedSerializerIsValidWhenCompatibleAsIs( * */ private static void assertSerializerIsValid( - TypeSerializer serializer, DataInputView dataInput, Matcher testDataMatcher) + TypeSerializer serializer, DataInputView dataInput, Condition testDataMatcher) throws Exception { DataInputView serializedData = @@ -568,11 +562,11 @@ private static DataInputView readAndThenWriteData( DataInputView originalDataInput, TypeSerializer readSerializer, TypeSerializer writeSerializer, - Matcher testDataMatcher) + Condition testDataCondition) throws IOException { T data = readSerializer.deserialize(originalDataInput); - assertThat(data).is(HamcrestCondition.matching(testDataMatcher)); + assertThat(data).is(testDataCondition); DataOutputSerializer out = new DataOutputSerializer(INITIAL_OUTPUT_BUFFER_SIZE); writeSerializer.serialize(data, out); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BasicTypeSerializerUpgradeTestSpecifications.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BasicTypeSerializerUpgradeTestSpecifications.java index 76410d558da2f..dffa004791a18 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BasicTypeSerializerUpgradeTestSpecifications.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BasicTypeSerializerUpgradeTestSpecifications.java @@ -20,7 +20,7 @@ import org.apache.flink.FlinkVersion; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerMatchers; +import org.apache.flink.api.common.typeutils.TypeSerializerConditions; import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase; import org.apache.flink.types.BooleanValue; @@ -34,8 +34,7 @@ import org.apache.flink.types.ShortValue; import org.apache.flink.types.StringValue; -import org.hamcrest.Matcher; -import org.hamcrest.Matchers; +import org.assertj.core.api.Condition; import java.math.BigDecimal; import java.math.BigInteger; @@ -71,14 +70,18 @@ public TypeSerializer createUpgradedSerializer() { } @Override - public Matcher testDataMatcher() { - return Matchers.is(new BigDecimal("123456789012345678901234567890123456.789")); + public Condition testDataCondition() { + return new Condition<>( + value -> + value.equals( + new BigDecimal("123456789012345678901234567890123456.789")), + "value is 123456789012345678901234567890123456.789"); } @Override - public Matcher> schemaCompatibilityMatcher( - FlinkVersion version) { - return TypeSerializerMatchers.isCompatibleAsIs(); + public Condition> + schemaCompatibilityCondition(FlinkVersion version) { + return TypeSerializerConditions.isCompatibleAsIs(); } } @@ -108,14 +111,16 @@ public TypeSerializer createUpgradedSerializer() { } @Override - public Matcher testDataMatcher() { - return Matchers.is(new BigInteger("123456789012345678901234567890123456")); + public Condition testDataCondition() { + return new Condition<>( + value -> value.equals(new BigInteger("123456789012345678901234567890123456")), + "value is 123456789012345678901234567890123456"); } @Override - public Matcher> schemaCompatibilityMatcher( - FlinkVersion version) { - return TypeSerializerMatchers.isCompatibleAsIs(); + public Condition> + schemaCompatibilityCondition(FlinkVersion version) { + return TypeSerializerConditions.isCompatibleAsIs(); } } @@ -145,14 +150,14 @@ public TypeSerializer createUpgradedSerializer() { } @Override - public Matcher testDataMatcher() { - return Matchers.is(Boolean.TRUE); + public Condition testDataCondition() { + return new Condition<>(Boolean.TRUE::equals, "value is true"); } @Override - public Matcher> schemaCompatibilityMatcher( + public Condition> schemaCompatibilityCondition( FlinkVersion version) { - return TypeSerializerMatchers.isCompatibleAsIs(); + return TypeSerializerConditions.isCompatibleAsIs(); } } @@ -182,14 +187,14 @@ public TypeSerializer createUpgradedSerializer() { } @Override - public Matcher testDataMatcher() { - return Matchers.is(BooleanValue.TRUE); + public Condition testDataCondition() { + return new Condition<>(BooleanValue.TRUE::equals, "value is true"); } @Override - public Matcher> schemaCompatibilityMatcher( - FlinkVersion version) { - return TypeSerializerMatchers.isCompatibleAsIs(); + public Condition> + schemaCompatibilityCondition(FlinkVersion version) { + return TypeSerializerConditions.isCompatibleAsIs(); } } @@ -219,14 +224,14 @@ public TypeSerializer createUpgradedSerializer() { } @Override - public Matcher testDataMatcher() { - return Matchers.is(Byte.valueOf("42")); + public Condition testDataCondition() { + return new Condition<>(value -> value.equals(Byte.valueOf("42")), "value is 42"); } @Override - public Matcher> schemaCompatibilityMatcher( + public Condition> schemaCompatibilityCondition( FlinkVersion version) { - return TypeSerializerMatchers.isCompatibleAsIs(); + return TypeSerializerConditions.isCompatibleAsIs(); } } @@ -256,14 +261,14 @@ public TypeSerializer createUpgradedSerializer() { } @Override - public Matcher testDataMatcher() { - return Matchers.is(new ByteValue((byte) 42)); + public Condition testDataCondition() { + return new Condition<>(new ByteValue((byte) 42)::equals, "value is 42"); } @Override - public Matcher> schemaCompatibilityMatcher( + public Condition> schemaCompatibilityCondition( FlinkVersion version) { - return TypeSerializerMatchers.isCompatibleAsIs(); + return TypeSerializerConditions.isCompatibleAsIs(); } } @@ -293,14 +298,15 @@ public TypeSerializer createUpgradedSerializer() { } @Override - public Matcher testDataMatcher() { - return Matchers.is(Character.MAX_VALUE); + public Condition testDataCondition() { + return new Condition<>( + value -> value.equals(Character.MAX_VALUE), "value is Character.MAX_VALUE"); } @Override - public Matcher> schemaCompatibilityMatcher( + public Condition> schemaCompatibilityCondition( FlinkVersion version) { - return TypeSerializerMatchers.isCompatibleAsIs(); + return TypeSerializerConditions.isCompatibleAsIs(); } } @@ -330,14 +336,14 @@ public TypeSerializer createUpgradedSerializer() { } @Override - public Matcher testDataMatcher() { - return Matchers.is(new CharValue((char) 42)); + public Condition testDataCondition() { + return new Condition<>(new CharValue((char) 42)::equals, "value is 42"); } @Override - public Matcher> schemaCompatibilityMatcher( + public Condition> schemaCompatibilityCondition( FlinkVersion version) { - return TypeSerializerMatchers.isCompatibleAsIs(); + return TypeSerializerConditions.isCompatibleAsIs(); } } @@ -367,14 +373,14 @@ public TypeSerializer createUpgradedSerializer() { } @Override - public Matcher testDataMatcher() { - return Matchers.is(new Date(1580382960L)); + public Condition testDataCondition() { + return new Condition<>(new Date(1580382960L)::equals, "value is 1580382960L"); } @Override - public Matcher> schemaCompatibilityMatcher( + public Condition> schemaCompatibilityCondition( FlinkVersion version) { - return TypeSerializerMatchers.isCompatibleAsIs(); + return TypeSerializerConditions.isCompatibleAsIs(); } } @@ -404,14 +410,14 @@ public TypeSerializer createUpgradedSerializer() { } @Override - public Matcher testDataMatcher() { - return Matchers.is(new Double("12345.6789")); + public Condition testDataCondition() { + return new Condition<>(new Double("12345.6789")::equals, "value is 12345.6789"); } @Override - public Matcher> schemaCompatibilityMatcher( + public Condition> schemaCompatibilityCondition( FlinkVersion version) { - return TypeSerializerMatchers.isCompatibleAsIs(); + return TypeSerializerConditions.isCompatibleAsIs(); } } @@ -441,14 +447,14 @@ public TypeSerializer createUpgradedSerializer() { } @Override - public Matcher testDataMatcher() { - return Matchers.is(new DoubleValue(12345.6789)); + public Condition testDataCondition() { + return new Condition<>(new DoubleValue(12345.6789)::equals, "value is 12345.6789"); } @Override - public Matcher> schemaCompatibilityMatcher( - FlinkVersion version) { - return TypeSerializerMatchers.isCompatibleAsIs(); + public Condition> + schemaCompatibilityCondition(FlinkVersion version) { + return TypeSerializerConditions.isCompatibleAsIs(); } } @@ -478,14 +484,14 @@ public TypeSerializer createUpgradedSerializer() { } @Override - public Matcher testDataMatcher() { - return Matchers.is(new Float("123.456")); + public Condition testDataCondition() { + return new Condition<>(new Float("123.456")::equals, "value is 123.456"); } @Override - public Matcher> schemaCompatibilityMatcher( + public Condition> schemaCompatibilityCondition( FlinkVersion version) { - return TypeSerializerMatchers.isCompatibleAsIs(); + return TypeSerializerConditions.isCompatibleAsIs(); } } @@ -515,14 +521,14 @@ public TypeSerializer createUpgradedSerializer() { } @Override - public Matcher testDataMatcher() { - return Matchers.is(new FloatValue(123.456f)); + public Condition testDataCondition() { + return new Condition<>(new FloatValue(123.456f)::equals, "value is 123.456f"); } @Override - public Matcher> schemaCompatibilityMatcher( - FlinkVersion version) { - return TypeSerializerMatchers.isCompatibleAsIs(); + public Condition> + schemaCompatibilityCondition(FlinkVersion version) { + return TypeSerializerConditions.isCompatibleAsIs(); } } @@ -552,14 +558,14 @@ public TypeSerializer createUpgradedSerializer() { } @Override - public Matcher testDataMatcher() { - return Matchers.is(123456); + public Condition testDataCondition() { + return new Condition<>(value -> value.equals(123456), "value is 123456"); } @Override - public Matcher> schemaCompatibilityMatcher( + public Condition> schemaCompatibilityCondition( FlinkVersion version) { - return TypeSerializerMatchers.isCompatibleAsIs(); + return TypeSerializerConditions.isCompatibleAsIs(); } } @@ -589,14 +595,14 @@ public TypeSerializer createUpgradedSerializer() { } @Override - public Matcher testDataMatcher() { - return Matchers.is(new IntValue(123456)); + public Condition testDataCondition() { + return new Condition<>(new IntValue(123456)::equals, "value is 123456"); } @Override - public Matcher> schemaCompatibilityMatcher( + public Condition> schemaCompatibilityCondition( FlinkVersion version) { - return TypeSerializerMatchers.isCompatibleAsIs(); + return TypeSerializerConditions.isCompatibleAsIs(); } } @@ -626,14 +632,14 @@ public TypeSerializer createUpgradedSerializer() { } @Override - public Matcher testDataMatcher() { - return Matchers.is(1234567890L); + public Condition testDataCondition() { + return new Condition<>(value -> value.equals(1234567890L), "value is 1234567890L"); } @Override - public Matcher> schemaCompatibilityMatcher( + public Condition> schemaCompatibilityCondition( FlinkVersion version) { - return TypeSerializerMatchers.isCompatibleAsIs(); + return TypeSerializerConditions.isCompatibleAsIs(); } } @@ -663,14 +669,14 @@ public TypeSerializer createUpgradedSerializer() { } @Override - public Matcher testDataMatcher() { - return Matchers.is(new LongValue(1234567890)); + public Condition testDataCondition() { + return new Condition<>(new LongValue(1234567890)::equals, "value is 1234567890"); } @Override - public Matcher> schemaCompatibilityMatcher( + public Condition> schemaCompatibilityCondition( FlinkVersion version) { - return TypeSerializerMatchers.isCompatibleAsIs(); + return TypeSerializerConditions.isCompatibleAsIs(); } } @@ -700,14 +706,15 @@ public TypeSerializer createUpgradedSerializer() { } @Override - public Matcher testDataMatcher() { - return Matchers.is(NullValue.getInstance()); + public Condition testDataCondition() { + return new Condition<>( + NullValue.getInstance()::equals, "value is NullValue.getInstance()"); } @Override - public Matcher> schemaCompatibilityMatcher( + public Condition> schemaCompatibilityCondition( FlinkVersion version) { - return TypeSerializerMatchers.isCompatibleAsIs(); + return TypeSerializerConditions.isCompatibleAsIs(); } } @@ -737,14 +744,14 @@ public TypeSerializer createUpgradedSerializer() { } @Override - public Matcher testDataMatcher() { - return Matchers.is((short) 123); + public Condition testDataCondition() { + return new Condition<>(value -> value == 123, "value is 123"); } @Override - public Matcher> schemaCompatibilityMatcher( + public Condition> schemaCompatibilityCondition( FlinkVersion version) { - return TypeSerializerMatchers.isCompatibleAsIs(); + return TypeSerializerConditions.isCompatibleAsIs(); } } @@ -774,14 +781,14 @@ public TypeSerializer createUpgradedSerializer() { } @Override - public Matcher testDataMatcher() { - return Matchers.is(new ShortValue((short) 123)); + public Condition testDataCondition() { + return new Condition<>(new ShortValue((short) 123)::equals, "value is 123"); } @Override - public Matcher> schemaCompatibilityMatcher( - FlinkVersion version) { - return TypeSerializerMatchers.isCompatibleAsIs(); + public Condition> + schemaCompatibilityCondition(FlinkVersion version) { + return TypeSerializerConditions.isCompatibleAsIs(); } } @@ -811,14 +818,15 @@ public TypeSerializer createUpgradedSerializer() { } @Override - public Matcher testDataMatcher() { - return Matchers.is(new java.sql.Date(1580382960L)); + public Condition testDataCondition() { + return new Condition<>( + value -> value.equals(new java.sql.Date(1580382960L)), "value is 1580382960L"); } @Override - public Matcher> schemaCompatibilityMatcher( - FlinkVersion version) { - return TypeSerializerMatchers.isCompatibleAsIs(); + public Condition> + schemaCompatibilityCondition(FlinkVersion version) { + return TypeSerializerConditions.isCompatibleAsIs(); } } @@ -848,14 +856,15 @@ public TypeSerializer