diff --git a/broker.go b/broker.go index 46f06a0f3..5bd1df2d6 100644 --- a/broker.go +++ b/broker.go @@ -239,6 +239,50 @@ func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, return response, nil } +func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) { + response := new(JoinGroupResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + +func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) { + response := new(SyncGroupResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + +func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) { + response := new(LeaveGroupResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + +func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) { + response := new(HeartbeatResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + func (b *Broker) send(rb requestBody, promiseResponse bool) (*responsePromise, error) { b.lock.Lock() defer b.lock.Unlock() diff --git a/broker_test.go b/broker_test.go index f1aa2e8ba..590a4dc28 100644 --- a/broker_test.go +++ b/broker_test.go @@ -176,4 +176,52 @@ var brokerTestTable = []struct { t.Error("Offset request got no response!") } }}, + + {[]byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, + func(t *testing.T, broker *Broker) { + request := JoinGroupRequest{} + response, err := broker.JoinGroup(&request) + if err != nil { + t.Error(err) + } + if response == nil { + t.Error("JoinGroup request got no response!") + } + }}, + + {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, + func(t *testing.T, broker *Broker) { + request := SyncGroupRequest{} + response, err := broker.SyncGroup(&request) + if err != nil { + t.Error(err) + } + if response == nil { + t.Error("SyncGroup request got no response!") + } + }}, + + {[]byte{0x00, 0x00}, + func(t *testing.T, broker *Broker) { + request := LeaveGroupRequest{} + response, err := broker.LeaveGroup(&request) + if err != nil { + t.Error(err) + } + if response == nil { + t.Error("LeaveGroup request got no response!") + } + }}, + + {[]byte{0x00, 0x00}, + func(t *testing.T, broker *Broker) { + request := HeartbeatRequest{} + response, err := broker.Heartbeat(&request) + if err != nil { + t.Error(err) + } + if response == nil { + t.Error("Heartbeat request got no response!") + } + }}, } diff --git a/config.go b/config.go index 542c61172..ecac0dcb3 100644 --- a/config.go +++ b/config.go @@ -54,6 +54,20 @@ type Config struct { RefreshFrequency time.Duration } + // Group is the namespace for group management properties + Group struct { + Session struct { + // The allowed session timeout for registered consumers (defaults to 30s). + // Must be within the allowed server range. + Timeout time.Duration + } + Heartbeat struct { + // Interval between each heartbeat (defaults to 3s). It should be no more + // than 1/3rd of the Group.Session.Timout setting + Interval time.Duration + } + } + // Producer is the namespace for configuration related to producing messages, // used by the Producer. Producer struct { @@ -212,6 +226,9 @@ func NewConfig() *Config { c.Metadata.Retry.Backoff = 250 * time.Millisecond c.Metadata.RefreshFrequency = 10 * time.Minute + c.Group.Session.Timeout = 30 * time.Second + c.Group.Heartbeat.Interval = 3 * time.Second + c.Producer.MaxMessageBytes = 1000000 c.Producer.RequiredAcks = WaitForLocal c.Producer.Timeout = 10 * time.Second diff --git a/group_members.go b/group_members.go new file mode 100644 index 000000000..5e8066d70 --- /dev/null +++ b/group_members.go @@ -0,0 +1,98 @@ +package sarama + +type GroupMemberMetadata struct { + Version int16 + Topics []string + UserData []byte +} + +func (m *GroupMemberMetadata) encode(pe packetEncoder) error { + pe.putInt16(m.Version) + + if err := pe.putStringArray(m.Topics); err != nil { + return err + } + + if err := pe.putBytes(m.UserData); err != nil { + return err + } + + return nil +} + +func (m *GroupMemberMetadata) decode(pd packetDecoder) (err error) { + if m.Version, err = pd.getInt16(); err != nil { + return + } + + if m.Topics, err = pd.getStringArray(); err != nil { + return + } + + if m.UserData, err = pd.getBytes(); err != nil { + return + } + + return nil +} + +type GroupMemberAssignment struct { + Version int16 + Topics []GroupMemberAssignedTopic + UserData []byte +} + +type GroupMemberAssignedTopic struct { + Topic string + Partitions []int32 +} + +func (m *GroupMemberAssignment) encode(pe packetEncoder) error { + pe.putInt16(m.Version) + + if err := pe.putArrayLength(len(m.Topics)); err != nil { + return err + } + + for _, topic := range m.Topics { + if err := pe.putString(topic.Topic); err != nil { + return err + } + if err := pe.putInt32Array(topic.Partitions); err != nil { + return err + } + } + + if err := pe.putBytes(m.UserData); err != nil { + return err + } + + return nil +} + +func (m *GroupMemberAssignment) decode(pd packetDecoder) (err error) { + if m.Version, err = pd.getInt16(); err != nil { + return + } + + var topicLen int + if topicLen, err = pd.getArrayLength(); err != nil { + return + } + + m.Topics = make([]GroupMemberAssignedTopic, topicLen) + for i := 0; i < topicLen; i++ { + if m.Topics[i].Topic, err = pd.getString(); err != nil { + return + } + if m.Topics[i].Partitions, err = pd.getInt32Array(); err != nil { + return + } + } + + if m.UserData, err = pd.getBytes(); err != nil { + return + } + + return nil +} diff --git a/group_members_test.go b/group_members_test.go new file mode 100644 index 000000000..93afde9d9 --- /dev/null +++ b/group_members_test.go @@ -0,0 +1,77 @@ +package sarama + +import ( + "bytes" + "reflect" + "testing" +) + +var ( + groupMemberMetadata = []byte{ + 0, 1, // Version + 0, 0, 0, 2, // Topic array length + 0, 3, 'o', 'n', 'e', // Topic one + 0, 3, 't', 'w', 'o', // Topic two + 0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata + } + groupMemberAssignment = []byte{ + 0, 1, // Version + 0, 0, 0, 2, // Topic array length + 0, 3, 'o', 'n', 'e', // Topic one + 0, 0, 0, 3, // Topic one, partition array length + 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 4, // 0, 2, 4 + 0, 3, 't', 'w', 'o', // Topic two + 0, 0, 0, 2, // Topic two, partition array length + 0, 0, 0, 1, 0, 0, 0, 3, // 1, 3 + 0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata + } +) + +func TestGroupMemberMetadata(t *testing.T) { + meta := &GroupMemberMetadata{ + Version: 1, + Topics: []string{"one", "two"}, + UserData: []byte{0x01, 0x02, 0x03}, + } + + buf, err := encode(meta) + if err != nil { + t.Error("Failed to encode data", err) + } else if !bytes.Equal(groupMemberMetadata, buf) { + t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberMetadata, buf) + } + + meta2 := new(GroupMemberMetadata) + err = decode(buf, meta2) + if err != nil { + t.Error("Failed to decode data", err) + } else if !reflect.DeepEqual(meta, meta2) { + t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", meta, meta2) + } +} + +func TestGroupMemberAssignment(t *testing.T) { + amt := &GroupMemberAssignment{ + Version: 1, + Topics: []GroupMemberAssignedTopic{ + {Topic: "one", Partitions: []int32{0, 2, 4}}, + {Topic: "two", Partitions: []int32{1, 3}}, + }, + UserData: []byte{0x01, 0x02, 0x03}, + } + + buf, err := encode(amt) + if err != nil { + t.Error("Failed to encode data", err) + } else if !bytes.Equal(groupMemberAssignment, buf) { + t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberAssignment, buf) + } + + amt2 := new(GroupMemberAssignment) + err = decode(buf, amt2) + if err != nil { + t.Error("Failed to decode data", err) + } else if !reflect.DeepEqual(amt, amt2) { + t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", amt, amt2) + } +} diff --git a/join_group_request.go b/join_group_request.go index 8bb5ce826..13812449e 100644 --- a/join_group_request.go +++ b/join_group_request.go @@ -92,3 +92,13 @@ func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) { r.GroupProtocols[name] = metadata } + +func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *GroupMemberMetadata) error { + bin, err := encode(metadata) + if err != nil { + return err + } + + r.AddGroupProtocol(name, bin) + return nil +} diff --git a/join_group_response.go b/join_group_response.go index 037a9cd26..15e16cf1c 100644 --- a/join_group_response.go +++ b/join_group_response.go @@ -9,6 +9,18 @@ type JoinGroupResponse struct { Members map[string][]byte } +func (r *JoinGroupResponse) GetMembers() (map[string]GroupMemberMetadata, error) { + members := make(map[string]GroupMemberMetadata, len(r.Members)) + for id, bin := range r.Members { + meta := new(GroupMemberMetadata) + if err := decode(bin, meta); err != nil { + return nil, err + } + members[id] = *meta + } + return members, nil +} + func (r *JoinGroupResponse) encode(pe packetEncoder) error { pe.putInt16(int16(r.Err)) pe.putInt32(r.GenerationId) diff --git a/sync_group_request.go b/sync_group_request.go index 60be6f3f3..b0faed2ef 100644 --- a/sync_group_request.go +++ b/sync_group_request.go @@ -84,3 +84,13 @@ func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment r.GroupAssignments[memberId] = memberAssignment } + +func (r *SyncGroupRequest) AddGroupAssignmentMember(memberId string, memberAssignment *GroupMemberAssignment) error { + bin, err := encode(memberAssignment) + if err != nil { + return err + } + + r.AddGroupAssignment(memberId, bin) + return nil +} diff --git a/sync_group_response.go b/sync_group_response.go index e10685ef8..406c27db5 100644 --- a/sync_group_response.go +++ b/sync_group_response.go @@ -5,6 +5,12 @@ type SyncGroupResponse struct { MemberAssignment []byte } +func (r *SyncGroupResponse) GetMemberAssignment() (*GroupMemberAssignment, error) { + assignment := new(GroupMemberAssignment) + err := decode(r.MemberAssignment, assignment) + return assignment, err +} + func (r *SyncGroupResponse) encode(pe packetEncoder) error { pe.putInt16(int16(r.Err)) return pe.putBytes(r.MemberAssignment)