Skip to content

Commit

Permalink
Merge pull request #640 from Shopify/vs/make_messages_async
Browse files Browse the repository at this point in the history
Make sending client messages async
  • Loading branch information
vinistock authored Apr 21, 2023
2 parents 694ca5c + 1a2763f commit 31361a2
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 72 deletions.
4 changes: 3 additions & 1 deletion bin/benchmark
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ ITERATIONS = 1000
CACHE_FILE_PATH = "/tmp/ruby_lsp_benchmark_results.json"

def avg_bench(method, params)
message_queue = Thread::Queue.new
results = (0...ITERATIONS).map do
# Create a new store every time to prevent caching
store = RubyLsp::Store.new
Expand All @@ -43,12 +44,13 @@ def avg_bench(method, params)

GC.disable
result = Benchmark.measure do
RubyLsp::Executor.new(store).execute({
RubyLsp::Executor.new(store, message_queue).execute({
method: method,
params: params,
})
end.utime
GC.enable
message_queue.close
result
end

Expand Down
26 changes: 13 additions & 13 deletions lib/ruby_lsp/executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ module RubyLsp
class Executor
extend T::Sig

sig { params(store: Store).void }
def initialize(store)
sig { params(store: Store, message_queue: Thread::Queue).void }
def initialize(store, message_queue)
# Requests that mutate the store must be run sequentially! Parallel requests only receive a temporary copy of the
# store
@store = store
@messages = T.let([], T::Array[Message])
@message_queue = message_queue
end

sig { params(request: T::Hash[Symbol, T.untyped]).returns(Result) }
Expand All @@ -25,7 +25,7 @@ def execute(request)
error = e
end

Result.new(response: response, error: error, request_time: request_time, messages: @messages)
Result.new(response: response, error: error, request_time: request_time)
end

private
Expand All @@ -43,7 +43,7 @@ def run(request)
errored_extensions = Extension.extensions.select(&:error?)

if errored_extensions.any?
@messages << Notification.new(
@message_queue << Notification.new(
message: "window/showMessage",
params: Interface::ShowMessageParams.new(
type: Constant::MessageType::WARNING,
Expand All @@ -63,7 +63,7 @@ def run(request)
request.dig(:params, :textDocument, :version),
)
when "textDocument/didClose"
@messages << Notification.new(
@message_queue << Notification.new(
message: "textDocument/publishDiagnostics",
params: Interface::PublishDiagnosticsParams.new(uri: uri, diagnostics: []),
)
Expand Down Expand Up @@ -91,7 +91,7 @@ def run(request)
begin
formatting(uri)
rescue Requests::Formatting::InvalidFormatter => error
@messages << Notification.new(
@message_queue << Notification.new(
message: "window/showMessage",
params: Interface::ShowMessageParams.new(
type: Constant::MessageType::ERROR,
Expand All @@ -101,7 +101,7 @@ def run(request)

nil
rescue StandardError => error
@messages << Notification.new(
@message_queue << Notification.new(
message: "window/showMessage",
params: Interface::ShowMessageParams.new(
type: Constant::MessageType::ERROR,
Expand All @@ -127,7 +127,7 @@ def run(request)
begin
diagnostic(uri)
rescue StandardError => error
@messages << Notification.new(
@message_queue << Notification.new(
message: "window/showMessage",
params: Interface::ShowMessageParams.new(
type: Constant::MessageType::ERROR,
Expand Down Expand Up @@ -177,8 +177,8 @@ def hover(uri, position)
end

# Instantiate all listeners
base_listener = Requests::Hover.new
listeners = Requests::Hover.listeners.map(&:new)
base_listener = Requests::Hover.new(@message_queue)
listeners = Requests::Hover.listeners.map { |l| l.new(@message_queue) }

# Emit events for all listeners
T.unsafe(EventEmitter).new(base_listener, *listeners).emit_for_target(target)
Expand Down Expand Up @@ -314,7 +314,7 @@ def code_action_resolve(params)

case result
when Requests::CodeActionResolve::Error::EmptySelection
@messages << Notification.new(
@message_queue << Notification.new(
message: "window/showMessage",
params: Interface::ShowMessageParams.new(
type: Constant::MessageType::ERROR,
Expand All @@ -323,7 +323,7 @@ def code_action_resolve(params)
)
raise Requests::CodeActionResolve::CodeActionError
when Requests::CodeActionResolve::Error::InvalidTargetRange
@messages << Notification.new(
@message_queue << Notification.new(
message: "window/showMessage",
params: Interface::ShowMessageParams.new(
type: Constant::MessageType::ERROR,
Expand Down
5 changes: 5 additions & 0 deletions lib/ruby_lsp/listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ class Listener

abstract!

sig { params(message_queue: Thread::Queue).void }
def initialize(message_queue)
@message_queue = message_queue
end

class << self
extend T::Sig

Expand Down
7 changes: 4 additions & 3 deletions lib/ruby_lsp/requests/hover.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ class Hover < Listener
sig { override.returns(ResponseType) }
attr_reader :response

sig { void }
def initialize
sig { params(message_queue: Thread::Queue).void }
def initialize(message_queue)
@response = T.let(nil, ResponseType)
super()

super
end

# Merges responses from other hover listeners
Expand Down
48 changes: 32 additions & 16 deletions lib/ruby_lsp/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,32 @@ def initialize
@jobs = T.let({}, T::Hash[T.any(String, Integer), Job])
@mutex = T.let(Mutex.new, Mutex)
@worker = T.let(new_worker, Thread)
@current_request_id = T.let(1, Integer)

# The messages queue includes requests and notifications to be sent to the client
@message_queue = T.let(Thread::Queue.new, Thread::Queue)

# Create a thread to watch the messages queue and send them to the client
@message_dispatcher = T.let(
Thread.new do
current_request_id = 1

loop do
message = @message_queue.pop
break if message.nil?

@mutex.synchronize do
case message
when Notification
@writer.write(method: message.message, params: message.params)
when Request
@writer.write(id: current_request_id, method: message.message, params: message.params)
current_request_id += 1
end
end
end
end,
Thread,
)

Thread.main.priority = 1
end
Expand All @@ -38,24 +63,26 @@ def start
case request[:method]
when "initialize", "initialized", "textDocument/didOpen", "textDocument/didClose", "textDocument/didChange",
"textDocument/formatting", "textDocument/onTypeFormatting", "codeAction/resolve"
result = Executor.new(@store).execute(request)
result = Executor.new(@store, @message_queue).execute(request)
finalize_request(result, request)
when "$/cancelRequest"
# Cancel the job if it's still in the queue
@mutex.synchronize { @jobs[request[:params][:id]]&.cancel }
when "shutdown"
warn("Shutting down Ruby LSP...")

@message_queue.close
# Close the queue so that we can no longer receive items
@job_queue.close
# Clear any remaining jobs so that the thread can terminate
@job_queue.clear
@jobs.clear
# Wait until the thread is finished
@worker.join
@message_dispatcher.join
@store.clear

finalize_request(Result.new(response: nil, messages: []), request)
finalize_request(Result.new(response: nil), request)
when "exit"
# We return zero if shutdown has already been received or one otherwise as per the recommendation in the spec
# https://microsoft.github.io/language-server-protocol/specification/#exit
Expand Down Expand Up @@ -89,9 +116,9 @@ def new_worker

result = if job.cancelled
# We need to return nil to the client even if the request was cancelled
Result.new(response: nil, messages: [])
Result.new(response: nil)
else
Executor.new(@store).execute(request)
Executor.new(@store, @message_queue).execute(request)
end

finalize_request(result, request)
Expand Down Expand Up @@ -119,17 +146,6 @@ def finalize_request(result, request)
@writer.write(id: request[:id], result: response)
end

# If the response include any messages, go through them and publish each one
result.messages.each do |n|
case n
when Notification
@writer.write(method: n.message, params: n.params)
when Request
@writer.write(id: @current_request_id, method: n.message, params: n.params)
@current_request_id += 1
end
end

request_time = result.request_time
if request_time
@writer.write(method: "telemetry/event", params: telemetry_params(request, request_time, error))
Expand Down
7 changes: 1 addition & 6 deletions lib/ruby_lsp/utils.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ class Result
sig { returns(T.untyped) }
attr_reader :response

sig { returns(T::Array[Message]) }
attr_reader :messages

sig { returns(T.nilable(Exception)) }
attr_reader :error

Expand All @@ -50,14 +47,12 @@ class Result
sig do
params(
response: T.untyped,
messages: T::Array[Message],
error: T.nilable(Exception),
request_time: T.nilable(Float),
).void
end
def initialize(response:, messages:, error: nil, request_time: nil)
def initialize(response:, error: nil, request_time: nil)
@response = response
@messages = messages
@error = error
@request_time = request_time
end
Expand Down
43 changes: 41 additions & 2 deletions test/executor_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,13 @@

class ExecutorTest < Minitest::Test
def setup
store = RubyLsp::Store.new
@executor = RubyLsp::Executor.new(store)
@store = RubyLsp::Store.new
@message_queue = Thread::Queue.new
@executor = RubyLsp::Executor.new(@store, @message_queue)
end

def teardown
@message_queue.close
end

def test_initialize_enabled_features_with_array
Expand Down Expand Up @@ -102,4 +107,38 @@ def test_initialize_uses_utf_16_if_no_encodings_are_specified
# All features are enabled by default
assert_includes("utf-16", hash.dig("capabilities", "positionEncoding"))
end

def test_rubocop_errors_push_window_notification
@executor.expects(:formatting).raises(StandardError, "boom").once

@executor.execute({
method: "textDocument/formatting",
params: {
textDocument: { uri: "file:///foo.rb" },
},
})

notification = T.must(@message_queue.pop)
assert_equal("window/showMessage", notification.message)
assert_equal(
"Formatting error: boom",
T.cast(notification.params, RubyLsp::Interface::ShowMessageParams).message,
)
end

def test_did_close_clears_diagnostics
@store.set(uri: "file:///foo.rb", source: "", version: 1)
@executor.execute({
method: "textDocument/didClose",
params: {
textDocument: { uri: "file:///foo.rb" },
},
})

notification = T.must(@message_queue.pop)
assert_equal("textDocument/publishDiagnostics", notification.message)
assert_empty(T.cast(notification.params, RubyLsp::Interface::PublishDiagnosticsParams).diagnostics)
ensure
@store.delete("file:///foo.rb")
end
end
5 changes: 4 additions & 1 deletion test/extension_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@ def teardown
end

def test_registering_an_extension_invokes_activate_on_initialized
Executor.new(RubyLsp::Store.new).execute({ method: "initialized" })
message_queue = Thread::Queue.new
Executor.new(RubyLsp::Store.new, message_queue).execute({ method: "initialized" })

extension_instance = T.must(Extension.extensions.find { |ext| ext.is_a?(@extension) })
assert_predicate(extension_instance, :activated)
ensure
T.must(message_queue).close
end

def test_extensions_are_automatically_tracked
Expand Down
27 changes: 0 additions & 27 deletions test/integration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,6 @@ def test_document_did_close
assert_telemetry("textDocument/didOpen")

assert(send_request("textDocument/didClose", { textDocument: { uri: "file://#{__FILE__}" } }))

diagnostics = read_response("textDocument/publishDiagnostics")
assert_empty(diagnostics.dig(:params, :diagnostics), "Did not clear diagnostics after closing file")
end

def test_document_did_change
Expand Down Expand Up @@ -328,30 +325,6 @@ def test_selection_ranges_with_syntax_error
assert_nil(response[:error])
end

def test_incorrect_rubocop_configuration
initialize_lsp([])
open_file_with("class Foo\nend")

# Add an invalid cop to the configuration, but save a backup since we're modifying the real file
FileUtils.cp(".rubocop.yml", ".rubocop.yml.bak")
File.write(".rubocop.yml", "\nInvalidCop:\n Enabled: true", mode: "a")

make_request("textDocument/diagnostic", { textDocument: { uri: "file://#{__FILE__}" } })
read_response("textDocument/diagnostic")
response = read_response("window/showMessage")

assert_equal("window/showMessage", response.dig(:method))
assert_equal(LanguageServer::Protocol::Constant::MessageType::ERROR, response.dig(:params, :type))
assert_equal(
"Error running diagnostics: unrecognized cop or department InvalidCop found in .rubocop.yml",
response.dig(:params, :message),
)
ensure
# Restore the original configuration file
FileUtils.rm(".rubocop.yml")
FileUtils.mv(".rubocop.yml.bak", ".rubocop.yml")
end

def test_diagnostics
initialize_lsp([])
open_file_with("class Foo\nend")
Expand Down
Loading

0 comments on commit 31361a2

Please sign in to comment.