This is a simple implementation of a RabbitMQ consumer and sender. It was initially developed to learn how to use the official RabbitMQ C# library in simple scenarios. It is now used in production in several projects and has proved to be stable. These projects include REST APIs, Windows services, .NET Core services (Windows and Linux) and others.
Features
- Sender implementation
- Multiple consumer instances supported
- Multiple processing options for received messages
- Random expiration for messages sent to a holding queue (depending on the processing option)
- TLS connection support
- Limiter for message processing
- Message properties for more advanced scenarios, such as queues that support priority messages, message headers, etc.
Current version
Current version is 2.4.3
Release notes for version 2.4.3
- Added service extensions AddConsumersService() and AddSenderService() (in namespace JN.RabbitMQClient.Extensions)
- Added ConnectionDetails readonly property
- Renamed property GetTotalConsumers to TotalConsumers
- Renamed property GetTotalRunningConsumers to TotalRunningConsumers
- Updated RabbitMQ.Client to the latest version
Release notes for version 2.4.2
- Update RabbitMQ.Client to latest version.
- Added ConsumersPrefetch property (in consumer service)
- Bug fixes
Release notes for version 2.4.1
- Update RabbitMQ.Client to latest version.
- Bug fixes
Release notes for version 2.4.0
- Added support for message properties (in sender and consumer classes); messages can now be sent for more advanced scenarios, such as queues that support priority messages, message headers, etc.
- Merged sender classes; the feature for keeping the connection open was moved into the main sender class
- Changed type for MaxChannelsPerConnection property (in consumer service)
- Bug fixes
Release notes for version 2.3.4
- Added support for additional information to be passed to the processing delegate; the processing instruction is now a MessageProcessInstruction object where that additional information can be passed. This is useful when a message is requeued with delay, to pass information to the next processing attempt.
Release notes for version 2.3.3
- Update target frameworks; added .NETFramework4.6.1
- Update RabbitMQ.Client to latest version
- Update consumer to expose MaxChannelsPerConnection property
Release notes for version 2.3.2
- Added sender service that keeps the connection open (RabbitMqSenderService2 class)
Release notes for version 2.3.0
- Update RabbitMQ.Client Library to 6.2.1
- Changed namespace for IRabbitMqConsumerService and IRabbitMqSenderService
- Changed behavior for StopConsumers(consumerTag) – now stops all consumers with tag starting with ‘consumerTag’
- Added limiter feature
Release notes for version 2.2.1
- Update RabbitMQ.Client Library to 6.0.0
- Upgrade to .NET Standard 2.1
- Solved bug in connect port
- TLS connection support
Install
Download the package from NuGet:
Install-Package JN.RabbitMQClient -version [version number]
The package is available here and source code is available here.
About sending messages
There are two services available to send messages:
- RabbitMqSenderService, which implements the IRabbitMqSenderService interface. It creates one connection per message sent, so it is not suitable for sending large numbers of messages.
- RabbitMqSenderService2, which implements the IRabbitMqSenderServiceKeepConnection interface. It keeps the connection open until the object is disposed, so it is better suited to sending large numbers of messages.
Note: according to the release notes for version 2.4.0, the sender classes were merged and the feature for keeping the connection open was moved into the main sender class.
Usage – consuming messages
First, create the RabbitMqConsumerService and then define delegates for ReceiveMessage, ShutdownConsumer and ReceiveMessageError. The service will start the required number of consumers when StartConsumers is called. To use a retry queue, call StartConsumers with a RetryQueueDetails object.
Message processing instructions
The ReceiveMessage delegate receives and processes the message. After the message is processed, it returns a message processing instruction (a MessageProcessInstruction object).
Instructions
- OK – message is considered successfully processed
- RequeueMessageWithDelay – message is removed from the queue, but sent to a retry queue for later processing (typically with a dead letter configuration)
- IgnoreMessage – message is removed from the queue and ignored
- IgnoreMessageWithRequeue – message is rejected and sent back to the queue
Requeue message with delay
The RequeueMessageWithDelay processing instruction allows a message to be processed later. It is meant to be used with a secondary queue that receives the message to be retried. When the message is sent to that queue, its timestamp and expiration properties are set. Later, when the message expires on the secondary queue, it is sent back to the main queue. When that happens, the timestamp can be checked, and if the elapsed time is longer than allowed the message can be ignored (with the IgnoreMessage instruction).
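The timestamp check described above can be sketched as a small decision helper. This is only an illustration: the Instruction enum, the RetryPolicy class and the maxRetrySeconds parameter are hypothetical names, and the sketch assumes that firstErrorTimestamp is a Unix time in seconds, with 0 meaning the message has not failed before. Check the library for the actual unit.

```csharp
using System;

// Local stand-in for the library's instruction values (names taken from the list above).
public enum Instruction { OK, RequeueMessageWithDelay, IgnoreMessage }

public static class RetryPolicy
{
    // Decides what to do with a message whose processing has just failed.
    // Assumption: timestamps are Unix seconds; 0 (or less) means "no earlier failure".
    public static Instruction OnFailure(long firstErrorTimestamp, long nowUnixSeconds, long maxRetrySeconds)
    {
        if (firstErrorTimestamp <= 0)
            return Instruction.RequeueMessageWithDelay; // first failure: start retrying

        long elapsed = nowUnixSeconds - firstErrorTimestamp;
        return elapsed > maxRetrySeconds
            ? Instruction.IgnoreMessage            // retried for too long: give up
            : Instruction.RequeueMessageWithDelay; // still inside the retry budget
    }
}
```

Inside a ReceiveMessage delegate, the helper could be called as RetryPolicy.OnFailure(firstErrorTimestamp, DateTimeOffset.UtcNow.ToUnixTimeSeconds(), 300) and the result mapped to the corresponding MessageProcessInstruction value.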
For this to work, a configuration like the following could be used.
Example
- MainQueue – main queue where consumers are connected
- HoldingQueue – secondary queue to hold retry messages; when a message needs to be processed later it is sent to this queue
- TestExchangeHolding – a dead letter exchange that redirects messages from HoldingQueue to MainQueue when they expire
Configuration
- HoldingQueue should be configured with the “x-dead-letter-exchange” argument set to “TestExchangeHolding”
- TestExchangeHolding should have a binding to MainQueue
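As a rough sketch of the configuration above, the holding queue arguments can be built in code. The HoldingQueueSetup class and BuildArguments method are hypothetical names used only for illustration; "x-dead-letter-exchange" is the standard RabbitMQ queue argument.

```csharp
using System.Collections.Generic;

public static class HoldingQueueSetup
{
    // Builds the arguments for the holding queue. "x-dead-letter-exchange"
    // names the exchange that receives messages once they expire.
    public static IDictionary<string, object> BuildArguments(string deadLetterExchange)
    {
        return new Dictionary<string, object>
        {
            ["x-dead-letter-exchange"] = deadLetterExchange
        };
    }
}
```

With RabbitMQ.Client 6.x, a dictionary like this can be passed as the arguments parameter of QueueDeclare when declaring HoldingQueue, and the binding from TestExchangeHolding to MainQueue can be created with QueueBind. The same setup can also be done in the RabbitMQ management UI. Keep in mind that a dead-lettered message keeps its original routing key unless “x-dead-letter-routing-key” is also set, so the binding must match that key (or the exchange can be a fanout exchange).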
Consumer configuration
To use a retry queue, consumers must be configured. When consumers are started, a RetryQueueDetails object must be provided.
Example:
    var details = new RetryQueueDetails
    {
        RetryQueue = "HoldingQueue",
        RetentionPeriodInRetryQueueMilliseconds = 1000,
        RetentionPeriodInRetryQueueMillisecondsMax = 5000
    };
This defines the retry queue as “HoldingQueue”, and the retention period for each message will be a random value between 1 and 5 seconds. To disable the random value, set RetentionPeriodInRetryQueueMillisecondsMax to 0 or to the same value as RetentionPeriodInRetryQueueMilliseconds.
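The rule above for choosing a retention period can be sketched as follows. This is not the library's code: RetentionPicker and Pick are hypothetical names, and the sketch assumes a uniform random value that includes both bounds.

```csharp
using System;

public static class RetentionPicker
{
    // Returns the retention period, in milliseconds, for one message.
    // Assumption: the random value is uniform and includes both bounds.
    public static int Pick(int minMs, int maxMs, Random rng)
    {
        // A max of 0, or a max not greater than min, disables randomization.
        if (maxMs <= minMs)
            return minMs;

        // Random.Next excludes its upper bound, so add 1 (guarding against overflow).
        int exclusiveUpper = (int)Math.Min((long)maxMs + 1, int.MaxValue);
        return rng.Next(minMs, exclusiveUpper);
    }
}
```

Randomizing the retention period spreads out the moment at which retried messages return to the main queue, so a burst of failures does not come back as a single burst of retries.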
About TLS connection support
It is possible to connect to RabbitMQ using TLS. For this, UseTLS must be set to true in the configuration object. See the example below.
Client certificates are not supported.
Processing limiter
Message processing can be limited. This is useful when consumers are unable to keep up with all messages or simply need to slow down processing.
To do this, provide an implementation of the ILimiter interface to the consumer service, as in the following example.
    public class MyApp
    {
        private readonly IRabbitMqConsumerService _consumerService;
        private readonly IRabbitMqSenderService _senderService;
        private readonly AppConfig _config;

        public MyApp(IRabbitMqConsumerService consumerService, IRabbitMqSenderService senderService, ILimiter limiter)
        {
            _consumerService = consumerService;
            _senderService = senderService;

            _consumerService.ServiceDescription = "Consumer Service";
            _consumerService.ReceiveMessage += ProcessMessage;
            _consumerService.ShutdownConsumer += ProcessShutdown;
            _consumerService.ReceiveMessageError += ProcessError;
            _consumerService.Limiter = limiter; // set up the limiter

            _senderService.ServiceDescription = "Sender Service";
        }

        //... ...
    }
It’s important to note that messages are always removed from the queue. The ILimiter provided to the consumer service decides, through its IsAllowed() method, whether the received message can be processed. If the message can’t be processed, the processing delegate is not executed and the processing instruction defined by the DeniedProcessInstruction property is returned.
This feature can be useful when combined with a holding queue. In this case, messages that can’t be processed are sent to the holding queue for later processing.
A default ILimiter implementation is provided: the WindowLimiter class, which limits processing to N messages in the defined time window. The next example returns a limiter that allows 3 messages per second. If a message can’t be processed, it is requeued with a delay (sent to the holding queue).
    private static WindowLimiter GetLimiter()
    {
        const int maxAllowed = 3; // number of items to process in the time window
        const int windowSeconds = 1;
        const Constants.MessageProcessInstruction deniedInstruction = Constants.MessageProcessInstruction.RequeueMessageWithDelay;

        return new WindowLimiter(maxAllowed, windowSeconds, deniedInstruction);
    }
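To make the idea of "N messages per time window" concrete, here is a minimal fixed-window counter. It is a sketch under stated assumptions, not the library's WindowLimiter: the SimpleWindowLimiter class and its optional clock parameter are hypothetical, and the real implementation may use a different windowing strategy.

```csharp
using System;

// Hypothetical fixed-window limiter; not the library's WindowLimiter.
public sealed class SimpleWindowLimiter
{
    private readonly int _maxAllowed;
    private readonly TimeSpan _window;
    private readonly Func<DateTime> _clock;
    private readonly object _lock = new object();
    private DateTime _windowStart;
    private int _count;

    // The clock can be injected so the limiter is testable without waiting.
    public SimpleWindowLimiter(int maxAllowed, int windowSeconds, Func<DateTime> clock = null)
    {
        _maxAllowed = maxAllowed;
        _window = TimeSpan.FromSeconds(windowSeconds);
        _clock = clock ?? (() => DateTime.UtcNow);
        _windowStart = _clock();
    }

    // Returns true while fewer than maxAllowed calls were accepted in the current window.
    public bool IsAllowed()
    {
        lock (_lock)
        {
            DateTime now = _clock();
            if (now - _windowStart >= _window)
            {
                _windowStart = now; // the previous window ended: start a new one
                _count = 0;
            }

            if (_count >= _maxAllowed)
                return false; // window is full

            _count++;
            return true;
        }
    }
}
```

The lock keeps the counter consistent when several consumers call IsAllowed() concurrently.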
Utilities service
A small utilities service class, RabbitMqUtilitiesService, is provided with methods to create a queue, delete a queue and get the total number of items in a queue.
Example
Example for consumer and sender services:
    using System;
    using System.Threading.Tasks;
    // plus the JN.RabbitMQClient namespaces that define the types used below

    class Program
    {
        // Assumed broker host for this example; replace with your own.
        private const string hostName = "localhost";

        static void Main(string[] args)
        {
            Console.WriteLine("Hello World!");

            // consumer
            var consumerService = new RabbitMqConsumerService(GetBrokerConfigConsumers());
            consumerService.ReceiveMessage += ReceiveMessage;
            consumerService.ShutdownConsumer += ShutdownConsumer;
            consumerService.ReceiveMessageError += ReceiveMessageError;
            consumerService.MaxChannelsPerConnection = 5; // default is 3
            consumerService.ConsumersPrefetch = 2; // default is 1
            consumerService.ServiceDescription = "test consumer service";
            consumerService.StartConsumers("my consumer");

            // sender
            var senderService = new RabbitMqSenderService(GetBrokerConfigSender());
            IMessageProperties properties = new MessageProperties { Priority = 3 };
            senderService.Send("my message", properties);

            Console.WriteLine("Press any key to exit...");
            Console.ReadKey();

            consumerService.Dispose();
        }

        private static IBrokerConfigSender GetBrokerConfigSender()
        {
            IBrokerConfigSender configSender = new BrokerConfigSender
            {
                Username = "test",
                Password = "123",
                Host = hostName,
                VirtualHost = "MyVirtualHost",
                RoutingKeyOrQueueName = "MyTestQueue",
                KeepConnectionOpen = true
            };
            return configSender;
        }

        private static IBrokerConfigConsumers GetBrokerConfigConsumers()
        {
            IBrokerConfigConsumers configConsumers = new BrokerConfigConsumers
            {
                Username = "test",
                Password = "123",
                Host = hostName,
                VirtualHost = "MyVirtualHost",
                RoutingKeyOrQueueName = "MyTestQueue",
                ShuffleHostList = false,
                Port = 0,
                TotalInstances = 4
            };
            return configConsumers;
        }

        // Called when processing a received message raises an error.
        private static async Task ReceiveMessageError(string routingKeyOrQueueName, string consumerTag, string exchange, string message, string errorMessage)
        {
            await Console.Out.WriteLineAsync($"Error: '{consumerTag}' | Queued message: {message} | Error message: {errorMessage}").ConfigureAwait(false);
        }

        // Called when a consumer is shut down.
        private static async Task ShutdownConsumer(string consumerTag, ushort errorCode, string shutdownInitiator, string errorMessage)
        {
            await Console.Out.WriteLineAsync($"Shutdown '{consumerTag}' | {errorCode} | {shutdownInitiator} | {errorMessage}").ConfigureAwait(false);
        }

        // Processes a received message and returns the processing instruction.
        private static async Task<MessageProcessInstruction> ReceiveMessage(string routingKeyOrQueueName, string consumerTag, long firstErrorTimestamp, string exchange, string message, string additionalInfo, IMessageProperties properties)
        {
            var priorityReceived = properties.Priority;
            var newPriority = (byte)(priorityReceived <= 3 ? 5 : priorityReceived);

            await Console.Out.WriteLineAsync($"Message received by '{consumerTag}' from queue '{routingKeyOrQueueName}': {message}; Priority received: {properties.Priority} ").ConfigureAwait(false);

            return new MessageProcessInstruction
            {
                Value = Constants.MessageProcessInstruction.OK,
                Priority = newPriority,
                AdditionalInfo = "id: 123"
            };
        }
    }