Skip to content

Commit

Permalink
[FLINK-36934][network] Enrich numSubpartitions to TierMasterAgent#add…
Browse files Browse the repository at this point in the history
…PartitionAndGetShuffleDescriptor
  • Loading branch information
reswqa committed Dec 20, 2024
1 parent a0d2d87 commit 1523f2c
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ public void unregisterJob(JobID jobID) {
}

public List<TierShuffleDescriptor> addPartitionAndGetShuffleDescriptor(
JobID jobID, ResultPartitionID resultPartitionID) {
JobID jobID, int numSubpartitions, ResultPartitionID resultPartitionID) {
return tieredStorageMasterClient.addPartitionAndGetShuffleDescriptor(
jobID, resultPartitionID);
jobID, numSubpartitions, resultPartitionID);
}

public void releasePartition(ShuffleDescriptor shuffleDescriptor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ public void unregisterJob(JobID jobID) {
}

public List<TierShuffleDescriptor> addPartitionAndGetShuffleDescriptor(
JobID jobID, ResultPartitionID resultPartitionID) {
JobID jobID, int numSubpartitions, ResultPartitionID resultPartitionID) {
return tiers.stream()
.map(
tierMasterAgent ->
tierMasterAgent.addPartitionAndGetShuffleDescriptor(
jobID, resultPartitionID))
jobID, numSubpartitions, resultPartitionID))
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void unregisterJob(JobID jobID) {

@Override
public TierShuffleDescriptor addPartitionAndGetShuffleDescriptor(
JobID jobID, ResultPartitionID resultPartitionID) {
JobID jobID, int numSubpartitions, ResultPartitionID resultPartitionID) {
// noop
return NoOpTierShuffleDescriptor.INSTANCE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public interface TierMasterAgent {

/** Add a new tiered storage partition and get the {@link TierShuffleDescriptor}. */
TierShuffleDescriptor addPartitionAndGetShuffleDescriptor(
JobID jobID, ResultPartitionID resultPartitionID);
JobID jobID, int numSubpartitions, ResultPartitionID resultPartitionID);

/**
* Release a tiered storage partition.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void unregisterJob(JobID jobID) {

@Override
public TierShuffleDescriptor addPartitionAndGetShuffleDescriptor(
JobID jobID, ResultPartitionID resultPartitionID) {
JobID jobID, int numSubpartitions, ResultPartitionID resultPartitionID) {
TieredStoragePartitionId partitionId = convertId(resultPartitionID);
resourceRegistry.registerResource(
partitionId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ public CompletableFuture<NettyShuffleDescriptor> registerPartitionWithProducer(
if (tieredInternalShuffleMaster != null) {
tierShuffleDescriptors =
tieredInternalShuffleMaster.addPartitionAndGetShuffleDescriptor(
jobID, resultPartitionID);
jobID,
partitionDescriptor.getNumberOfSubpartitions(),
resultPartitionID);
}

NettyShuffleDescriptor shuffleDeploymentDescriptor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void testAddAndReleasePartition() throws IOException {
RemoteTierMasterAgent masterAgent =
new RemoteTierMasterAgent(tempFolder.getAbsolutePath(), resourceRegistry);
TierShuffleDescriptor tierShuffleDescriptor =
masterAgent.addPartitionAndGetShuffleDescriptor(new JobID(), resultPartitionID);
masterAgent.addPartitionAndGetShuffleDescriptor(new JobID(), 1, resultPartitionID);
assertThat(partitionFile.exists()).isTrue();
masterAgent.releasePartition(tierShuffleDescriptor);

Expand Down

0 comments on commit 1523f2c

Please sign in to comment.