Messaging in .Net Core with RabbitMQ - Publisher Confirm

Messaging is an essential part of modern distributed systems, and RabbitMQ is one of the most popular message brokers used by developers around the world. In this blog post, we’ll explore how to implement publisher confirms with RabbitMQ and .NET Core, examine several strategies for using them, and discuss the advantages and drawbacks of each approach.

Publisher confirms are an important feature of RabbitMQ that allows publishers to confirm that messages have been successfully received by the broker, ensuring reliable delivery of messages in distributed systems. We’ll cover the basics of setting up a RabbitMQ client in .NET Core, configuring publisher confirms, handling confirmations, and handling errors that may occur during messaging. By the end of this post, you’ll have a solid understanding of how to implement publisher confirms with RabbitMQ and .NET Core, and how to build robust and reliable messaging systems.

In network communication, ensuring that a message is delivered successfully can be a challenge. Networks can fail in unpredictable ways, and detecting failures can take time. This can lead to a situation where a client assumes that a message has been successfully delivered to a server when in reality it has been lost or significantly delayed.

Traditionally, the only way to guarantee that a message is not lost is to use transactions. This involves making the channel transactional and then publishing each message or set of messages with a commit. However, this approach is not always practical. Transactions are heavyweight and can significantly decrease throughput, making them unsuitable for high-performance systems.

To address this challenge, RabbitMQ provides a confirmation mechanism, publisher confirms, as an extension to the AMQP 0-9-1 protocol. Using this mechanism, a client can learn that a message has been successfully received by the server (the message broker) and that the broker has taken responsibility for it.

In this article, we’re going to use publisher confirmations to make sure published messages have safely reached the broker. We will use different approaches for handling publisher confirmations, and we will explain each of these approaches with some sample code separately.

Preparing Channel For Publisher Confirm

To enable publisher confirms in RabbitMQ, you need to call the ConfirmSelect() method on the channel that you want to enable confirms for. This method is used to configure the channel to expect publisher confirms for all subsequent messages published on that channel. It is important to note that this method needs to be called only once per channel and not for every message that is published on that channel.

```csharp
// Create a connection to RabbitMQ
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
    // Create a channel on the connection
    using (var channel = connection.CreateModel())
    {
        // Enable publisher confirms on the channel
        channel.ConfirmSelect();
        //...
    }
}
```

Once a channel is in confirm mode, both the broker and the client count messages. Counting starts at 1 with the first call to channel.ConfirmSelect(), which prepares the channel for confirmations.

In publisher confirms, the Delivery Tag and Publisher Sequence Number are important concepts to understand:

  • The delivery tag is used at the broker level. It is the sequence number that identifies the confirmed or rejected message and acts as a unique identifier for that message on the channel. The broker derives it from the order in which it receives messages on the channel, so it stays in sync with the client’s publisher sequence number and a confirmation can be correlated between client and server.
  • The publisher sequence number is used at the client level. The channel assigns it to each message published with channel.BasicPublish and increments it for each subsequent message; the channel.NextPublishSeqNo property returns the number that will be assigned to the next message published on the channel. It is what lets the client match the delivery tag in a confirmation to the original message, track the status of each message, and act on the confirmation received from the broker.

When the client publishes a message, the channel assigns it the next publisher sequence number. The broker counts the messages it receives on the channel in the same order, so the delivery tag it later uses in the confirmation is equal to the publisher’s sequence number for that message. This is what lets the client keep track of the status of each message published on the channel.

The broker confirms messages as it handles them by sending a basic.ack on the same channel with a delivery tag field containing the sequence number of the confirmed message. This confirms that the message has been successfully received by the broker and is in the queue for delivery.

In exceptional cases where the broker is unable to handle messages successfully, it sends a basic.nack instead of a basic.ack. This indicates that the broker was unable to process the message and refuses responsibility for it. At that point, the client can treat the message as lost or failed and take appropriate action, such as republishing the message or logging the failure. A timeout is a different situation: the timeout passed to channel.WaitForConfirmsOrDie(timeout) is enforced by the client, not by the broker. If no confirmation arrives in time, the broker sends nothing; the client simply stops waiting and throws, and the fate of the message is unknown.

The broker can also set the multiple field in the basic.ack or basic.nack message to indicate whether it’s confirming a single message or multiple messages. If the multiple field is set to true, then all messages with a lower or equal sequence number to the one in the delivery tag field have been confirmed.

When can messages be confirmed by the broker after publication?

The answer to this question depends on whether the message is routable or unroutable.

For unroutable messages, the broker issues a confirmation once the exchange verifies that the message won’t route to any queue. If the message was also published with the mandatory flag set, the broker sends a basic.return to the client before the basic.ack. If the mandatory flag is not set, the broker discards the message but still confirms it: a basic.ack only means that the broker has taken responsibility for the message, not that the message ended up in a queue.

For routable messages, the confirmation process is a bit more complex. The broker issues a basic.ack once the message has been accepted by all the queues it is routed to. For persistent messages routed to durable queues, this means the message has been persisted to disk.

Publisher Confirm Synchronously

RabbitMQ offers two approaches for handling publisher confirmations: synchronous and asynchronous. In this post, we’ll focus on the synchronous approach. With this approach, the client publishes a message and waits synchronously for its confirmation using the channel.WaitForConfirmsOrDie(timeout) method. However, there are two cases to consider:

  1. Publisher Confirm Single Message: The client publishes a message with channel.BasicPublish() and immediately waits for the broker’s confirmation, blocking the current thread.
  2. Publisher Confirm Batch Messages: The client publishes several messages in a row, either with repeated channel.BasicPublish() calls or as a batch created with channel.CreateBasicPublishBatch(), and then waits until all of them have been confirmed.

Publisher Confirm Single Message Synchronously

When publishing messages in RabbitMQ .NET, you can either publish them in batches or individually. Publishing messages individually provides more control over the publishing process and allows you to confirm each message individually. In the individual acknowledgment mode, the broker sends a separate acknowledgment for each message published. This mode is suitable for low-throughput scenarios where the number of messages being published is small.

Here is an example of how to publish messages individually using WaitForConfirmsOrDie(timeout) and BasicPublish():

```csharp
public class SyncSinglePublisherConfirm : IPublisher
{
    private readonly ILogger<SyncSinglePublisherConfirm> _logger;
    private readonly RabbitMqOptions _rabbitmqOptions;

    public SyncSinglePublisherConfirm(
        IOptions<RabbitMqOptions> rabbitmqOptions,
        ILogger<SyncSinglePublisherConfirm> logger
    )
    {
        _logger = logger;
        _rabbitmqOptions = rabbitmqOptions.Value;
    }

    public int TimeOut { get; set; } = 60;

    public async Task PublishAsync(EnvelopMessage message)
    {
        await PublishAsync(new List<EnvelopMessage> { message });
    }

    public async Task PublishAsync(IEnumerable<EnvelopMessage> messages)
    {
        Queue<EnvelopMessage> unsuccessfulPublishedMessages = new Queue<EnvelopMessage>();

        var factory = new ConnectionFactory
        {
            HostName = _rabbitmqOptions.Host,
            UserName = _rabbitmqOptions.User,
            Password = _rabbitmqOptions.Password
        };

        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();

        // Calling `ConfirmSelect` on the channel sets `NextPublishSeqNo` to 1
        channel.ConfirmSelect();

        _logger.LogInformation(
            $"Start SequenceNumber for 'ConfirmSelect' is: {channel.NextPublishSeqNo}"
        );

        channel.BasicAcks += (sender, ea) =>
        {
            _logger.LogInformation(
                $"Message with delivery tag '{ea.DeliveryTag}' ack-ed, multiple is {ea.Multiple}."
            );
        };

        channel.BasicNacks += (sender, ea) =>
        {
            _logger.LogInformation(
                $"Message with delivery tag '{ea.DeliveryTag}' nack-ed, multiple is {ea.Multiple}."
            );
        };

        var startTime = Stopwatch.GetTimestamp();
        var messageList = messages.ToList();

        foreach (var envelopMessage in messageList)
        {
            channel.QueueDeclare(
                queue: envelopMessage.Message.GetType().Name.Underscore(),
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null
            );

            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;
            properties.Headers = envelopMessage.Metadata;
            properties.ContentType = "application/json";
            properties.Type = TypeMapper.GetTypeName(envelopMessage.Message.GetType());
            properties.MessageId = envelopMessage.Message.MessageId.ToString();

            var currentSequenceNumber = channel.NextPublishSeqNo;

            try
            {
                // After publishing, the publish sequence number is incremented
                channel.BasicPublish(
                    exchange: string.Empty,
                    routingKey: envelopMessage.Message.GetType().Name.Underscore(),
                    basicProperties: properties,
                    body: Encoding.UTF8.GetBytes(
                        JsonConvert.SerializeObject(envelopMessage.Message)
                    )
                );

                var nextSequenceNumberAfterPublish = channel.NextPublishSeqNo;
                _logger.LogInformation(
                    $"message with messageId: {envelopMessage.Message.MessageId} published, and current SequenceNumber is: {currentSequenceNumber}, next SequenceNumber after publishing is: {nextSequenceNumberAfterPublish}."
                );

                // Single confirmation after each publish; throws on nack or timeout
                channel.WaitForConfirmsOrDie(timeout: TimeSpan.FromSeconds(5));

                _logger.LogInformation(
                    $"message with messageId: {envelopMessage.Message.MessageId}, and SequenceNumber is: {currentSequenceNumber} confirmed."
                );
            }
            catch (Exception ex)
            {
                var nextSequenceNumberAfterPublish = channel.NextPublishSeqNo;
                _logger.LogError(
                    ex,
                    $"message with messageId: {envelopMessage.Message.MessageId} failed, and current SequenceNumber is: {currentSequenceNumber}, next SequenceNumber after publishing is: {nextSequenceNumberAfterPublish}."
                );

                // Remember the message so it can be republished below
                unsuccessfulPublishedMessages.Enqueue(envelopMessage);
            }
        }

        if (unsuccessfulPublishedMessages.Any())
        {
            await PublishAsync(unsuccessfulPublishedMessages);
        }

        _logger.LogInformation("All published messages are confirmed");

        var endTime = Stopwatch.GetTimestamp();
        _logger.LogInformation(
            $"Published {messageList.Count} messages and handled confirm synchronously {Stopwatch.GetElapsedTime(startTime, endTime).TotalMilliseconds} ms"
        );
    }
}
```

Publisher confirms are not enabled by default, so we first need to enable them at the channel level by calling ConfirmSelect(). This tells the broker to send a confirmation for every message published on that channel. The method must be called on every channel on which you expect to use publisher confirms, but only once per channel, not for every message published.

```csharp
using var channel = connection.CreateModel();
channel.ConfirmSelect();
```

To keep track of the client’s publish sequence numbers, the channel internally uses NextPublishSeqNo together with the private fields _pendingDeliveryTags and _deliveryTagsCountdown, which are updated during BasicPublish.

Once a channel is put into confirm mode by calling channel.ConfirmSelect(), both the broker and the client count messages, starting at 1 on the first confirm.select. On the client side, this means that ConfirmSelect() sets the channel’s NextPublishSeqNo to 1.

For publishing messages, we loop through a set of messages and publish them one by one:

```csharp
foreach (var envelopMessage in messageList)
{
    var properties = channel.CreateBasicProperties();
    properties.Persistent = true;
    properties.Headers = envelopMessage.Metadata;

    var currentSequenceNumber = channel.NextPublishSeqNo;

    try
    {
        // After publishing, the publish sequence number is incremented
        channel.BasicPublish(
            exchange: string.Empty,
            routingKey: envelopMessage.Message.GetType().Name.Underscore(),
            basicProperties: properties,
            body: Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(envelopMessage.Message))
        );

        var nextSequenceNumberAfterPublish = channel.NextPublishSeqNo;
        _logger.LogInformation(
            $"message with messageId: {envelopMessage.Message.MessageId} published, and current SequenceNumber is: {currentSequenceNumber}, next SequenceNumber after publishing is: {nextSequenceNumberAfterPublish}."
        );

        // Single confirmation after each publish; throws on nack or timeout
        channel.WaitForConfirmsOrDie(timeout: TimeSpan.FromSeconds(5));

        _logger.LogInformation(
            $"message with messageId: {envelopMessage.Message.MessageId}, and SequenceNumber is: {currentSequenceNumber} confirmed."
        );
    }
    catch (Exception ex)
    {
        var nextSequenceNumberAfterPublish = channel.NextPublishSeqNo;
        _logger.LogError(
            ex,
            $"message with messageId: {envelopMessage.Message.MessageId} failed, and current SequenceNumber is: {currentSequenceNumber}, next SequenceNumber after publishing is: {nextSequenceNumberAfterPublish}."
        );

        unsuccessfulPublishedMessages.Enqueue(envelopMessage);
    }
}
```

Before we publish our first message, the current publish sequence number is 1, because preparing the channel for publisher confirms by calling ConfirmSelect (https://github.com/rabbitmq/rabbitmq-dotnet-client/blob/6.x/projects/RabbitMQ.Client/client/impl/ModelBase.cs#L1235) sets NextPublishSeqNo to 1.

var currentSequenceNumber = channel.NextPublishSeqNo;

Next, we call BasicPublish() for our first message, which gets the first publish sequence number (NextPublishSeqNo = 1). Several things happen inside BasicPublish at this point, and they are worth describing to make the workflow clear:

  • When we call BasicPublish, the current publish sequence number (NextPublishSeqNo, here 1) is added to _pendingDeliveryTags, the list of tags awaiting confirmation. NextPublishSeqNo is then incremented to 2 in preparation for the next publish, and _deliveryTagsCountdown is increased by 1. The same happens on every subsequent BasicPublish call; sequence numbers keep increasing whether or not earlier messages are confirmed successfully.
  • The message is sent to the broker, and its confirmation (ack or nack) arrives asynchronously. The client handles it in the HandleBasicAck or HandleBasicNack callback of the ModelBase class, which in turn raises the BasicAcks or BasicNacks event on the channel. If we want to be sure the broker has confirmed the message before our code continues, we have to block the thread by calling WaitForConfirmsOrDie(timeout).
  • Besides raising the BasicAcks and BasicNacks events, these callbacks update the channel’s bookkeeping. If _pendingDeliveryTags is not empty, the tags covered by the basic.ack or basic.nack are removed from it. Which tags are covered depends on the multiple parameter: if it is true, all messages with a sequence number lower than or equal to the delivery tag are settled; if it is false, only the message with exactly that tag is settled. _deliveryTagsCountdown is decreased by 1 for each settled message, and WaitForConfirmsOrDie blocks until it reaches zero. If any nack is received, _onlyAcksReceived is set to false, which WaitForConfirmsOrDie checks later.
  • For instance, suppose delivery tags 5, 6, 7, and 8 are still pending on the channel. When an acknowledgement frame with delivery tag 8 and multiple set to true arrives, all tags from 5 to 8 are acknowledged, removed from _pendingDeliveryTags, and _deliveryTagsCountdown is decreased by 4. If multiple is false, only the message with delivery tag 8 is acknowledged, and tags 5, 6, and 7 remain pending.
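The effect of the multiple flag on the pending tags can be tried out without a broker. The following is only an illustration, not the client’s actual implementation: Settle is a hypothetical helper, and a SortedSet<ulong> stands in for _pendingDeliveryTags. It replays the example with tags 5 to 8 from the list above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: removes the tags settled by one basic.ack/basic.nack
// frame from the pending set and returns them.
static List<ulong> Settle(SortedSet<ulong> pending, ulong deliveryTag, bool multiple)
{
    // multiple = true settles every pending tag up to and including deliveryTag;
    // multiple = false settles only deliveryTag itself.
    var settled = multiple
        ? pending.Where(tag => tag <= deliveryTag).ToList()
        : pending.Where(tag => tag == deliveryTag).ToList();

    foreach (var tag in settled)
        pending.Remove(tag);

    return settled;
}

// Tags 5..8 were published but are not confirmed yet.
var pendingA = new SortedSet<ulong> { 5, 6, 7, 8 };
var settledA = Settle(pendingA, deliveryTag: 8, multiple: true);
Console.WriteLine($"multiple=true:  settled [{string.Join(", ", settledA)}], still pending: {pendingA.Count}");

var pendingB = new SortedSet<ulong> { 5, 6, 7, 8 };
var settledB = Settle(pendingB, deliveryTag: 8, multiple: false);
Console.WriteLine($"multiple=false: settled [{string.Join(", ", settledB)}], still pending: {pendingB.Count}");
```

With multiple set to true, all four tags are settled at once; with multiple set to false, tags 5, 6, and 7 stay in the set.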

Now, if we call WaitForConfirmsOrDie after BasicPublish(), it blocks the current thread and waits, up to the given timeout, for _deliveryTagsCountdown to reach zero. If the countdown reaches zero and _onlyAcksReceived is true, all messages have been confirmed, the blocked thread is released, and the method returns to the application. If _onlyAcksReceived is false or the timeout is reached, it throws an IOException.
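To make this waiting logic concrete, here is a simplified model of it that runs without a broker. It is a sketch under assumptions, not the client’s code: OnPublish, OnConfirm, WaitOrDie, and Outcome are hypothetical names, a CountdownEvent plays the role of _deliveryTagsCountdown, a plain bool plays the role of _onlyAcksReceived, and every confirmation is assumed to settle exactly one message.

```csharp
using System;
using System.IO;
using System.Threading;

// Hypothetical stand-ins for the channel's private bookkeeping.
var countdown = new CountdownEvent(0); // number of unconfirmed messages
var onlyAcksReceived = true;

// Called for every publish: one more confirmation to wait for.
void OnPublish()
{
    // A CountdownEvent that has reached zero must be reset before it can count again.
    if (countdown.IsSet) countdown.Reset(1);
    else countdown.AddCount();
}

// Called for every confirmation (each one settles a single message here).
void OnConfirm(bool isAck)
{
    if (!isAck) onlyAcksReceived = false;
    countdown.Signal();
}

// Blocks until every published message is settled, or throws.
void WaitOrDie(TimeSpan timeout)
{
    bool completed = countdown.Wait(timeout);
    bool acksOnly = onlyAcksReceived;
    onlyAcksReceived = true; // start fresh for the next wait
    if (!acksOnly) throw new IOException("Nacks received");
    if (!completed) throw new IOException("Timed out waiting for acks");
}

// Small wrapper so the three cases below can be compared as strings.
string Outcome(TimeSpan timeout)
{
    try { WaitOrDie(timeout); return "confirmed"; }
    catch (IOException ex) { return ex.Message; }
}

// 1) The ack arrives from another thread shortly after publishing.
OnPublish();
var broker = new Thread(() => { Thread.Sleep(50); OnConfirm(isAck: true); });
broker.Start();
string first = Outcome(TimeSpan.FromSeconds(5));
broker.Join();

// 2) Nothing arrives: the wait gives up after the timeout.
OnPublish();
string second = Outcome(TimeSpan.FromMilliseconds(100));

// 3) A nack arrives late for the message from step 2.
OnConfirm(isAck: false);
string third = Outcome(TimeSpan.FromMilliseconds(100));

Console.WriteLine($"{first} | {second} | {third}");
```

The three cases correspond to the three outcomes described above: a normal return, an exception because of the timeout, and an exception because a nack was seen.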

We wait for the broker’s confirmation of the first message. If it is an ack, the message has been delivered to the broker successfully, WaitForConfirmsOrDie releases the blocked thread, and we continue publishing the remaining messages. If the message is not confirmed within the timeout, or if it is nack-ed (meaning the broker could not take care of it for some reason), WaitForConfirmsOrDie throws an IOException. Handling the exception usually consists of logging an error and/or retrying the message. Here we catch the exception, log it, and add the message to the unsuccessfulPublishedMessages collection so it can be republished later. Be aware that, in the 6.x client, WaitForConfirmsOrDie also closes the channel before throwing, so later publishes on the same channel fail as well and end up in the retry collection.

Because we wait for confirmation immediately after each publish, there is only ever one outstanding message, so the multiple parameter passed to HandleBasicAck or HandleBasicNack will normally be false. Waiting after every publish gives the broker no opportunity to confirm several messages with one frame. When more messages are outstanding, the broker may set multiple to true and confirm them together. We discuss batch confirmation in the next section.

```csharp
channel.BasicPublish(
    exchange: string.Empty,
    routingKey: envelopMessage.Message.GetType().Name.Underscore(),
    basicProperties: properties,
    body: Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(envelopMessage.Message))
);

var nextSequenceNumberAfterPublish = channel.NextPublishSeqNo; // 2

// Single confirmation after each publish
channel.WaitForConfirmsOrDie(timeout: TimeSpan.FromSeconds(5));
```

Also our next publish sequence number here will be 2.

The process is repeated for all messages, one by one. Each message is published, and we immediately wait for the broker’s confirmation (ack or nack) to make sure it was delivered successfully. If a message is nack-ed or the wait times out, we catch the resulting exception, log it, and add the message to our local unsuccessfulPublishedMessages queue so it can be republished.

Publishing messages individually gives us more control over the delivery of each message, which can be useful when message ordering is important. It is not very efficient, though, so for higher throughput it is better to use batch confirmation or asynchronous confirmation.

Publisher Confirm with Batch Messages Synchronously

Using individual acknowledgments can be a performance problem in high-throughput cases where publishers need to send a lot of messages. Batch confirmation is a way to handle this. In this approach, instead of publishing each message and waiting for its confirmation, we publish a batch of messages, wait for the whole batch to be confirmed by the broker, and only then publish the next batch.

Batching does not change the number of connections: both approaches publish over a single connection and channel. What it reduces is the number of blocking waits. The messages of a batch can be published with repeated channel.BasicPublish calls, or collected with channel.CreateBasicPublishBatch() and sent with a single call to the batch’s Publish() method.

Waiting for confirmation of a batch of messages instead of waiting for confirmation of each message individually is more efficient.
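A rough way to see the difference is to count how often the publisher blocks. The arithmetic below is illustrative only; BlockingWaits is a hypothetical helper, and the message count of 1000 is an arbitrary example, not a measurement.

```csharp
using System;

// Number of blocking WaitForConfirmsOrDie calls needed for messageCount messages.
static int BlockingWaits(int messageCount, int batchSize) =>
    (messageCount + batchSize - 1) / batchSize; // ceiling division

const int messages = 1000;
int individualWaits = BlockingWaits(messages, batchSize: 1);  // one wait per message
int batchedWaits = BlockingWaits(messages, batchSize: 100);   // one wait per 100 messages

Console.WriteLine($"individual: {individualWaits} waits, batched: {batchedWaits} waits");
// prints "individual: 1000 waits, batched: 10 waits"
```

Each wait costs at least one network round trip to the broker, so fewer waits means less time spent idle.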

Here I implemented a batch confirmation with batch size 100 and using BasicPublish:

```csharp
public class SyncBatchPublisherConfirm : IPublisher
{
    private readonly ILogger<SyncBatchPublisherConfirm> _logger;
    private readonly RabbitMqOptions _rabbitmqOptions;
    private readonly ConcurrentDictionary<ulong, EnvelopMessage> _messagesDeliveryTagsDictionary = new();

    public SyncBatchPublisherConfirm(
        IOptions<RabbitMqOptions> rabbitmqOptions,
        ILogger<SyncBatchPublisherConfirm> logger
    )
    {
        _logger = logger;
        _rabbitmqOptions = rabbitmqOptions.Value;
    }

    public int TimeOut { get; set; } = 60;
    public int BatchSize { get; set; } = 100;

    public async Task PublishAsync(EnvelopMessage message)
    {
        await PublishAsync(new List<EnvelopMessage> { message });
    }

    public async Task PublishAsync(IEnumerable<EnvelopMessage> envelopMessages)
    {
        Queue<EnvelopMessage> unsuccessfulPublishedMessages = new Queue<EnvelopMessage>();

        var factory = new ConnectionFactory
        {
            HostName = _rabbitmqOptions.Host,
            UserName = _rabbitmqOptions.User,
            Password = _rabbitmqOptions.Password
        };

        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();

        // Calling `ConfirmSelect` on the channel sets `NextPublishSeqNo` to 1
        channel.ConfirmSelect();

        _logger.LogInformation(
            $"Start SequenceNumber for 'ConfirmSelect' is: {channel.NextPublishSeqNo}"
        );

        var startTime = Stopwatch.GetTimestamp();

        channel.BasicAcks += (_, ea) =>
        {
            var envelop = GetMappedMessage(ea.DeliveryTag);
            _logger.LogInformation(
                $"Message with delivery tag '{ea.DeliveryTag}' and messageId: {envelop?.Message.MessageId} ack-ed, multiple is {ea.Multiple}."
            );
            RemovedConfirmedMessage(ea.DeliveryTag, ea.Multiple);
        };

        channel.BasicNacks += (_, ea) =>
        {
            var envelop = GetMappedMessage(ea.DeliveryTag);
            _logger.LogInformation(
                $"Message with delivery tag '{ea.DeliveryTag}' and messageId: {envelop?.Message.MessageId} nack-ed, multiple is {ea.Multiple}."
            );

            // Keep the nack-ed message so it can be republished at the end
            if (envelop is { })
                unsuccessfulPublishedMessages.Enqueue(envelop);

            RemovedConfirmedMessage(ea.DeliveryTag, ea.Multiple);
        };

        var batchChunk = 0;
        var messageList = envelopMessages.ToList();

        foreach (var envelopMessage in messageList)
        {
            channel.QueueDeclare(
                queue: envelopMessage.Message.GetType().Name.Underscore(),
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null
            );

            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;
            properties.Headers = envelopMessage.Metadata;
            properties.ContentType = "application/json";
            properties.Type = TypeMapper.GetTypeName(envelopMessage.Message.GetType());
            properties.MessageId = envelopMessage.Message.MessageId.ToString();

            // Map the sequence number (equal to the broker's delivery tag) to the message
            var currentSequenceNumber = channel.NextPublishSeqNo;
            _messagesDeliveryTagsDictionary.TryAdd(currentSequenceNumber, envelopMessage);

            // After publishing, the publish sequence number is incremented
            channel.BasicPublish(
                exchange: string.Empty,
                routingKey: envelopMessage.Message.GetType().Name.Underscore(),
                basicProperties: properties,
                body: Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(envelopMessage.Message))
            );

            batchChunk++;

            var nextSequenceNumberAfterPublish = channel.NextPublishSeqNo;
            _logger.LogInformation(
                $"message with messageId: {envelopMessage.Message.MessageId} published, and current SequenceNumber is: {currentSequenceNumber}, Next SequenceNumber after publishing is: {nextSequenceNumberAfterPublish}."
            );

            // Wait once per full batch, and once more after the last message
            if (batchChunk == BatchSize || (int)currentSequenceNumber == messageList.Count)
            {
                // Throws IOException on a nack or timeout; it is not caught here,
                // so it propagates to the caller
                channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
                batchChunk = 0;
            }
        }

        if (unsuccessfulPublishedMessages.Any())
            await PublishAsync(unsuccessfulPublishedMessages);

        _logger.LogInformation("All published messages are confirmed");

        var endTime = Stopwatch.GetTimestamp();
        _logger.LogInformation(
            $"Published {messageList.Count} messages and handled confirm synchronously {Stopwatch.GetElapsedTime(startTime, endTime).TotalMilliseconds} ms"
        );
    }

    private void RemovedConfirmedMessage(ulong sequenceNumber, bool multiple)
    {
        if (multiple)
        {
            // Everything up to and including this tag has been settled
            var confirmed = _messagesDeliveryTagsDictionary.Where(k => k.Key <= sequenceNumber);
            foreach (var entry in confirmed)
            {
                _messagesDeliveryTagsDictionary.TryRemove(entry.Key, out _);
            }
        }
        else
        {
            _messagesDeliveryTagsDictionary.TryRemove(sequenceNumber, out _);
        }
    }

    private EnvelopMessage? GetMappedMessage(ulong sequenceNumber)
    {
        _messagesDeliveryTagsDictionary.TryGetValue(sequenceNumber, out EnvelopMessage? e);
        return e;
    }
}
```

Here we have a batchChunk local variable with an initial value of 0, and we increment it with each publish until it reaches BatchSize, which is 100. At that point we wait for confirmations of all messages in the batch before moving on. The same wait is triggered after the last message, so a final, smaller batch is confirmed too. Then we reset batchChunk and start counting the next batch:

```csharp
// Wait once per full batch, and once more after the last message
if (batchChunk == BatchSize || (int)currentSequenceNumber == messageList.Count)
{
    channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
    batchChunk = 0;
}
```
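The same flush points can also be obtained by splitting the input into chunks up front, which avoids the separate check for the last message. This is a minimal sketch, assuming .NET 6 or later for Enumerable.Chunk; PublishInBatches is a hypothetical helper, and its publish and waitForConfirms delegates are stand-ins for channel.BasicPublish and channel.WaitForConfirmsOrDie so that the example runs without a broker.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Publishes the messages chunk by chunk and waits once after every chunk.
// Returns how many times it waited.
static int PublishInBatches<T>(
    IEnumerable<T> messages,
    int batchSize,
    Action<T> publish,
    Action waitForConfirms)
{
    int waits = 0;
    foreach (T[] batch in messages.Chunk(batchSize))
    {
        foreach (T message in batch)
            publish(message);

        // The last chunk may be smaller than batchSize; it still gets its own wait.
        waitForConfirms();
        waits++;
    }
    return waits;
}

var published = new List<int>();
int waitCount = PublishInBatches(
    Enumerable.Range(1, 250),
    batchSize: 100,
    publish: m => published.Add(m),
    waitForConfirms: () => { });

Console.WriteLine($"{published.Count} messages, {waitCount} waits");
// prints "250 messages, 3 waits"
```

With 250 messages and a batch size of 100, the chunks contain 100, 100, and 50 messages, so the publisher waits three times.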

To correlate a confirmation with the message it belongs to inside the channel.BasicAcks and channel.BasicNacks handlers, we use a mapping dictionary called _messagesDeliveryTagsDictionary. Before publishing a message, we add an entry whose key is the message’s publish sequence number (which is equal to the broker’s delivery tag) and whose value is the message itself:

```csharp
var currentSequenceNumber = channel.NextPublishSeqNo;
_messagesDeliveryTagsDictionary.TryAdd(currentSequenceNumber, envelopMessage);

// After publishing, the publish sequence number is incremented
channel.BasicPublish(
    exchange: string.Empty,
    routingKey: envelopMessage.Message.GetType().Name.Underscore(),
    basicProperties: properties,
    body: Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(envelopMessage.Message))
);
```

Now we can look up a published message by its delivery tag inside channel.BasicAcks and channel.BasicNacks:

```csharp
channel.BasicAcks += (_, ea) =>
{
    var envelop = GetMappedMessage(ea.DeliveryTag);
    _logger.LogInformation(
        $"Message with delivery tag '{ea.DeliveryTag}' and messageId: {envelop?.Message.MessageId} ack-ed, multiple is {ea.Multiple}."
    );
    RemovedConfirmedMessage(ea.DeliveryTag, ea.Multiple);
};

channel.BasicNacks += (_, ea) =>
{
    var envelop = GetMappedMessage(ea.DeliveryTag);
    _logger.LogInformation(
        $"Message with delivery tag '{ea.DeliveryTag}' and messageId: {envelop?.Message.MessageId} nack-ed, multiple is {ea.Multiple}."
    );

    if (envelop is { })
        unsuccessfulPublishedMessages.Enqueue(envelop);

    RemovedConfirmedMessage(ea.DeliveryTag, ea.Multiple);
};
```

We get the message that belongs to a delivery tag with this helper:

```csharp
private EnvelopMessage? GetMappedMessage(ulong sequenceNumber)
{
    _messagesDeliveryTagsDictionary.TryGetValue(sequenceNumber, out EnvelopMessage? e);
    return e;
}
```

After a confirmation arrives, whether ack or nack, the handlers do some logging: channel.BasicAcks logs the successfully delivered message, while channel.BasicNacks logs the failure and queues the nack-ed message for republishing. In both cases we remove the corresponding entry from _messagesDeliveryTagsDictionary, because the confirmation has been received and the message no longer needs to be tracked. To remove the entry, we call RemovedConfirmedMessage with the corresponding sequence number:

```csharp
private void RemovedConfirmedMessage(ulong sequenceNumber, bool multiple)
{
    if (multiple)
    {
        // Everything up to and including this tag has been settled
        var confirmed = _messagesDeliveryTagsDictionary.Where(k => k.Key <= sequenceNumber);
        foreach (var entry in confirmed)
        {
            _messagesDeliveryTagsDictionary.TryRemove(entry.Key, out _);
        }
    }
    else
    {
        _messagesDeliveryTagsDictionary.TryRemove(sequenceNumber, out _);
    }
}
```

To handle nack-ed messages in the channel.BasicNacks handler, we create a local queue, unsuccessfulPublishedMessages, add nack-ed messages to it, and republish them at the end of the process:

```csharp
channel.BasicNacks += (_, ea) =>
{
    var envelop = GetMappedMessage(ea.DeliveryTag);
    _logger.LogInformation(
        $"Message with delivery tag '{ea.DeliveryTag}' and messageId: {envelop?.Message.MessageId} nack-ed, multiple is {ea.Multiple}."
    );

    if (envelop is { })
        unsuccessfulPublishedMessages.Enqueue(envelop);

    RemovedConfirmedMessage(ea.DeliveryTag, ea.Multiple);
};

// ... publish the messages and wait for confirmations ...

if (unsuccessfulPublishedMessages.Any())
    await PublishAsync(unsuccessfulPublishedMessages);
```
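Republishing by calling PublishAsync again keeps retrying for as long as the broker keeps rejecting messages. One possible way to put a limit on that is sketched below. It is an assumption-laden illustration rather than part of the publisher above: PublishWithRetries and maxAttempts are hypothetical, and publishOnce is a stand-in delegate that publishes a list of messages and returns the ones that were not confirmed.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Publishes the messages, then republishes whatever failed, at most maxAttempts times.
// Returns the messages that were still unconfirmed after the last attempt.
static IReadOnlyList<T> PublishWithRetries<T>(
    IReadOnlyList<T> messages,
    Func<IReadOnlyList<T>, IReadOnlyList<T>> publishOnce,
    int maxAttempts)
{
    IReadOnlyList<T> remaining = messages;
    for (int attempt = 1; attempt <= maxAttempts && remaining.Count > 0; attempt++)
        remaining = publishOnce(remaining);
    return remaining;
}

// Fake publisher for the demo: message "b" is rejected on the first attempt only.
int calls = 0;
IReadOnlyList<string> FakePublish(IReadOnlyList<string> batch)
{
    calls++;
    return calls == 1 ? batch.Where(m => m == "b").ToList() : new List<string>();
}

var undelivered = PublishWithRetries<string>(new[] { "a", "b", "c" }, FakePublish, maxAttempts: 3);
Console.WriteLine($"attempts: {calls}, undelivered: {undelivered.Count}");
// prints "attempts: 2, undelivered: 0"
```

Anything still returned after the last attempt can be logged or stored elsewhere instead of being retried forever.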

Now let’s implement this batch publishing with PublishBatch instead of BasicPublish.

When each message is published with its own BasicPublish call, the client issues a separate send for every message, and that per-message overhead adds up as the number of messages grows, so overall performance decreases. A publish batch, created with channel.CreateBasicPublishBatch(), collects the messages on the client and hands them to the connection in a single Publish() call, which reduces that overhead. Everything still travels over the same connection and channel, so no additional connections are involved in either approach.

Note that a publish batch is not a transaction: each message still gets its own sequence number and is confirmed or nack-ed individually by the broker.

    public class SyncBatchPublisherConfirm : IPublisher
    {
        private readonly ILogger<SyncBatchPublisherConfirm> _logger;
        private readonly RabbitMqOptions _rabbitmqOptions;
        private readonly ConcurrentDictionary<ulong, EnvelopMessage> _messagesDeliveryTagsDictionary = new();

        public SyncBatchPublisherConfirm(
            IOptions<RabbitMqOptions> rabbitmqOptions,
            ILogger<SyncBatchPublisherConfirm> logger
        )
        {
            _logger = logger;
            _rabbitmqOptions = rabbitmqOptions.Value;
        }

        public int TimeOut { get; set; } = 60;
        public int BatchSize { get; set; } = 5;

        public async Task PublishAsync(EnvelopMessage message)
        {
            await PublishAsync(new List<EnvelopMessage> { message });
        }

        public async Task PublishAsync(IEnumerable<EnvelopMessage> envelopMessages)
        {
            Queue<EnvelopMessage> unsuccessfulPublishedMessages = new Queue<EnvelopMessage>();

            var factory = new ConnectionFactory
            {
                HostName = _rabbitmqOptions.Host,
                UserName = _rabbitmqOptions.User,
                Password = _rabbitmqOptions.Password
            };

            using var connection = factory.CreateConnection();
            using var channel = connection.CreateModel();

            // Calling `ConfirmSelect` on the channel sets `NextPublishSeqNo` to '1'
            channel.ConfirmSelect();
            _logger.LogInformation(
                $"Start SequenceNumber for 'ConfirmSelect' is: {channel.NextPublishSeqNo}"
            );

            var startTime = Stopwatch.GetTimestamp();

            channel.BasicAcks += (_, ea) =>
            {
                var envelop = GetMappedMessage(ea.DeliveryTag);
                _logger.LogInformation(
                    $"Message with delivery tag '{ea.DeliveryTag}' and messageId: {envelop?.Message.MessageId} ack-ed, multiple is {ea.Multiple}."
                );
                RemovedConfirmedMessage(ea.DeliveryTag, ea.Multiple);
            };

            channel.BasicNacks += (_, ea) =>
            {
                var envelop = GetMappedMessage(ea.DeliveryTag);
                _logger.LogInformation(
                    $"Message with delivery tag '{ea.DeliveryTag}' and messageId: {envelop?.Message.MessageId} nack-ed, multiple is {ea.Multiple}."
                );
                // Keep the nacked message so it can be republished at the end
                if (envelop is { })
                    unsuccessfulPublishedMessages.Enqueue(envelop);
                RemovedConfirmedMessage(ea.DeliveryTag, ea.Multiple);
            };

            var messageList = envelopMessages.ToList();
            Queue<EnvelopMessage> batchQueue = new Queue<EnvelopMessage>();
            ulong currentSequenceNumber = channel.NextPublishSeqNo; // 1

            foreach (var envelopMessage in messageList)
            {
                batchQueue.Enqueue(envelopMessage);

                // Flush when the batch is full or when the last message has been queued
                if (
                    batchQueue.Count == BatchSize
                    || (batchQueue.Count - 1) + (int)currentSequenceNumber == messageList.Count
                )
                {
                    currentSequenceNumber = PublishBatch(channel, batchQueue, currentSequenceNumber);

                    // Block until the broker has confirmed every message of this batch
                    channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(50));
                    batchQueue = new Queue<EnvelopMessage>();
                }
            }

            if (unsuccessfulPublishedMessages.Any())
                await PublishAsync(unsuccessfulPublishedMessages);

            _logger.LogInformation("All published messages are confirmed");

            var endTime = Stopwatch.GetTimestamp();
            _logger.LogInformation(
                $"Published {messageList.Count} messages and handled confirm synchronously in batches {Stopwatch.GetElapsedTime(startTime, endTime).TotalMilliseconds} ms"
            );
        }

        private ulong PublishBatch(
            IModel channel,
            IEnumerable<EnvelopMessage> envelopMessages,
            ulong currentSequenceNumber = 1
        )
        {
            // Create a batch of messages
            var batch = channel.CreateBasicPublishBatch();
            var batchMessages = envelopMessages.ToList();

            foreach (var envelope in batchMessages)
            {
                channel.QueueDeclare(
                    queue: envelope.Message.GetType().Name.Underscore(),
                    durable: true,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null
                );

                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;
                properties.Headers = envelope.Metadata;
                properties.ContentType = "application/json";
                properties.Type = TypeMapper.GetTypeName(envelope.Message.GetType());
                properties.MessageId = envelope.Message.MessageId.ToString();

                var body = new ReadOnlyMemory<byte>(
                    Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(envelope.Message))
                );

                batch.Add(
                    exchange: string.Empty,
                    routingKey: envelope.Message.GetType().Name.Underscore(),
                    mandatory: true,
                    properties: properties,
                    body: body
                );

                // Map the sequence number this message will get to the message itself
                _messagesDeliveryTagsDictionary.TryAdd(currentSequenceNumber++, envelope);
            }

            // Publish the whole batch in one operation. Internally, the client assigns a
            // `NextPublishSeqNo` to each message, adds it to its pendingDeliveryTags
            // collection, and increments the sequence number.
            batch.Publish();

            return channel.NextPublishSeqNo;
        }

        // The GetMappedMessage and RemovedConfirmedMessage helpers are omitted here.
    }

This is similar to our previous approach. The difference is that, instead of calling BasicPublish for each message separately and then waiting for the confirmations of a group of messages, we publish the messages together as one batch and wait for the confirmations of that batch, which is more efficient.

To publish a batch of messages, we create a batch with channel.CreateBasicPublishBatch() and add each message to it with the batch.Add method. Once the batch is ready, we call batch.Publish to publish all of its messages.

    var batch = channel.CreateBasicPublishBatch();
    foreach (var envelope in batchMessages)
    {
        //...
        var properties = channel.CreateBasicProperties();
        batch.Add(
            exchange: string.Empty,
            routingKey: envelope.Message.GetType().Name.Underscore(),
            mandatory: true,
            properties: properties,
            body: body
        );
    }

When we call batch.Publish, the client internally assigns a publish sequence number (NextPublishSeqNo) to each message in the batch separately before it sends the batch to the broker.

Generally, waiting for a batch of messages to be confirmed improves throughput drastically over waiting for a confirmation for each individual message, by up to 20-30 times. It is still synchronous, though: we block the thread while publishing a batch and waiting for the broker's confirmations. Publishing messages in a batch and waiting for their confirmations together is more efficient, because it reduces the per-message overhead between the client and RabbitMQ and the network traffic, and so improves overall performance. However, this approach can increase message publishing time if the batch size is large, because the publisher must wait for the broker's confirmations for all messages in the batch before publishing the next batch, so be careful to choose a batch size that suits your system.
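To make the role of the batch size concrete, here is a minimal sketch of splitting a message list into batches of at most BatchSize items. It is not the code from this post's repository: BatchSplitter and SplitIntoBatches are hypothetical names, and the sketch assumes .NET 6 or later, where Enumerable.Chunk is available.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, used only for illustration.
public static class BatchSplitter
{
    // Splits the messages into consecutive batches of at most batchSize items.
    // The last batch is smaller when the count is not a multiple of batchSize.
    public static List<T[]> SplitIntoBatches<T>(IEnumerable<T> messages, int batchSize)
    {
        if (batchSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(batchSize));

        // Enumerable.Chunk keeps the original order of the items.
        return messages.Chunk(batchSize).ToList();
    }
}
```

With a helper like this, the publishing loop could iterate over the returned batches, publish each one, and wait for its confirmations before moving on. A larger batchSize means fewer waits, but each wait covers more messages.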

Publisher Confirm Asynchronously

When we use publisher confirms in a batch, we have to wait until every message in the batch is confirmed before we can go on to publish the next batch. This can be acceptable when the number of published messages is low, but as the number of messages increases, throughput and performance decrease. For high-throughput publishing, it is better to handle publisher confirms asynchronously.

To avoid waiting synchronously for the confirmations of each batch, which hurts performance, RabbitMQ supports handling confirmations asynchronously. The publisher does not have to wait for a confirmation before publishing the next message to the broker. The broker confirms published messages asynchronously, and the registered event handlers are notified: we register a handler on channel.BasicNacks to be notified when a message is nacked, and a handler on channel.BasicAcks to be notified when a message is ack-ed. With this approach, the publisher can publish messages very quickly without waiting for any confirmation (the confirmations arrive in our callbacks or event handlers), so throughput and the rate of published messages increase drastically.

As mentioned before, we receive the result of each confirmation in our callbacks: channel.BasicNacks for nacked messages and channel.BasicAcks for ack-ed messages. Inside these callbacks we can run our own logic with the help of _messagesDeliveryTagsDictionary, the dictionary that maps each publish sequence number to its published message, which we explained in the Publisher Confirm with Batch Messages Synchronously section. Inside BasicAcks we can do some logging and other work. After that, because the confirmation process for the message is complete, we remove the ack-ed message from _messagesDeliveryTagsDictionary by passing the received delivery tag, which is the key of the entry, to the RemovedConfirmedMessage helper method:

    channel.BasicAcks += (_, ea) =>
    {
        var envelop = GetMappedMessage(ea.DeliveryTag);
        _logger.LogInformation(
            $"Message with delivery tag '{ea.DeliveryTag}' and messageId: {envelop?.Message.MessageId} ack-ed, multiple is {ea.Multiple}."
        );
        RemovedConfirmedMessage(ea.DeliveryTag, ea.Multiple);
    };

Inside the channel.BasicNacks callback for nacked messages, we log the error and add the nacked message to the unsuccessfulPublishedMessages queue so that it is republished at the end of the publishing process. After that, we also remove the nacked message from _messagesDeliveryTagsDictionary by its delivery tag, because its confirmation process has completed:

    channel.BasicNacks += (_, ea) =>
    {
        var envelop = GetMappedMessage(ea.DeliveryTag);
        _logger.LogInformation(
            $"Message with delivery tag '{ea.DeliveryTag}' and messageId: {envelop?.Message.MessageId} nack-ed, multiple is {ea.Multiple}."
        );
        if (envelop is { })
            unsuccessfulPublishedMessages.Enqueue(envelop);
        RemovedConfirmedMessage(ea.DeliveryTag, ea.Multiple);
    };

Also, when we use publisher confirms, the broker may confirm several messages at once, which improves performance because multiple messages are confirmed by a single notification. In that case, ea.Multiple is true in our BasicNacks and BasicAcks callbacks. Multiple is a boolean value: if false, only the message with the given delivery tag is ack-ed or nack-ed; if true, all messages with a lower or equal sequence number are ack-ed or nack-ed. We explained multiple confirmation in the Publisher Confirm with Batch Messages Synchronously section.
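As a reminder of what the Multiple flag means for our dictionary, here is a minimal sketch of how a helper such as RemovedConfirmedMessage might treat it. This is an illustration and not necessarily the implementation from the repository: ConfirmTracker and RemoveConfirmed are hypothetical names, and the dictionary is passed in as a parameter to keep the example self-contained.

```csharp
using System.Collections.Concurrent;
using System.Linq;

// Hypothetical helper, used only for illustration.
public static class ConfirmTracker
{
    // Removes confirmed entries from the map of outstanding messages.
    // multiple == false: only the entry with this exact delivery tag is removed.
    // multiple == true: every entry with a tag lower than or equal to deliveryTag is removed.
    public static void RemoveConfirmed<TMessage>(
        ConcurrentDictionary<ulong, TMessage> outstanding,
        ulong deliveryTag,
        bool multiple)
    {
        if (!multiple)
        {
            outstanding.TryRemove(deliveryTag, out _);
            return;
        }

        // Keys returns a snapshot, so removing entries while iterating is safe.
        foreach (var tag in outstanding.Keys.Where(k => k <= deliveryTag))
        {
            outstanding.TryRemove(tag, out _);
        }
    }
}
```

Note that when a nack arrives with Multiple set to true, every message in that range has been nacked, so a handler that wants to republish all of them would need to collect the whole range before removing it.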

The complete code for handling publisher confirm asynchronously and using BasicPublish:

    public class AsyncPublisherConfirm : IPublisher
    {
        private readonly ILogger<AsyncPublisherConfirm> _logger;
        private readonly RabbitMqOptions _rabbitmqOptions;
        private readonly ConcurrentDictionary<ulong, EnvelopMessage> _messagesDeliveryTagsDictionary = new();

        public AsyncPublisherConfirm(
            IOptions<RabbitMqOptions> rabbitmqOptions,
            ILogger<AsyncPublisherConfirm> logger
        )
        {
            _logger = logger;
            _rabbitmqOptions = rabbitmqOptions.Value;
        }

        public int TimeOut { get; set; } = 60;

        public async Task PublishAsync(EnvelopMessage message)
        {
            await PublishAsync(new List<EnvelopMessage> { message });
        }

        public async Task PublishAsync(IEnumerable<EnvelopMessage> envelopMessages)
        {
            Queue<EnvelopMessage> unsuccessfulPublishedMessages = new Queue<EnvelopMessage>();

            var factory = new ConnectionFactory
            {
                HostName = _rabbitmqOptions.Host,
                UserName = _rabbitmqOptions.User,
                Password = _rabbitmqOptions.Password
            };

            using var connection = factory.CreateConnection();
            using var channel = connection.CreateModel();

            // Calling `ConfirmSelect` on the channel sets `NextPublishSeqNo` to '1'
            channel.ConfirmSelect();
            _logger.LogInformation(
                $"Start SequenceNumber for 'ConfirmSelect' is: {channel.NextPublishSeqNo}"
            );

            var startTime = Stopwatch.GetTimestamp();

            channel.BasicAcks += (_, ea) =>
            {
                var envelop = GetMappedMessage(ea.DeliveryTag);
                _logger.LogInformation(
                    $"Message with delivery tag '{ea.DeliveryTag}' and messageId: {envelop?.Message.MessageId} ack-ed, multiple is {ea.Multiple}."
                );
                RemovedConfirmedMessage(ea.DeliveryTag, ea.Multiple);
            };

            channel.BasicNacks += (_, ea) =>
            {
                var envelop = GetMappedMessage(ea.DeliveryTag);
                _logger.LogInformation(
                    $"Message with delivery tag '{ea.DeliveryTag}' and messageId: {envelop?.Message.MessageId} nack-ed, multiple is {ea.Multiple}."
                );
                // Keep the nacked message so it can be republished at the end
                if (envelop is { })
                    unsuccessfulPublishedMessages.Enqueue(envelop);
                RemovedConfirmedMessage(ea.DeliveryTag, ea.Multiple);
            };

            var list = envelopMessages.ToList();

            foreach (var envelopMessage in list)
            {
                channel.QueueDeclare(
                    queue: envelopMessage.Message.GetType().Name.Underscore(),
                    durable: true,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null
                );

                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;
                properties.Headers = envelopMessage.Metadata;
                properties.ContentType = "application/json";
                properties.Type = TypeMapper.GetTypeName(envelopMessage.Message.GetType());
                properties.MessageId = envelopMessage.Message.MessageId.ToString();

                // Map the sequence number before publishing, so the callbacks can find the message
                var currentSequenceNumber = channel.NextPublishSeqNo;
                _messagesDeliveryTagsDictionary.TryAdd(currentSequenceNumber, envelopMessage);

                // Pass the properties built above (the original passed null, which dropped them)
                channel.BasicPublish(
                    exchange: string.Empty,
                    routingKey: envelopMessage.Message.GetType().Name.Underscore(),
                    basicProperties: properties,
                    body: Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(envelopMessage.Message))
                );

                var nextSequenceNumberAfterPublish = channel.NextPublishSeqNo;
                _logger.LogInformation(
                    $"message with messageId: {envelopMessage.Message.MessageId} published, and current SequenceNumber is: {currentSequenceNumber}, next SequenceNumber after publishing is: {nextSequenceNumberAfterPublish}."
                );
            }

            // Wait until the callbacks have removed every outstanding message
            await WaitUntilConditionMet(
                () => Task.FromResult(_messagesDeliveryTagsDictionary.IsEmpty),
                TimeOut,
                "All messages could not be confirmed in 60 seconds"
            );

            if (unsuccessfulPublishedMessages.Any())
                await PublishAsync(unsuccessfulPublishedMessages);

            _logger.LogInformation("All published messages are confirmed");

            var endTime = Stopwatch.GetTimestamp();
            _logger.LogInformation(
                $"Published {list.Count} messages and handled confirm asynchronously {Stopwatch.GetElapsedTime(startTime, endTime).TotalMilliseconds} ms"
            );
        }

        // The GetMappedMessage, RemovedConfirmedMessage and WaitUntilConditionMet helpers are omitted here.
    }
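The WaitUntilConditionMet helper called in the code above is not shown in this section. A minimal sketch of what such a polling helper might look like follows. The Waiter class name, the parameter names, and the pollIntervalInMilliseconds parameter are assumptions made for illustration; only the call shape (condition, timeout in seconds, message) is taken from the code above.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

// Hypothetical helper, used only for illustration.
public static class Waiter
{
    // Polls the condition until it returns true, or throws a TimeoutException
    // with the given message once the timeout has elapsed.
    public static async Task WaitUntilConditionMet(
        Func<Task<bool>> condition,
        int timeoutInSeconds,
        string timeoutMessage,
        int pollIntervalInMilliseconds = 100)
    {
        var stopwatch = Stopwatch.StartNew();

        while (!await condition())
        {
            if (stopwatch.Elapsed > TimeSpan.FromSeconds(timeoutInSeconds))
                throw new TimeoutException(timeoutMessage);

            // Yield the thread between checks instead of blocking it.
            await Task.Delay(pollIntervalInMilliseconds);
        }
    }
}
```

Because the wait uses Task.Delay and does not block, the thread stays free while the broker's confirmations arrive in the BasicAcks and BasicNacks callbacks and empty the dictionary.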

Now let’s implement this asynchronous confirmation with PublishBatch instead of BasicPublish.

When each message is published individually with BasicPublish, the client sends every message to RabbitMQ separately over the connection, so as the number of messages increases, overall performance decreases because each separate send takes some time. When we publish messages in a batch instead, the whole batch is sent over the connection in one operation, so fewer separate sends are needed. Therefore, we reduce the per-message overhead, and overall performance increases.

We can send the whole batch of messages in a single operation, which reduces the per-message publishing overhead between the client and RabbitMQ and decreases network traffic.

    public class PublisherConfirmBatchAsync : IPublisher
    {
        private readonly ILogger<PublisherConfirmBatchAsync> _logger;
        private readonly RabbitMqOptions _rabbitmqOptions;
        private readonly ConcurrentDictionary<ulong, EnvelopMessage> _messagesDeliveryTagsDictionary = new();

        public PublisherConfirmBatchAsync(
            IOptions<RabbitMqOptions> rabbitmqOptions,
            ILogger<PublisherConfirmBatchAsync> logger
        )
        {
            _logger = logger;
            _rabbitmqOptions = rabbitmqOptions.Value;
        }

        public int TimeOut { get; set; } = 60;
        public int BatchSize { get; set; } = 100;

        public async Task PublishAsync(EnvelopMessage message)
        {
            await PublishAsync(new List<EnvelopMessage> { message });
        }

        public async Task PublishAsync(IEnumerable<EnvelopMessage> envelopMessages)
        {
            Queue<EnvelopMessage> unsuccessfulPublishedMessages = new Queue<EnvelopMessage>();

            var factory = new ConnectionFactory
            {
                HostName = _rabbitmqOptions.Host,
                UserName = _rabbitmqOptions.User,
                Password = _rabbitmqOptions.Password
            };

            using var connection = factory.CreateConnection();
            using var channel = connection.CreateModel();

            // Calling `ConfirmSelect` on the channel sets `NextPublishSeqNo` to '1'
            channel.ConfirmSelect();
            _logger.LogInformation(
                $"Start SequenceNumber for 'ConfirmSelect' is: {channel.NextPublishSeqNo}"
            );

            var startTime = Stopwatch.GetTimestamp();

            channel.BasicAcks += (_, ea) =>
            {
                var envelop = GetMappedMessage(ea.DeliveryTag);
                _logger.LogInformation(
                    $"Message with delivery tag '{ea.DeliveryTag}' and messageId: {envelop?.Message.MessageId} ack-ed, multiple is {ea.Multiple}."
                );
                RemovedConfirmedMessage(ea.DeliveryTag, ea.Multiple);
            };

            channel.BasicNacks += (_, ea) =>
            {
                var envelop = GetMappedMessage(ea.DeliveryTag);
                _logger.LogInformation(
                    $"Message with delivery tag '{ea.DeliveryTag}' and messageId: {envelop?.Message.MessageId} nack-ed, multiple is {ea.Multiple}."
                );
                // Keep the nacked message so it can be republished at the end
                if (envelop is { })
                    unsuccessfulPublishedMessages.Enqueue(envelop);
                RemovedConfirmedMessage(ea.DeliveryTag, ea.Multiple);
            };

            var messageList = envelopMessages.ToList();
            Queue<EnvelopMessage> batchQueue = new Queue<EnvelopMessage>();
            ulong currentSequenceNumber = channel.NextPublishSeqNo; // 1

            foreach (var envelopMessage in messageList)
            {
                batchQueue.Enqueue(envelopMessage);

                // Flush when the batch is full or when the last message has been queued
                if (
                    batchQueue.Count == BatchSize
                    || (batchQueue.Count - 1) + (int)currentSequenceNumber == messageList.Count
                )
                {
                    // No blocking WaitForConfirmsOrDie call here (the original listing had one):
                    // confirmations are handled by the BasicAcks and BasicNacks callbacks.
                    currentSequenceNumber = PublishBatch(channel, batchQueue, currentSequenceNumber);
                    batchQueue = new Queue<EnvelopMessage>();
                }
            }

            // Wait until the callbacks have removed every outstanding message
            await WaitUntilConditionMet(
                () => Task.FromResult(_messagesDeliveryTagsDictionary.IsEmpty),
                TimeOut,
                "All messages could not be confirmed in 60 seconds"
            );

            if (unsuccessfulPublishedMessages.Any())
                await PublishAsync(unsuccessfulPublishedMessages);

            _logger.LogInformation("All published messages are confirmed");

            var endTime = Stopwatch.GetTimestamp();
            _logger.LogInformation(
                $"Published {messageList.Count} messages and handled confirm asynchronously {Stopwatch.GetElapsedTime(startTime, endTime).TotalMilliseconds} ms"
            );
        }

        private ulong PublishBatch(
            IModel channel,
            IEnumerable<EnvelopMessage> envelopMessages,
            ulong currentSequenceNumber = 1
        )
        {
            // Create a batch of messages
            var batch = channel.CreateBasicPublishBatch();
            var batchMessages = envelopMessages.ToList();

            foreach (var envelope in batchMessages)
            {
                channel.QueueDeclare(
                    queue: envelope.Message.GetType().Name.Underscore(),
                    durable: true,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null
                );

                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;
                properties.Headers = envelope.Metadata;
                properties.ContentType = "application/json";
                properties.Type = TypeMapper.GetTypeName(envelope.Message.GetType());
                properties.MessageId = envelope.Message.MessageId.ToString();

                var body = new ReadOnlyMemory<byte>(
                    Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(envelope.Message))
                );

                batch.Add(
                    exchange: string.Empty,
                    routingKey: envelope.Message.GetType().Name.Underscore(),
                    mandatory: true,
                    properties: properties,
                    body: body
                );

                // Map the sequence number this message will get to the message itself
                _messagesDeliveryTagsDictionary.TryAdd(currentSequenceNumber++, envelope);
            }

            // Publish the whole batch in one operation. Internally, the client assigns a
            // `NextPublishSeqNo` to each message, adds it to its pendingDeliveryTags
            // collection, and increments the sequence number.
            batch.Publish();

            return channel.NextPublishSeqNo;
        }

        // The GetMappedMessage, RemovedConfirmedMessage and WaitUntilConditionMet helpers are omitted here.
    }

Source Code

All the source code for the different approaches is available in this repository on GitHub.

Mehdi

Hey, I'm Mehdi Hadeli, a .NET and Golang Software Engineer, interested in cutting-edge technologies, Microservices, DDD, CQRS, Event Sourcing, Event Driven Architecture.