Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Relay WebSocket library refactor #1930

Merged
merged 53 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
43b12ef
Move Broadcast message objects to separate library
lambchr Oct 10, 2023
3f119a4
Create backlog library
lambchr Oct 10, 2023
342e3c9
create a race condition test for backlog library
lambchr Oct 11, 2023
b55d38e
fix backlog race conditions
lambchr Oct 11, 2023
fc4db62
add delete method to race condition test
lambchr Oct 11, 2023
d805660
swap catchupbuffer for backlog in wsbroadcastserver
lambchr Oct 18, 2023
54ea210
fix go vet errors
lambchr Oct 19, 2023
946046a
use Lookup() and requestedSeqNum instead of Head() when sending messa…
lambchr Nov 1, 2023
ff440a4
change warn to error
lambchr Nov 1, 2023
5d36e6b
change lookupByIndex map values from atomic pointer to backlogSegment…
lambchr Nov 1, 2023
bff53fc
remove max catchup TODO - not necessary
lambchr Nov 1, 2023
1faa39e
fix broadcaster tests after Head -> Lookup change in clientconnection
lambchr Nov 1, 2023
06d72a8
fix broadcast client invalid signature test
lambchr Nov 2, 2023
a0bf64b
fix backlog sending duplicate messages to clients
lambchr Nov 7, 2023
1858770
fix BroadcastClient test TestBroadcastClientConfirmedMessage
lambchr Nov 9, 2023
66357cf
move flateWriter to serializeMessage to avoid race condition
lambchr Nov 10, 2023
b046b8f
refactor ClientConnection to stop relying on ClientManager - CM will …
lambchr Nov 14, 2023
7cf1b2b
refactor backlog to allow deletion of all confirmed messages from seg…
lambchr Nov 14, 2023
a265e27
fix messages in broadcastclients
lambchr Nov 14, 2023
617c0d6
fix lint errors
lambchr Nov 14, 2023
8e72df6
remove unnecessary exported functions from BacklogSegment interface a…
lambchr Nov 15, 2023
ea46fd9
ensure the ClientConnection only sends the requested messages to the …
lambchr Nov 15, 2023
60ce382
fix backlog lint error
lambchr Nov 15, 2023
5e2f1aa
ensure WSBroadcast server sends the whole backlog when the requested …
lambchr Nov 16, 2023
812e7a6
add comments to backlog functions
lambchr Nov 16, 2023
0be22bc
change lookupByKey to containers.SyncMap and remove lookupLock
lambchr Nov 30, 2023
ffa6636
fix log.info line
lambchr Nov 30, 2023
3403b7b
fix potential delete race condition in backlog.Get
lambchr Nov 30, 2023
d9d479d
remove backlogSegment type assertion
lambchr Nov 30, 2023
1fc038c
remove backlogSegment.start, backlogSegment.end and backlogSegment.me…
lambchr Nov 30, 2023
11e26ed
fix go vet errors from containers.SyncMap addition, change to a point…
lambchr Nov 30, 2023
c098840
return nil interface from backlogSegment.Next rather than nil *backlo…
lambchr Nov 30, 2023
ccb6499
return copies of messages slice from backlogSegment.Get and backlogSe…
lambchr Nov 30, 2023
2537247
get messagesLock for entire backlogSegment.Get and
lambchr Nov 30, 2023
369b5ea
remove metric TODO
lambchr Nov 30, 2023
c86fcfd
use length of messages slice in copy rather than segment limit to avo…
lambchr Dec 1, 2023
b706481
change IsBacklogSegmentNil to check for nil interface as backlogSegme…
lambchr Dec 1, 2023
7334858
add version as a constant to messages lib
lambchr Dec 1, 2023
20e5d69
Merge branch 'master' into cl/relay-refactor
lambchr Dec 4, 2023
39514fc
correct broadcaster lib
lambchr Dec 4, 2023
6d1eca1
only send one message with the confirmed sequence number
lambchr Dec 4, 2023
ebed210
add comment explaining additional doBroadcast call for confirmed mess…
lambchr Dec 4, 2023
fb17717
move last sent sequence number check from client manager to client co…
lambchr Dec 4, 2023
93144bf
fix sending only 1 message on confirm
lambchr Dec 4, 2023
4238891
change warn to debug
lambchr Dec 4, 2023
0bd866a
fix moving last sent check
lambchr Dec 4, 2023
fed7de2
remove recursive lock calls from backlogSegment object
lambchr Dec 11, 2023
5a7ac9e
reload head after lookup call
lambchr Dec 11, 2023
3b4c9b6
fix no new var error
lambchr Dec 11, 2023
da08db5
change copy slice to only copy the selected elements
lambchr Dec 11, 2023
69e8dde
remove recursive lock calls from backlogSegment object
lambchr Dec 13, 2023
756f616
Merge branch 'master' into cl/relay-refactor
PlasmaPower Dec 13, 2023
4b66827
Merge branch 'master' into cl/relay-refactor
PlasmaPower Dec 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/offchainlabs/nitro/arbstate"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcaster"
m "github.com/offchainlabs/nitro/broadcaster/message"
"github.com/offchainlabs/nitro/staker"
"github.com/offchainlabs/nitro/util/containers"
)
Expand Down Expand Up @@ -240,7 +241,7 @@ func (t *InboxTracker) PopulateFeedBacklog(broadcastServer *broadcaster.Broadcas
if err != nil {
return fmt.Errorf("error getting tx streamer message count: %w", err)
}
var feedMessages []*broadcaster.BroadcastFeedMessage
var feedMessages []*m.BroadcastFeedMessage
for seqNum := startMessage; seqNum < messageCount; seqNum++ {
message, err := t.txStreamer.GetMessage(seqNum)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcaster"
m "github.com/offchainlabs/nitro/broadcaster/message"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/staker"
"github.com/offchainlabs/nitro/util/arbmath"
Expand Down Expand Up @@ -426,7 +427,7 @@ func (s *TransactionStreamer) AddMessages(pos arbutil.MessageIndex, messagesAreC
return s.AddMessagesAndEndBatch(pos, messagesAreConfirmed, messages, nil)
}

func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*broadcaster.BroadcastFeedMessage) error {
func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFeedMessage) error {
if len(feedMessages) == 0 {
return nil
}
Expand Down
8 changes: 4 additions & 4 deletions broadcastclient/broadcastclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/ethereum/go-ethereum/metrics"

"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcaster"
m "github.com/offchainlabs/nitro/broadcaster/message"
"github.com/offchainlabs/nitro/util/contracts"
"github.com/offchainlabs/nitro/util/signature"
"github.com/offchainlabs/nitro/util/stopwaiter"
Expand Down Expand Up @@ -117,7 +117,7 @@ var DefaultTestConfig = Config{
}

type TransactionStreamerInterface interface {
AddBroadcastMessages(feedMessages []*broadcaster.BroadcastFeedMessage) error
AddBroadcastMessages(feedMessages []*m.BroadcastFeedMessage) error
}

type BroadcastClient struct {
Expand Down Expand Up @@ -381,7 +381,7 @@ func (bc *BroadcastClient) startBackgroundReader(earlyFrameData io.Reader) {
backoffDuration = bc.config().ReconnectInitialBackoff

if msg != nil {
res := broadcaster.BroadcastMessage{}
res := m.BroadcastMessage{}
err = json.Unmarshal(msg, &res)
if err != nil {
log.Error("error unmarshalling message", "msg", msg, "err", err)
Expand Down Expand Up @@ -483,7 +483,7 @@ func (bc *BroadcastClient) StopAndWait() {
}
}

func (bc *BroadcastClient) isValidSignature(ctx context.Context, message *broadcaster.BroadcastFeedMessage) error {
func (bc *BroadcastClient) isValidSignature(ctx context.Context, message *m.BroadcastFeedMessage) error {
if bc.config().Verify.Dangerous.AcceptMissing && bc.sigVerifier == nil {
// Verifier disabled
return nil
Expand Down
7 changes: 4 additions & 3 deletions broadcastclient/broadcastclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcaster"
m "github.com/offchainlabs/nitro/broadcaster/message"
"github.com/offchainlabs/nitro/util/contracts"
"github.com/offchainlabs/nitro/util/signature"
"github.com/offchainlabs/nitro/util/testhelpers"
Expand Down Expand Up @@ -178,20 +179,20 @@ func TestInvalidSignature(t *testing.T) {
}

type dummyTransactionStreamer struct {
messageReceiver chan broadcaster.BroadcastFeedMessage
messageReceiver chan m.BroadcastFeedMessage
chainId uint64
sequencerAddr *common.Address
}

func NewDummyTransactionStreamer(chainId uint64, sequencerAddr *common.Address) *dummyTransactionStreamer {
return &dummyTransactionStreamer{
messageReceiver: make(chan broadcaster.BroadcastFeedMessage),
messageReceiver: make(chan m.BroadcastFeedMessage),
chainId: chainId,
sequencerAddr: sequencerAddr,
}
}

func (ts *dummyTransactionStreamer) AddBroadcastMessages(feedMessages []*broadcaster.BroadcastFeedMessage) error {
func (ts *dummyTransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFeedMessage) error {
for _, feedMessage := range feedMessages {
ts.messageReceiver <- *feedMessage
}
Expand Down
12 changes: 6 additions & 6 deletions broadcastclients/broadcastclients.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcastclient"
"github.com/offchainlabs/nitro/broadcaster"
m "github.com/offchainlabs/nitro/broadcaster/message"
"github.com/offchainlabs/nitro/util/contracts"
"github.com/offchainlabs/nitro/util/stopwaiter"
)
Expand All @@ -25,14 +25,14 @@ const PRIMARY_FEED_UPTIME = time.Minute * 10

type Router struct {
stopwaiter.StopWaiter
messageChan chan broadcaster.BroadcastFeedMessage
messageChan chan m.BroadcastFeedMessage
confirmedSequenceNumberChan chan arbutil.MessageIndex

forwardTxStreamer broadcastclient.TransactionStreamerInterface
forwardConfirmationChan chan arbutil.MessageIndex
}

func (r *Router) AddBroadcastMessages(feedMessages []*broadcaster.BroadcastFeedMessage) error {
func (r *Router) AddBroadcastMessages(feedMessages []*m.BroadcastFeedMessage) error {
for _, feedMessage := range feedMessages {
r.messageChan <- *feedMessage
}
Expand Down Expand Up @@ -67,7 +67,7 @@ func NewBroadcastClients(
}
newStandardRouter := func() *Router {
return &Router{
messageChan: make(chan broadcaster.BroadcastFeedMessage, ROUTER_QUEUE_SIZE),
messageChan: make(chan m.BroadcastFeedMessage, ROUTER_QUEUE_SIZE),
confirmedSequenceNumberChan: make(chan arbutil.MessageIndex, ROUTER_QUEUE_SIZE),
forwardTxStreamer: txStreamer,
forwardConfirmationChan: confirmedSequenceNumberListener,
Expand Down Expand Up @@ -152,15 +152,15 @@ func (bcs *BroadcastClients) Start(ctx context.Context) {
defer stopSecondaryFeedTimer.Stop()
defer primaryFeedIsDownTimer.Stop()

msgHandler := func(msg broadcaster.BroadcastFeedMessage, router *Router) error {
msgHandler := func(msg m.BroadcastFeedMessage, router *Router) error {
if _, ok := recentFeedItemsNew[msg.SequenceNumber]; ok {
return nil
}
if _, ok := recentFeedItemsOld[msg.SequenceNumber]; ok {
return nil
}
recentFeedItemsNew[msg.SequenceNumber] = time.Now()
if err := router.forwardTxStreamer.AddBroadcastMessages([]*broadcaster.BroadcastFeedMessage{&msg}); err != nil {
if err := router.forwardTxStreamer.AddBroadcastMessages([]*m.BroadcastFeedMessage{&msg}); err != nil {
return err
}
return nil
Expand Down
Loading
Loading