forked from hashicorp/go-eventlogger
-
Notifications
You must be signed in to change notification settings - Fork 0
/
node.go
80 lines (66 loc) · 1.92 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package eventlogger
import (
"context"
"fmt"
)
// NodeType defines the possible Node type's in the system.
type NodeType int
const (
_ NodeType = iota
NodeTypeFilter
NodeTypeFormatter
NodeTypeSink
NodeTypeFormatterFilter // A node that formats and then filters the events based on the new format.
)
// A Node in a graph
type Node interface {
// Process does something with the Event: filter, redaction,
// marshalling, persisting.
Process(ctx context.Context, e *Event) (*Event, error)
// Reopen is used to re-read any config stored externally
// and to close and reopen files, e.g. for log rotation.
Reopen() error
// Type describes the type of the node. This is mostly just used to
// validate that pipelines are sensibly arranged, e.g. ending with a sink.
Type() NodeType
}
type linkedNode struct {
node Node
nodeID NodeID
next []*linkedNode
}
// linkNodes is a convenience function that connects Nodes together into a
// linked list.
func linkNodes(nodes []Node, ids []NodeID) (*linkedNode, error) {
if len(nodes) == 0 {
return nil, fmt.Errorf("no nodes given")
}
root := &linkedNode{node: nodes[0]}
cur := root
for _, n := range nodes[1:] {
next := &linkedNode{node: n}
cur.next = []*linkedNode{next}
cur = next
}
return root, nil
}
// linkNodesAndSinks is a convenience function that connects
// the inner Nodes together into a linked list. Then it appends the sinks
// to the end as a set of fan-out leaves.
func linkNodesAndSinks(inner, sinks []Node, nodeIDs, sinkIDs []NodeID) (*linkedNode, error) {
root, err := linkNodes(inner, nodeIDs)
if err != nil {
return nil, err
}
// This is inefficient but since it's only used in setup we don't care:
cur := root
for cur.next != nil {
cur = cur.next[0]
}
for i, s := range sinks {
cur.next = append(cur.next, &linkedNode{node: s, nodeID: sinkIDs[i]})
}
return root, nil
}