Skip to content

Commit

Permalink
Add thread pool and concurrency_max_threads configuration option
Browse files Browse the repository at this point in the history
This option allows to limit the maximum number of resources that can be
sideloaded concurrently. With a properly configured connection pool,
this ensures that the activerecord's connection pool is not exhausted by
the sideloading process.

The thread pool configuration is based on ActiveRecord's
global_thread_pool_async_query_executor.

This was previously attempted but there were reports of deadlocks. This
code differs from the original by using Concurrency::Delay assigned to a
constant instead of a regular Mutex. The Delay+constant is how
concurrent-ruby sets up their global thread pools so it's more likely to
be correct.

Closes #469

See: https://github.com/rails/rails/blob/94faed4dd4c915829afc2698e055a0265d9b5773/activerecord/lib/active_record.rb#L279-L287
See: #470
  • Loading branch information
MattFenelon committed Mar 27, 2024
1 parent 32a4cff commit d6fce34
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 14 deletions.
27 changes: 13 additions & 14 deletions lib/graphiti/scope.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,19 @@ class Scope
attr_accessor :object, :unpaginated_object
attr_reader :pagination

@@thread_pool_executor_mutex = Mutex.new

def self.thread_pool_executor
return @thread_pool_executor if @thread_pool_executor

GLOBAL_THREAD_POOL_EXECUTOR = Concurrent::Delay.new do
concurrency = Graphiti.config.concurrency_max_threads || 4
@@thread_pool_executor_mutex.synchronize do
@@thread_pool_executor ||= Concurrent::ThreadPoolExecutor.new(
min_threads: 0,
max_threads: concurrency,
max_queue: concurrency * 4,
fallback_policy: :caller_runs
)
end
Concurrent::ThreadPoolExecutor.new(
min_threads: 0,
max_threads: concurrency,
max_queue: concurrency * 4,
fallback_policy: :caller_runs
)
end
private_constant :GLOBAL_THREAD_POOL_EXECUTOR

def self.global_thread_pool_executor
GLOBAL_THREAD_POOL_EXECUTOR.value!
end

def initialize(object, resource, query, opts = {})
Expand Down Expand Up @@ -66,7 +65,7 @@ def resolve_sideloads(results)
@resource.adapter.close if concurrent
}
if concurrent
promises << Concurrent::Promise.execute(executor: self.class.thread_pool_executor, &resolve_sideload)
promises << Concurrent::Promise.execute(executor: self.class.global_thread_pool_executor, &resolve_sideload)
else
resolve_sideload.call
end
Expand Down
14 changes: 14 additions & 0 deletions spec/scope_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@
expect(before_sideload).to receive(:call).with(hash_including(tenant_id: 1))
instance.resolve_sideloads(results)
end

it 'resolves sideloads concurrently with the threadpool' do
allow(sideload).to receive(:resolve).and_return(sideload)
expect(Concurrent::Promise).to receive(:execute).with(executor: an_instance_of(Concurrent::ThreadPoolExecutor)).and_call_original
instance.resolve_sideloads(results)
end
end

context "without concurrency" do
Expand All @@ -135,4 +141,12 @@
end
end
end

describe '.global_thread_pool_executor' do
it 'memoizes the thread pool executor' do
one = described_class.global_thread_pool_executor
two = described_class.global_thread_pool_executor
expect(one).to eq(two)
end
end
end

0 comments on commit d6fce34

Please sign in to comment.