Skip to content

Commit

Permalink
Gracefully shutdown analytics module/runner (#3335)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongshixi authored Jul 29, 2024
1 parent 466ff83 commit ed3e4a1
Show file tree
Hide file tree
Showing 14 changed files with 140 additions and 10 deletions.
5 changes: 5 additions & 0 deletions analytics/agma/agma_module.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,11 @@ func (l *AgmaLogger) LogVideoObject(event *analytics.VideoObject) {
l.bufferCh <- data
}

func (l *AgmaLogger) Shutdown() {
glog.Info("[AgmaAnalytics] Shutdown, trying to flush buffer")
l.flush() // mutex safe
}

func (l *AgmaLogger) LogCookieSyncObject(event *analytics.CookieSyncObject) {}
func (l *AgmaLogger) LogNotificationEventObject(event *analytics.NotificationEvent) {}
func (l *AgmaLogger) LogSetUIDObject(event *analytics.SetUIDObject) {}
35 changes: 35 additions & 0 deletions analytics/agma/agma_module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,3 +664,38 @@ func TestRaceEnd2End(t *testing.T) {

assert.Equal(t, expected, actual)
}

func TestShutdownFlush(t *testing.T) {
cfg := config.AgmaAnalytics{
Enabled: true,
Endpoint: config.AgmaAnalyticsHttpEndpoint{
Url: "http://localhost:8000/event",
Timeout: "5s",
},
Buffers: config.AgmaAnalyticsBuffer{
EventCount: 1000,
BufferSize: "100mb",
Timeout: "5m",
},
Accounts: []config.AgmaAnalyticsAccount{
{
PublisherId: "track-me",
Code: "abc",
},
},
}
mockedSender := new(MockedSender)
mockedSender.On("Send", mock.Anything).Return(nil)
clockMock := clock.NewMock()
logger, err := newAgmaLogger(cfg, mockedSender.Send, clockMock)
assert.NoError(t, err)

go logger.start()
logger.LogAuctionObject(&mockValidAuctionObject)
logger.Shutdown()

time.Sleep(10 * time.Millisecond)

mockedSender.AssertCalled(t, "Send", mock.Anything)
mockedSender.AssertNumberOfCalls(t, "Send", 1)
}
7 changes: 7 additions & 0 deletions analytics/build/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,13 @@ func (ea enabledAnalytics) LogNotificationEventObject(ne *analytics.Notification
}
}

// Shutdown - correctly shutdown all analytics modules and wait for them to finish
func (ea enabledAnalytics) Shutdown() {
for _, module := range ea {
module.Shutdown()
}
}

func evaluateActivities(rw *openrtb_ext.RequestWrapper, ac privacy.ActivityControl, componentName string) (bool, *openrtb_ext.RequestWrapper) {
// returned nil request wrapper means that request wrapper was not modified by activities and doesn't have to be changed in analytics object
// it is needed in order to use one function for all analytics objects with RequestWrapper
Expand Down
17 changes: 17 additions & 0 deletions analytics/build/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ func (m *sampleModule) LogAmpObject(ao *analytics.AmpObject) { *m.count++ }

func (m *sampleModule) LogNotificationEventObject(ne *analytics.NotificationEvent) { *m.count++ }

func (m *sampleModule) Shutdown() { *m.count++ }

func initAnalytics(count *int) analytics.Runner {
modules := make(enabledAnalytics, 0)
modules["sampleModule"] = &sampleModule{count}
Expand All @@ -92,6 +94,19 @@ func TestNewPBSAnalytics(t *testing.T) {
assert.Equal(t, len(instance), 0)
}

func TestPBSAnalyticsShutdown(t *testing.T) {
countA := 0
countB := 0
modules := make(enabledAnalytics, 0)
modules["sampleModuleA"] = &sampleModule{count: &countA}
modules["sampleModuleB"] = &sampleModule{count: &countB}

modules.Shutdown()

assert.Equal(t, 1, countA, "sampleModuleA should have been shutdown")
assert.Equal(t, 1, countB, "sampleModuleB should have been shutdown")
}

func TestNewPBSAnalytics_FileLogger(t *testing.T) {
if _, err := os.Stat(TEST_DIR); os.IsNotExist(err) {
if err = os.MkdirAll(TEST_DIR, 0755); err != nil {
Expand Down Expand Up @@ -415,6 +430,8 @@ func (m *mockAnalytics) LogSetUIDObject(ao *analytics.SetUIDObject) {}

func (m *mockAnalytics) LogNotificationEventObject(ao *analytics.NotificationEvent) {}

func (m *mockAnalytics) Shutdown() {}

func TestLogObject(t *testing.T) {
tests := []struct {
description string
Expand Down
1 change: 1 addition & 0 deletions analytics/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Module interface {
LogSetUIDObject(*SetUIDObject)
LogAmpObject(*AmpObject)
LogNotificationEventObject(*NotificationEvent)
Shutdown()
}

// Loggable object of a transaction at /openrtb2/auction endpoint
Expand Down
27 changes: 20 additions & 7 deletions analytics/filesystem/file_module.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import (
"bytes"
"fmt"

"github.com/chasex/glog"
cglog "github.com/chasex/glog"
"github.com/golang/glog"
"github.com/prebid/openrtb/v20/openrtb2"
"github.com/prebid/prebid-server/v2/analytics"
"github.com/prebid/prebid-server/v2/util/jsonutil"
Expand All @@ -21,9 +22,14 @@ const (
NOTIFICATION_EVENT RequestType = "/event"
)

type Logger interface {
Debug(v ...interface{})
Flush()
}

// Module that can perform transactional logging
type FileLogger struct {
Logger *glog.Logger
Logger Logger
}

// Writes AuctionObject to file
Expand Down Expand Up @@ -85,15 +91,22 @@ func (f *FileLogger) LogNotificationEventObject(ne *analytics.NotificationEvent)
f.Logger.Flush()
}

// Shutdown the logger
func (f *FileLogger) Shutdown() {
// clear all pending buffered data in case there is any
glog.Info("[FileLogger] Shutdown, trying to flush buffer")
f.Logger.Flush()
}

// Method to initialize the analytic module
func NewFileLogger(filename string) (analytics.Module, error) {
options := glog.LogOptions{
options := cglog.LogOptions{
File: filename,
Flag: glog.LstdFlags,
Level: glog.Ldebug,
Mode: glog.R_Day,
Flag: cglog.LstdFlags,
Level: cglog.Ldebug,
Mode: cglog.R_Day,
}
if logger, err := glog.New(options); err == nil {
if logger, err := cglog.New(options); err == nil {
return &FileLogger{
logger,
}, nil
Expand Down
25 changes: 25 additions & 0 deletions analytics/filesystem/file_module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,25 @@ import (

"github.com/prebid/prebid-server/v2/analytics"
"github.com/prebid/prebid-server/v2/config"
"github.com/stretchr/testify/mock"

"github.com/prebid/openrtb/v20/openrtb2"
)

const TEST_DIR string = "testFiles"

type MockLogger struct {
mock.Mock
}

func (ml *MockLogger) Debug(v ...interface{}) {
ml.Called(v)
}

func (ml *MockLogger) Flush() {
ml.Called()
}

func TestAmpObject_ToJson(t *testing.T) {
ao := &analytics.AmpObject{
Status: http.StatusOK,
Expand Down Expand Up @@ -97,3 +110,15 @@ func TestFileLogger_LogObjects(t *testing.T) {
t.Fatalf("Couldn't initialize file logger: %v", err)
}
}

func TestFileLoggerShutdown(t *testing.T) {
mockLogger := &MockLogger{}
fl := &FileLogger{
Logger: mockLogger,
}
mockLogger.On("Flush").Return(nil)

fl.Shutdown()

mockLogger.AssertNumberOfCalls(t, "Flush", 1)
}
6 changes: 6 additions & 0 deletions analytics/pubstack/pubstack_module.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,12 @@ func (p *PubstackModule) LogAmpObject(ao *analytics.AmpObject) {
p.eventChannels[amp].Push(payload)
}

// Shutdown - no op since the analytic module already implements system signal handling
// and trying to close a closed channel will cause panic
func (p *PubstackModule) Shutdown() {
glog.Info("[PubstackModule] Shutdown")
}

func (p *PubstackModule) start(c <-chan *Configuration) {
for {
select {
Expand Down
1 change: 1 addition & 0 deletions analytics/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ type Runner interface {
LogSetUIDObject(*SetUIDObject)
LogAmpObject(*AmpObject, privacy.ActivityControl)
LogNotificationEventObject(*NotificationEvent, privacy.ActivityControl)
Shutdown()
}
4 changes: 4 additions & 0 deletions endpoints/cookie_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2198,6 +2198,10 @@ func (m *MockAnalyticsRunner) LogNotificationEventObject(obj *analytics.Notifica
m.Called(obj, ac)
}

func (m *MockAnalyticsRunner) Shutdown() {
m.Called()
}

type MockGDPRPerms struct {
mock.Mock
}
Expand Down
2 changes: 2 additions & 0 deletions endpoints/events/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func (e *eventsMockAnalyticsModule) LogNotificationEventObject(ne *analytics.Not
e.Invoked = true
}

func (e *eventsMockAnalyticsModule) Shutdown() {}

var mockAccountData = map[string]json.RawMessage{
"events_enabled": json.RawMessage(`{"events": {"enabled":true}}`),
"events_disabled": json.RawMessage(`{"events": {"enabled":false}}`),
Expand Down
1 change: 1 addition & 0 deletions endpoints/openrtb2/amp_auction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1734,6 +1734,7 @@ func (logger mockLogger) LogNotificationEventObject(uuidObj *analytics.Notificat
func (logger mockLogger) LogAmpObject(ao *analytics.AmpObject, _ privacy.ActivityControl) {
*logger.ampObject = *ao
}
func (logger mockLogger) Shutdown() {}

func TestBuildAmpObject(t *testing.T) {
testCases := []struct {
Expand Down
2 changes: 2 additions & 0 deletions endpoints/openrtb2/video_auction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1282,6 +1282,8 @@ func (m *mockAnalyticsModule) LogAmpObject(ao *analytics.AmpObject, _ privacy.Ac
func (m *mockAnalyticsModule) LogNotificationEventObject(ne *analytics.NotificationEvent, _ privacy.ActivityControl) {
}

func (m *mockAnalyticsModule) Shutdown() {}

func mockDeps(t *testing.T, ex *mockExchangeVideo) *endpointDeps {
return &endpointDeps{
fakeUUIDGenerator{},
Expand Down
17 changes: 14 additions & 3 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ type Router struct {
*httprouter.Router
MetricsEngine *metricsConf.DetailedMetricsEngine
ParamsValidator openrtb_ext.BidderParamValidator
Shutdown func()

shutdowns []func()
}

func New(cfg *config.Configuration, rateConvertor *currency.RateConverter) (r *Router, err error) {
Expand Down Expand Up @@ -201,11 +202,12 @@ func New(cfg *config.Configuration, rateConvertor *currency.RateConverter) (r *R
// Metrics engine
r.MetricsEngine = metricsConf.NewMetricsEngine(cfg, openrtb_ext.CoreBidderNames(), syncerKeys, moduleStageNames)
shutdown, fetcher, ampFetcher, accounts, categoriesFetcher, videoFetcher, storedRespFetcher := storedRequestsConf.NewStoredRequests(cfg, r.MetricsEngine, generalHttpClient, r.Router)
// todo(zachbadgett): better shutdown
r.Shutdown = shutdown

analyticsRunner := analyticsBuild.New(&cfg.Analytics)

// register the analytics runner for shutdown
r.shutdowns = append(r.shutdowns, shutdown, analyticsRunner.Shutdown)

paramsValidator, err := openrtb_ext.NewBidderParamsValidator(schemaDirectory)
if err != nil {
glog.Fatalf("Failed to create the bidder params validator. %v", err)
Expand Down Expand Up @@ -301,6 +303,15 @@ func New(cfg *config.Configuration, rateConvertor *currency.RateConverter) (r *R
return r, nil
}

// Shutdown closes any dependencies of the router that may need closing
func (r *Router) Shutdown() {
glog.Info("[PBS Router] shutting down")
for _, shutdown := range r.shutdowns {
shutdown()
}
glog.Info("[PBS Router] shut down")
}

func checkSupportedUserSyncEndpoints(bidderInfos config.BidderInfos) error {
for name, info := range bidderInfos {
if info.Syncer == nil {
Expand Down

0 comments on commit ed3e4a1

Please sign in to comment.