diff --git a/lib/graphiti/scope.rb b/lib/graphiti/scope.rb index 6c7ac272..d29e1a8f 100644 --- a/lib/graphiti/scope.rb +++ b/lib/graphiti/scope.rb @@ -3,20 +3,19 @@ class Scope attr_accessor :object, :unpaginated_object attr_reader :pagination - @@thread_pool_executor_mutex = Mutex.new - - def self.thread_pool_executor - return @thread_pool_executor if @thread_pool_executor - + GLOBAL_THREAD_POOL_EXECUTOR = Concurrent::Delay.new do concurrency = Graphiti.config.concurrency_max_threads || 4 - @@thread_pool_executor_mutex.synchronize do - @@thread_pool_executor ||= Concurrent::ThreadPoolExecutor.new( - min_threads: 0, - max_threads: concurrency, - max_queue: concurrency * 4, - fallback_policy: :caller_runs - ) - end + Concurrent::ThreadPoolExecutor.new( + min_threads: 0, + max_threads: concurrency, + max_queue: concurrency * 4, + fallback_policy: :caller_runs + ) + end + private_constant :GLOBAL_THREAD_POOL_EXECUTOR + + def self.global_thread_pool_executor + GLOBAL_THREAD_POOL_EXECUTOR.value! end def initialize(object, resource, query, opts = {}) @@ -66,7 +65,7 @@ def resolve_sideloads(results) @resource.adapter.close if concurrent } if concurrent - promises << Concurrent::Promise.execute(executor: self.class.thread_pool_executor, &resolve_sideload) + promises << Concurrent::Promise.execute(executor: self.class.global_thread_pool_executor, &resolve_sideload) else resolve_sideload.call end diff --git a/spec/scope_spec.rb b/spec/scope_spec.rb index 2ae9dd75..a39f927d 100644 --- a/spec/scope_spec.rb +++ b/spec/scope_spec.rb @@ -112,6 +112,12 @@ expect(before_sideload).to receive(:call).with(hash_including(tenant_id: 1)) instance.resolve_sideloads(results) end + + it 'resolves sideloads concurrently with the threadpool' do + allow(sideload).to receive(:resolve).and_return(sideload) + expect(Concurrent::Promise).to receive(:execute).with(executor: an_instance_of(Concurrent::ThreadPoolExecutor)).and_call_original + instance.resolve_sideloads(results) + end end context "without concurrency" do @@ -135,4 +141,12 @@ end end end + + describe '.global_thread_pool_executor' do + it 'memoizes the thread pool executor' do + one = described_class.global_thread_pool_executor + two = described_class.global_thread_pool_executor + expect(one).to eq(two) + end + end end