diff --git a/request.go b/request.go index aa74e49..f12a555 100644 --- a/request.go +++ b/request.go @@ -8,6 +8,7 @@ import ( "io" "net/http" "net/netip" + "slices" "sort" "strconv" "strings" @@ -631,34 +632,36 @@ func (rq *Request) queueEvent(eventCallCh chan eventFnCall, call eventFnCall) { func (rq *Request) sendQueue(outboundMsgCh chan<- wsMsg) { var toSend []wsMsg - validJids := map[Jid]struct{}{} rq.mu.Lock() if len(rq.wsQueue) > 0 { + validJids := map[Jid]struct{}{} for _, elem := range rq.elems { if !elem.deleted { validJids[elem.Jid()] = struct{}{} } } - for _, msg := range rq.wsQueue { - ok := msg.Jid < 1 || msg.What == what.Delete + for i := range rq.wsQueue { + ok := rq.wsQueue[i].Jid < 1 || rq.wsQueue[i].What == what.Delete if !ok { - _, ok = validJids[msg.Jid] + _, ok = validJids[rq.wsQueue[i].Jid] } if ok { - toSend = append(toSend, msg) + toSend = append(toSend, rq.wsQueue[i]) } } rq.wsQueue = rq.wsQueue[:0] } rq.mu.Unlock() - for _, msg := range toSend { + slices.SortStableFunc(toSend, func(a, b wsMsg) int { return int(a.Jid - b.Jid) }) + + for i := range toSend { select { case <-rq.Done(): - case outboundMsgCh <- msg: + case outboundMsgCh <- toSend[i]: default: - panic(fmt.Errorf("jaws: %v: outbound message channel is full (%d) sending %s", rq, len(outboundMsgCh), msg)) + panic(fmt.Errorf("jaws: %v: outbound message channel is full (%d) sending %s", rq, len(outboundMsgCh), toSend[i])) } } } @@ -699,19 +702,18 @@ func (rq *Request) deleteElement(e *Element) { func (rq *Request) makeUpdateList() (todo []*Element) { rq.mu.Lock() - defer rq.mu.Unlock() + seen := map[*Element]struct{}{} for _, tag := range rq.todoDirt { for _, elem := range rq.tagMap[tag] { - if !elem.updating { - elem.updating = true + if _, ok := seen[elem]; !ok { + seen[elem] = struct{}{} todo = append(todo, elem) } } } - for _, elem := range todo { - elem.updating = false - } + clear(rq.todoDirt) rq.todoDirt = rq.todoDirt[:0] + rq.mu.Unlock() sort.Slice(todo, func(i, j int) bool { return todo[i].Jid() < todo[j].Jid() }) return }