Skip to content

Commit

Permalink
Merge pull request #38 from linkdata/unify-ws-queue
Browse files Browse the repository at this point in the history
Unify ws queue
  • Loading branch information
linkdata authored Feb 19, 2024
2 parents ea84827 + 3c280be commit f95f7f3
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 108 deletions.
15 changes: 5 additions & 10 deletions element.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ type Element struct {
*Request // (read-only) the Request the Element belongs to
// internals
ui UI // the UI object
wsQueue []wsMsg // changes queued
handlers []EventHandler // custom event handlers registered, if any
jid jid.Jid // JaWS ID, unique to this Element within it's Request
updating bool // about to have Update() called
Expand Down Expand Up @@ -65,15 +64,11 @@ func (e *Element) Render(w io.Writer, params []any) (err error) {

func (e *Element) queue(wht what.What, data string) {
if !e.deleted {
if len(e.wsQueue) < maxWsQueueLengthPerElement {
e.wsQueue = append(e.wsQueue, wsMsg{
Data: data,
Jid: e.jid,
What: wht,
})
} else {
e.Request.cancel(newErrWebsocketQueueOverflow(e))
}
e.Request.queue(wsMsg{
Data: data,
Jid: e.jid,
What: wht,
})
}
}

Expand Down
9 changes: 7 additions & 2 deletions element_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,12 @@ func TestElement_Tag(t *testing.T) {
is.True(!e.HasTag(Tag("zomg")))
e.Tag(Tag("zomg"))
is.True(e.HasTag(Tag("zomg")))
is.True(strings.Contains(e.String(), "zomg"))
rq.mu.RLock()
defer rq.mu.RUnlock()
s := e.String()
if !strings.Contains(s, "zomg") {
t.Error(s)
}
}

func TestElement_Queued(t *testing.T) {
Expand All @@ -100,7 +105,7 @@ func TestElement_Queued(t *testing.T) {
e.Order([]jid.Jid{1, 2})
replaceHtml := template.HTML(fmt.Sprintf("<div id=\"%s\"></div>", e.Jid().String()))
e.Replace(replaceHtml)
th.Equal(e.wsQueue, []wsMsg{
th.Equal(rq.wsQueue, []wsMsg{
{
Data: "hidden\n",
Jid: e.jid,
Expand Down
27 changes: 0 additions & 27 deletions errwebsocketqueueoverflow.go

This file was deleted.

85 changes: 53 additions & 32 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"net/http"
"net/netip"
"sort"
"strconv"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -43,6 +44,7 @@ type Request struct {
connectFn ConnectFn // a ConnectFn to call before starting message processing for the Request
elems []*Element
tagMap map[any][]*Element
wsQueue []wsMsg
}

type eventFnCall struct {
Expand Down Expand Up @@ -249,12 +251,12 @@ func (rq *Request) TagsOf(elem *Element) (tags []any) {
if elem != nil {
if rq.mu.TryRLock() {
defer rq.mu.RUnlock()
for tag, elems := range rq.tagMap {
for _, e := range elems {
if e == elem {
tags = append(tags, tag)
break
}
}
for tag, elems := range rq.tagMap {
for _, e := range elems {
if e == elem {
tags = append(tags, tag)
break
}
}
}
Expand Down Expand Up @@ -446,16 +448,12 @@ func (rq *Request) process(broadcastMsgCh chan Message, incomingMsgCh <-chan wsM
}
}()

var wsQueue []wsMsg

for {
var tagmsg Message
var wsmsg wsMsg
var ok bool

if len(wsQueue) > 0 {
wsQueue = rq.sendQueue(outboundMsgCh, wsQueue)
}
rq.sendQueue(outboundMsgCh)

// Empty the dirty tags list and call JawsUpdate()
// for identified elements. This queues up wsMsg's
Expand All @@ -464,18 +462,16 @@ func (rq *Request) process(broadcastMsgCh chan Message, incomingMsgCh <-chan wsM
elem.Update()
}

// Append pending WS messages to the queue
/*// Append pending WS messages to the queue
// in the order of Element creation.
rq.mu.RLock()
for _, elem := range rq.elems {
wsQueue = append(wsQueue, elem.wsQueue...)
elem.wsQueue = elem.wsQueue[:0]
}
rq.mu.RUnlock()
rq.mu.RUnlock()*/

if len(wsQueue) > 0 {
wsQueue = rq.sendQueue(outboundMsgCh, wsQueue)
}
rq.sendQueue(outboundMsgCh)

select {
case <-jawsDoneCh:
Expand Down Expand Up @@ -509,7 +505,7 @@ func (rq *Request) process(broadcastMsgCh chan Message, incomingMsgCh <-chan wsM
case *Request:
case string:
// target is a regular HTML ID
wsQueue = append(wsQueue, wsMsg{
rq.queue(wsMsg{
Data: v + "\t" + strconv.Quote(tagmsg.Data),
What: tagmsg.What,
Jid: -1,
Expand All @@ -520,7 +516,7 @@ func (rq *Request) process(broadcastMsgCh chan Message, incomingMsgCh <-chan wsM

switch tagmsg.What {
case what.Reload, what.Redirect, what.Order, what.Alert:
wsQueue = append(wsQueue, wsMsg{
rq.queue(wsMsg{
Jid: 0,
Data: tagmsg.Data,
What: tagmsg.What,
Expand All @@ -529,7 +525,7 @@ func (rq *Request) process(broadcastMsgCh chan Message, incomingMsgCh <-chan wsM
for _, elem := range todo {
switch tagmsg.What {
case what.Delete:
wsQueue = append(wsQueue, wsMsg{
rq.queue(wsMsg{
Jid: elem.Jid(),
What: what.Delete,
})
Expand All @@ -546,7 +542,7 @@ func (rq *Request) process(broadcastMsgCh chan Message, incomingMsgCh <-chan wsM
// an error to be sent out as an alert message.
// primary usecase is tests.
if err := rq.Jaws.Log(rq.callAllEventHandlers(elem.Jid(), tagmsg.What, tagmsg.Data)); err != nil {
wsQueue = append(wsQueue, wsMsg{
rq.queue(wsMsg{
Data: tagmsg.Data,
Jid: elem.Jid(),
What: what.Alert,
Expand All @@ -555,7 +551,7 @@ func (rq *Request) process(broadcastMsgCh chan Message, incomingMsgCh <-chan wsM
case what.Update:
elem.Update()
default:
wsQueue = append(wsQueue, wsMsg{
rq.queue(wsMsg{
Data: tagmsg.Data,
Jid: elem.Jid(),
What: tagmsg.What,
Expand All @@ -576,6 +572,12 @@ func (rq *Request) handleRemove(data string) {
}
}

func (rq *Request) queue(msg wsMsg) {
// rq.mu.Lock()
rq.wsQueue = append(rq.wsQueue, msg)
// rq.mu.Unlock()
}

func (rq *Request) callAllEventHandlers(id Jid, wht what.What, val string) (err error) {
var elems []*Element
rq.mu.RLock()
Expand Down Expand Up @@ -628,20 +630,38 @@ func (rq *Request) queueEvent(eventCallCh chan eventFnCall, call eventFnCall) {
}
}

func (rq *Request) wsSend(outboundMsgCh chan<- wsMsg, msg wsMsg) {
select {
case <-rq.Done():
case outboundMsgCh <- msg:
default:
panic(fmt.Errorf("jaws: %v: outbound message channel is full (%d) sending %s", rq, len(outboundMsgCh), msg))
func (rq *Request) sendQueue(outboundMsgCh chan<- wsMsg) {
var toSend []wsMsg
validJids := map[Jid]struct{}{}

rq.mu.Lock()
if len(rq.wsQueue) > 0 {
for _, elem := range rq.elems {
if !elem.deleted {
validJids[elem.Jid()] = struct{}{}
}
}
for _, msg := range rq.wsQueue {
ok := msg.Jid < 1 || msg.What == what.Delete
if !ok {
_, ok = validJids[msg.Jid]
}
if ok {
toSend = append(toSend, msg)
}
}
rq.wsQueue = rq.wsQueue[:0]
}
}
rq.mu.Unlock()

func (rq *Request) sendQueue(outboundMsgCh chan<- wsMsg, wsQueue []wsMsg) []wsMsg {
for _, msg := range wsQueue {
rq.wsSend(outboundMsgCh, msg)
for _, msg := range toSend {
select {
case <-rq.Done():
case outboundMsgCh <- msg:
default:
panic(fmt.Errorf("jaws: %v: outbound message channel is full (%d) sending %s", rq, len(outboundMsgCh), msg))
}
}
return wsQueue[:0]
}

func deleteElement(s []*Element, e *Element) []*Element {
Expand Down Expand Up @@ -693,6 +713,7 @@ func (rq *Request) makeUpdateList() (todo []*Element) {
elem.updating = false
}
rq.todoDirt = rq.todoDirt[:0]
sort.Slice(todo, func(i, j int) bool { return todo[i].Jid() < todo[j].Jid() })
return
}

Expand Down
52 changes: 19 additions & 33 deletions request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package jaws
import (
"context"
"errors"
"html/template"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -68,14 +65,18 @@ func TestRequest_SendArrivesOk(t *testing.T) {
rq := newTestRequest()
defer rq.Close()
jid := rq.Register("foo")
elem := rq.getElementByJid(jid)
is.True(elem != nil)
rq.jw.Broadcast(Message{Dest: Tag("foo"), What: what.Inner, Data: "bar"})
select {
case <-time.NewTimer(testTimeout).C:
is.Fail()
case <-time.NewTimer(time.Hour).C:
is.Error("timeout")
case msg := <-rq.outCh:
elem := rq.getElementByJid(jid)
is.True(elem != nil)
is.Equal(msg, wsMsg{Jid: elem.jid, Data: "bar", What: what.Inner})
if elem != nil {
is.Equal(msg, wsMsg{Jid: elem.jid, Data: "bar", What: what.Inner})
}
}
}

Expand Down Expand Up @@ -573,38 +574,23 @@ func TestRequest_ConnectFn(t *testing.T) {
th.Equal(rq.onConnect(), wantErr)
}

func TestRequest_WsQueueOverflowCancels(t *testing.T) {
th := newTestHelper(t)
jw := New()
defer jw.Close()
hr := httptest.NewRequest(http.MethodGet, "/", nil)
rq := jw.NewRequest(hr)
elem := rq.NewElement(NewUiDiv(makeHtmlGetter("foo")))
go func() {
for i := 0; i < maxWsQueueLengthPerElement*10; i++ {
elem.SetInner(template.HTML(strconv.Itoa(i)))
}
}()
select {
case <-th.C:
th.Timeout()
case <-rq.Done():
}
th.True(errors.Is(context.Cause(rq.Context()), ErrWebsocketQueueOverflow))
}

func TestRequest_Dirty(t *testing.T) {
th := newTestHelper(t)
rq := newTestRequest()
defer rq.Close()

tss := &testUi{s: "foo"}
rq.UI(NewUiText(tss))
th.Equal(tss.getCalled, int32(1))
th.True(strings.Contains(string(rq.BodyString()), "foo"))

rq.Dirty(tss)
for atomic.LoadInt32(&tss.getCalled) < 2 {
tss1 := &testUi{s: "foo1"}
tss2 := &testUi{s: "foo2"}
rq.UI(NewUiText(tss1))
rq.UI(NewUiText(tss2))
th.Equal(tss1.getCalled, int32(1))
th.Equal(tss2.getCalled, int32(1))
th.True(strings.Contains(string(rq.BodyString()), "foo1"))
th.True(strings.Contains(string(rq.BodyString()), "foo2"))

rq.Dirty(tss1)
rq.Dirty(tss2)
for atomic.LoadInt32(&tss1.getCalled) < 2 && atomic.LoadInt32(&tss2.getCalled) < 2 {
select {
case <-th.C:
th.Timeout()
Expand Down
6 changes: 3 additions & 3 deletions uicontainer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,9 @@ func TestRequest_Container_Alteration(t *testing.T) {
t.Fatal(err)
}
tt.c.contents = tt.l
ui.JawsUpdate(elem)
if !slices.Equal(elem.wsQueue, tt.want) {
t.Errorf("got %v, want %v", elem.wsQueue, tt.want)
elem.Update()
if !slices.Equal(rq.wsQueue, tt.want) {
t.Errorf("got %v, want %v", rq.wsQueue, tt.want)
}
})
}
Expand Down
2 changes: 1 addition & 1 deletion uihtmlinner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestUiHtmlInner_JawsUpdate(t *testing.T) {
Jid: 1,
What: what.Inner,
}}
if !slices.Equal(elem.wsQueue, want) {
if !slices.Equal(rq.wsQueue, want) {
t.Errorf("got %v, want %v", elem.wsQueue, want)
}
}

0 comments on commit f95f7f3

Please sign in to comment.