diff --git a/src/main/java/com/riferrei/kafka/core/BucketPriorityAssignor.java b/src/main/java/com/riferrei/kafka/core/BucketPriorityAssignor.java index 2e9913f..a780f1a 100644 --- a/src/main/java/com/riferrei/kafka/core/BucketPriorityAssignor.java +++ b/src/main/java/com/riferrei/kafka/core/BucketPriorityAssignor.java @@ -66,9 +66,14 @@ public void configure(Map configs) { throw new InvalidConfigurationException("The bucket allocation " + "is incorrect. The sum of all buckets needs to be 100."); } - fallback = config.getConfiguredInstance( - BucketPriorityConfig.FALLBACK_ASSIGNOR_CONFIG, - AbstractPartitionAssignor.class); + try { + fallback = config.getConfiguredInstance( + BucketPriorityConfig.FALLBACK_ASSIGNOR_CONFIG, + AbstractPartitionAssignor.class); + } catch (Exception ex) { + throw new InvalidConfigurationException("The fallback " + + "assignor configured is invalid.", ex); + } buckets = new LinkedHashMap<>(); for (int i = 0; i < config.buckets().size(); i++) { String bucketName = config.buckets().get(i).trim(); diff --git a/src/main/java/com/riferrei/kafka/core/BucketPriorityPartitioner.java b/src/main/java/com/riferrei/kafka/core/BucketPriorityPartitioner.java index aabb10d..796e826 100644 --- a/src/main/java/com/riferrei/kafka/core/BucketPriorityPartitioner.java +++ b/src/main/java/com/riferrei/kafka/core/BucketPriorityPartitioner.java @@ -57,9 +57,14 @@ public void configure(Map configs) { throw new InvalidConfigurationException("The bucket allocation " + "is incorrect. The sum of all buckets needs to be 100."); } - fallback = config.getConfiguredInstance( - BucketPriorityConfig.FALLBACK_PARTITIONER_CONFIG, - Partitioner.class); + try { + fallback = config.getConfiguredInstance( + BucketPriorityConfig.FALLBACK_PARTITIONER_CONFIG, + Partitioner.class); + } catch (Exception ex) { + throw new InvalidConfigurationException("The fallback " + + "partitioner configured is invalid.", ex); + } lastBucket = new ThreadLocal<>(); buckets = new LinkedHashMap<>(); for (int i = 0; i < config.buckets().size(); i++) { diff --git a/src/test/java/com/riferrei/kafka/core/BucketPriorityAssignorTest.java b/src/test/java/com/riferrei/kafka/core/BucketPriorityAssignorTest.java index 1c30181..beda85c 100644 --- a/src/test/java/com/riferrei/kafka/core/BucketPriorityAssignorTest.java +++ b/src/test/java/com/riferrei/kafka/core/BucketPriorityAssignorTest.java @@ -94,6 +94,20 @@ public void checkAllocationPercentageConfiguration() { }); } + @Test + public void checkIfFallbackPartitionerIsValid() { + final Map configs = new HashMap<>(); + final BucketPriorityAssignor assignor = new BucketPriorityAssignor(); + assertThrows(InvalidConfigurationException.class, () -> { + configs.put(BucketPriorityConfig.TOPIC_CONFIG, "test"); + configs.put(BucketPriorityConfig.BUCKETS_CONFIG, "B1, B2"); + configs.put(BucketPriorityConfig.ALLOCATION_CONFIG, "70%, 30%"); + configs.put(BucketPriorityConfig.FALLBACK_ASSIGNOR_CONFIG, + BucketPriorityAssignorTest.class.getName()); + assignor.configure(configs); + }); + } + @Test public void checkIfMinNumberPartitionsIsRespected() { final String topic = "test"; diff --git a/src/test/java/com/riferrei/kafka/core/BucketPriorityPartitionerTest.java b/src/test/java/com/riferrei/kafka/core/BucketPriorityPartitionerTest.java index 17bedcb..8c34c12 100644 --- a/src/test/java/com/riferrei/kafka/core/BucketPriorityPartitionerTest.java +++ b/src/test/java/com/riferrei/kafka/core/BucketPriorityPartitionerTest.java @@ -102,6 +102,21 @@ public void checkAllocationPercentageConfiguration() { } } + @Test + public void checkIfFallbackPartitionerIsValid() { + final Map configs = new HashMap<>(); + try (BucketPriorityPartitioner partitioner = new BucketPriorityPartitioner()) { + assertThrows(InvalidConfigurationException.class, () -> { + configs.put(BucketPriorityConfig.TOPIC_CONFIG, "test"); + configs.put(BucketPriorityConfig.BUCKETS_CONFIG, "B1, B2"); + configs.put(BucketPriorityConfig.ALLOCATION_CONFIG, "70%, 30%"); + configs.put(BucketPriorityConfig.FALLBACK_PARTITIONER_CONFIG, + BucketPriorityPartitionerTest.class.getName()); + partitioner.configure(configs); + }); + } + } + @Test public void checkIfMinNumberPartitionsIsRespected() { final String topic = "test";