diff --git a/config.go b/config.go index ecac0dcb3..30658c1b6 100644 --- a/config.go +++ b/config.go @@ -276,6 +276,12 @@ func (c *Config) Validate() error { if c.Consumer.MaxWaitTime%time.Millisecond != 0 { Logger.Println("Consumer.MaxWaitTime only supports millisecond precision; nanoseconds will be truncated.") } + if c.Group.Heartbeat.Interval%time.Millisecond != 0 { + Logger.Println("Group.Heartbeat.Interval only supports millisecond precision; nanoseconds will be truncated.") + } + if c.Group.Session.Timeout%time.Millisecond != 0 { + Logger.Println("Group.Session.Timeout only supports millisecond precision; nanoseconds will be truncated.") + } if c.ClientID == "sarama" { Logger.Println("ClientID is the default of 'sarama', you should consider setting it to something application-specific.") } @@ -304,6 +310,14 @@ func (c *Config) Validate() error { return ConfigurationError("Metadata.RefreshFrequency must be >= 0") } + // validate the Group values + switch { + case c.Group.Heartbeat.Interval <= 0: + return ConfigurationError("Group.Heartbeat.Interval must be > 0") + case c.Group.Session.Timeout <= 0: + return ConfigurationError("Group.Session.Timeout must be > 0") + } + // validate the Producer values switch { case c.Producer.MaxMessageBytes <= 0: diff --git a/group_members.go b/group_members.go index 5e8066d70..bdf0e7343 100644 --- a/group_members.go +++ b/group_members.go @@ -38,15 +38,10 @@ func (m *GroupMemberMetadata) decode(pd packetDecoder) (err error) { type GroupMemberAssignment struct { Version int16 - Topics []GroupMemberAssignedTopic + Topics map[string][]int32 UserData []byte } -type GroupMemberAssignedTopic struct { - Topic string - Partitions []int32 -} - func (m *GroupMemberAssignment) encode(pe packetEncoder) error { pe.putInt16(m.Version) @@ -54,11 +49,11 @@ func (m *GroupMemberAssignment) encode(pe packetEncoder) error { return err } - for _, topic := range m.Topics { - if err := pe.putString(topic.Topic); err != nil { + for topic, partitions := range m.Topics { + if err := pe.putString(topic); err != nil { return err } - if err := pe.putInt32Array(topic.Partitions); err != nil { + if err := pe.putInt32Array(partitions); err != nil { return err } } @@ -80,12 +75,13 @@ func (m *GroupMemberAssignment) decode(pd packetDecoder) (err error) { return } - m.Topics = make([]GroupMemberAssignedTopic, topicLen) + m.Topics = make(map[string][]int32, topicLen) for i := 0; i < topicLen; i++ { - if m.Topics[i].Topic, err = pd.getString(); err != nil { + var topic string + if topic, err = pd.getString(); err != nil { return } - if m.Topics[i].Partitions, err = pd.getInt32Array(); err != nil { + if m.Topics[topic], err = pd.getInt32Array(); err != nil { return } } diff --git a/group_members_test.go b/group_members_test.go index 93afde9d9..599502e0d 100644 --- a/group_members_test.go +++ b/group_members_test.go @@ -53,9 +53,9 @@ func TestGroupMemberMetadata(t *testing.T) { func TestGroupMemberAssignment(t *testing.T) { amt := &GroupMemberAssignment{ Version: 1, - Topics: []GroupMemberAssignedTopic{ - {Topic: "one", Partitions: []int32{0, 2, 4}}, - {Topic: "two", Partitions: []int32{1, 3}}, + Topics: map[string][]int32{ + "one": []int32{0, 2, 4}, + "two": []int32{1, 3}, }, UserData: []byte{0x01, 0x02, 0x03}, }