Skip to content

Commit

Permalink
Fix dropwizard parsing error for metrics that need escaped (influxdat…
Browse files Browse the repository at this point in the history
…a#4142)

If the dropwizard parser cannot convert the metric name into a valid
line protocol series then we will accept the name as is.
  • Loading branch information
danielnelson authored May 14, 2018
1 parent 558caf5 commit 0af40a8
Show file tree
Hide file tree
Showing 10 changed files with 6,010 additions and 5,258 deletions.
2 changes: 2 additions & 0 deletions metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type Metric interface {
AddField(key string, value interface{})
RemoveField(key string)

SetTime(t time.Time)

// HashID returns an unique identifier for the series.
HashID() uint64

Expand Down
4 changes: 4 additions & 0 deletions metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ func (m *metric) RemoveField(key string) {
}
}

func (m *metric) SetTime(t time.Time) {
m.tm = t
}

func (m *metric) Copy() telegraf.Metric {
m2 := &metric{
name: m.name,
Expand Down
143 changes: 75 additions & 68 deletions plugins/parsers/dropwizard/parser.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package dropwizard

import (
"bytes"
"encoding/json"
"fmt"
"log"
Expand All @@ -10,6 +9,7 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/templating"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/tidwall/gjson"
)
Expand All @@ -19,8 +19,8 @@ var keyEscaper = strings.NewReplacer(" ", "\\ ", ",", "\\,", "=", "\\=")

// Parser parses json inputs containing dropwizard metrics,
// either top-level or embedded inside a json field.
// This parser is using gjon for retrieving paths within the json file.
type Parser struct {
// This parser is using gjson for retrieving paths within the json file.
type parser struct {

// an optional json path containing the metric registry object
// if left empty, the whole json object is parsed as a metric registry
Expand All @@ -45,15 +45,28 @@ type Parser struct {
// an optional map of default tags to use for metrics
DefaultTags map[string]string

// templating configuration
Separator string
Templates []string

separator string
templateEngine *templating.Engine

timeFunc metric.TimeFunc

// seriesParser parses line protocol measurement + tags
seriesParser *influx.Parser
}

func NewParser() *parser {
handler := influx.NewMetricHandler()
seriesParser := influx.NewSeriesParser(handler)

parser := &parser{
timeFunc: time.Now,
seriesParser: seriesParser,
}
return parser
}

// Parse parses the input bytes to an array of metrics
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
func (p *parser) Parse(buf []byte) ([]telegraf.Metric, error) {

metrics := make([]telegraf.Metric, 0)

Expand Down Expand Up @@ -100,28 +113,38 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
return metrics, nil
}

// InitTemplating initializes the templating support
func (p *Parser) InitTemplating() error {
if len(p.Templates) > 0 {
defaultTemplate, _ := templating.NewDefaultTemplateWithPattern("measurement*")
templateEngine, err := templating.NewEngine(p.Separator, defaultTemplate, p.Templates)
p.templateEngine = templateEngine
func (p *parser) SetTemplates(separator string, templates []string) error {
if len(templates) == 0 {
p.templateEngine = nil
return nil
}

defaultTemplate, err := templating.NewDefaultTemplateWithPattern("measurement*")
if err != nil {
return err
}

templateEngine, err := templating.NewEngine(separator, defaultTemplate, templates)
if err != nil {
return err
}

p.separator = separator
p.templateEngine = templateEngine
return nil
}

// ParseLine is not supported by the dropwizard format
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
func (p *parser) ParseLine(line string) (telegraf.Metric, error) {
return nil, fmt.Errorf("ParseLine not supported: %s, for data format: dropwizard", line)
}

// SetDefaultTags sets the default tags
func (p *Parser) SetDefaultTags(tags map[string]string) {
func (p *parser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}

func (p *Parser) readTags(buf []byte) map[string]string {
func (p *parser) readTags(buf []byte) map[string]string {

if p.TagsPath != "" {
var tagsBytes []byte
Expand All @@ -147,7 +170,7 @@ func (p *Parser) readTags(buf []byte) map[string]string {
return tags
}

func (p *Parser) parseTime(buf []byte) (time.Time, error) {
func (p *parser) parseTime(buf []byte) (time.Time, error) {

if p.TimePath != "" {
timeFormat := p.TimeFormat
Expand All @@ -157,19 +180,19 @@ func (p *Parser) parseTime(buf []byte) (time.Time, error) {
timeString := gjson.GetBytes(buf, p.TimePath).String()
if timeString == "" {
err := fmt.Errorf("time not found in JSON path %s", p.TimePath)
return time.Now().UTC(), err
return p.timeFunc(), err
}
t, err := time.Parse(timeFormat, timeString)
if err != nil {
err = fmt.Errorf("time %s cannot be parsed with format %s, %s", timeString, timeFormat, err)
return time.Now().UTC(), err
return p.timeFunc(), err
}
return t.UTC(), nil
}
return time.Now().UTC(), nil
return p.timeFunc(), nil
}

func (p *Parser) unmarshalMetrics(buf []byte) (map[string]interface{}, error) {
func (p *parser) unmarshalMetrics(buf []byte) (map[string]interface{}, error) {

var registryBytes []byte
if p.MetricRegistryPath != "" {
Expand All @@ -195,71 +218,55 @@ func (p *Parser) unmarshalMetrics(buf []byte) (map[string]interface{}, error) {
return jsonOut, nil
}

func (p *Parser) readDWMetrics(metricType string, dwms interface{}, metrics []telegraf.Metric, tm time.Time) []telegraf.Metric {

switch dwmsTyped := dwms.(type) {
case map[string]interface{}:
var metricsBuffer bytes.Buffer
func (p *parser) readDWMetrics(metricType string, dwms interface{}, metrics []telegraf.Metric, tm time.Time) []telegraf.Metric {
if dwmsTyped, ok := dwms.(map[string]interface{}); ok {
for dwmName, dwmFields := range dwmsTyped {
measurementName := dwmName
tags := make(map[string]string)
fieldPrefix := ""
if p.templateEngine != nil {
measurementName, tags, fieldPrefix, _ = p.templateEngine.Apply(dwmName)
if len(fieldPrefix) > 0 {
fieldPrefix = fmt.Sprintf("%s%s", fieldPrefix, p.Separator)
fieldPrefix = fmt.Sprintf("%s%s", fieldPrefix, p.separator)
}
}

parsed, err := p.seriesParser.Parse([]byte(measurementName))
var m telegraf.Metric
if err != nil || len(parsed) != 1 {
m, err = metric.New(measurementName, map[string]string{}, map[string]interface{}{}, tm)
if err != nil {
log.Printf("W! failed to create metric of type '%s': %s\n", metricType, err)
continue
}
} else {
m = parsed[0]
m.SetTime(tm)
}
tags["metric_type"] = metricType

measurementWithTags := measurementName
for tagName, tagValue := range tags {
tagKeyValue := fmt.Sprintf("%s=%s", keyEscaper.Replace(tagName), keyEscaper.Replace(tagValue))
measurementWithTags = fmt.Sprintf("%s,%s", measurementWithTags, tagKeyValue)
m.AddTag("metric_type", metricType)
for k, v := range tags {
m.AddTag(k, v)
}

fields := make([]string, 0)
switch t := dwmFields.(type) {
case map[string]interface{}: // json object
for fieldName, fieldValue := range t {
key := keyEscaper.Replace(fieldPrefix + fieldName)
switch v := fieldValue.(type) {
case float64:
fields = append(fields, fmt.Sprintf("%s=%f", key, v))
case string:
fields = append(fields, fmt.Sprintf("%s=\"%s\"", key, fieldEscaper.Replace(v)))
case bool:
fields = append(fields, fmt.Sprintf("%s=%t", key, v))
default: // ignore
if fields, ok := dwmFields.(map[string]interface{}); ok {
for k, v := range fields {
switch v := v.(type) {
case float64, string, bool:
m.AddField(fieldPrefix+k, v)
default:
// ignore
}
}
default: // ignore
}

metricsBuffer.WriteString(fmt.Sprintf("%s,metric_type=%s ", measurementWithTags, metricType))
metricsBuffer.WriteString(strings.Join(fields, ","))
metricsBuffer.WriteString("\n")
}

handler := influx.NewMetricHandler()
handler.SetTimeFunc(func() time.Time { return tm })
parser := influx.NewParser(handler)
newMetrics, err := parser.Parse(metricsBuffer.Bytes())
if err != nil {
log.Printf("W! failed to create metric of type '%s': %s\n", metricType, err)
metrics = append(metrics, m)
}

return append(metrics, newMetrics...)
default:
return metrics
}

return metrics
}

func arraymap(vs []string, f func(string) string) []string {
vsm := make([]string, len(vs))
for i, v := range vs {
vsm[i] = f(v)
}
return vsm
func (p *parser) SetTimeFunc(f metric.TimeFunc) {
p.timeFunc = f
}
Loading

0 comments on commit 0af40a8

Please sign in to comment.