diff --git a/src/lavinmq/clustering/controller.cr b/src/lavinmq/clustering/controller.cr index 19f81b7c4..222296e14 100644 --- a/src/lavinmq/clustering/controller.cr +++ b/src/lavinmq/clustering/controller.cr @@ -16,8 +16,14 @@ class LavinMQ::Clustering::Controller def run spawn(follow_leader, name: "Follower monitor") - wait_to_be_insync - lease = @etcd.elect("#{@config.clustering_etcd_prefix}/leader", @advertised_uri) # blocks until becoming leader + lease = loop do + wait_to_be_insync + Log.info { "Campaigning for leader..." } + lease2 = @etcd.elect("#{@config.clustering_etcd_prefix}/leader", @advertised_uri) # blocks until becoming leader + break lease2 if in_sync_to_be_leader? + lease2.close + Log.info { "Not in sync to be leader" } + end replicator = Clustering::Server.new(@config, @etcd) @launcher = l = Launcher.new(@config, replicator, lease) l.run @@ -45,18 +51,20 @@ class LavinMQ::Clustering::Controller # Listens for leader change events private def follow_leader repli_client = nil + Log.info { "follow_leader Listening for leader changes..." } @etcd.elect_listen("#{@config.clustering_etcd_prefix}/leader") do |uri| + Log.debug { "follow_leader Leader event: #{uri}" } next if repli_client.try &.follows?(uri) # if lost connection to etcd we continue follow the leader as is repli_client.try &.close if uri == @advertised_uri # if this instance has become leader - Log.debug { "Is leader, don't replicate from self" } - return + Log.debug { "follow_leader Is leader, don't replicate from self" } + next end Log.info { "Leader: #{uri}" } key = "#{@config.clustering_etcd_prefix}/clustering_secret" secret = @etcd.get(key) until secret # the leader might not have had time to set the secret yet - Log.debug { "Clustering secret is missing, watching for it" } + Log.debug { "follow_leader Clustering secret is missing, watching for it" } @etcd.watch(key) do |value| secret = value break @@ -66,9 +74,12 @@ class LavinMQ::Clustering::Controller spawn r.follow(uri), name: "Clustering client #{uri}" SystemD.notify_ready end + ensure + Log.debug { "follow_leader exiting" } end def wait_to_be_insync + Log.debug { "Waiting to be in sync..." } if isr = @etcd.get("#{@config.clustering_etcd_prefix}/isr") unless isr.split(",").map(&.to_i(36)).includes?(@id) Log.info { "ISR: #{isr}" } @@ -80,4 +91,10 @@ class LavinMQ::Clustering::Controller end end end + + def in_sync_to_be_leader? + return true unless isr = @etcd.get("#{@config.clustering_etcd_prefix}/isr") + Log.debug { "#in_sync_to_be_leader? isr=#{isr} id=#{@id.to_s(36)}" } + isr.split(",").map(&.to_i(36)).includes?(@id) + end end