diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java index 01cc54686920b..2d8ce328ca103 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java @@ -81,9 +81,9 @@ public void unregisterJob(JobID jobID) { } public List addPartitionAndGetShuffleDescriptor( - JobID jobID, ResultPartitionID resultPartitionID) { + JobID jobID, int numSubpartitions, ResultPartitionID resultPartitionID) { return tieredStorageMasterClient.addPartitionAndGetShuffleDescriptor( - jobID, resultPartitionID); + jobID, numSubpartitions, resultPartitionID); } public void releasePartition(ShuffleDescriptor shuffleDescriptor) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java index 457dcffef251d..5bb857358a74d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java @@ -49,12 +49,12 @@ public void unregisterJob(JobID jobID) { } public List addPartitionAndGetShuffleDescriptor( - JobID jobID, ResultPartitionID resultPartitionID) { + JobID jobID, int numSubpartitions, ResultPartitionID resultPartitionID) { return tiers.stream() .map( tierMasterAgent -> tierMasterAgent.addPartitionAndGetShuffleDescriptor( - jobID, resultPartitionID)) + jobID, numSubpartitions, resultPartitionID)) .collect(Collectors.toList()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/NoOpMasterAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/NoOpMasterAgent.java index bd833d69e4537..2fde8f96fb4bf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/NoOpMasterAgent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/NoOpMasterAgent.java @@ -38,7 +38,7 @@ public void unregisterJob(JobID jobID) { @Override public TierShuffleDescriptor addPartitionAndGetShuffleDescriptor( - JobID jobID, ResultPartitionID resultPartitionID) { + JobID jobID, int numSubpartitions, ResultPartitionID resultPartitionID) { // noop return NoOpTierShuffleDescriptor.INSTANCE; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierMasterAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierMasterAgent.java index e0d20d24082b3..ee02e14655f74 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierMasterAgent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierMasterAgent.java @@ -32,7 +32,7 @@ public interface TierMasterAgent { /** Add a new tiered storage partition and get the {@link TierShuffleDescriptor}. */ TierShuffleDescriptor addPartitionAndGetShuffleDescriptor( - JobID jobID, ResultPartitionID resultPartitionID); + JobID jobID, int numSubpartitions, ResultPartitionID resultPartitionID); /** * Release a tiered storage partition. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierMasterAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierMasterAgent.java index 924993b875a31..7ff29fe01bca3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierMasterAgent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierMasterAgent.java @@ -56,7 +56,7 @@ public void unregisterJob(JobID jobID) { @Override public TierShuffleDescriptor addPartitionAndGetShuffleDescriptor( - JobID jobID, ResultPartitionID resultPartitionID) { + JobID jobID, int numSubpartitions, ResultPartitionID resultPartitionID) { TieredStoragePartitionId partitionId = convertId(resultPartitionID); resourceRegistry.registerResource( partitionId, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java index 26a31a5746a10..c186505301d67 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java @@ -108,7 +108,9 @@ public CompletableFuture registerPartitionWithProducer( if (tieredInternalShuffleMaster != null) { tierShuffleDescriptors = tieredInternalShuffleMaster.addPartitionAndGetShuffleDescriptor( - jobID, resultPartitionID); + jobID, + partitionDescriptor.getNumberOfSubpartitions(), + resultPartitionID); } NettyShuffleDescriptor shuffleDeploymentDescriptor = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierMasterAgentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierMasterAgentTest.java index 73b2d65cd24c4..245e8b0ee891a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierMasterAgentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierMasterAgentTest.java @@ -52,7 +52,7 @@ void testAddAndReleasePartition() throws IOException { RemoteTierMasterAgent masterAgent = new RemoteTierMasterAgent(tempFolder.getAbsolutePath(), resourceRegistry); TierShuffleDescriptor tierShuffleDescriptor = - masterAgent.addPartitionAndGetShuffleDescriptor(new JobID(), resultPartitionID); + masterAgent.addPartitionAndGetShuffleDescriptor(new JobID(), 1, resultPartitionID); assertThat(partitionFile.exists()).isTrue(); masterAgent.releasePartition(tierShuffleDescriptor);