Skip to content

Commit

Permalink
chore: disable field concatenation for gnmi (#1078)
Browse files Browse the repository at this point in the history
* chore: disable field concatenation for gnmi

* chore: interface example for gnmi
  • Loading branch information
kongfei605 authored Oct 22, 2024
1 parent 185474f commit 164094c
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 11 deletions.
200 changes: 200 additions & 0 deletions conf/input.gnmi/interface.toml.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
#interval="5s"
# gNMI telemetry input plugin
# [[instances]]
## Address and port of the gNMI GRPC server
# addresses = []

## define credentials
username = "admin"
password = "admin"

## gNMI encoding requested (one of: "proto", "json", "json_ietf", "bytes")
encoding = "json"

## redial in case of failures after
redial = "10s"

## gRPC Maximum Message Size
max_msg_size = 4194304

## Enable to get the canonical path as field-name
# canonical_field_names = false

## Remove leading slashes and dots in field-name
# trim_field_names = false

## Guess the path-tag if an update does not contain a prefix-path
## If enabled, the common-path of all elements in the update is used.
# guess_path_tag = false

## enable client-side TLS and define CA to authenticate the device
# enable_tls = true
# tls_ca = "/etc/categraf/ca.pem"
## Minimal TLS version to accept by the client
# tls_min_version = "TLS12"
## Use TLS but skip chain & host verification
# insecure_skip_verify = true

## define client-side TLS certificate & key to authenticate to the device
# tls_cert = "/etc/categraf/cert.pem"
# tls_key = "/etc/categraf/key.pem"

## gNMI subscription prefix (optional, can usually be left empty)
## See: https://github.com/openconfig/reference/blob/master/rpc/gnmi/gnmi-specification.md#222-paths
# origin = ""
# prefix = ""
# target = ""

## Vendor specific options
## This defines what vendor specific options to load.
## * Juniper Header Extension (juniper_header): some sensors are directly managed by
## Linecard, which adds the Juniper GNMI Header Extension. Enabling this
## allows the decoding of the Extension header if present. Currently this knob
## adds component, component_id & sub_component_id as additionnal tags
# vendor_specific = []

## Define additional aliases to map encoding paths to measurement names
#[instances.aliases]
# incomming = "openconfig:/interfaces/interface/state/counters/in-octets"
# incomming_ucastpkgts = "openconfig:/interfaces/interface/state/counters/in-unicast-pkts"
# incomming_errors = "openconfig:/interfaces/interface/state/counters/in-errors"
# outgoing = "openconfig:/interfaces/interface/state/counters/out-octets"
# outgoing_ucastpkts = "openconfig:/interfaces/interface/state/counters/out-unicast-pkts"
# outgoing_errors = "openconfig:/interfaces/interface/state/counters/out-errors"
# ifname = "openconfig:/interfaces/interface/state/name"
# ifalias = "openconfig:/interfaces/interface/config/description"
# type = "openconfig:/interfaces/interface/state/type"
# ostatus = "openconfig:/interfaces/interface/state/oper-status"
# speed = "openconfig:/interfaces/interface/ethernet/state/port-speed"

[[instances.subscription]]
name = "gnmi_interface_incomming"
origin = "openconfig"
path = "/interfaces/interface/state/counters/in-octets"
subscription_mode = "sample"
sample_interval = "15s"
disable_concatenation = true
[[instances.subscription]]
name = "gnmi_interface_incoming_errors"
origin = "openconfig"
path = "/interfaces/interface/state/counters/in-errors"
subscription_mode = "sample"
sample_interval = "15s"
disable_concatenation = true
[[instances.subscription]]
name = "gnmi_interface_incomming_ucastpkts"
origin = "openconfig"
path = "/interfaces/interface/state/counters/in-unicast-pkts"
subscription_mode = "sample"
sample_interval = "15s"
disable_concatenation = true
[[instances.subscription]]
name = "gnmi_interface_outgoing"
origin = "openconfig"
path = "/interfaces/interface/state/counters/out-octets"
subscription_mode = "sample"
sample_interval = "15s"
disable_concatenation = true
[[instances.subscription]]
name = "gnmi_interface_outgoing_errors"
origin = "openconfig"
path = "/interfaces/interface/state/counters/out-errors"
subscription_mode = "sample"
sample_interval = "15s"
disable_concatenation = true
[[instances.subscription]]
name = "gnmi_interface_outgoing_ucastpkts"
origin = "openconfig"
path = "/interfaces/interface/state/counters/out-unicast-pkts"
subscription_mode = "sample"
sample_interval = "15s"
disable_concatenation = true
[[instances.subscription]]
name = "gnmi_interface_admin_status"
origin = "openconfig"
path = "/interfaces/interface/state/admin-status"
subscription_mode = "sample"
sample_interval = "15s"
disable_concatenation = true
[[instances.subscription]]
name = "gnmi_interface_ostatus"
origin = "openconfig"
path = "/interfaces/interface/state/oper-status"
subscription_mode = "sample"
sample_interval = "15s"
disable_concatenation = true
[[instances.subscription]]
name = "gnmi_interface_speed"
origin = "openconfig"
path = "/interfaces/interface/ethernet/state/port-speed"
subscription_mode = "sample"
sample_interval = "15s"
disable_concatenation = true


#[[instances.subscription]]
## Name of the measurement that will be emitted
#name = "gnmi"

## Origin and path of the subscription
## See: https://github.com/openconfig/reference/blob/master/rpc/gnmi/gnmi-specification.md#222-paths
##
## origin usually refers to a (YANG) data model implemented by the device
## and path to a specific substructure inside it that should be subscribed
## to (similar to an XPath). YANG models can be found e.g. here:
## https://github.com/YangModels/yang/tree/master/vendor/cisco/xr
#origin = "openconfig"
#path = "/interfaces/interface/state/counters" #/in-octets"

## Subscription mode ("target_defined", "sample", "on_change") and interval
#subscription_mode = "sample"
#sample_interval = "15s"

## Suppress redundant transmissions when measured values are unchanged
#suppress_redundant = true

## If suppression is enabled, send updates at least every X seconds anyway
#heartbeat_interval = "15s"
[[instances.tag_subscription]]
name = "ifname"
origin = ""
path = "/interfaces/interface/state/name"
subscription_mode = "on_change"
match = "name"
#elements = ["name"]
disable_concatenation = true

[[instances.tag_subscription]]
name = "type"
origin = ""
path = "/interfaces/interface/state/type"
subscription_mode = "on_change"
match = "name"
#elements = ["interface"]
disable_concatenation = true

[[instances.tag_subscription]]
name = "ifalias"
origin = ""
path = "/interfaces/interface/state/description"
subscription_mode = "on_change"
match = "elements"
elements = ["interface"]
disable_concatenation = true


[[instances.processor_enum]]
metrics = ["*status"]
[instances.processor_enum.value_mappings]
UP = 1
DOWN = 0
NOT_PRESENT = -1

[[instances.processor_enum]]
metrics = ["*speed"]
[instances.processor_enum.value_mappings]
"openconfig-if-ethernet:SPEED_25GB" = 25000000000
"openconfig-if-ethernet:SPEED_50GB" = 50000000000
"openconfig-if-ethernet:SPEED_100GB" = 100000000000
"openconfig-if-ethernet:SPEED_200GB" = 200000000000
"openconfig-if-ethernet:SPEED_UNKNOWN" = -1
9 changes: 9 additions & 0 deletions inputs/gnmi/gnmi.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type (
cancel context.CancelFunc
wg sync.WaitGroup
slist *types.SampleList

AgentHostTag string `toml:"agent_host_tag"`
}
)

Expand All @@ -91,6 +93,8 @@ type Subscription struct {

// Mark this subscription as a tag-only lookup source, not emitting any metric
TagOnly bool `toml:"tag_only" deprecated:"1.25.0;2.0.0;please use 'tag_subscription's instead"`

DisableConcatenation bool `toml:"disable_concatenation"`
}

// Tag Subscription for a gNMI client
Expand All @@ -107,6 +111,9 @@ func (c *Instance) Init() error {
if len(c.Encoding) == 0 {
c.Encoding = "proto"
}
if len(c.AgentHostTag) == 0 {
c.AgentHostTag = "agent_host"
}
c.slist = types.NewSampleList()
// Check options
if time.Duration(c.Redial) <= 0 {
Expand Down Expand Up @@ -228,13 +235,15 @@ func (c *Instance) Start() error {
address: addr,
aliases: c.internalAliases,
tagsubs: c.TagSubscriptions,
subs: c.Subscriptions,
maxMsgSize: int(c.MaxMsgSize),
vendorExt: c.VendorSpecific,
tagStore: newTagStore(c.TagSubscriptions),
trace: c.Trace,
canonicalFieldNames: c.CanonicalFieldNames,
trimSlash: c.TrimFieldNames,
guessPathTag: c.GuessPathTag,
sourceTag: c.AgentHostTag,
}
for ctx.Err() == nil {
if err := h.subscribeGNMI(ctx, c.slist, tlscfg, request); err != nil && ctx.Err() == nil {
Expand Down
30 changes: 22 additions & 8 deletions inputs/gnmi/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
jnprHeader "flashcat.cloud/categraf/inputs/gnmi/extensions/jnpr_gnmi_extention"
"flashcat.cloud/categraf/pkg/choice"
"flashcat.cloud/categraf/types"
"flashcat.cloud/categraf/types/metric"
)

const eidJuniperTelemetryHeader = 1
Expand All @@ -34,6 +33,7 @@ type handler struct {
address string
aliases map[*pathInfo]string
tagsubs []TagSubscription
subs []Subscription
maxMsgSize int
emptyNameWarnShown bool
vendorExt []string
Expand All @@ -42,6 +42,8 @@ type handler struct {
canonicalFieldNames bool
trimSlash bool
guessPathTag bool

sourceTag string
}

// SubscribeGNMI and extract telemetry data
Expand Down Expand Up @@ -118,7 +120,6 @@ func (h *handler) subscribeGNMI(ctx context.Context, slist *types.SampleList, tl

// Handle SubscribeResponse_Update message from gNMI and parse contained telemetry data
func (h *handler) handleSubscribeResponseUpdate(slist *types.SampleList, response *gnmiLib.SubscribeResponse_Update, extension []*gnmiExt.Extension) {
grouper := metric.NewSeriesGrouper()
timestamp := time.Unix(0, response.Update.Timestamp)

// Extract tags from potential extension in the update notification
Expand Down Expand Up @@ -154,7 +155,7 @@ func (h *handler) handleSubscribeResponseUpdate(slist *types.SampleList, respons
prefix := newInfoFromPath(response.Update.Prefix)

// Add info to the tags
headerTags["source"], _, _ = net.SplitHostPort(h.address)
headerTags[h.sourceTag], _, _ = net.SplitHostPort(h.address)
if !prefix.empty() {
headerTags["path"] = prefix.String()
}
Expand Down Expand Up @@ -256,13 +257,26 @@ func (h *handler) handleSubscribeResponseUpdate(slist *types.SampleList, respons
log.Printf("E! Invalid empty path %q with alias %q", fieldPath, aliasPath)
continue
}
grouper.Add(name, tags, timestamp, key, field.value)
}
prefix := inputName
if strings.HasPrefix(name, inputName) {
prefix = ""
}
disableConcatenating := false
for _, sub := range h.subs {
if strings.HasSuffix(field.path.String(), sub.Path) {
// field.path => origin:path
// sub.path => path
disableConcatenating = sub.DisableConcatenation
}
}
if !disableConcatenating {
name = name + "_" + key
}

// Add grouped measurements
for _, metric := range grouper.Metrics() {
slist.PushSamples(metric.Name(), metric.Fields(), metric.Tags())
sample := types.NewSample(prefix, name, field.value, tags).SetTime(timestamp)
slist.PushFront(sample)
}

}

// Try to find the alias for the given path
Expand Down
12 changes: 9 additions & 3 deletions inputs/gnmi/tag_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ func (s *tagStore) insert(subscription TagSubscription, path *pathInfo, values [
if len(f.path.segments) > 0 {
key := f.path.segments[len(f.path.segments)-1]
key = strings.ReplaceAll(key, "-", "_")
tagName += "/" + key
if !subscription.DisableConcatenation {
tagName += "/" + key
}
}
sv, err := ToString(f.value)
if err != nil {
Expand Down Expand Up @@ -75,7 +77,9 @@ func (s *tagStore) insert(subscription TagSubscription, path *pathInfo, values [
if len(f.path.segments) > 0 {
key := f.path.segments[len(f.path.segments)-1]
key = strings.ReplaceAll(key, "-", "_")
tagName += "/" + key
if !subscription.DisableConcatenation {
tagName += "/" + key
}
}
sv, err := ToString(f.value)
if err != nil {
Expand Down Expand Up @@ -104,7 +108,9 @@ func (s *tagStore) insert(subscription TagSubscription, path *pathInfo, values [
if len(f.path.segments) > 0 {
key := f.path.segments[len(f.path.segments)-1]
key = strings.ReplaceAll(key, "-", "_")
tagName += "/" + key
if !subscription.DisableConcatenation {
tagName += "/" + key
}
}
sv, err := ToString(f.value)
if err != nil {
Expand Down

0 comments on commit 164094c

Please sign in to comment.