-
Notifications
You must be signed in to change notification settings - Fork 53
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix errors caught by Dialyzer #141
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -80,7 +80,7 @@ defmodule BroadwayKafka.BrodClient do | |
offset_reset_policy: offset_reset_policy, | ||
begin_offset: begin_offset, | ||
group_config: [{:offset_commit_policy, @offset_commit_policy} | group_config], | ||
fetch_config: Map.new(fetch_config || []), | ||
fetch_config: Map.new(fetch_config), | ||
client_config: client_config, | ||
shared_client: shared_client, | ||
shared_client_id: build_shared_client_id(opts) | ||
|
@@ -109,10 +109,12 @@ defmodule BroadwayKafka.BrodClient do | |
|
||
@impl true | ||
def ack(group_coordinator, generation_id, topic, partition, offset, config) do | ||
:brod_group_coordinator.ack(group_coordinator, generation_id, topic, partition, offset) | ||
if group_coordinator do | ||
:brod_group_coordinator.ack(group_coordinator, generation_id, topic, partition, offset) | ||
|
||
if group_coordinator && config.offset_commit_on_ack do | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. According to Dialyzer There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe it is getting this information from the callback but the callback is wrong. According to producer.ex the group_coordinator can definitely be nil. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Both I think Dialyzer is complaining about this nil check because We can solve this by having both calls inside a It feels a little strange return
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I think that would be better. Although, if this is the case, it seems group_coordinator cannot be nil in this particular branch indeed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. PR updated. I have made the changes I described above. |
||
:brod_group_coordinator.commit_offsets(group_coordinator, [{{topic, partition}, offset}]) | ||
if config.offset_commit_on_ack do | ||
:brod_group_coordinator.commit_offsets(group_coordinator, [{{topic, partition}, offset}]) | ||
end | ||
end | ||
|
||
:ok | ||
|
@@ -188,7 +190,11 @@ defmodule BroadwayKafka.BrodClient do | |
|
||
@impl true | ||
def update_topics(group_coordinator, topics) do | ||
:brod_group_coordinator.update_topics(group_coordinator, topics) | ||
if group_coordinator do | ||
:brod_group_coordinator.update_topics(group_coordinator, topics) | ||
end | ||
|
||
:ok | ||
end | ||
|
||
defp start_link_group_coordinator(stage_pid, client_id, callback_module, config) do | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ defmodule BroadwayKafka.KafkaClient do | |
} | ||
|
||
@typep offset_reset_policy :: :earliest | :latest | ||
@typep brod_group_coordinator :: pid() | nil | ||
|
||
@callback init(opts :: any) :: {:ok, config} | {:error, any} | ||
@callback setup( | ||
|
@@ -23,9 +24,9 @@ defmodule BroadwayKafka.KafkaClient do | |
callback_module :: module, | ||
config | ||
) :: | ||
{:ok, group_coordinator :: pid} | {:error, any} | ||
{:ok, group_coordinator :: brod_group_coordinator()} | {:error, any} | ||
@callback ack( | ||
group_coordinator :: pid, | ||
group_coordinator :: brod_group_coordinator(), | ||
generation_id :: integer, | ||
topic :: binary, | ||
partition :: integer, | ||
|
@@ -51,7 +52,7 @@ defmodule BroadwayKafka.KafkaClient do | |
) :: | ||
offset :: integer | no_return() | ||
|
||
@callback update_topics(:brod.group_coordinator(), [:brod.topic()]) :: :ok | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didn't see this type defined in brod so I defined one in this module. |
||
@callback update_topics(brod_group_coordinator(), [:brod.topic()]) :: :ok | ||
@callback connected?(:brod.client()) :: boolean | ||
@callback disconnect(:brod.client()) :: :ok | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fetch_config
is always going to be a list that passed validation in thevalidate_fetch_config/1
function, so the|| []
will never be used here.