Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Not able commit an offset or seek to an offset manually #3

Open
ckshekhar opened this issue Feb 4, 2021 · 3 comments
Open

Not able commit an offset or seek to an offset manually #3

ckshekhar opened this issue Feb 4, 2021 · 3 comments

Comments

@ckshekhar
Copy link

ckshekhar commented Feb 4, 2021

I am trying to commit offset manually using commit-async! function from ketu.clients.consumer namespace but I am getting this error.

Execution error (ConcurrentModificationException) at org.apache.kafka.clients.consumer.KafkaConsumer/acquire (KafkaConsumer.java:2421).
KafkaConsumer is not safe for multi-threaded access

Sample code

(defn consume-topic
  "Consume message from Kafka"
  [src-chan topic]
  (let [source-opts {:name "topic-consumer"
                     :brokers "localhost:9092"
                     :topic topic
                     :group-id "group-1"
                     :key-type :string
                     :auto-offset-reset "earliest"
                     :value-type :string
                     :internal-config {"enable.auto.commit" false}
                     :shape [:map :key :value :topic :offset]}]
    (source/source src-chan source-opts)))


(defn commit-offset
  "Commit offset"
  [consumer topic offset]
  (c/commit-async! consumer
                   {(TopicPartition. topic 0)
                    (OffsetAndMetadata. offset)}
                   (c/commit-callback (fn [x e]
                                        (prn "comitted" x)))))


(def channel (chan 100))

(def src (consume-topic channel "topic"))

(commit-offset (:ketu.source/consumer ksrc)
                          "topic1"
                          5)

I don't know what is wrong here. Any pointer would great. Thanks

@blak3mill3r
Copy link

blak3mill3r commented Feb 4, 2021

Only one thread, whichever thread calls .poll(), can acquire the lock on the KafkaConsumer instance. That includes calling .seek()

We've had some clojure code wrapping the consumer instance for a while (there being no suitable wrapper libraries around at the time); seek is one of the things we had to get right, and this was the first thing that bit me when I wrote that.

I ended up using channels to interact with a loop in a clojure.core.async/thread which wraps access to the consumer instance, and I pass it messages like [:seek "topic" partition offset] for example.

@ckshekhar
Copy link
Author

ckshekhar commented Feb 5, 2021

Thanks, @blak3mill3r

(>!! (:ketu.source/consumer-thread src) [:seek "topic-1" 0 7])

I tried to execute the above line but nothing happens. Do I need to create a wrapper to interact with clojure.core.async/thread or is it supported by ketu?

@yonatane
Copy link
Contributor

yonatane commented Jul 5, 2021

Hi @ckshekhar

  • A consumer control channel where you can put commit commands is in our roadmap, but currently not supported.
  • Also as @blak3mill3r mentioned you can't safely commit while another thread is continuously polling in a loop, which is what ketu source does.

For now unfortunately you'll have to implement the polling loop yourself, where you'll have total control of committing offsets.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants