Skip to content

Commit

Permalink
Merge pull request #37 from linkdata/direct-ws-writing
Browse files Browse the repository at this point in the history
Direct ws writing
  • Loading branch information
linkdata authored Feb 16, 2024
2 parents 04ecbc1 + 67a8ae4 commit a9bb588
Show file tree
Hide file tree
Showing 15 changed files with 193 additions and 73 deletions.
16 changes: 7 additions & 9 deletions html.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ func appendAttrs(b []byte, attrs []template.HTMLAttr) []byte {
}

func WriteHtmlTag(w io.Writer, jid jid.Jid, htmlTag, typeAttr, valueAttr string, attrs []template.HTMLAttr) (err error) {
b := make([]byte, 0, 64)
b = jid.AppendStartTagAttr(b, htmlTag)
b := jid.AppendStartTagAttr(nil, htmlTag)
if typeAttr != "" {
b = append(b, ` type=`...)
b = strconv.AppendQuote(b, typeAttr)
Expand All @@ -66,13 +65,12 @@ func WriteHtmlInput(w io.Writer, jid jid.Jid, typeAttr, valueAttr string, attrs
func WriteHtmlInner(w io.Writer, jid jid.Jid, htmlTag, typeAttr string, innerHtml template.HTML, attrs ...template.HTMLAttr) (err error) {
if err = WriteHtmlTag(w, jid, htmlTag, typeAttr, "", attrs); err == nil {
if innerHtml != "" || needClosingTag(htmlTag) {
if _, err = w.Write([]byte(innerHtml)); err == nil {
var b []byte
b = append(b, "</"...)
b = append(b, htmlTag...)
b = append(b, '>')
_, err = w.Write(b)
}
var b []byte
b = append(b, innerHtml...)
b = append(b, "</"...)
b = append(b, htmlTag...)
b = append(b, '>')
_, err = w.Write(b)
}
}
return
Expand Down
28 changes: 13 additions & 15 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,12 +417,12 @@ func (rq *Request) Done() (ch <-chan struct{}) {
}

// process is the main message processing loop. Will unsubscribe broadcastMsgCh and close outboundMsgCh on exit.
func (rq *Request) process(broadcastMsgCh chan Message, incomingMsgCh <-chan wsMsg, outboundCh chan<- string) {
func (rq *Request) process(broadcastMsgCh chan Message, incomingMsgCh <-chan wsMsg, outboundMsgCh chan<- wsMsg) {
jawsDoneCh := rq.Jaws.Done()
ctxDoneCh := rq.Done()
eventDoneCh := make(chan struct{})
eventCallCh := make(chan eventFnCall, cap(outboundCh))
go rq.eventCaller(eventCallCh, outboundCh, eventDoneCh)
eventCallCh := make(chan eventFnCall, cap(outboundMsgCh))
go rq.eventCaller(eventCallCh, outboundMsgCh, eventDoneCh)

defer func() {
rq.Jaws.unsubscribe(broadcastMsgCh)
Expand All @@ -433,7 +433,7 @@ func (rq *Request) process(broadcastMsgCh chan Message, incomingMsgCh <-chan wsM
case <-eventCallCh:
case <-incomingMsgCh:
case <-eventDoneCh:
close(outboundCh)
close(outboundMsgCh)
if x := recover(); x != nil {
var err error
var ok bool
Expand All @@ -455,7 +455,7 @@ func (rq *Request) process(broadcastMsgCh chan Message, incomingMsgCh <-chan wsM
var ok bool

if len(wsQueue) > 0 {
wsQueue = rq.sendQueue(outboundCh, wsQueue)
wsQueue = rq.sendQueue(outboundMsgCh, wsQueue)
}

// Empty the dirty tags list and call JawsUpdate()
Expand All @@ -475,7 +475,7 @@ func (rq *Request) process(broadcastMsgCh chan Message, incomingMsgCh <-chan wsM
rq.mu.RUnlock()

if len(wsQueue) > 0 {
wsQueue = rq.sendQueue(outboundCh, wsQueue)
wsQueue = rq.sendQueue(outboundMsgCh, wsQueue)
}

select {
Expand Down Expand Up @@ -629,21 +629,19 @@ func (rq *Request) queueEvent(eventCallCh chan eventFnCall, call eventFnCall) {
}
}

func (rq *Request) wsSend(outboundCh chan<- string, s string) {
func (rq *Request) wsSend(outboundMsgCh chan<- wsMsg, msg wsMsg) {
select {
case <-rq.Done():
case outboundCh <- s:
case outboundMsgCh <- msg:
default:
panic(fmt.Errorf("jaws: %v: outbound message channel is full (%d) sending %s", rq, len(outboundCh), s))
panic(fmt.Errorf("jaws: %v: outbound message channel is full (%d) sending %s", rq, len(outboundMsgCh), msg))
}
}

func (rq *Request) sendQueue(outboundCh chan<- string, wsQueue []wsMsg) []wsMsg {
var sb strings.Builder
func (rq *Request) sendQueue(outboundMsgCh chan<- wsMsg, wsQueue []wsMsg) []wsMsg {
for _, msg := range wsQueue {
sb.WriteString(msg.Format())
rq.wsSend(outboundMsgCh, msg)
}
rq.wsSend(outboundCh, sb.String())
return wsQueue[:0]
}

Expand Down Expand Up @@ -700,14 +698,14 @@ func (rq *Request) makeUpdateList() (todo []*Element) {
}

// eventCaller calls event functions
func (rq *Request) eventCaller(eventCallCh <-chan eventFnCall, outboundCh chan<- string, eventDoneCh chan<- struct{}) {
func (rq *Request) eventCaller(eventCallCh <-chan eventFnCall, outboundMsgCh chan<- wsMsg, eventDoneCh chan<- struct{}) {
defer close(eventDoneCh)
for call := range eventCallCh {
if err := rq.callAllEventHandlers(call.jid, call.wht, call.data); err != nil {
var m wsMsg
m.FillAlert(err)
select {
case outboundCh <- m.Format():
case outboundMsgCh <- m:
default:
_ = rq.Jaws.Log(fmt.Errorf("jaws: outboundMsgCh full sending event error '%s'", err.Error()))
}
Expand Down
57 changes: 42 additions & 15 deletions request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import (

const testTimeout = time.Second * 3

func fillWsCh(ch chan string) {
func fillWsCh(ch chan wsMsg) {
for {
select {
case ch <- "":
case ch <- wsMsg{}:
default:
return
}
Expand Down Expand Up @@ -72,9 +72,7 @@ func TestRequest_SendArrivesOk(t *testing.T) {
select {
case <-time.NewTimer(testTimeout).C:
is.Fail()
case msgstr := <-rq.outCh:
msg, ok := wsParse([]byte(msgstr))
is.True(ok)
case msg := <-rq.outCh:
elem := rq.getElementByJid(jid)
is.True(elem != nil)
is.Equal(msg, wsMsg{Jid: elem.jid, Data: "bar", What: what.Inner})
Expand Down Expand Up @@ -214,7 +212,7 @@ func TestRequest_Trigger(t *testing.T) {
case <-th.C:
th.Timeout()
case msg := <-rq.outCh:
th.Equal(msg, (&wsMsg{
th.Equal(msg.Format(), (&wsMsg{
Data: "danger\nomg",
Jid: jid.Jid(0),
What: what.Alert,
Expand Down Expand Up @@ -393,7 +391,8 @@ func TestRequest_Alert(t *testing.T) {
select {
case <-th.C:
th.Timeout()
case s := <-rq1.outCh:
case msg := <-rq1.outCh:
s := msg.Format()
if s != "Alert\t\t\"info\\n<html>\\nnot\\tescaped\"\n" {
t.Errorf("%q", s)
}
Expand All @@ -416,7 +415,8 @@ func TestRequest_Redirect(t *testing.T) {
select {
case <-th.C:
th.Timeout()
case s := <-rq1.outCh:
case msg := <-rq1.outCh:
s := msg.Format()
if s != "Redirect\t\t\"some-url\"\n" {
t.Errorf("%q", s)
}
Expand All @@ -437,7 +437,8 @@ func TestRequest_AlertError(t *testing.T) {
select {
case <-th.C:
th.Timeout()
case s := <-rq.outCh:
case msg := <-rq.outCh:
s := msg.Format()
if s != "Alert\t\t\"danger\\n&lt;html&gt;\\nshould-be-escaped\"\n" {
t.Errorf("%q", s)
}
Expand Down Expand Up @@ -478,19 +479,43 @@ func TestRequest_DeleteByTag(t *testing.T) {
select {
case <-th.C:
th.Timeout()
case s := <-rq1.outCh:
if s != "Delete\tJid.1\t\"\"\nDelete\tJid.3\t\"\"\n" {
case msg := <-rq1.outCh:
s := msg.Format()
if s != "Delete\tJid.1\t\"\"\n" {
t.Errorf("%q", s)
}
}

select {
case <-th.C:
th.Timeout()
case s := <-rq2.outCh:
if s != "Delete\tJid.4\t\"\"\nDelete\tJid.6\t\"\"\n" {
case msg := <-rq1.outCh:
s := msg.Format()
if s != "Delete\tJid.3\t\"\"\n" {
t.Errorf("%q", s)
}
}

select {
case <-th.C:
th.Timeout()
case msg := <-rq2.outCh:
s := msg.Format()
if s != "Delete\tJid.4\t\"\"\n" {
t.Errorf("%q", s)
}
}

select {
case <-th.C:
th.Timeout()
case msg := <-rq2.outCh:
s := msg.Format()
if s != "Delete\tJid.6\t\"\"\n" {
t.Errorf("%q", s)
}
}

}

func TestRequest_HtmlIdBroadcast(t *testing.T) {
Expand All @@ -508,15 +533,17 @@ func TestRequest_HtmlIdBroadcast(t *testing.T) {
select {
case <-th.C:
th.Timeout()
case s := <-rq1.outCh:
case msg := <-rq1.outCh:
s := msg.Format()
if s != "Inner\tfooId\t\"inner\"\n" {
t.Errorf("%q", s)
}
}
select {
case <-th.C:
th.Timeout()
case s := <-rq2.outCh:
case msg := <-rq2.outCh:
s := msg.Format()
if s != "Inner\tfooId\t\"inner\"\n" {
t.Errorf("%q", s)
}
Expand Down
4 changes: 2 additions & 2 deletions testjaws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type testRequest struct {
readyCh chan struct{}
doneCh chan struct{}
inCh chan wsMsg
outCh chan string
outCh chan wsMsg
bcastCh chan Message
ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -70,7 +70,7 @@ func (tj *testJaws) newRequest(hr *http.Request) (tr *testRequest) {
readyCh: make(chan struct{}),
doneCh: make(chan struct{}),
inCh: make(chan wsMsg),
outCh: make(chan string, cap(bcastCh)),
outCh: make(chan wsMsg, cap(bcastCh)),
bcastCh: bcastCh,
ctx: ctx,
cancel: cancel,
Expand Down
9 changes: 6 additions & 3 deletions uicheckbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func TestRequest_Checkbox(t *testing.T) {
select {
case <-th.C:
th.Timeout()
case s := <-rq.outCh:
case msg := <-rq.outCh:
s := msg.Format()
if s != "Value\tJid.1\t\"true\"\n" {
t.Errorf("%q", s)
}
Expand All @@ -58,7 +59,8 @@ func TestRequest_Checkbox(t *testing.T) {
select {
case <-th.C:
th.Timeout()
case s := <-rq.outCh:
case msg := <-rq.outCh:
s := msg.Format()
if s != "Alert\t\t\"danger\\nstrconv.ParseBool: parsing &#34;omg&#34;: invalid syntax\"\n" {
t.Errorf("wrong Alert: %q", s)
}
Expand All @@ -69,7 +71,8 @@ func TestRequest_Checkbox(t *testing.T) {
select {
case <-th.C:
th.Timeout()
case s := <-rq.outCh:
case msg := <-rq.outCh:
s := msg.Format()
if s != "Alert\t\t\"danger\\nmeh\"\n" {
t.Errorf("wrong Alert: %q", s)
}
Expand Down
9 changes: 6 additions & 3 deletions uidate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ func TestRequest_Date(t *testing.T) {
select {
case <-th.C:
th.Timeout()
case s := <-rq.outCh:
case msg := <-rq.outCh:
s := msg.Format()
if s != fmt.Sprintf("Value\tJid.1\t\"%s\"\n", val.Format(ISO8601)) {
t.Error("wrong Value")
}
Expand All @@ -62,7 +63,8 @@ func TestRequest_Date(t *testing.T) {
select {
case <-th.C:
th.Timeout()
case s := <-rq.outCh:
case msg := <-rq.outCh:
s := msg.Format()
if s != "Alert\t\t\"danger\\nparsing time &#34;omg&#34; as &#34;2006-01-02&#34;: cannot parse &#34;omg&#34; as &#34;2006&#34;\"\n" {
t.Errorf("wrong Alert: %q", s)
}
Expand All @@ -73,7 +75,8 @@ func TestRequest_Date(t *testing.T) {
select {
case <-th.C:
th.Timeout()
case s := <-rq.outCh:
case msg := <-rq.outCh:
s := msg.Format()
if s != "Alert\t\t\"danger\\nmeh\"\n" {
t.Errorf("wrong Alert: %q", s)
}
Expand Down
3 changes: 2 additions & 1 deletion uiimg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ func TestRequest_Img(t *testing.T) {
select {
case <-th.C:
th.Timeout()
case s := <-rq.outCh:
case msg := <-rq.outCh:
s := msg.Format()
if s != "SAttr\tJid.1\t\"src\\n\\\"unquoted.jpg\\\"\"\n" {
t.Error(strconv.Quote(s))
}
Expand Down
9 changes: 6 additions & 3 deletions uinumber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ func TestRequest_Number(t *testing.T) {
select {
case <-th.C:
th.Timeout()
case s := <-rq.outCh:
case msg := <-rq.outCh:
s := msg.Format()
if s != fmt.Sprintf("Value\tJid.1\t\"%v\"\n", val) {
t.Error("wrong Value")
}
Expand All @@ -59,7 +60,8 @@ func TestRequest_Number(t *testing.T) {
select {
case <-th.C:
th.Timeout()
case s := <-rq.outCh:
case msg := <-rq.outCh:
s := msg.Format()
if s != "Alert\t\t\"danger\\nstrconv.ParseFloat: parsing &#34;omg&#34;: invalid syntax\"\n" {
t.Errorf("wrong Alert: %q", s)
}
Expand All @@ -70,7 +72,8 @@ func TestRequest_Number(t *testing.T) {
select {
case <-th.C:
th.Timeout()
case s := <-rq.outCh:
case msg := <-rq.outCh:
s := msg.Format()
if s != "Alert\t\t\"danger\\nmeh\"\n" {
t.Errorf("wrong Alert: %q", s)
}
Expand Down
6 changes: 4 additions & 2 deletions uioption_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ func TestUiOption(t *testing.T) {
select {
case <-th.C:
th.Timeout()
case s := <-rq.outCh:
case msg := <-rq.outCh:
s := msg.Format()
if s != "RAttr\tJid.1\t\"selected\"\n" {
t.Errorf("%q", s)
}
Expand All @@ -41,7 +42,8 @@ func TestUiOption(t *testing.T) {
select {
case <-th.C:
th.Timeout()
case s := <-rq.outCh:
case msg := <-rq.outCh:
s := msg.Format()
if s != "SAttr\tJid.1\t\"selected\\n\"\n" {
t.Errorf("%q", s)
}
Expand Down
Loading

0 comments on commit a9bb588

Please sign in to comment.