From da335dbd8fa16fca8d318ef9c37be153a31896ec Mon Sep 17 00:00:00 2001 From: Pavel Pustovalov Date: Wed, 13 Dec 2023 23:27:21 +0100 Subject: [PATCH] Improve logs publishing --- endpoint/eventsEth.go | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/endpoint/eventsEth.go b/endpoint/eventsEth.go index 3e93b9e..5ff14c8 100644 --- a/endpoint/eventsEth.go +++ b/endpoint/eventsEth.go @@ -3,6 +3,7 @@ package endpoint import ( "context" "errors" + "sync" "github.com/aurora-is-near/relayer2-base/broker" "github.com/aurora-is-near/relayer2-base/endpoint" @@ -18,17 +19,19 @@ var ( type EventsEth struct { *endpoint.Endpoint - eventBroker broker.Broker - newHeadsCh chan event.Block - logsChMap map[rpc.ID]chan event.Logs + eventBroker broker.Broker + newHeadsCh chan event.Block + logsChMap map[rpc.ID]chan event.Logs + logsChMapMutex sync.Mutex } func NewEventsEth(ep *endpoint.Endpoint, eb broker.Broker) *EventsEth { return &EventsEth{ - Endpoint: ep, - eventBroker: eb, - newHeadsCh: make(chan event.Block, events.NewHeadsChSize), - logsChMap: make(map[rpc.ID]chan event.Logs), + Endpoint: ep, + eventBroker: eb, + newHeadsCh: make(chan event.Block, events.NewHeadsChSize), + logsChMap: make(map[rpc.ID]chan event.Logs), + logsChMapMutex: sync.Mutex{}, } } @@ -66,7 +69,9 @@ func (e *EventsEth) Logs(ctx context.Context, subOpts request.LogSubscriptionOpt go func() { logsCh := make(chan event.Logs, events.LogsChSize) + e.logsChMapMutex.Lock() e.logsChMap[rpcSub.ID] = logsCh + e.logsChMapMutex.Unlock() logsSubs := e.eventBroker.SubscribeLogs(subOpts, logsCh) for { @@ -77,7 +82,9 @@ func (e *EventsEth) Logs(ctx context.Context, subOpts request.LogSubscriptionOpt } case <-rpcSub.Err(): e.eventBroker.UnsubscribeFromLogs(logsSubs) + e.logsChMapMutex.Lock() delete(e.logsChMap, rpcSub.ID) + e.logsChMapMutex.Unlock() return } }