(fixed-thread-executor 200) only has 8 threads? #137

Closed
lkrubner opened this issue Jul 12, 2017 · 4 comments

@lkrubner

I was given a simple assignment, which I figured I could knock out in a few hours, but I made some mistakes, and some of the mistakes were interesting.

I have some code that pulls a million records from MySQL and then fires them at an API to get some additional information. I have this code:

(def executor (me/fixed-thread-executor 200))
    
(defn enqueue
  [message]
  (slingshot/try+
   (->> (ms/->source [message])
        (ms/onto executor)
        (ms/map api/query))
   (catch Object o
     (println " message queue is not happy about the message we were given" o))))

In api/query I do an HTTP POST and catch Exceptions; when one is caught, the thread sleeps and then tries again.

I stupidly ran this app before I had started the app for the API. So I got:

java.net.SocketTimeoutException when we tried to make the post-to-api. We will try again after an hour.
java.net.SocketTimeoutException when we tried to make the post-to-api. We will try again after an hour.
java.net.SocketTimeoutException when we tried to make the post-to-api. We will try again after an hour.
java.net.SocketTimeoutException when we tried to make the post-to-api. We will try again after an hour.
java.net.SocketTimeoutException when we tried to make the post-to-api. We will try again after an hour.
java.net.SocketTimeoutException when we tried to make the post-to-api. We will try again after an hour.
java.net.SocketTimeoutException when we tried to make the post-to-api. We will try again after an hour.
java.net.SocketTimeoutException when we tried to make the post-to-api. We will try again after an hour.

Just these 8 lines. But I thought I had 200 threads:

      (def executor (me/fixed-thread-executor 200))

It makes sense that I got a SocketTimeout, because I had not started the API app. But I should have gotten 200 of the same error, yes? Why would I only get 8 errors?

@dm3
Contributor

dm3 commented Jul 15, 2017

@lkrubner The idea behind streams is that they consume elements one by one. The executor merely specifies where you want your one-by-one processing to happen. If you want to parallelize the processing, consider using something like fork in #131.
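To illustrate the one-by-one behavior described above, here is a minimal sketch that counts how many calls are running at once. `slow-query` is a hypothetical stand-in for `api/query`, and `run-sequentially`, `in-flight`, and `max-in-flight` are names made up for this example. Assuming the stream is consumed as shown, the observed concurrency should stay at 1 however many threads the executor has.

```clojure
(require '[manifold.executor :as me]
         '[manifold.stream :as ms])

(def executor (me/fixed-thread-executor 200))

(def in-flight (atom 0))     ; calls currently running
(def max-in-flight (atom 0)) ; highest concurrency observed so far

;; Hypothetical stand-in for api/query: records concurrency, then "works" for 50 ms.
(defn slow-query
  [message]
  (let [n (swap! in-flight inc)]
    (swap! max-in-flight max n)
    (Thread/sleep 50)
    (swap! in-flight dec)
    message))

;; Same shape as the pipeline in the question: the executor decides where the
;; mapping function runs, but each message is still handled after the previous one.
(defn run-sequentially
  [messages]
  (->> (ms/->source messages)
       (ms/onto executor)
       (ms/map slow-query)
       (ms/stream->seq)
       (doall)))
```

After calling `(run-sequentially (range 20))`, inspecting `@max-in-flight` shows how much parallelism the pipeline actually achieved.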

@ztellman
Collaborator

You could also try simply mapping over the stream using (d/future-with executor ...), which will enqueue all the work onto the executor, and the resulting stream of deferred values can be realized with realize-each, or something similar.
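A rough sketch of that suggestion, under some assumptions: `query` is a hypothetical stand-in for `api/query`, and `query-all` is a name invented here. The work is submitted eagerly with `mapv` so that every task reaches the executor before anything is realized; for a very large input you would probably want to submit in batches instead.

```clojure
(require '[manifold.deferred :as d]
         '[manifold.executor :as me]
         '[manifold.stream :as ms])

(def executor (me/fixed-thread-executor 200))

;; Hypothetical stand-in for api/query.
(defn query
  [message]
  {:message message :status :ok})

(defn query-all
  [messages]
  (->> messages
       ;; mapv is eager, so every task is handed to the executor right away
       ;; and each call returns a deferred immediately.
       (mapv (fn [message] (d/future-with executor (query message))))
       (ms/->source)
       ;; Turn the stream of deferreds into a stream of realized values,
       ;; in the original order.
       (ms/realize-each)
       (ms/stream->seq)))
```

Calling `(query-all messages)` returns a lazy sequence of results; consuming it blocks until each deferred is realized.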

@ztellman
Collaborator

And @dm3, I apologize for not responding to #131 earlier, I'll take a closer look.

@lkrubner
Author

Thank you. (d/future-with) is a great idea.
