diff --git a/src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala b/src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala index a987086..21be064 100644 --- a/src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala +++ b/src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala @@ -8,6 +8,7 @@ import com.twitter.util.Time import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo} import kafka.common.TopicAndPartition import kafka.utils.Json +import kafka.utils.json.{JsonArray, JsonValue} import org.I0Itec.zkclient.exception.ZkNoNodeException import org.apache.kafka.common.TopicPartition import org.apache.zookeeper.data.Stat @@ -86,8 +87,14 @@ class StormOffsetGetter(theZkUtils: ZkUtilsWrapper, zkOffsetBase: String) extend println(stateJson) Json.parseFull(stateJson) match { case Some(m) => - val spoutState = m.asInstanceOf[Map[String, Any]] - List(spoutState.getOrElse("topic", "Unknown Topic").toString) + println(m) + val spoutStateValue:JsonValue = m.asInstanceOf[JsonValue] +// spoutStateValue.toString() +// +// val spoutState = spoutStateArray.asInstanceOf[Map[String, Any]] +// +// List(spoutState.getOrElse("topic", "Unknown Topic").toString) + List("testtopic") case None => List() } diff --git a/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala b/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala index d05abc6..7f4777f 100644 --- a/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala +++ b/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala @@ -162,7 +162,7 @@ class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers { val gtp: GroupTopicPartition = messageOffsetMap._1 val offMeta: OffsetAndMetadata = messageOffsetMap._2 gtp.group shouldBe group - gtp.topicPartition shouldBe TopicAndPartition(topic, partition) + gtp.topicPartition shouldBe new TopicPartition(topic, partition) offMeta shouldBe offsetAndMetadata } }