Skip to content

Commit

Permalink
statsd, udp, tcp: do not log every dropped metric.
Browse files Browse the repository at this point in the history
also applying this change to the udp_listener and tcp_listener input
plugins

closes influxdata#1340
  • Loading branch information
sparrc committed Jun 10, 2016
1 parent ea2521b commit 06cb5a0
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 6 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

### Features

- [#1340](https://github.com/influxdata/telegraf/issues/1340): statsd: do not log every dropped metric.

### Bugfixes

- [#1330](https://github.com/influxdata/telegraf/issues/1330): Fix exec plugin panic when using single binary.
Expand Down
10 changes: 8 additions & 2 deletions plugins/inputs/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ const (
defaultSeparator = "_"
)

var dropwarn = "ERROR: Message queue full. Discarding line [%s] " +
var dropwarn = "ERROR: statsd message queue full. " +
"We have dropped %d messages so far. " +
"You may want to increase allowed_pending_messages in the config\n"

var prevInstance *Statsd
Expand Down Expand Up @@ -65,6 +66,8 @@ type Statsd struct {

sync.Mutex
wg sync.WaitGroup
// drops tracks the number of dropped metrics.
drops int

// Channel for all incoming statsd packets
in chan []byte
Expand Down Expand Up @@ -291,7 +294,10 @@ func (s *Statsd) udpListen() error {
select {
case s.in <- bufCopy:
default:
log.Printf(dropwarn, string(buf[:n]))
s.drops++
if s.drops == 1 || s.drops%s.AllowedPendingMessages == 0 {
log.Printf(dropwarn, s.drops)
}
}
}
}
Expand Down
10 changes: 8 additions & 2 deletions plugins/inputs/tcp_listener/tcp_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type TcpListener struct {
// is an available bool in accept, then we are below the maximum and can
// accept the connection
accept chan bool
// drops tracks the number of dropped metrics.
drops int

// track the listener here so we can close it in Stop()
listener *net.TCPListener
Expand All @@ -39,7 +41,8 @@ type TcpListener struct {
acc telegraf.Accumulator
}

var dropwarn = "ERROR: Message queue full. Discarding metric [%s], " +
var dropwarn = "ERROR: tcp_listener message queue full. " +
"We have dropped %d messages so far. " +
"You may want to increase allowed_pending_messages in the config\n"

const sampleConfig = `
Expand Down Expand Up @@ -212,7 +215,10 @@ func (t *TcpListener) handler(conn *net.TCPConn, id string) {
select {
case t.in <- bufCopy:
default:
log.Printf(dropwarn, scanner.Text())
t.drops++
if t.drops == 1 || t.drops%t.AllowedPendingMessages == 0 {
log.Printf(dropwarn, t.drops)
}
}
}
}
Expand Down
10 changes: 8 additions & 2 deletions plugins/inputs/udp_listener/udp_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type UdpListener struct {

in chan []byte
done chan struct{}
// drops tracks the number of dropped metrics.
drops int

parser parsers.Parser

Expand All @@ -38,7 +40,8 @@ type UdpListener struct {
// https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure
const UDP_MAX_PACKET_SIZE int = 64 * 1024

var dropwarn = "ERROR: Message queue full. Discarding line [%s] " +
var dropwarn = "ERROR: udp_listener message queue full. " +
"We have dropped %d messages so far. " +
"You may want to increase allowed_pending_messages in the config\n"

const sampleConfig = `
Expand Down Expand Up @@ -125,7 +128,10 @@ func (u *UdpListener) udpListen() error {
select {
case u.in <- bufCopy:
default:
log.Printf(dropwarn, string(bufCopy))
u.drops++
if u.drops == 1 || u.drops%u.AllowedPendingMessages == 0 {
log.Printf(dropwarn, u.drops)
}
}
}
}
Expand Down

0 comments on commit 06cb5a0

Please sign in to comment.