Skip to content

Commit

Permalink
remove the direct send channel from Request
Browse files Browse the repository at this point in the history
  • Loading branch information
linkdata committed Oct 18, 2023
1 parent df9ffc7 commit 96edbbf
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 108 deletions.
2 changes: 1 addition & 1 deletion jaws.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func New() (jw *Jaws) {

// Close frees resources associated with the JaWS object, and
// closes the completion channel if the JaWS was created with New().
// Once the completion channel is closed, broadcasts and sends are discarded.
// Once the completion channel is closed, broadcasts and sends may be discarded.
// Subsequent calls to Close() have no effect.
func (jw *Jaws) Close() {
jw.mu.Lock()
Expand Down
4 changes: 1 addition & 3 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@ type Message struct {
Dest interface{} // destination (tag, html ID or *Element)
What what.What // what to change or do
Data interface{} // data (e.g. inner HTML content or slice of tags)
from interface{} // don't send to this
}

// String returns the Message in a form suitable for debug output.
func (msg *Message) String() string {
return fmt.Sprintf("{%q, %v, %q, %v}",
return fmt.Sprintf("{%q, %v, %q}",
msg.Dest,
msg.What,
msg.Data,
msg.from,
)
}
16 changes: 3 additions & 13 deletions message_test.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,18 @@
package jaws

import (
"fmt"
"testing"

"github.com/linkdata/jaws/what"
"github.com/matryer/is"
)

func Test_Message_String(t *testing.T) {
is := is.New(t)
msg := Message{
Dest: "Elem",
What: what.Update,
Data: "Data\nText",
}
is.Equal(msg.String(), "{\"Elem\", Update, \"Data\\nText\", <nil>}")
const jawsKey = uint64(0xcafebabe)
msg.from = &Request{JawsKey: jawsKey}
keyStr := JawsKeyString(jawsKey)
keyVal := JawsKeyValue(keyStr)
is.Equal(keyVal, jawsKey)
is.Equal(uint64(0), JawsKeyValue(""))
is.Equal(msg.String(), fmt.Sprintf("{\"Elem\", Update, \"Data\\nText\", Request<%s>}", keyStr))
msg.from = &Request{JawsKey: 0}
is.Equal(msg.String(), "{\"Elem\", Update, \"Data\\nText\", Request<>}")
if msg.String() != "{\"Elem\", Update, \"Data\\nText\"}" {
t.Fail()
}
}
70 changes: 13 additions & 57 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type Request struct {
Initial *http.Request // (read-only) initial HTTP request passed to Jaws.NewRequest
remoteIP net.IP // (read-only) remote IP, or nil
session *Session // (read-only) session, if established
sendCh chan Message // (read-only) direct send message channel
mu deadlock.RWMutex // protects following
dirty []interface{} // dirty tags
ctx context.Context // current context, derived from either Jaws or WS HTTP req
Expand All @@ -59,7 +58,6 @@ var requestPool = sync.Pool{New: newRequest}

func newRequest() interface{} {
rq := &Request{
sendCh: make(chan Message),
tagMap: make(map[interface{}][]*Element),
}
return rq
Expand Down Expand Up @@ -186,12 +184,6 @@ func (rq *Request) Set(key string, val interface{}) {
rq.Session().Set(key, val)
}

// Broadcast sends a broadcast to all Requests except the current one.
func (rq *Request) Broadcast(msg Message) {
msg.from = rq
rq.Jaws.Broadcast(msg)
}

// Context returns the Request's Context, which is derived from the
// WebSocket's HTTP requests Context.
func (rq *Request) Context() (ctx context.Context) {
Expand All @@ -210,45 +202,22 @@ func (rq *Request) cancel(err error) {
}
}

func (rq *Request) getDoneCh() (jawsDoneCh, ctxDoneCh <-chan struct{}) {
rq.mu.RLock()
defer rq.mu.RUnlock()
if rq.Jaws == nil {
panic("Request.Send(): request is dead")
}
jawsDoneCh = rq.Jaws.Done()
ctxDoneCh = rq.ctx.Done()
return
}

// Send a message to the current Request only.
// Returns true if the message was successfully sent.
func (rq *Request) Send(msg Message) bool {
jawsDoneCh, ctxDoneCh := rq.getDoneCh()
select {
case <-jawsDoneCh:
case <-ctxDoneCh:
case rq.sendCh <- msg:
return true
}
return false
}

// Alert attempts to show an alert message on the current request webpage if it has an HTML element with the id 'jaws-alert'.
// The lvl argument should be one of Bootstraps alert levels: primary, secondary, success, danger, warning, info, light or dark.
//
// The default JaWS javascript only supports Bootstrap.js dismissable alerts.
func (rq *Request) Alert(lvl, msg string) {
rq.Send(Message{
rq.Jaws.Broadcast(Message{
Dest: rq,
What: what.Alert,
Data: lvl + "\n" + msg,
})
}

// AlertError calls Alert if the given error is not nil.
func (rq *Request) AlertError(err error) {
if err != nil {
rq.Send(makeAlertDangerMessage(rq.Jaws.Log(err)))
if rq.Jaws.Log(err) != nil {
rq.Alert("danger", html.EscapeString(err.Error()))
}
}

Expand All @@ -273,7 +242,8 @@ func (rq *Request) makeIdList(tags []interface{}) string {

// Redirect requests the current Request to navigate to the given URL.
func (rq *Request) Redirect(url string) {
rq.Send(Message{
rq.Jaws.Broadcast(Message{
Dest: rq,
What: what.Redirect,
Data: url,
})
Expand Down Expand Up @@ -332,7 +302,7 @@ func (rq *Request) Dirty(tags ...interface{}) {

// wantMessage returns true if the Request want the message.
func (rq *Request) wantMessage(msg *Message) (yes bool) {
if rq != nil && msg.from != rq {
if rq != nil {
switch dest := msg.Dest.(type) {
case *Request:
yes = dest == rq
Expand Down Expand Up @@ -467,7 +437,6 @@ func (rq *Request) process(broadcastMsgCh chan Message, incomingMsgCh <-chan wsM
for {
select {
case <-eventCallCh:
case <-rq.sendCh:
case <-incomingMsgCh:
case <-eventDoneCh:
close(outboundCh)
Expand Down Expand Up @@ -518,7 +487,6 @@ func (rq *Request) process(broadcastMsgCh chan Message, incomingMsgCh <-chan wsM
select {
case <-jawsDoneCh:
case <-ctxDoneCh:
case tagmsg, ok = <-rq.sendCh:
case tagmsg, ok = <-broadcastMsgCh:
case wsmsg, ok = <-incomingMsgCh:
if ok {
Expand Down Expand Up @@ -602,11 +570,11 @@ func (rq *Request) process(broadcastMsgCh chan Message, incomingMsgCh <-chan wsM
// the function must not send any messages itself, but may return
// an error to be sent out as an alert message.
// primary usecase is tests.
if errmsg := makeAlertDangerMessage(rq.callAllEventHandlers(elem.jid, tagmsg.What, wsdata)); errmsg.What != what.Update {
if err := rq.Jaws.Log(rq.callAllEventHandlers(elem.jid, tagmsg.What, wsdata)); err != nil {
wsQueue = append(wsQueue, wsMsg{
Data: wsdata,
Jid: elem.jid,
What: errmsg.What,
What: what.Alert,
})
}
case what.Update:
Expand All @@ -627,10 +595,8 @@ func (rq *Request) handleRemove(data string) {
rq.mu.Lock()
defer rq.mu.Unlock()
for _, jidstr := range strings.Split(data, "\t") {
if jid := jid.ParseString(jidstr); jid.IsValid() {
if e := rq.getElementLocked(jid); e != nil {
rq.deleteElementLocked(e)
}
if e := rq.getElementLocked(jid.ParseString(jidstr)); e != nil {
rq.deleteElementLocked(e)
}
}
}
Expand Down Expand Up @@ -685,7 +651,7 @@ func (rq *Request) queueEvent(eventCallCh chan eventFnCall, call eventFnCall) {
}
}

func (rq *Request) send(outboundCh chan<- string, s string) {
func (rq *Request) wsSend(outboundCh chan<- string, s string) {
select {
case <-rq.Jaws.Done():
case <-rq.Done():
Expand All @@ -700,7 +666,7 @@ func (rq *Request) sendQueue(outboundCh chan<- string, wsQueue []wsMsg) []wsMsg
for _, msg := range wsQueue {
sb.WriteString(msg.Format())
}
rq.send(outboundCh, sb.String())
rq.wsSend(outboundCh, sb.String())
return wsQueue[:0]
}

Expand Down Expand Up @@ -770,13 +736,3 @@ func (rq *Request) onConnect() (err error) {
}
return
}

func makeAlertDangerMessage(err error) (msg Message) {
if err != nil {
msg = Message{
Data: "danger\n" + html.EscapeString(err.Error()),
What: what.Alert,
}
}
return
}
44 changes: 22 additions & 22 deletions request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,15 @@ func fillWsCh(ch chan string) {
}
}

func fillTagCh(ch chan Message) {
/*func fillTagCh(ch chan Message) {
for {
select {
case ch <- Message{}:
default:
return
}
}
}
}*/

func TestRequest_Registrations(t *testing.T) {
is := is.New(t)
Expand All @@ -127,14 +127,14 @@ func TestRequest_Registrations(t *testing.T) {
is.True(jid != jid2)
}

func TestRequest_SendFailsWhenJawsClosed(t *testing.T) {
/*func TestRequest_SendFailsWhenJawsClosed(t *testing.T) {
is := is.New(t)
jw := New()
rq := jw.NewRequest(nil)
jw.UseRequest(rq.JawsKey, nil)
jw.Close()
is.Equal(rq.Send(Message{}), false)
}
is.Equal(rq.send(Message{}), false)
}*/

func TestRequest_SendPanicsAfterRecycle(t *testing.T) {
// can not run in parallel
Expand All @@ -150,10 +150,10 @@ func TestRequest_SendPanicsAfterRecycle(t *testing.T) {
defer jw.Close()
rq := jw.NewRequest(nil)
rq.recycle()
rq.Send(Message{})
rq.Jaws.Broadcast(Message{})
}

func TestRequest_SendFailsWhenContextDone(t *testing.T) {
/*func TestRequest_SendFailsWhenContextDone(t *testing.T) {
is := is.New(t)
jw := New()
defer jw.Close()
Expand All @@ -165,10 +165,9 @@ func TestRequest_SendFailsWhenContextDone(t *testing.T) {
if rq.cancelFn == nil {
is.Fail()
}
fillTagCh(rq.sendCh)
cancel()
is.Equal(rq.Send(Message{}), false)
}
is.Equal(rq.send(Message{}), false)
}*/

func TestRequest_HeadHTML(t *testing.T) {
is := is.New(t)
Expand All @@ -187,9 +186,7 @@ func TestRequest_SendArrivesOk(t *testing.T) {
rq := newTestRequest()
defer rq.Close()
jid := rq.Register("foo")
theMsg := Message{Dest: Tag("foo"), What: what.Inner, Data: "bar"}

is.Equal(rq.Send(theMsg), true)
rq.jw.Broadcast(Message{Dest: Tag("foo"), What: what.Inner, Data: "bar"})
select {
case <-time.NewTimer(testTimeout).C:
is.Fail()
Expand Down Expand Up @@ -261,7 +258,7 @@ func TestRequest_OutboundOverflowPanicsWithNoLogger(t *testing.T) {
defer rq.Close()
rq.Register(Tag("foo"))
fillWsCh(rq.outCh)
rq.sendCh <- Message{Dest: Tag("foo"), What: what.Inner, Data: "bar"}
rq.Jaws.Broadcast(Message{Dest: Tag("foo"), What: what.Inner, Data: "bar"})
select {
case <-time.NewTimer(testTimeout).C:
is.Fail()
Expand Down Expand Up @@ -290,8 +287,8 @@ func TestRequest_Trigger(t *testing.T) {
})

// broadcasts from ourselves should not invoke fn
rq.Broadcast(Message{Dest: Tag("foo"), What: what.Input, Data: "bar"})
rq.Broadcast(Message{Dest: Tag("err"), What: what.Input, Data: "baz"})
// rq.Broadcast(Message{Dest: Tag("foo"), What: what.Input, Data: "bar"})
// rq.Broadcast(Message{Dest: Tag("err"), What: what.Input, Data: "baz"})
rq.jw.Broadcast(Message{Dest: Tag("end"), What: what.Input, Data: ""}) // to know when to stop
select {
case <-time.NewTimer(testTimeout).C:
Expand Down Expand Up @@ -396,16 +393,18 @@ func TestRequest_EventFnQueueOverflowPanicsWithNoLogger(t *testing.T) {
rq.expectPanic = true
rq.jw.Logger = nil
tmr := time.NewTimer(testTimeout)

defer tmr.Stop()
for {
select {
case rq.sendCh <- Message{Dest: Tag("bomb"), What: what.Input}:
case <-rq.doneCh:
is.True(rq.panicked)
is.True(strings.Contains(rq.panicVal.(error).Error(), "eventCallCh is full sending"))
return
case <-tmr.C:
is.Fail()
default:
rq.Jaws.Broadcast(Message{Dest: Tag("bomb"), What: what.Input})
}
}
}
Expand Down Expand Up @@ -447,7 +446,7 @@ func TestRequest_IgnoresIncomingMsgsDuringShutdown(t *testing.T) {
// outbound channel is full, but with the
// event fn holding it won't be able to end
select {
case rq.sendCh <- Message{Dest: Tag("foo"), What: what.Inner, Data: ""}:
case rq.bcastCh <- Message{Dest: Tag("foo"), What: what.Inner, Data: ""}:
case <-time.NewTimer(testTimeout).C:
is.Fail()
case <-rq.doneCh:
Expand All @@ -459,11 +458,12 @@ func TestRequest_IgnoresIncomingMsgsDuringShutdown(t *testing.T) {
tmr := time.NewTimer(testTimeout)
for i := 0; i < cap(rq.outCh)*2; i++ {
select {
case rq.sendCh <- Message{}:
case <-rq.doneCh:
is.Fail()
case <-tmr.C:
is.Fail()
default:
rq.Jaws.Broadcast(Message{Dest: rq})
}
select {
case rq.inCh <- wsMsg{}:
Expand Down Expand Up @@ -513,17 +513,17 @@ func TestRequest_Sends(t *testing.T) {
case <-rq.readyCh:
}

rq.Send(Message{
rq.jw.Broadcast(Message{
Dest: Tag("SetAttr"),
What: what.SAttr,
Data: "bar\nbaz",
})
rq.Send(Message{
rq.jw.Broadcast(Message{
Dest: Tag("SetAttr"),
What: what.SAttr,
Data: "bar\nbaz",
})
rq.Send(Message{
rq.jw.Broadcast(Message{
Dest: Tag("RemoveAttr"),
What: what.RAttr,
Data: "bar",
Expand Down
Loading

0 comments on commit 96edbbf

Please sign in to comment.