Big commit this time.
**New functionality:**
  - (Experimental) Implemented a Java-based in-memory data store. For now it is only lightly wired into the front-end, but over time it will replace the current implementation. It enabled the following new features:
  - Report whether a consumer-group is currently active
    - This will eventually allow us to report on inactive consumer-groups
  - (Experimental) Report Burrow-like consumer-group status via a REST endpoint (/consumergroup), while updating Burrow's rules a bit. The rules implemented here are (a sketch follows this list):
    - Evaluate per consumer-group topic-partition:
      - Rule 0: If there are no committed offsets, there is nothing to calculate and the partition is OK
      - Rule 1: If the difference between now and the last offset timestamp is greater than the difference between the last and first offset timestamps, the consumer has stopped committing offsets for that partition (status STOP, an error)
      - Rule 2: If the consumer offset decreases from one interval to the next, the partition is marked as a rewind (status REWIND, an error)
      - Rule 3: If the lag is ever zero for the partition over the stored period, the partition is OK
      - Rule 4: If the consumer offset does not change and the lag is non-zero, the partition is stalled (status STALL, an error)
      - Rule 5: If the consumer offsets are moving but the lag is consistently increasing, the consumer is slow (status WARN, a warning)
    - Roll-up all consumer-group topic-partitions per consumer-group and report a consumer-group status:
      - Set consumer-group status to ERROR if any topic-partition status is STOP
      - Set consumer-group status to ERROR if any topic-partition status is REWIND
      - Set consumer-group status to ERROR if any topic-partition status is STALL
      - Set consumer-group status to WARN if any topic-partition status is WARN
      - Set consumer-group status to OK if none of the above rules match
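For illustration, here is a minimal, hypothetical sketch of how the per-partition rules and the roll-up could fit together. This is not the code from this commit (that lives under src/main/java/com/morningstar/kafka); the OffsetSample and RuleSketch names, the sample layout, and the helper signatures are assumptions made for the example:

```
import java.util.List;

enum Status { OK, WARN, STOP, STALL, REWIND, ERR }

// One stored observation of a consumer-group topic-partition.
class OffsetSample {
    final long offset;     // committed consumer offset
    final long timestamp;  // commit time, epoch millis
    final long lag;        // log-end offset minus committed offset
    OffsetSample(long offset, long timestamp, long lag) {
        this.offset = offset; this.timestamp = timestamp; this.lag = lag;
    }
}

class RuleSketch {

    // Evaluate one topic-partition from its stored samples (oldest first),
    // applying Rules 0-5 in order.
    static Status evaluatePartition(List<OffsetSample> samples, long nowMillis) {
        if (samples.isEmpty()) return Status.OK;                        // Rule 0
        OffsetSample first = samples.get(0);
        OffsetSample last = samples.get(samples.size() - 1);
        if (nowMillis - last.timestamp > last.timestamp - first.timestamp)
            return Status.STOP;                                         // Rule 1
        for (int i = 1; i < samples.size(); i++)
            if (samples.get(i).offset < samples.get(i - 1).offset)
                return Status.REWIND;                                   // Rule 2
        if (samples.stream().anyMatch(s -> s.lag == 0))
            return Status.OK;                                           // Rule 3
        if (last.offset == first.offset && last.lag > 0)
            return Status.STALL;                                        // Rule 4
        boolean lagAlwaysIncreasing = true;
        for (int i = 1; i < samples.size(); i++)
            if (samples.get(i).lag <= samples.get(i - 1).lag) {
                lagAlwaysIncreasing = false;
                break;
            }
        if (lagAlwaysIncreasing) return Status.WARN;                    // Rule 5
        return Status.OK;
    }

    // Roll all of a group's partition statuses up into one group status.
    static Status rollUp(List<Status> partitionStatuses) {
        if (partitionStatuses.contains(Status.STOP)
                || partitionStatuses.contains(Status.REWIND)
                || partitionStatuses.contains(Status.STALL)) return Status.ERR;
        if (partitionStatuses.contains(Status.WARN)) return Status.WARN;
        return Status.OK;
    }
}
```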

**Of course some of the bugs you were seeing were fixed as well:**
  - Synchronized around all SQLite DB activity, since SQLite allows only one operation at a time on the DB file (a sketch of the pattern follows this list).
  - This fixed all DB create/update/delete issues, at the expense of sometimes blocking one DB operation while another is in progress. That is unavoidable with SQLite; the long-term fix is to replace SQLite with a more appropriate DB engine.
  - Fixed an issue where LogEndOffset and Lag can display incorrect values.
  - Added retry logic around building the ZkUtils object. This fixed the issue where we would not re-connect to ZooKeeper if the ZK service went down and was later restored (a retry sketch follows this list).
  - Updated some dependency versions.
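As a hedged illustration of the SQLite fix above: one lock object serializes every statement against the DB file. The SerializedSqliteStore class, the OFFSETS table, and the raw JDBC usage below are invented for the sketch; the actual commit synchronizes the app's existing (Slick-based) DB calls:

```
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

class SerializedSqliteStore {

    // Single lock guarding all access to the one SQLite file.
    private final Object dbLock = new Object();
    private final Connection conn;

    SerializedSqliteStore(String dbPath) throws SQLException {
        conn = DriverManager.getConnection("jdbc:sqlite:" + dbPath);
    }

    // Every write takes the same lock, so only one statement touches the
    // DB file at a time; a caller may block while another operation runs.
    void insertOffset(String group, String topic, int partition, long offset) throws SQLException {
        synchronized (dbLock) {
            try (PreparedStatement ps = conn.prepareStatement(
                    "INSERT INTO OFFSETS (grp, topic, part, off) VALUES (?, ?, ?, ?)")) {
                ps.setString(1, group);
                ps.setString(2, topic);
                ps.setInt(3, partition);
                ps.setLong(4, offset);
                ps.executeUpdate();
            }
        }
    }
}
```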
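And a minimal sketch of the retry idea around building the ZkUtils object; the buildWithRetry helper, the attempt count, and the fixed backoff are assumptions for the example, not values from this commit:

```
class ZkRetry {

    interface Factory<T> { T build() throws Exception; }

    // Keep retrying the factory (e.g. the ZkUtils constructor call) so a
    // ZooKeeper outage-and-recovery no longer leaves us disconnected.
    static <T> T buildWithRetry(Factory<T> factory, int maxAttempts, long backoffMillis)
            throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return factory.build();
            } catch (Exception e) {
                last = e;                      // ZK may still be down
                Thread.sleep(backoffMillis);   // wait before the next attempt
            }
        }
        throw last;                            // give up after maxAttempts
    }
}
```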
Robert Casey authored and Robert Casey committed Jan 7, 2018
1 parent dc5fa7c commit 8119e2a
Showing 24 changed files with 1,245 additions and 222 deletions.
8 changes: 2 additions & 6 deletions README.md
@@ -57,7 +57,7 @@ This is a small web app, you can run it locally or on a server, as long as you h
```
java -Djava.security.auth.login.config=conf/server-client-jaas.conf \
-  -cp KafkaOffsetMonitor-assembly-0.4.0.jar \
+  -cp KafkaOffsetMonitor-assembly-0.4.6.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--offsetStorage kafka \
--kafkaBrokers kafkabroker01:6667,kafkabroker02:6667 \
@@ -126,7 +126,7 @@ As long as this is true you will need to use local maven repo and just publish K
Assuming you have a custom implementation of OffsetInfoReporter in a jar file, running it is as simple as adding the jar to the classpath when running app:

```
-java -cp KafkaOffsetMonitor-assembly-0.3.0.jar:kafka-offset-monitor-another-db-reporter.jar \
+java -cp KafkaOffsetMonitor-assembly-0.4.6.jar:kafka-offset-monitor-another-db-reporter.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--zk zkserver01,zkserver02 \
--port 8080 \
@@ -141,7 +141,3 @@ Contributing
============

The KafkaOffsetMonitor is released under the Apache License and we **welcome any contributions** within this license. Any pull request is welcome and will be reviewed and merged as quickly as possible.

-Because this open source tool is released by [Quantifind](http://www.quantifind.com) as a company, if you want to submit a pull request, you will have to sign the following simple contributors agreement:
-  - If you are an individual, please sign [this contributors agreement](https://docs.google.com/a/quantifind.com/document/d/1RS7qEjq3cCmJ1665UhoCMK8541Ms7KyU3kVFoO4CR_I/) and send it back to [email protected]
-  - If you are contributing changes that you did as part of your work, please sign [this contributors agreement](https://docs.google.com/a/quantifind.com/document/d/1kNwLT4qG3G0Ct2mEuNdBGmKDYuApN1CpQtZF8TSVTjE/) and send it back to [email protected]
32 changes: 18 additions & 14 deletions build.sbt
@@ -1,25 +1,29 @@
name := "KafkaOffsetMonitor"
version := "0.4.1-SNAPSHOT"
scalaVersion := "2.11.8"
version := "0.4.6-SNAPSHOT"
scalaVersion := "2.11.11"
organization := "com.quantifind"

scalacOptions ++= Seq("-deprecation", "-unchecked", "-optimize", "-feature")

mainClass in Compile := Some("com.quantifind.kafka.offsetapp.OffsetGetterWeb")

libraryDependencies ++= Seq(
"log4j" % "log4j" % "1.2.17",
"net.databinder" %% "unfiltered-filter" % "0.8.4",
"net.databinder" %% "unfiltered-jetty" % "0.8.4",
"net.databinder" %% "unfiltered-json4s" % "0.8.4",
"com.quantifind" %% "sumac" % "0.3.0",
"org.apache.kafka" %% "kafka" % "0.9.0.1",
"org.reflections" % "reflections" % "0.9.10",
"com.twitter" %% "util-core" % "6.40.0",
"com.typesafe.slick" %% "slick" % "2.1.0",
"org.xerial" % "sqlite-jdbc" % "3.7.2",
"org.mockito" % "mockito-all" % "1.10.19" % "test",
"org.scalatest" %% "scalatest" % "2.2.6" % "test")
"log4j" % "log4j" % "1.2.17",
"net.databinder" %% "unfiltered-filter" % "0.8.4",
"net.databinder" %% "unfiltered-jetty" % "0.8.4",
"net.databinder" %% "unfiltered-json4s" % "0.8.4",
"com.quantifind" %% "sumac" % "0.3.0",
"org.apache.kafka" %% "kafka" % "0.9.0.1",
"org.reflections" % "reflections" % "0.9.11",
"com.twitter" %% "util-core" % "7.1.0",
"com.typesafe.slick" %% "slick" % "2.1.0",
"org.xerial" % "sqlite-jdbc" % "3.18.0",
"com.google.code.gson" % "gson" % "2.8.2",
"com.google.guava" % "guava" % "20.0",
"javax.ws.rs" % "javax.ws.rs-api" % "2.0-m16",
"org.glassfish.jersey.core" % "jersey-client" % "2.25.1",
"org.mockito" % "mockito-all" % "1.10.19" % "test",
"org.scalatest" %% "scalatest" % "2.2.6" % "test")

assemblyMergeStrategy in assembly := {
case "about.html" => MergeStrategy.discard
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
@@ -2,4 +2,4 @@
project.organization=com.quantifind
project.name=KafkaOffsetMonitor
project.version=0.4
-sbt.version=0.13.13
+sbt.version=0.13.16
50 changes: 50 additions & 0 deletions src/main/java/com/morningstar/kafka/KafkaCommittedOffset.java
@@ -0,0 +1,50 @@
package com.morningstar.kafka;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;

public class KafkaCommittedOffset {

private String groupName;
private boolean groupIsActive;
private String topicName;
private int partitionId;
private KafkaCommittedOffsetMetadata committedOffset;


public KafkaCommittedOffset(String groupName, boolean groupIsActive, String topicName, int partitionId, long committedOffset, long committedMillis) {

Preconditions.checkArgument(!Strings.isNullOrEmpty(groupName), "groupName must not be NULL or empty.");
Preconditions.checkArgument(!Strings.isNullOrEmpty(topicName), "topicName must not be NULL or empty.");
Preconditions.checkArgument(partitionId > -1, "partitionId must be greater than or equal to 0.");
Preconditions.checkArgument(committedOffset > -1, "committedOffset must be greater than or equal to 0.");
Preconditions.checkArgument(committedMillis > -1, "committedMillis must be greater than or equal to 0.");

this.groupName = groupName;
this.groupIsActive = groupIsActive;
this.topicName = topicName;
this.partitionId = partitionId;
this.committedOffset = new KafkaCommittedOffsetMetadata(committedOffset, committedMillis);
}


public String getGroupName() {
return groupName;
}

public boolean getGroupIsActive() {
return groupIsActive;
}

public String getTopicName() {
return topicName;
}

public int getPartitionId() {
return partitionId;
}

public KafkaCommittedOffsetMetadata getCommittedOffset() {
return committedOffset;
}
}
44 changes: 44 additions & 0 deletions src/main/java/com/morningstar/kafka/KafkaCommittedOffsetMetadata.java
@@ -0,0 +1,44 @@
package com.morningstar.kafka;

import com.google.common.base.Preconditions;
import com.google.gson.annotations.Expose;

public class KafkaCommittedOffsetMetadata extends KafkaOffsetMetadata {

@Expose private long lag = -1;


public KafkaCommittedOffsetMetadata(KafkaOffsetMetadata offsetMetadata, long lag) {
super(offsetMetadata.getOffset(), offsetMetadata.getTimestamp());
verifyParameters(lag);
this.lag = lag;
}

public KafkaCommittedOffsetMetadata(long committedOffset, long timestamp, long lag) {
super(committedOffset, timestamp);
verifyParameters(lag);
this.lag = lag;
}

public KafkaCommittedOffsetMetadata(KafkaOffsetMetadata offsetMetadata) {
super(offsetMetadata.getOffset(), offsetMetadata.getTimestamp());
}

public KafkaCommittedOffsetMetadata(long committedOffset, long timestamp) {
super(committedOffset, timestamp);
}

private void verifyParameters(long lag) {

Preconditions.checkArgument(lag > -2, "lag must not be less than -1.");
}


public long getLag() {
return lag;
}

public void setLag(long lag) {
this.lag = lag;
}
}
168 changes: 168 additions & 0 deletions src/main/java/com/morningstar/kafka/KafkaConsumerGroup.java
@@ -0,0 +1,168 @@
package com.morningstar.kafka;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Sets;
import com.google.gson.annotations.Expose;

import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;


public class KafkaConsumerGroup {

private final long COMPLETE_THRESHOLD = 10;

@Expose private String consumerGroupName;
@Expose private boolean isActive;
@Expose private boolean complete;
@Expose private long mostRecentCommittedMillis;
@Expose private Status status;
@Expose private Set<KafkaTopicPartition> topicPartitions;


public KafkaConsumerGroup(String consumerGroupName) {

Preconditions.checkArgument(!Strings.isNullOrEmpty(consumerGroupName), "consumerGroupName cannot be NULL or empty.");

this.consumerGroupName = consumerGroupName;
this.isActive = false;
this.complete = false;
this.mostRecentCommittedMillis = -1;
this.status = Status.OK;
this.topicPartitions = Sets.newConcurrentHashSet();
}


public String getConsumerGroupName() {
return consumerGroupName;
}

public boolean getComplete() { return complete; }

public long getMaxCommitedMillis() {
return mostRecentCommittedMillis;
}

public boolean isActive() {
return isActive;
}

public Set<KafkaTopicPartition> getTopicPartitions() {

return topicPartitions;
}

public Set<String> getTopics() {

return topicPartitions.stream()
.map(KafkaTopicPartition::getTopicName)
.collect(Collectors.toSet());
}

public synchronized void updateStatus() {

if (!isActive) {
this.status = Status.ERR;
return;
}

Status newStatus = Status.OK;

for (KafkaTopicPartition topicPartition : topicPartitions) {

// Set group status to ERROR if any topicPartition's status is STOP
if (Status.STOP == topicPartition.getStatus()) {
newStatus = Status.ERR;
break;
}

// Set group status to ERROR if any topicPartition's status is REWIND
if (Status.REWIND == topicPartition.getStatus()) {
newStatus = Status.ERR;
break;
}

// Set group status to ERROR if any topicPartition's status is STALL
if (Status.STALL == topicPartition.getStatus()) {
newStatus = Status.ERR;
break;
}

// Set group status to WARN if any topicPartition's status is WARN
if (Status.WARN == topicPartition.getStatus()) {
newStatus = Status.WARN;
break;
}
}

this.status = newStatus;
}


private Optional<KafkaTopicPartition> getTopicPartition(String topic, int partitionId) {

//return committedOffsets.keySet().stream().filter(tp -> (tp.getTopicName().equals(topic) && tp.getPartitionId() == partitionId)).findFirst();
return topicPartitions.stream()
.filter(tp -> (tp.getTopicName().equals(topic) && tp.getPartitionId() == partitionId))
.findFirst();
}

private void upsertTopicPartition(KafkaCommittedOffset kafkaCommittedOffset) {

Preconditions.checkArgument(!Strings.isNullOrEmpty(kafkaCommittedOffset.getTopicName()), "topic cannot be NULL or empty.");
Preconditions.checkArgument(kafkaCommittedOffset.getPartitionId() >= 0, "partitionId must be greater-than or equal-to zero.");

String incomingTopicName = kafkaCommittedOffset.getTopicName();
int incomingPartitionId = kafkaCommittedOffset.getPartitionId();

Optional<KafkaTopicPartition> existingTopicPartition = getTopicPartition(incomingTopicName, incomingPartitionId);

if (existingTopicPartition.isPresent()) {
// Append committed offset info to existing set item
existingTopicPartition.get().addCommittedOffset(kafkaCommittedOffset.getCommittedOffset());
} else {
// Add a new entry to the map
KafkaTopicPartition newTopicPartition = new KafkaTopicPartition(incomingTopicName, incomingPartitionId);
newTopicPartition.addCommittedOffset(kafkaCommittedOffset.getCommittedOffset());
topicPartitions.add(newTopicPartition);
}
}

private void setMostRecentCommittedMillis(long mostRecentCommittedMillis) {
if (this.mostRecentCommittedMillis < mostRecentCommittedMillis) {
this.mostRecentCommittedMillis = mostRecentCommittedMillis;
}
}

private void updateCompleteFlag() {

this.complete = topicPartitions.stream()
.noneMatch(f -> f.getCommittedOffsets().size() < COMPLETE_THRESHOLD);
}

public void addCommittedOffsetInfo(KafkaCommittedOffset kafkaCommittedOffset) {

setMostRecentCommittedMillis(kafkaCommittedOffset.getCommittedOffset().getTimestamp());
this.isActive = kafkaCommittedOffset.getGroupIsActive();
upsertTopicPartition(kafkaCommittedOffset);
updateCompleteFlag();
}


@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

KafkaConsumerGroup that = (KafkaConsumerGroup) o;

return getConsumerGroupName().equals(that.getConsumerGroupName());
}

@Override
public int hashCode() {
return getConsumerGroupName().hashCode();
}
}