[build] Enable ci (#149)
luoyuxia authored Dec 10, 2024
1 parent 8ae33af commit 2291484
Showing 24 changed files with 280 additions and 125 deletions.
45 changes: 45 additions & 0 deletions .github/workflows/ci.yaml
@@ -0,0 +1,45 @@
################################################################################
# Copyright (c) 2024 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
name: Tests on JDK 8
on:
  push:
  pull_request:
    paths-ignore:
      - 'docs/**'
      - '**/*.md'
concurrency:
  group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }}
  cancel-in-progress: true

jobs:
  build:
    runs-on: self-hosted
    steps:
      - name: Checkout code
        uses: actions/checkout@v2
      - name: Test
        timeout-minutes: 60
        run: |
          mvn -B verify -Ptest-coverage
        env:
          MAVEN_OPTS: -Xmx4096m
          ARTIFACTS_OSS_ENDPOINT: ${{ secrets.ARTIFACTS_OSS_ENDPOINT }}
          ARTIFACTS_OSS_REGION: ${{ secrets.ARTIFACTS_OSS_REGION }}
          ARTIFACTS_OSS_BUCKET: ${{ secrets.ARTIFACTS_OSS_BUCKET }}
          ARTIFACTS_OSS_ACCESS_KEY: ${{ secrets.ARTIFACTS_OSS_ACCESS_KEY }}
          ARTIFACTS_OSS_SECRET_KEY: ${{ secrets.ARTIFACTS_OSS_SECRET_KEY }}
          ARTIFACTS_OSS_STS_ENDPOINT: ${{ secrets.ARTIFACTS_OSS_STS_ENDPOINT }}
          ARTIFACTS_OSS_ROLE_ARN: ${{ secrets.ARTIFACTS_OSS_ROLE_ARN }}
@@ -245,6 +245,7 @@ void testCreateTableWithInvalidReplicationFactor() throws Exception {

// assert the cluster should have 3 tablet servers
assertHasTabletServerNumber(3);
FLUSS_CLUSTER_EXTENSION.waitUtilAllGatewayHasSameMetadata();

// we can create the table now
admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();
@@ -275,9 +275,13 @@ void testKvHeavyWriteAndScan() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testScanFromStartTimestamp(boolean isPartitioned) throws Exception {
TablePath tablePath =
TablePath.of(
"test_db_1",
"test_scan_from_timestamp" + (isPartitioned ? "_partitioned" : ""));
long tableId =
createTable(
DATA1_TABLE_PATH,
tablePath,
isPartitioned
? DATA1_PARTITIONED_TABLE_INFO.getTableDescriptor()
: DATA1_TABLE_INFO.getTableDescriptor(),
@@ -289,7 +293,7 @@ void testScanFromStartTimestamp(boolean isPartitioned) throws Exception {
FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(tableId);
} else {
Map<String, Long> partitionNameAndIds =
FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(DATA1_TABLE_PATH);
FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(tablePath);
// just pick one partition
Map.Entry<String, Long> partitionNameAndIdEntry =
partitionNameAndIds.entrySet().iterator().next();
Expand All @@ -298,12 +302,12 @@ void testScanFromStartTimestamp(boolean isPartitioned) throws Exception {
FLUSS_CLUSTER_EXTENSION.waitUtilTablePartitionReady(tableId, partitionId);
}

PhysicalTablePath physicalTablePath = PhysicalTablePath.of(DATA1_TABLE_PATH, partitionName);
PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath, partitionName);

long firstStartTimestamp = System.currentTimeMillis();
int batchRecordSize = 10;
List<IndexedRow> expectedRows = new ArrayList<>();
try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
try (Table table = conn.getTable(tablePath)) {
// 1. first write one batch of data.
AppendWriter appendWriter = table.getAppendWriter();
for (int i = 0; i < batchRecordSize; i++) {
Expand All @@ -315,6 +319,9 @@ void testScanFromStartTimestamp(boolean isPartitioned) throws Exception {
appendWriter.append(row).get();
}

// sleep a while to avoid secondStartTimestamp being the same as firstStartTimestamp
Thread.sleep(10);

// record second batch start timestamp, we move this before first scan to make it
// as early as possible to avoid potential time backwards
@@ -371,9 +378,13 @@ void testScanFromStartTimestamp(boolean isPartitioned) throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testScanFromLatestOffsets(boolean isPartitioned) throws Exception {
TablePath tablePath =
TablePath.of(
"test_db_1",
"test_scan_from_latest_offsets" + (isPartitioned ? "_partitioned" : ""));
long tableId =
createTable(
DATA1_TABLE_PATH,
tablePath,
isPartitioned
? DATA1_PARTITIONED_TABLE_INFO.getTableDescriptor()
: DATA1_TABLE_INFO.getTableDescriptor(),
@@ -384,16 +395,16 @@ void testScanFromLatestOffsets(boolean isPartitioned) throws Exception {
FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(tableId);
} else {
Map<String, Long> partitionNameAndIds =
FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(DATA1_TABLE_PATH);
FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(tablePath);
// just pick one partition
partitionName = partitionNameAndIds.keySet().iterator().next();
partitionId = partitionNameAndIds.get(partitionName);
FLUSS_CLUSTER_EXTENSION.waitUtilTablePartitionReady(tableId, partitionId);
}
PhysicalTablePath physicalTablePath = PhysicalTablePath.of(DATA1_TABLE_PATH, partitionName);
PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath, partitionName);

int batchRecordSize = 10;
try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
try (Table table = conn.getTable(tablePath)) {
// 1. first write one batch of data.
AppendWriter appendWriter = table.getAppendWriter();
for (int i = 0; i < batchRecordSize; i++) {
@@ -100,7 +100,7 @@ void testPrefetchNum() throws Exception {
retry(
Duration.ofMinutes(1),
() -> {
for (int i = 0; i < 3; i++) {
for (int i = 0; i < 4; i++) {
assertThat(futures.get(i).isDone()).isTrue();
}
});
@@ -119,10 +119,11 @@ void testPartitionedLogTable() throws Exception {
@Test
void testUnsubscribePartitionBucket() throws Exception {
// write rows
Schema schema = createPartitionedTable(DATA1_TABLE_PATH, false);
TablePath tablePath = TablePath.of("test_db_1", "unsubscribe_partition_bucket_table");
Schema schema = createPartitionedTable(tablePath, false);
Map<String, Long> partitionIdByNames =
FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(DATA1_TABLE_PATH);
Table table = conn.getTable(DATA1_TABLE_PATH);
FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(tablePath);
Table table = conn.getTable(tablePath);

Map<Long, List<InternalRow>> expectPartitionAppendRows =
writeRows(table, schema, partitionIdByNames);
@@ -47,6 +47,7 @@
import com.alibaba.fluss.utils.Preconditions;

import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -101,6 +102,7 @@ void testAppendOnly() throws Exception {

@ParameterizedTest
@ValueSource(booleans = {true, false})
@Disabled("TODO, fix me in #116")
void testAppendWithSmallBuffer(boolean indexedFormat) throws Exception {
TableDescriptor desc =
indexedFormat
@@ -186,8 +188,9 @@ void testUpsertWithSmallBuffer() throws Exception {

@Test
void testPutAndLookup() throws Exception {
createTable(DATA1_TABLE_PATH_PK, DATA1_TABLE_INFO_PK.getTableDescriptor(), false);
verifyPutAndLookup(DATA1_TABLE_PATH_PK, DATA1_SCHEMA_PK, new Object[] {1, "a"});
TablePath tablePath = TablePath.of("test_db_1", "test_put_and_lookup_table");
createTable(tablePath, DATA1_TABLE_INFO_PK.getTableDescriptor(), false);
verifyPutAndLookup(tablePath, DATA1_SCHEMA_PK, new Object[] {1, "a"});

// test put/lookup data for a primary key table whose pk index is not 0
Schema schema =
@@ -319,7 +322,8 @@ void verifyPutAndLookup(TablePath tablePath, Schema tableSchema, Object[] fields
try (Table table = conn.getTable(tablePath)) {
UpsertWriter upsertWriter = table.getUpsertWriter();
// put data.
upsertWriter.upsert(row).get();
upsertWriter.upsert(row);
upsertWriter.flush();
}
// lookup this key.
IndexedRow keyRow = keyRow(tableSchema, fields);
@@ -601,9 +605,10 @@ void testAppendAndProject() throws Exception {
.column("d", DataTypes.BIGINT())
.build();
TableDescriptor tableDescriptor = TableDescriptor.builder().schema(schema).build();
createTable(DATA1_TABLE_PATH, tableDescriptor, false);
TablePath tablePath = TablePath.of("test_db_1", "test_append_and_project");
createTable(tablePath, tableDescriptor, false);

try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
try (Table table = conn.getTable(tablePath)) {
AppendWriter appendWriter = table.getAppendWriter();
int expectedSize = 30;
for (int i = 0; i < expectedSize; i++) {
@@ -19,6 +19,8 @@
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.server.testutils.FlussClusterExtension;
import com.alibaba.fluss.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
import com.alibaba.fluss.utils.ExceptionUtils;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
@@ -120,8 +122,15 @@ static void afterAll() {

@BeforeEach
void beforeEach() throws Exception {
if (catalog != null) {
try {
catalog.createDatabase(DEFAULT_DB, null, true);
} catch (CatalogException e) {
// the auto partitioned manager may create the db zk node
// in another thread, so if the exception is NodeExistsException, just ignore it
if (!ExceptionUtils.findThrowable(e, KeeperException.NodeExistsException.class)
.isPresent()) {
throw e;
}
}
}

@@ -20,6 +20,7 @@
import org.junit.jupiter.api.Test;

import java.time.ZoneId;
import java.util.TimeZone;

import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP;
import static com.alibaba.fluss.connector.flink.utils.FlinkConnectorOptionsUtils.parseTimestamp;
@@ -41,7 +42,7 @@ void testParseTimestamp() {
parseTimestamp(
"2023-12-09 23:09:12",
SCAN_STARTUP_TIMESTAMP.key(),
ZoneId.systemDefault()))
TimeZone.getTimeZone("Asia/Shanghai").toZoneId()))
.isEqualTo(1702134552000L);

assertThatThrownBy(
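For reference, 2023-12-09 23:09:12 in Asia/Shanghai (UTC+8) is 2023-12-09T15:09:12Z, i.e. 1702134552000 ms since the epoch, which is why pinning the zone makes the expected value deterministic on any build machine. A minimal java.time sketch that reproduces the value (the TimestampCheck class name is made up for illustration and is not part of this commit):

import java.time.LocalDateTime;
import java.time.ZoneId;

public class TimestampCheck {
    public static void main(String[] args) {
        // "2023-12-09 23:09:12" interpreted in Asia/Shanghai (UTC+8)
        long millis =
                LocalDateTime.parse("2023-12-09T23:09:12")
                        .atZone(ZoneId.of("Asia/Shanghai"))
                        .toInstant()
                        .toEpochMilli();
        System.out.println(millis); // prints 1702134552000
    }
}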
@@ -40,7 +40,15 @@ public class OSSTestCredentials {
// ------------------------------------------------------------------------

public static boolean credentialsAvailable() {
return ENDPOINT != null && BUCKET != null && ACCESS_KEY != null && SECRET_KEY != null;
return isNotEmpty(ENDPOINT)
&& isNotEmpty(BUCKET)
&& isNotEmpty(ACCESS_KEY)
&& isNotEmpty(SECRET_KEY);
}

/** Checks if a String is not null and not empty. */
private static boolean isNotEmpty(@Nullable String str) {
return str != null && !str.isEmpty();
}

public static void assumeCredentialsAvailable() {
@@ -32,37 +32,37 @@ class OSSWithTokenFileSystemBehaviorITCase extends OSSWithTokenFileSystemBehavio

private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();

private static final FsPath basePath =
new FsPath(OSSTestCredentials.getTestBucketUri() + TEST_DATA_DIR);

@BeforeAll
static void setup() throws Exception {
// init a filesystem with ak/sk so that it can generate sts token
initFileSystemWithSecretKey();
// now, we can init with sts token
initFileSystemWithToken();
initFileSystemWithToken(getFsPath());
}

@Override
protected FileSystem getFileSystem() throws Exception {
return basePath.getFileSystem();
return getFsPath().getFileSystem();
}

@Override
protected FsPath getBasePath() {
return basePath;
return getFsPath();
}

private static FsPath getFsPath() {
return new FsPath(OSSTestCredentials.getTestBucketUri() + TEST_DATA_DIR);
}

@AfterAll
static void clearFsConfig() {
FileSystem.initialize(new Configuration(), null);
}

private static void initFileSystemWithToken() throws Exception {
private static void initFileSystemWithToken(FsPath fsPath) throws Exception {
Configuration configuration = new Configuration();
// obtain a security token and call onNewTokensObtained
ObtainedSecurityToken obtainedSecurityToken =
basePath.getFileSystem().obtainSecurityToken();
ObtainedSecurityToken obtainedSecurityToken = fsPath.getFileSystem().obtainSecurityToken();
OSSSecurityTokenReceiver ossSecurityTokenReceiver = new OSSSecurityTokenReceiver();
ossSecurityTokenReceiver.onNewTokensObtained(obtainedSecurityToken);

@@ -103,13 +103,13 @@ private long prepareLogTable(
if (isPartitioned) {
Map<Long, String> partitionNameById = waitUntilPartitions(tablePath);
for (String partition : partitionNameById.values()) {
for (int i = 0; i < 10; i++) {
flinkRows.addAll(writeRows(tablePath, 3, partition));
for (int i = 0; i < 3; i++) {
flinkRows.addAll(writeRows(tablePath, 10, partition));
}
}
} else {
for (int i = 0; i < 10; i++) {
flinkRows.addAll(writeRows(tablePath, 3, null));
for (int i = 0; i < 3; i++) {
flinkRows.addAll(writeRows(tablePath, 10, null));
}
}
return t1Id;
@@ -126,8 +126,9 @@ void testLogTableEnumerator() throws Throwable {
TableBucket tableBucket = new TableBucket(t1Id, i);
// no paimon data has been written for the bucket yet
if (bucketLogEndOffset.get(i) <= 0) {
expectedAssignment.put(
i, Collections.singletonList(new LogSplit(tableBucket, null, -2, 0)));
expectedAssignment
.computeIfAbsent(i, (k) -> new ArrayList<>())
.add(new LogSplit(tableBucket, null, -2, 0));
}
}
Map<Integer, List<SourceSplitBase>> actualAssignment =
@@ -87,10 +87,11 @@ protected PaimonDataBaseSyncSinkBuilder getDatabaseSyncSinkBuilder(

DataStreamSource<MultiplexCdcRecord> input =
execEnv.fromSource(
flussDatabaseSyncSource,
WatermarkStrategy.noWatermarks(),
"flinkSycDatabaseSource");

flussDatabaseSyncSource,
WatermarkStrategy.noWatermarks(),
"flinkSycDatabaseSource")
// limit resource usage
.setParallelism(2);
Map<String, String> paimonCatalogConf = FlinkPaimonTestBase.getPaimonCatalogConf();

return new PaimonDataBaseSyncSinkBuilder(paimonCatalogConf, configuration).withInput(input);
@@ -120,7 +121,7 @@ protected long createLogTable(TablePath tablePath, int bucketNum, boolean isPart

TableDescriptor.Builder tableBuilder =
TableDescriptor.builder()
.distributedBy(bucketNum)
.distributedBy(bucketNum, "a")
.property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");

if (isPartitioned) {
@@ -36,7 +36,15 @@ public RequestChannel(int queueCapacity) {
this.requestQueue =
new PriorityBlockingQueue<>(
queueCapacity,
(req1, req2) -> Integer.compare(req2.getPriority(), req1.getPriority()));
(req1, req2) -> {
// the element that compares as smaller is popped first
int res = Integer.compare(req2.getPriority(), req1.getPriority());
// if the priority is the same, keep FIFO order
if (res == 0 && req1 != req2) {
res = (req1.getRequestId() < req2.getRequestId() ? -1 : 1);
}
return res;
});
}

/**
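The new comparator orders requests so that a higher priority pops first and, for equal priorities, the smaller request id pops first, preserving arrival (FIFO) order. A small self-contained sketch of the same ordering rule, using a plain java.util.PriorityQueue; the Item class and PriorityFifoDemo name are made up for illustration, standing in for the real request type with its getPriority()/getRequestId() accessors:

import java.util.PriorityQueue;

public class PriorityFifoDemo {
    // stands in for the real request type; priority and seq mirror getPriority()/getRequestId()
    static final class Item {
        final int priority;
        final long seq;
        Item(int priority, long seq) { this.priority = priority; this.seq = seq; }
    }

    public static void main(String[] args) {
        PriorityQueue<Item> queue = new PriorityQueue<>((a, b) -> {
            // higher priority compares as smaller, so it is popped first
            int res = Integer.compare(b.priority, a.priority);
            if (res == 0 && a != b) {
                // equal priority: smaller (earlier) sequence number pops first -> FIFO
                res = Long.compare(a.seq, b.seq);
            }
            return res;
        });
        queue.add(new Item(1, 1));
        queue.add(new Item(1, 2));
        queue.add(new Item(5, 3));
        // pops in order: (5,3), (1,1), (1,2)
        while (!queue.isEmpty()) {
            Item item = queue.poll();
            System.out.println(item.priority + "," + item.seq);
        }
    }
}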