diff --git a/async/kafka/kafka.go b/async/kafka/kafka.go index 6f66b1c65..1d2716a8c 100644 --- a/async/kafka/kafka.go +++ b/async/kafka/kafka.go @@ -158,7 +158,12 @@ func (c *consumer) Consume(ctx context.Context) (<-chan async.Message, <-chan er if err != nil { return nil, nil, errors.Wrap(err, "failed to get partitions") } - log.Infof("consuming messages for topic '%s'", c.topic) + // When kafka cluster is not fully initialized, we may get 0 partions. + if len(pcs) == 0 { + return nil, nil, errors.New("got 0 partitions") + } + + log.Infof("consuming messages for topic '%s' from %d partitions", c.topic, len(pcs)) chMsg := make(chan async.Message, c.buffer) chErr := make(chan error, c.buffer)