From 568e68691aa6f635235961ec4563a3d0d219bda3 Mon Sep 17 00:00:00 2001 From: Johan Lindh Date: Mon, 19 Feb 2024 12:35:56 +0100 Subject: [PATCH 1/2] filter deleted in sendQueue --- request.go | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/request.go b/request.go index 3cc00b6..1e83339 100644 --- a/request.go +++ b/request.go @@ -628,18 +628,28 @@ func (rq *Request) queueEvent(eventCallCh chan eventFnCall, call eventFnCall) { } } -func (rq *Request) wsSend(outboundMsgCh chan<- wsMsg, msg wsMsg) { - select { - case <-rq.Done(): - case outboundMsgCh <- msg: - default: - panic(fmt.Errorf("jaws: %v: outbound message channel is full (%d) sending %s", rq, len(outboundMsgCh), msg)) - } -} - func (rq *Request) sendQueue(outboundMsgCh chan<- wsMsg, wsQueue []wsMsg) []wsMsg { + validJids := map[Jid]struct{}{} + rq.mu.RLock() + for _, elem := range rq.elems { + if !elem.deleted { + validJids[elem.Jid()] = struct{}{} + } + } + rq.mu.RUnlock() for _, msg := range wsQueue { - rq.wsSend(outboundMsgCh, msg) + ok := msg.Jid < 1 || msg.What == what.Delete + if !ok { + _, ok = validJids[msg.Jid] + } + if ok { + select { + case <-rq.Done(): + case outboundMsgCh <- msg: + default: + panic(fmt.Errorf("jaws: %v: outbound message channel is full (%d) sending %s", rq, len(outboundMsgCh), msg)) + } + } } return wsQueue[:0] } From 3c280beed34216fdcd59dc4d5080841f66e1daae Mon Sep 17 00:00:00 2001 From: Johan Lindh Date: Mon, 19 Feb 2024 13:32:09 +0100 Subject: [PATCH 2/2] move wsQueue to Request --- element.go | 15 ++---- element_test.go | 9 +++- errwebsocketqueueoverflow.go | 27 ----------- request.go | 91 ++++++++++++++++++++---------------- request_test.go | 52 ++++++++------------- uicontainer_test.go | 6 +-- uihtmlinner_test.go | 2 +- 7 files changed, 86 insertions(+), 116 deletions(-) delete mode 100644 errwebsocketqueueoverflow.go diff --git a/element.go b/element.go index e5cad54..05db2e8 100644 --- a/element.go +++ b/element.go @@ -15,7 +15,6 @@ type Element struct { *Request // (read-only) the Request the Element belongs to // internals ui UI // the UI object - wsQueue []wsMsg // changes queued handlers []EventHandler // custom event handlers registered, if any jid jid.Jid // JaWS ID, unique to this Element within it's Request updating bool // about to have Update() called @@ -65,15 +64,11 @@ func (e *Element) Render(w io.Writer, params []any) (err error) { func (e *Element) queue(wht what.What, data string) { if !e.deleted { - if len(e.wsQueue) < maxWsQueueLengthPerElement { - e.wsQueue = append(e.wsQueue, wsMsg{ - Data: data, - Jid: e.jid, - What: wht, - }) - } else { - e.Request.cancel(newErrWebsocketQueueOverflow(e)) - } + e.Request.queue(wsMsg{ + Data: data, + Jid: e.jid, + What: wht, + }) } } diff --git a/element_test.go b/element_test.go index 4f4c4a0..49f55c3 100644 --- a/element_test.go +++ b/element_test.go @@ -79,7 +79,12 @@ func TestElement_Tag(t *testing.T) { is.True(!e.HasTag(Tag("zomg"))) e.Tag(Tag("zomg")) is.True(e.HasTag(Tag("zomg"))) - is.True(strings.Contains(e.String(), "zomg")) + rq.mu.RLock() + defer rq.mu.RUnlock() + s := e.String() + if !strings.Contains(s, "zomg") { + t.Error(s) + } } func TestElement_Queued(t *testing.T) { @@ -100,7 +105,7 @@ func TestElement_Queued(t *testing.T) { e.Order([]jid.Jid{1, 2}) replaceHtml := template.HTML(fmt.Sprintf("
", e.Jid().String())) e.Replace(replaceHtml) - th.Equal(e.wsQueue, []wsMsg{ + th.Equal(rq.wsQueue, []wsMsg{ { Data: "hidden\n", Jid: e.jid, diff --git a/errwebsocketqueueoverflow.go b/errwebsocketqueueoverflow.go deleted file mode 100644 index edfea39..0000000 --- a/errwebsocketqueueoverflow.go +++ /dev/null @@ -1,27 +0,0 @@ -package jaws - -import ( - "fmt" -) - -// ErrWebsocketQueueOverflow is returned when an Element queues up too many changes. -// Try reducing the number of JawsUpdate() calls to the element or the number of -// changes made during JawsUpdate() for the UI element. -var ErrWebsocketQueueOverflow errWebsocketQueueOverflow - -type errWebsocketQueueOverflow struct { - str string -} - -func (e errWebsocketQueueOverflow) Error() string { - return fmt.Sprintf("WebSocket queue overflow on %s", e.str) -} - -func (e errWebsocketQueueOverflow) Is(target error) (yes bool) { - _, yes = target.(errWebsocketQueueOverflow) - return -} - -func newErrWebsocketQueueOverflow(e *Element) error { - return errWebsocketQueueOverflow{str: e.String()} -} diff --git a/request.go b/request.go index 1e83339..4727c7b 100644 --- a/request.go +++ b/request.go @@ -8,6 +8,7 @@ import ( "io" "net/http" "net/netip" + "sort" "strconv" "strings" "sync/atomic" @@ -43,6 +44,7 @@ type Request struct { connectFn ConnectFn // a ConnectFn to call before starting message processing for the Request elems []*Element tagMap map[any][]*Element + wsQueue []wsMsg } type eventFnCall struct { @@ -249,12 +251,12 @@ func (rq *Request) TagsOf(elem *Element) (tags []any) { if elem != nil { if rq.mu.TryRLock() { defer rq.mu.RUnlock() - for tag, elems := range rq.tagMap { - for _, e := range elems { - if e == elem { - tags = append(tags, tag) - break - } + } + for tag, elems := range rq.tagMap { + for _, e := range elems { + if e == elem { + tags = append(tags, tag) + break } } } @@ -446,16 +448,12 @@ func (rq *Request) process(broadcastMsgCh chan Message, incomingMsgCh <-chan wsM } }() - var wsQueue []wsMsg - for { var tagmsg Message var wsmsg wsMsg var ok bool - if len(wsQueue) > 0 { - wsQueue = rq.sendQueue(outboundMsgCh, wsQueue) - } + rq.sendQueue(outboundMsgCh) // Empty the dirty tags list and call JawsUpdate() // for identified elements. This queues up wsMsg's @@ -464,18 +462,16 @@ func (rq *Request) process(broadcastMsgCh chan Message, incomingMsgCh <-chan wsM elem.Update() } - // Append pending WS messages to the queue + /*// Append pending WS messages to the queue // in the order of Element creation. rq.mu.RLock() for _, elem := range rq.elems { wsQueue = append(wsQueue, elem.wsQueue...) elem.wsQueue = elem.wsQueue[:0] } - rq.mu.RUnlock() + rq.mu.RUnlock()*/ - if len(wsQueue) > 0 { - wsQueue = rq.sendQueue(outboundMsgCh, wsQueue) - } + rq.sendQueue(outboundMsgCh) select { case <-jawsDoneCh: @@ -509,7 +505,7 @@ func (rq *Request) process(broadcastMsgCh chan Message, incomingMsgCh <-chan wsM case *Request: case string: // target is a regular HTML ID - wsQueue = append(wsQueue, wsMsg{ + rq.queue(wsMsg{ Data: v + "\t" + strconv.Quote(tagmsg.Data), What: tagmsg.What, Jid: -1, @@ -520,7 +516,7 @@ func (rq *Request) process(broadcastMsgCh chan Message, incomingMsgCh <-chan wsM switch tagmsg.What { case what.Reload, what.Redirect, what.Order, what.Alert: - wsQueue = append(wsQueue, wsMsg{ + rq.queue(wsMsg{ Jid: 0, Data: tagmsg.Data, What: tagmsg.What, @@ -529,7 +525,7 @@ func (rq *Request) process(broadcastMsgCh chan Message, incomingMsgCh <-chan wsM for _, elem := range todo { switch tagmsg.What { case what.Delete: - wsQueue = append(wsQueue, wsMsg{ + rq.queue(wsMsg{ Jid: elem.Jid(), What: what.Delete, }) @@ -546,7 +542,7 @@ func (rq *Request) process(broadcastMsgCh chan Message, incomingMsgCh <-chan wsM // an error to be sent out as an alert message. // primary usecase is tests. if err := rq.Jaws.Log(rq.callAllEventHandlers(elem.Jid(), tagmsg.What, tagmsg.Data)); err != nil { - wsQueue = append(wsQueue, wsMsg{ + rq.queue(wsMsg{ Data: tagmsg.Data, Jid: elem.Jid(), What: what.Alert, @@ -555,7 +551,7 @@ func (rq *Request) process(broadcastMsgCh chan Message, incomingMsgCh <-chan wsM case what.Update: elem.Update() default: - wsQueue = append(wsQueue, wsMsg{ + rq.queue(wsMsg{ Data: tagmsg.Data, Jid: elem.Jid(), What: tagmsg.What, @@ -576,6 +572,12 @@ func (rq *Request) handleRemove(data string) { } } +func (rq *Request) queue(msg wsMsg) { + // rq.mu.Lock() + rq.wsQueue = append(rq.wsQueue, msg) + // rq.mu.Unlock() +} + func (rq *Request) callAllEventHandlers(id Jid, wht what.What, val string) (err error) { var elems []*Element rq.mu.RLock() @@ -628,30 +630,38 @@ func (rq *Request) queueEvent(eventCallCh chan eventFnCall, call eventFnCall) { } } -func (rq *Request) sendQueue(outboundMsgCh chan<- wsMsg, wsQueue []wsMsg) []wsMsg { +func (rq *Request) sendQueue(outboundMsgCh chan<- wsMsg) { + var toSend []wsMsg validJids := map[Jid]struct{}{} - rq.mu.RLock() - for _, elem := range rq.elems { - if !elem.deleted { - validJids[elem.Jid()] = struct{}{} - } - } - rq.mu.RUnlock() - for _, msg := range wsQueue { - ok := msg.Jid < 1 || msg.What == what.Delete - if !ok { - _, ok = validJids[msg.Jid] + + rq.mu.Lock() + if len(rq.wsQueue) > 0 { + for _, elem := range rq.elems { + if !elem.deleted { + validJids[elem.Jid()] = struct{}{} + } } - if ok { - select { - case <-rq.Done(): - case outboundMsgCh <- msg: - default: - panic(fmt.Errorf("jaws: %v: outbound message channel is full (%d) sending %s", rq, len(outboundMsgCh), msg)) + for _, msg := range rq.wsQueue { + ok := msg.Jid < 1 || msg.What == what.Delete + if !ok { + _, ok = validJids[msg.Jid] + } + if ok { + toSend = append(toSend, msg) } } + rq.wsQueue = rq.wsQueue[:0] + } + rq.mu.Unlock() + + for _, msg := range toSend { + select { + case <-rq.Done(): + case outboundMsgCh <- msg: + default: + panic(fmt.Errorf("jaws: %v: outbound message channel is full (%d) sending %s", rq, len(outboundMsgCh), msg)) + } } - return wsQueue[:0] } func deleteElement(s []*Element, e *Element) []*Element { @@ -703,6 +713,7 @@ func (rq *Request) makeUpdateList() (todo []*Element) { elem.updating = false } rq.todoDirt = rq.todoDirt[:0] + sort.Slice(todo, func(i, j int) bool { return todo[i].Jid() < todo[j].Jid() }) return } diff --git a/request_test.go b/request_test.go index 5e6ddba..9074ba9 100644 --- a/request_test.go +++ b/request_test.go @@ -3,9 +3,6 @@ package jaws import ( "context" "errors" - "html/template" - "net/http" - "net/http/httptest" "strconv" "strings" "sync/atomic" @@ -68,14 +65,18 @@ func TestRequest_SendArrivesOk(t *testing.T) { rq := newTestRequest() defer rq.Close() jid := rq.Register("foo") + elem := rq.getElementByJid(jid) + is.True(elem != nil) rq.jw.Broadcast(Message{Dest: Tag("foo"), What: what.Inner, Data: "bar"}) select { - case <-time.NewTimer(testTimeout).C: - is.Fail() + case <-time.NewTimer(time.Hour).C: + is.Error("timeout") case msg := <-rq.outCh: elem := rq.getElementByJid(jid) is.True(elem != nil) - is.Equal(msg, wsMsg{Jid: elem.jid, Data: "bar", What: what.Inner}) + if elem != nil { + is.Equal(msg, wsMsg{Jid: elem.jid, Data: "bar", What: what.Inner}) + } } } @@ -573,38 +574,23 @@ func TestRequest_ConnectFn(t *testing.T) { th.Equal(rq.onConnect(), wantErr) } -func TestRequest_WsQueueOverflowCancels(t *testing.T) { - th := newTestHelper(t) - jw := New() - defer jw.Close() - hr := httptest.NewRequest(http.MethodGet, "/", nil) - rq := jw.NewRequest(hr) - elem := rq.NewElement(NewUiDiv(makeHtmlGetter("foo"))) - go func() { - for i := 0; i < maxWsQueueLengthPerElement*10; i++ { - elem.SetInner(template.HTML(strconv.Itoa(i))) - } - }() - select { - case <-th.C: - th.Timeout() - case <-rq.Done(): - } - th.True(errors.Is(context.Cause(rq.Context()), ErrWebsocketQueueOverflow)) -} - func TestRequest_Dirty(t *testing.T) { th := newTestHelper(t) rq := newTestRequest() defer rq.Close() - tss := &testUi{s: "foo"} - rq.UI(NewUiText(tss)) - th.Equal(tss.getCalled, int32(1)) - th.True(strings.Contains(string(rq.BodyString()), "foo")) - - rq.Dirty(tss) - for atomic.LoadInt32(&tss.getCalled) < 2 { + tss1 := &testUi{s: "foo1"} + tss2 := &testUi{s: "foo2"} + rq.UI(NewUiText(tss1)) + rq.UI(NewUiText(tss2)) + th.Equal(tss1.getCalled, int32(1)) + th.Equal(tss2.getCalled, int32(1)) + th.True(strings.Contains(string(rq.BodyString()), "foo1")) + th.True(strings.Contains(string(rq.BodyString()), "foo2")) + + rq.Dirty(tss1) + rq.Dirty(tss2) + for atomic.LoadInt32(&tss1.getCalled) < 2 && atomic.LoadInt32(&tss2.getCalled) < 2 { select { case <-th.C: th.Timeout() diff --git a/uicontainer_test.go b/uicontainer_test.go index 4ba998f..c5876e3 100644 --- a/uicontainer_test.go +++ b/uicontainer_test.go @@ -176,9 +176,9 @@ func TestRequest_Container_Alteration(t *testing.T) { t.Fatal(err) } tt.c.contents = tt.l - ui.JawsUpdate(elem) - if !slices.Equal(elem.wsQueue, tt.want) { - t.Errorf("got %v, want %v", elem.wsQueue, tt.want) + elem.Update() + if !slices.Equal(rq.wsQueue, tt.want) { + t.Errorf("got %v, want %v", rq.wsQueue, tt.want) } }) } diff --git a/uihtmlinner_test.go b/uihtmlinner_test.go index 958de8f..af9dafd 100644 --- a/uihtmlinner_test.go +++ b/uihtmlinner_test.go @@ -34,7 +34,7 @@ func TestUiHtmlInner_JawsUpdate(t *testing.T) { Jid: 1, What: what.Inner, }} - if !slices.Equal(elem.wsQueue, want) { + if !slices.Equal(rq.wsQueue, want) { t.Errorf("got %v, want %v", elem.wsQueue, want) } }