Publishing Messages into RabbitMQ

One of the best ways to cover the various parts of RabbitMQ's architecture is to see what happens when a message gets published into the broker. In this document we are going to visit the different subsystems a message crosses inside the broker. Let's start with rabbit_reader.

The rabbit_reader process takes care of reading data from the network and forwarding it to the respective channel process. Messages get into the channel when the reader calls the function rabbit_channel:do_flow/3. This function calls the credit_flow module to track that a message was received from the reader, so that the broker can eventually throttle the reader down if the publisher is sending messages faster than the broker can handle them at a particular time. Read more about Credit Flow here. More information about the Reader process can be found in the Networking and Connections guide.
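The throttling idea can be illustrated with a minimal, purely functional model of credit-based flow control. This is not the credit_flow module's API: the module credit_sketch, its functions, and the single record that holds both the sender's credit and the receiver's ack counter are hypothetical simplifications. The sender spends one unit of credit per message and blocks at zero; the receiver hands credit back in batches.

```erlang
-module(credit_sketch).
-export([new/2, send/1, ack/1, credit/1]).

%% Hypothetical state for one sender/receiver pair:
%%   credit     - how many more messages the sender may send right now
%%   more_after - the receiver grants this much credit every more_after acks
%%   unacked    - acks counted by the receiver since it last granted credit
-record(flow, {credit, more_after, unacked = 0}).

new(InitialCredit, MoreAfter) ->
    #flow{credit = InitialCredit, more_after = MoreAfter}.

%% The sender (the reader, in RabbitMQ's case) spends one unit of credit
%% per message; with no credit left it is blocked and must stop reading.
send(#flow{credit = 0} = F) ->
    {blocked, F};
send(#flow{credit = C} = F) ->
    {ok, F#flow{credit = C - 1}}.

%% The receiver (the channel) acks each processed message; every
%% more_after acks it hands that many units of credit back to the sender.
ack(#flow{unacked = U, more_after = M, credit = C} = F) when U + 1 >= M ->
    F#flow{unacked = 0, credit = C + M};
ack(#flow{unacked = U} = F) ->
    F#flow{unacked = U + 1}.

credit(#flow{credit = C}) -> C.
```

With new(2, 2), two sends succeed, a third returns blocked, and the sender can continue only after the second ack hands back two units of credit.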

Arriving into the Channel Process

Once credit flow is accounted for, the do_flow/3 function issues an asynchronous gen_server:cast/2 into the channel process, passing in this Erlang message: {method, Method, Content, flow}. The tuple carries the AMQP method, the method's content, and the atom flow, which tells the channel that credit flow is in use.

When the cast reaches the handle_cast/2 function inside the channel module, we are finally inside the channel process memory and execution path. If flow was in use, as is the case here, the channel issues a credit_flow:ack/1 to the reader process. Then the AMQP method being processed is passed to the interceptors defined for the channel, if there are any. After the interceptors are done processing the AMQP method, the channel process continues processing it; in our case the function handle_method/3 is called with a basic.publish record.
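The cast and the handling steps described above can be sketched as a tiny gen_server. Everything here is hypothetical: channel_sketch stands in for rabbit_channel, interceptors are modeled as plain funs from method to method, the acknowledgement to the reader is only marked by a comment, and handle_method/3 merely records the method it received so it can be inspected.

```erlang
-module(channel_sketch).
-behaviour(gen_server).
-export([start_link/1, publish/3, last_method/1]).
-export([init/1, handle_cast/2, handle_call/3]).

%% Interceptors: hypothetical list of funs Method -> Method.
start_link(Interceptors) ->
    gen_server:start_link(?MODULE, Interceptors, []).

%% Same message shape as the cast issued by do_flow/3; the credit_flow
%% bookkeeping on the reader side is omitted.
publish(Pid, Method, Content) ->
    gen_server:cast(Pid, {method, Method, Content, flow}).

%% Test helper: returns the last method seen after interception.
last_method(Pid) ->
    gen_server:call(Pid, last_method).

init(Interceptors) ->
    {ok, #{interceptors => Interceptors, last => undefined}}.

handle_cast({method, Method, Content, Flow}, State = #{interceptors := Is}) ->
    ok = maybe_ack(Flow),
    %% Each interceptor may rewrite the method before the channel sees it.
    Intercepted = lists:foldl(fun(I, M) -> I(M) end, Method, Is),
    {noreply, handle_method(Intercepted, Content, State)}.

handle_call(last_method, _From, State = #{last := Last}) ->
    {reply, Last, State}.

%% In RabbitMQ, credit_flow:ack/1 to the reader would happen for flow.
maybe_ack(flow)   -> ok;
maybe_ack(noflow) -> ok.

%% Stand-in for rabbit_channel's handle_method/3.
handle_method(Method, _Content, State) ->
    State#{last => Method}.
```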

Inside basic.publish

basic.publish works by receiving an AMQP message, an exchange and a routing key; it uses the exchange to route the message to one or more queues, based on the routing key. Let's see how that is accomplished.

The first thing the function does is check the size of the message, since RabbitMQ has an upper limit of 2GB for messages.
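A size check of this kind might look like the following sketch. The module name, the error tuple, and the treatment of the body as a single binary are assumptions; the limit constant simply encodes the 2GB figure quoted above, and the broker's actual limit may differ between versions.

```erlang
-module(size_sketch).
-export([check_msg_size/1, check_size/1]).

%% Assumption: 2GB taken literally as 2 GiB.
-define(MAX_MSG_SIZE, 2 * 1024 * 1024 * 1024).

%% Hypothetical: the body is a single binary.
check_msg_size(Body) when is_binary(Body) ->
    check_size(byte_size(Body)).

check_size(Size) when Size > ?MAX_MSG_SIZE ->
    {error, {message_too_large, Size}};
check_size(_Size) ->
    ok.
```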

Then the function needs to build the resource record for the exchange. Exchanges and queues are represented internally with a resource record that keeps track of the name, the kind of resource, and the vhost where the exchange or queue was declared. The type declaration for this record looks like this:

#resource{virtual_host :: VirtualHost,
          kind         :: Kind,
          name         :: Name}

So if a message is published to an exchange called "my_exchange" in the default vhost, we end up with the following record:

#resource{virtual_host = <<"/">>,
          kind         = exchange,
          name         = <<"my_exchange">>}

Resources like that one are used everywhere in RabbitMQ, so it's a good idea to study their parts in the rabbit_types module, where these declarations are defined.
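Because a record is just a tagged tuple, building a resource by hand makes its shape easy to see. The helper functions below are hypothetical; only the field order of the #resource record comes from the definition above.

```erlang
-module(resource_sketch).
-export([exchange/2, queue/2]).

%% Same field order as the #resource record shown above.
-record(resource, {virtual_host, kind, name}).

%% Hypothetical helpers for building exchange and queue resources.
exchange(VHost, Name) when is_binary(VHost), is_binary(Name) ->
    #resource{virtual_host = VHost, kind = exchange, name = Name}.

queue(VHost, Name) when is_binary(VHost), is_binary(Name) ->
    #resource{virtual_host = VHost, kind = queue, name = Name}.
```

At runtime, exchange(<<"/">>, <<"my_exchange">>) is simply the tuple {resource, <<"/">>, exchange, <<"my_exchange">>}.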

Once we have the exchange resource, basic.publish uses it to check whether the user publishing the message has write permissions on this particular exchange, by calling the function check_write_permitted/2. Read more about the different kinds of permissions here: access-control
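RabbitMQ permissions are expressed as regular expressions matched against resource names. The sketch below models only the write side of that idea. The function write_permitted/2 and the map from vhost to write pattern are hypothetical and do not match the signature of check_write_permitted/2 in the channel.

```erlang
-module(perm_sketch).
-export([write_permitted/2]).

-record(resource, {virtual_host, kind, name}).

%% Perms: hypothetical map from vhost to the user's write regex,
%% e.g. #{<<"/">> => <<"^my_">>}.
write_permitted(#resource{virtual_host = VHost, name = Name}, Perms) ->
    case maps:find(VHost, Perms) of
        {ok, Regex} ->
            %% The resource name must match the write pattern.
            case re:run(Name, Regex, [{capture, none}]) of
                match   -> ok;
                nomatch -> {error, access_refused}
            end;
        error ->
            %% No permissions at all in this vhost.
            {error, access_refused}
    end.
```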

If the user does have permission to publish messages to this exchange, the channel queries the Mnesia database to find out whether the exchange actually exists: the function rabbit_exchange:lookup_or_die/1 retrieves the actual exchange record from the database, and if the exchange is not found, lookup_or_die/1 raises a channel error. Keep in mind that the exchange resource we mentioned above and the exchange record stored in Mnesia are two very different things. The latter holds much more information about the exchange, such as its type (direct, fanout, topic, etc.). Here's the exchange record definition from rabbit.hrl:

%% fields described as 'transient' here are cleared when writing to
%% rabbit_durable_<thing>
-record(exchange, {
          name, type, durable, auto_delete, internal, arguments, %% immutable
          scratches,    %% durable, explicitly updated via update_scratch/3
          policy,       %% durable, implicitly updated when policy changes
          decorators}). %% transient, recalculated in store/1 (i.e. recovery)

Next, the channel checks that the exchange record returned by Mnesia is not an internal exchange; otherwise an error is raised and the publish fails.
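The lookup and the internal-exchange check can be modeled together. In this sketch a map stands in for the Mnesia table, the #exchange record keeps only the immutable fields from the definition above, and failures are raised with erlang:error/1 using hypothetical reason terms, whereas the broker raises AMQP channel errors.

```erlang
-module(lookup_sketch).
-export([new_exchange/3, lookup_or_die/2, check_internal/1]).

%% Simplified: only the immutable fields of the real #exchange record.
-record(exchange, {name, type, durable, auto_delete, internal, arguments}).

new_exchange(Name, Type, Internal) ->
    #exchange{name = Name, type = Type, durable = true,
              auto_delete = false, internal = Internal, arguments = []}.

%% Table: hypothetical map from exchange resource to #exchange{}.
lookup_or_die(Name, Table) ->
    case maps:find(Name, Table) of
        {ok, X} -> X;
        error   -> erlang:error({not_found, Name})
    end.

%% Clients may not publish directly to internal exchanges.
check_internal(#exchange{internal = true, name = Name}) ->
    erlang:error({access_refused, Name});
check_internal(#exchange{} = X) ->
    X.
```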

The next step is to validate the user id provided with basic.publish, if any. If present, this user id is validated against the user that created the channel where the message is being published. More details here
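In its simplest form, the user id check compares the user_id property with the channel's user. The sketch below is a hypothetical simplification: the module, the error term, and the use of undefined for an absent property are assumptions, and any special cases the broker allows are ignored.

```erlang
-module(user_id_sketch).
-export([check_user_id/2]).

%% UserId: the message's user_id property, or undefined when absent.
%% AuthUser: the name of the user that owns the channel.
check_user_id(undefined, _AuthUser) ->
    ok;
check_user_id(UserId, UserId) ->
    %% Matching names: the publish may proceed.
    ok;
check_user_id(UserId, AuthUser) ->
    {error, {precondition_failed, UserId, AuthUser}}.
```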

Then the channel validates that the message expiration header provided by the user, if any, is correct. More info about the Per-Message-TTL here
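Per-message TTL is carried as a string holding a number of milliseconds, so validation amounts to parsing it as a non-negative integer. The module and error terms below are hypothetical; only the parsing rule reflects the per-message TTL behaviour.

```erlang
-module(expiration_sketch).
-export([parse_expiration/1]).

%% No expiration property: the message has no per-message TTL.
parse_expiration(undefined) ->
    {ok, undefined};
parse_expiration(Bin) when is_binary(Bin) ->
    %% binary_to_integer/1 raises badarg on anything that is not an integer.
    try binary_to_integer(Bin) of
        Ms when Ms >= 0 -> {ok, Ms};
        Ms -> {error, {negative_expiration, Ms}}
    catch
        error:badarg -> {error, {invalid_expiration, Bin}}
    end.
```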

Then it's time to check whether the message was published as Mandatory or whether the channel is in Transaction or Confirm Mode. If this is the case, the publish_seqno field on the channel state is incremented to account for the new publish being handled. This Message Sequence Number will later be used to reply to the publisher in case the message was Mandatory and/or the channel was in Confirm Mode. See also the document Delivering Messages to Queues.
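The bookkeeping in this step can be sketched with a map-based channel state. The keys publish_seqno, confirm_enabled and tx, the none marker for a non-transactional channel, and the function maybe_next_seqno/2 are all hypothetical; the condition follows the description above, assigning the current sequence number to the message and incrementing the counter.

```erlang
-module(seqno_sketch).
-export([maybe_next_seqno/2]).

%% Hypothetical channel state, e.g.
%% #{publish_seqno => 1, confirm_enabled => true, tx => none}.
maybe_next_seqno(Mandatory, State = #{publish_seqno   := SeqNo,
                                       confirm_enabled := Confirm,
                                       tx              := Tx}) ->
    case Mandatory orelse Confirm orelse Tx =/= none of
        true  -> {SeqNo, State#{publish_seqno := SeqNo + 1}};
        false -> {undefined, State}
    end.
```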

After all these steps have been completed, it's time to route the AMQP message, but in order to do that we first need to wrap the message in a #basic_message record and then pass it to the exchange and queues inside a #delivery record:

-record(basic_message,
        {exchange_name,     %% The exchange where the message was received
         routing_keys = [], %% Routing keys used during publish
         content,           %% The message content
         id,                %% A `rabbit_guid:gen()` generated id
         is_persistent}).   %% Whether the message was published as persistent

-record(delivery,
        {mandatory,  %% Whether the message was published as mandatory
         confirm,    %% Whether the message needs confirming
         sender,     %% The pid of the process that created the delivery
         message,    %% The #basic_message record
         msg_seq_no, %% Msg Sequence Number from the channel publish_seqno field
         flow}).     %% Should flow control be used for this delivery
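Building the two records for a publish could look like this. The records copy the simplified field lists shown above; build_delivery/5 and its options map are hypothetical, and erlang:unique_integer/1 replaces rabbit_guid:gen(), which only exists inside the broker.

```erlang
-module(delivery_sketch).
-export([build_delivery/5]).

%% Field lists copied from the simplified records above.
-record(basic_message, {exchange_name, routing_keys = [], content, id,
                        is_persistent}).
-record(delivery, {mandatory, confirm, sender, message, msg_seq_no, flow}).

%% Opts: hypothetical map with mandatory, confirm and seqno keys.
build_delivery(XName, RoutingKey, Content, Persistent,
               #{mandatory := Mandatory, confirm := Confirm, seqno := SeqNo}) ->
    Msg = #basic_message{exchange_name = XName,
                         routing_keys  = [RoutingKey],
                         content       = Content,
                         %% Stand-in for rabbit_guid:gen().
                         id            = erlang:unique_integer([positive]),
                         is_persistent = Persistent},
    #delivery{mandatory  = Mandatory,
              confirm    = Confirm,
              sender     = self(),
              message    = Msg,
              msg_seq_no = SeqNo,
              flow       = flow}.
```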

Message Routing

The #delivery record we just created is now passed to the exchange via the function rabbit_exchange:route/2. If the exchange name used during basic.publish is the empty string <<"">>, the default exchange is assumed, and route/2 simply returns the queue name associated with the routing key, per the AMQP spec. Otherwise, the delivery is first processed by the exchange decorators configured for the exchange that's handling the routing. The decorators send back a list of destinations. Then the delivery finally reaches the exchange, where the routing algorithm implemented by the exchange takes place. This returns a new list of destinations, which is merged and deduplicated with the list previously returned by the decorators. At this point, all the destinations proposed by Exchange To Exchange bindings are also included in the list of destinations that is returned to the channel.
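The routing rules above can be sketched as follows. The signature is hypothetical: decorators and the exchange type's routing algorithm are modeled as funs from routing key to a list of queue resources, and exchange-to-exchange bindings are not modeled. lists:usort/1 performs the merge-and-deduplicate step, which also sorts the result.

```erlang
-module(route_sketch).
-export([route/4]).

-record(resource, {virtual_host, kind, name}).

%% Default exchange: the routing key names the destination queue.
route(#resource{name = <<>>, virtual_host = VHost}, RoutingKey,
      _Decorators, _ExchangeRoute) ->
    [#resource{virtual_host = VHost, kind = queue, name = RoutingKey}];
%% Any other exchange: ask the decorators, then the exchange itself.
route(#resource{}, RoutingKey, Decorators, ExchangeRoute) ->
    FromDecorators = lists:append([D(RoutingKey) || D <- Decorators]),
    FromExchange = ExchangeRoute(RoutingKey),
    %% Merge both lists and drop duplicates.
    lists:usort(FromDecorators ++ FromExchange).
```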

Processing Routing Results

Now the channel has a list of queues to which it should deliver the message. Before doing that, it needs to check whether the channel is in transaction mode. If so, the #delivery and the list of queues are stored until the transaction is committed. Keep in mind that transaction support in RabbitMQ is a very simple form of message batching. If the channel is not in transaction mode, the message is delivered to the queues returned by the routing function.
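A minimal model of that batching behaviour keeps pending deliveries in the channel state until commit. The state keys, the none marker for a non-transactional channel, and DeliverFun are hypothetical stand-ins for the channel's real transaction state and delivery code.

```erlang
-module(tx_sketch).
-export([deliver_or_enqueue/3, commit/1]).

%% tx is none outside a transaction, or a list of pending
%% {Delivery, Queues} pairs (newest first) inside one.
deliver_or_enqueue(Delivery, Queues, State = #{tx := none, deliver := DeliverFun}) ->
    DeliverFun(Delivery, Queues),
    State;
deliver_or_enqueue(Delivery, Queues, State = #{tx := Pending}) when is_list(Pending) ->
    State#{tx := [{Delivery, Queues} | Pending]}.

%% On commit, replay the batch in publish order and start a fresh one.
commit(State = #{tx := Pending, deliver := DeliverFun}) when is_list(Pending) ->
    lists:foreach(fun({D, Qs}) -> DeliverFun(D, Qs) end, lists:reverse(Pending)),
    State#{tx := []}.
```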

Summary

We saw in this guide that messages arrive via the network into the rabbit_reader process. This process forwards commands to rabbit_channel processes, which take care of processing the various AMQP methods. In this case, we looked at what happens when a message is published into RabbitMQ. Once credit flow has been acked back to the reader process, it's time to handle the message. First it goes to the interceptors, which might modify or augment the AMQP method received from the reader. Then the channel must make sure the message complies with the size limits set on the broker side. Once that's done, we need to see if the user has permission to publish messages to the selected exchange. If that's fine and the user_id and expiration headers of the message are validated, it's time to route the message. The exchange that handles the message returns a list of queues to which the message must be delivered. At this point we are done with the message and the channel is ready to keep processing commands.

Now we can continue with the next guide and see what happens when messages are delivered to queues: Delivering Messages to Queues