From 28fb9b50ac8800f36e57d713118b813a0e3f7627 Mon Sep 17 00:00:00 2001 From: Erick Daniszewski Date: Fri, 12 Oct 2018 11:42:47 -0400 Subject: [PATCH 1/5] add basic support for handler listener function (#307) --- Gopkg.lock | 34 +++-- examples/listener/.gitignore | 2 + examples/listener/Makefile | 31 +++++ examples/listener/README.md | 84 ++++++++++++ examples/listener/config.yml | 5 + examples/listener/config/device/pusher.yaml | 18 +++ examples/listener/plugin.go | 99 ++++++++++++++ examples/listener/pusher/main.go | 35 +++++ sdk/data_manager.go | 136 +++++++++++++++++++- sdk/data_manager_test.go | 20 +++ sdk/device.go | 9 +- sdk/plugin.go | 34 +++++ 12 files changed, 489 insertions(+), 18 deletions(-) create mode 100644 examples/listener/.gitignore create mode 100644 examples/listener/Makefile create mode 100644 examples/listener/README.md create mode 100644 examples/listener/config.yml create mode 100644 examples/listener/config/device/pusher.yaml create mode 100644 examples/listener/plugin.go create mode 100644 examples/listener/pusher/main.go diff --git a/Gopkg.lock b/Gopkg.lock index 5dfd8cb4..a3f77b22 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -2,12 +2,12 @@ [[projects]] - digest = "1:d867dfa6751c8d7a435821ad3b736310c2ed68945d05b50fb9d23aee0540c8cc" + digest = "1:3f53e9e4dfbb664cd62940c9c4b65a2171c66acd0b7621a1a6b8e78513525a52" name = "github.com/Sirupsen/logrus" packages = ["."] pruneopts = "UT" - revision = "3e01752db0189b9157070a0e1668a620f9a85da2" - version = "v1.0.6" + revision = "ad15b42461921f1fb3529b058c6786c6a45d5162" + version = "v1.1.1" [[projects]] digest = "1:7f769a7ea697a8e50c8a79a829f53a469e3ca7b8e314434ea9d9a25ca8401cb7" @@ -39,6 +39,14 @@ revision = "aa810b61a9c79d51363740d207bb46cf8e620ed5" version = "v1.2.0" +[[projects]] + digest = "1:0a69a1c0db3591fcefb47f115b224592c8dfa4368b7ba9fae509d5e16cdc95c8" + name = "github.com/konsorten/go-windows-terminal-sequences" + packages = ["."] + pruneopts = "UT" + revision = "5c8c8bd35d3832f5d134ae1e1e375b69a4d25242" + version = "v1.0.1" + [[projects]] digest = "1:808cdddf087fb64baeae67b8dfaee2069034d9704923a3cb8bd96a995421a625" name = "github.com/patrickmn/go-cache" @@ -85,11 +93,11 @@ name = "golang.org/x/crypto" packages = ["ssh/terminal"] pruneopts = "UT" - revision = "182538f80094b6a8efaade63a8fd8e0d9d5843dd" + revision = "a92615f3c49003920a58dedcf32cf55022cefb8d" [[projects]] branch = "master" - digest = "1:1427ef3c5200ade53e1569b34a7fd49dff8df0c2b3cdb9539a727f69ae5eddfa" + digest = "1:5dbea1a55ed20bd18a9584b8d50a431ceedc4c48a5872fceb429d59a4c30b3f9" name = "golang.org/x/net" packages = [ "context", @@ -101,18 +109,18 @@ "trace", ] pruneopts = "UT" - revision = "8a410e7b638dca158bf9e766925842f6651ff828" + revision = "49bb7cea24b1df9410e1712aa6433dae904ff66a" [[projects]] branch = "master" - digest = "1:8ba0a13c04eb83e02db82d7fc4029a62b07b502615424db95dfd2a5be82bca3b" + digest = "1:306e2db17d22ee51108d97f0e053755543b4d504d5b699e8aea295cbc4514306" name = "golang.org/x/sys" packages = [ "unix", "windows", ] pruneopts = "UT" - revision = "49385e6e15226593f68b26af201feec29d5bba22" + revision = "fa43e7bc11baaae89f3f902b2b4d832b68234844" [[projects]] digest = "1:7509ba4347d1f8de6ae9be8818b0cd1abc3deeffe28aeaf4be6d4b6b5178d9ca" @@ -147,14 +155,14 @@ [[projects]] branch = "master" - digest = "1:077c1c599507b3b3e9156d17d36e1e61928ee9b53a5b420f10f28ebd4a0b275c" + digest = "1:56b0bca90b7e5d1facf5fbdacba23e4e0ce069d25381b8e2f70ef1e7ebfb9c1a" name = "google.golang.org/genproto" packages = ["googleapis/rpc/status"] pruneopts = "UT" - revision = "c66870c02cf823ceb633bcd05be3c7cda29976f4" + revision = "af9cb2a35e7f169ec875002c1829c9b315cddc04" [[projects]] - digest = "1:74f4a872f90f363778088654a2edea7101d62e6fc0784945d38b0ef85e1828d8" + digest = "1:23f6471aaea2518fc2f7be25dc12de5980aeab1220c765e657e5c131f550df1c" name = "google.golang.org/grpc" packages = [ ".", @@ -185,8 +193,8 @@ "tap", ] pruneopts = "UT" - revision = "32fb0ac620c32ba40a4626ddf94d90d12cce3455" - version = "v1.14.0" + revision = "8dea3dc473e90c8179e519d91302d0597c0ca1d1" + version = "v1.15.0" [[projects]] digest = "1:342378ac4dcb378a5448dd723f0784ae519383532f5e70ade24132c4c8693202" diff --git a/examples/listener/.gitignore b/examples/listener/.gitignore new file mode 100644 index 00000000..76163154 --- /dev/null +++ b/examples/listener/.gitignore @@ -0,0 +1,2 @@ +plugin +device \ No newline at end of file diff --git a/examples/listener/Makefile b/examples/listener/Makefile new file mode 100644 index 00000000..cfc1e0fd --- /dev/null +++ b/examples/listener/Makefile @@ -0,0 +1,31 @@ +# +# Listener Plugin Example +# + +PLUGIN_VERSION := 1.0 + +GIT_COMMIT ?= $(shell git rev-parse --short HEAD 2> /dev/null || true) +GIT_TAG ?= $(shell git describe --tags 2> /dev/null || true) +BUILD_DATE := $(shell date -u +%Y-%m-%dT%T 2> /dev/null) +GO_VERSION := $(shell go version | awk '{ print $$3 }') + +PKG_CTX := github.com/vapor-ware/synse-sdk/sdk +LDFLAGS := -w \ + -X ${PKG_CTX}.BuildDate=${BUILD_DATE} \ + -X ${PKG_CTX}.GitCommit=${GIT_COMMIT} \ + -X ${PKG_CTX}.GitTag=${GIT_TAG} \ + -X ${PKG_CTX}.GoVersion=${GO_VERSION} \ + -X ${PKG_CTX}.PluginVersion=${PLUGIN_VERSION} + + +all: build pusher + +build: + @go build -ldflags "${LDFLAGS}" -o plugin + +pusher: + @go build -o device ./pusher/... + + +.PHONY: all build pusher +.DEFAULT_GOAL := all diff --git a/examples/listener/README.md b/examples/listener/README.md new file mode 100644 index 00000000..e60b156c --- /dev/null +++ b/examples/listener/README.md @@ -0,0 +1,84 @@ +### Listener Plugin + +This directory contains an example of a simple plugin that defines a listener +function in its device handler. Generally, a device that uses a listener is one +that generates push-based data for the plugin to collect. The listener will +listen for this data and update the plugin state accordingly. + +In this case, there is only one kind of device, a "pusher". It will push random +data. In order to collect pushed data, we need something to actually push that +data. A simple program is defined in the "pusher" directory which can be run +alongside this plugin to provide the data. See the next section on how to build +and run the plugin and the pusher data source. + +#### Usage + +To build the pusher data source program, simply +```bash +make pusher +``` +from within this directory. The plugin binary can be built with +```bash +make build +``` + +Both binaries will be output to the 'listener' directory and should +be named `device` and `plugin`, respectively. You can run both simultaneously +in separate shell instances (order doesn't matter): + +**Shell 1** +```console +$ ./device +2018/10/12 11:03:15 Sending data on: :8553 +2018/10/12 11:03:15 << 2596996162 +2018/10/12 11:03:18 << 4039455774 +2018/10/12 11:03:21 << 2854263694 +2018/10/12 11:03:24 << 1879968118 +2018/10/12 11:03:27 << 1823804162 +2018/10/12 11:03:30 << 2949882636 +2018/10/12 11:03:33 << 281908850 +``` + +**Shell 2** +```console +./plugin +DEBU[0000] [sdk] adding 1 devices from config +DEBU[0000] [sdk] executing 0 pre-run action(s) +DEBU[0000] [sdk] executing 0 device setup action(s) +INFO[0000] Plugin Info: +INFO[0000] Tag: vaporio/listener-plugin +INFO[0000] Name: listener plugin +INFO[0000] Maintainer: vaporio +INFO[0000] Description: An example plugin with listener device +INFO[0000] VCS: +INFO[0000] Version Info: +INFO[0000] Plugin Version: 1.0 +INFO[0000] SDK Version: 1.1.0 +INFO[0000] Git Commit: 95a2def +INFO[0000] Git Tag: 1.1.0 +INFO[0000] Build Date: 2018-10-12T15:01:46 +INFO[0000] Go Version: go1.10.2 +INFO[0000] OS/Arch: darwin/amd64 +INFO[0000] Registered Devices: +INFO[0000] rack-1-board-1-f9def8b577bf354577e7c0c907fc5b86 (pusher) +INFO[0000] -------------------------------- +DEBU[0000] [sdk] starting plugin run +DEBU[0000] [sdk] registering default health checks +DEBU[0000] [health] new periodic health check interval=30s name="read buffer health" +DEBU[0000] [health] new periodic health check interval=30s name="write buffer health" +DEBU[0000] [data manager] setting up data manager state +INFO[0000] [data manager] setting up listeners handler=pusher +INFO[0000] [data manager] starting read goroutine (reads enabled) mode=serial +INFO[0000] [data manager] starting write goroutine (writes enabled) mode=serial +INFO[0000] [data manager] running +DEBU[0000] [grpc] setting up server mode=unix +DEBU[0000] [server] configuring grpc server for insecure transport +INFO[0000] [grpc] listening on unix:/tmp/synse/procs/example-plugin.sock +INFO[0000] [data manager] running listener device=f9def8b577bf354577e7c0c907fc5b86 handler=pusher +[listener] got data: 2854263694 +[listener] got data: 1879968118 +[listener] got data: 1823804162 +[listener] got data: 2949882636 +[listener] got data: 281908850 +``` + diff --git a/examples/listener/config.yml b/examples/listener/config.yml new file mode 100644 index 00000000..ea07aa19 --- /dev/null +++ b/examples/listener/config.yml @@ -0,0 +1,5 @@ +version: 1.2 # listeners added in cfg version 1.2 +debug: true +network: + type: unix + address: example-plugin.sock diff --git a/examples/listener/config/device/pusher.yaml b/examples/listener/config/device/pusher.yaml new file mode 100644 index 00000000..af0436b9 --- /dev/null +++ b/examples/listener/config/device/pusher.yaml @@ -0,0 +1,18 @@ +version: 1.0 +locations: + - name: r1b1 + rack: + name: rack-1 + board: + name: board-1 +devices: + - name: pusher + metadata: + model: test-device + outputs: + - type: push_data + instances: + - info: Test Pusher Device + location: r1b1 + data: + address: "localhost:8553" diff --git a/examples/listener/plugin.go b/examples/listener/plugin.go new file mode 100644 index 00000000..9c07330c --- /dev/null +++ b/examples/listener/plugin.go @@ -0,0 +1,99 @@ +package main + +import ( + "encoding/binary" + "fmt" + "log" + "net" + + "github.com/vapor-ware/synse-sdk/sdk" +) + +var ( + pluginName = "listener plugin" + pluginMaintainer = "vaporio" + pluginDesc = "An example plugin with listener device" +) + +// Output types are defined, either statically in the plugin code, or via YAML +// configuration files. They define the potential outputs of the plugin's devices. +// A single device could support multiple outputs, but at a minimum requires one. +var ( + // The random data coming back from the pusher is random and meaningless, + // so we don't ascribe any precision or unit to it. + pusherOutput = sdk.OutputType{ + Name: "push_data", + } +) + +// Device Handlers need to be defined to tell the plugin how to handle reads and +// writes for the different kinds of devices it supports. +var ( + // pusherHandler defines the listen behavior for the "pusher" device kind. + pusherHandler = sdk.DeviceHandler{ + Name: "pusher", + Listen: func(device *sdk.Device, data chan *sdk.ReadContext) error { + // The device data defines the host/port to listen on. + address := device.Data["address"].(string) + + addr, err := net.ResolveUDPAddr("udp", address) + if err != nil { + return err + } + conn, err := net.ListenUDP("udp", addr) + if err != nil { + return err + } + buffer := make([]byte, 4) + for { + size, err := conn.Read(buffer) + if err != nil { + // failed read, try again + continue + } + if size != 4 { + // Unexpected packet size, try again + continue + } + value := binary.LittleEndian.Uint32(buffer) + fmt.Printf("[listener] got data: %v\n", value) + reading, err := device.GetOutput("push_data").MakeReading(value) + if err != nil { + return err + } + data <- sdk.NewReadContext(device, []*sdk.Reading{reading}) + } + }, + } +) + +func main() { + // Set the metadata for the plugin. + sdk.SetPluginMeta( + pluginName, + pluginMaintainer, + pluginDesc, + "", + ) + + // Create a new Plugin instance. + plugin := sdk.NewPlugin() + + // Register our output types with the Plugin. + err := plugin.RegisterOutputTypes( + &pusherOutput, + ) + if err != nil { + log.Fatal(err) + } + + // Register our device handlers with the Plugin. + plugin.RegisterDeviceHandlers( + &pusherHandler, + ) + + // Run the plugin. + if err := plugin.Run(); err != nil { + log.Fatal(err) + } +} diff --git a/examples/listener/pusher/main.go b/examples/listener/pusher/main.go new file mode 100644 index 00000000..a43bdb4d --- /dev/null +++ b/examples/listener/pusher/main.go @@ -0,0 +1,35 @@ +package main + +import ( + "encoding/binary" + "log" + "math/rand" + "net" + "time" +) + +func main() { + addr, err := net.ResolveUDPAddr("udp", ":8553") + if err != nil { + log.Fatal(err) + } + + conn, err := net.Dial("udp", addr.String()) + if err != nil { + log.Fatal(err) + } + defer conn.Close() // nolint: errcheck + + log.Printf("Sending data on: %v", addr.String()) + for { + b := make([]byte, 4) + data := rand.Uint32() + binary.LittleEndian.PutUint32(b, data) + log.Printf("<< %v", data) + _, err := conn.Write(b) + if err != nil { + log.Printf("failed to write. continuing.") + } + time.Sleep(3 * time.Second) + } +} diff --git a/sdk/data_manager.go b/sdk/data_manager.go index 3bbf86ca..34d1b4c2 100644 --- a/sdk/data_manager.go +++ b/sdk/data_manager.go @@ -15,6 +15,29 @@ import ( // DataManager is the global data manager for the plugin. var DataManager = newDataManager() +// ListenerCtx is the context needed for a listener function to be called +// and retried at a later time if it errors out after the listener goroutine +// is initially dispatched. +type ListenerCtx struct { + // handler is the DeviceHandler that defines the handler function. + handler *DeviceHandler + + // device is the Device that is being listened to via the listener. + device *Device + + // restarts is the number of times the listener has been restarted. + restarts int +} + +// NewListenerCtx creates a new ListenerCtx for the given handler and device. +func NewListenerCtx(handler *DeviceHandler, device *Device) *ListenerCtx { + return &ListenerCtx{ + handler: handler, + device: device, + restarts: 0, + } +} + // dataManager handles the reading from and writing to configured devices. // It executes the read and write goroutines and uses the channels between // those goroutines and its process to update the read and write state. @@ -34,6 +57,18 @@ type dataManager struct { // received by the `pollWrite` function. writeChannel chan *WriteContext + // listenChannel is the channel that is used to get data from a device + // that is being listened to. This is used by the SDK to collect push + // based data. While the data in the listenChannel and the data in the + // readChannel are similar, they are kept separate so the behaviors of + // pull-based/push-based reading can be tuned independently. + listenChannel chan *ReadContext + + // listenerRetry is a channel that all listeners will pass a ListenerRetryCtx + // to if they fail. This channel is read by a separate goroutine which will + // attempt to re-run the listener. + listenerRetry chan *ListenerCtx + // readings is a map of readings, where the key is the GUID of a // device, and the values are the readings associated with that device. readings map[string][]*Reading @@ -68,13 +103,17 @@ func (manager *dataManager) run() error { return err } - // Start the reader/writer + // Start the listeners/reader/writer + manager.goListen() manager.goRead() manager.goWrite() // Update the manager readings state manager.goUpdateData() + // Watch for failed listeners to retry them + go manager.watchForListenerRetry() + log.Info("[data manager] running") return nil } @@ -88,7 +127,9 @@ func (manager *dataManager) setup() error { return fmt.Errorf("plugin config not set, cannot setup data manager") } - // Initialize the read and write channels + // Initialize the listen, read, and write channels + manager.listenerRetry = make(chan *ListenerCtx, 50) + manager.listenChannel = make(chan *ReadContext, Config.Plugin.Settings.Listen.Buffer) manager.readChannel = make(chan *ReadContext, Config.Plugin.Settings.Read.Buffer) manager.writeChannel = make(chan *WriteContext, Config.Plugin.Settings.Write.Buffer) @@ -108,6 +149,78 @@ func (manager *dataManager) writesEnabled() bool { return Config.Plugin.Settings.Write.Enabled } +// goListen starts the goroutines for any listener functions for the configured +// devices. If there are no listener functions defined, this will do nothing. +func (manager *dataManager) goListen() { + // Although we consider listening to be a type of "read" behavior (e.g. collecting + // push-based readings vs. collecting pull-based readings), we use different + // configuration fields for listening to make it easier to tune independent of + // pull-based collection needs. If listening is globally disabled, there is + // nothing to do here. + if !Config.Plugin.Settings.Listen.Enabled { + log.Info("[data manager] skipping listener goroutine(s) (listen disabled)") + return + } + + // For each handler that has a listener function defined, get the devices for + // that handler and start the listener for the devices. + for _, handler := range ctx.deviceHandlers { + hlog := log.WithField("handler", handler.Name) + if handler.Listen != nil { + hlog.Info("[data manager] setting up listeners") + + // Get all of the devices that have registered with the handler + devices := handler.getDevicesForHandler() + if len(devices) == 0 { + hlog.Debugf("[data manager] found no devices for handler") + continue + } + + // For each device, run the listener goroutine + for _, device := range devices { + ctx := NewListenerCtx(handler, device) + go manager.runListener(ctx) + } + } + } +} + +// runListener runs the listener function for a device. If the listener +// fails, it will attempt to restart the listener. +func (manager *dataManager) runListener(ctx *ListenerCtx) { + log.WithFields(log.Fields{ + "handler": ctx.handler.Name, + "device": ctx.device.ID(), + }).Info("[data manager] running listener") + + err := ctx.handler.Listen(ctx.device, manager.listenChannel) + if err != nil { + log.WithField("device", ctx.device.ID()).Errorf( + "[data manager] failed to listen for device readings: %v", err, + ) + // pass the context to retry channel + manager.listenerRetry <- ctx + } +} + +// watchForListenerRetry waits for the 'runListener' function to pass a +// listener context to it via the 'listenerRetry' channel. If it gets +// a context, that listener had failed and needs to be restarted. +func (manager *dataManager) watchForListenerRetry() { + for { + ctx := <-manager.listenerRetry + // increment the restart counter + ctx.restarts++ + + llog := log.WithFields(log.Fields{ + "manager": ctx.handler.Name, + "device": ctx.device.ID(), + }) + llog.Infof("[data manager] restarting failed listener (restarts %v)", ctx.restarts) + go manager.runListener(ctx) + } +} + // goRead starts the goroutine for reading from configured devices. func (manager *dataManager) goRead() { mode := Config.Plugin.Settings.Mode @@ -378,9 +491,24 @@ func (manager *dataManager) write(w *WriteContext) { func (manager *dataManager) goUpdateData() { go func() { for { - reading := <-manager.readChannel + var ( + id string + readings []*Reading + ) + + // Read from the listen and read channel for incoming readings + select { + case reading := <-manager.readChannel: + id = reading.ID() + readings = reading.Reading + case reading := <-manager.listenChannel: + id = reading.ID() + readings = reading.Reading + } + + // Update the internal map of current reading state manager.dataLock.Lock() - manager.readings[reading.ID()] = reading.Reading + manager.readings[id] = readings manager.dataLock.Unlock() } }() diff --git a/sdk/data_manager_test.go b/sdk/data_manager_test.go index 7604504b..dd6a37a4 100644 --- a/sdk/data_manager_test.go +++ b/sdk/data_manager_test.go @@ -72,6 +72,7 @@ func TestDataManager_readOneOkNoLimiter(t *testing.T) { Settings: &PluginSettings{ Read: &ReadSettings{Buffer: 200}, Write: &WriteSettings{Buffer: 200}, + Listen: &ListenSettings{Buffer: 100}, Transaction: &TransactionSettings{TTL: "2s"}, }, } @@ -129,6 +130,7 @@ func TestDataManager_readOneOkWithLimiter(t *testing.T) { Settings: &PluginSettings{ Read: &ReadSettings{Buffer: 200}, Write: &WriteSettings{Buffer: 200}, + Listen: &ListenSettings{Buffer: 100}, Transaction: &TransactionSettings{TTL: "2s"}, }, Limiter: &LimiterSettings{Rate: 200, Burst: 200}, @@ -186,6 +188,7 @@ func TestDataManager_readOneErr(t *testing.T) { Settings: &PluginSettings{ Read: &ReadSettings{Buffer: 200}, Write: &WriteSettings{Buffer: 200}, + Listen: &ListenSettings{Buffer: 100}, Transaction: &TransactionSettings{TTL: "2s"}, }, } @@ -234,6 +237,7 @@ func TestDataManager_readBulkOkNoLimiter(t *testing.T) { Settings: &PluginSettings{ Read: &ReadSettings{Buffer: 200}, Write: &WriteSettings{Buffer: 200}, + Listen: &ListenSettings{Buffer: 100}, Transaction: &TransactionSettings{TTL: "2s"}, }, } @@ -310,6 +314,7 @@ func TestDataManager_readBulkOkWithLimiter(t *testing.T) { Settings: &PluginSettings{ Read: &ReadSettings{Buffer: 200}, Write: &WriteSettings{Buffer: 200}, + Listen: &ListenSettings{Buffer: 100}, Transaction: &TransactionSettings{TTL: "2s"}, }, Limiter: &LimiterSettings{Rate: 200, Burst: 200}, @@ -386,6 +391,7 @@ func TestDataManager_readBulkError(t *testing.T) { Settings: &PluginSettings{ Read: &ReadSettings{Buffer: 200}, Write: &WriteSettings{Buffer: 200}, + Listen: &ListenSettings{Buffer: 100}, Transaction: &TransactionSettings{TTL: "2s"}, }, } @@ -440,6 +446,7 @@ func TestDataManager_serialReadSingle(t *testing.T) { Settings: &PluginSettings{ Read: &ReadSettings{Buffer: 200}, Write: &WriteSettings{Buffer: 200}, + Listen: &ListenSettings{Buffer: 100}, Transaction: &TransactionSettings{TTL: "2s"}, }, } @@ -501,6 +508,7 @@ func TestDataManager_serialReadSingleBulk(t *testing.T) { Settings: &PluginSettings{ Read: &ReadSettings{Buffer: 200}, Write: &WriteSettings{Buffer: 200}, + Listen: &ListenSettings{Buffer: 100}, Transaction: &TransactionSettings{TTL: "2s"}, }, } @@ -576,6 +584,7 @@ func TestDataManager_parallelReadSingle(t *testing.T) { Settings: &PluginSettings{ Read: &ReadSettings{Buffer: 200}, Write: &WriteSettings{Buffer: 200}, + Listen: &ListenSettings{Buffer: 100}, Transaction: &TransactionSettings{TTL: "2s"}, }, } @@ -638,6 +647,7 @@ func TestDataManager_parallelReadSingleBulk(t *testing.T) { Settings: &PluginSettings{ Read: &ReadSettings{Buffer: 200}, Write: &WriteSettings{Buffer: 200}, + Listen: &ListenSettings{Buffer: 100}, Transaction: &TransactionSettings{TTL: "2s"}, }, } @@ -713,6 +723,7 @@ func TestDataManager_serialReadMultiple(t *testing.T) { Settings: &PluginSettings{ Read: &ReadSettings{Buffer: 200}, Write: &WriteSettings{Buffer: 200}, + Listen: &ListenSettings{Buffer: 100}, Transaction: &TransactionSettings{TTL: "2s"}, }, } @@ -815,6 +826,7 @@ func TestDataManager_parallelReadMultiple(t *testing.T) { Settings: &PluginSettings{ Read: &ReadSettings{Buffer: 200}, Write: &WriteSettings{Buffer: 200}, + Listen: &ListenSettings{Buffer: 100}, Transaction: &TransactionSettings{TTL: "2s"}, }, } @@ -918,6 +930,7 @@ func TestDataManager_writeOkNoLimiter(t *testing.T) { Settings: &PluginSettings{ Read: &ReadSettings{Buffer: 200}, Write: &WriteSettings{Buffer: 200}, + Listen: &ListenSettings{Buffer: 100}, Transaction: &TransactionSettings{TTL: "2s"}, }, } @@ -979,6 +992,7 @@ func TestDataManager_writeOkWithLimiter(t *testing.T) { Settings: &PluginSettings{ Read: &ReadSettings{Buffer: 200}, Write: &WriteSettings{Buffer: 200}, + Listen: &ListenSettings{Buffer: 100}, Transaction: &TransactionSettings{TTL: "2s"}, }, Limiter: &LimiterSettings{Rate: 200, Burst: 200}, @@ -1041,6 +1055,7 @@ func TestDataManager_writeNoDevice(t *testing.T) { Settings: &PluginSettings{ Read: &ReadSettings{Buffer: 200}, Write: &WriteSettings{Buffer: 200}, + Listen: &ListenSettings{Buffer: 100}, Transaction: &TransactionSettings{TTL: "2s"}, }, } @@ -1083,6 +1098,7 @@ func TestDataManager_writeError(t *testing.T) { Settings: &PluginSettings{ Read: &ReadSettings{Buffer: 200}, Write: &WriteSettings{Buffer: 200}, + Listen: &ListenSettings{Buffer: 100}, Transaction: &TransactionSettings{TTL: "2s"}, }, } @@ -1144,6 +1160,7 @@ func TestDataManager_serialWriteSingle(t *testing.T) { Settings: &PluginSettings{ Read: &ReadSettings{Buffer: 200}, Write: &WriteSettings{Buffer: 200, Max: 200}, + Listen: &ListenSettings{Buffer: 100}, Transaction: &TransactionSettings{TTL: "2s"}, }, Limiter: &LimiterSettings{Rate: 200, Burst: 200}, @@ -1208,6 +1225,7 @@ func TestDataManager_serialWriteMultiple(t *testing.T) { Settings: &PluginSettings{ Read: &ReadSettings{Buffer: 200}, Write: &WriteSettings{Buffer: 200, Max: 200}, + Listen: &ListenSettings{Buffer: 100}, Transaction: &TransactionSettings{TTL: "2s"}, }, Limiter: &LimiterSettings{Rate: 200, Burst: 200}, @@ -1294,6 +1312,7 @@ func TestDataManager_parallelWriteSingle(t *testing.T) { Settings: &PluginSettings{ Read: &ReadSettings{Buffer: 200}, Write: &WriteSettings{Buffer: 200, Max: 200}, + Listen: &ListenSettings{Buffer: 100}, Transaction: &TransactionSettings{TTL: "2s"}, }, Limiter: &LimiterSettings{Rate: 200, Burst: 200}, @@ -1358,6 +1377,7 @@ func TestDataManager_parallelWriteMultiple(t *testing.T) { Settings: &PluginSettings{ Read: &ReadSettings{Buffer: 200}, Write: &WriteSettings{Buffer: 200, Max: 200}, + Listen: &ListenSettings{Buffer: 100}, Transaction: &TransactionSettings{TTL: "2s"}, }, Limiter: &LimiterSettings{Rate: 200, Burst: 200}, diff --git a/sdk/device.go b/sdk/device.go index 108a5014..c33e69cb 100644 --- a/sdk/device.go +++ b/sdk/device.go @@ -48,6 +48,13 @@ type DeviceHandler struct { // If a device does not support bulk read, this can be left as nil. Additionally, // a device can only be bulk read if there is no Read handler set. BulkRead func([]*Device) ([]*ReadContext, error) + + // Listen is a function that will listen for push-based data from the device. + // This function is called one per device using the handler, even if there are + // other handler functions (e.g. read, write) defined. The listener function + // will run in a separate goroutine for each device. The goroutines are started + // before the read/write loops. + Listen func(*Device, chan *ReadContext) error } // supportsBulkRead checks if the handler supports bulk reading for its Devices. @@ -347,7 +354,7 @@ func (device *Device) Write(data *WriteData) error { // IsReadable checks if the Device is readable based on the presence/absence // of a Read/BulkRead action defined in its DeviceHandler. func (device *Device) IsReadable() bool { - return device.Handler.Read != nil || device.Handler.BulkRead != nil + return device.Handler.Read != nil || device.Handler.BulkRead != nil || device.Handler.Listen != nil } // IsWritable checks if the Device is writable based on the presence/absence diff --git a/sdk/plugin.go b/sdk/plugin.go index ac79dfe5..c3273d4b 100644 --- a/sdk/plugin.go +++ b/sdk/plugin.go @@ -371,6 +371,10 @@ type PluginSettings struct { // be "serial" or "parallel". Mode string `default:"serial" yaml:"mode,omitempty" addedIn:"1.0"` + // Listen contains the settings to configure listener behavior. + // FIXME (etd) - field versioning is messed up, for now leaving this at 1.0 + Listen *ListenSettings `default:"{}" yaml:"listen,omitempty" addedIn:"1.0"` + // Read contains the settings to configure read behavior. Read *ReadSettings `default:"{}" yaml:"read,omitempty" addedIn:"1.0"` @@ -509,6 +513,36 @@ func (settings LimiterSettings) Validate(multiErr *errors.MultiError) { } } +// ListenSettings provides configuration options for listener operations. +// A listener is a function that is used to collect push-based data. +type ListenSettings struct { + // FIXME (etd) - field versioning is messed up, for now leaving this at 1.0 + + // Enabled globally enables or disables listening for the plugin. + // By default a plugin will have listening enabled. + Enabled bool `default:"true" yaml:"enabled,omitempty" addedIn:"1.0"` + + // Buffer defines the size of the listen buffer. This will be the + // size of the channel that passes all the collected push data from + // all listener instances to the data manager. + Buffer int `default:"100" yaml:"buffer,omitempty" addedIn:"1.0"` +} + +// Validate validates that the ListenSettings has no confiugration errors. +func (settings ListenSettings) Validate(multiErr *errors.MultiError) { + // If the buffer size is set to 0, return an error. A size + // of 0 would prevent any data from being moved around, blocking + // all listen operations. + if settings.Buffer <= 0 { + log.WithField("config", settings).Error("[validation] bad listen buffer") + multiErr.Add(errors.NewInvalidValueError( + multiErr.Context["source"], + "settings.listen.buffer", + "a value greater than 0", + )) + } +} + // ReadSettings provides configuration options for read operations. type ReadSettings struct { // Enabled globally enables or disables reading for the plugin. From 4fc076d22820c801464a0f3b4e945d5824eedc1b Mon Sep 17 00:00:00 2001 From: Erick Daniszewski Date: Fri, 12 Oct 2018 12:15:20 -0400 Subject: [PATCH 2/5] simplify scope of config field versioning (#308) --- sdk/config.go | 31 ++++++++++++++++--------------- sdk/config_test.go | 12 ++++++++---- sdk/plugin.go | 9 +++------ sdk/validate_test.go | 3 +-- 4 files changed, 28 insertions(+), 27 deletions(-) diff --git a/sdk/config.go b/sdk/config.go index 8d4445e5..fada3863 100644 --- a/sdk/config.go +++ b/sdk/config.go @@ -85,6 +85,16 @@ func (ctx *ConfigContext) IsOutputTypeConfig() bool { return ok } +// TODO (etd) [v2]: In SDK v2, we can probably get rid of this. While versioning +// the configuration fields is a unique approach to ensuring config compatibility, +// it doesn't actually buy us much and at this point just adds complexity to the +// code base. We should be fine to version the config files themselves (e.g. +// 1, 1.0, v1, ...) and distinguish a v1 config from a v2 config, but at the most +// all we would be able to do from that is complain and say that the given config +// file is not compatible with the current version of the SDK, so it really only +// makes sense to have validation of version at the config level, not at the field +// level. For v1, we will keep this in for compatibility, but this can be removed +// for v2. All similar components will be marked with a TODO [v2] tag for removal. const ( tagAddedIn = "addedIn" tagDeprecatedIn = "deprecatedIn" @@ -95,6 +105,9 @@ const ( // that can be compared to other SchemeVersions. type ConfigVersion struct { Major int + + // TODO (etd) [v2]: for v1, disabled checking against the minor version, + // can be removed for v2. Minor int } @@ -142,31 +155,19 @@ func (version *ConfigVersion) String() string { // IsLessThan returns true if the Version is less than the Version // provided as a parameter. func (version *ConfigVersion) IsLessThan(other *ConfigVersion) bool { - if version.Major < other.Major { - return true - } - if version.Major == other.Major && version.Minor < other.Minor { - return true - } - return false + return version.Major < other.Major } // IsGreaterOrEqualTo returns true if the ConfigVersion is greater than or equal to // the Version provided as a parameter. func (version *ConfigVersion) IsGreaterOrEqualTo(other *ConfigVersion) bool { - if version.Major > other.Major { - return true - } - if version.Major == other.Major && version.Minor >= other.Minor { - return true - } - return false + return version.Major >= other.Major } // IsEqual returns true if the Version is equal to the Version provided // as a parameter. func (version *ConfigVersion) IsEqual(other *ConfigVersion) bool { - return version.Major == other.Major && version.Minor == other.Minor + return version.Major == other.Major } // SchemeVersion is a struct that is used to extract the configuration diff --git a/sdk/config_test.go b/sdk/config_test.go index 5514847d..c542f236 100644 --- a/sdk/config_test.go +++ b/sdk/config_test.go @@ -267,6 +267,8 @@ func TestSchemeVersion_String(t *testing.T) { } // TestSchemeVersion_IsEqual test equality of SchemeVersions +// TODO (etd) [v2]: this will be changed for v2. for v1, this was updated to +// only check on the major component of the version. func TestSchemeVersion_IsEqual(t *testing.T) { var testTable = []struct { scheme1 *ConfigVersion @@ -296,7 +298,7 @@ func TestSchemeVersion_IsEqual(t *testing.T) { { scheme1: &ConfigVersion{1, 1}, scheme2: &ConfigVersion{1, 2}, - equal: false, + equal: true, }, } @@ -307,6 +309,8 @@ func TestSchemeVersion_IsEqual(t *testing.T) { } // TestSchemeVersion_IsLessThan tests if one Version is less than another +// TODO (etd) [v2]: this will be changed for v2. for v1, this was updated to +// only check on the major component of the version. func TestSchemeVersion_IsLessThan(t *testing.T) { var testTable = []struct { scheme1 *ConfigVersion @@ -336,7 +340,7 @@ func TestSchemeVersion_IsLessThan(t *testing.T) { { scheme1: &ConfigVersion{1, 1}, scheme2: &ConfigVersion{1, 2}, - lessThan: true, + lessThan: false, }, { scheme1: &ConfigVersion{1, 2}, @@ -352,7 +356,7 @@ func TestSchemeVersion_IsLessThan(t *testing.T) { } // TestSchemeVersion_IsGreaterOrEqualTo tests if one Version is greater than -// or qual to another +// or equal to another func TestSchemeVersion_IsGreaterOrEqualTo(t *testing.T) { var testTable = []struct { scheme1 *ConfigVersion @@ -382,7 +386,7 @@ func TestSchemeVersion_IsGreaterOrEqualTo(t *testing.T) { { scheme1: &ConfigVersion{1, 1}, scheme2: &ConfigVersion{1, 2}, - gte: false, + gte: true, }, { scheme1: &ConfigVersion{1, 2}, diff --git a/sdk/plugin.go b/sdk/plugin.go index c3273d4b..0bd5a68d 100644 --- a/sdk/plugin.go +++ b/sdk/plugin.go @@ -372,8 +372,7 @@ type PluginSettings struct { Mode string `default:"serial" yaml:"mode,omitempty" addedIn:"1.0"` // Listen contains the settings to configure listener behavior. - // FIXME (etd) - field versioning is messed up, for now leaving this at 1.0 - Listen *ListenSettings `default:"{}" yaml:"listen,omitempty" addedIn:"1.0"` + Listen *ListenSettings `default:"{}" yaml:"listen,omitempty" addedIn:"1.2"` // Read contains the settings to configure read behavior. Read *ReadSettings `default:"{}" yaml:"read,omitempty" addedIn:"1.0"` @@ -516,16 +515,14 @@ func (settings LimiterSettings) Validate(multiErr *errors.MultiError) { // ListenSettings provides configuration options for listener operations. // A listener is a function that is used to collect push-based data. type ListenSettings struct { - // FIXME (etd) - field versioning is messed up, for now leaving this at 1.0 - // Enabled globally enables or disables listening for the plugin. // By default a plugin will have listening enabled. - Enabled bool `default:"true" yaml:"enabled,omitempty" addedIn:"1.0"` + Enabled bool `default:"true" yaml:"enabled,omitempty" addedIn:"1.2"` // Buffer defines the size of the listen buffer. This will be the // size of the channel that passes all the collected push data from // all listener instances to the data manager. - Buffer int `default:"100" yaml:"buffer,omitempty" addedIn:"1.0"` + Buffer int `default:"100" yaml:"buffer,omitempty" addedIn:"1.2"` } // Validate validates that the ListenSettings has no confiugration errors. diff --git a/sdk/validate_test.go b/sdk/validate_test.go index e4ed87ae..28af7293 100644 --- a/sdk/validate_test.go +++ b/sdk/validate_test.go @@ -353,7 +353,6 @@ func TestSchemeValidator_Validate_Complex_Ok(t *testing.T) { Source: "", Config: &complexTestConfig{ SchemeVersion: SchemeVersion{Version: "1.0"}, - Foo: true, FloatVal: 20, IntVal: 3, UintVal: 2, @@ -429,7 +428,7 @@ func TestSchemeValidator_Validate_Complex_Error(t *testing.T) { err := validator.Validate(toValidate) assert.Error(t, err.Err()) - assert.Equal(t, 2, len(err.Errors), err.Error()) + assert.Equal(t, 1, len(err.Errors), err.Error()) // check that validation cleanup was successful checkValidationCleanup(t) From ed9a5ee8348c98f5c7a6e274014b470b5cec4d55 Mon Sep 17 00:00:00 2001 From: Erick Daniszewski Date: Wed, 17 Oct 2018 09:53:02 -0400 Subject: [PATCH 3/5] Windowed cache for reading data (#311) --- Gopkg.lock | 6 +- Gopkg.toml | 2 +- internal/test/grpc.go | 49 +++++ sdk/cache.go | 139 +++++++++++++ sdk/cache_test.go | 412 +++++++++++++++++++++++++++++++++++++++ sdk/data_manager.go | 24 ++- sdk/data_manager_test.go | 11 ++ sdk/device.go | 2 +- sdk/device_test.go | 94 +++++++++ sdk/plugin.go | 26 +++ sdk/plugin_test.go | 94 +++++++++ sdk/server.go | 23 +++ sdk/server_test.go | 173 ++++++++++++++++ sdk/type_test.go | 38 ++++ sdk/utils.go | 16 ++ sdk/utils_test.go | 78 ++++++++ 16 files changed, 1180 insertions(+), 7 deletions(-) create mode 100644 sdk/cache.go create mode 100644 sdk/cache_test.go diff --git a/Gopkg.lock b/Gopkg.lock index a3f77b22..3e8de049 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -80,12 +80,12 @@ version = "v1.2.2" [[projects]] - branch = "master" - digest = "1:bf033fb06435e52e54e920d172f72b11b2ea91e44b2b9a5e21e57a616590f9d7" + branch = "cached-readings" + digest = "1:a95be5a656edf57b48a06335bbb627fc2eb5f37890df1183bbaab9443c43cae1" name = "github.com/vapor-ware/synse-server-grpc" packages = ["go"] pruneopts = "UT" - revision = "5a6280c2ec33b2eb3781e7700ddfec04294bf3c6" + revision = "b080d990ed2c4a7b52d6b1510b76fd2e2c911d4f" [[projects]] branch = "master" diff --git a/Gopkg.toml b/Gopkg.toml index 99bae262..2baddad8 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -47,7 +47,7 @@ [[constraint]] name = "github.com/vapor-ware/synse-server-grpc" - branch = "master" + branch = "cached-readings" [[constraint]] branch = "master" diff --git a/internal/test/grpc.go b/internal/test/grpc.go index e7f40a34..b2e079f5 100644 --- a/internal/test/grpc.go +++ b/internal/test/grpc.go @@ -7,6 +7,10 @@ import ( "google.golang.org/grpc" ) +// +// CAPABILITIES +// + // MockCapabilitiesStream mocks the stream for the Capabilities request, with no error. type MockCapabilitiesStream struct { grpc.ServerStream @@ -36,6 +40,10 @@ func (mock *MockCapabilitiesStreamErr) Send(capability *synse.DeviceCapability) return fmt.Errorf("grpc error") } +// +// DEVICES +// + // MockDevicesStream mocks the stream for the Devices request, with no error. type MockDevicesStream struct { grpc.ServerStream @@ -65,6 +73,10 @@ func (mock *MockDevicesStreamErr) Send(device *synse.Device) error { return fmt.Errorf("grpc error") } +// +// READ +// + // MockReadStream mocks the stream for the Read request, with no error. type MockReadStream struct { grpc.ServerStream @@ -94,6 +106,43 @@ func (mock *MockReadStreamErr) Send(reading *synse.Reading) error { return fmt.Errorf("grpc error") } +// +// READ CACHED +// + +// MockReadCachedStream mocks the stream for the ReadCached request, with no error. +type MockReadCachedStream struct { + grpc.ServerStream + Results []*synse.DeviceReading +} + +// NewMockReadCachedStream creates a new mock read cache stream. +func NewMockReadCachedStream() *MockReadCachedStream { + return &MockReadCachedStream{ + Results: []*synse.DeviceReading{}, + } +} + +// Send fulfils the stream interface for the mock grpc stream. +func (mock *MockReadCachedStream) Send(reading *synse.DeviceReading) error { + mock.Results = append(mock.Results, reading) + return nil +} + +// MockReadCachedStreamErr mocks the stream for a ReadCached request, with error. +type MockReadCachedStreamErr struct { + grpc.ServerStream +} + +// Send fulfils the stream interface for the mock grpc stream. +func (mock *MockReadCachedStreamErr) Send(reading *synse.DeviceReading) error { + return fmt.Errorf("grpc error") +} + +// +// TRANSACTION +// + // MockTransactionStream mocks the stream for the Transaction request, with no error. type MockTransactionStream struct { grpc.ServerStream diff --git a/sdk/cache.go b/sdk/cache.go new file mode 100644 index 00000000..b30053ea --- /dev/null +++ b/sdk/cache.go @@ -0,0 +1,139 @@ +package sdk + +import ( + "time" + + log "github.com/Sirupsen/logrus" + "github.com/patrickmn/go-cache" +) + +// readingsCache is the cache that will store the readings collected by the +// plugin, if it is enabled in the plugin configuration. +var readingsCache *cache.Cache + +// cacheContexts is how ReadContexts are stored in the readings cache. Since +// we may want to filter readings based on the timestamp they were added, we +// want to store the ReadContexts against a timestamp key. In order to support +// multiple contexts at a given time, we store them as a slice. +type cacheContexts []*ReadContext + +// setupReadingsCache sets up a cache that will be used to store readings, +// if it is enabled in the plugin configuration. +func setupReadingsCache() { + cacheSettings := Config.Plugin.Settings.Cache + if cacheSettings.Enabled { + log.Debugf("[cache] readings cache is enabled") + if readingsCache == nil { + log.WithField( + "ttl", cacheSettings.TTL, + ).Info("[cache] creating new readings cache") + readingsCache = cache.New(cacheSettings.TTL, cacheSettings.TTL*2) + } + } else { + log.Debug("[cache] readings cache disabled - will only provide current readings") + } +} + +// addReading adds a reading to the readings cache. +func addReadingToCache(ctx *ReadContext) { + if Config.Plugin.Settings.Cache.Enabled { + now := GetCurrentTime() + item, exists := readingsCache.Get(now) + if !exists { + newCtxs := cacheContexts([]*ReadContext{ctx}) + readingsCache.Set(now, &newCtxs, cache.DefaultExpiration) + } else { + cached := item.(*cacheContexts) + *cached = append(*cached, ctx) + } + } +} + +// getReadingsFromCache takes optional start/end bounds (which can be left as empty strings +// to specify no bounds on the reading data) and a channel. It will collect all pertinent +// reading data from the cache and pass it through the channel. Once the function returns, +// the channel will be closed. +func getReadingsFromCache(start, end string, readings chan *ReadContext) { + // Whether we exit the function by passing all cached readings through + // the channel or by erroring out, we want to close the channel. This + // will signal to caller (who provides the channel) that we are done. + defer close(readings) + + // Parse the timestamps for the starting and ending bounds on the data + // window, if they are set. + startTime, err := ParseRFC3339Nano(start) + if err != nil { + log.Errorf("[cache] failed to parse start time: %v", err) + } + endTime, err := ParseRFC3339Nano(end) + if err != nil { + log.Errorf("[cache] failed to parse end time: %v", err) + } + + // If caching reads is disabled, just return all of the current + // tracked readings, if they fall within the specified time bound. + // Otherwise, collect the readings from the cache. + if Config.Plugin.Settings.Cache.Enabled { + getCachedReadings(startTime, endTime, readings) + } else { + getCurrentReadings(readings) + } +} + +// getCachedReadings gets the readings from the read cache, filters them based +// on the provided start and end bounds, and passes them to the provided channel. +func getCachedReadings(start, end time.Time, readings chan *ReadContext) { + for ts, item := range readingsCache.Items() { + cachedTime, err := ParseRFC3339Nano(ts) + if err != nil { + // If we can't parse the timestamp from the cache, an error is logged + // and we move on. We should always be using RFC3339 formatted timestamps + // as keys when things get inserted, so if we find something in there that + // does not conform, it means something is wrong and we should not use it + // (data corruption, something added incorrectly, ...) + log.Error("[cache] failed to parse RFC3339 timestamp from cache - ignoring") + continue + } + + // If we have a start bound, check that the cached items are + // within that bound. If not, ignore them. + if !start.IsZero() && cachedTime.Before(start) { + continue + } + + // If we have an end bound, check that the cached items are + // within that bound. If not, ignore them. + if !end.IsZero() && cachedTime.After(end) { + continue + } + + // Pass the read contexts to the channel + ctxs := item.Object.(*cacheContexts) + for _, ctx := range *ctxs { + readings <- ctx + } + } +} + +// getCurrentReadings gets the current readings from the data manager and passes +// them to the provided channel. +func getCurrentReadings(readings chan *ReadContext) { + for deviceID, data := range DataManager.getAllReadings() { + // We have the device ID, but we will also want the provenance info + // (rack, board, device), so we will need to lookup the device by ID. + dev, ok := ctx.devices[deviceID] + if !ok { + log.WithField( + "id", deviceID, + ).Error("[cache] found orphan reading (id does not match any known devices)") + continue + } + + readings <- &ReadContext{ + Rack: dev.Location.Rack, + Board: dev.Location.Board, + Device: dev.id, + Reading: data, + } + } +} diff --git a/sdk/cache_test.go b/sdk/cache_test.go new file mode 100644 index 00000000..b8c950cd --- /dev/null +++ b/sdk/cache_test.go @@ -0,0 +1,412 @@ +package sdk + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +// Test setting up the readings cache when it is enabled in the config. +func Test_setupReadingsCache_Enabled(t *testing.T) { + defer func() { + // reset plugin state + resetContext() + Config.reset() + + // reset readings cache + readingsCache = nil + }() + + Config.Plugin = &PluginConfig{ + Settings: &PluginSettings{ + Cache: &CacheSettings{ + Enabled: true, + }, + }, + } + + assert.Nil(t, readingsCache) + setupReadingsCache() + assert.NotNil(t, readingsCache) +} + +// Test setting up the readings cache when it is disabled in the config. +func Test_setupReadingsCache_Disabled(t *testing.T) { + defer func() { + // reset plugin state + resetContext() + Config.reset() + + // reset readings cache + readingsCache = nil + }() + + Config.Plugin = &PluginConfig{ + Settings: &PluginSettings{ + Cache: &CacheSettings{ + Enabled: false, + }, + }, + } + + assert.Nil(t, readingsCache) + setupReadingsCache() + assert.Nil(t, readingsCache) +} + +// Test adding a reading to the cache when the timestamp does not already +// exist in the cache. +// TODO: figure out how to mock out GetCurrentTime to test when the timestamp does exist +func Test_addReadingToCache_Enabled(t *testing.T) { + defer func() { + // reset plugin state + resetContext() + Config.reset() + + // reset readings cache + readingsCache = nil + }() + + Config.Plugin = &PluginConfig{ + Settings: &PluginSettings{ + Cache: &CacheSettings{ + Enabled: true, + }, + }, + } + setupReadingsCache() + + assert.Equal(t, 0, readingsCache.ItemCount()) + addReadingToCache(&ReadContext{ + Rack: "rack", + Board: "board", + Device: "device", + Reading: []*Reading{{Type: "test", Value: 1}}, + }) + assert.Equal(t, 1, readingsCache.ItemCount()) +} + +// Test adding a reading to the cache when it is disabled in the config. +func Test_addReadingToCache_Disabled(t *testing.T) { + defer func() { + // reset plugin state + resetContext() + Config.reset() + + // reset readings cache + readingsCache = nil + }() + + Config.Plugin = &PluginConfig{ + Settings: &PluginSettings{ + Cache: &CacheSettings{ + Enabled: false, + }, + }, + } + + assert.Nil(t, readingsCache) + addReadingToCache(&ReadContext{ + Rack: "rack", + Board: "board", + Device: "device", + Reading: []*Reading{{Type: "test", Value: 1}}, + }) + assert.Nil(t, readingsCache) +} + +// Test getting readings when the cache is enabled. +func Test_getReadingsFromCache_Enabled(t *testing.T) { + defer func() { + // reset plugin state + resetContext() + Config.reset() + + // reset readings cache + readingsCache = nil + }() + + Config.Plugin = &PluginConfig{ + Settings: &PluginSettings{ + Cache: &CacheSettings{ + Enabled: true, + }, + }, + } + setupReadingsCache() + + // manually add to the readingsCache + ctx := &ReadContext{ + Rack: "rack", + Board: "board", + Device: "device", + Reading: []*Reading{{Type: "test", Value: 1}}, + } + ctxs := cacheContexts([]*ReadContext{ctx}) + readingsCache.Set("2018-10-16T22:08:50.000000000Z", &ctxs, 0) + + c := make(chan *ReadContext, 10) + go getReadingsFromCache("", "", c) + + var results []*ReadContext + for r := range c { + results = append(results, r) + } + assert.Equal(t, 1, len(results)) +} + +// Test getting readings from the cache with a start bound applied. +func Test_getReadingsFromCache_Enabled_Start(t *testing.T) { + defer func() { + // reset plugin state + resetContext() + Config.reset() + + // reset readings cache + readingsCache = nil + }() + + Config.Plugin = &PluginConfig{ + Settings: &PluginSettings{ + Cache: &CacheSettings{ + Enabled: true, + }, + }, + } + setupReadingsCache() + + // manually add to the readingsCache + ctx := &ReadContext{ + Rack: "rack", + Board: "board", + Device: "device", + Reading: []*Reading{{Type: "test", Value: 1}}, + } + ctxs := cacheContexts([]*ReadContext{ctx}) + readingsCache.Set("2018-10-16T22:08:50.000000000Z", &ctxs, 0) + readingsCache.Set("2018-10-16T22:08:51.000000000Z", &ctxs, 0) + readingsCache.Set("2018-10-16T22:08:52.000000000Z", &ctxs, 0) + + c := make(chan *ReadContext, 10) + go getReadingsFromCache("2018-10-16T22:08:51.000000000Z", "", c) + + var results []*ReadContext + for r := range c { + results = append(results, r) + } + assert.Equal(t, 2, len(results)) +} + +// Test getting readings from the cache with an end bound applied. +func Test_getReadingsFromCache_Enabled_End(t *testing.T) { + defer func() { + // reset plugin state + resetContext() + Config.reset() + + // reset readings cache + readingsCache = nil + }() + + Config.Plugin = &PluginConfig{ + Settings: &PluginSettings{ + Cache: &CacheSettings{ + Enabled: true, + }, + }, + } + setupReadingsCache() + + // manually add to the readingsCache + ctx := &ReadContext{ + Rack: "rack", + Board: "board", + Device: "device", + Reading: []*Reading{{Type: "test", Value: 1}}, + } + ctxs := cacheContexts([]*ReadContext{ctx}) + readingsCache.Set("2018-10-16T22:08:50.000000000Z", &ctxs, 0) + readingsCache.Set("2018-10-16T22:08:51.000000000Z", &ctxs, 0) + readingsCache.Set("2018-10-16T22:08:52.000000000Z", &ctxs, 0) + + c := make(chan *ReadContext, 10) + go getReadingsFromCache("", "2018-10-16T22:08:51.000000000Z", c) + + var results []*ReadContext + for r := range c { + results = append(results, r) + } + assert.Equal(t, 2, len(results)) +} + +// Test getting readings from the cache with both start and end bounds applied. +func Test_getReadingsFromCache_Enabled_StartEnd(t *testing.T) { + defer func() { + // reset plugin state + resetContext() + Config.reset() + + // reset readings cache + readingsCache = nil + }() + + Config.Plugin = &PluginConfig{ + Settings: &PluginSettings{ + Cache: &CacheSettings{ + Enabled: true, + }, + }, + } + setupReadingsCache() + + // manually add to the readingsCache + ctx := &ReadContext{ + Rack: "rack", + Board: "board", + Device: "device", + Reading: []*Reading{{Type: "test", Value: 1}}, + } + ctxs := cacheContexts([]*ReadContext{ctx}) + readingsCache.Set("2018-10-16T22:08:50.000000000Z", &ctxs, 0) + readingsCache.Set("2018-10-16T22:08:51.000000000Z", &ctxs, 0) + readingsCache.Set("2018-10-16T22:08:52.000000000Z", &ctxs, 0) + readingsCache.Set("2018-10-16T22:08:53.000000000Z", &ctxs, 0) + readingsCache.Set("2018-10-16T22:08:54.000000000Z", &ctxs, 0) + + c := make(chan *ReadContext, 10) + go getReadingsFromCache("2018-10-16T22:08:51.500000000Z", "2018-10-16T22:08:53.000000000Z", c) + var results []*ReadContext + for r := range c { + results = append(results, r) + } + assert.Equal(t, 2, len(results)) +} + +// Test getting readings from the cache when no data falls within the bounds. +func Test_getReadingsFromCache_Enabled_OutOfBounds(t *testing.T) { + defer func() { + // reset plugin state + resetContext() + Config.reset() + + // reset readings cache + readingsCache = nil + }() + + Config.Plugin = &PluginConfig{ + Settings: &PluginSettings{ + Cache: &CacheSettings{ + Enabled: true, + }, + }, + } +} + +// Test getting readings from the cache when the cache is disabled. This +// should lead to the current readings (the data manager state) being used. +func Test_getReadingsFromCache_Disabled(t *testing.T) { + defer func() { + // reset plugin state + resetContext() + Config.reset() + + // reset readings cache + readingsCache = nil + }() + + Config.Plugin = &PluginConfig{ + Settings: &PluginSettings{ + Cache: &CacheSettings{ + Enabled: false, + }, + }, + } +} + +// Test getting readings when the start time is not an RFC3339-formatted +// timestamp. If this is the case, the timestamp will be ignored. +func Test_getReadingsFromCache_invalidStart(t *testing.T) { + defer func() { + // reset plugin state + resetContext() + Config.reset() + + // reset readings cache + readingsCache = nil + }() + + Config.Plugin = &PluginConfig{ + Settings: &PluginSettings{ + Cache: &CacheSettings{ + Enabled: true, + }, + }, + } + setupReadingsCache() + + // manually add to the readingsCache + ctx := &ReadContext{ + Rack: "rack", + Board: "board", + Device: "device", + Reading: []*Reading{{Type: "test", Value: 1}}, + } + ctxs := cacheContexts([]*ReadContext{ctx}) + readingsCache.Set("2018-10-16T22:08:50.000000000Z", &ctxs, 0) + readingsCache.Set("2018-10-16T22:08:51.000000000Z", &ctxs, 0) + readingsCache.Set("2018-10-16T22:08:52.000000000Z", &ctxs, 0) + readingsCache.Set("2018-10-16T22:08:53.000000000Z", &ctxs, 0) + readingsCache.Set("2018-10-16T22:08:54.000000000Z", &ctxs, 0) + + c := make(chan *ReadContext, 10) + go getReadingsFromCache("Tues Oct 16 22:08:52 UTC 2018", "", c) + var results []*ReadContext + for r := range c { + results = append(results, r) + } + assert.Equal(t, 5, len(results)) +} + +// Test getting readings when the end time is not an RFC3339-formatted +// timestamp. If this is the case, the timestamp will be ignored. +func Test_getReadingsFromCache_invalidEnd(t *testing.T) { + defer func() { + // reset plugin state + resetContext() + Config.reset() + + // reset readings cache + readingsCache = nil + }() + + Config.Plugin = &PluginConfig{ + Settings: &PluginSettings{ + Cache: &CacheSettings{ + Enabled: true, + }, + }, + } + setupReadingsCache() + + // manually add to the readingsCache + ctx := &ReadContext{ + Rack: "rack", + Board: "board", + Device: "device", + Reading: []*Reading{{Type: "test", Value: 1}}, + } + ctxs := cacheContexts([]*ReadContext{ctx}) + readingsCache.Set("2018-10-16T22:08:50.000000000Z", &ctxs, 0) + readingsCache.Set("2018-10-16T22:08:51.000000000Z", &ctxs, 0) + readingsCache.Set("2018-10-16T22:08:52.000000000Z", &ctxs, 0) + readingsCache.Set("2018-10-16T22:08:53.000000000Z", &ctxs, 0) + readingsCache.Set("2018-10-16T22:08:54.000000000Z", &ctxs, 0) + + c := make(chan *ReadContext, 10) + go getReadingsFromCache("", "Tues Oct 16 22:08:53 UTC 2018", c) + var results []*ReadContext + for r := range c { + results = append(results, r) + } + assert.Equal(t, 5, len(results)) +} diff --git a/sdk/data_manager.go b/sdk/data_manager.go index 34d1b4c2..724d63d1 100644 --- a/sdk/data_manager.go +++ b/sdk/data_manager.go @@ -497,11 +497,12 @@ func (manager *dataManager) goUpdateData() { ) // Read from the listen and read channel for incoming readings + var reading *ReadContext select { - case reading := <-manager.readChannel: + case reading = <-manager.readChannel: id = reading.ID() readings = reading.Reading - case reading := <-manager.listenChannel: + case reading = <-manager.listenChannel: id = reading.ID() readings = reading.Reading } @@ -510,6 +511,9 @@ func (manager *dataManager) goUpdateData() { manager.dataLock.Lock() manager.readings[id] = readings manager.dataLock.Unlock() + + // update the readings cache + addReadingToCache(reading) } }() } @@ -525,6 +529,22 @@ func (manager *dataManager) getReadings(device string) []*Reading { return manager.readings[device] } +// getAllReadings safely copies the current reading state in the data manager and +// returns all of the readings. +func (manager *dataManager) getAllReadings() map[string][]*Reading { + mapCopy := make(map[string][]*Reading) + manager.dataLock.RLock() + defer manager.dataLock.RUnlock() + + // Iterate over the map to make a copy - we want a copy or else we would be + // returning a reference to the underlying data which should only be accessed + // in a lock context. + for k, v := range manager.readings { + mapCopy[k] = v + } + return mapCopy +} + // Read fulfills a Read request by providing the latest data read from a device // and framing it up for the gRPC response. func (manager *dataManager) Read(req *synse.DeviceFilter) ([]*synse.Reading, error) { diff --git a/sdk/data_manager_test.go b/sdk/data_manager_test.go index dd6a37a4..679f84e2 100644 --- a/sdk/data_manager_test.go +++ b/sdk/data_manager_test.go @@ -1446,3 +1446,14 @@ func TestDataManager_parallelWriteMultiple(t *testing.T) { assert.Equal(t, "", ctx.transaction.message) } } + +// Test creating a new instance of a listener context. +func TestNewListenerCtx(t *testing.T) { + handler := &DeviceHandler{} + device := &Device{} + + ctx := NewListenerCtx(handler, device) + assert.Equal(t, handler, ctx.handler) + assert.Equal(t, device, ctx.device) + assert.Equal(t, 0, ctx.restarts) +} diff --git a/sdk/device.go b/sdk/device.go index 69a29396..1d6dd382 100644 --- a/sdk/device.go +++ b/sdk/device.go @@ -441,7 +441,7 @@ func updateDeviceMap(devices []*Device) { ctx.devices[d.GUID()] = d } if foundDuplicates { - log.Fatal("[sdk] unable to run plugin with duplicate device configurations") + log.Panic("[sdk] unable to run plugin with duplicate device configurations") } } diff --git a/sdk/device_test.go b/sdk/device_test.go index ebb343f5..f04bfbec 100644 --- a/sdk/device_test.go +++ b/sdk/device_test.go @@ -176,6 +176,41 @@ func TestDevice_GetOutput(t *testing.T) { assert.Equal(t, "foo", output.Name) } +// TestDevice_JSON_1 tests dumping an empty Device to a JSON string. +func TestDevice_JSON_1(t *testing.T) { + d := Device{} + out, err := d.JSON() + assert.NoError(t, err) + assert.Equal( + t, + `{"Kind":"","Metadata":null,"Plugin":"","Info":"","Location":null,"Data":null,"Outputs":null,"SortOrdinal":0}`, + out, + ) +} + +// TestDevice_JSON_2 tests dumping a Device to a JSON string. +func TestDevice_JSON_2(t *testing.T) { + d := Device{ + Kind: "foo", + Metadata: map[string]string{"test": "data"}, + Info: "info", + Handler: &DeviceHandler{}, + SortOrdinal: 1, + Location: &Location{ + Rack: "rack", + Board: "board", + }, + } + + out, err := d.JSON() + assert.NoError(t, err) + assert.Equal( + t, + `{"Kind":"foo","Metadata":{"test":"data"},"Plugin":"","Info":"info","Location":{"Rack":"rack","Board":"board"},"Data":null,"Outputs":null,"SortOrdinal":1}`, + out, + ) +} + // TestMakeDevices tests making a single device. func TestMakeDevices(t *testing.T) { defer resetContext() @@ -770,9 +805,39 @@ func Test_updateDeviceMap(t *testing.T) { assert.Equal(t, 0, len(ctx.devices)) updateDeviceMap([]*Device{device}) + //r := recover() + //assert.NotNil(t, r) assert.Equal(t, 1, len(ctx.devices)) } +// Test_updateDeviceMap2 tests updating the device map when a device +// with that id already exists. +func Test_updateDeviceMap2(t *testing.T) { + defer resetContext() + + // this will be run after we panic + defer func() { + r := recover() + assert.NotNil(t, r) + assert.Equal(t, 1, len(ctx.devices)) + }() + + device := &Device{ + Kind: "test", + Location: &Location{ + Rack: "rack", + Board: "board", + }, + } + // manually add the device to the device map + ctx.devices[device.GUID()] = device + assert.Equal(t, 1, len(ctx.devices)) + + // now try updating the map - this will be a duplicate since + // we already have this device in the map. + updateDeviceMap([]*Device{device}) +} + // Test_getInstanceOutputs tests getting instance output when none are defined. func Test_getInstanceOutputs(t *testing.T) { kind := &DeviceKind{} @@ -949,6 +1014,35 @@ func TestNewDeviceConfig(t *testing.T) { assert.Equal(t, 0, len(cfg.Devices)) } +// TestDeviceConfig_JSON_1 tests dumping an empty DeviceConfig to a JSON string. +func TestDeviceConfig_JSON_1(t *testing.T) { + d := DeviceConfig{} + out, err := d.JSON() + assert.NoError(t, err) + assert.Equal( + t, + `{"Version":"","Locations":null,"Devices":null}`, + out, + ) +} + +// TestDeviceConfig_JSON_2 tests dumping a DeviceConfig to a JSON string. +func TestDeviceConfig_JSON_2(t *testing.T) { + d := DeviceConfig{ + SchemeVersion: SchemeVersion{Version: "1.0"}, + Locations: []*LocationConfig{{Name: "test", Rack: &LocationData{Name: "test"}, Board: &LocationData{Name: "test"}}}, + Devices: []*DeviceKind{{Name: "test"}}, + } + + out, err := d.JSON() + assert.NoError(t, err) + assert.Equal( + t, + `{"Version":"1.0","Locations":[{"Name":"test","Rack":{"Name":"test","FromEnv":""},"Board":{"Name":"test","FromEnv":""}}],"Devices":[{"Name":"test","Metadata":null,"Instances":null,"Outputs":null,"HandlerName":""}]}`, + out, + ) +} + // TestDeviceConfig_GetLocation_Ok tests getting locations from a DeviceConfig successfully. func TestDeviceConfig_GetLocation_Ok(t *testing.T) { var testTable = []struct { diff --git a/sdk/plugin.go b/sdk/plugin.go index 0bd5a68d..f5326288 100644 --- a/sdk/plugin.go +++ b/sdk/plugin.go @@ -231,6 +231,9 @@ func (plugin *Plugin) setup() error { } setupTransactionCache(ttl) + // Set up the readings cache, if its configured + setupReadingsCache() + // Initialize a gRPC server for the Plugin to use. plugin.server = newServer( Config.Plugin.Network.Type, @@ -383,6 +386,10 @@ type PluginSettings struct { // Transaction contains the settings to configure transaction // handling behavior. Transaction *TransactionSettings `default:"{}" yaml:"transaction,omitempty" addedIn:"1.0"` + + // Cache contains the settings to configure local data caching + // by the plugin. + Cache *CacheSettings `default:"{}" yaml:"cache,omitempty" addedIn:"1.2"` } // Validate validates that the PluginSettings has no configuration errors. @@ -674,3 +681,22 @@ type HealthSettings struct { func (settings HealthSettings) Validate(multiErr *errors.MultiError) { // Nothing to validate } + +// CacheSettings provides configuration options for an in-memory windowed +// cache for plugin readings. +type CacheSettings struct { + // Enabled sets whether the plugin will use a local + // in-memory cache to store a small window of readings. + // By default, the cache is not enabled. + Enabled bool `default:"false" yaml:"enabled,omitempty" addedIn:"1.2"` + + // TTL is the time-to-live for a reading in the readings cache. + // This will only be used if the cache is enabled. Once a reading + // has exceeded its TTL, it will be removed from the cache. + TTL time.Duration `default:"3m" yaml:"ttl,omitempty" addedIn:"1.2"` +} + +// Validate validates that the CacheSettings has no configuration errors. +func (settings CacheSettings) Validate(multiErr *errors.MultiError) { + // Nothing to validate +} diff --git a/sdk/plugin_test.go b/sdk/plugin_test.go index fac326d0..ddc547da 100644 --- a/sdk/plugin_test.go +++ b/sdk/plugin_test.go @@ -737,3 +737,97 @@ func TestHealthSettings_Validate(t *testing.T) { config.Validate(merr) assert.NoError(t, merr.Err()) } + +// Test validating the ListenSettings successfully. +func TestListenSettings_Validate_Ok(t *testing.T) { + var testTable = []struct { + desc string + config ListenSettings + }{ + { + desc: "listen enabled, small buffer", + config: ListenSettings{ + Enabled: true, + Buffer: 1, + }, + }, + { + desc: "listen enabled, larger buffer", + config: ListenSettings{ + Enabled: true, + Buffer: 100, + }, + }, + { + desc: "listen disabled, small buffer", + config: ListenSettings{ + Enabled: false, + Buffer: 1, + }, + }, + { + desc: "listen disabled, larger buffer", + config: ListenSettings{ + Enabled: false, + Buffer: 100, + }, + }, + } + + for _, testCase := range testTable { + merr := errors.NewMultiError("test") + + testCase.config.Validate(merr) + assert.NoError(t, merr.Err(), testCase.desc) + } +} + +// Test validating the ListenSettings unsuccessfully. +func TestListenSettings_Validate_Error(t *testing.T) { + var testTable = []struct { + desc string + errCount int + config ListenSettings + }{ + { + desc: "listen enabled, zero buffer", + errCount: 1, + config: ListenSettings{ + Enabled: true, + Buffer: 0, + }, + }, + { + desc: "listen enabled, negative buffer", + errCount: 1, + config: ListenSettings{ + Enabled: true, + Buffer: -1, + }, + }, + { + desc: "listen disabled, zero buffer", + errCount: 1, + config: ListenSettings{ + Enabled: false, + Buffer: 0, + }, + }, + { + desc: "listen disabled, negative buffer", + errCount: 1, + config: ListenSettings{ + Enabled: false, + Buffer: -1, + }, + }, + } + + for _, testCase := range testTable { + merr := errors.NewMultiError("test") + + testCase.config.Validate(merr) + assert.Error(t, merr.Err(), testCase.desc) + assert.Equal(t, testCase.errCount, len(merr.Errors), merr.Error()) + } +} diff --git a/sdk/server.go b/sdk/server.go index 26ad737e..88c089b1 100644 --- a/sdk/server.go +++ b/sdk/server.go @@ -364,6 +364,29 @@ func (server *server) Read(request *synse.DeviceFilter, stream synse.Plugin_Read return nil } +// ReadCached is the handler for the Synse GRPC Plugin service's `ReadCached` RPC method. +func (server *server) ReadCached(bounds *synse.Bounds, stream synse.Plugin_ReadCachedServer) error { + log.WithField("bounds", bounds).Debugf("[grpc] read cached rpc request") + + // create a channel that will be used to collect the cached readings + readings := make(chan *ReadContext, 128) + go getReadingsFromCache(bounds.Start, bounds.End, readings) + for r := range readings { + for _, data := range r.Reading { + deviceReading := &synse.DeviceReading{ + Rack: r.Rack, + Board: r.Board, + Device: r.Device, + Reading: data.encode(), + } + if err := stream.Send(deviceReading); err != nil { + return err + } + } + } + return nil +} + // Write is the handler for the Synse GRPC Plugin service's `Write` RPC method. func (server *server) Write(ctx context.Context, request *synse.WriteInfo) (*synse.Transactions, error) { log.WithField("request", request).Debug("[grpc] write rpc request") diff --git a/sdk/server_test.go b/sdk/server_test.go index e59bbe29..c50503d7 100644 --- a/sdk/server_test.go +++ b/sdk/server_test.go @@ -759,6 +759,179 @@ func TestServer_Read4(t *testing.T) { assert.Error(t, err) } +// Test the ReadCached method of the gRPC plugin service. +func TestServer_ReadCached1(t *testing.T) { + defer func() { + DataManager = newDataManager() + resetContext() + Config.reset() + }() + + // Disable the listener -- this will mean we get readings from + // the DataManager, which we will manually add readings to next. + Config.Plugin = &PluginConfig{ + Settings: &PluginSettings{ + Cache: &CacheSettings{ + Enabled: false, + }, + }, + } + ctx.devices["rack-board-device"] = &Device{ + id: "device", + Kind: "foo", + Location: &Location{ + Rack: "rack", + Board: "board", + }, + Outputs: []*Output{ + {OutputType: OutputType{Name: "output1"}}, + {OutputType: OutputType{Name: "output2"}}, + }, + Handler: &DeviceHandler{ + Read: func(device *Device) ([]*Reading, error) { + return nil, nil + }, + }, + } + DataManager.readings["rack-board-device"] = []*Reading{ + { + Timestamp: "2018-10-17T13:19:44.326431979Z", + Type: "temperature", + Value: 3, + }, + { + Timestamp: "2018-10-17T13:19:44.338671923Z", + Type: "humidity", + Value: 5, + }, + } + + s := server{} + bounds := &synse.Bounds{} + mock := test.NewMockReadCachedStream() + err := s.ReadCached(bounds, mock) + + assert.NoError(t, err) + assert.Equal(t, 2, len(mock.Results)) +} + +// Test the ReadCached method of the gRPC plugin service. In this test +// case, we have the cache disabled (pulling current readings) and specify +// bounds. When the cache is disabled, the bounds should be ignored, so +// we expect to get readings back, even though they are out of bounds here. +func TestServer_ReadCached2(t *testing.T) { + defer func() { + DataManager = newDataManager() + resetContext() + Config.reset() + }() + + // Disable the listener -- this will mean we get readings from + // the DataManager, which we will manually add readings to next. + Config.Plugin = &PluginConfig{ + Settings: &PluginSettings{ + Cache: &CacheSettings{ + Enabled: false, + }, + }, + } + ctx.devices["rack-board-device"] = &Device{ + id: "device", + Kind: "foo", + Location: &Location{ + Rack: "rack", + Board: "board", + }, + Outputs: []*Output{ + {OutputType: OutputType{Name: "output1"}}, + {OutputType: OutputType{Name: "output2"}}, + }, + Handler: &DeviceHandler{ + Read: func(device *Device) ([]*Reading, error) { + return nil, nil + }, + }, + } + DataManager.readings["rack-board-device"] = []*Reading{ + { + Timestamp: "2018-10-17T13:19:44.326431979Z", + Type: "temperature", + Value: 3, + }, + { + Timestamp: "2018-10-17T13:19:44.338671923Z", + Type: "humidity", + Value: 5, + }, + } + + s := server{} + bounds := &synse.Bounds{ + End: "2018-10-17T13:19:40.000000000Z", + } + mock := test.NewMockReadCachedStream() + err := s.ReadCached(bounds, mock) + + assert.NoError(t, err) + assert.Equal(t, 2, len(mock.Results)) +} + +// Test the ReadCached method of the gRPC plugin service when the stream +// returns an error. +func TestServer_ReadCached3(t *testing.T) { + defer func() { + DataManager = newDataManager() + resetContext() + Config.reset() + }() + + // Disable the listener -- this will mean we get readings from + // the DataManager, which we will manually add readings to next. + Config.Plugin = &PluginConfig{ + Settings: &PluginSettings{ + Cache: &CacheSettings{ + Enabled: false, + }, + }, + } + ctx.devices["rack-board-device"] = &Device{ + id: "device", + Kind: "foo", + Location: &Location{ + Rack: "rack", + Board: "board", + }, + Outputs: []*Output{ + {OutputType: OutputType{Name: "output1"}}, + {OutputType: OutputType{Name: "output2"}}, + }, + Handler: &DeviceHandler{ + Read: func(device *Device) ([]*Reading, error) { + return nil, nil + }, + }, + } + DataManager.readings["rack-board-device"] = []*Reading{ + { + Timestamp: "2018-10-17T13:19:44.326431979Z", + Type: "temperature", + Value: 3, + }, + { + Timestamp: "2018-10-17T13:19:44.338671923Z", + Type: "humidity", + Value: 5, + }, + } + + s := server{} + bounds := &synse.Bounds{} + mock := &test.MockReadCachedStreamErr{} + err := s.ReadCached(bounds, mock) + + assert.Error(t, err) +} + // TestServer_Write tests the Write method of the gRPC plugin service when // the specified device isn't found. func TestServer_Write(t *testing.T) { diff --git a/sdk/type_test.go b/sdk/type_test.go index 9a6ba8d5..8a939f01 100644 --- a/sdk/type_test.go +++ b/sdk/type_test.go @@ -543,3 +543,41 @@ func TestNilOutput(t *testing.T) { t.Error("nil OutputType should fail") } } + +// Test dumping an OutputType to a JSON string. +func TestOutputType_JSON(t *testing.T) { + var testTable = []struct { + output OutputType + expected string + }{ + { + output: OutputType{}, + expected: `{"Version":"","Name":"","Precision":0,"Unit":{"Name":"","Symbol":""},"ScalingFactor":""}`, + }, + { + output: OutputType{ + Name: "foo", + Precision: 2, + }, + expected: `{"Version":"","Name":"foo","Precision":2,"Unit":{"Name":"","Symbol":""},"ScalingFactor":""}`, + }, + { + output: OutputType{ + Name: "test", + Precision: 4, + Unit: Unit{ + Name: "unit", + Symbol: "u", + }, + ScalingFactor: "1e6", + }, + expected: `{"Version":"","Name":"test","Precision":4,"Unit":{"Name":"unit","Symbol":"u"},"ScalingFactor":"1e6"}`, + }, + } + + for _, testCase := range testTable { + actual, err := testCase.output.JSON() + assert.NoError(t, err) + assert.Equal(t, testCase.expected, actual) + } +} diff --git a/sdk/utils.go b/sdk/utils.go index f5ddeb85..08c3c6dc 100644 --- a/sdk/utils.go +++ b/sdk/utils.go @@ -21,6 +21,22 @@ func GetCurrentTime() string { return time.Now().UTC().Format(time.RFC3339Nano) } +// ParseRFC3339Nano parses a timestamp string in RFC3339Nano format into a Time struct. +// If it is given an empty string, it will return the zero-value for a Time +// instance. You can check if it is a zero time with the Time's `IsZero` method. +func ParseRFC3339Nano(timestamp string) (t time.Time, err error) { + if timestamp == "" { + return + } + t, err = time.Parse(time.RFC3339Nano, timestamp) + if err != nil { + log.WithField( + "timestamp", timestamp, + ).Error("[sdk] failed to parse timestamp from RFC3339Nano format") + } + return +} + // GetTypeByName gets the output type with the given name from the collection of // output types registered with the SDK for the plugin. If an output type with the // given name does not exist, an error is returned. diff --git a/sdk/utils_test.go b/sdk/utils_test.go index db94ec7d..f944d034 100644 --- a/sdk/utils_test.go +++ b/sdk/utils_test.go @@ -5,6 +5,8 @@ import ( "sort" "testing" + "time" + "github.com/stretchr/testify/assert" ) @@ -241,6 +243,82 @@ func TestFilterDevicesErr(t *testing.T) { } } +// TestParseRFC3339Nano_Ok tests successfully parsing an RFC3339 timestamp +// into a Time struct. +func TestParseRFC3339Nano_Ok(t *testing.T) { + // get the EST location + est, err := time.LoadLocation("EST") + assert.NoError(t, err) + + var tests = []struct { + timestamp string + expected time.Time + }{ + { + // no timestamp defaults to zero value for time + timestamp: "", + expected: time.Time{}, + }, + { + // rfc3339 utc + timestamp: "2018-10-16T18:22:50Z", + expected: time.Date(2018, 10, 16, 18, 22, 50, 0, time.UTC), + }, + { + // rfc3339nano utc + timestamp: "2018-10-16T18:22:50.573971054Z", + expected: time.Date(2018, 10, 16, 18, 22, 50, 573971054, time.UTC), + }, + { + // rcf3339 est + timestamp: "2018-10-16T13:25:00-05:00", + expected: time.Date(2018, 10, 16, 13, 25, 0, 0, est), + }, + { + // rfc3339nano est + timestamp: "2018-10-16T13:25:00.410241272-05:00", + expected: time.Date(2018, 10, 16, 13, 25, 0, 410241272, est), + }, + } + + for i, tt := range tests { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + actual, err := ParseRFC3339Nano(tt.timestamp) + assert.NoError(t, err) + assert.True(t, tt.expected.Equal(actual)) + }) + } +} + +// TestParseRFC3339Nano_Error tests unsuccessfully parsing a timestamp into +// a Time struct +func TestParseRFC3339Nano_Error(t *testing.T) { + var tests = []string{ + "foobar", + "...", + "16 Oct 18 18:22 UTC", + "16 Oct 18 13:25 EST", + "Tue Oct 16 18:22:50 2018", + "Tue Oct 16 13:25:00 2018", + "6:22PM", + "1:25PM", + "Tue, 16 Oct 2018 18:22:50 +0000", + "Tue, 16 Oct 2018 13:25:00 -0500", + "Oct 16 18:22:50.573997", + "Oct 16 13:25:00.410271", + "Tue Oct 16 18:22:50 UTC 2018", + "Tue Oct 16 13:25:00 EST 2018", + } + + for i, tt := range tests { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + actual, err := ParseRFC3339Nano(tt) + assert.Error(t, err) + assert.Empty(t, actual) + }) + } +} + // TestGetCurrentTime tests getting the current time. func TestGetCurrentTime(t *testing.T) { // TODO: figure out how to test the response... From a32d2e7c371f273bad385903d7f9a3ec6aca82a7 Mon Sep 17 00:00:00 2001 From: Erick Daniszewski Date: Tue, 23 Oct 2018 09:23:30 -0400 Subject: [PATCH 4/5] update deps, update lint target --- Gopkg.lock | 22 +++++++++++----------- Gopkg.toml | 8 ++++---- Makefile | 6 ++++-- 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 3e8de049..355f13b5 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -26,7 +26,7 @@ version = "v1.1.1" [[projects]] - digest = "1:ae0036fa14b41c52607c53fa072ef5045ec5f9013ed569e6a5fa19dd5e2ac89a" + digest = "1:4c0989ca0bcd10799064318923b9bc2db6b4d6338dd75f3f2d86c3511aaaf5cf" name = "github.com/golang/protobuf" packages = [ "proto", @@ -80,12 +80,12 @@ version = "v1.2.2" [[projects]] - branch = "cached-readings" digest = "1:a95be5a656edf57b48a06335bbb627fc2eb5f37890df1183bbaab9443c43cae1" name = "github.com/vapor-ware/synse-server-grpc" packages = ["go"] pruneopts = "UT" - revision = "b080d990ed2c4a7b52d6b1510b76fd2e2c911d4f" + revision = "3bc0b24b2bfbc8df1d67f02607d13380318d68ee" + version = "1.1.0" [[projects]] branch = "master" @@ -93,11 +93,11 @@ name = "golang.org/x/crypto" packages = ["ssh/terminal"] pruneopts = "UT" - revision = "a92615f3c49003920a58dedcf32cf55022cefb8d" + revision = "0c41d7ab0a0ee717d4590a44bcb987dfd9e183eb" [[projects]] branch = "master" - digest = "1:5dbea1a55ed20bd18a9584b8d50a431ceedc4c48a5872fceb429d59a4c30b3f9" + digest = "1:505dbee0833715a72a529bb57c354826ad42a4496fad787fa143699b4de1a6d0" name = "golang.org/x/net" packages = [ "context", @@ -109,21 +109,21 @@ "trace", ] pruneopts = "UT" - revision = "49bb7cea24b1df9410e1712aa6433dae904ff66a" + revision = "04a2e542c03f1d053ab3e4d6e5abcd4b66e2be8e" [[projects]] branch = "master" - digest = "1:306e2db17d22ee51108d97f0e053755543b4d504d5b699e8aea295cbc4514306" + digest = "1:eb2ed765d17a2cedfc307cf8beda6f0857ad378431a4d8f2726c419f3d7105de" name = "golang.org/x/sys" packages = [ "unix", "windows", ] pruneopts = "UT" - revision = "fa43e7bc11baaae89f3f902b2b4d832b68234844" + revision = "8a28ead16f52c8aaeffbf79239b251dfdf6c4f96" [[projects]] - digest = "1:7509ba4347d1f8de6ae9be8818b0cd1abc3deeffe28aeaf4be6d4b6b5178d9ca" + digest = "1:a2ab62866c75542dd18d2b069fec854577a20211d7c0ea6ae746072a1dccdd18" name = "golang.org/x/text" packages = [ "collate", @@ -159,10 +159,10 @@ name = "google.golang.org/genproto" packages = ["googleapis/rpc/status"] pruneopts = "UT" - revision = "af9cb2a35e7f169ec875002c1829c9b315cddc04" + revision = "94acd270e44e65579b9ee3cdab25034d33fed608" [[projects]] - digest = "1:23f6471aaea2518fc2f7be25dc12de5980aeab1220c765e657e5c131f550df1c" + digest = "1:ab8e92d746fb5c4c18846b0879842ac8e53b3d352449423d0924a11f1020ae1b" name = "google.golang.org/grpc" packages = [ ".", diff --git a/Gopkg.toml b/Gopkg.toml index 2baddad8..379065cf 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -27,7 +27,7 @@ [[constraint]] name = "github.com/Sirupsen/logrus" - version = "1.0.5" + version = "1.1.1" [[constraint]] name = "github.com/creasty/defaults" @@ -39,7 +39,7 @@ [[constraint]] name = "github.com/rs/xid" - version = "1.2.0" + version = "1.2.1" [[constraint]] name = "github.com/stretchr/testify" @@ -47,7 +47,7 @@ [[constraint]] name = "github.com/vapor-ware/synse-server-grpc" - branch = "cached-readings" + version = "1.1.0" [[constraint]] branch = "master" @@ -59,7 +59,7 @@ [[constraint]] name = "google.golang.org/grpc" - version = "1.12.2" + version = "1.15.0" [[constraint]] name = "gopkg.in/yaml.v2" diff --git a/Makefile b/Makefile index bad324a0..384cef39 100644 --- a/Makefile +++ b/Makefile @@ -84,13 +84,15 @@ ifndef HAS_LINT gometalinter --install endif @ # disable gotype: https://github.com/alecthomas/gometalinter/issues/40 - gometalinter ./... \ + gometalinter \ --disable=gotype --disable=interfacer \ --tests \ --vendor \ --sort=path --sort=line \ --aggregate \ - --deadline=5m + --deadline=5m \ + -e $$(go env GOROOT) \ + ./... .PHONY: setup setup: ## Install the build and development dependencies From 1fc46d4d7a391afc4f5f2e2013a788fe92388cda Mon Sep 17 00:00:00 2001 From: Erick Daniszewski Date: Tue, 23 Oct 2018 09:30:26 -0400 Subject: [PATCH 5/5] bump version to 1.2.0 --- sdk/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/version.go b/sdk/version.go index fdd218da..ecccb984 100644 --- a/sdk/version.go +++ b/sdk/version.go @@ -10,7 +10,7 @@ import ( ) // Version specifies the version of the Synse Plugin SDK. -const Version = "1.1.1" +const Version = "1.2.0" // version is a reference to a binVersion that is used by the SDK to get // the version info for a plugin.