Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Merge pull request #105 from candysmurf/multi-fields
Browse files Browse the repository at this point in the history
SDI-2375:Support mapping (many) snap.Metrics to a (single) data point…
  • Loading branch information
candysmurf authored Jan 9, 2017
2 parents 37ee89b + afcf8bb commit 0479736
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 15 deletions.
29 changes: 27 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,36 @@ You can also set the following options if needed:
- `https` defaults to `false` (boolean). Set to true to connect to InfluxDB via HTTPS.
- `skip-verify` defaults to `false` (boolean). Set to true to complain if the certificate used is not issued by a trusted CA.
- `precision` defaults to `s` (string). The value can be changed to any of the following: n,u,ms,s,m,h. This will determine the precision of timestamps.

- `isMultiFields` defaults to `false` (boolean). When it's true, plugin groups common namespaces, those that differ at the leaf and have same tags including values, into one data point with multiple influx fields.

### Examples

See [examples/tasks](https://github.com/intelsdi-x/snap-plugin-publisher-influxdb/tree/master/examples/tasks) folder for examples
See [examples/tasks](https://github.com/intelsdi-x/snap-plugin-publisher-influxdb/tree/master/examples/tasks) folder for examples.

Here are samples to illustrate the differences for `isMultiFields` flag. When *isMultiFields* is `false` which is the default setting,
you have to query each measurement. While *isMultiFields* is `true`, plugin groups the common namespaces, those that differ at the leaf and have same tags including values, into one data point with multiple influx fields; you query the common namespace.

**Sample** *`isMultiField=false`*
```
select * from "/intel/psutil/load/load1"
```

| time | source | unit | value |
|---------------------|---------------|---------|-------|
| 1483997727411599704 | egu-mac01.lan | Load/1M | 1.76 |
| 1483997728412178616 | egu-mac01.lan | Load/1M | 1.76 |


**Sample** *`isMultiField=true`*
```
select * from "/intel/psutil/load"
```

| time | load1 | load15 | load5 | source | unit |
|---------------------|-------|--------|-------|---------------|---------|
| 1483996289995839909 | 2.05 | | | egu-mac01.lan | Load/1M |
| 1483996289995839909 | | 6.21 | | egu-mac01.lan | Load/1M |
| 1483996289995839909 | | | 5.26 | egu-mac01.lan | Load/1M |

### Roadmap

Expand Down
94 changes: 81 additions & 13 deletions influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (

const (
name = "influxdb"
version = 17
version = 18
pluginType = plugin.PublisherPluginType
maxInt64 = ^uint64(0) / 2
)
Expand Down Expand Up @@ -72,6 +72,13 @@ func NewInfluxPublisher() *influxPublisher {
type influxPublisher struct {
}

type point struct {
ns core.Namespace
tags map[string]string
ts time.Time
fields map[string]interface{}
}

func (f *influxPublisher) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) {
cp := cpolicy.New()
config := cpolicy.NewPolicyNode()
Expand Down Expand Up @@ -116,11 +123,16 @@ func (f *influxPublisher) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) {
r8.Description = "Influxdb HTTPS Skip certificate verification"
config.Add(r8)

r9, err := cpolicy.NewStringRule("precision", false, "s")
r9, err := cpolicy.NewStringRule("precision", false, "ns")
handleErr(err)
r9.Description = "Influxdb timestamp precision"
config.Add(r9)

r10, err := cpolicy.NewBoolRule("isMultiFields", false, false)
handleErr(err)
r10.Description = "groupping common namespaces, those that differ at the leaf, into one data point with multiple influx fields"
config.Add(r10)

cp.Add([]string{""}, config)
return cp, nil
}
Expand Down Expand Up @@ -175,6 +187,8 @@ func (f *influxPublisher) Publish(contentType string, content []byte, config map
Precision: config["precision"].(ctypes.ConfigValueStr).Value,
})

isMultiFields := config["isMultiFields"].(ctypes.ConfigValueBool).Value
mpoints := map[string]point{}
for _, m := range metrics {
tags := map[string]string{}
ns := m.Namespace().Strings()
Expand Down Expand Up @@ -228,18 +242,38 @@ func (f *influxPublisher) Publish(contentType string, content []byte, config map
log.Errorf("Overflow during conversion uint64 to int64, value after conversion to int64: %d, desired uint64 value: %d ", data, v)
}
}
pt, err := client.NewPoint(strings.Join(ns, "/"), tags, map[string]interface{}{
"value": data,
}, m.Timestamp())
if err != nil {
logger.WithFields(log.Fields{
"err": err,
"batch-points": bps.Points(),
"point": pt,
}).Error("Publishing failed. Problem creating data point")
return err

if !isMultiFields {
pt, err := client.NewPoint(strings.Join(ns, "/"), tags, map[string]interface{}{
"value": data,
}, m.Timestamp())
if err != nil {
logger.WithFields(log.Fields{
"err": err,
"batch-points": bps.Points(),
"point": pt,
}).Error("Publishing failed. Problem creating data point")
return err
}
bps.AddPoint(pt)
} else {
groupCommonNamespaces(m, tags, mpoints)
}
}

if isMultiFields {
for _, p := range mpoints {
pt, err := client.NewPoint(p.ns.String(), p.tags, p.fields, p.ts)
if err != nil {
logger.WithFields(log.Fields{
"err": err,
"batch-points": bps.Points(),
"point": pt,
}).Error("Publishing failed. Problem creating data point")
return err
}
bps.AddPoint(pt)
}
bps.AddPoint(pt)
}

err = con.write(bps)
Expand Down Expand Up @@ -399,3 +433,37 @@ func selectClientConnection(config map[string]ctypes.ConfigValue) (*clientConnec
func connectionKey(u *url.URL, user, db string) string {
return fmt.Sprintf("%s:%s:%s", u.String(), user, db)
}

// groupCommonNamespaces groups common namespaces, those that differ at the leaf, into one data point with multiple influx fields.
func groupCommonNamespaces(m plugin.MetricType, tags map[string]string, mpoints map[string]point) {
elems := m.Namespace()
// Slices to the second to last
s2l := elems[:len(elems)-1]
if len(s2l) == 0 {
s2l = elems
}

// Appends tag keys
mkeys := []string{}
for k, v := range tags {
mkeys = append(mkeys, k, v)
}
// Appends namespace prefix
mkeys = append(mkeys, s2l.Strings()...)

// Converts the map keys to a string key
sk := strings.Join(mkeys, core.Separator)

// Groups fields by the namespace common prefix and tags
fieldName := elems[len(elems)-1].Value
if p, ok := mpoints[sk]; !ok {
mpoints[sk] = point{
ns: s2l,
tags: tags,
ts: m.Timestamp(),
fields: map[string]interface{}{fieldName: m.Data()},
}
} else {
p.fields[fieldName] = m.Data()
}
}
36 changes: 36 additions & 0 deletions influxdb/influxdb_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func TestInfluxPublish(t *testing.T) {
})

Convey("Publish float metric", func() {
tags["test"] = "test"
metrics := []plugin.MetricType{
*plugin.NewMetricType(core.NewNamespace("bar"), time.Now(), tags, "some unit", 3.141),
}
Expand Down Expand Up @@ -173,5 +174,40 @@ func TestInfluxPublish(t *testing.T) {
err := ip.Publish(plugin.SnapGOBContentType, buf.Bytes(), *cfg)
So(err, ShouldBeNil)
})

Convey("Publish multiple fields to one metric", func() {
config["isMultiFields"] = ctypes.ConfigValueBool{Value: true}
metrics := []plugin.MetricType{
*plugin.NewMetricType(core.NewNamespace("a", "b", "x"), time.Now(), tags, "test unit", 123.6),
*plugin.NewMetricType(core.NewNamespace("a", "b", "y"), time.Now(), tags, "test unit", 765.3),
*plugin.NewMetricType(core.NewNamespace("a", "b", "z"), time.Now(), tags, "test unit", 12345),
*plugin.NewMetricType(core.NewNamespace("a", "b", "z"), time.Now(), tags, "testunit", 11111),
}

buf.Reset()
enc := gob.NewEncoder(&buf)
enc.Encode(metrics)
err := ip.Publish(plugin.SnapGOBContentType, buf.Bytes(), *cfg)
So(err, ShouldBeNil)
})

Convey("Publish multiple fields to two metrics", func() {
config["isMultiFields"] = ctypes.ConfigValueBool{Value: true}
ntags := map[string]string{"zone": "red", "light": "yellow"}
metrics := []plugin.MetricType{
*plugin.NewMetricType(core.NewNamespace("influx", "x"), time.Now(), tags, "test unit", 333.6),
*plugin.NewMetricType(core.NewNamespace("influx", "y"), time.Now(), tags, "test unit", 222.3),
*plugin.NewMetricType(core.NewNamespace("influx", "z"), time.Now(), tags, "test unit", 1111),
*plugin.NewMetricType(core.NewNamespace("influx", "r"), time.Now(), ntags, "unittest ", 777),
*plugin.NewMetricType(core.NewNamespace("influx", "s"), time.Now(), ntags, "unittest", 888),
*plugin.NewMetricType(core.NewNamespace("influx", "s"), time.Now(), ntags, "unit test", 999),
}

buf.Reset()
enc := gob.NewEncoder(&buf)
enc.Encode(metrics)
err := ip.Publish(plugin.SnapGOBContentType, buf.Bytes(), *cfg)
So(err, ShouldBeNil)
})
})
}

0 comments on commit 0479736

Please sign in to comment.