QueueBatch is an Azure Functions trigger that provides the ability to process Azure Storage Queue messages in batches.
The standard Azure Functions trigger for Azure Storage Queues obtains messages in batches but dispatches each message in a separate task. This is fine if the function touches different resources. If the function does some processing and then appends to a single append blob, however, it won't scale well. The problems that might occur are:
- issuing many IO transactions,
- failing due to concurrency checks (hot resources),
- breaching the throughput limit of a partition, etc.
Accessing the same resource with high frequency might simply not work. With QueueBatch, you can address this by processing all the messages from a batch in the same function invocation, amortizing the cost of accessing other resources.
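The amortization argument can be illustrated with a small, self-contained sketch. It does not use QueueBatch or Azure Storage: CountingAppendStore is a hypothetical stand-in for a shared resource such as an append blob, and it only counts how many write operations it receives. Both method names are likewise made up for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical stand-in for a shared resource such as an append blob.
// It only counts write operations; it is not part of QueueBatch or any Azure SDK.
public sealed class CountingAppendStore
{
    private readonly StringBuilder _content = new StringBuilder();

    public int WriteOperations { get; private set; }

    public string Content => _content.ToString();

    public void Append(string data)
    {
        WriteOperations++;
        _content.Append(data);
    }
}

public static class AmortizationSketch
{
    // One write per message, as happens when each message is dispatched separately.
    public static void ProcessOneByOne(IEnumerable<string> payloads, CountingAppendStore store)
    {
        foreach (var payload in payloads)
        {
            store.Append(payload + "\n");
        }
    }

    // One write per batch: payloads are combined in memory first.
    public static void ProcessAsBatch(IEnumerable<string> payloads, CountingAppendStore store)
    {
        var buffer = new StringBuilder();
        foreach (var payload in payloads)
        {
            buffer.Append(payload).Append('\n');
        }

        // Skip the write entirely when there is nothing to append.
        if (buffer.Length > 0)
        {
            store.Append(buffer.ToString());
        }
    }
}
```

For the same input, both methods leave the store with identical content, but the batched variant issues a single write regardless of how many messages arrived.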
To use QueueBatch in your Function application, declare a parameter of the custom IMessageBatch type to accept a batch and mark it with the QueueBatchTrigger attribute. To resolve the queue name from configuration, wrap the app setting key in % signs, e.g. %queue-app-setting-key%.
public static void MyFunc([QueueBatchTrigger("myqueue")] IMessageBatch batch)
{
    foreach (var msg in batch.Messages)
    {
        // do something with payload
        DoSomething(msg.Payload);
    }

    // acknowledge processing
    batch.MarkAllAsProcessed();
}
With SuccessOrFailAsBatch set to false, you can also acknowledge only some of the messages. The rest will be retried in a similar manner to the regular [QueueTrigger]:
public static void MyFunc([QueueBatchTrigger("myqueue", SuccessOrFailAsBatch = false)] IMessageBatch batch)
{
    foreach (var msg in batch.Messages)
    {
        // do something with payload
        if (DoSomething(msg.Payload))
        {
            // mark as processed only if successful
            batch.MarkAsProcessed(msg);
        }
    }
}
QueueBatch provides an alternative client for accessing Azure Storage Queues that is much faster than the one provided by the SDK (up to 20x). It is opt-in: to enable it, set UseFasterQueues to true.
public static void MyFunc([QueueBatchTrigger("myqueue", UseFasterQueues = true)] IMessageBatch batch)
{
    // ...
}
A single get operation can obtain no more than 32 messages, so you can request multiple parallel gets. The maximum number of messages in a batch is equal to 32 * ParallelGets.
public static void MyFunc([QueueBatchTrigger("myqueue", ParallelGets = 2)] IMessageBatch batch)
{
    // ...
}
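The upper bound on batch size can be worked out with a small helper. This is a sketch only: BatchSizeSketch and MaxBatchSize are hypothetical names, not part of QueueBatch, and the argument check is an assumption about what a sensible ParallelGets value looks like.

```csharp
using System;

// Hypothetical helper, not part of QueueBatch.
public static class BatchSizeSketch
{
    // A single get operation returns at most 32 messages.
    public const int MaxMessagesPerGet = 32;

    // Upper bound on the number of messages in one batch for a given ParallelGets value.
    public static int MaxBatchSize(int parallelGets)
    {
        // Assumption: at least one get is always issued.
        if (parallelGets < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(parallelGets), "Expected at least one get.");
        }

        return MaxMessagesPerGet * parallelGets;
    }
}
```

With ParallelGets = 2, MaxBatchSize(2) evaluates to 64. This is a ceiling, not a guarantee: a batch may contain fewer messages if the queue holds fewer.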
Sometimes it might be useful to get notifications about empty batches as well. They can be used to do some other work, like compacting your data or sending a notification that there was a run with nothing to process. After each empty batch, the back-off strategy will delay the next query for messages even more. To enable calls with empty batches, set the following property:
public static void MyFunc([QueueBatchTrigger("myqueue", RunWithEmptyBatch = true)] IMessageBatch batch)
{
    // now, if no message is retrieved from the queue, MyFunc will still be called
}
Connection is an optional property that specifies the queue's storage account. It is the name of an app setting that contains the storage connection string to use for this binding. When the Connection property is empty, the default AzureWebJobsStorage connection string is used.
public static void MyFunc([QueueBatchTrigger("myqueue", Connection = "StorageConnectionAppSetting")] IMessageBatch batch)
{
    // ...
}
QueueBatch is licensed under the Apache License 2.0.
Azure WebJobs SDK is licensed under the MIT license as described here. Azure WebJobs SDK sources are used and partially compiled into the QueueBatch distribution as allowed under the license terms found here.
Batch Download designed by Fatahillah from The Noun Project