Consume from kafka without group (#66)
Signed-off-by: Leonidas Maroulis <[email protected]>
pantrif authored and mantzas committed Jul 19, 2019
1 parent 5304bf6 commit 81524a7
Showing 4 changed files with 196 additions and 47 deletions.
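Note on usage: with this change, an empty group string is accepted by New and routes consumption through a plain partition consumer instead of a consumer group. Below is a minimal sketch of the group-less path, assuming the import path of this repository's async/kafka package; the service name, broker address, and topic are placeholders (New, Create, Consume, Ack, and Close are taken from this diff):

    package main

    import (
        "context"
        "log"

        "github.com/mantzas/patron/async/kafka" // assumed import path
    )

    func main() {
        // An empty group no longer returns an error from New.
        f, err := kafka.New("svc", "application/json", "topic1", "", []string{"localhost:9092"})
        if err != nil {
            log.Fatal(err)
        }

        c, err := f.Create()
        if err != nil {
            log.Fatal(err)
        }
        defer c.Close()

        chMsg, chErr, err := c.Consume(context.Background())
        if err != nil {
            log.Fatal(err)
        }

        for {
            select {
            case m := <-chMsg:
                _ = m.Ack() // with no group session, Ack only completes the trace span
            case err := <-chErr:
                log.Println(err)
            }
        }
    }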
191 changes: 147 additions & 44 deletions async/kafka/kafka.go
@@ -53,7 +53,9 @@ func (m *message) Decode(v interface{}) error {
 }
 
 func (m *message) Ack() error {
-    m.sess.MarkMessage(m.msg, "")
+    if m.sess != nil {
+        m.sess.MarkMessage(m.msg, "")
+    }
     trace.SpanSuccess(m.span)
     return nil
 }
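Note: m.sess is only set on the consumer-group path (see ConsumeClaim below), so the nil check makes Ack safe for group-less messages, where there is no offset to mark and only the trace span is completed.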
@@ -88,10 +90,6 @@ func New(name, ct, topic, group string, brokers []string, oo ...OptionFunc) (*Factory, error) {
         return nil, errors.New("topic is required")
     }
 
-    if group == "" {
-        return nil, errors.New("group is required")
-    }
-
     return &Factory{name: name, ct: ct, topic: topic, group: group, brokers: brokers, oo: oo}, nil
 }

@@ -111,11 +109,15 @@ func (f *Factory) Create() (async.Consumer, error) {
     c := &consumer{
         brokers:     f.brokers,
         topic:       f.topic,
-        group:       f.group,
-        traceTag:    opentracing.Tag{Key: "group", Value: f.group},
         cfg:         config,
         contentType: f.ct,
-        buffer:      0,
+        buffer:      1000,
     }
 
+    if f.group != "" {
+        c.group = f.group
+        c.traceTag = opentracing.Tag{Key: "group", Value: f.group}
+        c.buffer = 0
+    }
+
     for _, o := range f.oo {
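Note: the channel buffering now differs per mode. With a group the channels stay unbuffered (buffer 0), as before; without a group they get a 1000-message buffer to absorb the per-partition fan-in introduced below.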
@@ -138,20 +140,30 @@ type consumer struct {
     contentType string
     cnl         context.CancelFunc
     cg          sarama.ConsumerGroup
+    ms          sarama.Consumer
 }
 
 // Consume starts consuming messages from a Kafka topic.
 func (c *consumer) Consume(ctx context.Context) (<-chan async.Message, <-chan error, error) {
     ctx, cnl := context.WithCancel(ctx)
     c.cnl = cnl
 
+    if c.group != "" {
+        return consumeWithGroup(ctx, c)
+    }
+
+    return consume(ctx, c)
+}
+
+func consumeWithGroup(ctx context.Context, c *consumer) (<-chan async.Message, <-chan error, error) {
+
     cg, err := sarama.NewConsumerGroup(c.brokers, c.group, c.cfg)
     if err != nil {
         return nil, nil, errors.Wrap(err, "failed to create consumer")
     }
     c.cg = cg
 
     log.Infof("consuming messages from topic '%s' using group '%s'", c.topic, c.group)
 
     chMsg := make(chan async.Message, c.buffer)
     chErr := make(chan error, c.buffer)

@@ -185,6 +197,90 @@ func (c *consumer) Consume(ctx context.Context) (<-chan async.Message, <-chan error, error) {
     return chMsg, chErr, nil
 }

+func consume(ctx context.Context, c *consumer) (<-chan async.Message, <-chan error, error) {
+
+    chMsg := make(chan async.Message, c.buffer)
+    chErr := make(chan error, c.buffer)
+
+    log.Infof("consuming messages from topic '%s' without using consumer group", c.topic)
+    pcs, err := c.partitions()
+    if err != nil {
+        return nil, nil, errors.Wrap(err, "failed to get partitions")
+    }
+    // When the Kafka cluster is not fully initialized, we may get 0 partitions.
+    if len(pcs) == 0 {
+        return nil, nil, errors.New("got 0 partitions")
+    }
+
+    for _, pc := range pcs {
+        go func(consumer sarama.PartitionConsumer) {
+            for {
+                select {
+                case <-ctx.Done():
+                    log.Info("canceling consuming messages requested")
+                    closePartitionConsumer(consumer)
+                    return
+                case consumerError := <-consumer.Errors():
+                    closePartitionConsumer(consumer)
+                    chErr <- consumerError
+                    return
+                case m := <-consumer.Messages():
+                    topicPartitionOffsetDiffGaugeSet("", m.Topic, m.Partition, consumer.HighWaterMarkOffset(), m.Offset)
+
+                    go func() {
+                        msg, err := claimMessage(ctx, c, m)
+                        if err != nil {
+                            chErr <- err
+                            return
+                        }
+                        chMsg <- msg
+                    }()
+                }
+            }
+        }(pc)
+    }
+
+    return chMsg, chErr, nil
+
+}
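Note: each partition gets its own goroutine, and each received message is claimed in a further goroutine, so ordering on the returned channel is not guaranteed, not even within a single partition.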

+func claimMessage(ctx context.Context, c *consumer, msg *sarama.ConsumerMessage) (*message, error) {
+    log.Debugf("data received from topic %s", msg.Topic)
+
+    sp, chCtx := trace.ConsumerSpan(
+        ctx,
+        trace.ComponentOpName(trace.KafkaConsumerComponent, msg.Topic),
+        trace.KafkaConsumerComponent,
+        mapHeader(msg.Headers),
+    )
+    var ct string
+    var err error
+    if c.contentType != "" {
+        ct = c.contentType
+    } else {
+        ct, err = determineContentType(msg.Headers)
+        if err != nil {
+            trace.SpanError(sp)
+            return nil, errors.Wrap(err, "failed to determine content type")
+        }
+    }
+
+    dec, err := async.DetermineDecoder(ct)
+    if err != nil {
+        trace.SpanError(sp)
+        return nil, errors.Wrapf(err, "failed to determine decoder for %s", ct)
+    }
+
+    chCtx = log.WithContext(chCtx, log.Sub(map[string]interface{}{"messageID": uuid.New().String()}))
+
+    return &message{
+        ctx:  chCtx,
+        dec:  dec,
+        span: sp,
+        msg:  msg,
+    }, nil
+}
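Note: a content type set on the factory takes precedence; the message headers are only inspected when the factory was created without one.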

 // Close handles closing consumer.
 func (c *consumer) Close() error {
     if c.cnl != nil {
@@ -198,6 +294,33 @@ func (c *consumer) Close() error {
     return errors.Wrap(c.cg.Close(), "failed to close consumer")
 }

+func (c *consumer) partitions() ([]sarama.PartitionConsumer, error) {
+
+    ms, err := sarama.NewConsumer(c.brokers, c.cfg)
+    if err != nil {
+        return nil, errors.Wrap(err, "failed to create consumer")
+    }
+    c.ms = ms
+
+    partitions, err := c.ms.Partitions(c.topic)
+    if err != nil {
+        return nil, errors.Wrap(err, "failed to get partitions")
+    }
+
+    pcs := make([]sarama.PartitionConsumer, len(partitions))
+
+    for i, partition := range partitions {
+
+        pc, err := c.ms.ConsumePartition(c.topic, partition, c.cfg.Consumer.Offsets.Initial)
+        if err != nil {
+            return nil, errors.Wrap(err, "failed to get partition consumer")
+        }
+        pcs[i] = pc
+    }
+
+    return pcs, nil
+}
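Note: the underlying sarama.Consumer is kept on the struct (c.ms), and each partition consumer starts at c.cfg.Consumer.Offsets.Initial, which the new Start option in option.go makes configurable.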

 type handler struct {
     consumer *consumer
     messages chan async.Message
@@ -208,43 +331,13 @@ func (h handler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
 func (h handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
     ctx := sess.Context()
     for msg := range claim.Messages() {
-        log.Debugf("data received from topic %s", msg.Topic)
-        sp, chCtx := trace.ConsumerSpan(
-            ctx,
-            trace.ComponentOpName(trace.KafkaConsumerComponent, msg.Topic),
-            trace.KafkaConsumerComponent,
-            mapHeader(msg.Headers),
-            h.consumer.traceTag,
-        )
-
-        var ct string
-        if h.consumer.contentType != "" {
-            ct = h.consumer.contentType
-        } else {
-            ctTemp, err := determineContentType(msg.Headers)
-            if err != nil {
-                trace.SpanError(sp)
-                return errors.Wrap(err, "failed to determine content type")
-            }
-            ct = ctTemp
-        }
-
-        dec, err := async.DetermineDecoder(ct)
-        if err != nil {
-            trace.SpanError(sp)
-            return errors.Wrapf(err, "failed to determine decoder for %s", ct)
-        }
-
-        topicPartitionOffsetDiffGaugeSet(h.consumer.group, msg.Topic, msg.Partition, claim.HighWaterMarkOffset(), msg.Offset)
-
-        chCtx = log.WithContext(chCtx, log.Sub(map[string]interface{}{"messageID": uuid.New().String()}))
-        h.messages <- &message{
-            sess: sess,
-            msg:  msg,
-            ctx:  chCtx,
-            dec:  dec,
-            span: sp,
+        m, err := claimMessage(ctx, h.consumer, msg)
+        if err != nil {
+            return err
         }
+        m.sess = sess
+        h.messages <- m
     }
     return nil
 }
@@ -259,6 +352,16 @@ func closeConsumer(cns sarama.ConsumerGroup) {
     }
 }

+func closePartitionConsumer(cns sarama.PartitionConsumer) {
+    if cns == nil {
+        return
+    }
+    err := cns.Close()
+    if err != nil {
+        log.Errorf("failed to close partition consumer: %v", err)
+    }
+}
+
 func determineContentType(hdr []*sarama.RecordHeader) (string, error) {
     for _, h := range hdr {
         if string(h.Key) == encoding.ContentTypeHeader {
38 changes: 35 additions & 3 deletions async/kafka/kafka_test.go
@@ -45,9 +45,9 @@ func TestNew(t *testing.T) {
             wantErr: true,
         },
         {
-            name:    "fails with missing group",
+            name:    "does not fail with missing group",
             args:    args{name: "test", brokers: brokers, topic: "topic1", group: ""},
-            wantErr: true,
+            wantErr: false,
         },
         {
             name: "success",
@@ -253,7 +253,7 @@ func TestConsumer_ConsumeFailedBroker(t *testing.T) {
     assert.Error(t, err)
 }
 
-func TestConsumer_Consume(t *testing.T) {
+func TestConsumer_ConsumeWithGroup(t *testing.T) {
     broker := sarama.NewMockBroker(t, 0)
     broker.SetHandlerByMap(map[string]sarama.MockResponse{
         "MetadataRequest": sarama.NewMockMetadataResponse(t).
@@ -283,3 +283,35 @@
 
     ctx.Done()
 }

+func TestConsumer_ConsumeWithoutGroup(t *testing.T) {
+    broker := sarama.NewMockBroker(t, 0)
+    topic := "foo_topic"
+    broker.SetHandlerByMap(map[string]sarama.MockResponse{
+        "MetadataRequest": sarama.NewMockMetadataResponse(t).
+            SetBroker(broker.Addr(), broker.BrokerID()).
+            SetLeader(topic, 0, broker.BrokerID()),
+        "OffsetRequest": sarama.NewMockOffsetResponse(t).
+            SetVersion(1).
+            SetOffset(topic, 0, sarama.OffsetNewest, 10).
+            SetOffset(topic, 0, sarama.OffsetOldest, 0),
+        "FetchRequest": sarama.NewMockFetchResponse(t, 1).
+            SetMessage(topic, 0, 9, sarama.StringEncoder("Foo")),
+    })
+
+    f, err := New("name", "application/json", topic, "", []string{broker.Addr()})
+    assert.NoError(t, err)
+    c, err := f.Create()
+    assert.NoError(t, err)
+    ctx := context.Background()
+    chMsg, chErr, err := c.Consume(ctx)
+    assert.NoError(t, err)
+    assert.NotNil(t, chMsg)
+    assert.NotNil(t, chErr)
+
+    err = c.Close()
+    assert.NoError(t, err)
+    broker.Close()
+
+    ctx.Done()
+}
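The test drives the group-less path end to end against sarama's mock broker: metadata and offset lookups followed by a single fetchable message on partition 0.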
8 changes: 8 additions & 0 deletions async/kafka/option.go
@@ -45,3 +45,11 @@ func Timeout(timeout time.Duration) OptionFunc {
         return nil
     }
 }

+// Start option for adjusting the starting offset.
+func Start(offset int64) OptionFunc {
+    return func(c *consumer) error {
+        c.cfg.Consumer.Offsets.Initial = offset
+        return nil
+    }
+}
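A usage sketch for the new option (sarama.OffsetOldest is a real sarama constant; the remaining arguments are placeholders as in the example near the top):

    f, err := kafka.New("svc", "application/json", "topic1", "",
        []string{"localhost:9092"},
        // start from the oldest retained offset rather than the config default
        kafka.Start(sarama.OffsetOldest),
    )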
6 changes: 6 additions & 0 deletions async/kafka/option_test.go
@@ -67,3 +67,9 @@ func TestVersion(t *testing.T) {
         })
     }
 }

+func TestStart(t *testing.T) {
+    c := consumer{cfg: sarama.NewConfig()}
+    err := Start(sarama.OffsetOldest)(&c)
+    assert.NoError(t, err)
+}
