Skip to content

Commit

Permalink
routing: Add automatic failover (#65)
Browse files Browse the repository at this point in the history
This commit adds an optional automatic failover feature to the gem. If
the number of connection errors to ReadySet exceeds a preconfigured
threshold, ReadySet is considered to be unhealthy, all queries are
routed upstream, and a background task is spawned that periodically
checks ReadySet's health. When ReadySet is determined to be healthy
again, the task is stopped, and queries are routed back to ReadySet.

Closes #45
  • Loading branch information
ethan-readyset authored Jan 25, 2024
1 parent 59a6a1a commit 9841c19
Show file tree
Hide file tree
Showing 18 changed files with 977 additions and 54 deletions.
52 changes: 52 additions & 0 deletions lib/active_record/connection_adapters/readyset_adapter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
require 'active_record'
require 'active_record/connection_adapters/abstract_adapter'
require 'active_record/connection_adapters/postgresql_adapter'
require 'readyset/error'

module ActiveRecord
  module ConnectionAdapters
    # The ReadySet adapter is a proxy object that delegates all its methods to an inner
    # PostgreSQLAdapter instance. Errors raised while delegating are passed through
    # `.annotate_error` before being re-raised, so callers can tell that they originated from a
    # ReadySet connection.
    class ReadysetAdapter
      ADAPTER_NAME = 'Readyset'.freeze

      # Finds the root cause of the given error and includes the Readyset::Error module in that
      # error's singleton class if the root cause was a `PG::Error`. This allows us to invoke
      # `#is_a?` on the error to determine if the error came from a connection to ReadySet.
      #
      # @param e [Exception] the error whose cause should be annotated
      # @return [void]
      def self.annotate_error(e)
        # Walk down the cause chain until we reach the exception that has no cause of its own
        root = e
        root = root.cause while root.cause

        # `extend` adds the marker module to the singleton class of this one exception instance
        root.extend(::Readyset::Error) if root.is_a?(::PG::Error)

        nil
      end

      # Forwards unknown class-level messages to PostgreSQLAdapter, annotating any error raised
      # along the way before re-raising it.
      def self.method_missing(...)
        PostgreSQLAdapter.send(...)
      rescue => e
        annotate_error(e)
        raise
      end

      # Keeps `respond_to?` consistent with the class-level `method_missing` delegation.
      #
      # @param name [Symbol] the method name being queried
      # @param include_private [Boolean] whether private methods should be considered
      # @return [Boolean]
      def self.respond_to_missing?(name, include_private = false)
        PostgreSQLAdapter.respond_to?(name, include_private) || super
      end

      # @param pg_conn [Object] the inner adapter instance that receives every delegated call
      def initialize(pg_conn)
        @inner = pg_conn
      end

      # Forwards unknown messages to the wrapped adapter, annotating any error raised along the
      # way before re-raising it.
      def method_missing(...)
        @inner.send(...)
      rescue => e
        self.class.annotate_error(e)
        raise
      end

      # Keeps `respond_to?` consistent with the instance-level `method_missing` delegation.
      #
      # @param name [Symbol] the method name being queried
      # @param include_private [Boolean] whether private methods should be considered
      # @return [Boolean]
      def respond_to_missing?(name, include_private = false)
        @inner.respond_to?(name, include_private) || super
      end
    end
  end
end
18 changes: 18 additions & 0 deletions lib/active_record/readyset_connection_handling.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# frozen_string_literal: true

module ActiveRecord
  # The methods in this module are required for Rails to recognize our custom adapter
  module ReadysetConnectionHandling
    # @return [Class] the adapter class used to wrap connections to ReadySet
    def readyset_adapter_class
      ConnectionAdapters::ReadysetAdapter
    end

    # Opens a connection via `postgresql_connection` and wraps it in the ReadySet adapter. Any
    # error raised while doing so is handed to the adapter's `annotate_error` and then re-raised.
    def readyset_connection(config) # :nodoc:
      begin
        inner_connection = postgresql_connection(config)
        readyset_adapter_class.new(inner_connection)
      rescue => error
        readyset_adapter_class.annotate_error(error)
        raise
      end
    end
  end
end
30 changes: 25 additions & 5 deletions lib/readyset.rb
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
# lib/readyset.rb

require 'active_record/connection_adapters/readyset_adapter'
require 'readyset/caches'
require 'readyset/configuration'
require 'readyset/controller_extension'
require 'readyset/health/healthchecker'
require 'readyset/model_extension'
require 'readyset/explain'
require 'readyset/query'
require 'readyset/query/cached_query'
require 'readyset/query/proxied_query'
require 'readyset/railtie' if defined?(Rails::Railtie)
require 'readyset/relation_extension'
require 'readyset/utils/window_counter'

# The Readyset module provides functionality to integrate ReadySet caching
# with Ruby on Rails applications.
Expand Down Expand Up @@ -116,12 +119,21 @@ def self.raw_query(query) # :nodoc:
# @yield a block whose queries should be routed to ReadySet.
# @return the value of the last line of the block.
def self.route(prevent_writes: true, &block)
if prevent_writes
ActiveRecord::Base.connected_to(role: reading_role, shard: shard, prevent_writes: true,
&block)
if healthchecker.healthy?
begin
if prevent_writes
ActiveRecord::Base.connected_to(role: reading_role, shard: shard, prevent_writes: true,
&block)
else
ActiveRecord::Base.connected_to(role: writing_role, shard: shard, prevent_writes: false,
&block)
end
rescue => e
healthchecker.process_exception(e)
raise e
end
else
ActiveRecord::Base.connected_to(role: writing_role, shard: shard, prevent_writes: false,
&block)
yield
end
end

Expand All @@ -132,6 +144,14 @@ class << self
private(*delegate(:shard, to: :configuration))
end

# Returns the memoized healthchecker, building it on first use from the failover options and the
# configured shard.
def self.healthchecker
  return @healthchecker if @healthchecker

  @healthchecker = Readyset::Health::Healthchecker.new(config.failover, shard: shard)
end
private_class_method :healthchecker

# Returns the reading role for ActiveRecord connections.
# @return [Symbol] the reading role.
def self.reading_role
Expand Down
17 changes: 17 additions & 0 deletions lib/readyset/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,22 @@ def initialize
@migration_path = File.join(Rails.root, 'db/readyset_caches.rb')
@shard = :readyset
end

# Returns the failover options. On first access they are created with these defaults: failover
# disabled, a 5 second healthcheck interval, and an error window of 10 errors per 1 minute.
# The same options object is returned on every subsequent call.
def failover
  @failover ||= ActiveSupport::OrderedOptions.new.tap do |defaults|
    defaults.enabled = false
    defaults.healthcheck_interval = 5.seconds
    defaults.error_window_period = 1.minute
    defaults.error_window_size = 10
  end
end

# Looks up the database configuration whose name matches the shard and returns its `:host` entry.
def hostname
  configurations = ActiveRecord::Base.configurations
  shard_config = configurations.configs_for(name: shard.to_s)
  shard_config.configuration_hash[:host]
end
end
end
3 changes: 3 additions & 0 deletions lib/readyset/error.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module Readyset
  # Marker module with no methods of its own. It exists so that an exception can be tagged with
  # it and later recognised with `is_a?(Readyset::Error)`.
  module Error
  end
end
127 changes: 127 additions & 0 deletions lib/readyset/health/healthchecker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
require 'net/http'
require 'uri'

require 'readyset/health/healthchecks'

module Readyset
  module Health
    # Processes the given exceptions to determine whether ReadySet is currently unhealthy. If
    # ReadySet is indeed unhealthy, a background task is spawned that periodically checks
    # ReadySet's health directly until a healthy state has been restored. While ReadySet is in an
    # unhealthy state, `Healthchecker#healthy?` will return false.
    class Healthchecker
      UNHEALTHY_ERRORS = [::PG::UnableToSend, ::PG::ConnectionBad].freeze

      # @param config [#healthcheck_interval!, #error_window_size!, #error_window_period!] the
      #   failover options
      # @param shard [Symbol] the shard whose connection pool belongs to ReadySet
      def initialize(config, shard:)
        @healthy = Concurrent::AtomicBoolean.new(true)
        @healthcheck_interval = config.healthcheck_interval!
        @healthchecks = Health::Healthchecks.new(shard: shard)
        @lock = Mutex.new
        @shard = shard
        @window_counter = Readyset::Utils::WindowCounter.new(
          window_size: config.error_window_size!,
          time_period: config.error_window_period!,
        )
      end

      # Returns true only if the connection to ReadySet is healthy. ReadySet's health is gauged by
      # keeping track of the number of connection errors that have occurred over a given time
      # period. If the number of errors in that time period exceeds the preconfigured threshold,
      # ReadySet is considered to be unhealthy.
      #
      # @return [Boolean] whether ReadySet is healthy
      def healthy?
        healthy.true?
      end

      # Checks if the given exception is a connection error that occurred on a ReadySet connection,
      # and if so, logs the error internally. If ReadySet is unhealthy, a background task is
      # spawned that periodically tries to connect to ReadySet and check its status. When this task
      # determines that ReadySet is healthy again, the task is shut down and the state of the
      # healthchecker is switched back to "healthy".
      #
      # @param exception [Exception] the exception to be processed
      def process_exception(exception)
        window_counter.log if is_readyset_connection_error?(exception)

        # Only the thread that performed the healthy -> unhealthy transition continues, which
        # ensures that only one thread starts the healthcheck task
        return unless transition_to_unhealthy

        logger.warn('ReadySet unhealthy: Routing queries to their original destination until ' \
          'ReadySet becomes healthy again')

        disconnect_readyset_pool!
        task.execute
      end

      private

      attr_reader :healthcheck_interval, :healthchecks, :healthy, :lock, :shard, :window_counter

      # Flips the healthchecker to "unhealthy" if it is currently healthy and the error threshold
      # has been crossed. The check and the flip happen under the lock; `synchronize` releases the
      # lock on every exit path, so this thread never tries to unlock a mutex that another thread
      # has acquired in the meantime.
      #
      # @return [Boolean] true if this call performed the transition
      def transition_to_unhealthy
        lock.synchronize do
          next false unless healthy.true? && window_counter.threshold_crossed?

          healthy.make_false
          true
        end
      end

      # Builds the periodic healthcheck task along with an observer that logs any error reported
      # for a run of the task.
      def build_task
        timer = Concurrent::TimerTask.new(execution_interval: healthcheck_interval) do |t|
          if healthchecks.healthy?
            # We disconnect the ReadySet connection pool here to ensure that any pre-existing
            # connections to ReadySet are re-established. This fixes an issue where connections
            # return "PQsocket() can't get socket descriptor" errors even after ReadySet comes
            # back up. See this stackoverflow post for more details:
            # https://stackoverflow.com/q/36582380
            disconnect_readyset_pool!

            # We need to disconnect the pool before making `healthy` true to ensure that, once we
            # start routing queries back to ReadySet, they are using fresh connections
            lock.synchronize { healthy.make_true }

            logger.info('ReadySet healthy again')

            # We clear out the window counter here to ensure that errors from ReadySet's previous
            # unhealthy state don't bias the healthchecker towards determining that ReadySet is
            # unhealthy after only a small number of new errors
            window_counter.clear

            t.shutdown
          end
        end

        # A block observer closes over this healthchecker, so `logger` resolves to our own logger
        timer.add_observer do |_time, _result, error|
          logger.debug("ReadySet still unhealthy: #{error}") if error
        end

        timer
      end

      # Disconnects every connection in the pool of the ReadySet shard.
      def disconnect_readyset_pool!
        ActiveRecord::Base.connected_to(shard: shard) do
          ActiveRecord::Base.connection_pool.disconnect!
        end
      end

      # Returns true if the root cause of the given exception is one of UNHEALTHY_ERRORS and has
      # been tagged with Readyset::Error.
      def is_readyset_connection_error?(exception)
        root = exception
        root = root.cause while root.cause

        UNHEALTHY_ERRORS.any? { |error_class| root.is_a?(error_class) } &&
          root.is_a?(Readyset::Error)
      end

      def logger
        @logger ||= Rails.logger
      end

      # Memoizes the healthcheck task so that the same task is reused on every failover.
      def task
        @task ||= build_task
      end
    end
  end
end
41 changes: 41 additions & 0 deletions lib/readyset/health/healthchecks.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
module Readyset
  module Health
    # Represents healthchecks that are run against ReadySet to determine whether ReadySet is in a
    # state where it can serve queries.
    class Healthchecks
      # @param shard [Symbol] the shard used to obtain the connection to ReadySet
      def initialize(shard:)
        @shard = shard
      end

      # Checks if ReadySet is healthy by invoking `SHOW READYSET STATUS` and checking if
      # ReadySet is connected to the upstream database. Any error raised while connecting or
      # querying is treated as "unhealthy".
      #
      # @return [Boolean] whether ReadySet is healthy
      def healthy?
        connection.execute('SHOW READYSET STATUS').any? do |row|
          row['name'] == 'Database Connection' && row['value'] == 'Connected'
        end
      rescue StandardError
        false
      end

      private

      attr_reader :shard

      # Returns the memoized connection for the shard, reconnected for this use. Errors are
      # deliberately not rescued here: `healthy?` is the single place that turns a failure into
      # `false`, so callers never receive a non-connection value.
      def connection
        @connection ||= ActiveRecord::Base.connected_to(shard: shard) do
          ActiveRecord::Base.retrieve_connection
        end

        # We reconnect with each healthcheck to ensure that connection state is not cached across
        # uses
        @connection.reconnect!

        @connection
      end
    end
  end
end
4 changes: 4 additions & 0 deletions lib/readyset/railtie.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# lib/readyset/railtie.rb

require 'active_record/readyset_connection_handling'

module Readyset
class Railtie < Rails::Railtie
initializer 'readyset.action_controller' do
Expand All @@ -11,6 +13,8 @@ class Railtie < Rails::Railtie
# Hooks the ReadySet extensions into ActiveRecord once the :active_record load hook runs.
initializer 'readyset.active_record' do |app|
ActiveSupport.on_load(:active_record) do
ActiveRecord::Base.prepend(Readyset::ModelExtension)
# Extending Base makes the ReadysetConnectionHandling methods available as class methods
ActiveRecord::Base.extend(ActiveRecord::ReadysetConnectionHandling)

ActiveRecord::Relation.prepend(Readyset::RelationExtension)
end
end
Expand Down
Loading

0 comments on commit 9841c19

Please sign in to comment.