From 40173fbc1b813bd5cc4465faa049714b88da5f60 Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Thu, 11 Jan 2024 10:21:57 -0500 Subject: [PATCH] Rename StreamRegistry => Registry --- core/services/chainlink/application.go | 2 +- core/services/streams/delegate.go | 6 +++--- core/services/streams/stream_registry.go | 10 +++++----- core/services/streams/stream_registry_test.go | 8 ++++---- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 5c350a8d146..a9f9c22df52 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -291,7 +291,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) { pipelineRunner = pipeline.NewRunner(pipelineORM, bridgeORM, cfg.JobPipeline(), cfg.WebServer(), legacyEVMChains, keyStore.Eth(), keyStore.VRF(), globalLogger, restrictedHTTPClient, unrestrictedHTTPClient) jobORM = job.NewORM(db, pipelineORM, bridgeORM, keyStore, globalLogger, cfg.Database()) txmORM = txmgr.NewTxStore(db, globalLogger, cfg.Database()) - streamRegistry = streams.NewStreamRegistry(globalLogger, pipelineRunner) + streamRegistry = streams.NewRegistry(globalLogger, pipelineRunner) ) for _, chain := range legacyEVMChains.Slice() { diff --git a/core/services/streams/delegate.go b/core/services/streams/delegate.go index 31352ecbdc4..fac2c7a36b3 100644 --- a/core/services/streams/delegate.go +++ b/core/services/streams/delegate.go @@ -23,14 +23,14 @@ type DelegateConfig interface { type Delegate struct { lggr logger.Logger - registry StreamRegistry + registry Registry runner ocrcommon.Runner cfg DelegateConfig } var _ job.Delegate = (*Delegate)(nil) -func NewDelegate(lggr logger.Logger, registry StreamRegistry, runner ocrcommon.Runner, cfg DelegateConfig) *Delegate { +func NewDelegate(lggr logger.Logger, registry Registry, runner ocrcommon.Runner, cfg DelegateConfig) *Delegate { return &Delegate{lggr, registry, runner, cfg} } @@ -66,7 +66,7 @@ type ResultRunSaver interface { } type StreamService struct { - registry StreamRegistry + registry Registry id StreamID spec *pipeline.Spec lggr logger.Logger diff --git a/core/services/streams/stream_registry.go b/core/services/streams/stream_registry.go index 086a2f2c7bc..6034c5d6b4a 100644 --- a/core/services/streams/stream_registry.go +++ b/core/services/streams/stream_registry.go @@ -14,7 +14,7 @@ func (s StreamID) String() string { return string(s) } -type StreamRegistry interface { +type Registry interface { Get(streamID StreamID) (strm Stream, exists bool) Register(streamID StreamID, spec pipeline.Spec, rrs ResultRunSaver) error Unregister(streamID StreamID) @@ -27,14 +27,14 @@ type streamRegistry struct { streams map[StreamID]Stream } -func NewStreamRegistry(lggr logger.Logger, runner Runner) StreamRegistry { - return newStreamRegistry(lggr, runner) +func NewRegistry(lggr logger.Logger, runner Runner) Registry { + return newRegistry(lggr, runner) } -func newStreamRegistry(lggr logger.Logger, runner Runner) *streamRegistry { +func newRegistry(lggr logger.Logger, runner Runner) *streamRegistry { return &streamRegistry{ sync.RWMutex{}, - lggr.Named("StreamRegistry"), + lggr.Named("Registry"), runner, make(map[StreamID]Stream), } diff --git a/core/services/streams/stream_registry_test.go b/core/services/streams/stream_registry_test.go index 08298a66ce0..2c7c2bd6ecc 100644 --- a/core/services/streams/stream_registry_test.go +++ b/core/services/streams/stream_registry_test.go @@ -21,12 +21,12 @@ func (m *mockStream) Run(ctx context.Context) (*pipeline.Run, pipeline.TaskRunRe return m.run, m.trrs, m.err } -func Test_StreamRegistry(t *testing.T) { +func Test_Registry(t *testing.T) { lggr := logger.TestLogger(t) runner := &mockRunner{} t.Run("Get", func(t *testing.T) { - sr := newStreamRegistry(lggr, runner) + sr := newRegistry(lggr, runner) sr.streams["foo"] = &mockStream{run: &pipeline.Run{ID: 1}} sr.streams["bar"] = &mockStream{run: &pipeline.Run{ID: 2}} @@ -49,7 +49,7 @@ func Test_StreamRegistry(t *testing.T) { assert.False(t, exists) }) t.Run("Register", func(t *testing.T) { - sr := newStreamRegistry(lggr, runner) + sr := newRegistry(lggr, runner) t.Run("registers new stream", func(t *testing.T) { assert.Len(t, sr.streams, 0) @@ -79,7 +79,7 @@ func Test_StreamRegistry(t *testing.T) { }) }) t.Run("Unregister", func(t *testing.T) { - sr := newStreamRegistry(lggr, runner) + sr := newRegistry(lggr, runner) sr.streams["foo"] = &mockStream{run: &pipeline.Run{ID: 1}} sr.streams["bar"] = &mockStream{run: &pipeline.Run{ID: 2}}