From c13efdd386b5ad083b06eeda920b5e79f5dace71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Mon, 9 Dec 2024 12:24:17 +0100 Subject: [PATCH 1/3] Sync release leadership on stop --- spec/etcd_spec.cr | 10 +++------- src/lavinmq/etcd.cr | 32 ++++++++++++++++++++++++++++---- src/lavinmq/launcher.cr | 37 +++++++++++++++++-------------------- 3 files changed, 48 insertions(+), 31 deletions(-) diff --git a/spec/etcd_spec.cr b/spec/etcd_spec.cr index 57d5c4f4ca..8d20e89b20 100644 --- a/spec/etcd_spec.cr +++ b/spec/etcd_spec.cr @@ -66,7 +66,7 @@ describe LavinMQ::Etcd do fail "should not lose the leadership to #{new}" when timeout(2.seconds) end - lease.close + lease.release end end @@ -77,9 +77,7 @@ describe LavinMQ::Etcd do key = "foo/#{rand}" lease = etcd.elect(key, "bar", 1) etcds.first(2).each &.terminate(graceful: false) - select - when lease.receive? - when timeout(15.seconds) + lease.wait(15.seconds) do fail "should lose the leadership" end end @@ -92,10 +90,8 @@ describe LavinMQ::Etcd do key = "foo/#{rand}" lease = etcd.elect(key, "bar", 1) etcds.sample.terminate(graceful: false) - select - when lease.receive? + lease.wait(6.seconds) do fail "should not lose the leadership" - when timeout(6.seconds) end end end diff --git a/src/lavinmq/etcd.cr b/src/lavinmq/etcd.cr index c585325342..38963c2cbe 100644 --- a/src/lavinmq/etcd.cr +++ b/src/lavinmq/etcd.cr @@ -78,9 +78,9 @@ module LavinMQ end # Campaign for an election - # Returns a Channel when elected leader, - # when the channel is closed the leadership is lost - def elect(name, value, ttl = 10) : Channel(Nil) + # Returns when elected leader + # Returns a `Leadership` instance + def elect(name, value, ttl = 10) : Leadership channel = Channel(Nil).new lease_id, ttl = lease_grant(ttl) wg = WaitGroup.new(1) @@ -103,7 +103,31 @@ module LavinMQ end election_campaign(name, value, lease_id) wg.wait - channel + Leadership.new(self, lease_id, channel) + end + + # Represents a holding a Leadership + # Can be revoked or wait until lost + class Leadership + def initialize(@etcd : Etcd, @lease_id : Int64, @lost_leadership_channel : Channel(Nil)) + end + + # Force release leadership + def release + @etcd.lease_revoke(@lease_id) + end + + # Wait until looses leadership + def wait(timeout : Time::Span, &) : Nil + loop do + select + when @lost_leadership_channel.receive? + break + when timeout(timeout) + yield + end + end + end end def elect_listen(name, &) diff --git a/src/lavinmq/launcher.cr b/src/lavinmq/launcher.cr index 7b769b8c36..59123a580b 100644 --- a/src/lavinmq/launcher.cr +++ b/src/lavinmq/launcher.cr @@ -6,6 +6,7 @@ require "./server" require "./http/http_server" require "./in_memory_backend" require "./data_dir_lock" +require "./etcd" module LavinMQ class Launcher @@ -13,10 +14,10 @@ module LavinMQ @tls_context : OpenSSL::SSL::Context::Server? @first_shutdown_attempt = true @data_dir_lock : DataDirLock? - @lease : Channel(Nil)? - @running = false + @closed = false + @leadership : Etcd::Leadership? - def initialize(@config : Config, replicator = Clustering::NoopServer.new, @lease = nil) + def initialize(@config : Config, replicator = Clustering::NoopServer.new, @leadership = nil) print_environment_info print_max_map_count fd_limit = System.maximize_fd_limit @@ -38,22 +39,18 @@ module LavinMQ end def run - @running = true listen SystemD.notify_ready - loop do - if lease = @lease - select - when lease.receive? - break unless @running - Log.warn { "Lost leadership lease" } - stop - exit 1 - when timeout(30.seconds) - @data_dir_lock.try &.poll - GC.collect - end - else + if leadership = @leadership + leadership.wait(30.seconds) do + @data_dir_lock.try &.poll + GC.collect + end + Log.warn { "Lost leadership" } + stop + exit 1 + else + loop do sleep 30.seconds @data_dir_lock.try &.poll GC.collect @@ -62,14 +59,14 @@ module LavinMQ end def stop - return unless @running - @running = false + return if @closed + @closed = true Log.warn { "Stopping" } SystemD.notify_stopping @http_server.close rescue nil @amqp_server.close rescue nil @data_dir_lock.try &.release - @lease.try &.close + @leadership.try &.release end private def print_environment_info From 8ef6506aabe1ad18d2089dfd2837a1c042fa4dfe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Mon, 9 Dec 2024 23:44:52 +0100 Subject: [PATCH 2/3] fixup! Sync release leadership on stop --- spec/etcd_spec.cr | 4 ++-- src/lavinmq/etcd.cr | 16 ++++++++-------- src/lavinmq/launcher.cr | 21 +++++++++++---------- 3 files changed, 21 insertions(+), 20 deletions(-) diff --git a/spec/etcd_spec.cr b/spec/etcd_spec.cr index 8d20e89b20..f540bab081 100644 --- a/spec/etcd_spec.cr +++ b/spec/etcd_spec.cr @@ -77,7 +77,7 @@ describe LavinMQ::Etcd do key = "foo/#{rand}" lease = etcd.elect(key, "bar", 1) etcds.first(2).each &.terminate(graceful: false) - lease.wait(15.seconds) do + unless lease.wait(15.seconds) fail "should lose the leadership" end end @@ -90,7 +90,7 @@ describe LavinMQ::Etcd do key = "foo/#{rand}" lease = etcd.elect(key, "bar", 1) etcds.sample.terminate(graceful: false) - lease.wait(6.seconds) do + unless lease.wait(6.seconds) fail "should not lose the leadership" end end diff --git a/src/lavinmq/etcd.cr b/src/lavinmq/etcd.cr index 38963c2cbe..d3bf07b941 100644 --- a/src/lavinmq/etcd.cr +++ b/src/lavinmq/etcd.cr @@ -1,6 +1,7 @@ require "http/client" require "wait_group" require "json" +require "./logger" module LavinMQ class Etcd @@ -118,14 +119,13 @@ module LavinMQ end # Wait until looses leadership - def wait(timeout : Time::Span, &) : Nil - loop do - select - when @lost_leadership_channel.receive? - break - when timeout(timeout) - yield - end + # Returns true when lost leadership, false when timeout occured + def wait(timeout : Time::Span) : Bool + select + when @lost_leadership_channel.receive? + return true + when timeout(timeout) + return false end end end diff --git a/src/lavinmq/launcher.cr b/src/lavinmq/launcher.cr index 59123a580b..898b9a5858 100644 --- a/src/lavinmq/launcher.cr +++ b/src/lavinmq/launcher.cr @@ -41,16 +41,17 @@ module LavinMQ def run listen SystemD.notify_ready - if leadership = @leadership - leadership.wait(30.seconds) do - @data_dir_lock.try &.poll - GC.collect - end - Log.warn { "Lost leadership" } - stop - exit 1 - else - loop do + loop do + if leadership = @leadership + if leadership.wait(30.seconds) + Log.warn { "Lost leadership" } + stop + exit 1 + else + @data_dir_lock.try &.poll + GC.collect + end + else sleep 30.seconds @data_dir_lock.try &.poll GC.collect From 8403419ce128cd6b3e7ce84ceb98375ca239299f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Tue, 10 Dec 2024 11:18:11 +0100 Subject: [PATCH 3/3] fixup! Sync release leadership on stop --- spec/etcd_spec.cr | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/spec/etcd_spec.cr b/spec/etcd_spec.cr index f540bab081..a2e73d9359 100644 --- a/spec/etcd_spec.cr +++ b/spec/etcd_spec.cr @@ -75,11 +75,9 @@ describe LavinMQ::Etcd do cluster.run do |etcds| etcd = LavinMQ::Etcd.new(cluster.endpoints) key = "foo/#{rand}" - lease = etcd.elect(key, "bar", 1) + lease = etcd.elect(key, "bar", ttl: 1) etcds.first(2).each &.terminate(graceful: false) - unless lease.wait(15.seconds) - fail "should lose the leadership" - end + lease.wait(15.seconds).should be_true, "should lose the leadership" end end @@ -88,11 +86,9 @@ describe LavinMQ::Etcd do cluster.run do |etcds| etcd = LavinMQ::Etcd.new(cluster.endpoints) key = "foo/#{rand}" - lease = etcd.elect(key, "bar", 1) + lease = etcd.elect(key, "bar", ttl: 1) etcds.sample.terminate(graceful: false) - unless lease.wait(6.seconds) - fail "should not lose the leadership" - end + lease.wait(6.seconds).should be_false, "should not lose the leadership" end end