diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 00000000..f06130a8 --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,45 @@ +################################################################################ +# Copyright (c) 2024 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +name: Tests on JDK 8 +on: + push: + pull_request: + paths-ignore: + - 'docs/**' + - '**/*.md' +concurrency: + group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }} + cancel-in-progress: true + +jobs: + build: + runs-on: self-hosted + steps: + - name: Checkout code + uses: actions/checkout@v2 + - name: Test + timeout-minutes: 60 + run: | + mvn -B verify -Ptest-coverage + env: + MAVEN_OPTS: -Xmx4096m + ARTIFACTS_OSS_ENDPOINT: ${{ secrets.ARTIFACTS_OSS_ENDPOINT }} + ARTIFACTS_OSS_REGION: ${{ secrets.ARTIFACTS_OSS_REGION }} + ARTIFACTS_OSS_BUCKET: ${{ secrets.ARTIFACTS_OSS_BUCKET }} + ARTIFACTS_OSS_ACCESS_KEY: ${{ secrets.ARTIFACTS_OSS_ACCESS_KEY }} + ARTIFACTS_OSS_SECRET_KEY: ${{ secrets.ARTIFACTS_OSS_SECRET_KEY }} + ARTIFACTS_OSS_STS_ENDPOINT: ${{ secrets.ARTIFACTS_OSS_STS_ENDPOINT }} + ARTIFACTS_OSS_ROLE_ARN: ${{ secrets.ARTIFACTS_OSS_ROLE_ARN }} \ No newline at end of file diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java 
b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java index 630ce6e8..325d37af 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java @@ -245,6 +245,7 @@ void testCreateTableWithInvalidReplicationFactor() throws Exception { // assert the cluster should have tablet server number to be 3 assertHasTabletServerNumber(3); + FLUSS_CLUSTER_EXTENSION.waitUtilAllGatewayHasSameMetadata(); // we can create the table now admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get(); diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/FlussLogScannerITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/FlussLogScannerITCase.java index 57f8ca0e..1df38465 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/FlussLogScannerITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/FlussLogScannerITCase.java @@ -275,9 +275,13 @@ void testKvHeavyWriteAndScan() throws Exception { @ParameterizedTest @ValueSource(booleans = {true, false}) void testScanFromStartTimestamp(boolean isPartitioned) throws Exception { + TablePath tablePath = + TablePath.of( + "test_db_1", + "test_scan_from_timestamp" + (isPartitioned ? "_partitioned" : "")); long tableId = createTable( - DATA1_TABLE_PATH, + tablePath, isPartitioned ? 
DATA1_PARTITIONED_TABLE_INFO.getTableDescriptor() : DATA1_TABLE_INFO.getTableDescriptor(), @@ -289,7 +293,7 @@ void testScanFromStartTimestamp(boolean isPartitioned) throws Exception { FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(tableId); } else { Map partitionNameAndIds = - FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(DATA1_TABLE_PATH); + FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(tablePath); // just pick one partition Map.Entry partitionNameAndIdEntry = partitionNameAndIds.entrySet().iterator().next(); @@ -298,12 +302,12 @@ void testScanFromStartTimestamp(boolean isPartitioned) throws Exception { FLUSS_CLUSTER_EXTENSION.waitUtilTablePartitionReady(tableId, partitionId); } - PhysicalTablePath physicalTablePath = PhysicalTablePath.of(DATA1_TABLE_PATH, partitionName); + PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath, partitionName); long firstStartTimestamp = System.currentTimeMillis(); int batchRecordSize = 10; List expectedRows = new ArrayList<>(); - try (Table table = conn.getTable(DATA1_TABLE_PATH)) { + try (Table table = conn.getTable(tablePath)) { // 1. first write one batch of data. AppendWriter appendWriter = table.getAppendWriter(); for (int i = 0; i < batchRecordSize; i++) { @@ -315,6 +319,9 @@ void testScanFromStartTimestamp(boolean isPartitioned) throws Exception { appendWriter.append(row).get(); } + // sleep a while so that secondStartTimestamp differs from firstStartTimestamp + Thread.sleep(10); + // record second batch start timestamp, we move this before first scan to make it // as early as possible to avoid potential time backwards // as early as possible to avoid potential time backwards @@ -371,9 +378,13 @@ void testScanFromStartTimestamp(boolean isPartitioned) throws Exception { @ParameterizedTest @ValueSource(booleans = {true, false}) void testScanFromLatestOffsets(boolean isPartitioned) throws Exception { + TablePath tablePath = + TablePath.of( + "test_db_1", + "test_scan_from_latest_offsets" + (isPartitioned ? 
"_partitioned" : "")); long tableId = createTable( - DATA1_TABLE_PATH, + tablePath, isPartitioned ? DATA1_PARTITIONED_TABLE_INFO.getTableDescriptor() : DATA1_TABLE_INFO.getTableDescriptor(), @@ -384,16 +395,16 @@ void testScanFromLatestOffsets(boolean isPartitioned) throws Exception { FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(tableId); } else { Map partitionNameAndIds = - FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(DATA1_TABLE_PATH); + FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(tablePath); // just pick one partition partitionName = partitionNameAndIds.keySet().iterator().next(); partitionId = partitionNameAndIds.get(partitionName); FLUSS_CLUSTER_EXTENSION.waitUtilTablePartitionReady(tableId, partitionId); } - PhysicalTablePath physicalTablePath = PhysicalTablePath.of(DATA1_TABLE_PATH, partitionName); + PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath, partitionName); int batchRecordSize = 10; - try (Table table = conn.getTable(DATA1_TABLE_PATH)) { + try (Table table = conn.getTable(tablePath)) { // 1. first write one batch of data. 
AppendWriter appendWriter = table.getAppendWriter(); for (int i = 0; i < batchRecordSize; i++) { diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/RemoteLogDownloaderTest.java b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/RemoteLogDownloaderTest.java index 392f3ea5..6ee44ffd 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/RemoteLogDownloaderTest.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/RemoteLogDownloaderTest.java @@ -100,7 +100,7 @@ void testPrefetchNum() throws Exception { retry( Duration.ofMinutes(1), () -> { - for (int i = 0; i < 3; i++) { + for (int i = 0; i < 4; i++) { assertThat(futures.get(i).isDone()).isTrue(); } }); diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java index a9dbbce0..09e16589 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java @@ -119,10 +119,11 @@ void testPartitionedLogTable() throws Exception { @Test void testUnsubscribePartitionBucket() throws Exception { // write rows - Schema schema = createPartitionedTable(DATA1_TABLE_PATH, false); + TablePath tablePath = TablePath.of("test_db_1", "unsubscribe_partition_bucket_table"); + Schema schema = createPartitionedTable(tablePath, false); Map partitionIdByNames = - FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(DATA1_TABLE_PATH); - Table table = conn.getTable(DATA1_TABLE_PATH); + FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(tablePath); + Table table = conn.getTable(tablePath); Map> expectPartitionAppendRows = writeRows(table, schema, partitionIdByNames); diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java 
b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java index 82e7e824..feaddf7e 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java @@ -47,6 +47,7 @@ import com.alibaba.fluss.utils.Preconditions; import org.apache.commons.lang3.StringUtils; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -101,6 +102,7 @@ void testAppendOnly() throws Exception { @ParameterizedTest @ValueSource(booleans = {true, false}) + @Disabled("TODO, fix me in #116") void testAppendWithSmallBuffer(boolean indexedFormat) throws Exception { TableDescriptor desc = indexedFormat @@ -186,8 +188,9 @@ void testUpsertWithSmallBuffer() throws Exception { @Test void testPutAndLookup() throws Exception { - createTable(DATA1_TABLE_PATH_PK, DATA1_TABLE_INFO_PK.getTableDescriptor(), false); - verifyPutAndLookup(DATA1_TABLE_PATH_PK, DATA1_SCHEMA_PK, new Object[] {1, "a"}); + TablePath tablePath = TablePath.of("test_db_1", "test_put_and_lookup_table"); + createTable(tablePath, DATA1_TABLE_INFO_PK.getTableDescriptor(), false); + verifyPutAndLookup(tablePath, DATA1_SCHEMA_PK, new Object[] {1, "a"}); // test put/lookup data for primary table with pk index is not 0 Schema schema = @@ -319,7 +322,8 @@ void verifyPutAndLookup(TablePath tablePath, Schema tableSchema, Object[] fields try (Table table = conn.getTable(tablePath)) { UpsertWriter upsertWriter = table.getUpsertWriter(); // put data. - upsertWriter.upsert(row).get(); + upsertWriter.upsert(row); + upsertWriter.flush(); } // lookup this key. 
IndexedRow keyRow = keyRow(tableSchema, fields); @@ -601,9 +605,10 @@ void testAppendAndProject() throws Exception { .column("d", DataTypes.BIGINT()) .build(); TableDescriptor tableDescriptor = TableDescriptor.builder().schema(schema).build(); - createTable(DATA1_TABLE_PATH, tableDescriptor, false); + TablePath tablePath = TablePath.of("test_db_1", "test_append_and_project"); + createTable(tablePath, tableDescriptor, false); - try (Table table = conn.getTable(DATA1_TABLE_PATH)) { + try (Table table = conn.getTable(tablePath)) { AppendWriter appendWriter = table.getAppendWriter(); int expectedSize = 30; for (int i = 0; i < expectedSize; i++) { diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogTest.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogTest.java index a77ad8ca..b27deda1 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogTest.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogTest.java @@ -19,6 +19,8 @@ import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.server.testutils.FlussClusterExtension; +import com.alibaba.fluss.shaded.zookeeper3.org.apache.zookeeper.KeeperException; +import com.alibaba.fluss.utils.ExceptionUtils; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; @@ -120,8 +122,15 @@ static void afterAll() { @BeforeEach void beforeEach() throws Exception { - if (catalog != null) { + try { catalog.createDatabase(DEFAULT_DB, null, true); + } catch (CatalogException e) { + // the auto partitioned manager may create the db zk node + // in another thread, so if exception is NodeExistsException, just ignore + if (!ExceptionUtils.findThrowable(e, 
KeeperException.NodeExistsException.class) + .isPresent()) { + throw e; + } } } diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/utils/FlinkConnectorOptionsUtilTest.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/utils/FlinkConnectorOptionsUtilTest.java index fb6d7c34..d8214194 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/utils/FlinkConnectorOptionsUtilTest.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/utils/FlinkConnectorOptionsUtilTest.java @@ -20,6 +20,7 @@ import org.junit.jupiter.api.Test; import java.time.ZoneId; +import java.util.TimeZone; import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP; import static com.alibaba.fluss.connector.flink.utils.FlinkConnectorOptionsUtils.parseTimestamp; @@ -41,7 +42,7 @@ void testParseTimestamp() { parseTimestamp( "2023-12-09 23:09:12", SCAN_STARTUP_TIMESTAMP.key(), - ZoneId.systemDefault())) + TimeZone.getTimeZone("Asia/Shanghai").toZoneId())) .isEqualTo(1702134552000L); assertThatThrownBy( diff --git a/fluss-filesystems/fluss-fs-oss/src/test/java/com/alibaba/fluss/fs/oss/OSSTestCredentials.java b/fluss-filesystems/fluss-fs-oss/src/test/java/com/alibaba/fluss/fs/oss/OSSTestCredentials.java index 73c58d43..50f3435d 100644 --- a/fluss-filesystems/fluss-fs-oss/src/test/java/com/alibaba/fluss/fs/oss/OSSTestCredentials.java +++ b/fluss-filesystems/fluss-fs-oss/src/test/java/com/alibaba/fluss/fs/oss/OSSTestCredentials.java @@ -40,7 +40,15 @@ public class OSSTestCredentials { // ------------------------------------------------------------------------ public static boolean credentialsAvailable() { - return ENDPOINT != null && BUCKET != null && ACCESS_KEY != null && SECRET_KEY != null; + return isNotEmpty(ENDPOINT) + && isNotEmpty(BUCKET) + && isNotEmpty(ACCESS_KEY) + && 
isNotEmpty(SECRET_KEY); + } + + /** Checks if a String is not null and not empty. */ + private static boolean isNotEmpty(@Nullable String str) { + return str != null && !str.isEmpty(); } public static void assumeCredentialsAvailable() { diff --git a/fluss-filesystems/fluss-fs-oss/src/test/java/com/alibaba/fluss/fs/oss/OSSWithTokenFileSystemBehaviorITCase.java b/fluss-filesystems/fluss-fs-oss/src/test/java/com/alibaba/fluss/fs/oss/OSSWithTokenFileSystemBehaviorITCase.java index 94bb13ae..1f5634a3 100644 --- a/fluss-filesystems/fluss-fs-oss/src/test/java/com/alibaba/fluss/fs/oss/OSSWithTokenFileSystemBehaviorITCase.java +++ b/fluss-filesystems/fluss-fs-oss/src/test/java/com/alibaba/fluss/fs/oss/OSSWithTokenFileSystemBehaviorITCase.java @@ -32,25 +32,26 @@ class OSSWithTokenFileSystemBehaviorITCase extends OSSWithTokenFileSystemBehavio private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID(); - private static final FsPath basePath = - new FsPath(OSSTestCredentials.getTestBucketUri() + TEST_DATA_DIR); - @BeforeAll static void setup() throws Exception { // init a filesystem with ak/sk so that it can generate sts token initFileSystemWithSecretKey(); // now, we can init with sts token - initFileSystemWithToken(); + initFileSystemWithToken(getFsPath()); } @Override protected FileSystem getFileSystem() throws Exception { - return basePath.getFileSystem(); + return getFsPath().getFileSystem(); } @Override protected FsPath getBasePath() { - return basePath; + return getFsPath(); + } + + private static FsPath getFsPath() { + return new FsPath(OSSTestCredentials.getTestBucketUri() + TEST_DATA_DIR); } @AfterAll @@ -58,11 +59,10 @@ static void clearFsConfig() { FileSystem.initialize(new Configuration(), null); } - private static void initFileSystemWithToken() throws Exception { + private static void initFileSystemWithToken(FsPath fsPath) throws Exception { Configuration configuration = new Configuration(); // obtain a security token and call onNewTokensObtained - 
ObtainedSecurityToken obtainedSecurityToken = - basePath.getFileSystem().obtainSecurityToken(); + ObtainedSecurityToken obtainedSecurityToken = fsPath.getFileSystem().obtainSecurityToken(); OSSSecurityTokenReceiver ossSecurityTokenReceiver = new OSSSecurityTokenReceiver(); ossSecurityTokenReceiver.onNewTokensObtained(obtainedSecurityToken); diff --git a/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/flink/FlinkUnionReadLogTableITCase.java b/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/flink/FlinkUnionReadLogTableITCase.java index 4705ac54..4bb0f0be 100644 --- a/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/flink/FlinkUnionReadLogTableITCase.java +++ b/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/flink/FlinkUnionReadLogTableITCase.java @@ -103,13 +103,13 @@ private long prepareLogTable( if (isPartitioned) { Map partitionNameById = waitUntilPartitions(tablePath); for (String partition : partitionNameById.values()) { - for (int i = 0; i < 10; i++) { - flinkRows.addAll(writeRows(tablePath, 3, partition)); + for (int i = 0; i < 3; i++) { + flinkRows.addAll(writeRows(tablePath, 10, partition)); } } } else { - for (int i = 0; i < 10; i++) { - flinkRows.addAll(writeRows(tablePath, 3, null)); + for (int i = 0; i < 3; i++) { + flinkRows.addAll(writeRows(tablePath, 10, null)); } } return t1Id; diff --git a/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/flink/LakeTableEnumeratorTest.java b/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/flink/LakeTableEnumeratorTest.java index 7bc675ee..fb50c486 100644 --- a/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/flink/LakeTableEnumeratorTest.java +++ 
b/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/flink/LakeTableEnumeratorTest.java @@ -126,8 +126,9 @@ void testLogTableEnumerator() throws Throwable { TableBucket tableBucket = new TableBucket(t1Id, i); // no any paimon data written for the bucket if (bucketLogEndOffset.get(i) <= 0) { - expectedAssignment.put( - i, Collections.singletonList(new LogSplit(tableBucket, null, -2, 0))); + expectedAssignment + .computeIfAbsent(i, (k) -> new ArrayList<>()) + .add(new LogSplit(tableBucket, null, -2, 0)); } } Map> actualAssignment = diff --git a/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/PaimonSyncTestBase.java b/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/PaimonSyncTestBase.java index 6f25f596..3b846b7e 100644 --- a/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/PaimonSyncTestBase.java +++ b/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/PaimonSyncTestBase.java @@ -87,10 +87,11 @@ protected PaimonDataBaseSyncSinkBuilder getDatabaseSyncSinkBuilder( DataStreamSource input = execEnv.fromSource( - flussDatabaseSyncSource, - WatermarkStrategy.noWatermarks(), - "flinkSycDatabaseSource"); - + flussDatabaseSyncSource, + WatermarkStrategy.noWatermarks(), + "flinkSycDatabaseSource") + // limit resource usage + .setParallelism(2); Map paimonCatalogConf = FlinkPaimonTestBase.getPaimonCatalogConf(); return new PaimonDataBaseSyncSinkBuilder(paimonCatalogConf, configuration).withInput(input); @@ -120,7 +121,7 @@ protected long createLogTable(TablePath tablePath, int bucketNum, boolean isPart TableDescriptor.Builder tableBuilder = TableDescriptor.builder() - .distributedBy(bucketNum) + .distributedBy(bucketNum, "a") .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true"); if (isPartitioned) { diff --git 
a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/RequestChannel.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/RequestChannel.java index 43dab2df..430f18be 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/RequestChannel.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/RequestChannel.java @@ -36,7 +36,15 @@ public RequestChannel(int queueCapacity) { this.requestQueue = new PriorityBlockingQueue<>( queueCapacity, - (req1, req2) -> Integer.compare(req2.getPriority(), req1.getPriority())); + (req1, req2) -> { + // the smallest element is polled first, so higher priority sorts lower + int res = Integer.compare(req2.getPriority(), req1.getPriority()); + // same priority: order by request id (0 on equal ids keeps the contract) + if (res == 0) { + res = Integer.compare(req1.getRequestId(), req2.getRequestId()); + } + return res; + }); } /** diff --git a/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/protocol/RequestChannelTest.java b/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/protocol/RequestChannelTest.java index 24e40d2a..1139be25 100644 --- a/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/protocol/RequestChannelTest.java +++ b/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/protocol/RequestChannelTest.java @@ -25,6 +25,9 @@ import org.junit.jupiter.api.Test; +import java.util.ArrayList; +import java.util.List; + import static org.assertj.core.api.Assertions.assertThat; /** The test for {@link RequestChannel}. */ @@ -32,38 +35,32 @@ public class RequestChannelTest { @Test void testRequestPriority() throws Exception { - RequestChannel channel = new RequestChannel(10); + RequestChannel channel = new RequestChannel(100); // 1. request with same priority score. Use FIFO. 
- RpcRequest rpcRequest1 = - new RpcRequest( - ApiKeys.GET_TABLE.id, - (short) 0, - 2, - null, - new GetTableRequest(), - new EmptyByteBuf(new UnpooledByteBufAllocator(true, true)), - null); - RpcRequest rpcRequest2 = - new RpcRequest( - ApiKeys.GET_TABLE.id, - (short) 0, - 1, - null, - new GetTableRequest(), - new EmptyByteBuf(new UnpooledByteBufAllocator(true, true)), - null); - - channel.putRequest(rpcRequest1); - channel.putRequest(rpcRequest2); - - RpcRequest rpcRequest = channel.pollRequest(100); - assertThat(rpcRequest).isEqualTo(rpcRequest1); - rpcRequest = channel.pollRequest(100); - assertThat(rpcRequest).isEqualTo(rpcRequest2); + List rpcRequests = new ArrayList<>(); + // push rpc requests + for (int i = 0; i < 100; i++) { + RpcRequest rpcRequest = + new RpcRequest( + ApiKeys.GET_TABLE.id, + (short) 0, + i, + null, + new GetTableRequest(), + new EmptyByteBuf(new UnpooledByteBufAllocator(true, true)), + null); + channel.putRequest(rpcRequest); + rpcRequests.add(rpcRequest); + } + // pop rpc requests + for (int i = 0; i < 100; i++) { + RpcRequest gotRequest = channel.pollRequest(100); + assertThat(gotRequest).isEqualTo(rpcRequests.get(i)); + } // 2. request with different priority score. Should be ordered by priority score. 
- RpcRequest rpcRequest3 = + RpcRequest rpcRequest1 = new RpcRequest( ApiKeys.GET_TABLE.id, (short) 0, @@ -72,7 +69,7 @@ void testRequestPriority() throws Exception { new GetTableRequest(), new EmptyByteBuf(new UnpooledByteBufAllocator(true, true)), null); - RpcRequest rpcRequest4 = + RpcRequest rpcRequest2 = new RpcRequest( ApiKeys.FETCH_LOG.id, (short) 0, @@ -81,11 +78,11 @@ void testRequestPriority() throws Exception { new FetchLogRequest().setMaxBytes(100).setFollowerServerId(2), new EmptyByteBuf(new UnpooledByteBufAllocator(true, true)), null); - channel.putRequest(rpcRequest3); - channel.putRequest(rpcRequest4); - rpcRequest = channel.pollRequest(100); - assertThat(rpcRequest).isEqualTo(rpcRequest4); + channel.putRequest(rpcRequest1); + channel.putRequest(rpcRequest2); + RpcRequest rpcRequest = channel.pollRequest(100); + assertThat(rpcRequest).isEqualTo(rpcRequest2); rpcRequest = channel.pollRequest(100); - assertThat(rpcRequest).isEqualTo(rpcRequest3); + assertThat(rpcRequest).isEqualTo(rpcRequest1); } } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorRequestBatch.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorRequestBatch.java index 321aac54..6519e39e 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorRequestBatch.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorRequestBatch.java @@ -53,6 +53,8 @@ import java.util.Set; import java.util.stream.Collectors; +import static com.alibaba.fluss.server.utils.RpcMessageUtils.toTableBucket; + /** A request sender for coordinator server to request to tablet server by batch. 
*/ public class CoordinatorRequestBatch { @@ -316,13 +318,7 @@ private void sendNotifyLeaderAndIsrRequest(int coordinatorEpoch) { for (PbNotifyLeaderAndIsrRespForBucket protoNotifyLeaderRespForBucket : response.getNotifyBucketsLeaderRespsList()) { TableBucket tableBucket = - new TableBucket( - protoNotifyLeaderRespForBucket - .getTableBucket() - .getTableId(), - protoNotifyLeaderRespForBucket - .getTableBucket() - .getBucketId()); + toTableBucket(protoNotifyLeaderRespForBucket.getTableBucket()); // construct the result for notify bucket leader and isr NotifyLeaderAndIsrResultForBucket notifyLeaderAndIsrResultForBucket = protoNotifyLeaderRespForBucket.hasErrorCode() @@ -360,7 +356,7 @@ private void sendStopRequest(int coordinatorEpoch) { Set deletedReplicaBuckets = stopReplicas.values().stream() .filter(PbStopReplicaReqForBucket::isDelete) - .map(t -> RpcMessageUtils.toTableBucket(t.getTableBucket())) + .map(t -> toTableBucket(t.getTableBucket())) .collect(Collectors.toSet()); coordinatorChannelManager.sendStopBucketReplicaRequest( @@ -387,8 +383,7 @@ private void sendStopRequest(int coordinatorEpoch) { for (PbStopReplicaRespForBucket stopReplicaRespForBucket : stopReplicasResps) { TableBucket tableBucket = - RpcMessageUtils.toTableBucket( - stopReplicaRespForBucket.getTableBucket()); + toTableBucket(stopReplicaRespForBucket.getTableBucket()); // now, for stop replica(delete=false), it's best effort without any // error handling. 
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/RpcMessageUtils.java b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/RpcMessageUtils.java index dee77cd1..69d626a5 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/RpcMessageUtils.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/RpcMessageUtils.java @@ -288,10 +288,15 @@ public static NotifyLeaderAndIsrResponse makeNotifyLeaderAndIsrResponse( for (NotifyLeaderAndIsrResultForBucket bucketResult : bucketsResult) { PbNotifyLeaderAndIsrRespForBucket respForBucket = new PbNotifyLeaderAndIsrRespForBucket(); - respForBucket - .setTableBucket() - .setTableId(bucketResult.getTableId()) - .setBucketId(bucketResult.getBucketId()); + TableBucket tableBucket = bucketResult.getTableBucket(); + PbTableBucket pbTableBucket = + respForBucket + .setTableBucket() + .setTableId(tableBucket.getTableId()) + .setBucketId(tableBucket.getBucket()); + if (tableBucket.getPartitionId() != null) { + pbTableBucket.setPartitionId(tableBucket.getPartitionId()); + } if (bucketResult.failed()) { respForBucket.setError(bucketResult.getErrorCode(), bucketResult.getErrorMessage()); } diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java index 4bdc4cdb..a4571dc7 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java @@ -75,6 +75,10 @@ import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; +import static com.alibaba.fluss.server.coordinator.CoordinatorTestUtils.verifyBucketForPartitionInState; +import static com.alibaba.fluss.server.coordinator.CoordinatorTestUtils.verifyBucketForTableInState; +import static 
com.alibaba.fluss.server.coordinator.CoordinatorTestUtils.verifyReplicaForPartitionInState; +import static com.alibaba.fluss.server.coordinator.CoordinatorTestUtils.verifyReplicaForTableInState; import static com.alibaba.fluss.server.coordinator.statemachine.BucketState.OfflineBucket; import static com.alibaba.fluss.server.coordinator.statemachine.BucketState.OnlineBucket; import static com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.OfflineReplica; @@ -161,7 +165,7 @@ void testCreateAndDropTable() throws Exception { CoordinatorTestUtils.makeSendLeaderAndStopRequestAlwaysSuccess( eventProcessor.getCoordinatorContext(), testCoordinatorChannelManager); // create a table, - TablePath t1 = TablePath.of(defaultDatabase, "t1"); + TablePath t1 = TablePath.of(defaultDatabase, "create_drop_t1"); TableDescriptor tableDescriptor = TEST_TABLE; int nBuckets = 3; int replicationFactor = 3; @@ -170,7 +174,7 @@ void testCreateAndDropTable() throws Exception { nBuckets, replicationFactor, new int[] {0, 1, 2}); long t1Id = metaDataManager.createTable(t1, tableDescriptor, tableAssignment, false); - TablePath t2 = TablePath.of(defaultDatabase, "t2"); + TablePath t2 = TablePath.of(defaultDatabase, "create_drop_t2"); long t2Id = metaDataManager.createTable(t2, tableDescriptor, tableAssignment, false); verifyTableCreated(coordinatorContext, t2Id, tableAssignment, nBuckets, replicationFactor); @@ -181,9 +185,8 @@ void testCreateAndDropTable() throws Exception { verifyTableDropped(coordinatorContext, t1Id); // replicas and buckets for t2 should still be online - CoordinatorTestUtils.verifyBucketForTableInState( - coordinatorContext, t2Id, nBuckets, BucketState.OnlineBucket); - CoordinatorTestUtils.verifyReplicaForTableInState( + verifyBucketForTableInState(coordinatorContext, t2Id, nBuckets, BucketState.OnlineBucket); + verifyReplicaForTableInState( coordinatorContext, t2Id, nBuckets * replicationFactor, ReplicaState.OnlineReplica); // shutdown event processor and delete the 
table node for t2 from zk @@ -242,7 +245,7 @@ void testDropTableWithRetry() throws Exception { waitUtil( () -> coordinatorContext.getTablePathById(t1Id) != null, Duration.ofMinutes(1), - "Fail to wait for coordinator handling create table event for table %s" + t1Id); + "Fail to wait for coordinator handling create table event for table " + t1Id); // drop the table; metaDataManager.dropTable(t1, false); @@ -360,8 +363,10 @@ void testServerBecomeOnlineAndOfflineLine() throws Exception { retry( Duration.ofMinutes(1), () -> - assertThat(coordinatorContext.getLiveTabletServers()) - .containsKey(newlyServerId)); + assertThat( + new HashSet<>( + coordinatorContext.getLiveTabletServers().keySet())) + .contains(newlyServerId)); // make sure the bucket that remains in offline should be online again // since the server become online @@ -422,7 +427,7 @@ void testRestartTriggerReplicaToOffline() throws Exception { .add(0, BucketAssignment.of(0, 1, 2)) .add(1, BucketAssignment.of(1, 2, 0)) .build(); - TablePath tablePath = TablePath.of(defaultDatabase, "t1"); + TablePath tablePath = TablePath.of(defaultDatabase, "t_restart"); long table1Id = metaDataManager.createTable(tablePath, TEST_TABLE, tableAssignment, false); // let's restart @@ -462,8 +467,14 @@ void testRestartTriggerReplicaToOffline() throws Exception { // check the changed leader and isr info CoordinatorTestUtils.checkLeaderAndIsr(zookeeperClient, t1Bucket0, 1, 1); - assertThat(coordinatorContext.getBucketState(t1Bucket0)).isEqualTo(OnlineBucket); - assertThat(coordinatorContext.getBucketState(t1Bucket1)).isEqualTo(OnlineBucket); + retry( + Duration.ofMinutes(1), + () -> { + assertThat(coordinatorContext.getBucketState(t1Bucket0)) + .isEqualTo(OnlineBucket); + assertThat(coordinatorContext.getBucketState(t1Bucket1)) + .isEqualTo(OnlineBucket); + }); // only replica0 will be offline verifyReplicaOnlineOrOffline( coordinatorContext, table1Id, tableAssignment, Collections.singleton(failedServer)); @@ -556,6 +567,11 @@ 
void testCreateAndDropPartition() throws Exception { long tableId = metaDataManager.createTable(tablePath, tablePartitionTableDescriptor, null, false); + retry( + Duration.ofMinutes(1), + // retry until the table has been put into context + () -> assertThat(coordinatorContext.getTablePathById(tableId)).isNotNull()); + // create partition long partition1Id = zookeeperClient.getPartitionIdAndIncrement(); long partition2Id = zookeeperClient.getPartitionIdAndIncrement(); @@ -645,13 +661,19 @@ private void verifyTableCreated( .isTrue(); }); // make sure all should be online - CoordinatorTestUtils.verifyBucketForTableInState( - coordinatorContext, tableId, nBuckets, BucketState.OnlineBucket); - CoordinatorTestUtils.verifyReplicaForTableInState( - coordinatorContext, - tableId, - nBuckets * replicationFactor, - ReplicaState.OnlineReplica); + retry( + Duration.ofMinutes(1), + () -> + verifyBucketForTableInState( + coordinatorContext, tableId, nBuckets, BucketState.OnlineBucket)); + retry( + Duration.ofMinutes(1), + () -> + verifyReplicaForTableInState( + coordinatorContext, + tableId, + nBuckets * replicationFactor, + ReplicaState.OnlineReplica)); for (TableBucket tableBucket : coordinatorContext.getAllBucketsForTable(tableId)) { CoordinatorTestUtils.checkLeaderAndIsr( zookeeperClient, @@ -687,13 +709,22 @@ private void verifyPartitionCreated( .isTrue(); }); // make sure all should be online - CoordinatorTestUtils.verifyBucketForPartitionInState( - coordinatorContext, tablePartition, nBuckets, BucketState.OnlineBucket); - CoordinatorTestUtils.verifyReplicaForPartitionInState( - coordinatorContext, - tablePartition, - nBuckets * replicationFactor, - ReplicaState.OnlineReplica); + retry( + Duration.ofMinutes(1), + () -> + verifyBucketForPartitionInState( + coordinatorContext, + tablePartition, + nBuckets, + BucketState.OnlineBucket)); + retry( + Duration.ofMinutes(1), + () -> + verifyReplicaForPartitionInState( + coordinatorContext, + tablePartition, + nBuckets * 
replicationFactor, + ReplicaState.OnlineReplica)); for (TableBucket tableBucket : coordinatorContext.getAllBucketsForPartition( tablePartition.getTableId(), tablePartition.getPartitionId())) { @@ -761,12 +792,25 @@ private void verifyReplicaOnlineOrOffline( new TableBucketReplica(bucket, replica); // if expected to be offline if (expectedOfflineReplicas.contains(replica)) { - assertThat(coordinatorContext.getReplicaState(bucketReplica)) - .isEqualTo(OfflineReplica); + retry( + Duration.ofMinutes(1), + () -> + assertThat( + coordinatorContext + .getReplicaState( + bucketReplica)) + .isEqualTo(OfflineReplica)); + } else { // otherwise, should be online - assertThat(coordinatorContext.getReplicaState(bucketReplica)) - .isEqualTo(OnlineReplica); + retry( + Duration.ofMinutes(1), + () -> { + assertThat( + coordinatorContext.getReplicaState( + bucketReplica)) + .isEqualTo(OnlineReplica); + }); } } }); diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TableManagerITCase.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TableManagerITCase.java index 76d154fa..46b903b6 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TableManagerITCase.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TableManagerITCase.java @@ -339,7 +339,7 @@ void testTableManagement(boolean isCoordinatorServer) throws Exception { void testPartitionedTableManagement(AutoPartitionTimeUnit timeUnit) throws Exception { AdminGateway adminGateway = getAdminGateway(); String db1 = "db1"; - String tb1 = "tb1"; + String tb1 = "tb1_" + timeUnit.name(); TablePath tablePath = TablePath.of(db1, tb1); // first create a database adminGateway.createDatabase(newCreateDatabaseRequest(db1, false)).get(); diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java 
b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java index 4535fbb7..0e78164c 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java @@ -181,6 +181,12 @@ void testPartitionedTable() throws Exception { expectedEvents.add( new CreatePartitionEvent(tablePath, tableId, 2L, "2022", partitionAssignment)); + retry( + Duration.ofMinutes(1), + () -> + assertThat(eventManager.getEvents()) + .containsExactlyInAnyOrderElementsOf(expectedEvents)); + metaDataManager.dropTable(tablePath, false); // drop partitions event diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/RemoteLogITCase.java b/fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/RemoteLogITCase.java index 80dd3946..af1029ab 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/RemoteLogITCase.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/RemoteLogITCase.java @@ -33,6 +33,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import java.time.Duration; +import java.util.Objects; import static com.alibaba.fluss.record.TestData.DATA1; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_INFO; @@ -64,7 +65,9 @@ void testDeleteRemoteLog() throws Exception { TableBucket tb = new TableBucket(tableId, 0); FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb); - int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb); + int leader = + Objects.requireNonNull( + FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb).getLeaderId()); TabletServerGateway leaderGateWay = FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader); // produce many records to trigger remote log copy. 
diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/KvReplicaRestoreITCase.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/KvReplicaRestoreITCase.java index ea107a41..4584e6b6 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/KvReplicaRestoreITCase.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/KvReplicaRestoreITCase.java @@ -116,7 +116,7 @@ void testRestore() throws Exception { final int leaderServer = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tableBucket); Replica replica = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tableBucket); - int recordsNum = 3000; + int recordsNum = 10000; // create pretty many records to make can flush to file List records = new ArrayList<>(recordsNum); for (int i = 0; i < recordsNum; i++) { @@ -140,7 +140,11 @@ void testRestore() throws Exception { return false; }, Duration.ofMinutes(2), - "Fail to wait for the replica to restore in another server"); + "Fail to wait for the replica to restore in another server. 
" + + "Previous leader is " + + leaderServer + + ", restored leader is " + + newLeaderServer.get()); // wait the new replica become leader TabletServerGateway leaderGateway = FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(newLeaderServer.get()); diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java b/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java index 3b9a2d05..e52caa6e 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java @@ -47,6 +47,7 @@ import com.alibaba.fluss.server.zk.data.PartitionAssignment; import com.alibaba.fluss.server.zk.data.RemoteLogManifestHandle; import com.alibaba.fluss.server.zk.data.TableAssignment; +import com.alibaba.fluss.utils.FileUtils; import com.alibaba.fluss.utils.NetUtils; import org.apache.curator.test.TestingServer; @@ -59,6 +60,7 @@ import java.io.File; import java.nio.file.Files; +import java.nio.file.Paths; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; @@ -142,6 +144,11 @@ public void afterEach(ExtensionContext extensionContext) { for (String database : databases) { if (!database.equals(defaultDb)) { metaDataManager.dropDatabase(database, true, true); + // delete the data dirs + for (int serverId : tabletServers.keySet()) { + String dataDir = getDataDir(serverId); + FileUtils.deleteDirectoryQuietly(Paths.get(dataDir, database).toFile()); + } } } List tables = metaDataManager.listTables(defaultDb); @@ -239,8 +246,7 @@ public void startTabletServer(int serverId) throws Exception { if (tabletServers.containsKey(serverId)) { throw new IllegalArgumentException("Tablet server " + serverId + " already exists."); } - String dataDir = tempDir.getAbsolutePath() + File.separator + "tablet-server-" + serverId; - + String dataDir = getDataDir(serverId); final ServerNode 
serverNode; final TabletServer tabletServer; try (NetUtils.Port availablePort = getAvailablePort()) { @@ -269,6 +275,10 @@ public void startTabletServer(int serverId) throws Exception { tabletServerNodes.add(serverNode); } + private String getDataDir(int serverId) { + return tempDir.getAbsolutePath() + File.separator + "tablet-server-" + serverId; + } + private void setRemoteDataDir(Configuration conf) { String remoteDataDir = LocalFileSystem.getLocalFsURI().getScheme() diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index b2b5fba0..7edb37de 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -253,7 +253,7 @@ com.alibaba.fluss.fs.hdfs.HadoopFsPlugin com.alibaba.fluss.fs.hdfs.HadoopSecurityTokenReceiver - com.alibaba.fluss.fs.oss.OSSFileSystem + com.alibaba.fluss.fs.oss.* com.alibaba.fluss.fs.s3.* com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser* com.alibaba.fluss.rocksdb.RocksIteratorWrapper