diff --git a/lib/active_record/connection_adapters/readyset_adapter.rb b/lib/active_record/connection_adapters/readyset_adapter.rb new file mode 100644 index 0000000..6200810 --- /dev/null +++ b/lib/active_record/connection_adapters/readyset_adapter.rb @@ -0,0 +1,52 @@ +require 'active_record' +require 'active_record/connection_adapters/abstract_adapter' +require 'active_record/connection_adapters/postgresql_adapter' +require 'readyset/error' + +module ActiveRecord + module ConnectionAdapters + # The ReadySet adapter is a proxy object that delegates all its methods to an inner + # PostgreSQLAdapter instance. + class ReadysetAdapter + ADAPTER_NAME = 'Readyset'.freeze + + # Finds the root cause of the given error and includes the Readyset::Error module in that + # error's singleton class if the root cause was a `PG::Error`. This allows us to invoke + # `#is_a?` on the error to determine if the error came from a connection to ReadySet. + # + # @param e [Exception] the error whose cause should be annotated + # @return [void] + def self.annotate_error(e) + if e.cause + annotate_error(e.cause) + else + if e.is_a?(::PG::Error) + e.singleton_class.instance_eval do + include ::Readyset::Error + end + end + end + + nil + end + + def self.method_missing(...) + PostgreSQLAdapter.send(...) + rescue => e + annotate_error(e) + raise e + end + + def initialize(pg_conn) + @inner = pg_conn + end + + def method_missing(...) + @inner.send(...) + rescue => e + self.class.annotate_error(e) + raise e + end + end + end +end diff --git a/lib/active_record/readyset_connection_handling.rb b/lib/active_record/readyset_connection_handling.rb new file mode 100644 index 0000000..8d97470 --- /dev/null +++ b/lib/active_record/readyset_connection_handling.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +module ActiveRecord + # The methods in these modules are required for Rails to recognize our custom adapter + module ReadysetConnectionHandling + def readyset_adapter_class + ConnectionAdapters::ReadysetAdapter + end + + def readyset_connection(config) # :nodoc: + pg_conn = postgresql_connection(config) + readyset_adapter_class.new(pg_conn) + rescue => e + readyset_adapter_class.annotate_error(e) + raise e + end + end +end diff --git a/lib/readyset.rb b/lib/readyset.rb index f9f744c..027fb91 100644 --- a/lib/readyset.rb +++ b/lib/readyset.rb @@ -1,8 +1,10 @@ # lib/readyset.rb +require 'active_record/connection_adapters/readyset_adapter' require 'readyset/caches' require 'readyset/configuration' require 'readyset/controller_extension' +require 'readyset/health/healthchecker' require 'readyset/model_extension' require 'readyset/explain' require 'readyset/query' @@ -10,6 +12,7 @@ require 'readyset/query/proxied_query' require 'readyset/railtie' if defined?(Rails::Railtie) require 'readyset/relation_extension' +require 'readyset/utils/window_counter' # The Readyset module provides functionality to integrate ReadySet caching # with Ruby on Rails applications. @@ -116,12 +119,21 @@ def self.raw_query(query) # :nodoc: # @yield a block whose queries should be routed to ReadySet. # @return the value of the last line of the block. def self.route(prevent_writes: true, &block) - if prevent_writes - ActiveRecord::Base.connected_to(role: reading_role, shard: shard, prevent_writes: true, - &block) + if healthchecker.healthy? + begin + if prevent_writes + ActiveRecord::Base.connected_to(role: reading_role, shard: shard, prevent_writes: true, + &block) + else + ActiveRecord::Base.connected_to(role: writing_role, shard: shard, prevent_writes: false, + &block) + end + rescue => e + healthchecker.process_exception(e) + raise e + end else - ActiveRecord::Base.connected_to(role: writing_role, shard: shard, prevent_writes: false, - &block) + yield end end @@ -132,6 +144,14 @@ class << self private(*delegate(:shard, to: :configuration)) end + def self.healthchecker + @healthchecker ||= Readyset::Health::Healthchecker.new( + config.failover, + shard: shard, + ) + end + private_class_method :healthchecker + # Returns the reading role for ActiveRecord connections. # @return [Symbol] the reading role. def self.reading_role diff --git a/lib/readyset/configuration.rb b/lib/readyset/configuration.rb index 038a952..84c74b1 100644 --- a/lib/readyset/configuration.rb +++ b/lib/readyset/configuration.rb @@ -9,5 +9,22 @@ def initialize @migration_path = File.join(Rails.root, 'db/readyset_caches.rb') @shard = :readyset end + + def failover + if @failover + @failover + else + inner = ActiveSupport::OrderedOptions.new + inner.enabled = false + inner.healthcheck_interval = 5.seconds + inner.error_window_period = 1.minute + inner.error_window_size = 10 + @failover = inner + end + end + + def hostname + ActiveRecord::Base.configurations.configs_for(name: shard.to_s).configuration_hash[:host] + end end end diff --git a/lib/readyset/error.rb b/lib/readyset/error.rb new file mode 100644 index 0000000..5dff548 --- /dev/null +++ b/lib/readyset/error.rb @@ -0,0 +1,3 @@ +module Readyset + module Error; end +end diff --git a/lib/readyset/health/healthchecker.rb b/lib/readyset/health/healthchecker.rb new file mode 100644 index 0000000..6d092a2 --- /dev/null +++ b/lib/readyset/health/healthchecker.rb @@ -0,0 +1,127 @@ +require 'net/http' +require 'uri' + +require 'readyset/health/healthchecks' + +module Readyset + module Health + # Processes the given exceptions to determine whether ReadySet is currently unhealthy. If + # ReadySet is indeed unhealthy, a background task is spawned that periodically checks + # ReadySet's health directly until a healthy state has been restored. While ReadySet is in an + # unhealthy state, `Healthchecker#healthy?` will return false. + class Healthchecker + UNHEALTHY_ERRORS = [::PG::UnableToSend, ::PG::ConnectionBad].freeze + + def initialize(config, shard:) + @healthy = Concurrent::AtomicBoolean.new(true) + @healthcheck_interval = config.healthcheck_interval! + @healthchecks = Health::Healthchecks.new(shard: shard) + @lock = Mutex.new + @shard = shard + @window_counter = Readyset::Utils::WindowCounter.new( + window_size: config.error_window_size!, + time_period: config.error_window_period!, + ) + end + + # Returns true only if the connection to ReadySet is healthy. ReadySet's health is gauged by + # keeping track of the number of connection errors that have occurred over a given time + # period. If the number of errors in that time period exceeds the preconfigured threshold, + # ReadySet is considered to be unhealthy. + # + # @return [Boolean] whether ReadySet is healthy + def healthy? + healthy.true? + end + + # Checks if the given exception is a connection error that occurred on a ReadySet connection, + # and if so, logs the error internally. If ReadySet is unhealthy, a background task is + # spawned that periodically tries to connect to ReadySet and check its status. When this task + # determines that ReadySet is healthy again, the task is shut down and the state of the + # healthchecker is switched back to "healthy". + # + # @param [Exception] the exception to be processed + def process_exception(exception) + is_readyset_connection_error = is_readyset_connection_error?(exception) + window_counter.log if is_readyset_connection_error + + # We lock here to ensure that only one thread starts the healthcheck task + lock.lock + if healthy.true? && window_counter.threshold_crossed? + healthy.make_false + lock.unlock + + logger.warn('ReadySet unhealthy: Routing queries to their original destination until ' \ + 'ReadySet becomes healthy again') + + disconnect_readyset_pool! + task.execute + end + ensure + lock.unlock if lock.locked? + end + + private + + attr_reader :healthcheck_interval, :healthchecks, :healthy, :lock, :shard, :window_counter + + def build_task + @task ||= Concurrent::TimerTask.new(execution_interval: healthcheck_interval) do |t| + if healthchecks.healthy? + # We disconnect the ReadySet connection pool here to ensure that any pre-existing + # connections to ReadySet are re-established. This fixes an issue where connections + # return "PQsocket() can't get socket descriptor" errors even after ReadySet comes + # back up. See this stackoverflow post for more details: + # https://stackoverflow.com/q/36582380 + disconnect_readyset_pool! + + # We need to disconnect the pool before making `healthy` true to ensure that, once we + # start routing queries back to ReadySet, they are using fresh connections + lock.synchronize { healthy.make_true } + + logger.info('ReadySet healthy again') + + # We clear out the window counter here to ensure that errors from ReadySet's previous + # unhealthy state don't bias the healthchecker towards determining that ReadySet is + # unhealthy after only a small number of new errors + window_counter.clear + + t.shutdown + end + end + + observer = Object.new.instance_eval do + def update(_time, _result, e) + logger.debug("ReadySet still unhealthy: #{e}") if e + end + end + task.add_observer(observer) + + task + end + + def disconnect_readyset_pool! + ActiveRecord::Base.connected_to(shard: shard) do + ActiveRecord::Base.connection_pool.disconnect! + end + end + + def is_readyset_connection_error?(exception) + if exception.cause + is_readyset_connection_error?(exception.cause) + else + UNHEALTHY_ERRORS.any? { |e| exception.is_a?(e) } && + exception.is_a?(Readyset::Error) + end + end + + def logger + @logger ||= Rails.logger + end + + def task + @task ||= build_task + end + end + end +end diff --git a/lib/readyset/health/healthchecks.rb b/lib/readyset/health/healthchecks.rb new file mode 100644 index 0000000..ac68748 --- /dev/null +++ b/lib/readyset/health/healthchecks.rb @@ -0,0 +1,41 @@ +module Readyset + module Health + # Represents healthchecks that are run against ReadySet to determine whether ReadySet is in a + # state where it can serve queries. + class Healthchecks + def initialize(shard:) + @shard = shard + end + + # Checks if ReadySet is healthy by invoking `SHOW READYSET STATUS` and checking if + # ReadySet is connected to the upstream database. + # + # @return [Boolean] whether ReadySet is healthy + def healthy? + connection.execute('SHOW READYSET STATUS').any? do |row| + row['name'] == 'Database Connection' && row['value'] == 'Connected' + end + rescue + false + end + + private + + attr_reader :shard + + def connection + @connection ||= ActiveRecord::Base.connected_to(shard: shard) do + ActiveRecord::Base.retrieve_connection + end + + # We reconnect with each healthcheck to ensure that connection state is not cached across + # uses + @connection.reconnect! + + @connection + rescue + false + end + end + end +end diff --git a/lib/readyset/railtie.rb b/lib/readyset/railtie.rb index 1949b1d..1653dad 100644 --- a/lib/readyset/railtie.rb +++ b/lib/readyset/railtie.rb @@ -1,5 +1,7 @@ # lib/readyset/railtie.rb +require 'active_record/readyset_connection_handling' + module Readyset class Railtie < Rails::Railtie initializer 'readyset.action_controller' do @@ -11,6 +13,8 @@ class Railtie < Rails::Railtie initializer 'readyset.active_record' do |app| ActiveSupport.on_load(:active_record) do ActiveRecord::Base.prepend(Readyset::ModelExtension) + ActiveRecord::Base.extend(ActiveRecord::ReadysetConnectionHandling) + ActiveRecord::Relation.prepend(Readyset::RelationExtension) end end diff --git a/lib/readyset/utils/window_counter.rb b/lib/readyset/utils/window_counter.rb new file mode 100644 index 0000000..3b6cb57 --- /dev/null +++ b/lib/readyset/utils/window_counter.rb @@ -0,0 +1,58 @@ +module Readyset + module Utils + # Keeps track of events that occur over time to see if the number of logged events exceeds a + # preconfigured threshold in a preconfigured window of time. For example, if `window_size` is + # 10 and `time_period` is 1 minute, the number of events logged in the last minute must exceed + # 10 in order for `WindowCounter#threshold_crossed?` to return true. + class WindowCounter + def initialize(window_size: 10, time_period: 1.minute) + @lock = Mutex.new + @time_period = time_period + @times = [] + @window_size = window_size + end + + delegate :clear, to: :times + + # Logs a new event + def log + lock.synchronize do + remove_times_out_of_threshold! + times << Time.zone.now + end + + nil + end + + # Returns the current number of events logged in the configured `time_period` + # + # @return [Integer] + def size + lock.synchronize do + remove_times_out_of_threshold! + times.size + end + end + + # Returns true only if the number of events logged in the configured `time_period` has + # exceeded the configured `window_size`. + # + # @return [Boolean] + def threshold_crossed? + lock.synchronize do + remove_times_out_of_threshold! + times.size > window_size + end + end + + private + + attr_reader :lock, :time_period, :times, :window_size + + def remove_times_out_of_threshold! + times.select! { |time| time >= time_period.ago } + nil + end + end + end +end diff --git a/readyset.gemspec b/readyset.gemspec index 479326e..7dfc27e 100644 --- a/readyset.gemspec +++ b/readyset.gemspec @@ -35,6 +35,7 @@ Gem::Specification.new do |spec| spec.add_dependency 'activerecord', '>= 6.1' spec.add_dependency 'activesupport', '>= 6.1' spec.add_dependency 'colorize', '~> 1.1' + spec.add_dependency 'concurrent-ruby', '~> 1.2' spec.add_dependency 'progressbar', '~> 1.13' spec.add_dependency 'rake', '~> 13.0' @@ -45,4 +46,5 @@ Gem::Specification.new do |spec| spec.add_development_dependency 'rspec', '~> 3.2' spec.add_development_dependency 'rspec-rails', '~> 6.0' spec.add_development_dependency 'rubocop-airbnb' + spec.add_development_dependency 'timecop', '~> 0.9' end diff --git a/spec/active_record/connection_adapters/readyset_adapter_spec.rb b/spec/active_record/connection_adapters/readyset_adapter_spec.rb new file mode 100644 index 0000000..b57be8d --- /dev/null +++ b/spec/active_record/connection_adapters/readyset_adapter_spec.rb @@ -0,0 +1,150 @@ +require 'spec_helper' + +require 'active_record/connection_adapters/postgresql_adapter' + +RSpec.describe ActiveRecord::ConnectionAdapters::ReadysetAdapter do + describe '.annotate_error' do + context 'when the root cause of the error is a PG::Error' do + it "includes the Readyset::Error module into the root cause's singleton class" do + root_error = PG::Error.new + error = build_error_with_root_cause(root_error) + + ActiveRecord::ConnectionAdapters::ReadysetAdapter.annotate_error(error) + + expect(root_error).to be_a(Readyset::Error) + end + end + + context 'when the root cause of the error is not a PG::Error' do + it "does not include the Readyset::Error module into the root cause's singleton class" do + root_error = StandardError.new + error = build_error_with_root_cause(root_error) + + ActiveRecord::ConnectionAdapters::ReadysetAdapter.annotate_error(error) + + expect(root_error).not_to be_a(Readyset::Error) + end + end + + def build_error_with_root_cause(root_cause) + begin + begin + raise root_cause + rescue + raise NoMethodError + end + rescue + raise ArgumentError + end + rescue => e + e + end + end + + describe '.method_missing' do + it 'delegates class methods to PostgreSQLAdapter' do + config = instance_double(Hash) + allow(ActiveRecord::ConnectionAdapters::PostgreSQLAdapter).to receive(:database_exists?). + with(config).and_return(true) + + result = ActiveRecord::ConnectionAdapters::ReadysetAdapter.database_exists?(config) + + expect(ActiveRecord::ConnectionAdapters::PostgreSQLAdapter). + to have_received(:database_exists?).with(config) + expect(result).to eq(true) + end + + context 'when the method raises an error' do + context 'when the error is a PG::Error' do + it 'annotates the singleton class of the root cause of the error with the ' \ + 'Readyset::Error module' do + config = instance_double(Hash) + allow(ActiveRecord::ConnectionAdapters::PostgreSQLAdapter).to receive(:database_exists?). + with(config).and_raise(PG::ConnectionBad) + + result = begin + ActiveRecord::ConnectionAdapters::ReadysetAdapter.database_exists?(config) + rescue => e + e + end + + expect(result).to be_a(PG::ConnectionBad) + expect(result).to be_a(Readyset::Error) + end + end + + context 'when the error is not a PG::Error' do + it 'does not annotate the singleton class of the root cause of the error with the' \ + 'Readyset::Error module' do + config = instance_double(Hash) + allow(ActiveRecord::ConnectionAdapters::PostgreSQLAdapter).to receive(:database_exists?). + with(config).and_raise(StandardError) + + result = begin + ActiveRecord::ConnectionAdapters::ReadysetAdapter.database_exists?(config) + rescue => e + e + end + + expect(result).to be_a(StandardError) + expect(result).not_to be_a(Readyset::Error) + end + end + end + end + + describe '#method_missing' do + it 'delegates instance methods to an inner PostgreSQLAdapter instance' do + pg_adapter = instance_double(ActiveRecord::ConnectionAdapters::PostgreSQLAdapter) + readyset_adapter = ActiveRecord::ConnectionAdapters::ReadysetAdapter.new(pg_adapter) + query = 'SELECT * FROM t WHERE x = 1' + expected_result = instance_double(PG::Result) + allow(pg_adapter).to receive(:exec_query).with(query).and_return(expected_result) + + result = readyset_adapter.exec_query(query) + + expect(pg_adapter).to have_received(:exec_query).with(query) + expect(result).to eq(expected_result) + end + + context 'when the method raises an error' do + context 'when the error is a PG::Error' do + it 'annotates the singleton class of the root cause of the error with the ' \ + 'Readyset::Error module' do + pg_adapter = instance_double(ActiveRecord::ConnectionAdapters::PostgreSQLAdapter) + readyset_adapter = ActiveRecord::ConnectionAdapters::ReadysetAdapter.new(pg_adapter) + query = 'SELECT * FROM t WHERE x = 1' + allow(pg_adapter).to receive(:exec_query).with(query).and_raise(PG::ConnectionBad) + + result = begin + readyset_adapter.exec_query(query) + rescue => e + e + end + + expect(result).to be_a(PG::ConnectionBad) + expect(result).to be_a(Readyset::Error) + end + end + + context 'when the error is not a PG::Error' do + it 'does not annotate the singleton class of the root cause of the error with the' \ + 'Readyset::Error module' do + pg_adapter = instance_double(ActiveRecord::ConnectionAdapters::PostgreSQLAdapter) + readyset_adapter = ActiveRecord::ConnectionAdapters::ReadysetAdapter.new(pg_adapter) + query = 'SELECT * FROM t WHERE x = 1' + allow(pg_adapter).to receive(:exec_query).with(query).and_raise(StandardError) + + result = begin + readyset_adapter.exec_query(query) + rescue => e + e + end + + expect(result).to be_a(StandardError) + expect(result).not_to be_a(Readyset::Error) + end + end + end + end +end diff --git a/spec/active_record/readyset_connection_handling_spec.rb b/spec/active_record/readyset_connection_handling_spec.rb new file mode 100644 index 0000000..deb99ee --- /dev/null +++ b/spec/active_record/readyset_connection_handling_spec.rb @@ -0,0 +1,66 @@ +RSpec.describe ActiveRecord::ReadysetConnectionHandling do + class ConnectionHandler + include ActiveRecord::ReadysetConnectionHandling + end + + describe '#readyset_adapter_class' do + subject { ConnectionHandler.new.readyset_adapter_class } + + it { should eq ActiveRecord::ConnectionAdapters::ReadysetAdapter } + end + + describe '#readyset_connection' do + context 'when the creation of the underlying Postgres connection raises an error' do + context 'when the error is a PG::Error' do + it 'annotates the singleton class of the root cause of the error with the ' \ + 'Readyset::Error module' do + handler = ConnectionHandler.new + config = instance_double(Hash) + allow(handler).to receive(:postgresql_connection).with(config). + and_raise(PG::ConnectionBad) + + result = begin + handler.readyset_connection(config) + rescue => e + e + end + + expect(result).to be_a(PG::ConnectionBad) + expect(result).to be_a(Readyset::Error) + end + end + + context 'when the error is not a PG::Error' do + it 'does not annotate the singleton class of the root cause of the error with the' \ + 'Readyset::Error module' do + handler = ConnectionHandler.new + config = instance_double(Hash) + allow(handler).to receive(:postgresql_connection).with(config).and_raise(StandardError) + + result = begin + handler.readyset_connection(config) + rescue => e + e + end + + expect(result).to be_a(StandardError) + expect(result).not_to be_a(Readyset::Error) + end + end + end + + context "when the creation of the underlying Postgres connection doesn't raise an error" do + it 'creates a new Postgres connection and returns a Readyset connection that wraps it' do + handler = ConnectionHandler.new + config = instance_double(Hash) + pg_conn = instance_double(ActiveRecord::ConnectionAdapters::PostgreSQLAdapter) + allow(handler).to receive(:postgresql_connection).with(config).and_return(pg_conn) + + result = handler.readyset_connection(config) + + expect(handler).to have_received(:postgresql_connection).with(config) + expect(result).to be_a(ActiveRecord::ConnectionAdapters::ReadysetAdapter) + end + end + end +end diff --git a/spec/configuration_spec.rb b/spec/configuration_spec.rb index e20d512..e13ae2a 100644 --- a/spec/configuration_spec.rb +++ b/spec/configuration_spec.rb @@ -16,5 +16,25 @@ expected = File.join(Rails.root, 'db/readyset_caches.rb') expect(config.migration_path).to eq(expected) end + + it 'initializes failover.enabled with false' do + config = Readyset::Configuration.new + expect(config.failover.enabled).to eq(false) + end + + it 'initializes failover.healthcheck_interval to be 5 seconds' do + config = Readyset::Configuration.new + expect(config.failover.healthcheck_interval).to eq(5.seconds) + end + + it 'initializes failover.error_window_period to be 1 minute' do + config = Readyset::Configuration.new + expect(config.failover.error_window_period).to eq(1.minute) + end + + it 'initializes failover.error_window_size to be 10' do + config = Readyset::Configuration.new + expect(config.failover.error_window_size).to eq(10) + end end end diff --git a/spec/health/healthchecker_spec.rb b/spec/health/healthchecker_spec.rb new file mode 100644 index 0000000..262506e --- /dev/null +++ b/spec/health/healthchecker_spec.rb @@ -0,0 +1,155 @@ +require 'spec_helper' + +RSpec.describe Readyset::Health::Healthchecker do + describe '.new' do + it 'initializes the state of the system to be healthy' do + healthchecker = build_healthchecker + + result = healthchecker.healthy? + + expect(result).to eq(true) + end + end + + describe '#healthy?' do + context 'when the error threshold has not been crossed' do + it 'returns true' do + healthchecker = build_healthchecker + (error_window_size + 1).times do + healthchecker.process_exception(readyset_error) + Timecop.travel(Time.now + 10.seconds) + end + + result = healthchecker.healthy? + + expect(result).to eq(true) + end + end + + context 'when the error threshold has been crossed' do + it 'returns false until the healthchecks are run again and then returns true' do + interval = 0.05.seconds + healthchecker = build_healthchecker(healthcheck_interval: interval) + allow(healthchecker.send(:healthchecks)).to receive(:healthy?).and_return(true) + (error_window_size + 1).times { healthchecker.process_exception(readyset_error) } + + first_result = healthchecker.healthy? + sleep(interval * 2) + second_result = healthchecker.healthy? + + expect(first_result).to eq(false) + expect(second_result).to eq(true) + end + end + end + + describe '#process_exception' do + context 'when the error threshold has not been crossed' do + it 'does not set the state of the healthchecker to be unhealthy' do + healthchecker = setup + + error_window_size.times { healthchecker.process_exception(readyset_error) } + + result = healthchecker.healthy? + expect(result).to eq(true) + end + + it 'does not disconnect the ReadySet connection pool' do + healthchecker = setup + allow(readyset_pool).to receive(:disconnect!) + + error_window_size.times { healthchecker.process_exception(readyset_error) } + + expect(readyset_pool).not_to have_received(:disconnect!) + end + + it 'does not execute the task' do + healthchecker = setup + + error_window_size.times { healthchecker.process_exception(readyset_error) } + + expect(healthchecker.send(:task)).not_to have_received(:execute) + end + end + + context 'when the error threshold has been crossed' do + it 'sets the state of the healthchecker to be unhealthy' do + healthchecker = setup + + (error_window_size + 1).times { healthchecker.process_exception(readyset_error) } + + result = healthchecker.healthy? + expect(result).to eq(false) + end + + it 'disconnects the ReadySet connection pool' do + healthchecker = setup + allow(readyset_pool).to receive(:disconnect!) + + (error_window_size + 1).times { healthchecker.process_exception(readyset_error) } + + expect(readyset_pool).to have_received(:disconnect!) + end + + it 'executes the task' do + healthchecker = setup + + (error_window_size + 1).times { healthchecker.process_exception(readyset_error) } + + expect(healthchecker.send(:task)).to have_received(:execute) + end + + it 'clears the window counter when the healthchecks indicate that ReadySet is healthy ' \ + 'again' do + interval = 0.05.seconds + healthchecker = build_healthchecker(healthcheck_interval: interval) + allow(healthchecker.send(:healthchecks)).to receive(:healthy?).and_return(true) + + (error_window_size + 1).times { healthchecker.process_exception(readyset_error) } + sleep(interval * 2) + + expect(healthchecker.send(:window_counter).size).to eq(0) + end + end + + def readyset_pool + ActiveRecord::Base.connected_to(shard: Readyset.config.shard) do + ActiveRecord::Base.connection_pool + end + end + + def setup + healthchecker = build_healthchecker + allow(healthchecker.send(:task)).to receive(:execute) + + healthchecker + end + end + + private + + def build_healthchecker(healthcheck_interval: 5.seconds) + config = ActiveSupport::OrderedOptions.new + config.healthcheck_interval = healthcheck_interval + config.error_window_period = error_window_period + config.error_window_size = error_window_size + + Readyset::Health::Healthchecker.new(config, shard: :readyset) + end + + def error_window_period + 30.seconds + end + + def error_window_size + 3 + end + + def readyset_error + @readyset_error ||= PG::ConnectionBad.new.tap do |error| + error.singleton_class.instance_eval do + include Readyset::Error + end + end + end +end diff --git a/spec/health/healthchecks_spec.rb b/spec/health/healthchecks_spec.rb new file mode 100644 index 0000000..c00d110 --- /dev/null +++ b/spec/health/healthchecks_spec.rb @@ -0,0 +1,69 @@ +require 'spec_helper' + +RSpec.describe Readyset::Health::Healthchecks do + describe '#healthy?' do + it 'reconnects the connection with every invocation' do + healthchecks = Readyset::Health::Healthchecks.new(shard: :readyset) + connection = stub_connection + allow(connection).to receive(:reconnect!) + allow(connection).to receive(:execute).with('SHOW READYSET STATUS'). + and_return([{ 'name' => 'Database Connection', 'value' => 'Connected' }]) + + healthchecks.healthy? + + expect(connection).to have_received(:reconnect!) + end + + context 'when an error is thrown in the context of the method' do + it 'returns false' do + healthchecks = Readyset::Health::Healthchecks.new(shard: :readyset) + connection = stub_connection + allow(connection).to receive(:reconnect!) + allow(connection).to receive(:execute).with('SHOW READYSET STATUS'). + and_raise(StandardError) + + result = healthchecks.healthy? + + expect(result).to eq(false) + end + end + + context "when the status of ReadySet's database connection is something other than " \ + '"Connected"' do + it 'returns false' do + healthchecks = Readyset::Health::Healthchecks.new(shard: :readyset) + connection = stub_connection + allow(connection).to receive(:reconnect!) + allow(connection).to receive(:execute).with('SHOW READYSET STATUS'). + and_return([{ 'name' => 'Database Connection', 'value' => 'Not Connected' }]) + + result = healthchecks.healthy? + + expect(result).to eq(false) + end + end + + context "when the status of ReadySet's database connection is \"Connected\"" do + it 'returns true' do + healthchecks = Readyset::Health::Healthchecks.new(shard: :readyset) + connection = stub_connection + allow(connection).to receive(:reconnect!) + allow(connection).to receive(:execute).with('SHOW READYSET STATUS'). + and_return([{ 'name' => 'Database Connection', 'value' => 'Connected' }]) + + result = healthchecks.healthy? + + expect(result).to eq(true) + end + end + + def stub_connection + pg_conn = instance_double(ActiveRecord::ConnectionAdapters::PostgreSQLAdapter) + conn = ActiveRecord::ConnectionAdapters::ReadysetAdapter.new(pg_conn) + allow(ActiveRecord::Base).to receive(:connected_to).with(shard: :readyset).and_yield + allow(ActiveRecord::Base).to receive(:retrieve_connection).and_return(conn) + + conn + end + end +end diff --git a/spec/ready_set_spec.rb b/spec/ready_set_spec.rb index c722ead..68b5b32 100644 --- a/spec/ready_set_spec.rb +++ b/spec/ready_set_spec.rb @@ -211,78 +211,118 @@ end describe '.route' do - context 'when prevent_writes is true' do - context 'when the block contains a write query' do - it 'raises an ActiveRecord::ReadOnlyError' do - expect { Readyset.route(prevent_writes: true) { create(:cat) } }. - to raise_error(ActiveRecord::ReadOnlyError) - end + context 'when the healthchecker reports that ReadySet is healthy' do + before do + healthchecker = Readyset.send(:healthchecker) + allow(healthchecker).to receive(:healthy?).and_return(true) end - context 'when the block contains a read query' do - it 'returns the result of the block' do - expected_cat = create(:cat) - - cat = Readyset.route(prevent_writes: true) do - Cat.find(expected_cat.id) + context 'when an exception is raised during the execution of the block' do + it 'passes the exception to the healthchecker' do + error = StandardError.new + allow(Readyset.send(:healthchecker)).to receive(:process_exception).with(error) + begin + Readyset.route do + raise error + end + rescue end - expect(cat).to eq(expected_cat) + expect(Readyset.send(:healthchecker)).to have_received(:process_exception).with(error) + end + + it 're-raises the error' do + expect { Readyset.route { raise StandardError } }.to raise_error(StandardError) + end + end + + context 'when prevent_writes is true' do + context 'when the block contains a write query' do + it 'raises an ActiveRecord::ReadOnlyError' do + expect { Readyset.route(prevent_writes: true) { create(:cat) } }. + to raise_error(ActiveRecord::ReadOnlyError) + end end - it 'executes the query against ReadySet' do - expected_cache = build_and_create_cache(:cached_query) + context 'when the block contains a read query' do + it 'returns the result of the block' do + expected_cat = create(:cat) + + cat = Readyset.route(prevent_writes: true) do + Cat.find(expected_cat.id) + end - result = Readyset.route(prevent_writes: true) do - ActiveRecord::Base.connection.execute('SHOW CACHES').to_a + expect(cat).to eq(expected_cat) end - expect(result.size).to eq(1) - cache = Readyset::Query::CachedQuery. - send(:from_readyset_result, **result.first.symbolize_keys) - expect(cache).to eq(expected_cache) + it 'executes the query against ReadySet' do + expected_cache = build_and_create_cache(:cached_query) + + result = Readyset.route(prevent_writes: true) do + ActiveRecord::Base.connection.execute('SHOW CACHES').to_a + end + + expect(result.size).to eq(1) + cache = Readyset::Query::CachedQuery. + send(:from_readyset_result, **result.first.symbolize_keys) + expect(cache).to eq(expected_cache) + end end end - end - context 'when prevent_writes is false' do - context 'when the block contains a write query' do - it 'returns the result of the block' do - result = Readyset.route(prevent_writes: false) do - create(:cat) - 'test' + context 'when prevent_writes is false' do + context 'when the block contains a write query' do + it 'returns the result of the block' do + result = Readyset.route(prevent_writes: false) do + create(:cat) + 'test' + end + + expect(result).to eq('test') end - expect(result).to eq('test') - end + it 'executes the write against ReadySet' do + proxied_query = build(:proxied_query) - it 'executes the write against ReadySet' do - proxied_query = build(:proxied_query) + Readyset.route(prevent_writes: false) do + sanitized = ActiveRecord::Base. + sanitize_sql_array(['CREATE CACHE FROM %s', proxied_query.text]) + ActiveRecord::Base.connection.execute(sanitized) + end - Readyset.route(prevent_writes: false) do - sanitized = ActiveRecord::Base. - sanitize_sql_array(['CREATE CACHE FROM %s', proxied_query.text]) - ActiveRecord::Base.connection.execute(sanitized) + expected_cache = build(:cached_query) + cache = Readyset::Query::CachedQuery.find(expected_cache.id) + expect(cache).to eq(expected_cache) end + end + + context 'when the block contains a read query' do + it 'executes the read against ReadySet' do + expected_cache = build_and_create_cache(:cached_query) + + results = Readyset.route(prevent_writes: false) do + ActiveRecord::Base.connection.execute('SHOW CACHES').to_a + end - expected_cache = build(:cached_query) - cache = Readyset::Query::CachedQuery.find(expected_cache.id) - expect(cache).to eq(expected_cache) + cache = Readyset::Query::CachedQuery. + send(:from_readyset_result, **results.first.symbolize_keys) + expect(cache).to eq(expected_cache) + end end end + end - context 'when the block contains a read query' do - it 'executes the read against ReadySet' do - expected_cache = build_and_create_cache(:cached_query) + context 'when the healthchecker reports that ReadySet is unhealthy' do + before do + healthchecker = Readyset.send(:healthchecker) + allow(healthchecker).to receive(:healthy?).and_return(false) + end - results = Readyset.route(prevent_writes: false) do - ActiveRecord::Base.connection.execute('SHOW CACHES').to_a - end + it 'routes queries to their original destination' do + Readyset.route(prevent_writes: false) { Cat.where(name: 'whiskers') } - cache = Readyset::Query::CachedQuery. - send(:from_readyset_result, **results.first.symbolize_keys) - expect(cache).to eq(expected_cache) - end + proxied_queries = Readyset::Query::ProxiedQuery.all + expect(proxied_queries).to be_empty end end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index a2a4b61..d08dc98 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -6,6 +6,8 @@ require 'combustion' require 'factory_bot' +require 'timecop' + require_relative 'shared_examples' Combustion.initialize! :action_controller, :active_record, database_reset: false do diff --git a/spec/utils/window_counter_spec.rb b/spec/utils/window_counter_spec.rb new file mode 100644 index 0000000..190540c --- /dev/null +++ b/spec/utils/window_counter_spec.rb @@ -0,0 +1,79 @@ +require 'spec_helper' + +RSpec.describe Readyset::Utils::WindowCounter do + describe '#log' do + it 'removes existing times that are outside of the window' do + window_counter = build_window_counter + window_counter.log + future_time = Time.now + time_period + + Timecop.freeze(future_time) do + window_counter.log + end + + expect(window_counter.send(:times)).to eq([future_time]) + end + + it 'logs a new time to the running window of times' do + window_counter = build_window_counter + time = Time.now + + Timecop.freeze(time) do + window_counter.log + end + + expect(window_counter.send(:times)).to eq([time]) + end + end + + describe '#threshold_crossed?' do + it 'removes existing times that are outside of the window' do + window_counter = build_window_counter + window_counter.log + future_time = Time.now + time_period + + Timecop.freeze(future_time) do + window_counter.threshold_crossed? + end + + expect(window_counter.size).to eq(0) + end + + context 'when the number of times logged in the given time period exceeds the window size' do + it 'returns true' do + window_counter = build_window_counter + (window_size + 1).times { window_counter.log } + + Timecop.freeze do + window_counter.threshold_crossed? + + expect(window_counter.threshold_crossed?).to eq(true) + end + end + end + + context 'when the number of times logged in the given time period does not exceed the ' \ + 'window size' do + it 'returns false' do + window_counter = build_window_counter + window_size.times { window_counter.log } + + window_counter.threshold_crossed? + + expect(window_counter.threshold_crossed?).to eq(false) + end + end + end + + def build_window_counter + Readyset::Utils::WindowCounter.new(window_size: window_size, time_period: time_period) + end + + def time_period + 1.minute + end + + def window_size + 3 + end +end