Skip to content

Commit

Permalink
Improve logs publishing
Browse files Browse the repository at this point in the history
  • Loading branch information
pustovalov committed Dec 13, 2023
1 parent fd291d8 commit da335db
Showing 1 changed file with 14 additions and 7 deletions.
21 changes: 14 additions & 7 deletions endpoint/eventsEth.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package endpoint
import (
"context"
"errors"
"sync"

"github.com/aurora-is-near/relayer2-base/broker"
"github.com/aurora-is-near/relayer2-base/endpoint"
Expand All @@ -18,17 +19,19 @@ var (

type EventsEth struct {
*endpoint.Endpoint
eventBroker broker.Broker
newHeadsCh chan event.Block
logsChMap map[rpc.ID]chan event.Logs
eventBroker broker.Broker
newHeadsCh chan event.Block
logsChMap map[rpc.ID]chan event.Logs
logsChMapMutex sync.Mutex
}

func NewEventsEth(ep *endpoint.Endpoint, eb broker.Broker) *EventsEth {
return &EventsEth{
Endpoint: ep,
eventBroker: eb,
newHeadsCh: make(chan event.Block, events.NewHeadsChSize),
logsChMap: make(map[rpc.ID]chan event.Logs),
Endpoint: ep,
eventBroker: eb,
newHeadsCh: make(chan event.Block, events.NewHeadsChSize),
logsChMap: make(map[rpc.ID]chan event.Logs),
logsChMapMutex: sync.Mutex{},
}
}

Expand Down Expand Up @@ -66,7 +69,9 @@ func (e *EventsEth) Logs(ctx context.Context, subOpts request.LogSubscriptionOpt

go func() {
logsCh := make(chan event.Logs, events.LogsChSize)
e.logsChMapMutex.Lock()
e.logsChMap[rpcSub.ID] = logsCh
e.logsChMapMutex.Unlock()

logsSubs := e.eventBroker.SubscribeLogs(subOpts, logsCh)
for {
Expand All @@ -77,7 +82,9 @@ func (e *EventsEth) Logs(ctx context.Context, subOpts request.LogSubscriptionOpt
}
case <-rpcSub.Err():
e.eventBroker.UnsubscribeFromLogs(logsSubs)
e.logsChMapMutex.Lock()
delete(e.logsChMap, rpcSub.ID)
e.logsChMapMutex.Unlock()
return
}
}
Expand Down

0 comments on commit da335db

Please sign in to comment.