Skip to content

Commit

Permalink
[ci] Fix unstable put kv test (#158)
Browse files Browse the repository at this point in the history
  • Loading branch information
swuferhong authored Dec 10, 2024
1 parent 2291484 commit 00fff20
Showing 1 changed file with 25 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@
import static com.alibaba.fluss.record.TestData.DATA_1_WITH_KEY_AND_VALUE;
import static com.alibaba.fluss.record.TestData.EXPECTED_LOG_RESULTS_FOR_DATA_1_WITH_PK;
import static com.alibaba.fluss.server.testutils.KvTestUtils.assertLookupResponse;
import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.assertFetchLogResponseWithRowKind;
import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.createTable;
import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.newFetchLogRequest;
import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.newLookupRequest;
import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.newPutKvRequest;
import static com.alibaba.fluss.testutils.DataTestUtils.assertLogRecordsEquals;
import static com.alibaba.fluss.testutils.DataTestUtils.assertLogRecordsEqualsWithRowKind;
import static com.alibaba.fluss.testutils.DataTestUtils.genKvRecordBatch;
Expand Down Expand Up @@ -103,7 +107,7 @@ void testProduceLogNeedAck() throws Exception {
FLUSS_CLUSTER_EXTENSION.waitUtilAllGatewayHasSameMetadata();

long tableId =
RpcMessageTestUtils.createTable(
createTable(
FLUSS_CLUSTER_EXTENSION,
DATA1_TABLE_PATH,
data1NonPkTableInfo.getTableDescriptor());
Expand Down Expand Up @@ -131,9 +135,7 @@ void testProduceLogNeedAck() throws Exception {

// check leader log data.
RpcMessageTestUtils.assertFetchLogResponse(
leaderGateWay
.fetchLog(RpcMessageTestUtils.newFetchLogRequest(-1, tableId, bucketId, 0L))
.get(),
leaderGateWay.fetchLog(newFetchLogRequest(-1, tableId, bucketId, 0L)).get(),
tableId,
bucketId,
10L,
Expand Down Expand Up @@ -185,7 +187,7 @@ void testPutKvNeedAck() throws Exception {
FLUSS_CLUSTER_EXTENSION.waitUtilAllGatewayHasSameMetadata();

long tableId =
RpcMessageTestUtils.createTable(
createTable(
FLUSS_CLUSTER_EXTENSION,
DATA1_TABLE_PATH_PK,
data1PkTableInfo.getTableDescriptor());
Expand All @@ -202,7 +204,7 @@ void testPutKvNeedAck() throws Exception {
assertPutKvResponse(
leaderGateWay
.putKv(
RpcMessageTestUtils.newPutKvRequest(
newPutKvRequest(
tableId,
bucketId,
-1,
Expand All @@ -211,10 +213,8 @@ void testPutKvNeedAck() throws Exception {
bucketId);

// check leader log data.
RpcMessageTestUtils.assertFetchLogResponseWithRowKind(
leaderGateWay
.fetchLog(RpcMessageTestUtils.newFetchLogRequest(-1, tableId, bucketId, 0L))
.get(),
assertFetchLogResponseWithRowKind(
leaderGateWay.fetchLog(newFetchLogRequest(-1, tableId, bucketId, 0L)).get(),
tableId,
bucketId,
8L,
Expand All @@ -228,6 +228,19 @@ void testPutKvNeedAck() throws Exception {
.collect(Collectors.toList())) {
ReplicaManager replicaManager =
FLUSS_CLUSTER_EXTENSION.getTabletServerById(followId).getReplicaManager();

// wait util follower highWaterMark equals leader. So we can fetch log from follower
// before highWaterMark.
retry(
Duration.ofMinutes(1),
() ->
assertThat(
replicaManager
.getReplicaOrException(tb)
.getLogTablet()
.getHighWatermark())
.isEqualTo(8L));

CompletableFuture<Map<TableBucket, FetchLogResultForBucket>> future =
new CompletableFuture<>();
// mock client fetch from follower.
Expand All @@ -243,17 +256,6 @@ void testPutKvNeedAck() throws Exception {
assertThat(records).isNotNull();
assertLogRecordsEqualsWithRowKind(
DATA1_ROW_TYPE, records, EXPECTED_LOG_RESULTS_FOR_DATA_1_WITH_PK);

// wait util follower highWaterMark equals leader.
retry(
Duration.ofMinutes(1),
() ->
assertThat(
replicaManager
.getReplicaOrException(tb)
.getLogTablet()
.getHighWatermark())
.isEqualTo(8L));
}
}

Expand All @@ -263,7 +265,7 @@ void testFlushForPutKvNeedAck() throws Exception {

// create a table and wait all replica ready
long tableId =
RpcMessageTestUtils.createTable(
createTable(
FLUSS_CLUSTER_EXTENSION,
DATA1_TABLE_PATH_PK,
data1PkTableInfo.getTableDescriptor());
Expand Down Expand Up @@ -298,8 +300,7 @@ void testFlushForPutKvNeedAck() throws Exception {
Tuple2.of("k2", new Object[] {3, "b1"}));

CompletableFuture<PutKvResponse> putResponse =
leaderGateWay.putKv(
RpcMessageTestUtils.newPutKvRequest(tableId, bucketId, -1, kvRecords));
leaderGateWay.putKv(newPutKvRequest(tableId, bucketId, -1, kvRecords));

// wait util the log has been written
Replica replica = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb);
Expand Down

0 comments on commit 00fff20

Please sign in to comment.