From 18bceb66c5bbe247db17e48143c4bc80fdf61551 Mon Sep 17 00:00:00 2001 From: Klaudiusz Dembler Date: Thu, 23 Mar 2017 18:46:12 +0100 Subject: [PATCH] Fixes #120 Migrated to new plugin lib go --- .sync.yml | 6 +- .travis.yml | 7 +- glide.lock | 62 +++--- glide.yaml | 8 +- influxdb/influxdb.go | 323 ++++++++++++++----------------- influxdb/influxdb_medium_test.go | 296 ++++++++++++++++++---------- influxdb/influxdb_small_test.go | 47 +---- main.go | 7 +- scripts/medium.sh | 4 +- 9 files changed, 397 insertions(+), 363 deletions(-) diff --git a/.sync.yml b/.sync.yml index d4ba3a7..8adcfde 100644 --- a/.sync.yml +++ b/.sync.yml @@ -10,6 +10,8 @@ sudo: true services: - docker + go: + - 1.7.x env: global: - ORG_PATH=/home/travis/gopath/src/github.com/intelsdi-x @@ -21,9 +23,9 @@ TEST_TYPE: large matrix: exclude: - - go: 1.6.x + - go: 1.7.x env: SNAP_VERSION=latest TEST_TYPE=large - - go: 1.6.x + - go: 1.7.x env: SNAP_VERSION=latest_build TEST_TYPE=large deploy: access_key_id: AKIAINMB43VSSPFZISAA diff --git a/.travis.yml b/.travis.yml index 0827fde..689b8ab 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,7 +2,6 @@ sudo: true language: go go: -- 1.6.x - 1.7.x services: - docker @@ -18,11 +17,11 @@ env: - TEST_TYPE=build matrix: exclude: - - go: 1.6.x + - go: 1.7.x env: SNAP_VERSION=latest TEST_TYPE=large - - go: 1.6.x + - go: 1.7.x env: SNAP_VERSION=latest_build TEST_TYPE=large - - go: 1.6.x + - go: 1.7.x env: TEST_TYPE=build before_install: - "[[ -d $SNAP_PLUGIN_SOURCE ]] || mkdir -p $ORG_PATH && ln -s $TRAVIS_BUILD_DIR $SNAP_PLUGIN_SOURCE" diff --git a/glide.lock b/glide.lock index ae9fe7b..3de3206 100644 --- a/glide.lock +++ b/glide.lock @@ -1,37 +1,49 @@ -hash: b917cca6287ff121564c89e854b46366d68f15f8b0b795eeed7984b4d5f23eb2 -updated: 2016-12-14T11:33:29.559410375+01:00 +hash: 8bd0c4c7aad81d92fba50ee6eafd7690de026fc2b9acaee22d25df03ded2c13a +updated: 2017-03-27T10:41:45.899652789+02:00 imports: +- name: github.com/golang/protobuf + version: 888eb0692c857ec880338addf316bd662d5e630e + subpackages: + - proto - name: github.com/influxdata/influxdb - version: 855c567c67d7fff1e2f91617c292ea20cf26e53c + version: b869607dc206cc58e63718ca72ea08293b987a0e subpackages: - client/v2 - models - pkg/escape -- name: github.com/intelsdi-x/snap - version: 8905198aaab5bfb6d50a747477ca401e3a7a33ab - subpackages: - - control/plugin - - control/plugin/cpolicy - - control/plugin/encoding - - control/plugin/encrypter - - core - - core/cdata - - core/ctypes - - core/serror - - pkg/ctree - - pkg/schedule - - pkg/stringutils - - scheduler/wmap -- name: github.com/robfig/cron - version: 32d9c273155a0506d27cf73dd1246e86a470997e +- name: github.com/intelsdi-x/snap-plugin-lib-go + version: 8ae21e8f72cb1740171073d496a6b7f2cec5ca73 + subpackages: + - v1/plugin + - v1/plugin/rpc +- name: github.com/julienschmidt/httprouter + version: 6f3f3919c8781ce5c0509c83fffc887a7830c938 - name: github.com/Sirupsen/logrus - version: be52937128b38f1d99787bb476c789e2af1147f1 + version: 10f801ebc38b33738c9d17d50860f484a0988ff5 +- name: golang.org/x/net + version: 154d9f9ea81208afed560f4cf27b4860c8ed1904 + subpackages: + - context + - http2 + - http2/hpack + - internal/timeseries + - lex/httplex + - trace - name: golang.org/x/sys - version: 8f0908ab3b2457e2e15403d3697c9ef5cb4b57a9 + version: afadfcc7779c1f4db0f6f6438afcb108d9c9c7cd subpackages: - unix -- name: gopkg.in/yaml.v2 - version: c1cd2254a6dd314c9d73c338c12688c9325d85c6 +- name: google.golang.org/grpc + version: 0032a855ba5c8a3c8e0d71c2deef354b70af1584 + subpackages: + - codes + - credentials + - grpclog + - internal + - metadata + - naming + - peer + - transport testImports: - name: github.com/gopherjs/gopherjs version: 4b53e1bddba0e2f734514aeb6c02db652f4c6fe8 @@ -45,7 +57,7 @@ testImports: - internal/go-render/render - internal/oglematchers - name: github.com/smartystreets/goconvey - version: d4c757aa9afd1e2fc1832aaab209b5794eb336e1 + version: 995f5b2e021c69b8b028ba6d0b05c1dd500783db subpackages: - convey - convey/gotest diff --git a/glide.yaml b/glide.yaml index 1d5842c..4d59948 100644 --- a/glide.yaml +++ b/glide.yaml @@ -4,13 +4,9 @@ import: - package: github.com/influxdata/influxdb subpackages: - client/v2 -- package: github.com/intelsdi-x/snap - version: ^1.0 +- package: github.com/intelsdi-x/snap-plugin-lib-go subpackages: - - control/plugin - - control/plugin/cpolicy - - core - - core/ctypes + - v1/plugin testImport: - package: github.com/smartystreets/goconvey subpackages: diff --git a/influxdb/influxdb.go b/influxdb/influxdb.go index a30aabf..4cc0a68 100644 --- a/influxdb/influxdb.go +++ b/influxdb/influxdb.go @@ -20,8 +20,6 @@ limitations under the License. package influxdb import ( - "bytes" - "encoding/gob" "fmt" "io/ioutil" "net/http" @@ -33,17 +31,15 @@ import ( log "github.com/Sirupsen/logrus" "github.com/influxdata/influxdb/client/v2" - "github.com/intelsdi-x/snap/control/plugin" - "github.com/intelsdi-x/snap/control/plugin/cpolicy" - "github.com/intelsdi-x/snap/core" - "github.com/intelsdi-x/snap/core/ctypes" + "github.com/intelsdi-x/snap-plugin-lib-go/v1/plugin" ) const ( - name = "influxdb" - version = 21 - pluginType = plugin.PublisherPluginType + Name = "influxdb" + Version = 22 + PluginType = "publisher" maxInt64 = ^uint64(0) / 2 + separator = "\U0001f422" // HTTP represents its string constant HTTP = "http" @@ -55,7 +51,7 @@ var ( // The maximum time a connection can sit around unused. maxConnectionIdle = time.Minute * 30 // How frequently idle connections are checked - watchConnctionWait = time.Minute * 15 + watchConnectionWait = time.Minute * 15 // Our connection pool connPool = make(map[string]*clientConnection) // Mutex for synchronizing connection pool changes @@ -67,93 +63,109 @@ func init() { go watchConnections() } -// Meta returns a plugin meta data -func Meta() *plugin.PluginMeta { - return plugin.NewPluginMeta(name, version, pluginType, []string{plugin.SnapGOBContentType}, []string{plugin.SnapGOBContentType}) +//NewInfluxPublisher returns an instance of the InfluxDB publisher +func NewInfluxPublisher() *InfluxPublisher { + return &InfluxPublisher{} } -//NewInfluxPublisher returns an instance of the InfuxDB publisher -func NewInfluxPublisher() *influxPublisher { - return &influxPublisher{} -} - -type influxPublisher struct { +type InfluxPublisher struct { } type point struct { - ns core.Namespace + ns plugin.Namespace tags map[string]string ts time.Time fields map[string]interface{} } -func (f *influxPublisher) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) { - cp := cpolicy.New() - config := cpolicy.NewPolicyNode() - - r1, err := cpolicy.NewStringRule("host", true) - handleErr(err) - r1.Description = "Influxdb host" - config.Add(r1) - - r2, err := cpolicy.NewIntegerRule("port", false, 8086) - handleErr(err) - r2.Description = "Influxdb port" - config.Add(r2) - - r3, err := cpolicy.NewStringRule("database", true) - handleErr(err) - r3.Description = "Influxdb db name" - config.Add(r3) - - r4, err := cpolicy.NewStringRule("user", true) - handleErr(err) - r4.Description = "Influxdb user" - config.Add(r4) - - r5, err := cpolicy.NewStringRule("password", true) - handleErr(err) - r5.Description = "Influxdb password" - config.Add(r5) - - r6, err := cpolicy.NewStringRule("retention", false, "autogen") - handleErr(err) - r6.Description = "Influxdb retention policy" - config.Add(r6) - - r8, err := cpolicy.NewBoolRule("skip-verify", false, false) - handleErr(err) - r8.Description = "Influxdb HTTPS Skip certificate verification" - config.Add(r8) - - r9, err := cpolicy.NewStringRule("precision", false, "ns") - handleErr(err) - r9.Description = "Influxdb timestamp precision" - config.Add(r9) - - r10, err := cpolicy.NewBoolRule("isMultiFields", false, false) - handleErr(err) - r10.Description = "groupping common namespaces, those that differ at the leaf and have same tags including values, into one data point with multiple influx fields" - config.Add(r10) - - r11, err := cpolicy.NewStringRule("scheme", false, HTTP) - handleErr(err) - r11.Description = "Influxdb communication protocol" - config.Add(r11) - - cp.Add([]string{""}, config) - return cp, nil +type configuration struct { + host, database, user, password, retention, precision, scheme, logLevel string + port int64 + skipVerify, isMultiFields bool +} + +func getConfig(config plugin.Config) (configuration, error) { + cfg := configuration{} + var err error + + cfg.host, err = config.GetString("host") + if err != nil { + return cfg, fmt.Errorf("%s: %s", err, "host") + } + + cfg.database, err = config.GetString("database") + if err != nil { + return cfg, fmt.Errorf("%s: %s", err, "database") + } + + cfg.user, err = config.GetString("user") + if err != nil { + return cfg, fmt.Errorf("%s: %s", err, "user") + } + + cfg.password, err = config.GetString("password") + if err != nil { + return cfg, fmt.Errorf("%s: %s", err, "password") + } + + cfg.retention, err = config.GetString("retention") + if err != nil { + return cfg, fmt.Errorf("%s: %s", err, "retention") + } + + cfg.scheme, err = config.GetString("scheme") + if err != nil { + return cfg, fmt.Errorf("%s: %s", err, "scheme") + } + + cfg.logLevel, err = config.GetString("log-level") + if err != nil { + cfg.logLevel = "undefined" + } + + cfg.port, err = config.GetInt("port") + if err != nil { + return cfg, fmt.Errorf("%s: %s", err, "port") + } + + cfg.skipVerify, err = config.GetBool("skip-verify") + if err != nil { + return cfg, fmt.Errorf("%s: %s", err, "skip-verify") + } + + cfg.isMultiFields, err = config.GetBool("isMultiFields") + if err != nil { + return cfg, fmt.Errorf("%s: %s", err, "isMultiFields") + } + + return cfg, nil +} + +func (ip *InfluxPublisher) GetConfigPolicy() (plugin.ConfigPolicy, error) { + policy := plugin.NewConfigPolicy() + + policy.AddNewStringRule([]string{""}, "host", true) + policy.AddNewIntRule([]string{""}, "port", false, plugin.SetDefaultInt(8086)) + policy.AddNewStringRule([]string{""}, "database", true) + policy.AddNewStringRule([]string{""}, "user", true) + policy.AddNewStringRule([]string{""}, "password", true) + policy.AddNewStringRule([]string{""}, "retention", false, plugin.SetDefaultString("autogen")) + policy.AddNewBoolRule([]string{""}, "skip-verify", false, plugin.SetDefaultBool(false)) + policy.AddNewStringRule([]string{""}, "precision", false, plugin.SetDefaultString("ns")) + policy.AddNewBoolRule([]string{""}, "isMultiFields", false, plugin.SetDefaultBool(false)) + policy.AddNewStringRule([]string{""}, "scheme", false, plugin.SetDefaultString(HTTP)) + + return *policy, nil } func watchConnections() { for { - time.Sleep(watchConnctionWait) + time.Sleep(watchConnectionWait) for k, c := range connPool { - if time.Now().Sub(c.LastUsed) > maxConnectionIdle { m.Lock() // Close the connection - c.close() + c.closeClientConnection() // Remove from the pool delete(connPool, k) m.Unlock() @@ -164,24 +176,14 @@ func watchConnections() { // Publish publishes metric data to influxdb // currently only 0.9 version of influxdb are supported -func (f *influxPublisher) Publish(contentType string, content []byte, config map[string]ctypes.ConfigValue) error { - logger := getLogger(config) - var metrics []plugin.MetricType - - switch contentType { - case plugin.SnapGOBContentType: - dec := gob.NewDecoder(bytes.NewBuffer(content)) - if err := dec.Decode(&metrics); err != nil { - logger.WithFields(log.Fields{ - "err": err, - }).Error("decoding error") - return err - } - default: - logger.Errorf("unknown content type '%v'", contentType) - return fmt.Errorf("Unknown content type '%s'", contentType) +func (ip *InfluxPublisher) Publish(metrics []plugin.Metric, pluginConfig plugin.Config) error { + config, err := getConfig(pluginConfig) + if err != nil { + return err } + logger := getLogger(config) + con, err := selectClientConnection(config) if err != nil { logger.Error(err) @@ -190,18 +192,18 @@ func (f *influxPublisher) Publish(contentType string, content []byte, config map //Set up batch points bps, _ := client.NewBatchPoints(client.BatchPointsConfig{ - Database: config["database"].(ctypes.ConfigValueStr).Value, - RetentionPolicy: config["retention"].(ctypes.ConfigValueStr).Value, - Precision: config["precision"].(ctypes.ConfigValueStr).Value, + Database: config.database, + RetentionPolicy: config.retention, + Precision: config.precision, }) - isMultiFields := config["isMultiFields"].(ctypes.ConfigValueBool).Value + isMultiFields := config.isMultiFields mpoints := map[string]point{} for _, m := range metrics { tags := map[string]string{} - ns := m.Namespace().Strings() + ns := m.Namespace.Strings() - isDynamic, indexes := m.Namespace().IsDynamic() + isDynamic, indexes := m.Namespace.IsDynamic() if isDynamic { for i, j := range indexes { // The second return value from IsDynamic(), in this case `indexes`, is the index of @@ -212,38 +214,38 @@ func (f *influxPublisher) Publish(contentType string, content []byte, config map // // Remove "data" from the namespace and create a tag for it ns = append(ns[:j-i], ns[j-i+1:]...) - tags[m.Namespace()[j].Name] = m.Namespace()[j].Value + tags[m.Namespace[j].Name] = m.Namespace[j].Value } } // Add "unit"" if we do not already have a "unit" tag - if _, ok := m.Tags()["unit"]; !ok { - tags["unit"] = m.Unit() + if _, ok := m.Tags["unit"]; !ok { + tags["unit"] = m.Unit } // Process the tags for this metric - for k, v := range m.Tags() { + for k, v := range m.Tags { // Convert the standard tag describing where the plugin is running to "source" - if k == core.STD_TAG_PLUGIN_RUNNING_ON { + if k == "plugin_running_on" { // Unless the "source" tag is already being used - if _, ok := m.Tags()["source"]; !ok { + if _, ok := m.Tags["source"]; !ok { k = "source" } } tags[k] = v } - data := m.Data() + data := m.Data //publishing of nil value causes errors if data == nil { - log.Errorf("Received nil value of metric, this metric is not published, namespace: %s, timestamp: %s", m.Namespace().String(), m.Timestamp().String()) + log.Errorf("Received nil value of metric, this metric will not be published, namespace: %s, timestamp: %s", strings.Join(m.Namespace.Strings(), "/"), m.Timestamp.String()) continue } // NOTE: uint64 is specifically not supported by influxdb client due to potential overflow //without convertion of uint64 to int64, data with uint64 type will be saved as strings in influx database - v, ok := m.Data().(uint64) + v, ok := m.Data.(uint64) if ok { data = int64(v) if v > maxInt64 { @@ -254,7 +256,7 @@ func (f *influxPublisher) Publish(contentType string, content []byte, config map if !isMultiFields { pt, err := client.NewPoint(strings.Join(ns, "/"), tags, map[string]interface{}{ "value": data, - }, m.Timestamp()) + }, m.Timestamp) if err != nil { logger.WithFields(log.Fields{ "err": err, @@ -292,7 +294,7 @@ func (f *influxPublisher) Publish(contentType string, content []byte, config map }).Error("publishing failed") // Remove connction from pool since something is wrong m.Lock() - con.close() + con.closeClientConnection() delete(connPool, con.Key) m.Unlock() return err @@ -304,65 +306,27 @@ func (f *influxPublisher) Publish(contentType string, content []byte, config map return nil } -func handleErr(e error) { - if e != nil { - panic(e) - } -} - -func getLogger(config map[string]ctypes.ConfigValue) *log.Entry { +func getLogger(config configuration) *log.Entry { logger := log.WithFields(log.Fields{ - "plugin-name": name, - "plugin-version": version, - "plugin-type": pluginType.String(), + "plugin-name": Name, + "plugin-version": Version, + "plugin-type": PluginType, }) // default log.SetLevel(log.WarnLevel) - if debug, ok := config["debug"]; ok { - switch v := debug.(type) { - case ctypes.ConfigValueBool: - if v.Value { - log.SetLevel(log.DebugLevel) - return logger - } - default: - logger.WithFields(log.Fields{ - "field": "debug", - "type": v, - "expected type": "ctypes.ConfigValueBool", - }).Error("invalid config type") - } - } - - if loglevel, ok := config["log-level"]; ok { - switch v := loglevel.(type) { - case ctypes.ConfigValueStr: - switch strings.ToLower(v.Value) { - case "warn": - log.SetLevel(log.WarnLevel) - case "error": - log.SetLevel(log.ErrorLevel) - case "debug": - log.SetLevel(log.DebugLevel) - case "info": - log.SetLevel(log.InfoLevel) - default: - log.WithFields(log.Fields{ - "value": strings.ToLower(v.Value), - "acceptable values": "warn, error, debug, info", - }).Warn("invalid config value") - } - default: - logger.WithFields(log.Fields{ - "field": "log-level", - "type": v, - "expected type": "ctypes.ConfigValueStr", - }).Error("invalid config type") + levelValue := config.logLevel + if levelValue != "undefined" { + if level, err := log.ParseLevel(strings.ToLower(levelValue)); err == nil { + log.SetLevel(level) + } else { + log.WithFields(log.Fields{ + "value": strings.ToLower(levelValue), + "acceptable values": "warn, error, debug, info", + }).Warn("Invalid log-level config value") } } - return logger } @@ -441,21 +405,19 @@ func (c *clientConnection) write(bps client.BatchPoints) error { } // Map the close function into client.Client -func (c *clientConnection) close() error { +func (c *clientConnection) closeClientConnection() error { return (*c.Conn).Close() } -func selectClientConnection(config map[string]ctypes.ConfigValue) (*clientConnection, error) { +func selectClientConnection(config configuration) (*clientConnection, error) { // This is not an ideal way to get the logger but deferring solving this for a later date logger := getLogger(config) - scheme := config["scheme"].(ctypes.ConfigValueStr).Value - if strings.Trim(scheme, " ") == "" { - scheme = HTTP - } + scheme := config.scheme - u, err := url.Parse(fmt.Sprintf("%s://%s:%d", scheme, config["host"].(ctypes.ConfigValueStr).Value, config["port"].(ctypes.ConfigValueInt).Value)) + u, err := url.Parse(fmt.Sprintf("%s://%s:%d", scheme, config.host, config.port)) if err != nil { + logger.Error("Error parsing URL") return nil, err } @@ -463,10 +425,9 @@ func selectClientConnection(config map[string]ctypes.ConfigValue) (*clientConnec m.Lock() defer m.Unlock() - user := config["user"].(ctypes.ConfigValueStr).Value - pass := config["password"].(ctypes.ConfigValueStr).Value - db := config["database"].(ctypes.ConfigValueStr).Value - skipVerify := config["skip-verify"].(ctypes.ConfigValueBool).Value + user := config.user + pass := config.password + db := config.database key := connectionKey(u, user, db) // Do we have a existing client? @@ -479,7 +440,7 @@ func selectClientConnection(config map[string]ctypes.ConfigValue) (*clientConnec Addr: u.String(), Username: user, Password: pass, - InsecureSkipVerify: skipVerify, + InsecureSkipVerify: config.skipVerify, }) } else { con, err = client.NewUDPClient(client.UDPConfig{ @@ -523,8 +484,8 @@ func connectionKey(u *url.URL, user, db string) string { } // groupCommonNamespaces groups common namespaces, those that differ at the leaf, into one data point with multiple influx fields. -func groupCommonNamespaces(m plugin.MetricType, tags map[string]string, mpoints map[string]point) { - elems := m.Namespace() +func groupCommonNamespaces(m plugin.Metric, tags map[string]string, mpoints map[string]point) { + elems := m.Namespace // Slices to the second to last s2l := elems[:len(elems)-1] if len(s2l) == 0 { @@ -540,7 +501,7 @@ func groupCommonNamespaces(m plugin.MetricType, tags map[string]string, mpoints mkeys = append(mkeys, s2l.Strings()...) // Converts the map keys to a string key - sk := strings.Join(mkeys, core.Separator) + sk := strings.Join(mkeys, separator) // Groups fields by the namespace common prefix and tags fieldName := elems[len(elems)-1].Value @@ -548,10 +509,10 @@ func groupCommonNamespaces(m plugin.MetricType, tags map[string]string, mpoints mpoints[sk] = point{ ns: s2l, tags: tags, - ts: m.Timestamp(), - fields: map[string]interface{}{fieldName: m.Data()}, + ts: m.Timestamp, + fields: map[string]interface{}{fieldName: m.Data}, } } else { - p.fields[fieldName] = m.Data() + p.fields[fieldName] = m.Data } } diff --git a/influxdb/influxdb_medium_test.go b/influxdb/influxdb_medium_test.go index 5df15f8..bcd1973 100644 --- a/influxdb/influxdb_medium_test.go +++ b/influxdb/influxdb_medium_test.go @@ -18,22 +18,19 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ + package influxdb import ( - "bytes" - "encoding/gob" "net/http" "os" "strings" "testing" "time" - "github.com/intelsdi-x/snap/control/plugin" - "github.com/intelsdi-x/snap/core" - "github.com/intelsdi-x/snap/core/ctypes" - . "github.com/smartystreets/goconvey/convey" + + "github.com/intelsdi-x/snap-plugin-lib-go/v1/plugin" ) func init() { @@ -54,7 +51,6 @@ func init() { } func TestInfluxPublish(t *testing.T) { - config := make(map[string]ctypes.ConfigValue) Convey("snap plugin InfluxDB integration testing with Influx", t, func() { var retention string @@ -64,97 +60,122 @@ func TestInfluxPublish(t *testing.T) { } else { retention = "autogen" } - config["host"] = ctypes.ConfigValueStr{Value: os.Getenv("SNAP_INFLUXDB_HOST")} - config["skip-verify"] = ctypes.ConfigValueBool{Value: false} - config["user"] = ctypes.ConfigValueStr{Value: "root"} - config["password"] = ctypes.ConfigValueStr{Value: "root"} - config["database"] = ctypes.ConfigValueStr{Value: "test"} - config["retention"] = ctypes.ConfigValueStr{Value: retention} - config["debug"] = ctypes.ConfigValueBool{Value: false} - config["log-level"] = ctypes.ConfigValueStr{Value: "debug"} - config["precison"] = ctypes.ConfigValueStr{Value: "s"} - - tests("", config) - - config["scheme"] = ctypes.ConfigValueStr{Value: HTTP} - config["port"] = ctypes.ConfigValueInt{Value: 8086} + config := plugin.Config{ + "host": os.Getenv("SNAP_INFLUXDB_HOST"), + "skip-verify": false, + "user": "root", + "password": "root", + "database": "test", + "retention": retention, + "isMultiFields": false, + "debug": false, + "log-level": "debug", + "precision": "s", + } + + config["scheme"] = HTTP + config["port"] = int64(8086) tests(HTTP, config) - config["scheme"] = ctypes.ConfigValueStr{Value: UDP} - config["port"] = ctypes.ConfigValueInt{Value: 4444} + config["scheme"] = UDP + config["port"] = int64(4444) tests(UDP, config) }) } -func tests(scheme string, config map[string]ctypes.ConfigValue) { - var buf bytes.Buffer - - ip := NewInfluxPublisher() - cp, _ := ip.GetConfigPolicy() - cfg, _ := cp.Get([]string{""}).Process(config) +func tests(scheme string, config plugin.Config) { + ip := &InfluxPublisher{} tags := map[string]string{"zone": "red"} + mcfg := map[string]interface{}{"field": "abc123"} Convey(" Publish integer metric via "+scheme, func() { - metrics := []plugin.MetricType{ - *plugin.NewMetricType(core.NewNamespace("foo"), time.Now(), tags, "some unit", 99), + metrics := []plugin.Metric{ + { + Namespace: plugin.NewNamespace("foo"), + Timestamp: time.Now(), + Config: mcfg, + Tags: tags, + Unit: "someunit", + Data: 99, + }, } - buf.Reset() - enc := gob.NewEncoder(&buf) - enc.Encode(metrics) - err := ip.Publish(plugin.SnapGOBContentType, buf.Bytes(), *cfg) + err := ip.Publish(metrics, config) So(err, ShouldBeNil) }) Convey("Publish float metric via "+scheme, func() { - metrics := []plugin.MetricType{ - *plugin.NewMetricType(core.NewNamespace("bar"), time.Now(), tags, "some unit", 3.141), + metrics := []plugin.Metric{ + { + Namespace: plugin.NewNamespace("bar"), + Timestamp: time.Now(), + Config: mcfg, + Tags: tags, + Unit: "someunit", + Data: 3.141, + }, } - buf.Reset() - enc := gob.NewEncoder(&buf) - enc.Encode(metrics) - err := ip.Publish(plugin.SnapGOBContentType, buf.Bytes(), *cfg) + err := ip.Publish(metrics, config) So(err, ShouldBeNil) }) Convey("Publish string metric via "+scheme, func() { - metrics := []plugin.MetricType{ - *plugin.NewMetricType(core.NewNamespace("qux"), time.Now(), tags, "some unit", "bar"), + metrics := []plugin.Metric{ + { + Namespace: plugin.NewNamespace("qux"), + Timestamp: time.Now(), + Config: mcfg, + Tags: tags, + Unit: "someunit", + Data: "bar", + }, } - buf.Reset() - enc := gob.NewEncoder(&buf) - enc.Encode(metrics) - err := ip.Publish(plugin.SnapGOBContentType, buf.Bytes(), *cfg) + err := ip.Publish(metrics, config) So(err, ShouldBeNil) }) Convey("Publish boolean metric via "+scheme, func() { - metrics := []plugin.MetricType{ - *plugin.NewMetricType(core.NewNamespace("baz"), time.Now(), tags, "some unit", true), + metrics := []plugin.Metric{ + { + Namespace: plugin.NewNamespace("baz"), + Timestamp: time.Now(), + Config: mcfg, + Tags: tags, + Unit: "someunit", + Data: true, + }, } - buf.Reset() - enc := gob.NewEncoder(&buf) - enc.Encode(metrics) - err := ip.Publish(plugin.SnapGOBContentType, buf.Bytes(), *cfg) + err := ip.Publish(metrics, config) So(err, ShouldBeNil) }) Convey("Publish multiple metrics via "+scheme, func() { - metrics := []plugin.MetricType{ - *plugin.NewMetricType(core.NewNamespace("foo"), time.Now(), tags, "some unit", 101), - *plugin.NewMetricType(core.NewNamespace("bar"), time.Now(), tags, "some unit", 5.789), + metrics := []plugin.Metric{ + { + Namespace: plugin.NewNamespace("foo"), + Timestamp: time.Now(), + Config: mcfg, + Tags: tags, + Unit: "someunit", + Data: 13, + }, + { + Namespace: plugin.NewNamespace("bar"), + Timestamp: time.Now(), + Config: mcfg, + Tags: tags, + Unit: "someunit", + Data: 2.718, + }, } - buf.Reset() - enc := gob.NewEncoder(&buf) - enc.Encode(metrics) - err := ip.Publish(plugin.SnapGOBContentType, buf.Bytes(), *cfg) + err := ip.Publish(metrics, config) So(err, ShouldBeNil) }) Convey("Publish dynamic metrics via "+scheme, func() { - dynamicNS1 := core.NewNamespace("foo"). + dynamicNS1 := plugin.NewNamespace("foo"). AddDynamicElement("dynamic", "dynamic elem"). AddStaticElement("bar") - dynamicNS2 := core.NewNamespace("foo"). + dynamicNS2 := plugin.NewNamespace("foo"). AddDynamicElement("dynamic_one", "dynamic element one"). AddDynamicElement("dynamic_two", "dynamic element two"). AddStaticElement("baz") @@ -163,60 +184,137 @@ func tests(scheme string, config map[string]ctypes.ConfigValue) { dynamicNS2[1].Value = "barval" dynamicNS2[2].Value = "bazval" - metrics := []plugin.MetricType{ - *plugin.NewMetricType(dynamicNS1, time.Now(), tags, "", 123), - *plugin.NewMetricType(dynamicNS2, time.Now(), tags, "", 456), + metrics := []plugin.Metric{ + { + Namespace: dynamicNS1, + Timestamp: time.Now(), + Config: mcfg, + Tags: tags, + Unit: "someunit", + Data: 17, + }, + { + Namespace: dynamicNS2, + Timestamp: time.Now(), + Config: mcfg, + Tags: tags, + Unit: "someunit", + Data: 23, + }, } - buf.Reset() - enc := gob.NewEncoder(&buf) - enc.Encode(metrics) - err := ip.Publish(plugin.SnapGOBContentType, buf.Bytes(), *cfg) + err := ip.Publish(metrics, config) So(err, ShouldBeNil) }) Convey("Publish nil value of metric via "+scheme, func() { - metrics := []plugin.MetricType{ - *plugin.NewMetricType(core.NewNamespace("baz"), time.Now(), tags, "some unit", nil), + metrics := []plugin.Metric{ + { + Namespace: plugin.NewNamespace("baz"), + Timestamp: time.Now(), + Config: mcfg, + Tags: tags, + Unit: "empty unit", + Data: nil, + }, } - buf.Reset() - enc := gob.NewEncoder(&buf) - enc.Encode(metrics) - err := ip.Publish(plugin.SnapGOBContentType, buf.Bytes(), *cfg) + err := ip.Publish(metrics, config) So(err, ShouldBeNil) }) Convey("Publish multiple fields to one metric via "+scheme, func() { - config["isMultiFields"] = ctypes.ConfigValueBool{Value: true} - metrics := []plugin.MetricType{ - *plugin.NewMetricType(core.NewNamespace("a", "b", "x"), time.Now(), tags, "test unit", 123.6), - *plugin.NewMetricType(core.NewNamespace("a", "b", "y"), time.Now(), tags, "test unit", 765.3), - *plugin.NewMetricType(core.NewNamespace("a", "b", "z"), time.Now(), tags, "test unit", 12345), - *plugin.NewMetricType(core.NewNamespace("a", "b", "z"), time.Now(), tags, "testunit", 11111), + config["isMultiFields"] = true + metrics := []plugin.Metric{ + { + Namespace: plugin.NewNamespace("a", "b", "x"), + Timestamp: time.Now(), + Config: mcfg, + Tags: tags, + Unit: "someunit", + Data: 123.456, + }, + { + Namespace: plugin.NewNamespace("a", "b", "y"), + Timestamp: time.Now(), + Config: mcfg, + Tags: tags, + Unit: "someunit", + Data: 987.654, + }, + { + Namespace: plugin.NewNamespace("a", "b", "z"), + Timestamp: time.Now(), + Config: mcfg, + Tags: tags, + Unit: "someunit", + Data: 18, + }, + { + Namespace: plugin.NewNamespace("a", "b", "z"), + Timestamp: time.Now(), + Config: mcfg, + Tags: tags, + Unit: "someunit", + Data: 512, + }, } - - buf.Reset() - enc := gob.NewEncoder(&buf) - enc.Encode(metrics) - err := ip.Publish(plugin.SnapGOBContentType, buf.Bytes(), *cfg) + err := ip.Publish(metrics, config) So(err, ShouldBeNil) }) Convey("Publish multiple fields to two metrics via "+scheme, func() { - config["isMultiFields"] = ctypes.ConfigValueBool{Value: true} + config["isMultiFields"] = true ntags := map[string]string{"zone": "red", "light": "yellow"} - metrics := []plugin.MetricType{ - *plugin.NewMetricType(core.NewNamespace("influx", "x"), time.Now(), tags, "test unit", 333.6), - *plugin.NewMetricType(core.NewNamespace("influx", "y"), time.Now(), tags, "test unit", 222.3), - *plugin.NewMetricType(core.NewNamespace("influx", "z"), time.Now(), tags, "test unit", 1111), - *plugin.NewMetricType(core.NewNamespace("influx", "r"), time.Now(), ntags, "unittest ", 777), - *plugin.NewMetricType(core.NewNamespace("influx", "s"), time.Now(), ntags, "unittest", 888), - *plugin.NewMetricType(core.NewNamespace("influx", "s"), time.Now(), ntags, "unit test", 999), + metrics := []plugin.Metric{ + { + Namespace: plugin.NewNamespace("influx", "x"), + Timestamp: time.Now(), + Config: mcfg, + Tags: tags, + Unit: "someunit", + Data: 333.6, + }, + { + Namespace: plugin.NewNamespace("influx", "y"), + Timestamp: time.Now(), + Config: mcfg, + Tags: tags, + Unit: "someunit", + Data: 666.3, + }, + { + Namespace: plugin.NewNamespace("influx", "z"), + Timestamp: time.Now(), + Config: mcfg, + Tags: tags, + Unit: "someunit", + Data: 173, + }, + { + Namespace: plugin.NewNamespace("influx", "r"), + Timestamp: time.Now(), + Config: mcfg, + Tags: ntags, + Unit: "someunit", + Data: 256, + }, + { + Namespace: plugin.NewNamespace("influx", "s"), + Timestamp: time.Now(), + Config: mcfg, + Tags: ntags, + Unit: "someunit", + Data: 128, + }, + { + Namespace: plugin.NewNamespace("influx", "s"), + Timestamp: time.Now(), + Config: mcfg, + Tags: ntags, + Unit: "someunit", + Data: 64, + }, } - - buf.Reset() - enc := gob.NewEncoder(&buf) - enc.Encode(metrics) - err := ip.Publish(plugin.SnapGOBContentType, buf.Bytes(), *cfg) + err := ip.Publish(metrics, config) So(err, ShouldBeNil) }) } diff --git a/influxdb/influxdb_small_test.go b/influxdb/influxdb_small_test.go index e211704..97ecd0b 100644 --- a/influxdb/influxdb_small_test.go +++ b/influxdb/influxdb_small_test.go @@ -23,28 +23,22 @@ package influxdb import ( "testing" - "github.com/intelsdi-x/snap/control/plugin" - "github.com/intelsdi-x/snap/control/plugin/cpolicy" - "github.com/intelsdi-x/snap/core/ctypes" . "github.com/smartystreets/goconvey/convey" + + "github.com/intelsdi-x/snap-plugin-lib-go/v1/plugin" ) func TestInfluxDBPlugin(t *testing.T) { - Convey("Meta should return metadata for the plugin", t, func() { - meta := Meta() - So(meta.Name, ShouldResemble, name) - So(meta.Version, ShouldResemble, version) - So(meta.Type, ShouldResemble, plugin.PublisherPluginType) - }) Convey("Create InfluxPublisher", t, func() { ip := NewInfluxPublisher() - Convey("So ip should not be nil", func() { + Convey("So publisher should not be nil", func() { So(ip, ShouldNotBeNil) }) - Convey("So ip should be of influxPublisher type", func() { - So(ip, ShouldHaveSameTypeAs, &influxPublisher{}) + Convey("So publisher should be of InfluxPublisher type", func() { + So(ip, ShouldHaveSameTypeAs, &InfluxPublisher{}) }) + configPolicy, err := ip.GetConfigPolicy() Convey("ip.GetConfigPolicy() should return a config policy", func() { Convey("So config policy should not be nil", func() { @@ -53,33 +47,8 @@ func TestInfluxDBPlugin(t *testing.T) { Convey("So we should not get an err retreiving the config policy", func() { So(err, ShouldBeNil) }) - Convey("So config policy should be a cpolicy.ConfigPolicy", func() { - So(configPolicy, ShouldHaveSameTypeAs, &cpolicy.ConfigPolicy{}) - }) - testConfig := make(map[string]ctypes.ConfigValue) - testConfig["host"] = ctypes.ConfigValueStr{Value: "localhost"} - testConfig["port"] = ctypes.ConfigValueInt{Value: 8086} - testConfig["scheme"] = ctypes.ConfigValueStr{Value: "http"} - testConfig["skip-verify"] = ctypes.ConfigValueBool{Value: false} - testConfig["user"] = ctypes.ConfigValueStr{Value: "root"} - testConfig["password"] = ctypes.ConfigValueStr{Value: "root"} - testConfig["database"] = ctypes.ConfigValueStr{Value: "test"} - testConfig["retention"] = ctypes.ConfigValueStr{Value: "testretention"} - testConfig["precison"] = ctypes.ConfigValueStr{Value: "s"} - cfg, errs := configPolicy.Get([]string{""}).Process(testConfig) - Convey("So config policy should process testConfig and return a config", func() { - So(cfg, ShouldNotBeNil) - }) - Convey("So testConfig processing should return no errors", func() { - So(errs.HasErrors(), ShouldBeFalse) - }) - testConfig["port"] = ctypes.ConfigValueStr{Value: "8086"} - cfg, errs = configPolicy.Get([]string{""}).Process(testConfig) - Convey("So config policy should not return a config after processing invalid testConfig", func() { - So(cfg, ShouldBeNil) - }) - Convey("So testConfig processing should return errors", func() { - So(errs.HasErrors(), ShouldBeTrue) + Convey("So config policy should be of plugin.ConfigPolicy type", func() { + So(configPolicy, ShouldHaveSameTypeAs, plugin.ConfigPolicy{}) }) }) }) diff --git a/main.go b/main.go index f0ee711..272b2ea 100644 --- a/main.go +++ b/main.go @@ -20,13 +20,10 @@ limitations under the License. package main import ( - "os" - + "github.com/intelsdi-x/snap-plugin-lib-go/v1/plugin" "github.com/intelsdi-x/snap-plugin-publisher-influxdb/influxdb" - "github.com/intelsdi-x/snap/control/plugin" ) func main() { - meta := influxdb.Meta() - plugin.Start(meta, influxdb.NewInfluxPublisher(), os.Args[1]) + plugin.StartPublisher(influxdb.NewInfluxPublisher(), influxdb.Name, influxdb.Version) } diff --git a/scripts/medium.sh b/scripts/medium.sh index 287835d..c6ef8b3 100755 --- a/scripts/medium.sh +++ b/scripts/medium.sh @@ -15,7 +15,7 @@ __proj_name="$(basename $__proj_dir)" export INFLUXDB_VERSION="${INFLUXDB_VERSION:-"1.0"}" export PLUGIN_SRC="${__proj_dir}" -export GOLANGVER="${GOLANGVER:-"1.6.2"}" +export GOLANGVER=${GOLANGVER:-"go1.7.4"} TEST_TYPE="${TEST_TYPE:-"medium"}" @@ -34,7 +34,7 @@ _debug "running test: ${TEST_TYPE}" # sleep for a few seconds giving influxd time to finish initializing sleep 3 _docker_project docker-compose exec -T golang gvm-bash.sh "curl -i -XPOST http://influxdb:8086/query --data-urlencode \"q=CREATE DATABASE test\"" -_docker_project docker-compose exec -T golang gvm-bash.sh "gvm use $GOLANGVER; export INFLUXDB_VERSION=$INFLUXDB_VERSION; mkdir -p ${_docker_org_path}; cp -Rf /${__proj_name} ${_docker_org_path}; (cd ${_docker_proj_path} && ./scripts/medium_tests.sh)" +_docker_project docker-compose exec -T golang gvm-bash.sh "gvm install $GOLANGVER -B; gvm use $GOLANGVER; export INFLUXDB_VERSION=$INFLUXDB_VERSION; mkdir -p ${_docker_org_path}; cp -Rf /${__proj_name} ${_docker_org_path}; (cd ${_docker_proj_path} && ./scripts/medium_tests.sh)" _debug "stopping docker compose images" _docker_project docker-compose stop _docker_project docker-compose rm -f