From 32a4cff044ca886328334bd30196a014221d0167 Mon Sep 17 00:00:00 2001 From: Matt Fenelon Date: Wed, 27 Mar 2024 12:31:53 +0000 Subject: [PATCH] Revert "fix: Remove thread pool executor logic until we get a better handle on what's causing thread pool hangs. refs #469" This reverts commit 7941b6f75ce1001b034ed6e83c148b893e9f3d99. --- lib/graphiti/configuration.rb | 15 +++++++++++++++ lib/graphiti/scope.rb | 19 ++++++++++++++++++- spec/configuration_spec.rb | 15 +++++++++++++++ 3 files changed, 48 insertions(+), 1 deletion(-) diff --git a/lib/graphiti/configuration.rb b/lib/graphiti/configuration.rb index be7b4f19..d1a69098 100644 --- a/lib/graphiti/configuration.rb +++ b/lib/graphiti/configuration.rb @@ -8,6 +8,20 @@ class Configuration # Defaults to false OR if classes are cached (Rails-only) attr_accessor :concurrency + # This number must be considered in accordance with the database + # connection pool size configured in `database.yml`. The connection + # pool should be large enough to accommodate both the foreground + # threads (ie. web server or job worker threads) and background + # threads. For each process, Graphiti will create one global + # executor that uses this many threads to sideload resources + # asynchronously. Thus, the pool size should be at least + # `thread_count + concurrency_max_threads + 1`. For example, if your + # web server has a maximum of 3 threads, and + # `concurrency_max_threads` is set to 4, then your pool size should + # be at least 8. + # @return [Integer] Maximum number of threads to use when fetching sideloads concurrently + attr_accessor :concurrency_max_threads + attr_accessor :respond_to attr_accessor :context_for_endpoint attr_accessor :links_on_demand @@ -26,6 +40,7 @@ class Configuration def initialize @raise_on_missing_sideload = true @concurrency = false + @concurrency_max_threads = 4 @respond_to = [:json, :jsonapi, :xml] @links_on_demand = false @pagination_links_on_demand = false diff --git a/lib/graphiti/scope.rb b/lib/graphiti/scope.rb index af1f6ed0..6c7ac272 100644 --- a/lib/graphiti/scope.rb +++ b/lib/graphiti/scope.rb @@ -2,6 +2,23 @@ module Graphiti class Scope attr_accessor :object, :unpaginated_object attr_reader :pagination + + @@thread_pool_executor_mutex = Mutex.new + + def self.thread_pool_executor + return @thread_pool_executor if @thread_pool_executor + + concurrency = Graphiti.config.concurrency_max_threads || 4 + @@thread_pool_executor_mutex.synchronize do + @@thread_pool_executor ||= Concurrent::ThreadPoolExecutor.new( + min_threads: 0, + max_threads: concurrency, + max_queue: concurrency * 4, + fallback_policy: :caller_runs + ) + end + end + def initialize(object, resource, query, opts = {}) @object = object @resource = resource @@ -49,7 +66,7 @@ def resolve_sideloads(results) @resource.adapter.close if concurrent } if concurrent - promises << Concurrent::Promise.execute(&resolve_sideload) + promises << Concurrent::Promise.execute(executor: self.class.thread_pool_executor, &resolve_sideload) else resolve_sideload.call end diff --git a/spec/configuration_spec.rb b/spec/configuration_spec.rb index 22acf1f4..42013fbe 100644 --- a/spec/configuration_spec.rb +++ b/spec/configuration_spec.rb @@ -150,6 +150,21 @@ end end + describe "#concurrency_max_threads" do + include_context "with config", :concurrency_max_threads + + it "defaults" do + expect(Graphiti.config.concurrency_max_threads).to eq(4) + end + + it "is overridable" do + Graphiti.configure do |c| + c.concurrency_max_threads = 1 + end + expect(Graphiti.config.concurrency_max_threads).to eq(1) + end + end + describe "#raise_on_missing_sideload" do include_context "with config", :raise_on_missing_sideload