Skip to content

Commit

Permalink
Fix burrow topic status check for more than one partition (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
george-xu-hs authored and kush-patel-hs committed Dec 10, 2019
1 parent a837920 commit e52a3a9
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 61 deletions.
115 changes: 68 additions & 47 deletions checks/burrowsc/burrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,75 +130,92 @@ func (b BurrowStatusChecker) CheckStatus(name string) healthchecks.StatusList {
}
}

lagStatus := lagResponse.LagStatus
var s *healthchecks.Status

// If topic is nil, check the status of the entire consumer group instead of a specific partition
var s healthchecks.Status
if b.Topic == nil {
s = checkGroupStatus(name, lagStatus, b.CriticalLagThreshold)
return healthchecks.StatusList{
StatusList: []healthchecks.Status{
*s,
},
}
}

for _, partition := range lagStatus.Partitions {
if partition.Topic == *b.Topic {
s = checkPartitionStatus(name, lagStatus, partition, b.CriticalLagThreshold)
break
}
}

// Topic was not found in consumer group partitions
if s == nil {
s = &healthchecks.Status{
Description: name,
Result: healthchecks.WARNING,
Details: fmt.Sprintf("Topic %s not found in group %s on cluster %s", *b.Topic, b.ConsumerGroup, b.Cluster),
}
s = checkGroupStatus(name, lagResponse.LagStatus, b.CriticalLagThreshold)
} else {
s = checkTopicStatus(name, *b.Topic, lagResponse.LagStatus, b.CriticalLagThreshold)
}

return healthchecks.StatusList{
StatusList: []healthchecks.Status{
*s,
s,
},
}
}

func checkGroupStatus(name string, lagStatus lagStatus, criticalLagThreshold *int64) *healthchecks.Status {
func checkGroupStatus(name string, lagStatus lagStatus, criticalLagThreshold *int64) healthchecks.Status {
// If critical lag threshold is specified and exceeded, return a critical alert
if criticalLagThreshold != nil && lagStatus.TotalLag > *criticalLagThreshold {
return &healthchecks.Status{
return healthchecks.Status{
Description: name,
Result: healthchecks.CRITICAL,
Details: fmt.Sprintf("%s exceeds threshold", formatConsumerGroupDetails(lagStatus)),
}
}

// If critical lag threshold was not exceeded or not specified, check Burrow consumer group status
return &healthchecks.Status{
return healthchecks.Status{
Description: name,
Result: getAlertLevel(lagStatus.Status),
Details: formatConsumerGroupDetails(lagStatus),
}
}

func checkPartitionStatus(name string, lagStatus lagStatus, partition partition, criticalLagThreshold *int64) *healthchecks.Status {
func checkTopicStatus(name string, topic string, lagStatus lagStatus, criticalLagThreshold *int64) healthchecks.Status {
alertLevel := healthchecks.OK
critWarnPartitions := []partition{}
topicExists := false
totalLag := int64(0)

for _, partition := range lagStatus.Partitions {
if partition.Topic == topic {
partitionAlertLevel := getAlertLevel(partition.Status)
topicExists = true
totalLag += partition.CurrentLag

if partitionAlertLevel == healthchecks.CRITICAL || partitionAlertLevel == healthchecks.WARNING {
critWarnPartitions = append(critWarnPartitions, partition)

// If status is already CRITICAL, don't overwrite
if alertLevel != healthchecks.CRITICAL {
alertLevel = partitionAlertLevel
}
}
}
}

if !topicExists {
return healthchecks.Status{
Description: name,
Result: healthchecks.WARNING,
Details: fmt.Sprintf("Topic %s not found in group %s on cluster %s", topic, lagStatus.Group, lagStatus.Cluster),
}
}

topicDetails := fmt.Sprintf(
"Topic %s has total lag of %d for group %s on cluster %s",
topic,
totalLag,
lagStatus.Group,
lagStatus.Cluster,
)
partitionDetails := formatPartitionsDetails(critWarnPartitions)

// If critical lag threshold is specified and exceeded, return a critical alert
if criticalLagThreshold != nil && partition.CurrentLag > *criticalLagThreshold {
return &healthchecks.Status{
if criticalLagThreshold != nil && totalLag > *criticalLagThreshold {
return healthchecks.Status{
Description: name,
Result: healthchecks.CRITICAL,
Details: fmt.Sprintf("%s exceeds threshold", formatPartitionDetails(partition, lagStatus)),
Details: fmt.Sprintf("%s exceeds threshold%s", topicDetails, partitionDetails),
}
}

// If critical lag threshold was not exceeded or not specified, check Burrow partition status
return &healthchecks.Status{
// If critical lag threshold was not exceeded or not specified, check Burrow partition statuses
return healthchecks.Status{
Description: name,
Result: getAlertLevel(partition.Status),
Details: formatPartitionDetails(partition, lagStatus),
Result: alertLevel,
Details: fmt.Sprintf("%s%s", topicDetails, partitionDetails),
}
}

Expand Down Expand Up @@ -229,13 +246,17 @@ func formatConsumerGroupDetails(status lagStatus) string {
)
}

func formatPartitionDetails(partition partition, status lagStatus) string {
return fmt.Sprintf(
"Partition status is %s, lag of %d for topic %s in group %s on cluster %s",
partition.Status,
partition.CurrentLag,
partition.Topic,
status.Group,
status.Cluster,
)
func formatPartitionsDetails(partitions []partition) string {
var str strings.Builder

for _, partition := range partitions {
str.WriteString(fmt.Sprintf(
", partition %d status is %s and has lag of %d",
partition.Partition,
partition.Status,
partition.CurrentLag,
))
}

return str.String()
}
22 changes: 11 additions & 11 deletions checks/burrowsc/burrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestBurrowStatusChecker_CheckGroupStatusOK(t *testing.T) {
{
Description: "Consumer",
Result: healthchecks.OK,
Details: "Consumer group status is OK, total lag of 13 for group group1 on cluster cluster1",
Details: "Consumer group status is OK, total lag of 17 for group group1 on cluster cluster1",
},
},
}
Expand Down Expand Up @@ -61,7 +61,7 @@ func TestBurrowStatusChecker_CheckTopicStatusOK(t *testing.T) {
{
Description: "Consumer",
Result: healthchecks.OK,
Details: "Partition status is OK, lag of 2 for topic topic1 in group group1 on cluster cluster1",
Details: "Topic topic1 has total lag of 6 for group group1 on cluster cluster1",
},
},
}
Expand Down Expand Up @@ -92,7 +92,7 @@ func TestBurrowStatusChecker_CheckGroupStatusThresholdExceeded(t *testing.T) {
{
Description: "Consumer",
Result: healthchecks.CRITICAL,
Details: "Consumer group status is OK, total lag of 13 for group group1 on cluster cluster1 exceeds threshold",
Details: "Consumer group status is OK, total lag of 17 for group group1 on cluster cluster1 exceeds threshold",
},
},
}
Expand Down Expand Up @@ -125,7 +125,7 @@ func TestBurrowStatusChecker_CheckTopicStatusThresholdExceeded(t *testing.T) {
{
Description: "Consumer",
Result: healthchecks.CRITICAL,
Details: "Partition status is OK, lag of 11 for topic topic2 in group group1 on cluster cluster1 exceeds threshold",
Details: "Topic topic2 has total lag of 11 for group group1 on cluster cluster1 exceeds threshold",
},
},
}
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestBurrowStatusChecker_CheckGroupStatusERR(t *testing.T) {
{
Description: "Consumer",
Result: healthchecks.CRITICAL,
Details: "Consumer group status is ERR, total lag of 15329 for group group1 on cluster cluster1",
Details: "Consumer group status is ERR, total lag of 17717 for group group1 on cluster cluster1",
},
},
}
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestBurrowStatusChecker_CheckTopicStatusSTOP(t *testing.T) {
{
Description: "Consumer",
Result: healthchecks.CRITICAL,
Details: "Partition status is STOP, lag of 8035 for topic topic1 in group group1 on cluster cluster1",
Details: "Topic topic1 has total lag of 8035 for group group1 on cluster cluster1, partition 0 status is STOP and has lag of 8035",
},
},
}
Expand Down Expand Up @@ -216,7 +216,7 @@ func TestBurrowStatusChecker_CheckTopicStatusSTALL(t *testing.T) {
{
Description: "Consumer",
Result: healthchecks.CRITICAL,
Details: "Partition status is STALL, lag of 7294 for topic topic2 in group group1 on cluster cluster1",
Details: "Topic topic2 has total lag of 9682 for group group1 on cluster cluster1, partition 1 status is STALL and has lag of 7294, partition 2 status is WARN and has lag of 2285",
},
},
}
Expand Down Expand Up @@ -245,7 +245,7 @@ func TestBurrowStatusChecker_CheckGroupStatusWARN(t *testing.T) {
{
Description: "Consumer",
Result: healthchecks.WARNING,
Details: "Consumer group status is WARN, total lag of 2760 for group group1 on cluster cluster1",
Details: "Consumer group status is WARN, total lag of 2764 for group group1 on cluster cluster1",
},
},
}
Expand Down Expand Up @@ -275,7 +275,7 @@ func TestBurrowStatusChecker_CheckTopicStatusWARN(t *testing.T) {
{
Description: "Consumer",
Result: healthchecks.WARNING,
Details: "Partition status is WARN, lag of 2736 for topic topic1 in group group1 on cluster cluster1",
Details: "Topic topic1 has total lag of 2740 for group group1 on cluster cluster1, partition 1 status is WARN and has lag of 2736",
},
},
}
Expand Down Expand Up @@ -306,7 +306,7 @@ func TestBurrowStatusChecker_CheckTopicStatusOKGroupStatusWarn(t *testing.T) {
{
Description: "Consumer",
Result: healthchecks.OK,
Details: "Partition status is OK, lag of 24 for topic topic2 in group group1 on cluster cluster1",
Details: "Topic topic2 has total lag of 24 for group group1 on cluster cluster1",
},
},
}
Expand Down Expand Up @@ -454,7 +454,7 @@ func TestBurrowStatusChecker_CheckStatusTrimBaseUrl(t *testing.T) {
{
Description: "Consumer",
Result: healthchecks.OK,
Details: "Consumer group status is OK, total lag of 13 for group group1 on cluster cluster1",
Details: "Consumer group status is OK, total lag of 17 for group group1 on cluster cluster1",
},
},
}
Expand Down
16 changes: 15 additions & 1 deletion checks/burrowsc/test/status_err.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,29 @@
"partitions": [
{
"topic": "topic1",
"partition": 0,
"status": "STOP",
"current_lag": 8035
},
{
"topic": "topic2",
"partition": 0,
"status": "OK",
"current_lag": 103
},
{
"topic": "topic2",
"partition": 1,
"status": "STALL",
"current_lag": 7294
},
{
"topic": "topic2",
"partition": 2,
"status": "WARN",
"current_lag": 2285
}
],
"totallag": 15329
"totallag": 17717
}
}
10 changes: 9 additions & 1 deletion checks/burrowsc/test/status_ok.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,23 @@
"partitions": [
{
"topic": "topic1",
"partition": 0,
"status": "OK",
"current_lag": 2
},
{
"topic": "topic1",
"partition": 1,
"status": "OK",
"current_lag": 4
},
{
"topic": "topic2",
"partition": 0,
"status": "OK",
"current_lag": 11
}
],
"totallag": 13
"totallag": 17
}
}
10 changes: 9 additions & 1 deletion checks/burrowsc/test/status_warn.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,23 @@
"partitions": [
{
"topic": "topic1",
"partition": 0,
"status": "OK",
"current_lag": 4
},
{
"topic": "topic1",
"partition": 1,
"status": "WARN",
"current_lag": 2736
},
{
"topic": "topic2",
"partition": 0,
"status": "OK",
"current_lag": 24
}
],
"totallag": 2760
"totallag": 2764
}
}

0 comments on commit e52a3a9

Please sign in to comment.