Skip to content

Commit

Permalink
111
Browse files Browse the repository at this point in the history
  • Loading branch information
swuferhong committed Dec 12, 2024
1 parent 20222ee commit 0bf5a76
Showing 1 changed file with 30 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.alibaba.fluss.client.scanner.ScanRecord;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TableInfo;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.record.MemoryLogRecords;
import com.alibaba.fluss.rpc.RpcClient;
Expand All @@ -47,6 +48,7 @@
import static com.alibaba.fluss.client.admin.ClientToServerITCaseUtils.createTable;
import static com.alibaba.fluss.client.admin.ClientToServerITCaseUtils.initConfig;
import static com.alibaba.fluss.record.TestData.DATA1;
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_INFO;
import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.newProduceLogRequest;
import static com.alibaba.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
Expand All @@ -65,61 +67,64 @@ public class LogFetcherTest {
private LogFetcher logFetcher;
private Connection conn;
private Admin admin;
private long tableId;
private final int bucketId0 = 0;
private final int bucketId1 = 1;
private RpcClient rpcClient;
private Configuration clientConf;
private MetadataUpdater metadataUpdater;

// TODO: convert this test to a unit test, as done in Kafka.

@BeforeEach
protected void setup() throws Exception {
    // Keep the cluster's client configuration in a field so tests can hand it
    // to LogFetcher; the old local variable duplicated this fetch and was dead.
    this.clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
    conn = ConnectionFactory.createConnection(clientConf);
    admin = conn.getAdmin();

    // Shared RPC client and metadata updater, reused by every test method.
    this.rpcClient = FLUSS_CLUSTER_EXTENSION.getRpcClient();
    this.metadataUpdater = new MetadataUpdater(clientConf, rpcClient);
}

@AfterEach
protected void teardown() throws Exception {
    // Release client-side resources acquired in setup(); null the fields so a
    // failed close is not retried on an already-released handle.
    if (admin != null) {
        admin.close();
        admin = null;
    }

    if (conn != null) {
        conn.close();
        conn = null;
    }
}

@Test
void testFetch() throws Exception {
    // Create the table under test and wait until all replicas are ready.
    // The diff residue had two createTable calls and a stale DATA1_TABLE_INFO
    // constructor argument; a single creation and the per-test TableInfo is the
    // intended post-change state.
    TablePath tablePath = TablePath.of("test_db_1", "test_table_for_log_fetcher");
    long tableId = createTable(admin, tablePath, DATA1_TABLE_INFO.getTableDescriptor(), false);
    FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(tableId);

    // Prime the client metadata cache with the freshly-created table; uses the
    // shared metadataUpdater/rpcClient fields instead of re-creating locals.
    metadataUpdater.checkAndUpdateTableMetadata(Collections.singleton(tablePath));

    // Subscribe buckets 0 and 1, both starting from log offset 0.
    Map<TableBucket, Long> scanBuckets = new HashMap<>();
    int bucketId0 = 0;
    scanBuckets.put(new TableBucket(tableId, bucketId0), 0L);
    int bucketId1 = 1;
    scanBuckets.put(new TableBucket(tableId, bucketId1), 0L);
    LogScannerStatus logScannerStatus = new LogScannerStatus();
    logScannerStatus.assignScanBuckets(scanBuckets);

    TestingScannerMetricGroup scannerMetricGroup = TestingScannerMetricGroup.newInstance();
    // Last ctor argument presumably the schema id — TODO confirm against TableInfo.
    TableInfo tableInfo = new TableInfo(tablePath, tableId, DATA1_TABLE_DESCRIPTOR, 1);
    logFetcher =
            new LogFetcher(
                    tableInfo,
                    null, // no column projection
                    rpcClient,
                    logScannerStatus,
                    clientConf,
                    metadataUpdater,
                    scannerMetricGroup,
                    new RemoteFileDownloader(1));
}

// NOTE(review): this method is a byte-for-byte duplicate of the teardown()
// defined earlier in this capture — it is residue of the diff rendering
// (the method was moved in the commit). A compilable file keeps only one copy.
@AfterEach
protected void teardown() throws Exception {
// Close the Admin handle first, then the Connection; null each field so a
// partially-failed teardown is not re-closed.
if (admin != null) {
admin.close();
admin = null;
}

if (conn != null) {
conn.close();
conn = null;
}
}

@Test
void testFetch() throws Exception {
// add one batch records to tb0.
TableBucket tb0 = new TableBucket(tableId, bucketId0);
addRecordsToBucket(tb0, genMemoryLogRecordsByObject(DATA1), 0L);
Expand Down

0 comments on commit 0bf5a76

Please sign in to comment.