Implement stats command
This allows users to monitor Rafka for errors that are not propagated to
the clients; until now such errors were only logged. It also enables
simple alerting rules (e.g. alert when producer errors exceed 10).

We used the HGETALL Redis command, since the stats structure can be
considered a hash with stat names as keys and counters as values.
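
As a rough sketch of such an alerting rule (illustrative only, not part of this commit), a monitoring script could poll the stats hash with a go-redis client and flag excessive producer delivery errors; the address and threshold below are placeholders:

```go
package main

import (
	"fmt"
	"strconv"

	"github.com/go-redis/redis"
)

func main() {
	// Placeholder address; point this at your Rafka instance.
	c := redis.NewClient(&redis.Options{Addr: "localhost:6380"})

	// HGETALL stats -> map of stat names to counter values (as strings).
	stats, err := c.HGetAll("stats").Result()
	if err != nil {
		panic(err)
	}

	errs, err := strconv.ParseInt(stats["producer.delivery.errors"], 10, 64)
	if err != nil {
		panic(err)
	}

	// Example alerting rule from the commit message: more than 10 producer errors.
	if errs > 10 {
		fmt.Printf("ALERT: %d producer delivery errors\n", errs)
	}
}
```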

Part of #28
agis committed Sep 7, 2017
1 parent 2acf9b6 commit 04ac92c
Showing 7 changed files with 101 additions and 12 deletions.
27 changes: 18 additions & 9 deletions README.md
@@ -121,15 +121,6 @@ $ go build && ./rafka -k ./kafka.json.sample -i 10
API
------------------------------------------------------------------------------
Generic commands:
- [`PING`](https://redis.io/commands/ping) (no arguments)
- [`QUIT`](https://redis.io/commands/quit)
### Producer
- `RPUSHX topics:<topic> <message>`: produce a message
@@ -174,6 +165,24 @@ Example using Redis:
### Generic
- [`PING`](https://redis.io/commands/ping)
- [`QUIT`](https://redis.io/commands/quit)
- `HGETALL stats`: returns a hash with various statistics. Typically used for
monitoring.
Development
-------------------------------------------------------------------------------
4 changes: 4 additions & 0 deletions producer.go
@@ -20,6 +20,7 @@ import (
"os"
"strings"
"sync"
"sync/atomic"

rdkafka "github.com/confluentinc/confluent-kafka-go/kafka"
)
@@ -77,6 +78,7 @@ func (p *Producer) Close() {
unflushed := p.rdProd.Flush(5000)
if unflushed > 0 {
p.log.Printf("Flush timeout: %d unflushed events", unflushed)
atomic.AddInt64(&stats.producerUnflushed, int64(unflushed))
}
// signal consumeDeliveries() to exit by closing p.rdProd.Events()
// channel
@@ -93,9 +95,11 @@ func (p *Producer) consumeDeliveries() {
if ok {
if err := msg.TopicPartition.Error; err != nil {
p.log.Printf("Failed to deliver `%s` to %s: %s", msg.Value, msg, err)
atomic.AddInt64(&stats.producerErr, 1)
}
} else {
p.log.Printf("Unknown event type: %s", ev)
atomic.AddInt64(&stats.producerErr, 1)
}
}
}
1 change: 1 addition & 0 deletions rafka.go
@@ -34,6 +34,7 @@ import (

var (
cfg Config
stats Stats
shutdown = make(chan os.Signal, 1)
)

26 changes: 24 additions & 2 deletions rafka_test.go
@@ -63,6 +63,7 @@ func TestConsumerTopicExclusive(t *testing.T) {
}
}

// RPUSH
func TestConsumerOffsetCommit(t *testing.T) {
cases := [][]string{
{"acks", "sometopic:1:-5"},
@@ -84,7 +85,8 @@ func TestConsumerOffsetCommit(t *testing.T) {
}
}

func TestErrRPUSHX(t *testing.T) {
// RPUSHX
func TestProduceErr(t *testing.T) {
c := newClient("some:producer")

_, err := c.RPushX("invalid-arg", "a msg").Result()
Expand All @@ -98,7 +100,8 @@ func TestErrRPUSHX(t *testing.T) {
}
}

func TestSETNAME(t *testing.T) {
// SETNAME
func TestClientID(t *testing.T) {
numReq := 100
replies := make(chan string)

@@ -159,6 +162,25 @@ func TestConcurrentProducers(t *testing.T) {
wg.Wait()
}

// HGETALL
func TestStatsQuery(t *testing.T) {
p := newClient("someone:foo")
v, err := p.HGetAll("stats").Result()
if err != nil {
t.Error(err)
}

_, err = strconv.Atoi(v["producer.delivery.errors"])
if err != nil {
t.Error(err)
}

_, err = strconv.Atoi(v["producer.unflushed.messages"])
if err != nil {
t.Error(err)
}
}

func newClient(id string) *redis.Client {
return redis.NewClient(&redis.Options{
// TODO Add the ability to read host and port from a cfg file
14 changes: 13 additions & 1 deletion server.go
@@ -128,6 +128,16 @@ func (s *Server) handleConn(conn net.Conn) {
}
}
ticker.Stop()
// Get producer/consumer statistics
//
// HGETALL stats
case "HGETALL":
key := strings.ToUpper(string(command.Get(1)))
if key != "STATS" {
writeErr = writer.WriteError("ERR Expected key to be 'stats', got " + key)
break
}
writeErr = writer.WriteObjects(stats.toRedis()...)
// Commit offsets for the given topic/partition
//
// RPUSH acks <topic>:<partition>:<offset>
@@ -252,6 +262,7 @@ func (s *Server) handleConn(conn net.Conn) {
writer.Flush()
}
if writeErr != nil {
// TODO(agis) log these errors
break
}
}
@@ -294,7 +305,7 @@ Loop:
conn, err := listener.Accept()
if err != nil {
// we know that closing a listener that blocks
// on accepts will return this error
// on Accept() will return this error
if !strings.Contains(err.Error(), "use of closed network connection") {
s.log.Println("Accept error: ", err)
}
@@ -356,6 +367,7 @@ func parseAck(ack string) (string, int32, rdkafka.Offset, error) {
func msgToRedis(msg *rdkafka.Message) []interface{} {
tp := msg.TopicPartition

// TODO(agis): convert slices to strings
return []interface{}{
[]byte("topic"),
[]byte(*tp.Topic),
34 changes: 34 additions & 0 deletions stats.go
@@ -0,0 +1,34 @@
// Copyright 2017 Skroutz S.A.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package main

import (
"strconv"
"sync/atomic"
)

type Stats struct {
producerUnflushed int64
producerErr int64
}

func (s *Stats) toRedis() []interface{} {
return []interface{}{
"producer.unflushed.messages",
strconv.FormatInt(atomic.LoadInt64(&s.producerUnflushed), 10),
"producer.delivery.errors",
strconv.FormatInt(atomic.LoadInt64(&s.producerErr), 10),
}
}
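
The counters above are bumped from producer goroutines with atomic.AddInt64 and snapshotted here with atomic.LoadInt64. A minimal standalone sketch of that pattern (illustrative only, not part of this commit) shows why plain int64 fields with the atomic package suffice and no mutex is needed:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type counters struct {
	deliveryErrors int64
}

func main() {
	var c counters
	var wg sync.WaitGroup

	// Simulate concurrent producers recording delivery failures.
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt64(&c.deliveryErrors, 1)
		}()
	}
	wg.Wait()

	// Reader side, analogous to Stats.toRedis().
	fmt.Println("producer.delivery.errors =", atomic.LoadInt64(&c.deliveryErrors))
}
```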
7 changes: 7 additions & 0 deletions test/end-to-end
@@ -228,6 +228,13 @@ class TestRafka < Minitest::Test
cons2.consume(1)
end
end

def test_stats
stats = @prod.redis.hgetall("stats")
assert stats.count > 0
stats.keys.each { |k| assert_kind_of String, k }
stats.values.each { |v| assert_kind_of Integer, Integer(v) }
end
end

puts "\nRunning on #{host_port.join(":")} " \
