Skip to content

Commit

Permalink
More 0.9 protocol additions. Add broker accessors
Browse files Browse the repository at this point in the history
  • Loading branch information
dim committed Dec 23, 2015
1 parent 87ec8d7 commit 2795466
Show file tree
Hide file tree
Showing 9 changed files with 322 additions and 0 deletions.
44 changes: 44 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,50 @@ func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse,
return response, nil
}

func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
response := new(JoinGroupResponse)

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
response := new(SyncGroupResponse)

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
response := new(LeaveGroupResponse)

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
response := new(HeartbeatResponse)

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) send(rb requestBody, promiseResponse bool) (*responsePromise, error) {
b.lock.Lock()
defer b.lock.Unlock()
Expand Down
48 changes: 48 additions & 0 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,52 @@ var brokerTestTable = []struct {
t.Error("Offset request got no response!")
}
}},

{[]byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := JoinGroupRequest{}
response, err := broker.JoinGroup(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("JoinGroup request got no response!")
}
}},

{[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := SyncGroupRequest{}
response, err := broker.SyncGroup(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("SyncGroup request got no response!")
}
}},

{[]byte{0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := LeaveGroupRequest{}
response, err := broker.LeaveGroup(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("LeaveGroup request got no response!")
}
}},

{[]byte{0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := HeartbeatRequest{}
response, err := broker.Heartbeat(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("Heartbeat request got no response!")
}
}},
}
17 changes: 17 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,20 @@ type Config struct {
RefreshFrequency time.Duration
}

// Group is the namespace for group management properties
Group struct {
Session struct {
// The allowed session timeout for registered consumers (defaults to 30s).
// Must be within the allowed server range.
Timeout time.Duration
}
Heartbeat struct {
// Interval between each heartbeat (defaults to 3s). It should be no more
// than 1/3rd of the Group.Session.Timout setting
Interval time.Duration
}
}

// Producer is the namespace for configuration related to producing messages,
// used by the Producer.
Producer struct {
Expand Down Expand Up @@ -212,6 +226,9 @@ func NewConfig() *Config {
c.Metadata.Retry.Backoff = 250 * time.Millisecond
c.Metadata.RefreshFrequency = 10 * time.Minute

c.Group.Session.Timeout = 30 * time.Second
c.Group.Heartbeat.Interval = 3 * time.Second

c.Producer.MaxMessageBytes = 1000000
c.Producer.RequiredAcks = WaitForLocal
c.Producer.Timeout = 10 * time.Second
Expand Down
98 changes: 98 additions & 0 deletions group_members.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package sarama

type GroupMemberMetadata struct {
Version int16
Topics []string
UserData []byte
}

func (m *GroupMemberMetadata) encode(pe packetEncoder) error {
pe.putInt16(m.Version)

if err := pe.putStringArray(m.Topics); err != nil {
return err
}

if err := pe.putBytes(m.UserData); err != nil {
return err
}

return nil
}

func (m *GroupMemberMetadata) decode(pd packetDecoder) (err error) {
if m.Version, err = pd.getInt16(); err != nil {
return
}

if m.Topics, err = pd.getStringArray(); err != nil {
return
}

if m.UserData, err = pd.getBytes(); err != nil {
return
}

return nil
}

type GroupMemberAssignment struct {
Version int16
Topics []GroupMemberAssignedTopic
UserData []byte
}

type GroupMemberAssignedTopic struct {
Topic string
Partitions []int32
}

func (m *GroupMemberAssignment) encode(pe packetEncoder) error {
pe.putInt16(m.Version)

if err := pe.putArrayLength(len(m.Topics)); err != nil {
return err
}

for _, topic := range m.Topics {
if err := pe.putString(topic.Topic); err != nil {
return err
}
if err := pe.putInt32Array(topic.Partitions); err != nil {
return err
}
}

if err := pe.putBytes(m.UserData); err != nil {
return err
}

return nil
}

func (m *GroupMemberAssignment) decode(pd packetDecoder) (err error) {
if m.Version, err = pd.getInt16(); err != nil {
return
}

var topicLen int
if topicLen, err = pd.getArrayLength(); err != nil {
return
}

m.Topics = make([]GroupMemberAssignedTopic, topicLen)
for i := 0; i < topicLen; i++ {
if m.Topics[i].Topic, err = pd.getString(); err != nil {
return
}
if m.Topics[i].Partitions, err = pd.getInt32Array(); err != nil {
return
}
}

if m.UserData, err = pd.getBytes(); err != nil {
return
}

return nil
}
77 changes: 77 additions & 0 deletions group_members_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package sarama

import (
"bytes"
"reflect"
"testing"
)

var (
groupMemberMetadata = []byte{
0, 1, // Version
0, 0, 0, 2, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 3, 't', 'w', 'o', // Topic two
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
}
groupMemberAssignment = []byte{
0, 1, // Version
0, 0, 0, 2, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 0, 0, 3, // Topic one, partition array length
0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 4, // 0, 2, 4
0, 3, 't', 'w', 'o', // Topic two
0, 0, 0, 2, // Topic two, partition array length
0, 0, 0, 1, 0, 0, 0, 3, // 1, 3
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
}
)

func TestGroupMemberMetadata(t *testing.T) {
meta := &GroupMemberMetadata{
Version: 1,
Topics: []string{"one", "two"},
UserData: []byte{0x01, 0x02, 0x03},
}

buf, err := encode(meta)
if err != nil {
t.Error("Failed to encode data", err)
} else if !bytes.Equal(groupMemberMetadata, buf) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberMetadata, buf)
}

meta2 := new(GroupMemberMetadata)
err = decode(buf, meta2)
if err != nil {
t.Error("Failed to decode data", err)
} else if !reflect.DeepEqual(meta, meta2) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", meta, meta2)
}
}

func TestGroupMemberAssignment(t *testing.T) {
amt := &GroupMemberAssignment{
Version: 1,
Topics: []GroupMemberAssignedTopic{
{Topic: "one", Partitions: []int32{0, 2, 4}},
{Topic: "two", Partitions: []int32{1, 3}},
},
UserData: []byte{0x01, 0x02, 0x03},
}

buf, err := encode(amt)
if err != nil {
t.Error("Failed to encode data", err)
} else if !bytes.Equal(groupMemberAssignment, buf) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberAssignment, buf)
}

amt2 := new(GroupMemberAssignment)
err = decode(buf, amt2)
if err != nil {
t.Error("Failed to decode data", err)
} else if !reflect.DeepEqual(amt, amt2) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", amt, amt2)
}
}
10 changes: 10 additions & 0 deletions join_group_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,13 @@ func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) {

r.GroupProtocols[name] = metadata
}

func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *GroupMemberMetadata) error {
bin, err := encode(metadata)
if err != nil {
return err
}

r.AddGroupProtocol(name, bin)
return nil
}
12 changes: 12 additions & 0 deletions join_group_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,18 @@ type JoinGroupResponse struct {
Members map[string][]byte
}

func (r *JoinGroupResponse) GetMembers() (map[string]GroupMemberMetadata, error) {
members := make(map[string]GroupMemberMetadata, len(r.Members))
for id, bin := range r.Members {
meta := new(GroupMemberMetadata)
if err := decode(bin, meta); err != nil {
return nil, err
}
members[id] = *meta
}
return members, nil
}

func (r *JoinGroupResponse) encode(pe packetEncoder) error {
pe.putInt16(int16(r.Err))
pe.putInt32(r.GenerationId)
Expand Down
10 changes: 10 additions & 0 deletions sync_group_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,13 @@ func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment

r.GroupAssignments[memberId] = memberAssignment
}

func (r *SyncGroupRequest) AddGroupAssignmentMember(memberId string, memberAssignment *GroupMemberAssignment) error {
bin, err := encode(memberAssignment)
if err != nil {
return err
}

r.AddGroupAssignment(memberId, bin)
return nil
}
6 changes: 6 additions & 0 deletions sync_group_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ type SyncGroupResponse struct {
MemberAssignment []byte
}

func (r *SyncGroupResponse) GetMemberAssignment() (*GroupMemberAssignment, error) {
assignment := new(GroupMemberAssignment)
err := decode(r.MemberAssignment, assignment)
return assignment, err
}

func (r *SyncGroupResponse) encode(pe packetEncoder) error {
pe.putInt16(int16(r.Err))
return pe.putBytes(r.MemberAssignment)
Expand Down

0 comments on commit 2795466

Please sign in to comment.