Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix: use next instead of return to continue listening for events #867

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions src/lavinmq/clustering/controller.cr
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,14 @@ class LavinMQ::Clustering::Controller

def run
spawn(follow_leader, name: "Follower monitor")
wait_to_be_insync
lease = @etcd.elect("#{@config.clustering_etcd_prefix}/leader", @advertised_uri) # blocks until becoming leader
lease = loop do
wait_to_be_insync
Log.info { "Campaigning for leader..." }
lease2 = @etcd.elect("#{@config.clustering_etcd_prefix}/leader", @advertised_uri) # blocks until becoming leader
break lease2 if in_sync_to_be_leader?
lease2.close
Log.info { "Not in sync to be leader" }
end
replicator = Clustering::Server.new(@config, @etcd)
@launcher = l = Launcher.new(@config, replicator, lease)
l.run
Expand Down Expand Up @@ -45,18 +51,20 @@ class LavinMQ::Clustering::Controller
# Listens for leader change events
private def follow_leader
repli_client = nil
Log.info { "follow_leader Listening for leader changes..." }
@etcd.elect_listen("#{@config.clustering_etcd_prefix}/leader") do |uri|
Log.debug { "follow_leader Leader event: #{uri}" }
next if repli_client.try &.follows?(uri) # if lost connection to etcd we continue follow the leader as is
repli_client.try &.close
if uri == @advertised_uri # if this instance has become leader
Log.debug { "Is leader, don't replicate from self" }
return
Log.debug { "follow_leader Is leader, don't replicate from self" }
next
end
Log.info { "Leader: #{uri}" }
key = "#{@config.clustering_etcd_prefix}/clustering_secret"
secret = @etcd.get(key)
until secret # the leader might not have had time to set the secret yet
Log.debug { "Clustering secret is missing, watching for it" }
Log.debug { "follow_leader Clustering secret is missing, watching for it" }
@etcd.watch(key) do |value|
secret = value
break
Expand All @@ -66,9 +74,12 @@ class LavinMQ::Clustering::Controller
spawn r.follow(uri), name: "Clustering client #{uri}"
SystemD.notify_ready
end
ensure
Log.debug { "follow_leader exiting" }
end

def wait_to_be_insync
Log.debug { "Waiting to be in sync..." }
if isr = @etcd.get("#{@config.clustering_etcd_prefix}/isr")
unless isr.split(",").map(&.to_i(36)).includes?(@id)
Log.info { "ISR: #{isr}" }
Expand All @@ -80,4 +91,10 @@ class LavinMQ::Clustering::Controller
end
end
end

# Reports whether this node may act as leader according to the ISR key in etcd.
#
# Returns true when etcd holds no value under "<prefix>/isr". Otherwise the
# value is read as a comma-separated list of base-36 ids and the result is
# whether @id appears in that list.
def in_sync_to_be_leader?
  isr_key = "#{@config.clustering_etcd_prefix}/isr"
  isr = @etcd.get(isr_key)
  # No ISR value stored: nothing to compare against, so treat as in sync.
  return true unless isr
  Log.debug { "#in_sync_to_be_leader? isr=#{isr} id=#{@id.to_s(36)}" }
  # Parse every entry before the membership test so a malformed entry still raises.
  member_ids = isr.split(",").map(&.to_i(36))
  member_ids.includes?(@id)
end
end
Loading