CQ: Update long description at the top of the module
lhoguin committed Sep 27, 2022
1 parent e09cbeb commit 1eb1710
Showing 1 changed file with 76 additions and 183 deletions.
259 changes: 76 additions & 183 deletions deps/rabbit/src/rabbit_variable_queue.erl
@@ -41,204 +41,65 @@
-define(EMPTY_START_FUN_STATE, {fun (ok) -> finished end, ok}).

%%----------------------------------------------------------------------------
%% Messages, and their position in the queue, can be in memory or on
%% disk, or both. Persistent messages will have both message and
%% position pushed to disk as soon as they arrive; transient messages
%% can be written to disk (and thus both types can be evicted from
%% memory) under memory pressure. The question of whether a message is
%% in RAM and whether it is persistent are orthogonal.
%% Messages, their metadata and their position in the queue (SeqId),
%% can be in memory or on disk, or both. Persistent messages in
%% durable queues are always written to disk when they arrive.
%% Transient messages as well as persistent messages in non-durable
%% queues may be kept only in memory.
%%
%% Messages are persisted using the queue index and the message
%% store. Normally the queue index holds the position of the message
%% *within this queue* along with a couple of small bits of metadata,
%% while the message store holds the message itself (including headers
%% and other properties).
%% The number of messages kept in memory is dependent on the consume
%% rate of the queue. At a minimum 1 message is kept (necessary because
%% we often need to check the expiration at the head of the queue) and
%% at a maximum the semi-arbitrary number 2048.
%%
%% However, as an optimisation, small messages can be embedded
%% directly in the queue index and bypass the message store
%% altogether.
%% Messages are never written back to disk after they have been read
%% into memory. Instead the queue is designed to avoid keeping too
%% much to begin with.
%%
%% Definitions:
%% Messages are persisted using a queue index and a message store.
%% A few different scenarios may play out depending on the message
%% size and the queue-version argument.
%%
%% alpha: this is a message where both the message itself, and its
%% position within the queue are held in RAM
%%
%% beta: this is a message where the message itself is only held on
%% disk (if persisted to the message store) but its position
%% within the queue is held in RAM.
%%
%% gamma: this is a message where the message itself is only held on
%% disk, but its position is both in RAM and on disk.
%%
%% delta: this is a collection of messages, represented by a single
%% term, where the messages and their position are only held on
%% disk.
%%
%% Note that for persistent messages, the message and its position
%% within the queue are always held on disk, *in addition* to being in
%% one of the above classifications.
%%
%% Also note that within this code, the term gamma seldom
%% appears. It's frequently the case that gammas are defined by betas
%% who have had their queue position recorded on disk.
%%
%% In general, messages move q1 -> q2 -> delta -> q3 -> q4, though
%% many of these steps are frequently skipped. q1 and q4 only hold
%% alphas, q2 and q3 hold both betas and gammas. When a message
%% arrives, its classification is determined. It is then added to the
%% rightmost appropriate queue.
%%
%% If a new message is determined to be a beta or gamma, q1 is
%% empty. If a new message is determined to be a delta, q1 and q2 are
%% empty (and actually q4 too).
%%
%% When removing messages from a queue, if q4 is empty then q3 is read
%% directly. If q3 becomes empty then the next segment's worth of
%% messages from delta are read into q3, reducing the size of
%% delta. If the queue is non empty, either q4 or q3 contain
%% entries. It is never permitted for delta to hold all the messages
%% in the queue.
%%
%% The duration indicated to us by the memory_monitor is used to
%% calculate, given our current ingress and egress rates, how many
%% messages we should hold in RAM (i.e. as alphas). We track the
%% ingress and egress rates for both messages and pending acks and
%% rates for both are considered when calculating the number of
%% messages to hold in RAM. When we need to push alphas to betas or
%% betas to gammas, we favour writing out messages that are further
%% from the head of the queue. This minimises writes to disk, as the
%% messages closer to the tail of the queue stay in the queue for
%% longer, thus do not need to be replaced as quickly by sending other
%% messages to disk.
%%
%% Whilst messages are pushed to disk and forgotten from RAM as soon
%% as requested by a new setting of the queue RAM duration, the
%% inverse is not true: we only load messages back into RAM as
%% demanded as the queue is read from. Thus only publishes to the
%% queue will take up available spare capacity.
%%
%% When we report our duration to the memory monitor, we calculate
%% average ingress and egress rates over the last two samples, and
%% then calculate our duration based on the sum of the ingress and
%% egress rates. More than two samples could be used, but it's a
%% balance between responding quickly enough to changes in
%% producers/consumers versus ignoring temporary blips. The problem
%% with temporary blips is that with just a few queues, they can have
%% substantial impact on the calculation of the average duration and
%% hence cause unnecessary I/O. Another alternative is to increase the
%% amqqueue_process:RAM_DURATION_UPDATE_PERIOD to beyond 5
%% seconds. However, that then runs the risk of being too slow to
%% inform the memory monitor of changes. Thus a 5 second interval,
%% plus a rolling average over the last two samples seems to work
%% well in practice.
%% - queue-version=1, size < qi_msgs_embed_below: both the message
%% metadata and content are stored in rabbit_queue_index
%%
%% The sum of the ingress and egress rates is used because the egress
%% rate alone is not sufficient. Adding in the ingress rate means that
%% queues which are being flooded by messages are given more memory,
%% resulting in them being able to process the messages faster (by
%% doing less I/O, or at least deferring it) and thus helping keep
%% their mailboxes empty and thus the queue as a whole is more
%% responsive. If such a queue also has fast but previously idle
%% consumers, the consumer can then start to be driven as fast as it
%% can go, whereas if only egress rate was being used, the incoming
%% messages may have to be written to disk and then read back in,
%% resulting in the hard disk being a bottleneck in driving the
%% consumers. Generally, we want to give Rabbit every chance of
%% getting rid of messages as fast as possible and remaining
%% responsive, and using only the egress rate impacts that goal.
%% - queue-version=1, size >= qi_msgs_embed_below: the metadata
%% is stored in rabbit_queue_index, while the content is stored
%% in the per-vhost shared rabbit_msg_store
%%
%% Once the queue has more alphas than the target_ram_count, the
%% surplus must be converted to betas, if not gammas, if not rolled
%% into delta. The conditions under which these transitions occur
%% reflect the conflicting goals of minimising RAM cost per msg, and
%% minimising CPU cost per msg. Once the msg has become a beta, its
%% payload is no longer in RAM, thus a read from the msg_store must
%% occur before the msg can be delivered, but the RAM cost of a beta
%% is the same as a gamma, so converting a beta to gamma will not free
%% up any further RAM. To reduce the RAM cost further, the gamma must
%% be rolled into delta. Whilst recovering a beta or a gamma to an
%% alpha requires only one disk read (from the msg_store), recovering
%% a msg from within delta will require two reads (queue_index and
%% then msg_store). But delta has a near-0 per-msg RAM cost. So the
%% conflict is between using delta more, which will free up more
%% memory, but require additional CPU and disk ops, versus using delta
%% less and gammas and betas more, which will cost more memory, but
%% require fewer disk ops and less CPU overhead.
%% - queue-version=2, size < qi_msgs_embed_below: the metadata
%% is stored in rabbit_classic_queue_index_v2, while the content
%% is stored in the per-queue rabbit_classic_queue_store_v2
%%
%% In the case of a persistent msg published to a durable queue, the
%% msg is immediately written to the msg_store and queue_index. If
%% then additionally converted from an alpha, it'll immediately go to
%% a gamma (as it's already in queue_index), and cannot exist as a
%% beta. Thus a durable queue with a mixture of persistent and
%% transient msgs in it which has more messages than permitted by the
%% target_ram_count may contain an interspersed mixture of betas and
%% gammas in q2 and q3.
%% - queue-version=2, size >= qi_msgs_embed_below: the metadata
%% is stored in rabbit_classic_queue_index_v2, while the content
%% is stored in the per-vhost shared rabbit_msg_store
%%
%% There is then a ratio that controls how many betas and gammas there
%% can be. This is based on the target_ram_count and thus expresses
%% the fact that as the number of permitted alphas in the queue falls,
%% so should the number of betas and gammas fall (i.e. delta
%% grows). If q2 and q3 contain more than the permitted number of
%% betas and gammas, then the surplus are forcibly converted to gammas
%% (as necessary) and then rolled into delta. The ratio is that
%% delta/(betas+gammas+delta) equals
%% (betas+gammas+delta)/(target_ram_count+betas+gammas+delta). I.e. as
%% the target_ram_count shrinks to 0, so must betas and gammas.
%% When messages must be read from disk, message bodies will
%% also be read from disk except if the message is stored
%% in the per-vhost shared rabbit_msg_store. In that case
%% the message gets read before it needs to be sent to the
%% consumer. Messages are read from rabbit_msg_store one
%% at a time currently.
%%
%% The conversion of betas to deltas is done if there are at least
%% ?IO_BATCH_SIZE betas in q2 & q3. This value should not be too small,
%% otherwise the frequent operations on the queues of q2 and q3 will not be
%% effectively amortised (switching the direction of queue access defeats
%% amortisation). Note that there is a natural upper bound due to credit_flow
%% limits on the alpha to beta conversion.
%% The queue also keeps track of messages that were delivered
%% but for which the ack has not been received. Pending acks
%% are currently kept in memory although the message may be
%% on disk.
%%
%% The conversion from alphas to betas is chunked due to the
%% credit_flow limits of the msg_store. This further smooths the
%% effects of changes to the target_ram_count and ensures the queue
%% remains responsive even when there is a large amount of IO work to
%% do. The 'resume' callback is utilised to ensure that conversions
%% are done as promptly as possible whilst ensuring the queue remains
%% responsive.
%%
%% In the queue we keep track of both messages that are pending
%% delivery and messages that are pending acks. In the event of a
%% queue purge, we only need to load qi segments if the queue has
%% elements in deltas (i.e. it came under significant memory
%% pressure). In the event of a queue deletion, in addition to the
%% preceding, by keeping track of pending acks in RAM, we do not need
%% to search through qi segments looking for messages that are yet to
%% be acknowledged.
%%
%% Pending acks are recorded in memory by storing the message itself.
%% If the message has been sent to disk, we do not store the message
%% content. During memory reduction, pending acks containing message
%% content have that content removed and the corresponding messages
%% are pushed out to disk.
%%
%% Messages from pending acks are returned to q4, q3 and delta during
%% requeue, based on the limits of seq_id contained in each. Requeued
%% messages retain their original seq_id, maintaining order
%% when requeued.
%%
%% The order in which alphas are pushed to betas and pending acks
%% are pushed to disk is determined dynamically. We always prefer to
%% push messages for the source (alphas or acks) that is growing the
%% fastest (with growth measured as avg. ingress - avg. egress).
%%
%% Notes on Clean Shutdown
%% (This documents behaviour in variable_queue, queue_index and
%% msg_store.)
%% Messages being requeued are returned to their position in
%% the queue using their SeqId value.
%%
%% In order to try to achieve as fast a start-up as possible, if a
%% clean shutdown occurs, we try to save out state to disk to reduce
%% work on startup. In the msg_store this takes the form of the
%% work on startup. In rabbit_msg_store this takes the form of the
%% index_module's state, plus the file_summary ets table, and client
%% refs. In the VQ, this takes the form of the count of persistent
%% messages in the queue and references into the msg_stores. The
%% queue_index adds to these terms the details of its segments and
%% queue index adds to these terms the details of its segments and
%% stores the terms in the queue directory.
%%
%% Two message stores are used. One is created for persistent messages
%% Two rabbit_msg_store(s) are used. One is created for persistent messages
%% to durable queues that must survive restarts, and the other is used
%% for all other messages that just happen to need to be written to
%% disk. On start up we can therefore nuke the transient message
@@ -248,7 +109,7 @@
%% The references to the msg_stores are there so that the msg_store
%% knows to only trust its saved state if all of the queues it was
%% previously talking to come up cleanly. Likewise, the queues
%% themselves (esp queue_index) skips work in init if all the queues
%% themselves (especially indexes) skip work in init if all the queues
%% and msg_store were shut down cleanly. This gives both good speed
%% improvements and also robustness so that if anything possibly went
%% wrong in shutdown (or there was subsequent manual tampering), all
@@ -263,9 +124,9 @@
%% stored in the transient msg_store which would have had its saved
%% state nuked on startup). This avoids the expensive operation of
%% scanning the entire queue on startup in order to delete transient
%% messages that were only pushed to disk to save memory.
%% messages that were written to disk.
%%
%% v2 UPDATE: The queue is keeping track of delivers via the
%% The queue is keeping track of delivers via the
%% next_deliver_seq_id variable. This variable gets increased
%% with every (first-time) delivery. When delivering messages
%% the seq_id of the message is checked against this variable
@@ -275,6 +136,38 @@
%% message in the queue (effectively marking all messages as
%% delivered, like the v1 index was doing).
%%
%% Previous versions of classic queues had a much more complex
%% way of working. Messages were categorized into four groups,
%% and remnants of these terms remain in the code at the time
%% of writing:
%%
%% alpha: this is a message where both the message itself, and its
%% position within the queue are held in RAM
%%
%% beta: this is a message where the message itself is only held on
%% disk (if persisted to the message store) but its position
%% within the queue is held in RAM.
%%
%% gamma: this is a message where the message itself is only held on
%% disk, but its position is both in RAM and on disk.
%%
%% delta: this is a collection of messages, represented by a single
%% term, where the messages and their position are only held on
%% disk.
%%
%% Messages may have been stored in q1, q2, delta, q3 or q4 depending
%% on their location in the queue. The current version of classic
%% queues only uses delta (on-disk, for the tail of the queue) or
%% q3 (in-memory, head of the queue). Messages used to move from
%% q1 -> q2 -> delta -> q3 -> q4 (and sometimes q3 -> delta or
%% q4 -> delta to reduce memory use). Now messages only move
%% from delta to q3. Full details on the old mechanisms can be
%% found in previous versions of this file (such as the 3.11 version).
%%
%% In the current version of classic queues, there is no distinction
%% between default and lazy queues. The current behavior is close to
%% lazy queues, except we avoid some writes to disk when queues are
%% empty.
%%----------------------------------------------------------------------------

-behaviour(rabbit_backing_queue).
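
The four storage scenarios listed in the new module comment (queue-version 1 or 2, message size below or at/above qi_msgs_embed_below) can be read as a small decision table. The following is a minimal sketch of that table only, not code from this commit: the module vq_storage_sketch and the function storage_location/3 are hypothetical names invented for illustration, and the threshold is passed in as a plain argument rather than read from configuration. Only the returned atoms correspond to modules named in the comment.

```erlang
%% Hypothetical sketch: this module and function do not exist in RabbitMQ.
%% They only restate the four scenarios from the module comment.
-module(vq_storage_sketch).
-export([storage_location/3]).

%% Returns {WhereMetadataIsStored, WhereContentIsStored}.
%% EmbedBelow stands in for the qi_msgs_embed_below setting (assumption:
%% supplied by the caller here, in bytes).
-spec storage_location(1 | 2, non_neg_integer(), non_neg_integer()) ->
          {module(), module()}.
%% queue-version=1, small message: metadata and content in the v1 index.
storage_location(1, Size, EmbedBelow) when Size < EmbedBelow ->
    {rabbit_queue_index, rabbit_queue_index};
%% queue-version=1, large message: content in the per-vhost shared store.
storage_location(1, _Size, _EmbedBelow) ->
    {rabbit_queue_index, rabbit_msg_store};
%% queue-version=2, small message: content in the per-queue v2 store.
storage_location(2, Size, EmbedBelow) when Size < EmbedBelow ->
    {rabbit_classic_queue_index_v2, rabbit_classic_queue_store_v2};
%% queue-version=2, large message: content in the per-vhost shared store.
storage_location(2, _Size, _EmbedBelow) ->
    {rabbit_classic_queue_index_v2, rabbit_msg_store}.
```

For example, with an illustrative threshold of 4096, calling vq_storage_sketch:storage_location(2, 100, 4096) returns {rabbit_classic_queue_index_v2, rabbit_classic_queue_store_v2}, while a size of 4096 or more returns rabbit_msg_store as the content location for either queue version.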