From 1a2763f84ca305f37a73b96b7f1926e0bcb1a6ff Mon Sep 17 00:00:00 2001 From: Vinicius Stock Date: Wed, 19 Apr 2023 13:58:29 -0400 Subject: [PATCH] Make client requests and notifications async --- bin/benchmark | 4 +- lib/ruby_lsp/executor.rb | 26 ++++++------- lib/ruby_lsp/listener.rb | 5 +++ lib/ruby_lsp/requests/hover.rb | 7 ++-- lib/ruby_lsp/server.rb | 48 ++++++++++++++++-------- lib/ruby_lsp/utils.rb | 7 +--- test/executor_test.rb | 43 ++++++++++++++++++++- test/extension_test.rb | 5 ++- test/integration_test.rb | 27 ------------- test/requests/hover_expectations_test.rb | 14 +++++-- 10 files changed, 114 insertions(+), 72 deletions(-) diff --git a/bin/benchmark b/bin/benchmark index f1c609ff6..c60831a99 100755 --- a/bin/benchmark +++ b/bin/benchmark @@ -34,6 +34,7 @@ ITERATIONS = 1000 CACHE_FILE_PATH = "/tmp/ruby_lsp_benchmark_results.json" def avg_bench(method, params) + message_queue = Thread::Queue.new results = (0...ITERATIONS).map do # Create a new store every time to prevent caching store = RubyLsp::Store.new @@ -43,12 +44,13 @@ def avg_bench(method, params) GC.disable result = Benchmark.measure do - RubyLsp::Executor.new(store).execute({ + RubyLsp::Executor.new(store, message_queue).execute({ method: method, params: params, }) end.utime GC.enable + message_queue.close result end diff --git a/lib/ruby_lsp/executor.rb b/lib/ruby_lsp/executor.rb index 515e75a34..54b6b8ec8 100644 --- a/lib/ruby_lsp/executor.rb +++ b/lib/ruby_lsp/executor.rb @@ -6,12 +6,12 @@ module RubyLsp class Executor extend T::Sig - sig { params(store: Store).void } - def initialize(store) + sig { params(store: Store, message_queue: Thread::Queue).void } + def initialize(store, message_queue) # Requests that mutate the store must be run sequentially! Parallel requests only receive a temporary copy of the # store @store = store - @messages = T.let([], T::Array[Message]) + @message_queue = message_queue end sig { params(request: T::Hash[Symbol, T.untyped]).returns(Result) } @@ -25,7 +25,7 @@ def execute(request) error = e end - Result.new(response: response, error: error, request_time: request_time, messages: @messages) + Result.new(response: response, error: error, request_time: request_time) end private @@ -43,7 +43,7 @@ def run(request) errored_extensions = Extension.extensions.select(&:error?) if errored_extensions.any? - @messages << Notification.new( + @message_queue << Notification.new( message: "window/showMessage", params: Interface::ShowMessageParams.new( type: Constant::MessageType::WARNING, @@ -63,7 +63,7 @@ def run(request) request.dig(:params, :textDocument, :version), ) when "textDocument/didClose" - @messages << Notification.new( + @message_queue << Notification.new( message: "textDocument/publishDiagnostics", params: Interface::PublishDiagnosticsParams.new(uri: uri, diagnostics: []), ) @@ -91,7 +91,7 @@ def run(request) begin formatting(uri) rescue Requests::Formatting::InvalidFormatter => error - @messages << Notification.new( + @message_queue << Notification.new( message: "window/showMessage", params: Interface::ShowMessageParams.new( type: Constant::MessageType::ERROR, @@ -101,7 +101,7 @@ def run(request) nil rescue StandardError => error - @messages << Notification.new( + @message_queue << Notification.new( message: "window/showMessage", params: Interface::ShowMessageParams.new( type: Constant::MessageType::ERROR, @@ -127,7 +127,7 @@ def run(request) begin diagnostic(uri) rescue StandardError => error - @messages << Notification.new( + @message_queue << Notification.new( message: "window/showMessage", params: Interface::ShowMessageParams.new( type: Constant::MessageType::ERROR, @@ -177,8 +177,8 @@ def hover(uri, position) end # Instantiate all listeners - base_listener = Requests::Hover.new - listeners = Requests::Hover.listeners.map(&:new) + base_listener = Requests::Hover.new(@message_queue) + listeners = Requests::Hover.listeners.map { |l| l.new(@message_queue) } # Emit events for all listeners T.unsafe(EventEmitter).new(base_listener, *listeners).emit_for_target(target) @@ -314,7 +314,7 @@ def code_action_resolve(params) case result when Requests::CodeActionResolve::Error::EmptySelection - @messages << Notification.new( + @message_queue << Notification.new( message: "window/showMessage", params: Interface::ShowMessageParams.new( type: Constant::MessageType::ERROR, @@ -323,7 +323,7 @@ def code_action_resolve(params) ) raise Requests::CodeActionResolve::CodeActionError when Requests::CodeActionResolve::Error::InvalidTargetRange - @messages << Notification.new( + @message_queue << Notification.new( message: "window/showMessage", params: Interface::ShowMessageParams.new( type: Constant::MessageType::ERROR, diff --git a/lib/ruby_lsp/listener.rb b/lib/ruby_lsp/listener.rb index 40400ba4a..2674a8888 100644 --- a/lib/ruby_lsp/listener.rb +++ b/lib/ruby_lsp/listener.rb @@ -14,6 +14,11 @@ class Listener abstract! + sig { params(message_queue: Thread::Queue).void } + def initialize(message_queue) + @message_queue = message_queue + end + class << self extend T::Sig diff --git a/lib/ruby_lsp/requests/hover.rb b/lib/ruby_lsp/requests/hover.rb index 86952059d..29352db83 100644 --- a/lib/ruby_lsp/requests/hover.rb +++ b/lib/ruby_lsp/requests/hover.rb @@ -35,10 +35,11 @@ class Hover < Listener sig { override.returns(ResponseType) } attr_reader :response - sig { void } - def initialize + sig { params(message_queue: Thread::Queue).void } + def initialize(message_queue) @response = T.let(nil, ResponseType) - super() + + super end # Merges responses from other hover listeners diff --git a/lib/ruby_lsp/server.rb b/lib/ruby_lsp/server.rb index 681a5accb..6073f4999 100644 --- a/lib/ruby_lsp/server.rb +++ b/lib/ruby_lsp/server.rb @@ -23,7 +23,32 @@ def initialize @jobs = T.let({}, T::Hash[T.any(String, Integer), Job]) @mutex = T.let(Mutex.new, Mutex) @worker = T.let(new_worker, Thread) - @current_request_id = T.let(1, Integer) + + # The messages queue includes requests and notifications to be sent to the client + @message_queue = T.let(Thread::Queue.new, Thread::Queue) + + # Create a thread to watch the messages queue and send them to the client + @message_dispatcher = T.let( + Thread.new do + current_request_id = 1 + + loop do + message = @message_queue.pop + break if message.nil? + + @mutex.synchronize do + case message + when Notification + @writer.write(method: message.message, params: message.params) + when Request + @writer.write(id: current_request_id, method: message.message, params: message.params) + current_request_id += 1 + end + end + end + end, + Thread, + ) Thread.main.priority = 1 end @@ -38,7 +63,7 @@ def start case request[:method] when "initialize", "initialized", "textDocument/didOpen", "textDocument/didClose", "textDocument/didChange", "textDocument/formatting", "textDocument/onTypeFormatting", "codeAction/resolve" - result = Executor.new(@store).execute(request) + result = Executor.new(@store, @message_queue).execute(request) finalize_request(result, request) when "$/cancelRequest" # Cancel the job if it's still in the queue @@ -46,6 +71,7 @@ def start when "shutdown" warn("Shutting down Ruby LSP...") + @message_queue.close # Close the queue so that we can no longer receive items @job_queue.close # Clear any remaining jobs so that the thread can terminate @@ -53,9 +79,10 @@ def start @jobs.clear # Wait until the thread is finished @worker.join + @message_dispatcher.join @store.clear - finalize_request(Result.new(response: nil, messages: []), request) + finalize_request(Result.new(response: nil), request) when "exit" # We return zero if shutdown has already been received or one otherwise as per the recommendation in the spec # https://microsoft.github.io/language-server-protocol/specification/#exit @@ -89,9 +116,9 @@ def new_worker result = if job.cancelled # We need to return nil to the client even if the request was cancelled - Result.new(response: nil, messages: []) + Result.new(response: nil) else - Executor.new(@store).execute(request) + Executor.new(@store, @message_queue).execute(request) end finalize_request(result, request) @@ -119,17 +146,6 @@ def finalize_request(result, request) @writer.write(id: request[:id], result: response) end - # If the response include any messages, go through them and publish each one - result.messages.each do |n| - case n - when Notification - @writer.write(method: n.message, params: n.params) - when Request - @writer.write(id: @current_request_id, method: n.message, params: n.params) - @current_request_id += 1 - end - end - request_time = result.request_time if request_time @writer.write(method: "telemetry/event", params: telemetry_params(request, request_time, error)) diff --git a/lib/ruby_lsp/utils.rb b/lib/ruby_lsp/utils.rb index 3942823a8..37212fce2 100644 --- a/lib/ruby_lsp/utils.rb +++ b/lib/ruby_lsp/utils.rb @@ -38,9 +38,6 @@ class Result sig { returns(T.untyped) } attr_reader :response - sig { returns(T::Array[Message]) } - attr_reader :messages - sig { returns(T.nilable(Exception)) } attr_reader :error @@ -50,14 +47,12 @@ class Result sig do params( response: T.untyped, - messages: T::Array[Message], error: T.nilable(Exception), request_time: T.nilable(Float), ).void end - def initialize(response:, messages:, error: nil, request_time: nil) + def initialize(response:, error: nil, request_time: nil) @response = response - @messages = messages @error = error @request_time = request_time end diff --git a/test/executor_test.rb b/test/executor_test.rb index d9eacb19f..39297864c 100644 --- a/test/executor_test.rb +++ b/test/executor_test.rb @@ -5,8 +5,13 @@ class ExecutorTest < Minitest::Test def setup - store = RubyLsp::Store.new - @executor = RubyLsp::Executor.new(store) + @store = RubyLsp::Store.new + @message_queue = Thread::Queue.new + @executor = RubyLsp::Executor.new(@store, @message_queue) + end + + def teardown + @message_queue.close end def test_initialize_enabled_features_with_array @@ -102,4 +107,38 @@ def test_initialize_uses_utf_16_if_no_encodings_are_specified # All features are enabled by default assert_includes("utf-16", hash.dig("capabilities", "positionEncoding")) end + + def test_rubocop_errors_push_window_notification + @executor.expects(:formatting).raises(StandardError, "boom").once + + @executor.execute({ + method: "textDocument/formatting", + params: { + textDocument: { uri: "file:///foo.rb" }, + }, + }) + + notification = T.must(@message_queue.pop) + assert_equal("window/showMessage", notification.message) + assert_equal( + "Formatting error: boom", + T.cast(notification.params, RubyLsp::Interface::ShowMessageParams).message, + ) + end + + def test_did_close_clears_diagnostics + @store.set(uri: "file:///foo.rb", source: "", version: 1) + @executor.execute({ + method: "textDocument/didClose", + params: { + textDocument: { uri: "file:///foo.rb" }, + }, + }) + + notification = T.must(@message_queue.pop) + assert_equal("textDocument/publishDiagnostics", notification.message) + assert_empty(T.cast(notification.params, RubyLsp::Interface::PublishDiagnosticsParams).diagnostics) + ensure + @store.delete("file:///foo.rb") + end end diff --git a/test/extension_test.rb b/test/extension_test.rb index 9b1451530..78d294e64 100644 --- a/test/extension_test.rb +++ b/test/extension_test.rb @@ -24,10 +24,13 @@ def teardown end def test_registering_an_extension_invokes_activate_on_initialized - Executor.new(RubyLsp::Store.new).execute({ method: "initialized" }) + message_queue = Thread::Queue.new + Executor.new(RubyLsp::Store.new, message_queue).execute({ method: "initialized" }) extension_instance = T.must(Extension.extensions.find { |ext| ext.is_a?(@extension) }) assert_predicate(extension_instance, :activated) + ensure + T.must(message_queue).close end def test_extensions_are_automatically_tracked diff --git a/test/integration_test.rb b/test/integration_test.rb index 76aeff5c8..8e193757a 100644 --- a/test/integration_test.rb +++ b/test/integration_test.rb @@ -238,9 +238,6 @@ def test_document_did_close assert_telemetry("textDocument/didOpen") assert(send_request("textDocument/didClose", { textDocument: { uri: "file://#{__FILE__}" } })) - - diagnostics = read_response("textDocument/publishDiagnostics") - assert_empty(diagnostics.dig(:params, :diagnostics), "Did not clear diagnostics after closing file") end def test_document_did_change @@ -328,30 +325,6 @@ def test_selection_ranges_with_syntax_error assert_nil(response[:error]) end - def test_incorrect_rubocop_configuration - initialize_lsp([]) - open_file_with("class Foo\nend") - - # Add an invalid cop to the configuration, but save a backup since we're modifying the real file - FileUtils.cp(".rubocop.yml", ".rubocop.yml.bak") - File.write(".rubocop.yml", "\nInvalidCop:\n Enabled: true", mode: "a") - - make_request("textDocument/diagnostic", { textDocument: { uri: "file://#{__FILE__}" } }) - read_response("textDocument/diagnostic") - response = read_response("window/showMessage") - - assert_equal("window/showMessage", response.dig(:method)) - assert_equal(LanguageServer::Protocol::Constant::MessageType::ERROR, response.dig(:params, :type)) - assert_equal( - "Error running diagnostics: unrecognized cop or department InvalidCop found in .rubocop.yml", - response.dig(:params, :message), - ) - ensure - # Restore the original configuration file - FileUtils.rm(".rubocop.yml") - FileUtils.mv(".rubocop.yml.bak", ".rubocop.yml") - end - def test_diagnostics initialize_lsp([]) open_file_with("class Foo\nend") diff --git a/test/requests/hover_expectations_test.rb b/test/requests/hover_expectations_test.rb index 156261c29..0f3f58f19 100644 --- a/test/requests/hover_expectations_test.rb +++ b/test/requests/hover_expectations_test.rb @@ -16,14 +16,17 @@ def assert_expectations(source, expected) end def test_search_index_being_nil + message_queue = Thread::Queue.new store = RubyLsp::Store.new store.set(uri: "file:///fake.rb", source: "belongs_to :foo", version: 1) RubyLsp::Requests::Support::RailsDocumentClient.stubs(search_index: nil) - RubyLsp::Executor.new(store).execute({ + RubyLsp::Executor.new(store, message_queue).execute({ method: "textDocument/hover", params: { textDocument: { uri: "file:///fake.rb" }, position: { line: 0, character: 0 } }, }).response + ensure + T.must(message_queue).close end class FakeHTTPResponse @@ -36,6 +39,7 @@ def initialize(code, body) end def run_expectations(source) + message_queue = Thread::Queue.new js_content = File.read(File.join(TEST_FIXTURES_DIR, "rails_search_index.js")) fake_response = FakeHTTPResponse.new("200", js_content) @@ -45,13 +49,16 @@ def run_expectations(source) store = RubyLsp::Store.new store.set(uri: "file:///fake.rb", source: source, version: 1) - RubyLsp::Executor.new(store).execute({ + RubyLsp::Executor.new(store, message_queue).execute({ method: "textDocument/hover", params: { textDocument: { uri: "file:///fake.rb" }, position: position }, }).response + ensure + T.must(message_queue).close end def test_after_request_hook + message_queue = Thread::Queue.new create_hover_hook_class js_content = File.read(File.join(TEST_FIXTURES_DIR, "rails_search_index.js")) fake_response = FakeHTTPResponse.new("200", js_content) @@ -64,7 +71,7 @@ class Post end RUBY - response = RubyLsp::Executor.new(store).execute({ + response = RubyLsp::Executor.new(store, message_queue).execute({ method: "textDocument/hover", params: { textDocument: { uri: "file:///fake.rb" }, position: { line: 1, character: 2 } }, }).response @@ -73,6 +80,7 @@ class Post assert_match("[Rails Document: `ActiveRecord::Associations::ClassMethods#belongs_to`]", response.contents.value) ensure RubyLsp::Requests::Hover.listeners.clear + T.must(message_queue).close end private