diff --git a/plugins/common/socket/datagram.go b/plugins/common/socket/datagram.go index b7d55e38a4a99..0f84b7d66f9e2 100644 --- a/plugins/common/socket/datagram.go +++ b/plugins/common/socket/datagram.go @@ -12,6 +12,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/alitto/pond" @@ -51,6 +52,7 @@ func (l *packetListener) listenData(onData CallbackData, onError CallbackError) buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet for { n, src, err := l.conn.ReadFrom(buf) + receiveTime := time.Now() if err != nil { if !strings.HasSuffix(err.Error(), ": use of closed network connection") { if onError != nil { @@ -74,7 +76,7 @@ func (l *packetListener) listenData(onData CallbackData, onError CallbackError) src = &net.UnixAddr{Name: l.path, Net: "unixgram"} } - onData(src, body) + onData(src, body, receiveTime) }) } }() diff --git a/plugins/common/socket/socket.go b/plugins/common/socket/socket.go index 0a138415e5e06..ed5bc499cd15e 100644 --- a/plugins/common/socket/socket.go +++ b/plugins/common/socket/socket.go @@ -9,13 +9,14 @@ import ( "net/url" "regexp" "strings" + "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" common_tls "github.com/influxdata/telegraf/plugins/common/tls" ) -type CallbackData func(net.Addr, []byte) +type CallbackData func(net.Addr, []byte, time.Time) type CallbackConnection func(net.Addr, io.ReadCloser) type CallbackError func(error) diff --git a/plugins/common/socket/socket_test.go b/plugins/common/socket/socket_test.go index 5a2a6d7d2d8da..e979f8a3654cd 100644 --- a/plugins/common/socket/socket_test.go +++ b/plugins/common/socket/socket_test.go @@ -153,7 +153,7 @@ func TestListenData(t *testing.T) { require.NoError(t, parser.Init()) var acc testutil.Accumulator - onData := func(remote net.Addr, data []byte) { + onData := func(remote net.Addr, data []byte, _ time.Time) { m, err := parser.Parse(data) require.NoError(t, err) addr, _, err := net.SplitHostPort(remote.String()) @@ -450,7 +450,7 @@ func TestClosingConnections(t *testing.T) { require.NoError(t, parser.Init()) var acc testutil.Accumulator - onData := func(_ net.Addr, data []byte) { + onData := func(_ net.Addr, data []byte, _ time.Time) { m, err := parser.Parse(data) require.NoError(t, err) acc.AddMetrics(m) @@ -518,7 +518,7 @@ func TestMaxConnections(t *testing.T) { // Create callback var errs []error var mu sync.Mutex - onData := func(_ net.Addr, _ []byte) {} + onData := func(_ net.Addr, _ []byte, _ time.Time) {} onError := func(err error) { mu.Lock() errs = append(errs, err) diff --git a/plugins/common/socket/stream.go b/plugins/common/socket/stream.go index 251666899fa65..ae41e1db72f62 100644 --- a/plugins/common/socket/stream.go +++ b/plugins/common/socket/stream.go @@ -352,6 +352,7 @@ func (l *streamListener) read(conn net.Conn, onData CallbackData) error { break } + receiveTime := time.Now() src := conn.RemoteAddr() if l.path != "" { src = &net.UnixAddr{Name: l.path, Net: "unix"} @@ -361,7 +362,7 @@ func (l *streamListener) read(conn net.Conn, onData CallbackData) error { d := make([]byte, len(data)) copy(d, data) l.parsePool.Submit(func() { - onData(src, d) + onData(src, d, receiveTime) }) } @@ -407,8 +408,9 @@ func (l *streamListener) readAll(conn net.Conn, onData CallbackData) error { return fmt.Errorf("read on %s failed: %w", src, err) } + receiveTime := time.Now() l.parsePool.Submit(func() { - onData(src, buf) + onData(src, buf, receiveTime) }) return nil diff --git a/plugins/inputs/socket_listener/socket_listener.go b/plugins/inputs/socket_listener/socket_listener.go index b4783afc1af86..5f32af8062627 100644 --- a/plugins/inputs/socket_listener/socket_listener.go +++ b/plugins/inputs/socket_listener/socket_listener.go @@ -6,6 +6,7 @@ import ( _ "embed" "net" "sync" + "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" @@ -20,6 +21,7 @@ var once sync.Once type SocketListener struct { ServiceAddress string `toml:"service_address"` + TimeSource string `toml:"time_source"` Log telegraf.Logger `toml:"-"` socket.Config socket.SplitConfig @@ -52,18 +54,27 @@ func (sl *SocketListener) SetParser(parser telegraf.Parser) { func (sl *SocketListener) Start(acc telegraf.Accumulator) error { // Create the callbacks for parsing the data and recording issues - onData := func(_ net.Addr, data []byte) { + onData := func(_ net.Addr, data []byte, receiveTime time.Time) { metrics, err := sl.parser.Parse(data) + if err != nil { acc.AddError(err) return } + if len(metrics) == 0 { once.Do(func() { sl.Log.Debug(internal.NoMetricsCreatedMsg) }) } + for _, m := range metrics { + switch sl.TimeSource { + case "", "metric": + case "receive_time": + m.SetTime(receiveTime) + } + acc.AddMetric(m) } } diff --git a/plugins/inputs/syslog/syslog.go b/plugins/inputs/syslog/syslog.go index 6f5517abc8fe0..4fe9f4e8ef3d9 100644 --- a/plugins/inputs/syslog/syslog.go +++ b/plugins/inputs/syslog/syslog.go @@ -10,6 +10,7 @@ import ( "net/url" "strings" "sync" + "time" "unicode" "github.com/leodido/go-syslog/v4" @@ -214,7 +215,7 @@ func (s *Syslog) createDatagramDataHandler(acc telegraf.Accumulator) socket.Call } // Return the OnData function - return func(src net.Addr, data []byte) { + return func(src net.Addr, data []byte, _ time.Time) { message, err := parser.Parse(data) if err != nil { acc.AddError(err)