Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync release leadership on stop #871

Merged
merged 3 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 5 additions & 13 deletions spec/etcd_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ describe LavinMQ::Etcd do
fail "should not lose the leadership to #{new}"
when timeout(2.seconds)
end
lease.close
lease.release
end
end

Expand All @@ -75,13 +75,9 @@ describe LavinMQ::Etcd do
cluster.run do |etcds|
etcd = LavinMQ::Etcd.new(cluster.endpoints)
key = "foo/#{rand}"
lease = etcd.elect(key, "bar", 1)
lease = etcd.elect(key, "bar", ttl: 1)
etcds.first(2).each &.terminate(graceful: false)
select
when lease.receive?
when timeout(15.seconds)
fail "should lose the leadership"
end
lease.wait(15.seconds).should be_true, "should lose the leadership"
end
end

Expand All @@ -90,13 +86,9 @@ describe LavinMQ::Etcd do
cluster.run do |etcds|
etcd = LavinMQ::Etcd.new(cluster.endpoints)
key = "foo/#{rand}"
lease = etcd.elect(key, "bar", 1)
lease = etcd.elect(key, "bar", ttl: 1)
etcds.sample.terminate(graceful: false)
select
when lease.receive?
fail "should not lose the leadership"
when timeout(6.seconds)
end
lease.wait(6.seconds).should be_false, "should not lose the leadership"
end
end

Expand Down
32 changes: 28 additions & 4 deletions src/lavinmq/etcd.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require "http/client"
require "wait_group"
require "json"
require "./logger"

module LavinMQ
class Etcd
Expand Down Expand Up @@ -78,9 +79,9 @@ module LavinMQ
end

# Campaign for an election
# Returns a Channel when elected leader,
# when the channel is closed the leadership is lost
def elect(name, value, ttl = 10) : Channel(Nil)
# Returns when elected leader
# Returns a `Leadership` instance
def elect(name, value, ttl = 10) : Leadership
channel = Channel(Nil).new
lease_id, ttl = lease_grant(ttl)
wg = WaitGroup.new(1)
Expand All @@ -103,7 +104,30 @@ module LavinMQ
end
election_campaign(name, value, lease_id)
wg.wait
channel
Leadership.new(self, lease_id, channel)
end

# Represents a holding a Leadership
# Can be revoked or wait until lost
class Leadership
def initialize(@etcd : Etcd, @lease_id : Int64, @lost_leadership_channel : Channel(Nil))
end

# Force release leadership
def release
@etcd.lease_revoke(@lease_id)
end

# Wait until looses leadership
# Returns true when lost leadership, false when timeout occured
def wait(timeout : Time::Span) : Bool
select
when @lost_leadership_channel.receive?
return true
when timeout(timeout)
return false
end
end
end

def elect_listen(name, &)
Expand Down
24 changes: 11 additions & 13 deletions src/lavinmq/launcher.cr
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,18 @@ require "./server"
require "./http/http_server"
require "./in_memory_backend"
require "./data_dir_lock"
require "./etcd"

module LavinMQ
class Launcher
Log = LavinMQ::Log.for "launcher"
@tls_context : OpenSSL::SSL::Context::Server?
@first_shutdown_attempt = true
@data_dir_lock : DataDirLock?
@lease : Channel(Nil)?
@running = false
@closed = false
@leadership : Etcd::Leadership?

def initialize(@config : Config, replicator = Clustering::NoopServer.new, @lease = nil)
def initialize(@config : Config, replicator = Clustering::NoopServer.new, @leadership = nil)
print_environment_info
print_max_map_count
fd_limit = System.maximize_fd_limit
Expand All @@ -38,18 +39,15 @@ module LavinMQ
end

def run
@running = true
listen
SystemD.notify_ready
loop do
if lease = @lease
select
when lease.receive?
break unless @running
Log.warn { "Lost leadership lease" }
if leadership = @leadership
if leadership.wait(30.seconds)
Log.warn { "Lost leadership" }
stop
exit 1
when timeout(30.seconds)
else
@data_dir_lock.try &.poll
GC.collect
end
Expand All @@ -62,14 +60,14 @@ module LavinMQ
end

def stop
return unless @running
@running = false
return if @closed
@closed = true
Log.warn { "Stopping" }
SystemD.notify_stopping
@http_server.close rescue nil
@amqp_server.close rescue nil
@data_dir_lock.try &.release
@lease.try &.close
@leadership.try &.release
end

private def print_environment_info
Expand Down
Loading