Skip to content

Commit

Permalink
Merge pull request #819 from nyaruka/move_heartbeat
Browse files Browse the repository at this point in the history
Make backend responsible for reporting its own metrics
  • Loading branch information
rowanseymour authored Dec 18, 2024
2 parents d4ec7e6 + 24f6a81 commit c6f48ad
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 32 deletions.
3 changes: 0 additions & 3 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,6 @@ type Backend interface {
// Status returns a string describing the current status, this can detail queue sizes or other attributes
Status() string

// Heartbeat is called every minute, it can be used by backends to log status to a dashboard such as librato
Heartbeat() error

// RedisPool returns the redisPool for this backend
RedisPool() *redis.Pool
}
Expand Down
25 changes: 24 additions & 1 deletion backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,33 @@ func (b *backend) Start() error {
courier.RegisterFlusher(path.Join(b.config.SpoolDir, "statuses"), b.flushStatusFile)
courier.RegisterFlusher(path.Join(b.config.SpoolDir, "events"), b.flushChannelEventFile)

b.startMetricsReporter(time.Minute)

slog.Info("backend started", "comp", "backend", "state", "started")
return nil
}

func (b *backend) startMetricsReporter(interval time.Duration) {
b.waitGroup.Add(1)

go func() {
defer func() {
slog.Info("metrics reporter exiting")
b.waitGroup.Done()
}()

for {
select {
case <-b.stopChan:
b.reportMetrics()
return
case <-time.After(interval):
b.reportMetrics()
}
}
}()
}

// Stop stops our RapidPro backend, closing our db and redis connections
func (b *backend) Stop() error {
// close our stop channel
Expand Down Expand Up @@ -741,7 +764,7 @@ func (b *backend) Health() string {
return health.String()
}

func (b *backend) Heartbeat() error {
func (b *backend) reportMetrics() error {
rc := b.rp.Get()
defer rc.Close()

Expand Down
4 changes: 0 additions & 4 deletions backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,10 +766,6 @@ func (ts *BackendTestSuite) TestHealth() {
ts.Equal(ts.b.Health(), "")
}

func (ts *BackendTestSuite) TestHeartbeat() {
ts.NoError(ts.b.Heartbeat())
}

func (ts *BackendTestSuite) TestCheckForDuplicate() {
rc := ts.b.rp.Get()
defer rc.Close()
Expand Down
19 changes: 0 additions & 19 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,25 +129,6 @@ func (s *server) Start() error {
}
}()

s.waitGroup.Add(1)

// start our heartbeat
go func() {
defer s.waitGroup.Done()

for !s.stopped {
select {
case <-s.stopChan:
return
case <-time.After(time.Minute):
err := s.backend.Heartbeat()
if err != nil {
slog.Error("error running backend heartbeat", "error", err)
}
}
}
}()

slog.Info(fmt.Sprintf("server listening on %d", s.config.Port),
"comp", "server",
"port", s.config.Port,
Expand Down
5 changes: 0 additions & 5 deletions test/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,11 +375,6 @@ func (mb *MockBackend) Status() string {
return "ALL GOOD"
}

// Heartbeat is a noop for our mock backend
func (mb *MockBackend) Heartbeat() error {
return nil
}

// RedisPool returns the redisPool for this backend
func (mb *MockBackend) RedisPool() *redis.Pool {
return mb.redisPool
Expand Down

0 comments on commit c6f48ad

Please sign in to comment.