forked from rcrowley/go-metrics
-
Notifications
You must be signed in to change notification settings - Fork 1
/
meter.go
108 lines (95 loc) · 2.51 KB
/
meter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package metrics
import "time"
// Meter counts events and reports their rate as exponentially-weighted
// moving averages over one, five, and fifteen minutes, together with a
// mean rate.
//
// It is an interface so that other types can implement the Meter API as
// appropriate.
type Meter interface {
// Count returns the count of events seen.
Count() int64
// Mark records the occurrence of the given number of events.
Mark(int64)
// Rate1 returns the one-minute moving average rate of events.
Rate1() float64
// Rate5 returns the five-minute moving average rate of events.
Rate5() float64
// Rate15 returns the fifteen-minute moving average rate of events.
Rate15() float64
// RateMean returns the mean rate of events.
RateMean() float64
}
// StandardMeter is the standard implementation of Meter. It uses one
// goroutine (arbiter) to synchronize its calculations and a time.Ticker to
// produce clock ticks.
//
// Create one with NewMeter. The zero value has nil channels and no running
// arbiter, so every method on it would block forever.
type StandardMeter struct {
// in carries event counts from Mark to the arbiter goroutine.
in chan int64
// out carries meterV snapshots from the arbiter to the accessor methods
// (Count, Rate1, Rate5, Rate15, RateMean).
out chan meterV
// ticker delivers the clock ticks on which the arbiter calls Tick on
// each moving average.
ticker *time.Ticker
}
// Compile-time assertion that *StandardMeter satisfies Meter. The typed nil
// pointer performs the check without constructing a value.
var _ Meter = (*StandardMeter)(nil)
// meterV is a snapshot of a meter's values: everything that needs to be
// passed back from the synchronizing goroutine. The arbiter sends copies of
// it on StandardMeter.out, and each accessor method picks out one field.
type meterV struct {
// count is the running total of the values passed to Mark.
count int64
// rate1, rate5 and rate15 hold the rates last read from the one-, five-
// and fifteen-minute moving averages. rateMean is the count divided by the
// seconds elapsed since the arbiter started.
rate1, rate5, rate15, rateMean float64
}
// NewMeter constructs a StandardMeter. It allocates the unbuffered input and
// output channels, creates a ticker that fires every five seconds, and
// starts the arbiter goroutine that services them.
//
// NOTE(review): nothing visible in this file stops the ticker or the
// goroutine, so each meter appears to hold both for the life of the
// process; confirm before creating meters dynamically.
func NewMeter() *StandardMeter {
	meter := new(StandardMeter)
	meter.in = make(chan int64)
	meter.out = make(chan meterV)
	meter.ticker = time.NewTicker(5 * time.Second)
	go meter.arbiter()
	return meter
}
// Count returns the count of events seen. The value is read from a snapshot
// received from the arbiter goroutine, so the call blocks until a snapshot
// is sent on the output channel.
func (m *StandardMeter) Count() int64 {
	snapshot := <-m.out
	return snapshot.count
}
// Mark records the occurrence of n events by sending n to the arbiter
// goroutine. The input channel created by NewMeter is unbuffered, so Mark
// blocks until the arbiter receives the value.
func (m *StandardMeter) Mark(n int64) {
m.in <- n
}
// Rate1 returns the meter's one-minute moving average rate of events, read
// from a snapshot received from the arbiter goroutine.
func (m *StandardMeter) Rate1() float64 {
	snapshot := <-m.out
	return snapshot.rate1
}
// Rate5 returns the meter's five-minute moving average rate of events, read
// from a snapshot received from the arbiter goroutine.
func (m *StandardMeter) Rate5() float64 {
	snapshot := <-m.out
	return snapshot.rate5
}
// Rate15 returns the meter's fifteen-minute moving average rate of events,
// read from a snapshot received from the arbiter goroutine.
func (m *StandardMeter) Rate15() float64 {
	snapshot := <-m.out
	return snapshot.rate15
}
// RateMean returns the meter's mean rate of events, read from a snapshot
// received from the arbiter goroutine.
func (m *StandardMeter) RateMean() float64 {
	snapshot := <-m.out
	return snapshot.rateMean
}
// arbiter is the meter's synchronizing loop; NewMeter runs it in its own
// goroutine. All mutable state lives in locals of this function, and each
// pass through the loop services exactly one of three events:
//
//   - a value received on m.in is added to the count and fed to the three
//     moving averages;
//   - a send on m.out hands a copy of the current meterV snapshot to an
//     accessor method;
//   - a tick on m.ticker.C calls Tick on each moving average.
//
// The snapshot is refreshed after a tick as well as after a Mark, so the
// reported rates and the mean keep moving while the meter is idle instead of
// freezing at the values computed during the last Mark.
//
// The loop never returns.
func (m *StandardMeter) arbiter() {
	var snap meterV
	avg1, avg5, avg15 := NewEWMA1(), NewEWMA5(), NewEWMA15()
	start := time.Now()

	// refresh copies the current rates into the snapshot served on m.out.
	refresh := func() {
		snap.rate1 = avg1.Rate()
		snap.rate5 = avg5.Rate()
		snap.rate15 = avg15.Rate()
		// Divide in floating point. Scaling the count by 1e9 in int64
		// overflows once the count exceeds roughly 9.2e9 events.
		// A zero elapsed time (coarse clock) keeps the previous mean rather
		// than producing Inf or NaN.
		if elapsed := time.Since(start).Seconds(); elapsed > 0 {
			snap.rateMean = float64(snap.count) / elapsed
		}
	}

	for {
		select {
		case n := <-m.in:
			snap.count += n
			avg1.Update(n)
			avg5.Update(n)
			avg15.Update(n)
			refresh()
		case m.out <- snap:
		case <-m.ticker.C:
			avg1.Tick()
			avg5.Tick()
			avg15.Tick()
			refresh()
		}
	}
}