Introducing Rqueue: Redis Queue
Rqueue is a message broker built for the Spring framework backed by Redis. Out of the box, it supports Spring MVC and Spring boot. It provides a very convenient interface to mark a method as a listener/worker like spring-messaging library JmsListener.
In this article listeners, workers, tasks, and messages are used interchangeably.
Motivation
There’re many queuing systems and brokers but why this new one? Most of the queuing system misses the delayed queue feature, where an event must be delivered after some time it could be any arbitrary interval instead of some fixed delay. Many of the web servers now rely on the Redis database, then why not use the existing database setup instead of setting up new infrastructure and doing maintenance for the same?
Key features
- Delayed message delivery: Message delivery can be delayed for any arbitrary time from zero to any number. Zero means deliver it immediately, any number greater than zero means it will not be delivered unless the time has elapsed.
- The maximum number of retries: The number of times the same message would be retried before it can be discarded or moved to a dead letter queue. A message would be retried due to the failure of the underlying workers or any others.
- Dead letter queue: In distributed systems, workers can fail due to many reasons like one or more internal/external service(s) is/are not working as expected; there’s a transient failure in downstream services. Database failure, due to the high number of active connections database server is refusing a new connection. Some buggy code is deployed or the message is not compatible with the existing one. In these scenarios, our expectation is to retry some number of times and then move this to another queue so that we can investigate the failure later.
- Parallel Execution: Consumers and producers can be started independent of each other, which means a producer can produce tasks at a higher rate than the consumer consuming those messages, in such cases, the queue size will keep increasing. To avoid the long queue multiple consumers can be started in parallel to cope up with a high producer rate.
- At least once message delivery: In modern applications, a message needs to be delivered multiple times in many cases some of them are (i) Failure in the underlying workers (ii) Modern application development follows CI/CD, a new application can be deployed as soon as the fix or feature is ready. All new deployments would require a shutdown of existing live applications and deployment of new application code. Even though we have a shutdown hook in place some tasks may have to be canceled due to fix shutdown time etc. In any case, if some tasks are canceled in the middle of execution then we would need to run the same task from the beginning or resume from the middle. Resuming from the middle is not trivial as it requires a great deal of data including instruction pointer, database state, and many more but we can re-run from the beginning. Rqueue guarantees that the same message would be re-delivered due to failure in executions.
Rqueue Internal
Rqueue stores data in two of the Redis data structures internally ZSET and LIST. ZSET is used for delayed tasks and at-least-once delivery mechanism. ZSET is a sorted set, like a priority queue but it does not perform well like min-heap, though it provides log(N) complexity of addition, search, etc. LIST is like a double-ended queue that supports O(1) operation for addition and removal.
Message Submission
All delayed messages are added to the corresponding ZSET while adding a message it checks whether there’re any messages with low priority on the head of the ZSET then it sends a message on the Redis PUB/SUB channel. This message would be consumed by a listener, that would copy messages from ZSET to the end of the LIST in the priority order, and all moved items would be deleted from the ZSET.
Messages having zero delays are immediately added to the end of the LIST.
Rqueue provides multiple APIs to submit a task, in RqueueMessageSender’s eqneueXXX methods. Some of the methods take delay time while others take retry count.
Message Consumption/Task execution
Rqueue removes one element from the head of LIST and adds this to processing ZSET that’s scheduled to be re-process in 15 minutes. If it finds any items in the processing SET with low priority then a message is published on the processing channel that would be consumed by processing listeners. On receiving such messages processing workers copy messages from processing ZSET to LIST and delete them from the processing ZSET. Rqueue internally records the number of times a task is failed, if it finds the execution failure count is greater than the specified retry count then this message can be discarded or moved to the dead letter queue. This configuration is specified using numReties and deadLetterQueue fields. For example, discarding a message if it’s not consumed after three retries can be specified as
Without a dead letter queue
A listener with a dead letter queue
Rqueue internally maintains two types of scheduled workers, these workers pull messages from ZSET to LIST. Even worker has two categories
1. d-scheduled (for delayed ZSET)
2. p-scheduled (for processing ZSET)
p-workers and d-workers also pull messages from their respective ZSETs but these are triggered on Redis PUB/SUB. The message poller continuously polls messages from the LIST and sends them to an executor that executes the listener method.
Rqueue library code: https://github.com/sonus21/rqueue
I’ve built multiple demo applications for this, some of them can be found at the above git repo. Also another story with details https://medium.com/@sonus21/asynchronous-task-execution-using-redis-and-spring-framework-125386c33f9
If you found this post helpful please share across and give it a thumbs up.