WIP: Async and deferred loaders
tgwizard committed Oct 6, 2024
1 parent d609126 commit 4027212
Showing 3 changed files with 82 additions and 6 deletions.
26 changes: 21 additions & 5 deletions examples/http_loader.rb
@@ -43,27 +43,43 @@ def initialize(host:, size: 4, timeout: 4)
     @host = host
     @size = size
     @timeout = timeout
+    @futures = {}
   end

-  def perform(operations)
+  def perform_on_wait(operations)
     # This fans out and starts off all the concurrent work, which starts and
     # immediately returns `Concurrent::Promises::Future` objects for each operation.
+    operations.each do |operation|
+      future(operation)
+    end
+  end
+
+  def perform(operations)
+    # Defer to let other non-async loaders run to completion first.
+    defer
+
+    # Collect the futures (and possibly trigger any newly added ones)
     futures = operations.map do |operation|
-      Concurrent::Promises.future do
-        pool.with { |connection| operation.call(connection) }
-      end
+      future(operation)
     end
+
     # At this point, all of the concurrent work has been started.

     # This converges back in, waiting on each concurrent future to finish, and fulfilling each
     # (non-concurrent) Promise.rb promise.
     operations.each_with_index.each do |operation, index|
-      fulfill(operation, futures[index].value) # .value is a blocking call
+      fulfill(operation, futures[index].value!) # .value! is a blocking call
     end
   end

   private

+  def future(operation)
+    @futures[operation] ||= Concurrent::Promises.future do
+      pool.with { |connection| operation.call(connection) }
+    end
+  end
+
   def pool
     @pool ||= ConnectionPool.new(size: @size, timeout: @timeout) do
       HTTP.persistent(@host)
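For context, a field resolver would call this example loader roughly as sketched below. The class name HTTPLoader, the product field, and the parse_product helper are assumptions (the excerpt above does not show them); the for(host:).load(->(connection) { ... }) pattern follows the loader interface used in perform, where each key is a lambda that receives a pooled connection.

# Hypothetical resolver method on a GraphQL type class.
def product(id:)
  HTTPLoader
    .for(host: 'https://example.com')
    .load(->(connection) { connection.get("/products/#{id}").flush })
    .then { |response| parse_product(response) } # parse_product is illustrative
end

With perform_on_wait in place, these requests are started as soon as any loader begins waiting, and perform only blocks on futures that are already in flight.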
13 changes: 13 additions & 0 deletions lib/graphql/batch/executor.rb
@@ -53,6 +53,19 @@ def resolve(loader)
       @loading = was_loading
     end

+    def defer(loader)
+      while (non_deferred_loader = @loaders.find { |_, l| !l.deferred })
+        resolve(non_deferred_loader.last)
+      end
+    end
+
+    def on_wait
+      # FIXME: Better name?
+      @loaders.each do |_, loader|
+        loader.on_any_wait
+      end
+    end
+
     def tick
       resolve(@loaders.shift.last)
     end
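A rough sketch of how the new hooks get exercised when batching outside a query. GraphQL::Batch.batch and Promise#sync are existing APIs; RecordLoader and Product are hypothetical stand-ins for an ordinary, non-deferred loader and its model, and HTTPLoader is the async example above.

GraphQL::Batch.batch do
  slow = HTTPLoader.for(host: 'https://example.com')
                   .load(->(connection) { connection.get('/slow').flush })
  fast = RecordLoader.for(Product).load(1)

  # Syncing a promise reaches Loader#wait, which now calls Executor#on_wait,
  # so HTTPLoader#perform_on_wait starts its HTTP request immediately.
  # HTTPLoader#perform then calls defer, which makes the executor resolve
  # RecordLoader (and any other non-deferred loader) before blocking on the
  # already-running futures via value!.
  [slow.sync, fast.sync]
end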
49 changes: 48 additions & 1 deletion lib/graphql/batch/loader.rb
@@ -42,13 +42,14 @@ def current_executor
       end
     end

-    attr_accessor :loader_key, :executor
+    attr_accessor :loader_key, :executor, :deferred

     def initialize
       @loader_key = nil
       @executor = nil
       @queue = nil
       @cache = nil
+      @deferred = false
     end

     def load(key)
@@ -66,6 +67,14 @@ def prime(key, value)
       cache[cache_key(key)] ||= ::Promise.resolve(value).tap { |p| p.source = self }
     end

+    def on_any_wait
+      return if resolved?
+      load_keys = queue # "Peek" the queue, but don't consume it.
+      # TODO: Should we have a "peek queue" / "async queue", that we can consume here, to prevent
+      # duplicate calls to perform_on_wait? (perform_on_wait should be idempotent anyway, but...)
+      perform_on_wait(load_keys)
+    end
+
     def resolve # :nodoc:
       return if resolved?
       load_keys = queue
@@ -88,6 +97,7 @@ def around_perform
     # For Promise#sync
     def wait # :nodoc:
       if executor
+        executor.on_wait
         executor.resolve(self)
       else
         resolve
@@ -126,6 +136,36 @@ def fulfilled?(key)
       promise.pending? && promise.source != self
     end

+    def perform_on_wait(keys)
+      # FIXME: Better name?
+      # Interface to add custom code to e.g. trigger async operations when any loader starts waiting.
+      # Example:
+      #
+      #   def initialize
+      #     super()
+      #     @futures = {}
+      #   end
+      #
+      #   def perform_on_wait(keys)
+      #     keys.each do |key|
+      #       future(key)
+      #     end
+      #   end
+      #
+      #   def perform(keys)
+      #     defer # let other non-async loaders run to completion first.
+      #     keys.each do |key|
+      #       future(key).value!
+      #     end
+      #   end
+      #
+      #   def future(key)
+      #     @futures[key] ||= Concurrent::Promises.future do
+      #       # Perform the async operation
+      #     end
+      #   end
+    end
+
     # Must override to load the keys and call #fulfill for each key
     def perform(keys)
       raise NotImplementedError
@@ -146,6 +186,13 @@ def finish_resolve(key)
       end
     end

+    def defer
+      @deferred = true
+      executor.defer(self)
+    ensure
+      @deferred = false
+    end
+
     def cache
       @cache ||= {}
     end
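Putting the loader-side pieces together, the example from the perform_on_wait comment might be fleshed out into a concrete subclass like the sketch below. The class name, fetch_comments, and the Concurrent::Promises usage are illustrative; only perform_on_wait, perform, defer, and fulfill come from this diff.

require 'graphql/batch'
require 'concurrent'

class AsyncCommentsLoader < GraphQL::Batch::Loader
  def initialize
    super()
    @futures = {}
  end

  # Reached via Executor#on_wait -> Loader#on_any_wait as soon as any loader
  # starts waiting: kick off the async work without blocking.
  def perform_on_wait(post_ids)
    post_ids.each { |post_id| future(post_id) }
  end

  def perform(post_ids)
    defer # let non-deferred loaders resolve first; our futures keep running
    post_ids.each { |post_id| fulfill(post_id, future(post_id).value!) }
  end

  private

  def future(post_id)
    @futures[post_id] ||= Concurrent::Promises.future { fetch_comments(post_id) }
  end

  def fetch_comments(post_id)
    # Placeholder for a real network or database call.
    ["comment for post #{post_id}"]
  end
end

Query code does not change: AsyncCommentsLoader.for.load(post_id) still returns a promise, but the expensive work now starts as soon as any loader begins waiting rather than when this loader's perform runs.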
