From 34565d8e74169641accff019bbf1ed2a3d9a25ec Mon Sep 17 00:00:00 2001 From: Robert Casey Date: Thu, 12 Jan 2017 12:08:18 -0600 Subject: [PATCH] Re-factored ZkUtilsWrapper due to underlying API change with Kafka's ZkUtils. Re-factored usage of ZkUtilsWrapper in project. Updated KafkaOffsetGetter unit tests to test new implementation. All previously existing unit tests are now back in place and passing. --- .../com/quantifind/kafka/OffsetGetter.scala | 11 +- .../kafka/core/KafkaOffsetGetter.scala | 15 +- .../kafka/core/StormOffsetGetter.scala | 5 +- .../kafka/core/ZKOffsetGetter.scala | 8 +- .../com/quantifind/utils/ZkUtilsWrapper.scala | 135 ++++++++++++++++++ .../kafka/core/KafkaOffsetGetterSpec.scala | 28 ++-- .../kafka/core/StormOffsetGetterSpec.scala | 10 +- .../kafka/core/ZKOffsetGetterSpec.scala | 11 +- 8 files changed, 182 insertions(+), 41 deletions(-) create mode 100644 src/main/scala/com/quantifind/utils/ZkUtilsWrapper.scala diff --git a/src/main/scala/com/quantifind/kafka/OffsetGetter.scala b/src/main/scala/com/quantifind/kafka/OffsetGetter.scala index b3e67b8..0b6de78 100644 --- a/src/main/scala/com/quantifind/kafka/OffsetGetter.scala +++ b/src/main/scala/com/quantifind/kafka/OffsetGetter.scala @@ -3,6 +3,7 @@ package com.quantifind.kafka import com.quantifind.kafka.core._ import com.quantifind.kafka.offsetapp.OffsetGetterArgs import com.quantifind.kafka.OffsetGetter.{BrokerInfo, KafkaInfo, OffsetInfo} +import com.quantifind.utils.ZkUtilsWrapper import com.twitter.util.Time import java.util.concurrent.atomic.AtomicBoolean @@ -21,11 +22,9 @@ import scala.util.control.NonFatal case class Node(name: String, children: Seq[Node] = Seq()) case class TopicDetails(consumers: Seq[ConsumerDetail]) - case class TopicDetailsWrapper(consumers: TopicDetails) case class TopicAndConsumersDetails(active: Seq[KafkaInfo], inactive: Seq[KafkaInfo]) - case class TopicAndConsumersDetailsWrapper(consumers: TopicAndConsumersDetails) case class ConsumerDetail(name: String) @@ -34,7 +33,7 @@ trait OffsetGetter extends Logging { val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map() - def zkUtils: ZkUtils + def zkUtils: ZkUtilsWrapper // kind of interface methods def getTopicList(group: String): List[String] @@ -217,7 +216,7 @@ object OffsetGetter { } val kafkaOffsetListenerStarted: AtomicBoolean = new AtomicBoolean(false) - var zkUtils: ZkUtils = null + var zkUtils: ZkUtilsWrapper = null var consumerConnector: ConsumerConnector = null var newKafkaConsumer: KafkaConsumer[String, String] = null @@ -231,7 +230,7 @@ object OffsetGetter { def getInstance(args: OffsetGetterArgs): OffsetGetter = { if (kafkaOffsetListenerStarted.compareAndSet(false, true)) { - zkUtils = createZkUtils(args) + zkUtils = new ZkUtilsWrapper(createZkUtils(args)) args.offsetStorage.toLowerCase match { @@ -245,7 +244,7 @@ object OffsetGetter { args.offsetStorage.toLowerCase match { case "kafka" => - new KafkaOffsetGetter(zkUtils, args) + new KafkaOffsetGetter(args) case "storm" => new StormOffsetGetter(zkUtils, args.stormZKOffsetBase) case _ => diff --git a/src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala b/src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala index ff6ee38..588940f 100644 --- a/src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala +++ b/src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala @@ -25,11 +25,13 @@ import scala.concurrent.{Await, Future, duration} /** * Created by rcasey on 11/16/2016. */ -class KafkaOffsetGetter(theZkUtils: ZkUtils, args: OffsetGetterArgs) extends OffsetGetter { +class KafkaOffsetGetter(args: OffsetGetterArgs) extends OffsetGetter { import KafkaOffsetGetter._ - override val zkUtils = theZkUtils + // TODO: We will get all data from the Kafka broker in this class. This is here simply to satisfy + // the OffsetGetter dependency until it can be refactored. + override val zkUtils = null override def processPartition(group: String, topic: String, pid: Int): Option[OffsetInfo] = { @@ -47,8 +49,13 @@ class KafkaOffsetGetter(theZkUtils: ZkUtils, args: OffsetGetterArgs) extends Off val lag: Long = logEndOffset.get - committedOffset val logEndOffsetReported: Long = if (lag < 0) committedOffset - lag else logEndOffset.get - val client: Option[ClientGroup] = Option(clients.filter(c => (c.group == group && c.topicPartitions.contains(topicAndPartition))).head) - val clientString: Option[String] = if (client.isDefined) Option(client.get.clientId + client.get.clientHost) else Option("NA") + // Get client information if we can find an associated client + var clientString: Option[String] = Option("NA") + val filteredClients = clients.filter(c => (c.group == group && c.topicPartitions.contains(topicAndPartition))) + if (!filteredClients.isEmpty) { + val client: ClientGroup = filteredClients.head + clientString = Option(client.clientId + client.clientHost) + } OffsetInfo(group = group, topic = topic, diff --git a/src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala b/src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala index 5b64964..1a59430 100644 --- a/src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala +++ b/src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala @@ -2,10 +2,11 @@ package com.quantifind.kafka.core import com.quantifind.kafka.OffsetGetter import com.quantifind.kafka.OffsetGetter.OffsetInfo +import com.quantifind.utils.ZkUtilsWrapper import com.twitter.util.Time import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo} import kafka.common.TopicAndPartition -import kafka.utils.{Json, ZkUtils} +import kafka.utils.{Json} import org.I0Itec.zkclient.exception.ZkNoNodeException import org.apache.zookeeper.data.Stat @@ -15,7 +16,7 @@ import scala.util.control.NonFatal /** * a version that manages offsets saved by Storm Kafka Spout */ -class StormOffsetGetter(theZkUtils: ZkUtils, zkOffsetBase: String) extends OffsetGetter { +class StormOffsetGetter(theZkUtils: ZkUtilsWrapper, zkOffsetBase: String) extends OffsetGetter { override val zkUtils = theZkUtils diff --git a/src/main/scala/com/quantifind/kafka/core/ZKOffsetGetter.scala b/src/main/scala/com/quantifind/kafka/core/ZKOffsetGetter.scala index 16b5380..edc5b11 100644 --- a/src/main/scala/com/quantifind/kafka/core/ZKOffsetGetter.scala +++ b/src/main/scala/com/quantifind/kafka/core/ZKOffsetGetter.scala @@ -1,13 +1,15 @@ package com.quantifind.kafka.core import com.quantifind.kafka.OffsetGetter -import OffsetGetter.OffsetInfo +import com.quantifind.kafka.OffsetGetter.OffsetInfo +import com.quantifind.utils.ZkUtilsWrapper + import com.twitter.util.Time import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo} import kafka.common.TopicAndPartition import kafka.utils.ZkUtils -import org.I0Itec.zkclient.exception.ZkNoNodeException import org.apache.zookeeper.data.Stat +import org.I0Itec.zkclient.exception.ZkNoNodeException import scala.collection._ import scala.util.control.NonFatal @@ -17,7 +19,7 @@ import scala.util.control.NonFatal * User: pierre * Date: 1/22/14 */ -class ZKOffsetGetter(theZkUtils: ZkUtils) extends OffsetGetter { +class ZKOffsetGetter(theZkUtils: ZkUtilsWrapper) extends OffsetGetter { override val zkUtils = theZkUtils diff --git a/src/main/scala/com/quantifind/utils/ZkUtilsWrapper.scala b/src/main/scala/com/quantifind/utils/ZkUtilsWrapper.scala new file mode 100644 index 0000000..48fa740 --- /dev/null +++ b/src/main/scala/com/quantifind/utils/ZkUtilsWrapper.scala @@ -0,0 +1,135 @@ +package com.quantifind.utils + +import java.util + +import kafka.api.{ApiVersion, LeaderAndIsr} +import kafka.cluster.{EndPoint, BrokerEndPoint, Cluster, Broker} +import kafka.common.TopicAndPartition +import kafka.consumer.ConsumerThreadId +import kafka.controller.{ReassignedPartitionsContext, LeaderIsrAndControllerEpoch} +import kafka.utils.ZkUtils +import org.I0Itec.zkclient.ZkClient +import org.apache.kafka.common.protocol.SecurityProtocol +import org.apache.zookeeper.data.{ACL, Stat} + +import scala.collection.mutable + +/* + This class is mainly to help us mock the ZkUtils class. It is really painful to get powermock to work with scalatest, + so we created this class with a little help from IntelliJ to auto-generate the delegation code + */ +class ZkUtilsWrapper(zkUtils: ZkUtils) { + + val ConsumersPath = ZkUtils.ConsumersPath + + val delegator = zkUtils + + //def updatePersistentPath(path: String, data: String, acls: util.List[ACL]): Unit = delegator.updatePersistentPath(path, data, acls) + + //def updatePartitionReassignmentData(partitionsToBeReassigned: collection.Map[TopicAndPartition, Seq[Int]]): Unit = delegator.updatePartitionReassignmentData(partitionsToBeReassigned) + + //def updateEphemeralPath(path: String, data: String, acls: util.List[ACL]): Unit = delegator.updateEphemeralPath(path, data, acls) + + //def setupCommonPaths(): Unit = delegator.setupCommonPaths() + + //def replicaAssignmentZkData(map: collection.Map[String, Seq[Int]]): String = delegator.replicaAssignmentZkData(map) + + //def registerBrokerInZk(id: Int, host: String, port: Int, advertisedEndpoints: collection.Map[SecurityProtocol, EndPoint], jmxPort: Int, rack: Option[String], apiVersion: ApiVersion): Unit = delegator.registerBrokerInZk(id, host, port, advertisedEndpoints, jmxPort, rack, apiVersion) + + def readDataMaybeNull(path: String): (Option[String], Stat) = delegator.readDataMaybeNull(path) + + def readData(path: String): (String, Stat) = delegator.readData(path) + + //def pathExists(path: String): Boolean = delegator.pathExists(path) + + //def parseTopicsData(jsonData: String): Seq[String] = delegator.parseTopicsData(jsonData) + + //def parsePartitionReassignmentDataWithoutDedup(jsonData: String): Seq[(TopicAndPartition, Seq[Int])] = delegator.parsePartitionReassignmentDataWithoutDedup(jsonData) + + //def parsePartitionReassignmentData(jsonData: String): collection.Map[TopicAndPartition, Seq[Int]] = delegator.parsePartitionReassignmentData(jsonData) + + //def makeSurePersistentPathExists(path: String, acls: util.List[ACL]): Unit = delegator.makeSurePersistentPathExists(path, acls) + + //def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = delegator.leaderAndIsrZkData(leaderAndIsr, controllerEpoch) + + //def getTopicsByConsumerGroup(consumerGroup: String): Seq[String] = delegator.getTopicsByConsumerGroup(consumerGroup) + + //def getSortedBrokerList(): Seq[Int] = delegator.getSortedBrokerList() + + //def getSequenceId(path: String, acls: util.List[ACL]): Int = delegator.getSequenceId(path, acls) + + //def getReplicasForPartition(topic: String, partition: Int): Seq[Int] = delegator.getReplicasForPartition(topic, partition) + + //def getReplicaAssignmentForTopics(topics: Seq[String]): mutable.Map[TopicAndPartition, Seq[Int]] = delegator.getReplicaAssignmentForTopics(topics) + + //def getPartitionsUndergoingPreferredReplicaElection(): collection.Set[TopicAndPartition] = delegator.getPartitionsUndergoingPreferredReplicaElection() + + def getPartitionsForTopics(topics: Seq[String]): mutable.Map[String, Seq[Int]] = delegator.getPartitionsForTopics(topics) + + //def getPartitionsBeingReassigned(): collection.Map[TopicAndPartition, ReassignedPartitionsContext] = delegator.getPartitionsBeingReassigned() + + //def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, topicAndPartitions: collection.Set[TopicAndPartition]): mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = delegator.getPartitionLeaderAndIsrForTopics(zkClient, topicAndPartitions) + + //def getPartitionAssignmentForTopics(topics: Seq[String]): mutable.Map[String, collection.Map[Int, Seq[Int]]] = delegator.getPartitionAssignmentForTopics(topics) + + def getLeaderForPartition(topic: String, partition: Int): Option[Int] = delegator.getLeaderForPartition(topic, partition) + + //def getLeaderAndIsrForPartition(topic: String, partition: Int): Option[LeaderAndIsr] = delegator.getLeaderAndIsrForPartition(topic, partition) + + //def getInSyncReplicasForPartition(topic: String, partition: Int): Seq[Int] = delegator.getInSyncReplicasForPartition(topic, partition) + + //def getEpochForPartition(topic: String, partition: Int): Int = delegator.getEpochForPartition(topic, partition) + + //def getController(): Int = delegator.getController() + + def getConsumersPerTopic(group: String, excludeInternalTopics: Boolean): mutable.Map[String, List[ConsumerThreadId]] = delegator.getConsumersPerTopic(group, excludeInternalTopics) + + //def getConsumersInGroup(group: String): Seq[String] = delegator.getConsumersInGroup(group) + + //def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = delegator.getConsumerPartitionOwnerPath(group, topic, partition) + + //def getConsumerGroups(): Seq[String] = delegator.getConsumerGroups() + + //def getChildrenParentMayNotExist(path: String): Seq[String] = delegator.getChildrenParentMayNotExist(path) + + def getChildren(path: String): Seq[String] = delegator.getChildren(path) + + //def getBrokerSequenceId(MaxReservedBrokerId: Int): Int = delegator.getBrokerSequenceId(MaxReservedBrokerId) + + //def getBrokerInfo(brokerId: Int): Option[Broker] = delegator.getBrokerInfo(brokerId) + + //def getAllTopics(): Seq[String] = delegator.getAllTopics() + + //def getAllPartitions(): collection.Set[TopicAndPartition] = delegator.getAllPartitions() + + //def getAllEntitiesWithConfig(entityType: String): Seq[String] = delegator.getAllEntitiesWithConfig(entityType) + + //def getAllConsumerGroupsForTopic(topic: String): collection.Set[String] = delegator.getAllConsumerGroupsForTopic(topic) + + def getAllBrokersInCluster(): Seq[Broker] = delegator.getAllBrokersInCluster() + + //def getAllBrokerEndPointsForChannel(protocolType: SecurityProtocol): Seq[BrokerEndPoint] = delegator.getAllBrokerEndPointsForChannel(protocolType) + + //def formatAsReassignmentJson(partitionsToBeReassigned: collection.Map[TopicAndPartition, Seq[Int]]): String = delegator.formatAsReassignmentJson(partitionsToBeReassigned) + + //def deletePathRecursive(path: String): Unit = delegator.deletePathRecursive(path) + + //def deletePath(path: String): Boolean = delegator.deletePath(path) + + //def deletePartition(brokerId: Int, topic: String): Unit = delegator.deletePartition(brokerId, topic) + + //def createSequentialPersistentPath(path: String, data: String, acls: util.List[ACL]): String = delegator.createSequentialPersistentPath(path, data, acls) + + //def createPersistentPath(path: String, data: String, acls: util.List[ACL]): Unit = delegator.createPersistentPath(path, data, acls) + + //def createEphemeralPathExpectConflict(path: String, data: String, acls: util.List[ACL]): Unit = delegator.createEphemeralPathExpectConflict(path, data, acls) + + //def conditionalUpdatePersistentPathIfExists(path: String, data: String, expectVersion: Int): (Boolean, Int) = delegator.conditionalUpdatePersistentPathIfExists(path, data, expectVersion) + + //def conditionalUpdatePersistentPath(path: String, data: String, expectVersion: Int, optionalChecker: Option[(ZkUtils, String, String) => (Boolean, Int)]): (Boolean, Int) = delegator.conditionalUpdatePersistentPath(path, data, expectVersion, optionalChecker) + + //def conditionalDeletePath(path: String, expectedVersion: Int): Boolean = delegator.conditionalDeletePath(path, expectedVersion) + + def close(): Unit = delegator.close() + +} \ No newline at end of file diff --git a/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala b/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala index 7ab1444..8bfbc92 100644 --- a/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala +++ b/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala @@ -1,27 +1,27 @@ -/* package com.quantifind.kafka.core +import com.quantifind.kafka.offsetapp.OffsetGetterArgs import kafka.api.{OffsetRequest, OffsetResponse, PartitionOffsetsResponse} import kafka.common.{OffsetAndMetadata, TopicAndPartition} import kafka.coordinator.GroupTopicPartition import kafka.consumer.SimpleConsumer import kafka.utils.ZkUtils -import org.I0Itec.zkclient.ZkClient import org.mockito.Matchers._ import org.mockito.Mockito._ -import org.mockito.{Matchers => MockitoMatchers, Mockito} +import org.mockito.{Mockito, Matchers => MockitoMatchers} import org.scalatest._ class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers { trait Fixture { - val mockedZkClient = Mockito.mock(classOf[ZkClient]) val mockedZkUtil = Mockito.mock(classOf[ZkUtils]) val mockedConsumer = Mockito.mock(classOf[SimpleConsumer]) val testPartitionLeader = 1 - val offsetGetter = new KafkaOffsetGetter(mockedZkUtil) + val args = new OffsetGetterArgs + + val offsetGetter = new KafkaOffsetGetter(args) offsetGetter.consumerMap += (testPartitionLeader -> Some(mockedConsumer)) } @@ -30,16 +30,21 @@ class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers { val testGroup = "testgroup" val testTopic = "testtopic" val testPartition = 1 + val committedOffset = 100 + val logEndOffset = 102 val topicAndPartition = TopicAndPartition(testTopic, testPartition) val groupTopicPartition = GroupTopicPartition(testGroup, topicAndPartition) - val offsetAndMetadata = OffsetAndMetadata(100, "meta", System.currentTimeMillis) + val offsetAndMetadata = OffsetAndMetadata(committedOffset, "meta", System.currentTimeMillis) KafkaOffsetGetter.committedOffsetMap += (groupTopicPartition -> offsetAndMetadata) - when(mockedZkUtil.getLeaderForPartition(MockitoMatchers.eq(mockedZkClient), MockitoMatchers.eq(testTopic), MockitoMatchers.eq(testPartition))) + //topicPartitionOffsetsMap + KafkaOffsetGetter.topicPartitionOffsetsMap += (topicAndPartition -> logEndOffset) + + when(mockedZkUtil.getLeaderForPartition(MockitoMatchers.eq(testTopic), MockitoMatchers.eq(testPartition))) .thenReturn(Some(testPartitionLeader)) - val partitionErrorAndOffsets = Map(topicAndPartition -> PartitionOffsetsResponse(0,Seq(102))) + val partitionErrorAndOffsets = Map(topicAndPartition -> PartitionOffsetsResponse(0,Seq(logEndOffset))) val offsetResponse = OffsetResponse(1, partitionErrorAndOffsets) when(mockedConsumer.getOffsetsBefore(any[OffsetRequest])).thenReturn(offsetResponse) @@ -48,11 +53,10 @@ class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers { offsetInfo.topic shouldBe testTopic offsetInfo.group shouldBe testGroup offsetInfo.partition shouldBe testPartition - offsetInfo.offset shouldBe 100 - offsetInfo.logSize shouldBe 102 + offsetInfo.offset shouldBe committedOffset + offsetInfo.logSize shouldBe logEndOffset case None => fail("Failed to build offset data") } } -} -*/ \ No newline at end of file +} \ No newline at end of file diff --git a/src/test/scala/com/quantifind/kafka/core/StormOffsetGetterSpec.scala b/src/test/scala/com/quantifind/kafka/core/StormOffsetGetterSpec.scala index a496dba..a445721 100644 --- a/src/test/scala/com/quantifind/kafka/core/StormOffsetGetterSpec.scala +++ b/src/test/scala/com/quantifind/kafka/core/StormOffsetGetterSpec.scala @@ -1,8 +1,6 @@ -/* package com.quantifind.kafka.core import com.quantifind.utils.ZkUtilsWrapper -import org.I0Itec.zkclient.ZkClient import org.apache.zookeeper.data.Stat import org.mockito.Matchers._ import org.mockito.Mockito._ @@ -13,11 +11,10 @@ class StormOffsetGetterSpec extends FlatSpec with ShouldMatchers { trait Fixture { - val mockedZkClient = Mockito.mock(classOf[ZkClient]) val zkOffsetBase = "/stormconsumers" val mockedZkUtil = Mockito.mock(classOf[ZkUtilsWrapper]) - val offsetGetter = new StormOffsetGetter(mockedZkClient, zkOffsetBase, mockedZkUtil) + val offsetGetter = new StormOffsetGetter(mockedZkUtil, zkOffsetBase) } "StormOffsetGetter" should "be able to extract topic from persisted spout state" in new Fixture { @@ -38,12 +35,11 @@ class StormOffsetGetterSpec extends FlatSpec with ShouldMatchers { } }""" val ret = (spoutState, Mockito.mock(classOf[Stat])) - when(mockedZkUtil.readData(MockitoMatchers.eq(mockedZkClient), anyString)).thenReturn(ret) + when(mockedZkUtil.readData(anyString)).thenReturn(ret) val topics = offsetGetter.getTopicList(testGroup) topics.size shouldBe 1 topics(0) shouldBe testTopic } -} -*/ \ No newline at end of file +} \ No newline at end of file diff --git a/src/test/scala/com/quantifind/kafka/core/ZKOffsetGetterSpec.scala b/src/test/scala/com/quantifind/kafka/core/ZKOffsetGetterSpec.scala index 73d2ed2..449d7aa 100644 --- a/src/test/scala/com/quantifind/kafka/core/ZKOffsetGetterSpec.scala +++ b/src/test/scala/com/quantifind/kafka/core/ZKOffsetGetterSpec.scala @@ -1,4 +1,3 @@ -/* package com.quantifind.kafka.core import com.quantifind.utils.ZkUtilsWrapper @@ -12,10 +11,9 @@ class ZKOffsetGetterSpec extends FlatSpec with ShouldMatchers { trait Fixture { - val mockedZkClient = Mockito.mock(classOf[ZkClient]) val mockedZkUtil = Mockito.mock(classOf[ZkUtilsWrapper]) - val offsetGetter = new ZKOffsetGetter(mockedZkClient, mockedZkUtil) + val offsetGetter = new ZKOffsetGetter(mockedZkUtil) } "ZKOffsetGetter" should "be able to fetch topic list" in new Fixture { @@ -24,7 +22,7 @@ class ZKOffsetGetterSpec extends FlatSpec with ShouldMatchers { val testTopic1 = "testtopic1" val testTopic2 = "testtopic2" - when(mockedZkUtil.getChildren(MockitoMatchers.eq(mockedZkClient), anyString)).thenReturn(Seq(testTopic1, testTopic2)) + when(mockedZkUtil.getChildren(anyString)).thenReturn(Seq(testTopic1, testTopic2)) val topics = offsetGetter.getTopicList(testGroup) @@ -38,7 +36,7 @@ class ZKOffsetGetterSpec extends FlatSpec with ShouldMatchers { val testGroup1 = "testgroup1" val testGroup2 = "testgroup2" - when(mockedZkUtil.getChildren(MockitoMatchers.eq(mockedZkClient), anyString)).thenReturn(Seq(testGroup1, testGroup2)) + when(mockedZkUtil.getChildren(anyString)).thenReturn(Seq(testGroup1, testGroup2)) val groups = offsetGetter.getGroups @@ -46,5 +44,4 @@ class ZKOffsetGetterSpec extends FlatSpec with ShouldMatchers { groups(0) shouldBe testGroup1 groups(1) shouldBe testGroup2 } -} -*/ \ No newline at end of file +} \ No newline at end of file