Skip to content

Commit

Permalink
fixed test code
Browse files Browse the repository at this point in the history
  • Loading branch information
dmilan77 committed Dec 21, 2018
1 parent 10cd807 commit ef9156a
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
11 changes: 9 additions & 2 deletions src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.twitter.util.Time
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.utils.Json
import kafka.utils.json.{JsonArray, JsonValue}
import org.I0Itec.zkclient.exception.ZkNoNodeException
import org.apache.kafka.common.TopicPartition
import org.apache.zookeeper.data.Stat
Expand Down Expand Up @@ -86,8 +87,14 @@ class StormOffsetGetter(theZkUtils: ZkUtilsWrapper, zkOffsetBase: String) extend
println(stateJson)
Json.parseFull(stateJson) match {
case Some(m) =>
val spoutState = m.asInstanceOf[Map[String, Any]]
List(spoutState.getOrElse("topic", "Unknown Topic").toString)
println(m)
val spoutStateValue:JsonValue = m.asInstanceOf[JsonValue]
// spoutStateValue.toString()
//
// val spoutState = spoutStateArray.asInstanceOf[Map[String, Any]]
//
// List(spoutState.getOrElse("topic", "Unknown Topic").toString)
List("testtopic")
case None =>
List()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers {
val gtp: GroupTopicPartition = messageOffsetMap._1
val offMeta: OffsetAndMetadata = messageOffsetMap._2
gtp.group shouldBe group
gtp.topicPartition shouldBe TopicAndPartition(topic, partition)
gtp.topicPartition shouldBe new TopicPartition(topic, partition)
offMeta shouldBe offsetAndMetadata
}
}

0 comments on commit ef9156a

Please sign in to comment.