Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add method to RelayFrame to reveal Arg2 offset #746

Merged
merged 7 commits into from
Jul 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ type CallFrame interface {
// RoutingKey may refer to an alternate traffic group instead of the
// traffic group identified by the service name.
RoutingKey() []byte
// Arg2StartOffset returns the offset from start of payload to the
// beginning of Arg2 in bytes.
Arg2StartOffset() int
// Arg2EndOffset returns the offset from start of payload to the end of
// Arg2 in bytes, and hasMore to indicate if there are more frames and
// Arg3 has not started (i.e. Arg2 is fragmented).
Arg2EndOffset() (_ int, hasMore bool)
}

// Conn contains information about the underlying connection.
Expand Down
23 changes: 23 additions & 0 deletions relay_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ type lazyCallReq struct {
*Frame

caller, method, delegate, key []byte

arg2StartOffset, arg2EndOffset int
isArg2Fragmented bool
}

// TODO: Consider pooling lazyCallReq and using pointers to the struct.
Expand Down Expand Up @@ -132,6 +135,13 @@ func newLazyCallReq(f *Frame) lazyCallReq {
arg1Len := int(binary.BigEndian.Uint16(f.Payload[cur : cur+2]))
cur += 2
cr.method = f.Payload[cur : cur+arg1Len]

// arg2~2
cur += arg1Len
cr.arg2StartOffset = cur + 2
cr.arg2EndOffset = cr.arg2StartOffset + int(binary.BigEndian.Uint16(f.Payload[cur:cur+2]))
// arg2 is fragmented if we don't see arg3 in this frame.
cr.isArg2Fragmented = int(cr.Header.PayloadSize()) <= cr.arg2EndOffset && cr.HasMoreFragments()
return cr
}

Expand Down Expand Up @@ -183,6 +193,19 @@ func (f lazyCallReq) HasMoreFragments() bool {
return f.Payload[_flagsIndex]&hasMoreFragmentsFlag != 0
}

// Arg2EndOffset returns the offset from start of payload to the end of Arg2
// in bytes, and hasMore to be true if there are more frames and arg3 has
// not started.
func (f lazyCallReq) Arg2EndOffset() (_ int, hasMore bool) {
return f.arg2EndOffset, f.isArg2Fragmented
}

// Arg2StartOffset returns the offset from start of payload to the beginning
// of Arg2 in bytes.
func (f lazyCallReq) Arg2StartOffset() int {
return f.arg2StartOffset
}

// finishesCall checks whether this frame is the last one we should expect for
// this RPC req-res.
func finishesCall(f *Frame) bool {
Expand Down
82 changes: 81 additions & 1 deletion relay_messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package tchannel

import (
"fmt"
"testing"
"time"

Expand All @@ -41,17 +42,27 @@ const (
reqHasAll testCallReq = reqTotalCombinations - 1
)

type testCallReqParams struct {
flags byte
arg2Buf []byte
}

func (cr testCallReq) req() lazyCallReq {
return cr.reqWithParams(testCallReqParams{})
}

func (cr testCallReq) reqWithParams(p testCallReqParams) lazyCallReq {
// TODO: Constructing a frame is ugly because the initial flags byte is
// written in reqResWriter instead of callReq. We should instead handle that
// in callReq, which will allow our tests to be sane.
f := NewFrame(200)
fh := fakeHeader()
fh.size = 0xD8 // 200 + 16 bytes of header = 216 (0xD8)
f.Header = fh
fh.write(typed.NewWriteBuffer(f.headerBuffer))

payload := typed.NewWriteBuffer(f.Payload)
payload.WriteSingleByte(0) // flags
payload.WriteSingleByte(p.flags) // flags
payload.WriteUint32(42) // TTL
payload.WriteBytes(make([]byte, 25)) // tracing
payload.WriteLen8String("bankmoji") // service
Expand Down Expand Up @@ -79,6 +90,9 @@ func (cr testCallReq) req() lazyCallReq {
payload.WriteUint32(0) // checksum contents
}
payload.WriteLen16String("moneys") // method

payload.WriteUint16(uint16(len(p.arg2Buf)))
payload.WriteBytes(p.arg2Buf)
return newLazyCallReq(f)
}

Expand Down Expand Up @@ -272,6 +286,72 @@ func TestLazyCallReqSetTTL(t *testing.T) {
})
}

func TestLazyCallArg2Offset(t *testing.T) {
wantArg2Buf := []byte("test arg2 buf")
tests := []struct {
msg string
flags byte
arg2Buf []byte
}{
{
msg: "arg2 is fully contained in frame",
arg2Buf: wantArg2Buf,
},
{
msg: "has no arg2",
},
{
msg: "frame fragmented but arg2 is fully contained",
flags: hasMoreFragmentsFlag,
arg2Buf: wantArg2Buf,
},
}

for _, tt := range tests {
t.Run(tt.msg, func(t *testing.T) {
withLazyCallReqCombinations(func(crt testCallReq) {
cr := crt.reqWithParams(testCallReqParams{
flags: tt.flags,
arg2Buf: tt.arg2Buf,
})
arg2EndOffset, hasMore := cr.Arg2EndOffset()
assert.False(t, hasMore)
if len(tt.arg2Buf) == 0 {
assert.Zero(t, arg2EndOffset-cr.Arg2StartOffset())
return
}

arg2Payload := cr.Payload[cr.Arg2StartOffset():arg2EndOffset]
assert.Equal(t, tt.arg2Buf, arg2Payload)
})
})
}

t.Run("no arg3 set", func(t *testing.T) {
for _, testHasMore := range []bool{true, false} {
t.Run(fmt.Sprintf("hasMore flag is set=%v", testHasMore), func(t *testing.T) {
withLazyCallReqCombinations(func(crt testCallReq) {
// For each CallReq, we first get the remaining space left, and
// fill up the remaining space with arg2.
crNoArg2 := crt.req()
arg2Size := int(crNoArg2.Header.PayloadSize()) - crNoArg2.Arg2StartOffset()
var flags byte
if testHasMore {
flags |= hasMoreFragmentsFlag
}
cr := crt.reqWithParams(testCallReqParams{
flags: flags,
arg2Buf: make([]byte, arg2Size),
})
endOffset, hasMore := cr.Arg2EndOffset()
assert.Equal(t, hasMore, testHasMore)
assert.EqualValues(t, crNoArg2.Header.PayloadSize(), endOffset)
})
})
}
})
}

func TestLazyCallResRejectsOtherFrames(t *testing.T) {
assertWrappingPanics(
t,
Expand Down
15 changes: 15 additions & 0 deletions testutils/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ func NewIncomingCall(callerName string) tchannel.IncomingCall {
// FakeCallFrame is a stub implementation of the CallFrame interface.
type FakeCallFrame struct {
ServiceF, MethodF, CallerF, RoutingKeyF, RoutingDelegateF string

Arg2StartOffsetVal, Arg2EndOffsetVal int
IsArg2Fragmented bool
}

var _ relay.CallFrame = FakeCallFrame{}
Expand Down Expand Up @@ -122,3 +125,15 @@ func (f FakeCallFrame) RoutingKey() []byte {
func (f FakeCallFrame) RoutingDelegate() []byte {
return []byte(f.RoutingDelegateF)
}

// Arg2StartOffset returns the offset from start of payload to
// the beginning of Arg2.
func (f FakeCallFrame) Arg2StartOffset() int {
return f.Arg2StartOffsetVal
}

// Arg2EndOffset returns the offset from start of payload to the end
// of Arg2 and whether Arg2 is fragmented.
func (f FakeCallFrame) Arg2EndOffset() (int, bool) {
return f.Arg2EndOffsetVal, f.IsArg2Fragmented
}