diff --git a/lib/broadway_kafka/brod_client.ex b/lib/broadway_kafka/brod_client.ex index 7a29a04..c9633ca 100644 --- a/lib/broadway_kafka/brod_client.ex +++ b/lib/broadway_kafka/brod_client.ex @@ -169,6 +169,13 @@ defmodule BroadwayKafka.BrodClient do defp lookup_offset(hosts, topic, partition, policy, client_config) do case :brod.resolve_offset(hosts, topic, partition, policy, client_config) do + {:ok, -1} -> + # `:brod.resolve_offset` returns -1 when asked to resolve a timestamp newer + # than all the messages in the partition. + # -1 is not a valid offset you can use with `:brod.fetch` so we need to + # resolve the latest offset instead + lookup_offset(hosts, topic, partition, :latest, client_config) + {:ok, offset} -> offset @@ -246,11 +253,16 @@ defmodule BroadwayKafka.BrodClient do defp validate_option(:offset_commit_on_ack, value) when not is_boolean(value), do: validation_error(:offset_commit_on_ack, "a boolean", value) + defp validate_option(:offset_reset_policy, {:timestamp, timestamp}) + when is_integer(timestamp) and timestamp > 0 do + {:ok, {:timestamp, timestamp}} + end + defp validate_option(:offset_reset_policy, value) when value not in @offset_reset_policy_values do validation_error( :offset_reset_policy, - "one of #{inspect(@offset_reset_policy_values)}", + "one of #{inspect(@offset_reset_policy_values)} or `{:timestamp, timestamp}` where timestamp is a non-negative integer", value ) end @@ -397,6 +409,9 @@ defmodule BroadwayKafka.BrodClient do :latest -> -1 + + {:timestamp, timestamp} when is_integer(timestamp) and timestamp >= 0 -> + timestamp end end diff --git a/lib/broadway_kafka/producer.ex b/lib/broadway_kafka/producer.ex index 416c1da..a0cfda4 100644 --- a/lib/broadway_kafka/producer.ex +++ b/lib/broadway_kafka/producer.ex @@ -40,8 +40,8 @@ defmodule BroadwayKafka.Producer do since only one commit request will be performed per batch. * `:offset_reset_policy` - Optional. Defines the offset to be used when there's no initial - offset in Kafka or if the current offset has expired. Possible values are `:earliest` or - `:latest`. Default is `:latest`. + offset in Kafka or if the current offset has expired. Possible values are `:earliest`, + `:latest` or {:timestamp, timestamp} (in milliseconds). Default is `:latest`. * `:begin_offset` - Optional. Defines how to get the initial offset for the consumers. The possible values are `:assigned` or `:reset`. When set to `:assigned` the starting offset will be the diff --git a/test/brod_client_test.exs b/test/brod_client_test.exs index 8419d5b..65fc9c8 100644 --- a/test/brod_client_test.exs +++ b/test/brod_client_test.exs @@ -115,7 +115,7 @@ defmodule BroadwayKafka.BrodClientTest do assert BrodClient.init(opts) == {:error, - "expected :offset_reset_policy to be one of [:earliest, :latest], got: :an_atom"} + "expected :offset_reset_policy to be one of [:earliest, :latest] or `{:timestamp, timestamp}` where timestamp is a non-negative integer, got: :an_atom"} opts = Keyword.put(@opts, :offset_reset_policy, :earliest) assert {:ok, [], %{offset_reset_policy: :earliest}} = BrodClient.init(opts)