RabbitMQ supports pluggable backing queues through modules that
implement the rabbit_backing_queue behaviour.
The backing queue init/3 callback expects an async_callback()
parameter: a fun that takes a module name and a second fun, where the
second fun takes the backing queue state and returns a new state. Keep
reading to understand what all this callback mumbo-jumbo means.
TL;DR: due to the two problems explained below, this callback takes care of executing certain functions in the context of a particular Erlang process.
Understanding how this callback works is vital, since the persistence
layer of the backing queue makes heavy use of the process dictionary
and of self() to track who opened which file handle. This means that
even though the backing queue behaviour callbacks seem to be
referentially transparent, they are not. Behind the scenes, some of
them put values into and get values from the process dictionary. If
one of those callbacks is executed in a different process context, the
values won't be found in the process dictionary, and everything that
depends on them breaks.
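To make the process dictionary point concrete, here is a minimal sketch in plain Erlang. The module name pdict_demo, the key demo_key and the helper lookup/0 are hypothetical and exist only for this illustration; none of them are part of RabbitMQ.

```erlang
-module(pdict_demo).
-export([run/0]).

%% Hypothetical helper: it takes no arguments, yet its result depends
%% on the process dictionary of whichever process calls it.
lookup() ->
    get(demo_key).

run() ->
    Parent = self(),
    put(demo_key, stored_by_parent),
    %% Called from the process that stored the value.
    InParent = lookup(),
    %% Called from a freshly spawned process with an empty dictionary.
    Ref = make_ref(),
    spawn(fun () -> Parent ! {Ref, lookup()} end),
    InChild = receive
                  {Ref, Value} -> Value
              after 1000 -> timeout
              end,
    erase(demo_key),
    {InParent, InChild}.
```

Calling pdict_demo:run() should return {stored_by_parent, undefined}: the same function gives two different answers depending on the calling process.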
The same applies to the file_handle_cache, which tracks who owns which
file handle by calling self() inside its function implementations
instead of, for example, expecting a Pid as a parameter. The call to
self() again violates referential transparency: the function's
behaviour now depends on the process context in which it is called.
This means that a file handle must be closed from the same process
that opened it.
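The ownership problem can be sketched in the same way. The open/1 and close/1 functions below are hypothetical stand-ins, not the file_handle_cache API; they only imitate the idea of deriving the owner from self() rather than from an explicit Pid argument.

```erlang
-module(owner_demo).
-export([run/0]).

%% Hypothetical open/close pair, NOT the file_handle_cache API.
%% open/1 records the caller via self() instead of taking a Pid.
open(Name) ->
    {handle, Name, self()}.

%% close/1 also consults self(), so its result depends on the caller.
close({handle, _Name, Owner}) when Owner =:= self() -> ok;
close({handle, _Name, _Owner}) -> {error, not_owner}.

run() ->
    Parent = self(),
    Handle = open("demo.dat"),
    Tag = make_ref(),
    %% Try to close the handle from a different process.
    spawn(fun () -> Parent ! {Tag, close(Handle)} end),
    FromOther = receive
                    {Tag, Result} -> Result
                after 1000 -> timeout
                end,
    %% Close it from the process that opened it.
    FromOwner = close(Handle),
    {FromOther, FromOwner}.
```

Under these assumptions owner_demo:run() returns {{error, not_owner}, ok}: only the process that opened the handle can close it.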
The function rabbit_amqqueue_process:bq_init/3 takes care of
initializing the backing queue implementation, whether it is
rabbit_variable_queue, rabbit_mirror_queue_master,
rabbit_priority_queue or your own backing queue behaviour
implementation.
The async callback passed into BQ:init is defined as:

    fun (Mod, Fun) ->
        rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
    end
This fun takes a module argument, usually an atom naming the backing
queue module in use, for example rabbit_variable_queue or
rabbit_mirror_queue_master. The second argument is a fun that is
passed along to rabbit_amqqueue:run_backing_queue/3. Now let's see what
rabbit_amqqueue:run_backing_queue does.
The function body looks like this:

    run_backing_queue(QPid, Mod, Fun) ->
        gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).
It sends a {run_backing_queue, Mod, Fun} message to whatever process
was provided as QPid. This is important, since that process' context
is the one whose process dictionary gets modified indirectly, and the
one that owns the file handles opened by, for example, the msg_store.
Back in rabbit_amqqueue_process, we see that this module has a
callback for the message mentioned above:

    handle_cast({run_backing_queue, Mod, Fun},
                State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
        noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)});
This function takes care of extracting the current backing_queue
module and backing_queue_state from its own process state, and then
calling BQ:invoke(Mod, Fun, BQS).
This is what BQ:invoke/3 does:

    invoke(?MODULE, Fun, State) -> Fun(?MODULE, State);
    invoke(      _,   _, State) -> State.
The implementation of invoke is pretty simple. If the Mod argument
provided to it matches the current module, in this example
rabbit_variable_queue, then Fun is executed with
rabbit_variable_queue as the first argument and the backing queue
State as the second. Otherwise the state is returned unchanged. To
reiterate, what's important to understand is that Fun is executed in
the context of whatever process QPid referred to above. In the case we
are analyzing so far, this is the rabbit_amqqueue_process pid.
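The cast-then-invoke round trip described above can be sketched with a plain process instead of gen_server2. Everything here (the module run_in_owner_demo, the loop/2 function, the demo_queue atom, the get_state and stop messages) is an assumption made for illustration; only the {run_backing_queue, Mod, Fun} message shape and the two-clause invoke logic come from the code quoted above.

```erlang
-module(run_in_owner_demo).
-export([run/0]).

%% Minimal stand-in for a queue process: it keeps a state and applies
%% any fun it is sent, so the fun runs in *this* process' context.
loop(OwnMod, State) ->
    receive
        {run_backing_queue, Mod, Fun} ->
            loop(OwnMod, invoke(OwnMod, Mod, Fun, State));
        {get_state, From, Tag} ->
            From ! {Tag, State},
            loop(OwnMod, State);
        stop ->
            ok
    end.

%% Same idea as invoke/3 above, with the owning module passed
%% explicitly: run Fun only when the module matches.
invoke(OwnMod, OwnMod, Fun, State) -> Fun(OwnMod, State);
invoke(_OwnMod, _Mod, _Fun, State) -> State.

run() ->
    QPid = spawn(fun () -> loop(demo_queue, []) end),
    %% Stand-in for the async callback built in bq_init/3.
    Callback = fun (Mod, Fun) ->
                       QPid ! {run_backing_queue, Mod, Fun},
                       ok
               end,
    %% This fun records which process actually executed it.
    Record = fun (_Mod, Pids) -> [self() | Pids] end,
    ok = Callback(demo_queue, Record),
    ok = Callback(other_module, Record),  % ignored: module mismatch
    Tag = make_ref(),
    QPid ! {get_state, self(), Tag},
    Final = receive
                {Tag, S} -> S
            after 1000 -> timeout
            end,
    QPid ! stop,
    Final =:= [QPid].
```

run_in_owner_demo:run() should return true: the fun was created in the caller, but the only pid it recorded is QPid, and the call tagged with a non-matching module left the state untouched.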
Now let's try to find out what Fun actually is. To get to this we
need to see how rabbit_variable_queue is initialized.
rabbit_variable_queue:init/6 will call into msg_store_client_init/3,
passing our initial callback as the third parameter
(msg_store_client_init/3 then expands into msg_store_client_init/4).
Let's refresh what that callback was:

    fun (Mod, Fun) ->
        rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
    end
That callback will now be wrapped in yet another fun, like this:

    fun () -> Callback(?MODULE, CloseFDsFun) end
To see that in context:

    msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
        CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
        rabbit_msg_store:client_init(MsgStore, Ref, MsgOnDiskFun,
                                     fun () -> Callback(?MODULE, CloseFDsFun) end).
So now we have a clue of what the Fun passed into our callback might
be. It is whatever msg_store_close_fds_fun returned as CloseFDsFun.
Let's check:

    msg_store_close_fds_fun(IsPersistent) ->
        fun (?MODULE, State = #vqstate { msg_store_clients = MSCState }) ->
                {ok, MSCState1} = msg_store_close_fds(MSCState, IsPersistent),
                State #vqstate { msg_store_clients = MSCState1 }
        end.
We get a fun that will only be executed if the Mod argument matches,
in this case rabbit_variable_queue. That fun takes our
rabbit_variable_queue state as its second argument.
In msg_store_client_init/4 above we said that our initial callback
gets wrapped like this:

    fun () -> Callback(?MODULE, CloseFDsFun) end
This means that, at various places inside the msg_store, that closure
gets called without arguments, which in turn calls our callback with
CloseFDsFun. After some expansions we end up with something like this:

    fun (rabbit_variable_queue, Fun) ->
        rabbit_amqqueue:run_backing_queue(QPid, rabbit_variable_queue,
            fun (?MODULE, State = #vqstate { msg_store_clients = MSCState }) ->
                {ok, MSCState1} = msg_store_close_fds(MSCState, IsPersistent),
                State #vqstate { msg_store_clients = MSCState1 }
            end)
    end
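The wrapping step can also be tried in isolation. In the sketch below the zero-argument closure plays the role of the fun handed to the message store, while Callback and CloseFDsFun are simplified stand-ins: the demo_queue atom, the open_fds map key and the module name wrap_demo are all hypothetical, and the calling process doubles as the queue process to keep the example short.

```erlang
-module(wrap_demo).
-export([run/0]).

run() ->
    Self = self(),
    %% Stand-in for the async callback: forward the work to the owner.
    Callback = fun (Mod, Fun) ->
                       Self ! {run_backing_queue, Mod, Fun},
                       ok
               end,
    %% Stand-in for CloseFDsFun: a state transformer guarded by module.
    CloseFDsFun = fun (demo_queue, State) -> State#{open_fds := 0} end,
    %% What a message store client would hold: a fun with no arguments
    %% that already knows which module and which fun to submit.
    Thunk = fun () -> Callback(demo_queue, CloseFDsFun) end,
    ok = Thunk(),
    %% The owner later picks the request up and applies it to its state.
    receive
        {run_backing_queue, Mod, Fun} -> Fun(Mod, #{open_fds => 3})
    after 1000 -> timeout
    end.
```

Here wrap_demo:run() returns #{open_fds => 0}. The point of the design is that the code holding the closure needs no knowledge of the queue process or its state; it only triggers the request.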
So our rabbit_amqqueue_process will ask the backing queue module to
invoke that expanded fun in the context of the
rabbit_amqqueue_process Pid:

    handle_cast({run_backing_queue, Mod, Fun},
                State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
        noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)});
This very same technique is used in rabbit_variable_queue:init/3 to
set up the functions that will write messages to disk (see
rabbit_variable_queue:msgs_written_to_disk/3) and the ones that will
write the message indexes to disk (see
rabbit_variable_queue:msg_indices_written_to_disk/2).
From all these layers of indirection, what's important to understand
is that the Pid passed into rabbit_amqqueue:run_backing_queue/3
determines the context in which all the functions implementing message
persistence will be run. Unless your rabbit_backing_queue behaviour
implementation is just a proxy, like rabbit_priority_queue, you must
take that Pid context into account, since it will hold the file handle
references and its process dictionary will be the one where the
file_handle_cache stores its information.
If you want a second example of what we outlined above, take a look at
rabbit_mirror_queue_slave:bq_init/3, where the Pid provided to
run_backing_queue/3 is the slave Pid. The slave process implements its
own handle_cast({run_backing_queue, Mod, Fun}, State) function clause,
in which funs from rabbit_variable_queue such as
msg_store_close_fds_fun, msgs_written_to_disk,
msg_indices_written_to_disk and msgs_and_indices_written_to_disk will
be run.