Merge branch 'develop'
rcasey212 committed Jan 12, 2017
2 parents 0689722 + d41842f commit c08f6e7
Showing 8 changed files with 182 additions and 41 deletions.
11 changes: 5 additions & 6 deletions src/main/scala/com/quantifind/kafka/OffsetGetter.scala
@@ -3,6 +3,7 @@ package com.quantifind.kafka
import com.quantifind.kafka.core._
import com.quantifind.kafka.offsetapp.OffsetGetterArgs
import com.quantifind.kafka.OffsetGetter.{BrokerInfo, KafkaInfo, OffsetInfo}
import com.quantifind.utils.ZkUtilsWrapper
import com.twitter.util.Time

import java.util.concurrent.atomic.AtomicBoolean
@@ -21,11 +22,9 @@ import scala.util.control.NonFatal
case class Node(name: String, children: Seq[Node] = Seq())

case class TopicDetails(consumers: Seq[ConsumerDetail])

case class TopicDetailsWrapper(consumers: TopicDetails)

case class TopicAndConsumersDetails(active: Seq[KafkaInfo], inactive: Seq[KafkaInfo])

case class TopicAndConsumersDetailsWrapper(consumers: TopicAndConsumersDetails)

case class ConsumerDetail(name: String)
Expand All @@ -34,7 +33,7 @@ trait OffsetGetter extends Logging {

val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map()

def zkUtils: ZkUtils
def zkUtils: ZkUtilsWrapper

// kind of interface methods
def getTopicList(group: String): List[String]
@@ -217,7 +216,7 @@ object OffsetGetter {
}

val kafkaOffsetListenerStarted: AtomicBoolean = new AtomicBoolean(false)
var zkUtils: ZkUtils = null
var zkUtils: ZkUtilsWrapper = null
var consumerConnector: ConsumerConnector = null
var newKafkaConsumer: KafkaConsumer[String, String] = null

@@ -231,7 +230,7 @@
def getInstance(args: OffsetGetterArgs): OffsetGetter = {

if (kafkaOffsetListenerStarted.compareAndSet(false, true)) {
zkUtils = createZkUtils(args)
zkUtils = new ZkUtilsWrapper(createZkUtils(args))

args.offsetStorage.toLowerCase match {

@@ -245,7 +244,7 @@

args.offsetStorage.toLowerCase match {
case "kafka" =>
new KafkaOffsetGetter(zkUtils, args)
new KafkaOffsetGetter(args)
case "storm" =>
new StormOffsetGetter(zkUtils, args.stormZKOffsetBase)
case _ =>
15 changes: 11 additions & 4 deletions src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala
@@ -25,11 +25,13 @@ import scala.concurrent.{Await, Future, duration}
/**
* Created by rcasey on 11/16/2016.
*/
class KafkaOffsetGetter(theZkUtils: ZkUtils, args: OffsetGetterArgs) extends OffsetGetter {
class KafkaOffsetGetter(args: OffsetGetterArgs) extends OffsetGetter {

import KafkaOffsetGetter._

override val zkUtils = theZkUtils
// TODO: We will get all data from the Kafka broker in this class. This is here simply to satisfy
// the OffsetGetter dependency until it can be refactored.
override val zkUtils = null

override def processPartition(group: String, topic: String, pid: Int): Option[OffsetInfo] = {

@@ -47,8 +49,13 @@ class KafkaOffsetGetter(theZkUtils: ZkUtils, args: OffsetGetterArgs) extends OffsetGetter {
val lag: Long = logEndOffset.get - committedOffset
val logEndOffsetReported: Long = if (lag < 0) committedOffset - lag else logEndOffset.get
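// Worked example: committedOffset = 100 with a stale logEndOffset = 98 gives lag = -2,
// so logEndOffsetReported = 100 - (-2) = 102 and a stale broker offset is never
// reported below the committed offset.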

val client: Option[ClientGroup] = Option(clients.filter(c => (c.group == group && c.topicPartitions.contains(topicAndPartition))).head)
val clientString: Option[String] = if (client.isDefined) Option(client.get.clientId + client.get.clientHost) else Option("NA")
// Get client information if we can find an associated client
var clientString: Option[String] = Option("NA")
val filteredClients = clients.filter(c => (c.group == group && c.topicPartitions.contains(topicAndPartition)))
if (!filteredClients.isEmpty) {
val client: ClientGroup = filteredClients.head
clientString = Option(client.clientId + client.clientHost)
}

OffsetInfo(group = group,
topic = topic,
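The rewritten client lookup above drops a bare filter(...).head, which throws NoSuchElementException when no client matches the group and partition. As an aside, an equivalent lookup can be written without the mutable var; the following is a sketch only, not part of this commit, with clientStringFor a hypothetical helper name and ClientGroup the case class KafkaOffsetGetter already uses (group, clientId, clientHost, topicPartitions):

import kafka.common.TopicAndPartition

// Sketch only (not in this commit): same result as the block above, using find
// instead of filter(...).head so an empty match cannot throw, and with no var.
def clientStringFor(clients: Iterable[ClientGroup],
                    group: String,
                    topicAndPartition: TopicAndPartition): Option[String] =
  clients
    .find(c => c.group == group && c.topicPartitions.contains(topicAndPartition))
    .map(c => c.clientId + c.clientHost)
    .orElse(Some("NA"))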
src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala
@@ -2,10 +2,11 @@ package com.quantifind.kafka.core

import com.quantifind.kafka.OffsetGetter
import com.quantifind.kafka.OffsetGetter.OffsetInfo
import com.quantifind.utils.ZkUtilsWrapper
import com.twitter.util.Time
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.utils.{Json, ZkUtils}
import kafka.utils.{Json}
import org.I0Itec.zkclient.exception.ZkNoNodeException
import org.apache.zookeeper.data.Stat

@@ -15,7 +16,7 @@ import scala.util.control.NonFatal
/**
* a version that manages offsets saved by Storm Kafka Spout
*/
class StormOffsetGetter(theZkUtils: ZkUtils, zkOffsetBase: String) extends OffsetGetter {
class StormOffsetGetter(theZkUtils: ZkUtilsWrapper, zkOffsetBase: String) extends OffsetGetter {

override val zkUtils = theZkUtils

8 changes: 5 additions & 3 deletions src/main/scala/com/quantifind/kafka/core/ZKOffsetGetter.scala
@@ -1,13 +1,15 @@
package com.quantifind.kafka.core

import com.quantifind.kafka.OffsetGetter
import OffsetGetter.OffsetInfo
import com.quantifind.kafka.OffsetGetter.OffsetInfo
import com.quantifind.utils.ZkUtilsWrapper

import com.twitter.util.Time
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.exception.ZkNoNodeException
import org.apache.zookeeper.data.Stat
import org.I0Itec.zkclient.exception.ZkNoNodeException

import scala.collection._
import scala.util.control.NonFatal
@@ -17,7 +19,7 @@ import scala.util.control.NonFatal
* User: pierre
* Date: 1/22/14
*/
class ZKOffsetGetter(theZkUtils: ZkUtils) extends OffsetGetter {
class ZKOffsetGetter(theZkUtils: ZkUtilsWrapper) extends OffsetGetter {

override val zkUtils = theZkUtils

135 changes: 135 additions & 0 deletions src/main/scala/com/quantifind/utils/ZkUtilsWrapper.scala
@@ -0,0 +1,135 @@
package com.quantifind.utils

import java.util

import kafka.api.{ApiVersion, LeaderAndIsr}
import kafka.cluster.{EndPoint, BrokerEndPoint, Cluster, Broker}
import kafka.common.TopicAndPartition
import kafka.consumer.ConsumerThreadId
import kafka.controller.{ReassignedPartitionsContext, LeaderIsrAndControllerEpoch}
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.zookeeper.data.{ACL, Stat}

import scala.collection.mutable

/*
 This class is mainly here to help us mock the ZkUtils class. It is really painful to get PowerMock to work with ScalaTest,
 so we created this class, with a little help from IntelliJ, to auto-generate the delegation code.
 */
class ZkUtilsWrapper(zkUtils: ZkUtils) {

val ConsumersPath = ZkUtils.ConsumersPath

val delegator = zkUtils

//def updatePersistentPath(path: String, data: String, acls: util.List[ACL]): Unit = delegator.updatePersistentPath(path, data, acls)

//def updatePartitionReassignmentData(partitionsToBeReassigned: collection.Map[TopicAndPartition, Seq[Int]]): Unit = delegator.updatePartitionReassignmentData(partitionsToBeReassigned)

//def updateEphemeralPath(path: String, data: String, acls: util.List[ACL]): Unit = delegator.updateEphemeralPath(path, data, acls)

//def setupCommonPaths(): Unit = delegator.setupCommonPaths()

//def replicaAssignmentZkData(map: collection.Map[String, Seq[Int]]): String = delegator.replicaAssignmentZkData(map)

//def registerBrokerInZk(id: Int, host: String, port: Int, advertisedEndpoints: collection.Map[SecurityProtocol, EndPoint], jmxPort: Int, rack: Option[String], apiVersion: ApiVersion): Unit = delegator.registerBrokerInZk(id, host, port, advertisedEndpoints, jmxPort, rack, apiVersion)

def readDataMaybeNull(path: String): (Option[String], Stat) = delegator.readDataMaybeNull(path)

def readData(path: String): (String, Stat) = delegator.readData(path)

//def pathExists(path: String): Boolean = delegator.pathExists(path)

//def parseTopicsData(jsonData: String): Seq[String] = delegator.parseTopicsData(jsonData)

//def parsePartitionReassignmentDataWithoutDedup(jsonData: String): Seq[(TopicAndPartition, Seq[Int])] = delegator.parsePartitionReassignmentDataWithoutDedup(jsonData)

//def parsePartitionReassignmentData(jsonData: String): collection.Map[TopicAndPartition, Seq[Int]] = delegator.parsePartitionReassignmentData(jsonData)

//def makeSurePersistentPathExists(path: String, acls: util.List[ACL]): Unit = delegator.makeSurePersistentPathExists(path, acls)

//def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = delegator.leaderAndIsrZkData(leaderAndIsr, controllerEpoch)

//def getTopicsByConsumerGroup(consumerGroup: String): Seq[String] = delegator.getTopicsByConsumerGroup(consumerGroup)

//def getSortedBrokerList(): Seq[Int] = delegator.getSortedBrokerList()

//def getSequenceId(path: String, acls: util.List[ACL]): Int = delegator.getSequenceId(path, acls)

//def getReplicasForPartition(topic: String, partition: Int): Seq[Int] = delegator.getReplicasForPartition(topic, partition)

//def getReplicaAssignmentForTopics(topics: Seq[String]): mutable.Map[TopicAndPartition, Seq[Int]] = delegator.getReplicaAssignmentForTopics(topics)

//def getPartitionsUndergoingPreferredReplicaElection(): collection.Set[TopicAndPartition] = delegator.getPartitionsUndergoingPreferredReplicaElection()

def getPartitionsForTopics(topics: Seq[String]): mutable.Map[String, Seq[Int]] = delegator.getPartitionsForTopics(topics)

//def getPartitionsBeingReassigned(): collection.Map[TopicAndPartition, ReassignedPartitionsContext] = delegator.getPartitionsBeingReassigned()

//def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, topicAndPartitions: collection.Set[TopicAndPartition]): mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = delegator.getPartitionLeaderAndIsrForTopics(zkClient, topicAndPartitions)

//def getPartitionAssignmentForTopics(topics: Seq[String]): mutable.Map[String, collection.Map[Int, Seq[Int]]] = delegator.getPartitionAssignmentForTopics(topics)

def getLeaderForPartition(topic: String, partition: Int): Option[Int] = delegator.getLeaderForPartition(topic, partition)

//def getLeaderAndIsrForPartition(topic: String, partition: Int): Option[LeaderAndIsr] = delegator.getLeaderAndIsrForPartition(topic, partition)

//def getInSyncReplicasForPartition(topic: String, partition: Int): Seq[Int] = delegator.getInSyncReplicasForPartition(topic, partition)

//def getEpochForPartition(topic: String, partition: Int): Int = delegator.getEpochForPartition(topic, partition)

//def getController(): Int = delegator.getController()

def getConsumersPerTopic(group: String, excludeInternalTopics: Boolean): mutable.Map[String, List[ConsumerThreadId]] = delegator.getConsumersPerTopic(group, excludeInternalTopics)

//def getConsumersInGroup(group: String): Seq[String] = delegator.getConsumersInGroup(group)

//def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = delegator.getConsumerPartitionOwnerPath(group, topic, partition)

//def getConsumerGroups(): Seq[String] = delegator.getConsumerGroups()

//def getChildrenParentMayNotExist(path: String): Seq[String] = delegator.getChildrenParentMayNotExist(path)

def getChildren(path: String): Seq[String] = delegator.getChildren(path)

//def getBrokerSequenceId(MaxReservedBrokerId: Int): Int = delegator.getBrokerSequenceId(MaxReservedBrokerId)

//def getBrokerInfo(brokerId: Int): Option[Broker] = delegator.getBrokerInfo(brokerId)

//def getAllTopics(): Seq[String] = delegator.getAllTopics()

//def getAllPartitions(): collection.Set[TopicAndPartition] = delegator.getAllPartitions()

//def getAllEntitiesWithConfig(entityType: String): Seq[String] = delegator.getAllEntitiesWithConfig(entityType)

//def getAllConsumerGroupsForTopic(topic: String): collection.Set[String] = delegator.getAllConsumerGroupsForTopic(topic)

def getAllBrokersInCluster(): Seq[Broker] = delegator.getAllBrokersInCluster()

//def getAllBrokerEndPointsForChannel(protocolType: SecurityProtocol): Seq[BrokerEndPoint] = delegator.getAllBrokerEndPointsForChannel(protocolType)

//def formatAsReassignmentJson(partitionsToBeReassigned: collection.Map[TopicAndPartition, Seq[Int]]): String = delegator.formatAsReassignmentJson(partitionsToBeReassigned)

//def deletePathRecursive(path: String): Unit = delegator.deletePathRecursive(path)

//def deletePath(path: String): Boolean = delegator.deletePath(path)

//def deletePartition(brokerId: Int, topic: String): Unit = delegator.deletePartition(brokerId, topic)

//def createSequentialPersistentPath(path: String, data: String, acls: util.List[ACL]): String = delegator.createSequentialPersistentPath(path, data, acls)

//def createPersistentPath(path: String, data: String, acls: util.List[ACL]): Unit = delegator.createPersistentPath(path, data, acls)

//def createEphemeralPathExpectConflict(path: String, data: String, acls: util.List[ACL]): Unit = delegator.createEphemeralPathExpectConflict(path, data, acls)

//def conditionalUpdatePersistentPathIfExists(path: String, data: String, expectVersion: Int): (Boolean, Int) = delegator.conditionalUpdatePersistentPathIfExists(path, data, expectVersion)

//def conditionalUpdatePersistentPath(path: String, data: String, expectVersion: Int, optionalChecker: Option[(ZkUtils, String, String) => (Boolean, Int)]): (Boolean, Int) = delegator.conditionalUpdatePersistentPath(path, data, expectVersion, optionalChecker)

//def conditionalDeletePath(path: String, expectedVersion: Int): Boolean = delegator.conditionalDeletePath(path, expectedVersion)

def close(): Unit = delegator.close()

}
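Per the comment at the top of the file, the wrapper exists so that specs can stub ZooKeeper access with plain Mockito instead of PowerMock. A minimal sketch of that usage follows (the object name is hypothetical; the Mockito 1.x matchers match what the specs below import):

import org.apache.zookeeper.data.Stat
import org.mockito.Matchers.anyString
import org.mockito.Mockito

// Minimal sketch: a plain delegating class with ordinary overridable methods
// mocks cleanly with vanilla Mockito — which is the whole point of the wrapper.
object ZkUtilsWrapperMockSketch {
  val mockedZkUtil: ZkUtilsWrapper = Mockito.mock(classOf[ZkUtilsWrapper])
  Mockito.when(mockedZkUtil.readData(anyString))
    .thenReturn(("zk-node-payload", Mockito.mock(classOf[Stat])))
}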
src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala
@@ -1,27 +1,27 @@
/*
package com.quantifind.kafka.core

import com.quantifind.kafka.offsetapp.OffsetGetterArgs
import kafka.api.{OffsetRequest, OffsetResponse, PartitionOffsetsResponse}
import kafka.common.{OffsetAndMetadata, TopicAndPartition}
import kafka.coordinator.GroupTopicPartition
import kafka.consumer.SimpleConsumer
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.mockito.{Matchers => MockitoMatchers, Mockito}
import org.mockito.{Mockito, Matchers => MockitoMatchers}
import org.scalatest._

class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers {

trait Fixture {

val mockedZkClient = Mockito.mock(classOf[ZkClient])
val mockedZkUtil = Mockito.mock(classOf[ZkUtils])
val mockedConsumer = Mockito.mock(classOf[SimpleConsumer])
val testPartitionLeader = 1

val offsetGetter = new KafkaOffsetGetter(mockedZkUtil)
val args = new OffsetGetterArgs

val offsetGetter = new KafkaOffsetGetter(args)
offsetGetter.consumerMap += (testPartitionLeader -> Some(mockedConsumer))
}

@@ -30,16 +30,21 @@ class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers {
val testGroup = "testgroup"
val testTopic = "testtopic"
val testPartition = 1
val committedOffset = 100
val logEndOffset = 102

val topicAndPartition = TopicAndPartition(testTopic, testPartition)
val groupTopicPartition = GroupTopicPartition(testGroup, topicAndPartition)
val offsetAndMetadata = OffsetAndMetadata(100, "meta", System.currentTimeMillis)
val offsetAndMetadata = OffsetAndMetadata(committedOffset, "meta", System.currentTimeMillis)
KafkaOffsetGetter.committedOffsetMap += (groupTopicPartition -> offsetAndMetadata)

when(mockedZkUtil.getLeaderForPartition(MockitoMatchers.eq(mockedZkClient), MockitoMatchers.eq(testTopic), MockitoMatchers.eq(testPartition)))
//topicPartitionOffsetsMap
KafkaOffsetGetter.topicPartitionOffsetsMap += (topicAndPartition -> logEndOffset)

when(mockedZkUtil.getLeaderForPartition(MockitoMatchers.eq(testTopic), MockitoMatchers.eq(testPartition)))
.thenReturn(Some(testPartitionLeader))

val partitionErrorAndOffsets = Map(topicAndPartition -> PartitionOffsetsResponse(0,Seq(102)))
val partitionErrorAndOffsets = Map(topicAndPartition -> PartitionOffsetsResponse(0,Seq(logEndOffset)))
val offsetResponse = OffsetResponse(1, partitionErrorAndOffsets)
when(mockedConsumer.getOffsetsBefore(any[OffsetRequest])).thenReturn(offsetResponse)

@@ -48,11 +53,10 @@ class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers {
offsetInfo.topic shouldBe testTopic
offsetInfo.group shouldBe testGroup
offsetInfo.partition shouldBe testPartition
offsetInfo.offset shouldBe 100
offsetInfo.logSize shouldBe 102
offsetInfo.offset shouldBe committedOffset
offsetInfo.logSize shouldBe logEndOffset
case None => fail("Failed to build offset data")
}

}
}
*/
}
src/test/scala/com/quantifind/kafka/core/StormOffsetGetterSpec.scala
@@ -1,8 +1,6 @@
/*
package com.quantifind.kafka.core

import com.quantifind.utils.ZkUtilsWrapper
import org.I0Itec.zkclient.ZkClient
import org.apache.zookeeper.data.Stat
import org.mockito.Matchers._
import org.mockito.Mockito._
@@ -13,11 +11,10 @@ class StormOffsetGetterSpec extends FlatSpec with ShouldMatchers {

trait Fixture {

val mockedZkClient = Mockito.mock(classOf[ZkClient])
val zkOffsetBase = "/stormconsumers"
val mockedZkUtil = Mockito.mock(classOf[ZkUtilsWrapper])

val offsetGetter = new StormOffsetGetter(mockedZkClient, zkOffsetBase, mockedZkUtil)
val offsetGetter = new StormOffsetGetter(mockedZkUtil, zkOffsetBase)
}

"StormOffsetGetter" should "be able to extract topic from persisted spout state" in new Fixture {
@@ -38,12 +35,11 @@ }
}
}"""
val ret = (spoutState, Mockito.mock(classOf[Stat]))
when(mockedZkUtil.readData(MockitoMatchers.eq(mockedZkClient), anyString)).thenReturn(ret)
when(mockedZkUtil.readData(anyString)).thenReturn(ret)

val topics = offsetGetter.getTopicList(testGroup)

topics.size shouldBe 1
topics(0) shouldBe testTopic
}
}
*/
}