Skip to content

Commit

Permalink
Beginning of replication settings spec
Browse files Browse the repository at this point in the history
Co-authored-by: Jon Börjesson [email protected]
  • Loading branch information
kickster97 committed Nov 14, 2023
1 parent cf406aa commit ad09683
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 10 deletions.
61 changes: 61 additions & 0 deletions spec/replication_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,64 @@ describe LavinMQ::Replication::Client do
end
end
end

describe LavinMQ::Replication::Server do
data_dir = "/tmp/lavinmq-follower"

before_each do
FileUtils.rm_rf data_dir
Dir.mkdir_p data_dir
File.write File.join(data_dir, ".replication_secret"), Server.@replicator.@password, 0o400
Server.vhosts["/"].declare_queue("repli", true, false)
LavinMQ::Config.instance.min_followers = 1
end

after_each do
FileUtils.rm_rf data_dir
LavinMQ::Config.instance.min_followers = 0
end

it "+min_followers" do
done = Channel(Nil).new
client : AMQP::Client::Connection? = nil
spawn do
with_channel do |ch, conn|
client = conn
q = ch.queue("repli")
q.publish_confirm "hello world"
done.send nil
end
end
select
when done.receive
fail "should not receive message"
when timeout(0.1.seconds)
pp "timeout"
client.try &.close(no_wait: true)
Server.close
end

# q = Server.vhosts["/"].queues["repli"].as(LavinMQ::Queue)
# q.basic_get(true) { }.should be_false

# repli = LavinMQ::Replication::Client.new(data_dir)
# spawn do
# repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port)
# end
# with_channel do |ch|
# ch.basic_publish "hello world", "", "repli"
# end
# q = Server.vhosts["/"].queues["repli"].as(LavinMQ::Queue)
# q.basic_get(true) { }.should be_true

# repli.close
end

it "-min_followers" do
end

it "+max_lag" do
end
it "-max_lag" do
end
end
2 changes: 1 addition & 1 deletion spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def with_channel(**args, &)
args = {port: LavinMQ::Config.instance.amqp_port, name: name}.merge(args)
conn = AMQP::Client.new(**args).connect
ch = conn.channel
yield ch
yield ch, conn
ensure
conn.try &.close(no_wait: false)
end
Expand Down
4 changes: 2 additions & 2 deletions src/lavinmq/replication/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ module LavinMQ
@data_dir_lock : DataDirLock?
@closed = false

def initialize(@data_dir : String)
def initialize(@data_dir : String, pwd : String? = nil)
System.maximize_fd_limit
@socket = TCPSocket.new
@socket.sync = true
@socket.read_buffering = false
@lz4 = Compress::LZ4::Reader.new(@socket)
@password = password
@password = pwd || password
@files = Hash(String, File).new do |h, k|
path = File.join(@data_dir, k)
Dir.mkdir_p File.dirname(path)
Expand Down
25 changes: 19 additions & 6 deletions src/lavinmq/replication/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ module LavinMQ
@followers = Array(Follower).new
@password : String
@files = Hash(String, MFile?).new
@closing = false

def initialize
@password = password
def initialize(@password = password)
@tcp = TCPServer.new
@tcp.sync = false
@tcp.reuse_address = true
Expand Down Expand Up @@ -101,13 +101,18 @@ module LavinMQ
end

def wait_for_max_lag
until @followers.size >= Config.instance.min_followers
break unless @followers_changed.receive?
# was_closing = @closing
until @closing || @followers.size >= Config.instance.min_followers
@followers_changed.receive
end
# unless (!was_closing && @closing) || @followers.size >= Config.instance.min_followers
# raise Exception.new("Not enough followers")
# end
@followers.each_with_index do |f, i|
break if i > Config.instance.min_followers
f.wait_for_max_lag
end
rescue Channel::ClosedError
end

private def password : String
Expand Down Expand Up @@ -147,12 +152,14 @@ module LavinMQ
follower.full_sync
@followers << follower
end
followers_changed.try_send nil
begin
follower.read_acks
ensure
@lock.synchronize do
@followers.delete(follower)
end
followers_changed.try_send nil
end
rescue ex : AuthenticationError
Log.warn { "Follower negotiation error" }
Expand All @@ -165,14 +172,22 @@ module LavinMQ
end

def close
Log.debug { "closing" }
@tcp.close
@lock.synchronize do
@followers.each &.close
@followers.clear
end
@followers_changed.close
Log.debug { "closed" }
Fiber.yield # required for follower/listener fibers to actually finish
end

def closing
@closing = true
@followers_changed.send nil
end

private def each_follower(& : Follower -> Nil) : Nil
@lock.synchronize do
@followers.each do |f|
Expand Down Expand Up @@ -220,7 +235,6 @@ module LavinMQ

def read_acks(socket = @socket) : Nil
spawn action_loop, name: "Follower#action_loop"
@server.followers_changed.try_send nil
loop do
len = socket.read_bytes(Int64, IO::ByteFormat::LittleEndian)
@acked_bytes += len
Expand Down Expand Up @@ -413,7 +427,6 @@ module LavinMQ
Log.info { "Disconnected" }
@actions.close
@ack.close
@server.followers_changed.try_send nil
@lz4.close
@socket.close
rescue IO::Error
Expand Down
2 changes: 2 additions & 0 deletions src/lavinmq/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,12 @@ module LavinMQ

def close
@closed = true
@replicator.closing
@log.debug { "Closing listeners" }
@listeners.each_key &.close
@log.debug { "Closing vhosts" }
@vhosts.close
@log.debug { "Closing replicator" }
@replicator.close
end

Expand Down
5 changes: 4 additions & 1 deletion src/lavinmq/vhost.cr
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,11 @@ module LavinMQ
sleep 0.1
end
# then force close the remaining (close tcp socket)
@log.debug {"force closing connection"} unless connections.empty?

@connections.each &.force_close
Fiber.yield # yield so that Client read_loops can shutdown
@log.debug { "Closing queues" }
@queues.each_value &.close
Fiber.yield
compact!
Expand Down Expand Up @@ -610,7 +613,7 @@ module LavinMQ
end
io.fsync
File.rename io.path, @definitions_file_path
@replicator.replace_file @definitions_file_path
# @replicator.replace_file @definitions_file_path
@definitions_file.close
@definitions_file = io
end
Expand Down

0 comments on commit ad09683

Please sign in to comment.