From 9f09706e731bd33e037bf43cbd8ca548eb325472 Mon Sep 17 00:00:00 2001 From: Trevor Brown Date: Fri, 26 Apr 2024 15:40:31 -0400 Subject: [PATCH 1/2] Fix errors caught by Dialyzer --- lib/broadway_kafka/brod_client.ex | 4 ++-- lib/broadway_kafka/kafka_client.ex | 7 ++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/lib/broadway_kafka/brod_client.ex b/lib/broadway_kafka/brod_client.ex index f0d9be5..fae2212 100644 --- a/lib/broadway_kafka/brod_client.ex +++ b/lib/broadway_kafka/brod_client.ex @@ -80,7 +80,7 @@ defmodule BroadwayKafka.BrodClient do offset_reset_policy: offset_reset_policy, begin_offset: begin_offset, group_config: [{:offset_commit_policy, @offset_commit_policy} | group_config], - fetch_config: Map.new(fetch_config || []), + fetch_config: Map.new(fetch_config), client_config: client_config, shared_client: shared_client, shared_client_id: build_shared_client_id(opts) @@ -111,7 +111,7 @@ defmodule BroadwayKafka.BrodClient do def ack(group_coordinator, generation_id, topic, partition, offset, config) do :brod_group_coordinator.ack(group_coordinator, generation_id, topic, partition, offset) - if group_coordinator && config.offset_commit_on_ack do + if config.offset_commit_on_ack do :brod_group_coordinator.commit_offsets(group_coordinator, [{{topic, partition}, offset}]) end diff --git a/lib/broadway_kafka/kafka_client.ex b/lib/broadway_kafka/kafka_client.ex index dda9c3e..0976c56 100644 --- a/lib/broadway_kafka/kafka_client.ex +++ b/lib/broadway_kafka/kafka_client.ex @@ -15,6 +15,7 @@ defmodule BroadwayKafka.KafkaClient do } @typep offset_reset_policy :: :earliest | :latest + @typep brod_group_coordinator :: pid() @callback init(opts :: any) :: {:ok, config} | {:error, any} @callback setup( @@ -23,9 +24,9 @@ defmodule BroadwayKafka.KafkaClient do callback_module :: module, config ) :: - {:ok, group_coordinator :: pid} | {:error, any} + {:ok, group_coordinator :: brod_group_coordinator()} | {:error, any} @callback ack( - group_coordinator :: pid, + group_coordinator :: brod_group_coordinator, generation_id :: integer, topic :: binary, partition :: integer, @@ -51,7 +52,7 @@ defmodule BroadwayKafka.KafkaClient do ) :: offset :: integer | no_return() - @callback update_topics(:brod.group_coordinator(), [:brod.topic()]) :: :ok + @callback update_topics(brod_group_coordinator(), [:brod.topic()]) :: :ok @callback connected?(:brod.client()) :: boolean @callback disconnect(:brod.client()) :: :ok end From 7fc87343f3758025d76a635a73c6939370346437 Mon Sep 17 00:00:00 2001 From: Trevor Brown Date: Mon, 29 Apr 2024 16:10:30 -0400 Subject: [PATCH 2/2] Address PR feedback --- lib/broadway_kafka/brod_client.ex | 14 ++++++++++---- lib/broadway_kafka/kafka_client.ex | 4 ++-- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/lib/broadway_kafka/brod_client.ex b/lib/broadway_kafka/brod_client.ex index fae2212..6171dd7 100644 --- a/lib/broadway_kafka/brod_client.ex +++ b/lib/broadway_kafka/brod_client.ex @@ -109,10 +109,12 @@ defmodule BroadwayKafka.BrodClient do @impl true def ack(group_coordinator, generation_id, topic, partition, offset, config) do - :brod_group_coordinator.ack(group_coordinator, generation_id, topic, partition, offset) + if group_coordinator do + :brod_group_coordinator.ack(group_coordinator, generation_id, topic, partition, offset) - if config.offset_commit_on_ack do - :brod_group_coordinator.commit_offsets(group_coordinator, [{{topic, partition}, offset}]) + if config.offset_commit_on_ack do + :brod_group_coordinator.commit_offsets(group_coordinator, [{{topic, partition}, offset}]) + end end :ok @@ -188,7 +190,11 @@ defmodule BroadwayKafka.BrodClient do @impl true def update_topics(group_coordinator, topics) do - :brod_group_coordinator.update_topics(group_coordinator, topics) + if group_coordinator do + :brod_group_coordinator.update_topics(group_coordinator, topics) + end + + :ok end defp start_link_group_coordinator(stage_pid, client_id, callback_module, config) do diff --git a/lib/broadway_kafka/kafka_client.ex b/lib/broadway_kafka/kafka_client.ex index 0976c56..4ae70fd 100644 --- a/lib/broadway_kafka/kafka_client.ex +++ b/lib/broadway_kafka/kafka_client.ex @@ -15,7 +15,7 @@ defmodule BroadwayKafka.KafkaClient do } @typep offset_reset_policy :: :earliest | :latest - @typep brod_group_coordinator :: pid() + @typep brod_group_coordinator :: pid() | nil @callback init(opts :: any) :: {:ok, config} | {:error, any} @callback setup( @@ -26,7 +26,7 @@ defmodule BroadwayKafka.KafkaClient do ) :: {:ok, group_coordinator :: brod_group_coordinator()} | {:error, any} @callback ack( - group_coordinator :: brod_group_coordinator, + group_coordinator :: brod_group_coordinator(), generation_id :: integer, topic :: binary, partition :: integer,