Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make backend responsible for reporting its own metrics #819

Merged
merged 1 commit into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,6 @@ type Backend interface {
// Status returns a string describing the current status, this can detail queue sizes or other attributes
Status() string

// Heartbeat is called every minute, it can be used by backends to log status to a dashboard such as librato
Heartbeat() error

// RedisPool returns the redisPool for this backend
RedisPool() *redis.Pool
}
Expand Down
25 changes: 24 additions & 1 deletion backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,33 @@
courier.RegisterFlusher(path.Join(b.config.SpoolDir, "statuses"), b.flushStatusFile)
courier.RegisterFlusher(path.Join(b.config.SpoolDir, "events"), b.flushChannelEventFile)

b.startMetricsReporter(time.Minute)

slog.Info("backend started", "comp", "backend", "state", "started")
return nil
}

func (b *backend) startMetricsReporter(interval time.Duration) {
b.waitGroup.Add(1)

go func() {
defer func() {
slog.Info("metrics reporter exiting")
b.waitGroup.Done()
}()

for {
select {
case <-b.stopChan:
b.reportMetrics()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@norkans7 this should ensure any last metrics are sent

return
case <-time.After(interval):
b.reportMetrics()

Check warning on line 261 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L260-L261

Added lines #L260 - L261 were not covered by tests
}
}
}()
}

// Stop stops our RapidPro backend, closing our db and redis connections
func (b *backend) Stop() error {
// close our stop channel
Expand Down Expand Up @@ -741,7 +764,7 @@
return health.String()
}

func (b *backend) Heartbeat() error {
func (b *backend) reportMetrics() error {
rc := b.rp.Get()
defer rc.Close()

Expand Down
4 changes: 0 additions & 4 deletions backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,10 +766,6 @@ func (ts *BackendTestSuite) TestHealth() {
ts.Equal(ts.b.Health(), "")
}

func (ts *BackendTestSuite) TestHeartbeat() {
ts.NoError(ts.b.Heartbeat())
}

func (ts *BackendTestSuite) TestCheckForDuplicate() {
rc := ts.b.rp.Get()
defer rc.Close()
Expand Down
19 changes: 0 additions & 19 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,25 +129,6 @@ func (s *server) Start() error {
}
}()

s.waitGroup.Add(1)

// start our heartbeat
go func() {
defer s.waitGroup.Done()

for !s.stopped {
select {
case <-s.stopChan:
return
case <-time.After(time.Minute):
err := s.backend.Heartbeat()
if err != nil {
slog.Error("error running backend heartbeat", "error", err)
}
}
}
}()

slog.Info(fmt.Sprintf("server listening on %d", s.config.Port),
"comp", "server",
"port", s.config.Port,
Expand Down
5 changes: 0 additions & 5 deletions test/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,11 +375,6 @@ func (mb *MockBackend) Status() string {
return "ALL GOOD"
}

// Heartbeat is a noop for our mock backend
func (mb *MockBackend) Heartbeat() error {
return nil
}

// RedisPool returns the redisPool for this backend
func (mb *MockBackend) RedisPool() *redis.Pool {
return mb.redisPool
Expand Down
Loading