Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[server] Add test cases for RemoteLogITCase #96

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@
import com.alibaba.fluss.fs.FsPath;
import com.alibaba.fluss.metadata.PhysicalTablePath;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.rpc.entity.FetchLogResultForBucket;
import com.alibaba.fluss.rpc.gateway.CoordinatorGateway;
import com.alibaba.fluss.rpc.gateway.TabletServerGateway;
import com.alibaba.fluss.rpc.protocol.ApiError;
import com.alibaba.fluss.server.entity.FetchData;
import com.alibaba.fluss.server.log.FetchParams;
import com.alibaba.fluss.server.tablet.TabletServer;
import com.alibaba.fluss.server.testutils.FlussClusterExtension;
import com.alibaba.fluss.utils.FlussPaths;
Expand All @@ -33,6 +37,9 @@
import org.junit.jupiter.api.extension.RegisterExtension;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import static com.alibaba.fluss.record.TestData.DATA1;
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_INFO;
Expand All @@ -54,34 +61,101 @@ public class RemoteLogITCase {
.setClusterConf(initConfig())
.build();

@Test
void testDeleteRemoteLog() throws Exception {
private TableBucket setupTestEnvironment() throws Exception {
long tableId =
createTable(
FLUSS_CLUSTER_EXTENSION,
DATA1_TABLE_PATH,
DATA1_TABLE_INFO.getTableDescriptor());
TableBucket tb = new TableBucket(tableId, 0);

FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb);
int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
TabletServerGateway leaderGateWay =
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader);
// produce many records to trigger remote log copy.
for (int i = 0; i < 10; i++) {
return tb;
}

private void produceTestRecords(TabletServerGateway leaderGateWay, TableBucket tb, int count)
throws Exception {
for (int i = 0; i < count; i++) {
assertProduceLogResponse(
leaderGateWay
.produceLog(
newProduceLogRequest(
tableId, 0, 1, genMemoryLogRecordsByObject(DATA1)))
tb.getTableId(),
0,
1,
genMemoryLogRecordsByObject(DATA1)))
.get(),
0,
i * 10L);
}
}

FLUSS_CLUSTER_EXTENSION.waitUtilSomeLogSegmentsCopyToRemote(new TableBucket(tableId, 0));
@Test
void testCreateAndUploadRemoteLog() throws Exception {
TableBucket tb = setupTestEnvironment();
int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
TabletServerGateway leaderGateWay =
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader);

// Produce records and wait for remote copy
produceTestRecords(leaderGateWay, tb, 5);
FLUSS_CLUSTER_EXTENSION.waitUtilSomeLogSegmentsCopyToRemote(tb);

// get leader.
// Verify remote files
TabletServer tabletServer = FLUSS_CLUSTER_EXTENSION.getTabletServerById(leader);
FsPath fsPath =
FlussPaths.remoteLogTabletDir(
tabletServer.getReplicaManager().getRemoteLogManager().remoteLogDir(),
PhysicalTablePath.of(DATA1_TABLE_PATH),
tb);
FileSystem fileSystem = fsPath.getFileSystem();

assertThat(fileSystem.exists(fsPath)).isTrue();
assertThat(fileSystem.listStatus(fsPath).length).isGreaterThan(0);
}

@Test
void testDownloadRemoteLog() throws Exception {
TableBucket tb = setupTestEnvironment();
int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
TabletServerGateway leaderGateWay =
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader);
TabletServer tabletServer = FLUSS_CLUSTER_EXTENSION.getTabletServerById(leader);

// Produce records and wait for remote copy
produceTestRecords(leaderGateWay, tb, 10);
FLUSS_CLUSTER_EXTENSION.waitUtilSomeLogSegmentsCopyToRemote(tb);

CompletableFuture<Map<TableBucket, FetchLogResultForBucket>> future =
new CompletableFuture<>();
tabletServer
.getReplicaManager()
.fetchLogRecords(
new FetchParams(-1, Integer.MAX_VALUE),
Collections.singletonMap(
tb, new FetchData(tb.getTableId(), 0L, 1024 * 1024)),
future::complete);

Map<TableBucket, FetchLogResultForBucket> result = future.get();
assertThat(result).hasSize(1);

FetchLogResultForBucket resultForBucket = result.get(tb);
assertThat(resultForBucket.getError()).isEqualTo(ApiError.NONE);
assertThat(resultForBucket.fetchFromRemote()).isTrue();
assertThat(resultForBucket.getHighWatermark()).isGreaterThan(0L);
}

@Test
void testDeleteRemoteLog() throws Exception {
TableBucket tb = setupTestEnvironment();
int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
TabletServerGateway leaderGateWay =
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader);

// Produce records and wait for remote copy
produceTestRecords(leaderGateWay, tb, 10);
FLUSS_CLUSTER_EXTENSION.waitUtilSomeLogSegmentsCopyToRemote(tb);

// Verify remote files exist
TabletServer tabletServer = FLUSS_CLUSTER_EXTENSION.getTabletServerById(leader);
FsPath fsPath =
FlussPaths.remoteLogTabletDir(
Expand All @@ -92,6 +166,7 @@ void testDeleteRemoteLog() throws Exception {
assertThat(fileSystem.exists(fsPath)).isTrue();
assertThat(fileSystem.listStatus(fsPath).length).isGreaterThan(1);

// Delete table and verify cleanup
CoordinatorGateway coordinatorGateway = FLUSS_CLUSTER_EXTENSION.newCoordinatorClient();
coordinatorGateway
.dropTable(
Expand All @@ -105,57 +180,59 @@ void testDeleteRemoteLog() throws Exception {

@Test
void testFollowerFetchAlreadyMoveToRemoteLog() throws Exception {
long tableId =
createTable(
FLUSS_CLUSTER_EXTENSION,
DATA1_TABLE_PATH,
DATA1_TABLE_INFO.getTableDescriptor());
TableBucket tb = new TableBucket(tableId, 0);

FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb);
TableBucket tb = setupTestEnvironment();
int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
int follower;
for (int i = 0; true; i++) {
if (i != leader) {
follower = i;
break;
}
}
// kill follower, and restart after some segments in leader has been copied to remote.
int follower = leader == 0 ? 1 : 0;

// Stop follower
FLUSS_CLUSTER_EXTENSION.stopTabletServer(follower);

// Produce records and wait for remote copy
TabletServerGateway leaderGateWay =
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader);
// produce many records to trigger remote log copy.
for (int i = 0; i < 10; i++) {
assertProduceLogResponse(
leaderGateWay
.produceLog(
newProduceLogRequest(
tableId, 0, 1, genMemoryLogRecordsByObject(DATA1)))
.get(),
0,
i * 10L);
}
produceTestRecords(leaderGateWay, tb, 10);

FLUSS_CLUSTER_EXTENSION.waitUtilReplicaShrinkFromIsr(tb, follower);
FLUSS_CLUSTER_EXTENSION.waitUtilSomeLogSegmentsCopyToRemote(tb);

// restart follower
// Restart follower and verify recovery
FLUSS_CLUSTER_EXTENSION.startTabletServer(follower);
FLUSS_CLUSTER_EXTENSION.waitUtilReplicaExpandToIsr(tb, follower);
}

@Test
void testRemoteLogMetadataUpdate() throws Exception {
TableBucket tb = setupTestEnvironment();
int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
TabletServerGateway leaderGateWay =
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader);

// Produce records and wait for remote copy
produceTestRecords(leaderGateWay, tb, 5);
FLUSS_CLUSTER_EXTENSION.waitUtilSomeLogSegmentsCopyToRemote(tb);

// Verify metadata
TabletServer tabletServer = FLUSS_CLUSTER_EXTENSION.getTabletServerById(leader);
RemoteLogManager remoteLogManager = tabletServer.getReplicaManager().getRemoteLogManager();
RemoteLogTablet remoteLog = remoteLogManager.remoteLogTablet(tb);

assertThat(remoteLog.allRemoteLogSegments()).isNotEmpty();
assertThat(remoteLog.getRemoteLogStartOffset()).isEqualTo(0L);
assertThat(remoteLog.getRemoteLogEndOffset()).isPresent();

// Verify manifest
RemoteLogManifest manifest = remoteLog.currentManifest();
assertThat(manifest.getRemoteLogSegmentList()).isNotEmpty();
assertThat(manifest.getRemoteLogSegmentList())
.containsExactlyInAnyOrderElementsOf(remoteLog.allRemoteLogSegments());
}

private static Configuration initConfig() {
Configuration conf = new Configuration();
conf.setInt(ConfigOptions.DEFAULT_BUCKET_NUMBER, 1);
conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
// set a shorter interval for testing purpose
conf.set(ConfigOptions.REMOTE_LOG_TASK_INTERVAL_DURATION, Duration.ofSeconds(1));
conf.set(ConfigOptions.LOG_SEGMENT_FILE_SIZE, MemorySize.parse("1kb"));

// set a shorter max log time to allow replica shrink from isr. Don't be too low, otherwise
// normal follower synchronization will also be affected
conf.set(ConfigOptions.LOG_REPLICA_MAX_LAG_TIME, Duration.ofSeconds(5));
return conf;
}
Expand Down