Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle context done in event processing #108

Merged
merged 5 commits into from
Oct 15, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 90 additions & 53 deletions blockchain/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package blockchain

import (
"context"
"errors"
"fmt"
"sync"
"time"

Expand All @@ -17,8 +19,14 @@ import (
"github.com/cerebellum-network/cere-ddc-sdk-go/blockchain/pallets"
)

// Stop events listening when no new events received for this time.
const EventsListeningTimeout = 60 * time.Second
const (
// EventsListeningTimeout stop events listening when no new events received for this time.
EventsListeningTimeout = 60 * time.Second
)

var (
ErrHeaderChannelClosed = errors.New("header channel closed")
)

type EventsListener func(events []*parser.Event, blockNumber types.BlockNumber, blockHash types.Hash) error

Expand Down Expand Up @@ -85,19 +93,22 @@ func (c *Client) ListenEvents(
g, ctx := errgroup.WithContext(ctx)

liveHeadersC := sub.Chan()
go func() {
g.Go(func() error {
defer sub.Unsubscribe()

<-ctx.Done()
sub.Unsubscribe()
}()

return ctx.Err()
})

// Query historical headers.
histHeadersC := make(chan types.Header)
g.Go(func() error {
defer close(histHeadersC)
defer close(histHeadersC)
khssnv marked this conversation as resolved.
Show resolved Hide resolved

firstLiveHeader, ok := <-liveHeadersC // first live header will be the last historical
if !ok {
return ctx.Err()
g.Go(func() error {
firstLiveHeader, err := getFirstLiveHeader(ctx, liveHeadersC)
if err != nil {
return err
}

for block := begin; block < firstLiveHeader.Number; block++ {
Expand Down Expand Up @@ -129,80 +140,74 @@ func (c *Client) ListenEvents(

// Sequence historical and live headers.
headersC := make(chan types.Header, 2)
g.Go(func() error {
defer close(headersC)

for header := range histHeadersC {
select {
case <-ctx.Done():
return ctx.Err()
case headersC <- header:
}
}
defer close(headersC)

for header := range liveHeadersC {
select {
case <-ctx.Done():
return ctx.Err()
case headersC <- header:
}
g.Go(func() error {
if err = forwardHeaders(ctx, histHeadersC, headersC); err != nil {
return err
}

return nil
return forwardHeaders(ctx, liveHeadersC, headersC)
})

// Retrieve events skipping blocks before 'begin'.
eventsC := make(chan blockEvents, 2)
g.Go(func() error {
defer close(eventsC)

for header := range headersC {
if header.Number < begin {
continue
}

hash, err := c.RPC.Chain.GetBlockHash(uint64(header.Number))
if err != nil {
return err
}

events, err := retriever.GetEvents(hash)
if err != nil {
return err
}
defer close(eventsC)

g.Go(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case eventsC <- blockEvents{
Events: events,
Hash: hash,
Number: header.Number,
}:
case header := <-headersC:
khssnv marked this conversation as resolved.
Show resolved Hide resolved
if header.Number < begin {
continue
}

hash, err := c.RPC.Chain.GetBlockHash(uint64(header.Number))
if err != nil {
return err
}

events, err := retriever.GetEvents(hash)
if err != nil {
return err
}

select {
case <-ctx.Done():
return ctx.Err()
case eventsC <- blockEvents{
Events: events,
Hash: hash,
Number: header.Number,
}:
}
}
}

return nil
})

// Invoke listeners.
g.Go(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case blockEvents := <-eventsC:
for callback := range c.eventsListeners {
err := (*callback)(blockEvents.Events, blockEvents.Number, blockEvents.Hash)
if err != nil {
return err
return fmt.Errorf("callback func failed: %w", err)
}
}

if after != nil {
err := after(blockEvents.Number, blockEvents.Hash)
if err != nil {
return err
return fmt.Errorf("after func failed: %w", err)
}
}

// Watchdog for the websocket. It silently hangs sometimes with no error nor new events. In
// all Cere blockchain runtimes we have `pallet-timestamp` which makes at least one event
// (System.ExtrinsicSuccess for the timestamp.set extrinsic) per block.
Expand All @@ -215,6 +220,38 @@ func (c *Client) ListenEvents(
return g.Wait()
}

func forwardHeaders(ctx context.Context, from <-chan types.Header, to chan types.Header) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case header, ok := <-from:
if !ok {
return ErrHeaderChannelClosed
}

select {
case <-ctx.Done():
return ctx.Err()
case to <- header:
}
}
}
}

func getFirstLiveHeader(ctx context.Context, c <-chan types.Header) (types.Header, error) {
select {
case <-ctx.Done():
return types.Header{}, ctx.Err()
case firstLiveHeader, ok := <-c:
if !ok {
return types.Header{}, ErrHeaderChannelClosed
}

return firstLiveHeader, nil
}
}

// RegisterEventsListener subscribes given callback to blockchain events.
func (c *Client) RegisterEventsListener(callback EventsListener) context.CancelFunc {
c.mu.Lock()
Expand Down
Loading