Skip to content

Commit

Permalink
Kafka consumer fixes (#206)
Browse files Browse the repository at this point in the history
- Allow ".commit()" to be called without parameters (it's valid)
- Change log format in on_commit and on_error callbacks
  • Loading branch information
daniil-quix authored Oct 28, 2023
1 parent 336b607 commit b7a3143
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 12 deletions.
8 changes: 2 additions & 6 deletions src/StreamingDataFrames/streamingdataframes/kafka/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@

def _default_error_cb(error: KafkaError):
logger.error(
"Kafka consumer error",
extra={"error_code": error.code(), "error_desc": error.str()},
f"Kafka consumer error: {error.str()} (code={error.code()})",
)


Expand All @@ -42,8 +41,7 @@ def _default_on_commit_cb(
):
if error is not None:
logger.error(
"Kafka commit error",
extra={"error_code": error.code(), "error_desc": error.str()},
f"Kafka commit error: {error.str()} (code={error.code()})",
)
if on_commit is not None:
on_commit(error, partitions)
Expand Down Expand Up @@ -307,8 +305,6 @@ def commit(
raise ValueError(
'Parameters "message" and "offsets" are mutually exclusive'
)
if message is None and offsets is None:
raise ValueError('One of "message" or "offsets" must be passed')
kwargs = {
"asynchronous": asynchronous,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,12 +403,6 @@ def test_store_offsets_autocommit_disabled(


class TestConsumerCommit:
def test_commit_none_passed(self, consumer_factory):
with consumer_factory() as consumer:
with pytest.raises(ValueError) as raised:
consumer.commit()
assert str(raised.value) == 'One of "message" or "offsets" must be passed'

def test_commit_offsets_and_message_passed(self, consumer):
with consumer:
with pytest.raises(ValueError) as raised:
Expand Down

0 comments on commit b7a3143

Please sign in to comment.