Skip to content

Commit

Permalink
send wsMsgs in Jid order
Browse files Browse the repository at this point in the history
  • Loading branch information
linkdata committed Feb 26, 2024
1 parent 214234c commit 2a03431
Showing 1 changed file with 16 additions and 14 deletions.
30 changes: 16 additions & 14 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"net/http"
"net/netip"
"slices"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -631,34 +632,36 @@ func (rq *Request) queueEvent(eventCallCh chan eventFnCall, call eventFnCall) {

func (rq *Request) sendQueue(outboundMsgCh chan<- wsMsg) {
var toSend []wsMsg
validJids := map[Jid]struct{}{}

rq.mu.Lock()
if len(rq.wsQueue) > 0 {
validJids := map[Jid]struct{}{}
for _, elem := range rq.elems {
if !elem.deleted {
validJids[elem.Jid()] = struct{}{}
}
}
for _, msg := range rq.wsQueue {
ok := msg.Jid < 1 || msg.What == what.Delete
for i := range rq.wsQueue {
ok := rq.wsQueue[i].Jid < 1 || rq.wsQueue[i].What == what.Delete
if !ok {
_, ok = validJids[msg.Jid]
_, ok = validJids[rq.wsQueue[i].Jid]
}
if ok {
toSend = append(toSend, msg)
toSend = append(toSend, rq.wsQueue[i])
}
}
rq.wsQueue = rq.wsQueue[:0]
}
rq.mu.Unlock()

for _, msg := range toSend {
slices.SortStableFunc(toSend, func(a, b wsMsg) int { return int(a.Jid - b.Jid) })

for i := range toSend {
select {
case <-rq.Done():
case outboundMsgCh <- msg:
case outboundMsgCh <- toSend[i]:
default:
panic(fmt.Errorf("jaws: %v: outbound message channel is full (%d) sending %s", rq, len(outboundMsgCh), msg))
panic(fmt.Errorf("jaws: %v: outbound message channel is full (%d) sending %s", rq, len(outboundMsgCh), toSend[i]))
}
}
}
Expand Down Expand Up @@ -699,19 +702,18 @@ func (rq *Request) deleteElement(e *Element) {

func (rq *Request) makeUpdateList() (todo []*Element) {
rq.mu.Lock()
defer rq.mu.Unlock()
seen := map[*Element]struct{}{}
for _, tag := range rq.todoDirt {
for _, elem := range rq.tagMap[tag] {
if !elem.updating {
elem.updating = true
if _, ok := seen[elem]; !ok {
seen[elem] = struct{}{}
todo = append(todo, elem)
}
}
}
for _, elem := range todo {
elem.updating = false
}
clear(rq.todoDirt)
rq.todoDirt = rq.todoDirt[:0]
rq.mu.Unlock()
sort.Slice(todo, func(i, j int) bool { return todo[i].Jid() < todo[j].Jid() })
return
}
Expand Down

0 comments on commit 2a03431

Please sign in to comment.