From b929df1fcfc5c8c82d6064d6a65b60e745a7c9e8 Mon Sep 17 00:00:00 2001 From: Sotirios Mantziaris Date: Sun, 19 Nov 2023 08:07:45 +0200 Subject: [PATCH] Upgrades --- go.mod | 2 +- go.sum | 4 +- .../IBM/sarama/.pre-commit-config.yaml | 8 + vendor/github.com/IBM/sarama/CHANGELOG.md | 86 ++++++++ .../github.com/IBM/sarama/CODE_OF_CONDUCT.md | 128 ++++++++++++ vendor/github.com/IBM/sarama/Dockerfile.kafka | 27 +-- vendor/github.com/IBM/sarama/README.md | 2 + vendor/github.com/IBM/sarama/SECURITY.md | 11 ++ vendor/github.com/IBM/sarama/admin.go | 30 +-- vendor/github.com/IBM/sarama/broker.go | 9 +- vendor/github.com/IBM/sarama/client.go | 8 +- vendor/github.com/IBM/sarama/config.go | 11 +- .../github.com/IBM/sarama/docker-compose.yml | 40 ++-- vendor/github.com/IBM/sarama/entrypoint.sh | 19 +- vendor/github.com/IBM/sarama/errors.go | 186 +++++++++--------- vendor/github.com/IBM/sarama/fetch_request.go | 5 + vendor/github.com/IBM/sarama/mockresponses.go | 40 ++++ .../IBM/sarama/offset_fetch_request.go | 37 ++++ .../IBM/sarama/offset_fetch_response.go | 2 + .../github.com/IBM/sarama/offset_manager.go | 17 +- vendor/github.com/IBM/sarama/record_batch.go | 2 +- .../IBM/sarama/transaction_manager.go | 5 +- vendor/github.com/IBM/sarama/utils.go | 6 +- vendor/modules.txt | 2 +- 24 files changed, 497 insertions(+), 190 deletions(-) create mode 100644 vendor/github.com/IBM/sarama/CODE_OF_CONDUCT.md create mode 100644 vendor/github.com/IBM/sarama/SECURITY.md diff --git a/go.mod b/go.mod index cad465e88..30978c42a 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/beatlabs/patron go 1.20 require ( - github.com/IBM/sarama v1.41.3 + github.com/IBM/sarama v1.42.1 github.com/aws/aws-sdk-go-v2 v1.22.1 github.com/aws/aws-sdk-go-v2/config v1.18.37 github.com/aws/aws-sdk-go-v2/credentials v1.15.1 diff --git a/go.sum b/go.sum index 9110647d3..8fba8b4d0 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM= github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo= -github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c= -github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= +github.com/IBM/sarama v1.42.1 h1:wugyWa15TDEHh2kvq2gAy1IHLjEjuYOYgXz/ruC/OSQ= +github.com/IBM/sarama v1.42.1/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= github.com/aws/aws-sdk-go-v2 v1.18.1/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= github.com/aws/aws-sdk-go-v2 v1.21.0/go.mod h1:/RfNgGmRxI+iFOB1OeJUyxiU+9s88k3pfHvDagGEp0M= diff --git a/vendor/github.com/IBM/sarama/.pre-commit-config.yaml b/vendor/github.com/IBM/sarama/.pre-commit-config.yaml index d7271ee2e..1869b8160 100644 --- a/vendor/github.com/IBM/sarama/.pre-commit-config.yaml +++ b/vendor/github.com/IBM/sarama/.pre-commit-config.yaml @@ -31,3 +31,11 @@ repos: language: golang files: \.go$ args: [] + - repo: https://github.com/gitleaks/gitleaks + rev: v8.16.3 + hooks: + - id: gitleaks + - repo: https://github.com/golangci/golangci-lint + rev: v1.52.2 + hooks: + - id: golangci-lint diff --git a/vendor/github.com/IBM/sarama/CHANGELOG.md b/vendor/github.com/IBM/sarama/CHANGELOG.md index 59e3b35d9..513b76f91 100644 --- a/vendor/github.com/IBM/sarama/CHANGELOG.md +++ b/vendor/github.com/IBM/sarama/CHANGELOG.md @@ -1,5 +1,91 @@ # Changelog +## Version 1.42.1 (2023-11-07) + +## What's Changed +### :bug: Fixes +* fix: make fetchInitialOffset use correct protocol by @dnwe in https://github.com/IBM/sarama/pull/2705 +* fix(config): relax ClientID validation after 1.0.0 by @dnwe in https://github.com/IBM/sarama/pull/2706 + +**Full Changelog**: https://github.com/IBM/sarama/compare/v1.42.0...v1.42.1 + +## Version 1.42.0 (2023-11-02) + +## What's Changed +### :bug: Fixes +* Asynchronously close brokers during a RefreshBrokers by @bmassemin in https://github.com/IBM/sarama/pull/2693 +* Fix data race on Broker.done channel by @prestona in https://github.com/IBM/sarama/pull/2698 +* fix: data race in Broker.AsyncProduce by @lzakharov in https://github.com/IBM/sarama/pull/2678 +* Fix default retention time value in offset commit by @prestona in https://github.com/IBM/sarama/pull/2700 +* fix(txmgr): ErrOffsetsLoadInProgress is retriable by @dnwe in https://github.com/IBM/sarama/pull/2701 +### :wrench: Maintenance +* chore(ci): improve ossf scorecard result by @dnwe in https://github.com/IBM/sarama/pull/2685 +* chore(ci): add kafka 3.6.0 to FVT and versions by @dnwe in https://github.com/IBM/sarama/pull/2692 +### :heavy_plus_sign: Other Changes +* chore(ci): ossf scorecard.yml by @dnwe in https://github.com/IBM/sarama/pull/2683 +* fix(ci): always run CodeQL on every commit by @dnwe in https://github.com/IBM/sarama/pull/2689 +* chore(doc): add OpenSSF Scorecard badge by @dnwe in https://github.com/IBM/sarama/pull/2691 + +## New Contributors +* @bmassemin made their first contribution in https://github.com/IBM/sarama/pull/2693 +* @lzakharov made their first contribution in https://github.com/IBM/sarama/pull/2678 + +**Full Changelog**: https://github.com/IBM/sarama/compare/v1.41.3...v1.42.0 + +## Version 1.41.3 (2023-10-17) + +## What's Changed +### :bug: Fixes +* fix: pre-compile regex for parsing kafka version by @qshuai in https://github.com/IBM/sarama/pull/2663 +* fix(client): ignore empty Metadata responses when refreshing by @HaoSunUber in https://github.com/IBM/sarama/pull/2672 +### :package: Dependency updates +* chore(deps): bump the golang-org-x group with 2 updates by @dependabot in https://github.com/IBM/sarama/pull/2661 +* chore(deps): bump golang.org/x/net from 0.16.0 to 0.17.0 by @dependabot in https://github.com/IBM/sarama/pull/2671 +### :memo: Documentation +* fix(docs): correct topic name in rebalancing strategy example by @maksadbek in https://github.com/IBM/sarama/pull/2657 + +## New Contributors +* @maksadbek made their first contribution in https://github.com/IBM/sarama/pull/2657 +* @qshuai made their first contribution in https://github.com/IBM/sarama/pull/2663 + +**Full Changelog**: https://github.com/IBM/sarama/compare/v1.41.2...v1.41.3 + +## Version 1.41.2 (2023-09-12) + +## What's Changed +### :tada: New Features / Improvements +* perf: Alloc records in batch by @ronanh in https://github.com/IBM/sarama/pull/2646 +### :bug: Fixes +* fix(consumer): guard against nil client by @dnwe in https://github.com/IBM/sarama/pull/2636 +* fix(consumer): don't retry session if ctx canceled by @dnwe in https://github.com/IBM/sarama/pull/2642 +* fix: use least loaded broker to refresh metadata by @HaoSunUber in https://github.com/IBM/sarama/pull/2645 +### :package: Dependency updates +* chore(deps): bump the golang-org-x group with 1 update by @dependabot in https://github.com/IBM/sarama/pull/2641 + +## New Contributors +* @HaoSunUber made their first contribution in https://github.com/IBM/sarama/pull/2645 + +**Full Changelog**: https://github.com/IBM/sarama/compare/v1.41.1...v1.41.2 + +## Version 1.41.1 (2023-08-30) + +## What's Changed +### :bug: Fixes +* fix(proto): handle V3 member metadata and empty owned partitions by @dnwe in https://github.com/IBM/sarama/pull/2618 +* fix: make clear that error is configuration issue not server error by @hindessm in https://github.com/IBM/sarama/pull/2628 +* fix(client): force Event Hubs to use V1_0_0_0 by @dnwe in https://github.com/IBM/sarama/pull/2633 +* fix: add retries to alter user scram creds by @hindessm in https://github.com/IBM/sarama/pull/2632 +### :wrench: Maintenance +* chore(lint): bump golangci-lint and tweak config by @dnwe in https://github.com/IBM/sarama/pull/2620 +### :memo: Documentation +* fix(doc): add missing doc for mock consumer by @hsweif in https://github.com/IBM/sarama/pull/2386 +* chore(proto): doc CreateTopics/JoinGroup fields by @dnwe in https://github.com/IBM/sarama/pull/2627 +### :heavy_plus_sign: Other Changes +* chore(gh): add new style issue templates by @dnwe in https://github.com/IBM/sarama/pull/2624 + + +**Full Changelog**: https://github.com/IBM/sarama/compare/v1.41.0...v1.41.1 + ## Version 1.41.0 (2023-08-21) ## What's Changed diff --git a/vendor/github.com/IBM/sarama/CODE_OF_CONDUCT.md b/vendor/github.com/IBM/sarama/CODE_OF_CONDUCT.md new file mode 100644 index 000000000..8470ec5ce --- /dev/null +++ b/vendor/github.com/IBM/sarama/CODE_OF_CONDUCT.md @@ -0,0 +1,128 @@ +# Contributor Covenant Code of Conduct + +## Our Pledge + +We as members, contributors, and leaders pledge to make participation in our +community a harassment-free experience for everyone, regardless of age, body +size, visible or invisible disability, ethnicity, sex characteristics, gender +identity and expression, level of experience, education, socio-economic status, +nationality, personal appearance, race, religion, or sexual identity +and orientation. + +We pledge to act and interact in ways that contribute to an open, welcoming, +diverse, inclusive, and healthy community. + +## Our Standards + +Examples of behavior that contributes to a positive environment for our +community include: + +* Demonstrating empathy and kindness toward other people +* Being respectful of differing opinions, viewpoints, and experiences +* Giving and gracefully accepting constructive feedback +* Accepting responsibility and apologizing to those affected by our mistakes, + and learning from the experience +* Focusing on what is best not just for us as individuals, but for the + overall community + +Examples of unacceptable behavior include: + +* The use of sexualized language or imagery, and sexual attention or + advances of any kind +* Trolling, insulting or derogatory comments, and personal or political attacks +* Public or private harassment +* Publishing others' private information, such as a physical or email + address, without their explicit permission +* Other conduct which could reasonably be considered inappropriate in a + professional setting + +## Enforcement Responsibilities + +Community leaders are responsible for clarifying and enforcing our standards of +acceptable behavior and will take appropriate and fair corrective action in +response to any behavior that they deem inappropriate, threatening, offensive, +or harmful. + +Community leaders have the right and responsibility to remove, edit, or reject +comments, commits, code, wiki edits, issues, and other contributions that are +not aligned to this Code of Conduct, and will communicate reasons for moderation +decisions when appropriate. + +## Scope + +This Code of Conduct applies within all community spaces, and also applies when +an individual is officially representing the community in public spaces. +Examples of representing our community include using an official e-mail address, +posting via an official social media account, or acting as an appointed +representative at an online or offline event. + +## Enforcement + +Instances of abusive, harassing, or otherwise unacceptable behavior may be +reported to the community leaders responsible for enforcement at +dominic.evans@uk.ibm.com. +All complaints will be reviewed and investigated promptly and fairly. + +All community leaders are obligated to respect the privacy and security of the +reporter of any incident. + +## Enforcement Guidelines + +Community leaders will follow these Community Impact Guidelines in determining +the consequences for any action they deem in violation of this Code of Conduct: + +### 1. Correction + +**Community Impact**: Use of inappropriate language or other behavior deemed +unprofessional or unwelcome in the community. + +**Consequence**: A private, written warning from community leaders, providing +clarity around the nature of the violation and an explanation of why the +behavior was inappropriate. A public apology may be requested. + +### 2. Warning + +**Community Impact**: A violation through a single incident or series +of actions. + +**Consequence**: A warning with consequences for continued behavior. No +interaction with the people involved, including unsolicited interaction with +those enforcing the Code of Conduct, for a specified period of time. This +includes avoiding interactions in community spaces as well as external channels +like social media. Violating these terms may lead to a temporary or +permanent ban. + +### 3. Temporary Ban + +**Community Impact**: A serious violation of community standards, including +sustained inappropriate behavior. + +**Consequence**: A temporary ban from any sort of interaction or public +communication with the community for a specified period of time. No public or +private interaction with the people involved, including unsolicited interaction +with those enforcing the Code of Conduct, is allowed during this period. +Violating these terms may lead to a permanent ban. + +### 4. Permanent Ban + +**Community Impact**: Demonstrating a pattern of violation of community +standards, including sustained inappropriate behavior, harassment of an +individual, or aggression toward or disparagement of classes of individuals. + +**Consequence**: A permanent ban from any sort of public interaction within +the community. + +## Attribution + +This Code of Conduct is adapted from the [Contributor Covenant][homepage], +version 2.0, available at +https://www.contributor-covenant.org/version/2/0/code_of_conduct.html. + +Community Impact Guidelines were inspired by [Mozilla's code of conduct +enforcement ladder](https://github.com/mozilla/diversity). + +[homepage]: https://www.contributor-covenant.org + +For answers to common questions about this code of conduct, see the FAQ at +https://www.contributor-covenant.org/faq. Translations are available at +https://www.contributor-covenant.org/translations. diff --git a/vendor/github.com/IBM/sarama/Dockerfile.kafka b/vendor/github.com/IBM/sarama/Dockerfile.kafka index b11d899b9..186c2eb18 100644 --- a/vendor/github.com/IBM/sarama/Dockerfile.kafka +++ b/vendor/github.com/IBM/sarama/Dockerfile.kafka @@ -1,26 +1,27 @@ -FROM registry.access.redhat.com/ubi8/ubi-minimal:latest +FROM registry.access.redhat.com/ubi8/ubi-minimal:8.8@sha256:b93deceb59a58588d5b16429fc47f98920f84740a1f2ed6454e33275f0701b59 USER root -RUN microdnf update \ - && microdnf install curl gzip java-11-openjdk-headless tar tzdata-java \ - && microdnf reinstall tzdata \ - && microdnf clean all +RUN microdnf update -y \ + && microdnf install -y curl gzip java-11-openjdk-headless tar tzdata-java \ + && microdnf reinstall -y tzdata \ + && microdnf clean all ENV JAVA_HOME=/usr/lib/jvm/jre-11 # https://docs.oracle.com/javase/7/docs/technotes/guides/net/properties.html # Ensure Java doesn't cache any dns results RUN cd /etc/java/java-11-openjdk/*/conf/security \ - && sed -e '/networkaddress.cache.ttl/d' -e '/networkaddress.cache.negative.ttl/d' -i java.security \ - && echo 'networkaddress.cache.ttl=0' >> java.security \ - && echo 'networkaddress.cache.negative.ttl=0' >> java.security + && sed -e '/networkaddress.cache.ttl/d' -e '/networkaddress.cache.negative.ttl/d' -i java.security \ + && echo 'networkaddress.cache.ttl=0' >> java.security \ + && echo 'networkaddress.cache.negative.ttl=0' >> java.security ARG SCALA_VERSION="2.13" -ARG KAFKA_VERSION="3.5.1" +ARG KAFKA_VERSION="3.6.0" -# https://github.com/apache/kafka/blob/53eeaad946cd053e9eb1a762972d4efeacb8e4fc/tests/docker/Dockerfile#L65-L69 +# https://github.com/apache/kafka/blob/9989b68d0d38c8f1357f78bf9d53a58c1476188d/tests/docker/Dockerfile#L46-L72 ARG KAFKA_MIRROR="https://s3-us-west-2.amazonaws.com/kafka-packages" +SHELL ["/bin/bash", "-o", "pipefail", "-c"] RUN mkdir -p "/opt/kafka-${KAFKA_VERSION}" \ && chmod a+rw "/opt/kafka-${KAFKA_VERSION}" \ && curl -s "$KAFKA_MIRROR/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz" | tar xz --strip-components=1 -C "/opt/kafka-${KAFKA_VERSION}" @@ -28,8 +29,8 @@ RUN mkdir -p "/opt/kafka-${KAFKA_VERSION}" \ # older kafka versions depend upon jaxb-api being bundled with the JDK, but it # was removed from Java 11 so work around that by including it in the kafka # libs dir regardless -RUN cd /tmp \ - && curl -sLO "https://repo1.maven.org/maven2/javax/xml/bind/jaxb-api/2.3.0/jaxb-api-2.3.0.jar" \ +WORKDIR /tmp +RUN curl -sLO "https://repo1.maven.org/maven2/javax/xml/bind/jaxb-api/2.3.0/jaxb-api-2.3.0.jar" \ && for DIR in /opt/kafka-*; do cp -v jaxb-api-2.3.0.jar $DIR/libs/ ; done \ && rm -f jaxb-api-2.3.0.jar @@ -41,4 +42,6 @@ RUN sed -e "s/JAVA_MAJOR_VERSION=.*/JAVA_MAJOR_VERSION=${JAVA_MAJOR_VERSION}/" - COPY entrypoint.sh / +USER 65534:65534 + ENTRYPOINT ["/entrypoint.sh"] diff --git a/vendor/github.com/IBM/sarama/README.md b/vendor/github.com/IBM/sarama/README.md index a47d6b71d..4534d7b41 100644 --- a/vendor/github.com/IBM/sarama/README.md +++ b/vendor/github.com/IBM/sarama/README.md @@ -1,6 +1,8 @@ # sarama [![Go Reference](https://pkg.go.dev/badge/github.com/IBM/sarama.svg)](https://pkg.go.dev/github.com/IBM/sarama) +[![OpenSSF Scorecard](https://api.securityscorecards.dev/projects/github.com/IBM/sarama/badge?style=flat)](https://securityscorecards.dev/viewer/?uri=github.com/IBM/sarama) +[![OpenSSF Best Practices](https://www.bestpractices.dev/projects/7996/badge)](https://www.bestpractices.dev/projects/7996) Sarama is an MIT-licensed Go client library for [Apache Kafka](https://kafka.apache.org/). diff --git a/vendor/github.com/IBM/sarama/SECURITY.md b/vendor/github.com/IBM/sarama/SECURITY.md new file mode 100644 index 000000000..b2f6e61fe --- /dev/null +++ b/vendor/github.com/IBM/sarama/SECURITY.md @@ -0,0 +1,11 @@ +# Security + +## Reporting Security Issues + +**Please do not report security vulnerabilities through public GitHub issues.** + +The easiest way to report a security issue is privately through GitHub [here](https://github.com/IBM/sarama/security/advisories/new). + +See [Privately reporting a security vulnerability](https://docs.github.com/en/code-security/security-advisories/guidance-on-reporting-and-writing/privately-reporting-a-security-vulnerability) for full instructions. + +Alternatively, you can report them via e-mail or anonymous form to the IBM Product Security Incident Response Team (PSIRT) following the guidelines under the [IBM Security Vulnerability Management](https://www.ibm.com/support/pages/ibm-security-vulnerability-management) pages. diff --git a/vendor/github.com/IBM/sarama/admin.go b/vendor/github.com/IBM/sarama/admin.go index 7dabd3737..0ddd031cf 100644 --- a/vendor/github.com/IBM/sarama/admin.go +++ b/vendor/github.com/IBM/sarama/admin.go @@ -1018,35 +1018,7 @@ func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions m return nil, err } - request := &OffsetFetchRequest{ - ConsumerGroup: group, - partitions: topicPartitions, - } - - if ca.conf.Version.IsAtLeast(V2_5_0_0) { - // Version 7 is adding the require stable flag. - request.Version = 7 - } else if ca.conf.Version.IsAtLeast(V2_4_0_0) { - // Version 6 is the first flexible version. - request.Version = 6 - } else if ca.conf.Version.IsAtLeast(V2_1_0_0) { - // Version 3, 4, and 5 are the same as version 2. - request.Version = 5 - } else if ca.conf.Version.IsAtLeast(V2_0_0_0) { - request.Version = 4 - } else if ca.conf.Version.IsAtLeast(V0_11_0_0) { - request.Version = 3 - } else if ca.conf.Version.IsAtLeast(V0_10_2_0) { - // Starting in version 2, the request can contain a null topics array to indicate that offsets - // for all topics should be fetched. It also returns a top level error code - // for group or coordinator level errors. - request.Version = 2 - } else if ca.conf.Version.IsAtLeast(V0_8_2_0) { - // In version 0, the request read offsets from ZK. - // - // Starting in version 1, the broker supports fetching offsets from the internal __consumer_offsets topic. - request.Version = 1 - } + request := NewOffsetFetchRequest(ca.conf.Version, group, topicPartitions) return coordinator.FetchOffset(request) } diff --git a/vendor/github.com/IBM/sarama/broker.go b/vendor/github.com/IBM/sarama/broker.go index 7ec3022b3..268696cf4 100644 --- a/vendor/github.com/IBM/sarama/broker.go +++ b/vendor/github.com/IBM/sarama/broker.go @@ -260,6 +260,7 @@ func (b *Broker) Open(conf *Config) error { b.connErr = b.authenticateViaSASLv1() if b.connErr != nil { close(b.responses) + <-b.done err = b.conn.Close() if err == nil { DebugLogger.Printf("Closed connection to broker %s\n", b.addr) @@ -434,12 +435,16 @@ type ProduceCallback func(*ProduceResponse, error) // // Make sure not to Close the broker in the callback as it will lead to a deadlock. func (b *Broker) AsyncProduce(request *ProduceRequest, cb ProduceCallback) error { - metricRegistry := b.metricRegistry + b.lock.Lock() + defer b.lock.Unlock() + needAcks := request.RequiredAcks != NoResponse // Use a nil promise when no acks is required var promise *responsePromise if needAcks { + metricRegistry := b.metricRegistry + // Create ProduceResponse early to provide the header version res := new(ProduceResponse) promise = &responsePromise{ @@ -465,8 +470,6 @@ func (b *Broker) AsyncProduce(request *ProduceRequest, cb ProduceCallback) error } } - b.lock.Lock() - defer b.lock.Unlock() return b.sendWithPromise(request, promise) } diff --git a/vendor/github.com/IBM/sarama/client.go b/vendor/github.com/IBM/sarama/client.go index c6364eead..2decba7c5 100644 --- a/vendor/github.com/IBM/sarama/client.go +++ b/vendor/github.com/IBM/sarama/client.go @@ -519,16 +519,16 @@ func (client *client) RefreshBrokers(addrs []string) error { defer client.lock.Unlock() for _, broker := range client.brokers { - _ = broker.Close() - delete(client.brokers, broker.ID()) + safeAsyncClose(broker) } + client.brokers = make(map[int32]*Broker) for _, broker := range client.seedBrokers { - _ = broker.Close() + safeAsyncClose(broker) } for _, broker := range client.deadSeeds { - _ = broker.Close() + safeAsyncClose(broker) } client.seedBrokers = nil diff --git a/vendor/github.com/IBM/sarama/config.go b/vendor/github.com/IBM/sarama/config.go index f9299a8c9..ad970a3f0 100644 --- a/vendor/github.com/IBM/sarama/config.go +++ b/vendor/github.com/IBM/sarama/config.go @@ -15,7 +15,9 @@ import ( const defaultClientID = "sarama" -var validID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`) +// validClientID specifies the permitted characters for a client.id when +// connecting to Kafka versions before 1.0.0 (KIP-190) +var validClientID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`) // Config is used to pass multiple configuration options to Sarama's constructors. type Config struct { @@ -846,8 +848,11 @@ func (c *Config) Validate() error { switch { case c.ChannelBufferSize < 0: return ConfigurationError("ChannelBufferSize must be >= 0") - case !validID.MatchString(c.ClientID): - return ConfigurationError("ClientID is invalid") + } + + // only validate clientID locally for Kafka versions before KIP-190 was implemented + if !c.Version.IsAtLeast(V1_0_0_0) && !validClientID.MatchString(c.ClientID) { + return ConfigurationError(fmt.Sprintf("ClientID value %q is not valid for Kafka versions before 1.0.0", c.ClientID)) } return nil diff --git a/vendor/github.com/IBM/sarama/docker-compose.yml b/vendor/github.com/IBM/sarama/docker-compose.yml index 4fdf1a517..e916416d5 100644 --- a/vendor/github.com/IBM/sarama/docker-compose.yml +++ b/vendor/github.com/IBM/sarama/docker-compose.yml @@ -38,18 +38,18 @@ services: ZOO_4LW_COMMANDS_WHITELIST: 'mntr,conf,ruok' kafka-1: hostname: 'kafka-1' - image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.5.1}' + image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.0}' build: context: . dockerfile: Dockerfile.kafka args: - KAFKA_VERSION: ${KAFKA_VERSION:-3.5.1} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} SCALA_VERSION: ${SCALA_VERSION:-2.13} healthcheck: test: [ 'CMD', - '/opt/kafka-${KAFKA_VERSION:-3.5.1}/bin/kafka-broker-api-versions.sh', + '/opt/kafka-${KAFKA_VERSION:-3.6.0}/bin/kafka-broker-api-versions.sh', '--bootstrap-server', 'kafka-1:9091', ] @@ -64,7 +64,7 @@ services: - toxiproxy restart: always environment: - KAFKA_VERSION: ${KAFKA_VERSION:-3.5.1} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} KAFKA_CFG_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' KAFKA_CFG_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29091' KAFKA_CFG_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-1:9091,LISTENER_LOCAL://localhost:29091' @@ -83,18 +83,18 @@ services: KAFKA_JVM_PERFORMANCE_OPTS: "-XX:+IgnoreUnrecognizedVMOptions" kafka-2: hostname: 'kafka-2' - image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.5.1}' + image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.0}' build: context: . dockerfile: Dockerfile.kafka args: - KAFKA_VERSION: ${KAFKA_VERSION:-3.5.1} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} SCALA_VERSION: ${SCALA_VERSION:-2.13} healthcheck: test: [ 'CMD', - '/opt/kafka-${KAFKA_VERSION:-3.5.1}/bin/kafka-broker-api-versions.sh', + '/opt/kafka-${KAFKA_VERSION:-3.6.0}/bin/kafka-broker-api-versions.sh', '--bootstrap-server', 'kafka-2:9091', ] @@ -109,7 +109,7 @@ services: - toxiproxy restart: always environment: - KAFKA_VERSION: ${KAFKA_VERSION:-3.5.1} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} KAFKA_CFG_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' KAFKA_CFG_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29092' KAFKA_CFG_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-2:9091,LISTENER_LOCAL://localhost:29092' @@ -128,18 +128,18 @@ services: KAFKA_JVM_PERFORMANCE_OPTS: "-XX:+IgnoreUnrecognizedVMOptions" kafka-3: hostname: 'kafka-3' - image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.5.1}' + image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.0}' build: context: . dockerfile: Dockerfile.kafka args: - KAFKA_VERSION: ${KAFKA_VERSION:-3.5.1} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} SCALA_VERSION: ${SCALA_VERSION:-2.13} healthcheck: test: [ 'CMD', - '/opt/kafka-${KAFKA_VERSION:-3.5.1}/bin/kafka-broker-api-versions.sh', + '/opt/kafka-${KAFKA_VERSION:-3.6.0}/bin/kafka-broker-api-versions.sh', '--bootstrap-server', 'kafka-3:9091', ] @@ -154,7 +154,7 @@ services: - toxiproxy restart: always environment: - KAFKA_VERSION: ${KAFKA_VERSION:-3.5.1} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} KAFKA_CFG_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' KAFKA_CFG_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29093' KAFKA_CFG_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-3:9091,LISTENER_LOCAL://localhost:29093' @@ -173,18 +173,18 @@ services: KAFKA_JVM_PERFORMANCE_OPTS: "-XX:+IgnoreUnrecognizedVMOptions" kafka-4: hostname: 'kafka-4' - image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.5.1}' + image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.0}' build: context: . dockerfile: Dockerfile.kafka args: - KAFKA_VERSION: ${KAFKA_VERSION:-3.5.1} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} SCALA_VERSION: ${SCALA_VERSION:-2.13} healthcheck: test: [ 'CMD', - '/opt/kafka-${KAFKA_VERSION:-3.5.1}/bin/kafka-broker-api-versions.sh', + '/opt/kafka-${KAFKA_VERSION:-3.6.0}/bin/kafka-broker-api-versions.sh', '--bootstrap-server', 'kafka-4:9091', ] @@ -199,7 +199,7 @@ services: - toxiproxy restart: always environment: - KAFKA_VERSION: ${KAFKA_VERSION:-3.5.1} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} KAFKA_CFG_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' KAFKA_CFG_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29094' KAFKA_CFG_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-4:9091,LISTENER_LOCAL://localhost:29094' @@ -218,18 +218,18 @@ services: KAFKA_JVM_PERFORMANCE_OPTS: "-XX:+IgnoreUnrecognizedVMOptions" kafka-5: hostname: 'kafka-5' - image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.5.1}' + image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.0}' build: context: . dockerfile: Dockerfile.kafka args: - KAFKA_VERSION: ${KAFKA_VERSION:-3.5.1} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} SCALA_VERSION: ${SCALA_VERSION:-2.13} healthcheck: test: [ 'CMD', - '/opt/kafka-${KAFKA_VERSION:-3.5.1}/bin/kafka-broker-api-versions.sh', + '/opt/kafka-${KAFKA_VERSION:-3.6.0}/bin/kafka-broker-api-versions.sh', '--bootstrap-server', 'kafka-5:9091', ] @@ -244,7 +244,7 @@ services: - toxiproxy restart: always environment: - KAFKA_VERSION: ${KAFKA_VERSION:-3.5.1} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} KAFKA_CFG_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' KAFKA_CFG_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29095' KAFKA_CFG_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-5:9091,LISTENER_LOCAL://localhost:29095' diff --git a/vendor/github.com/IBM/sarama/entrypoint.sh b/vendor/github.com/IBM/sarama/entrypoint.sh index 7f405f92d..9fe9a44b1 100644 --- a/vendor/github.com/IBM/sarama/entrypoint.sh +++ b/vendor/github.com/IBM/sarama/entrypoint.sh @@ -1,6 +1,9 @@ #!/bin/bash -KAFKA_VERSION="${KAFKA_VERSION:-3.5.1}" +set -eu +set -o pipefail + +KAFKA_VERSION="${KAFKA_VERSION:-3.6.0}" KAFKA_HOME="/opt/kafka-${KAFKA_VERSION}" if [ ! -d "${KAFKA_HOME}" ]; then @@ -10,19 +13,19 @@ fi cd "${KAFKA_HOME}" || exit 1 -# discard all empty/commented lines -sed -e '/^#/d' -e '/^$/d' -i".orig" config/server.properties +# discard all empty/commented lines from default config and copy to /tmp +sed -e '/^#/d' -e '/^$/d' config/server.properties >/tmp/server.properties -echo "########################################################################" >>config/server.properties +echo "########################################################################" >>/tmp/server.properties # emulate kafka_configure_from_environment_variables from bitnami/bitnami-docker-kafka for var in "${!KAFKA_CFG_@}"; do key="$(echo "$var" | sed -e 's/^KAFKA_CFG_//g' -e 's/_/\./g' -e 's/.*/\L&/')" - sed -e '/^'$key'/d' -i"" config/server.properties + sed -e '/^'$key'/d' -i"" /tmp/server.properties value="${!var}" - echo "$key=$value" >>config/server.properties + echo "$key=$value" >>/tmp/server.properties done -sort config/server.properties +sort /tmp/server.properties -exec bin/kafka-server-start.sh config/server.properties +exec bin/kafka-server-start.sh /tmp/server.properties diff --git a/vendor/github.com/IBM/sarama/errors.go b/vendor/github.com/IBM/sarama/errors.go index 8d1d16834..2c431aecb 100644 --- a/vendor/github.com/IBM/sarama/errors.go +++ b/vendor/github.com/IBM/sarama/errors.go @@ -173,98 +173,98 @@ type KError int16 // Numeric error codes returned by the Kafka server. const ( - ErrNoError KError = 0 - ErrUnknown KError = -1 - ErrOffsetOutOfRange KError = 1 - ErrInvalidMessage KError = 2 - ErrUnknownTopicOrPartition KError = 3 - ErrInvalidMessageSize KError = 4 - ErrLeaderNotAvailable KError = 5 - ErrNotLeaderForPartition KError = 6 - ErrRequestTimedOut KError = 7 - ErrBrokerNotAvailable KError = 8 - ErrReplicaNotAvailable KError = 9 - ErrMessageSizeTooLarge KError = 10 - ErrStaleControllerEpochCode KError = 11 - ErrOffsetMetadataTooLarge KError = 12 - ErrNetworkException KError = 13 - ErrOffsetsLoadInProgress KError = 14 - ErrConsumerCoordinatorNotAvailable KError = 15 - ErrNotCoordinatorForConsumer KError = 16 - ErrInvalidTopic KError = 17 - ErrMessageSetSizeTooLarge KError = 18 - ErrNotEnoughReplicas KError = 19 - ErrNotEnoughReplicasAfterAppend KError = 20 - ErrInvalidRequiredAcks KError = 21 - ErrIllegalGeneration KError = 22 - ErrInconsistentGroupProtocol KError = 23 - ErrInvalidGroupId KError = 24 - ErrUnknownMemberId KError = 25 - ErrInvalidSessionTimeout KError = 26 - ErrRebalanceInProgress KError = 27 - ErrInvalidCommitOffsetSize KError = 28 - ErrTopicAuthorizationFailed KError = 29 - ErrGroupAuthorizationFailed KError = 30 - ErrClusterAuthorizationFailed KError = 31 - ErrInvalidTimestamp KError = 32 - ErrUnsupportedSASLMechanism KError = 33 - ErrIllegalSASLState KError = 34 - ErrUnsupportedVersion KError = 35 - ErrTopicAlreadyExists KError = 36 - ErrInvalidPartitions KError = 37 - ErrInvalidReplicationFactor KError = 38 - ErrInvalidReplicaAssignment KError = 39 - ErrInvalidConfig KError = 40 - ErrNotController KError = 41 - ErrInvalidRequest KError = 42 - ErrUnsupportedForMessageFormat KError = 43 - ErrPolicyViolation KError = 44 - ErrOutOfOrderSequenceNumber KError = 45 - ErrDuplicateSequenceNumber KError = 46 - ErrInvalidProducerEpoch KError = 47 - ErrInvalidTxnState KError = 48 - ErrInvalidProducerIDMapping KError = 49 - ErrInvalidTransactionTimeout KError = 50 - ErrConcurrentTransactions KError = 51 - ErrTransactionCoordinatorFenced KError = 52 - ErrTransactionalIDAuthorizationFailed KError = 53 - ErrSecurityDisabled KError = 54 - ErrOperationNotAttempted KError = 55 - ErrKafkaStorageError KError = 56 - ErrLogDirNotFound KError = 57 - ErrSASLAuthenticationFailed KError = 58 - ErrUnknownProducerID KError = 59 - ErrReassignmentInProgress KError = 60 - ErrDelegationTokenAuthDisabled KError = 61 - ErrDelegationTokenNotFound KError = 62 - ErrDelegationTokenOwnerMismatch KError = 63 - ErrDelegationTokenRequestNotAllowed KError = 64 - ErrDelegationTokenAuthorizationFailed KError = 65 - ErrDelegationTokenExpired KError = 66 - ErrInvalidPrincipalType KError = 67 - ErrNonEmptyGroup KError = 68 - ErrGroupIDNotFound KError = 69 - ErrFetchSessionIDNotFound KError = 70 - ErrInvalidFetchSessionEpoch KError = 71 - ErrListenerNotFound KError = 72 - ErrTopicDeletionDisabled KError = 73 - ErrFencedLeaderEpoch KError = 74 - ErrUnknownLeaderEpoch KError = 75 - ErrUnsupportedCompressionType KError = 76 - ErrStaleBrokerEpoch KError = 77 - ErrOffsetNotAvailable KError = 78 - ErrMemberIdRequired KError = 79 - ErrPreferredLeaderNotAvailable KError = 80 - ErrGroupMaxSizeReached KError = 81 - ErrFencedInstancedId KError = 82 - ErrEligibleLeadersNotAvailable KError = 83 - ErrElectionNotNeeded KError = 84 - ErrNoReassignmentInProgress KError = 85 - ErrGroupSubscribedToTopic KError = 86 - ErrInvalidRecord KError = 87 - ErrUnstableOffsetCommit KError = 88 - ErrThrottlingQuotaExceeded KError = 89 - ErrProducerFenced KError = 90 + ErrUnknown KError = -1 // Errors.UNKNOWN_SERVER_ERROR + ErrNoError KError = 0 // Errors.NONE + ErrOffsetOutOfRange KError = 1 // Errors.OFFSET_OUT_OF_RANGE + ErrInvalidMessage KError = 2 // Errors.CORRUPT_MESSAGE + ErrUnknownTopicOrPartition KError = 3 // Errors.UNKNOWN_TOPIC_OR_PARTITION + ErrInvalidMessageSize KError = 4 // Errors.INVALID_FETCH_SIZE + ErrLeaderNotAvailable KError = 5 // Errors.LEADER_NOT_AVAILABLE + ErrNotLeaderForPartition KError = 6 // Errors.NOT_LEADER_OR_FOLLOWER + ErrRequestTimedOut KError = 7 // Errors.REQUEST_TIMED_OUT + ErrBrokerNotAvailable KError = 8 // Errors.BROKER_NOT_AVAILABLE + ErrReplicaNotAvailable KError = 9 // Errors.REPLICA_NOT_AVAILABLE + ErrMessageSizeTooLarge KError = 10 // Errors.MESSAGE_TOO_LARGE + ErrStaleControllerEpochCode KError = 11 // Errors.STALE_CONTROLLER_EPOCH + ErrOffsetMetadataTooLarge KError = 12 // Errors.OFFSET_METADATA_TOO_LARGE + ErrNetworkException KError = 13 // Errors.NETWORK_EXCEPTION + ErrOffsetsLoadInProgress KError = 14 // Errors.COORDINATOR_LOAD_IN_PROGRESS + ErrConsumerCoordinatorNotAvailable KError = 15 // Errors.COORDINATOR_NOT_AVAILABLE + ErrNotCoordinatorForConsumer KError = 16 // Errors.NOT_COORDINATOR + ErrInvalidTopic KError = 17 // Errors.INVALID_TOPIC_EXCEPTION + ErrMessageSetSizeTooLarge KError = 18 // Errors.RECORD_LIST_TOO_LARGE + ErrNotEnoughReplicas KError = 19 // Errors.NOT_ENOUGH_REPLICAS + ErrNotEnoughReplicasAfterAppend KError = 20 // Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND + ErrInvalidRequiredAcks KError = 21 // Errors.INVALID_REQUIRED_ACKS + ErrIllegalGeneration KError = 22 // Errors.ILLEGAL_GENERATION + ErrInconsistentGroupProtocol KError = 23 // Errors.INCONSISTENT_GROUP_PROTOCOL + ErrInvalidGroupId KError = 24 // Errors.INVALID_GROUP_ID + ErrUnknownMemberId KError = 25 // Errors.UNKNOWN_MEMBER_ID + ErrInvalidSessionTimeout KError = 26 // Errors.INVALID_SESSION_TIMEOUT + ErrRebalanceInProgress KError = 27 // Errors.REBALANCE_IN_PROGRESS + ErrInvalidCommitOffsetSize KError = 28 // Errors.INVALID_COMMIT_OFFSET_SIZE + ErrTopicAuthorizationFailed KError = 29 // Errors.TOPIC_AUTHORIZATION_FAILED + ErrGroupAuthorizationFailed KError = 30 // Errors.GROUP_AUTHORIZATION_FAILED + ErrClusterAuthorizationFailed KError = 31 // Errors.CLUSTER_AUTHORIZATION_FAILED + ErrInvalidTimestamp KError = 32 // Errors.INVALID_TIMESTAMP + ErrUnsupportedSASLMechanism KError = 33 // Errors.UNSUPPORTED_SASL_MECHANISM + ErrIllegalSASLState KError = 34 // Errors.ILLEGAL_SASL_STATE + ErrUnsupportedVersion KError = 35 // Errors.UNSUPPORTED_VERSION + ErrTopicAlreadyExists KError = 36 // Errors.TOPIC_ALREADY_EXISTS + ErrInvalidPartitions KError = 37 // Errors.INVALID_PARTITIONS + ErrInvalidReplicationFactor KError = 38 // Errors.INVALID_REPLICATION_FACTOR + ErrInvalidReplicaAssignment KError = 39 // Errors.INVALID_REPLICA_ASSIGNMENT + ErrInvalidConfig KError = 40 // Errors.INVALID_CONFIG + ErrNotController KError = 41 // Errors.NOT_CONTROLLER + ErrInvalidRequest KError = 42 // Errors.INVALID_REQUEST + ErrUnsupportedForMessageFormat KError = 43 // Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT + ErrPolicyViolation KError = 44 // Errors.POLICY_VIOLATION + ErrOutOfOrderSequenceNumber KError = 45 // Errors.OUT_OF_ORDER_SEQUENCE_NUMBER + ErrDuplicateSequenceNumber KError = 46 // Errors.DUPLICATE_SEQUENCE_NUMBER + ErrInvalidProducerEpoch KError = 47 // Errors.INVALID_PRODUCER_EPOCH + ErrInvalidTxnState KError = 48 // Errors.INVALID_TXN_STATE + ErrInvalidProducerIDMapping KError = 49 // Errors.INVALID_PRODUCER_ID_MAPPING + ErrInvalidTransactionTimeout KError = 50 // Errors.INVALID_TRANSACTION_TIMEOUT + ErrConcurrentTransactions KError = 51 // Errors.CONCURRENT_TRANSACTIONS + ErrTransactionCoordinatorFenced KError = 52 // Errors.TRANSACTION_COORDINATOR_FENCED + ErrTransactionalIDAuthorizationFailed KError = 53 // Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED + ErrSecurityDisabled KError = 54 // Errors.SECURITY_DISABLED + ErrOperationNotAttempted KError = 55 // Errors.OPERATION_NOT_ATTEMPTED + ErrKafkaStorageError KError = 56 // Errors.KAFKA_STORAGE_ERROR + ErrLogDirNotFound KError = 57 // Errors.LOG_DIR_NOT_FOUND + ErrSASLAuthenticationFailed KError = 58 // Errors.SASL_AUTHENTICATION_FAILED + ErrUnknownProducerID KError = 59 // Errors.UNKNOWN_PRODUCER_ID + ErrReassignmentInProgress KError = 60 // Errors.REASSIGNMENT_IN_PROGRESS + ErrDelegationTokenAuthDisabled KError = 61 // Errors.DELEGATION_TOKEN_AUTH_DISABLED + ErrDelegationTokenNotFound KError = 62 // Errors.DELEGATION_TOKEN_NOT_FOUND + ErrDelegationTokenOwnerMismatch KError = 63 // Errors.DELEGATION_TOKEN_OWNER_MISMATCH + ErrDelegationTokenRequestNotAllowed KError = 64 // Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED + ErrDelegationTokenAuthorizationFailed KError = 65 // Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED + ErrDelegationTokenExpired KError = 66 // Errors.DELEGATION_TOKEN_EXPIRED + ErrInvalidPrincipalType KError = 67 // Errors.INVALID_PRINCIPAL_TYPE + ErrNonEmptyGroup KError = 68 // Errors.NON_EMPTY_GROUP + ErrGroupIDNotFound KError = 69 // Errors.GROUP_ID_NOT_FOUND + ErrFetchSessionIDNotFound KError = 70 // Errors.FETCH_SESSION_ID_NOT_FOUND + ErrInvalidFetchSessionEpoch KError = 71 // Errors.INVALID_FETCH_SESSION_EPOCH + ErrListenerNotFound KError = 72 // Errors.LISTENER_NOT_FOUND + ErrTopicDeletionDisabled KError = 73 // Errors.TOPIC_DELETION_DISABLED + ErrFencedLeaderEpoch KError = 74 // Errors.FENCED_LEADER_EPOCH + ErrUnknownLeaderEpoch KError = 75 // Errors.UNKNOWN_LEADER_EPOCH + ErrUnsupportedCompressionType KError = 76 // Errors.UNSUPPORTED_COMPRESSION_TYPE + ErrStaleBrokerEpoch KError = 77 // Errors.STALE_BROKER_EPOCH + ErrOffsetNotAvailable KError = 78 // Errors.OFFSET_NOT_AVAILABLE + ErrMemberIdRequired KError = 79 // Errors.MEMBER_ID_REQUIRED + ErrPreferredLeaderNotAvailable KError = 80 // Errors.PREFERRED_LEADER_NOT_AVAILABLE + ErrGroupMaxSizeReached KError = 81 // Errors.GROUP_MAX_SIZE_REACHED + ErrFencedInstancedId KError = 82 // Errors.FENCED_INSTANCE_ID + ErrEligibleLeadersNotAvailable KError = 83 // Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE + ErrElectionNotNeeded KError = 84 // Errors.ELECTION_NOT_NEEDED + ErrNoReassignmentInProgress KError = 85 // Errors.NO_REASSIGNMENT_IN_PROGRESS + ErrGroupSubscribedToTopic KError = 86 // Errors.GROUP_SUBSCRIBED_TO_TOPIC + ErrInvalidRecord KError = 87 // Errors.INVALID_RECORD + ErrUnstableOffsetCommit KError = 88 // Errors.UNSTABLE_OFFSET_COMMIT + ErrThrottlingQuotaExceeded KError = 89 // Errors.THROTTLING_QUOTA_EXCEEDED + ErrProducerFenced KError = 90 // Errors.PRODUCER_FENCED ) func (err KError) Error() string { @@ -302,7 +302,7 @@ func (err KError) Error() string { case ErrNetworkException: return "kafka server: The server disconnected before a response was received" case ErrOffsetsLoadInProgress: - return "kafka server: The broker is still loading offsets after a leader change for that offset's topic partition" + return "kafka server: The coordinator is still loading offsets and cannot currently process requests" case ErrConsumerCoordinatorNotAvailable: return "kafka server: Offset's topic has not yet been created" case ErrNotCoordinatorForConsumer: diff --git a/vendor/github.com/IBM/sarama/fetch_request.go b/vendor/github.com/IBM/sarama/fetch_request.go index d1fd81384..a5314b55c 100644 --- a/vendor/github.com/IBM/sarama/fetch_request.go +++ b/vendor/github.com/IBM/sarama/fetch_request.go @@ -1,5 +1,7 @@ package sarama +import "fmt" + type fetchRequestBlock struct { Version int16 // currentLeaderEpoch contains the current leader epoch of the partition. @@ -241,6 +243,9 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) { if err != nil { return err } + if partitionCount < 0 { + return fmt.Errorf("partitionCount %d is invalid", partitionCount) + } r.forgotten[topic] = make([]int32, partitionCount) for j := 0; j < partitionCount; j++ { diff --git a/vendor/github.com/IBM/sarama/mockresponses.go b/vendor/github.com/IBM/sarama/mockresponses.go index 688835278..d09415b49 100644 --- a/vendor/github.com/IBM/sarama/mockresponses.go +++ b/vendor/github.com/IBM/sarama/mockresponses.go @@ -1467,3 +1467,43 @@ func (m *MockApiVersionsResponse) For(reqBody versionedDecoder) encoderWithHeade } return res } + +// MockInitProducerIDResponse is an `InitPorducerIDResponse` builder. +type MockInitProducerIDResponse struct { + producerID int64 + producerEpoch int16 + err KError + t TestReporter +} + +func NewMockInitProducerIDResponse(t TestReporter) *MockInitProducerIDResponse { + return &MockInitProducerIDResponse{ + t: t, + } +} + +func (m *MockInitProducerIDResponse) SetProducerID(id int) *MockInitProducerIDResponse { + m.producerID = int64(id) + return m +} + +func (m *MockInitProducerIDResponse) SetProducerEpoch(epoch int) *MockInitProducerIDResponse { + m.producerEpoch = int16(epoch) + return m +} + +func (m *MockInitProducerIDResponse) SetError(err KError) *MockInitProducerIDResponse { + m.err = err + return m +} + +func (m *MockInitProducerIDResponse) For(reqBody versionedDecoder) encoderWithHeader { + req := reqBody.(*InitProducerIDRequest) + res := &InitProducerIDResponse{ + Version: req.Version, + Err: m.err, + ProducerID: m.producerID, + ProducerEpoch: m.producerEpoch, + } + return res +} diff --git a/vendor/github.com/IBM/sarama/offset_fetch_request.go b/vendor/github.com/IBM/sarama/offset_fetch_request.go index c37ae73e3..0c9b8405b 100644 --- a/vendor/github.com/IBM/sarama/offset_fetch_request.go +++ b/vendor/github.com/IBM/sarama/offset_fetch_request.go @@ -7,6 +7,43 @@ type OffsetFetchRequest struct { partitions map[string][]int32 } +func NewOffsetFetchRequest( + version KafkaVersion, + group string, + partitions map[string][]int32, +) *OffsetFetchRequest { + request := &OffsetFetchRequest{ + ConsumerGroup: group, + partitions: partitions, + } + if version.IsAtLeast(V2_5_0_0) { + // Version 7 is adding the require stable flag. + request.Version = 7 + } else if version.IsAtLeast(V2_4_0_0) { + // Version 6 is the first flexible version. + request.Version = 6 + } else if version.IsAtLeast(V2_1_0_0) { + // Version 3, 4, and 5 are the same as version 2. + request.Version = 5 + } else if version.IsAtLeast(V2_0_0_0) { + request.Version = 4 + } else if version.IsAtLeast(V0_11_0_0) { + request.Version = 3 + } else if version.IsAtLeast(V0_10_2_0) { + // Starting in version 2, the request can contain a null topics array to indicate that offsets + // for all topics should be fetched. It also returns a top level error code + // for group or coordinator level errors. + request.Version = 2 + } else if version.IsAtLeast(V0_8_2_0) { + // In version 0, the request read offsets from ZK. + // + // Starting in version 1, the broker supports fetching offsets from the internal __consumer_offsets topic. + request.Version = 1 + } + + return request +} + func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) { if r.Version < 0 || r.Version > 7 { return PacketEncodingError{"invalid or unsupported OffsetFetchRequest version field"} diff --git a/vendor/github.com/IBM/sarama/offset_fetch_response.go b/vendor/github.com/IBM/sarama/offset_fetch_response.go index b412b25f1..7ce7927d8 100644 --- a/vendor/github.com/IBM/sarama/offset_fetch_response.go +++ b/vendor/github.com/IBM/sarama/offset_fetch_response.go @@ -22,6 +22,8 @@ func (b *OffsetFetchResponseBlock) decode(pd packetDecoder, version int16) (err if err != nil { return err } + } else { + b.LeaderEpoch = -1 } if isFlexible { diff --git a/vendor/github.com/IBM/sarama/offset_manager.go b/vendor/github.com/IBM/sarama/offset_manager.go index 332679fd7..1bf545908 100644 --- a/vendor/github.com/IBM/sarama/offset_manager.go +++ b/vendor/github.com/IBM/sarama/offset_manager.go @@ -153,11 +153,8 @@ func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retri return om.fetchInitialOffset(topic, partition, retries-1) } - req := new(OffsetFetchRequest) - req.Version = 1 - req.ConsumerGroup = om.group - req.AddPartition(topic, partition) - + partitions := map[string][]int32{topic: {partition}} + req := NewOffsetFetchRequest(om.conf.Version, om.group, partitions) resp, err := broker.FetchOffset(req) if err != nil { if retries <= 0 { @@ -318,9 +315,13 @@ func (om *offsetManager) constructRequest() *OffsetCommitRequest { // request controlled retention was only supported from V2-V4 (it became // broker-only after that) so if the user has set the config options then - // flow those through as retention time on the commit request - if r.Version >= 2 && r.Version < 5 && om.conf.Consumer.Offsets.Retention > 0 { - r.RetentionTime = int64(om.conf.Consumer.Offsets.Retention / time.Millisecond) + // flow those through as retention time on the commit request. + if r.Version >= 2 && r.Version < 5 { + // Map Sarama's default of 0 to Kafka's default of -1 + r.RetentionTime = -1 + if om.conf.Consumer.Offsets.Retention > 0 { + r.RetentionTime = int64(om.conf.Consumer.Offsets.Retention / time.Millisecond) + } } om.pomsLock.RLock() diff --git a/vendor/github.com/IBM/sarama/record_batch.go b/vendor/github.com/IBM/sarama/record_batch.go index c6f41b27a..c422c5c2f 100644 --- a/vendor/github.com/IBM/sarama/record_batch.go +++ b/vendor/github.com/IBM/sarama/record_batch.go @@ -58,7 +58,7 @@ func (b *RecordBatch) LastOffset() int64 { func (b *RecordBatch) encode(pe packetEncoder) error { if b.Version != 2 { - return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)} + return PacketEncodingError{fmt.Sprintf("unsupported record batch version (%d)", b.Version)} } pe.putInt64(b.FirstOffset) pe.push(&lengthField{}) diff --git a/vendor/github.com/IBM/sarama/transaction_manager.go b/vendor/github.com/IBM/sarama/transaction_manager.go index 546528f26..ca7e13dab 100644 --- a/vendor/github.com/IBM/sarama/transaction_manager.go +++ b/vendor/github.com/IBM/sarama/transaction_manager.go @@ -569,9 +569,8 @@ func (t *transactionManager) initProducerId() (int64, int16, error) { return response.ProducerID, response.ProducerEpoch, false, nil } switch response.Err { - case ErrConsumerCoordinatorNotAvailable: - fallthrough - case ErrNotCoordinatorForConsumer: + // Retriable errors + case ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer, ErrOffsetsLoadInProgress: if t.isTransactional() { _ = coordinator.Close() _ = t.client.RefreshTransactionCoordinator(t.transactionalID) diff --git a/vendor/github.com/IBM/sarama/utils.go b/vendor/github.com/IBM/sarama/utils.go index fe5f0a52f..feadc0065 100644 --- a/vendor/github.com/IBM/sarama/utils.go +++ b/vendor/github.com/IBM/sarama/utils.go @@ -198,6 +198,7 @@ var ( V3_4_1_0 = newKafkaVersion(3, 4, 1, 0) V3_5_0_0 = newKafkaVersion(3, 5, 0, 0) V3_5_1_0 = newKafkaVersion(3, 5, 1, 0) + V3_6_0_0 = newKafkaVersion(3, 6, 0, 0) SupportedVersions = []KafkaVersion{ V0_8_2_0, @@ -258,9 +259,10 @@ var ( V3_4_1_0, V3_5_0_0, V3_5_1_0, + V3_6_0_0, } MinVersion = V0_8_2_0 - MaxVersion = V3_5_1_0 + MaxVersion = V3_6_0_0 DefaultVersion = V2_1_0_0 // reduced set of protocol versions to matrix test @@ -275,8 +277,8 @@ var ( V2_6_2_0, V2_8_2_0, V3_1_2_0, - V3_2_3_0, V3_3_2_0, + V3_6_0_0, } ) diff --git a/vendor/modules.txt b/vendor/modules.txt index c206c8b6a..ee40ac0f4 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,6 +1,6 @@ # github.com/HdrHistogram/hdrhistogram-go v1.1.2 ## explicit; go 1.14 -# github.com/IBM/sarama v1.41.3 +# github.com/IBM/sarama v1.42.1 ## explicit; go 1.17 github.com/IBM/sarama # github.com/aws/aws-sdk-go-v2 v1.22.1