
Partitions offset by timestamp (#494)
Co-authored-by: Teiva Harsanyi <[email protected]>
Chaus Kostiantyn and teivah authored Apr 20, 2022
1 parent f5cfb8c commit 39078d0
Showing 3 changed files with 136 additions and 1 deletion.
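
In short: the simple Kafka consumer gains a WithTimestampOffset(since time.Duration) option. It marks the consumer configuration as timestamp-based and stores now minus since as a Kafka-style millisecond timestamp; when the factory creates the consumer, every partition is started at the offset the broker reports for that timestamp. Below is a minimal usage sketch mirroring the call in the new integration test — the import paths outside the changed files, the broker address, and the topic/consumer names are assumptions for illustration, not part of the commit:

package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama" // sarama import path assumed for this patron version
	"github.com/beatlabs/patron/component/async/kafka"
	"github.com/beatlabs/patron/component/async/kafka/simple"
)

func main() {
	// The integration test uses patron's DefaultConsumerSaramaConfig helper; a plain config keeps the sketch self-contained.
	saramaCfg := sarama.NewConfig()

	// Start every partition from the first message produced within the last 4 hours.
	factory, err := simple.New("orders-consumer", "orders", []string{"127.0.0.1:9092"}, saramaCfg,
		kafka.DecoderJSON(), kafka.Version(sarama.V2_1_0_0.String()),
		kafka.StartFromNewest(), simple.WithTimestampOffset(4*time.Hour))
	if err != nil {
		log.Fatal(err)
	}

	consumer, err := factory.Create()
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = consumer.Close() }()

	// consumer.Consume then yields messages starting at the timestamp-derived offsets.
}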
4 changes: 3 additions & 1 deletion component/async/kafka/kafka.go
@@ -15,7 +15,7 @@ import (
	"github.com/beatlabs/patron/log"
	"github.com/beatlabs/patron/trace"
	"github.com/google/uuid"
	opentracing "github.com/opentracing/opentracing-go"
	"github.com/opentracing/opentracing-go"
	"github.com/prometheus/client_golang/prometheus"
)

@@ -93,6 +93,8 @@ type ConsumerConfig struct {
	DurationBasedConsumer   bool
	DurationOffset          time.Duration
	TimeExtractor           func(*sarama.ConsumerMessage) (time.Time, error)
	TimestampBasedConsumer  bool
	TimestampOffset         int64
	SaramaConfig            *sarama.Config
	LatestOffsetReachedChan chan<- struct{}
}
68 changes: 68 additions & 0 deletions component/async/kafka/simple/integration_test.go
@@ -22,6 +22,7 @@ const (
	simpleTopic3 = "simpleTopic3"
	simpleTopic4 = "simpleTopic4"
	simpleTopic5 = "simpleTopic5"
	simpleTopic6 = "simpleTopic6"
	broker       = "127.0.0.1:9093"
)

@@ -189,6 +190,73 @@ func TestSimpleConsume_WithDurationOffset(t *testing.T) {
	assert.Equal(t, sent[2:], received)
}

func TestSimpleConsume_WithTimestampOffset(t *testing.T) {
	require.NoError(t, testkafka.CreateTopics(broker, simpleTopic6))
	now := time.Now()
	times := []time.Time{
		now.Add(-10 * time.Hour),
		now.Add(-5 * time.Hour),
		now.Add(-3 * time.Hour),
		now.Add(-2 * time.Hour),
		now.Add(-1 * time.Hour),
	}
	sent := createTimestampPayload(times...)

	messages := make([]*sarama.ProducerMessage, 0)
	for i, tm := range times {
		val := sent[i]
		msg := testkafka.CreateProducerMessage(simpleTopic6, val)
		msg.Timestamp = tm
		messages = append(messages, msg)
	}

	err := testkafka.SendMessages(broker, messages...)
	require.NoError(t, err)

	chMessages := make(chan []string)
	chErr := make(chan error)
	go func() {
		saramaCfg, err := kafkacmp.DefaultConsumerSaramaConfig("test-simple-consumer-w-timestamp", true)
		require.NoError(t, err)

		factory, err := New("test1", simpleTopic6, []string{broker}, saramaCfg, kafka.DecoderJSON(), kafka.Version(sarama.V2_1_0_0.String()),
			kafka.StartFromNewest(), WithTimestampOffset(4*time.Hour))
		if err != nil {
			chErr <- err
			return
		}

		consumer, err := factory.Create()
		if err != nil {
			chErr <- err
			return
		}
		defer func() {
			_ = consumer.Close()
		}()

		received, err := testkafka.AsyncConsumeMessages(consumer, 3)
		if err != nil {
			chErr <- err
			return
		}

		chMessages <- received
	}()

	time.Sleep(5 * time.Second)

	var received []string

	select {
	case received = <-chMessages:
	case err = <-chErr:
		require.NoError(t, err)
	}

	assert.Equal(t, sent[2:], received)
}
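
The expected result follows directly from the fixture: with WithTimestampOffset(4*time.Hour), only messages stamped at or after now minus four hours qualify, i.e. the ones produced at -3h, -2h and -1h, which is why the consumer is asked for exactly three messages and the assertion compares against sent[2:].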

func TestSimpleConsume_WithNotificationOnceReachingLatestOffset(t *testing.T) {
	require.NoError(t, testkafka.CreateTopics(broker, simpleTopic4))
	messages := make([]*sarama.ProducerMessage, 0)
65 changes: 65 additions & 0 deletions component/async/kafka/simple/simple.go
@@ -15,6 +15,9 @@ import (
	"github.com/beatlabs/patron/log"
)

// unixNanoToTimestampDivider converts Unix nanoseconds into the millisecond timestamps used by Kafka messages.
const unixNanoToTimestampDivider = 1000_000

// TimeExtractor defines a function extracting a time from a Kafka message.
type TimeExtractor func(*sarama.ConsumerMessage) (time.Time, error)

@@ -35,6 +38,18 @@ func WithDurationOffset(since time.Duration, timeExtractor TimeExtractor) kafka.
	}
}

// WithTimestampOffset allows creating a consumer that starts from the offset of the first message
// produced at or after now minus the given duration.
func WithTimestampOffset(since time.Duration) kafka.OptionFunc {
	return func(c *kafka.ConsumerConfig) error {
		if since < 0 {
			return errors.New("duration must not be negative")
		}
		c.TimestampBasedConsumer = true
		c.TimestampOffset = time.Now().Add(-since).UnixNano() / unixNanoToTimestampDivider
		return nil
	}
}

// WithNotificationOnceReachingLatestOffset closes the input channel once all the partition consumers have reached the
// latest offset.
func WithNotificationOnceReachingLatestOffset(ch chan<- struct{}) kafka.OptionFunc {
@@ -107,6 +122,10 @@ func (f *Factory) Create() (async.Consumer, error) {
		c.partitions = c.partitionsSinceDuration
	}

	if c.config.TimestampBasedConsumer {
		c.partitions = c.partitionsSinceTimestamp
	}

	if c.config.LatestOffsetReachedChan != nil {
		c.latestOffsetReachedChan = c.config.LatestOffsetReachedChan
	}
@@ -326,6 +345,52 @@ func (c *consumer) partitionsSinceDuration(ctx context.Context) ([]sarama.Partit
	return pcs, nil
}

func (c *consumer) partitionsSinceTimestamp(_ context.Context) ([]sarama.PartitionConsumer, error) {
	client, err := sarama.NewClient(c.config.Brokers, c.config.SaramaConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to create client: %w", err)
	}

	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		return nil, fmt.Errorf("failed to create simple consumer: %w", err)
	}
	c.ms = consumer

	partitions, err := c.ms.Partitions(c.topic)
	if err != nil {
		return nil, fmt.Errorf("failed to get partitions: %w", err)
	}

	pcs := make([]sarama.PartitionConsumer, len(partitions))

	ts := c.config.TimestampOffset
	c.startingOffsets = make(map[int32]int64, len(partitions))

	for i, partition := range partitions {
		offset, err := client.GetOffset(c.topic, partition, ts)
		if err != nil {
			return nil, fmt.Errorf("failed to get offset by timestamp %d for partition %d: %w", ts, partition, err)
		}
		c.startingOffsets[partition] = offset

		pc, err := c.ms.ConsumePartition(c.topic, partition, offset)
		if err != nil {
			return nil, fmt.Errorf("failed to get partition consumer: %w", err)
		}
		pcs[i] = pc
	}

	if c.latestOffsetReachedChan != nil {
		err := c.setLatestOffsets(client, partitions)
		if err != nil {
			return nil, fmt.Errorf("failed to set latest offsets: %w", err)
		}
	}

	return pcs, nil
}

func (c *consumer) setLatestOffsets(client sarama.Client, partitions []int32) error {
	offsets := make(map[int32]int64)
	for _, partitionID := range partitions {
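
Two details worth keeping in mind when reading partitionsSinceTimestamp: the option stores the cut-off as Kafka-style milliseconds (hence UnixNano divided by 1,000,000), and sarama's Client.GetOffset(topic, partition, time) asks the broker for the offset corresponding to that timestamp (the first message at or after it). A minimal standalone sketch of that pairing — the sarama import path, broker address, topic, and helper names are illustrative, not part of the commit:

package main

import (
	"fmt"
	"time"

	"github.com/Shopify/sarama" // import path assumed
)

// cutoffMillis mirrors the conversion done by WithTimestampOffset: Kafka message
// timestamps are milliseconds since the Unix epoch, so UnixNano is divided by 1e6.
func cutoffMillis(since time.Duration) int64 {
	return time.Now().Add(-since).UnixNano() / int64(time.Millisecond)
}

// startingOffset does per partition what partitionsSinceTimestamp does:
// ask the broker for the first offset at or after the cut-off timestamp.
func startingOffset(client sarama.Client, topic string, partition int32, since time.Duration) (int64, error) {
	return client.GetOffset(topic, partition, cutoffMillis(since))
}

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_1_0_0 // timestamp-based offset lookups need a protocol version that supports them

	client, err := sarama.NewClient([]string{"127.0.0.1:9092"}, cfg)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer func() { _ = client.Close() }()

	offset, err := startingOffset(client, "orders", 0, 4*time.Hour)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("partition 0 starts at offset", offset)
}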
