Rqueue V2.0 [Asynchronous task execution using Redis]

Sonu Kumar
5 min read · May 24, 2020

Rqueue is a message broker built for the Spring framework and backed by Redis. It supports Spring MVC and Spring Boot, and it provides a convenient annotation, RqueueListener, to mark a method as a listener/worker.

Rqueue V1.0 was released in November 2019. I'm now introducing V2.0, which adds many new features. Some of the more interesting ones are described below.

Concurrency

Each queue has a different level of throughput: some queues might receive thousands of messages per minute while others receive only tens. Different workloads require different consumer concurrency. If we use the same concurrency for every consumer, messages build up in the high-MPS (messages per second) queues, i.e. the consumer lags behind the producer. Now we can specify the concurrency for a queue using the "concurrency" annotation field. Concurrency can be provided either as "min-max", e.g. "5-10", or as "max", e.g. "10". When min-max is used, min threads are always ready to handle the consumer workload. When only max is used, one thread is always active and the thread count grows with the load up to max.
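To make the two accepted forms concrete, here is a minimal sketch of how such a concurrency string could be interpreted. The class ConcurrencyRange and its parse method are hypothetical names used only for illustration; this is not Rqueue's own parsing code.

```java
// Hypothetical helper: interprets a concurrency string such as "5-10" or "10".
// It mirrors the rules described above and is not part of Rqueue.
final class ConcurrencyRange {
  final int min;
  final int max;

  private ConcurrencyRange(int min, int max) {
    if (min < 1 || max < min) {
      throw new IllegalArgumentException("Invalid range: " + min + "-" + max);
    }
    this.min = min;
    this.max = max;
  }

  static ConcurrencyRange parse(String spec) {
    String[] parts = spec.trim().split("-");
    if (parts.length == 1) {
      // "max" form: one thread is always active, growing up to max under load.
      return new ConcurrencyRange(1, Integer.parseInt(parts[0].trim()));
    }
    if (parts.length == 2) {
      // "min-max" form: min threads are always ready, growing up to max.
      return new ConcurrencyRange(
          Integer.parseInt(parts[0].trim()), Integer.parseInt(parts[1].trim()));
    }
    throw new IllegalArgumentException("Expected \"max\" or \"min-max\": " + spec);
  }
}
```

With this sketch, parse("5-10") gives min 5 and max 10, while parse("10") gives min 1 and max 10.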

Queue Priority

Rqueue supports two priority modes:

  1. Weighted
  2. Strict

The priority of any queue can be specified in two forms:

  1. priority="4"
  2. priority="critical=4,high=3,low=1"

In the first form, priority="4" means that the priority of the specified queue is 4. In this case we can also specify a priority group to keep related listeners together; otherwise all queues are placed in a default priority group.

In the second form, priority is specified per sub-queue. For example, assume the sms queue has three levels: critical for OTP, high for transactional and low for promotional SMS. Critical SMS should be sent as soon as possible, while high- and low-priority SMS can be delayed when the MPS is high. To use these levels we need to enqueue tasks with a specific priority. There are new methods to enqueue tasks in such a queue:

enqueueWithPriority("sms", "critical", message)

enqueueInWithPriority("sms", "low", message, Duration.ofSeconds(30))
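The two priority forms can be read as a small mapping from level name to weight. The sketch below parses both forms into a map; PrioritySpec and the "default" level name are hypothetical and only illustrate the format, they are not Rqueue's internals.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: turns "4" or "critical=4,high=3,low=1" into level -> weight.
final class PrioritySpec {
  // Assumed placeholder name for the single-number form; not an Rqueue constant.
  static final String DEFAULT_LEVEL = "default";

  static Map<String, Integer> parse(String spec) {
    Map<String, Integer> levels = new LinkedHashMap<>();
    for (String part : spec.split(",")) {
      String[] kv = part.trim().split("=");
      if (kv.length == 1) {
        // First form: a bare number is the priority of the whole queue.
        levels.put(DEFAULT_LEVEL, Integer.parseInt(kv[0].trim()));
      } else if (kv.length == 2) {
        // Second form: one entry per sub-queue level.
        levels.put(kv[0].trim(), Integer.parseInt(kv[1].trim()));
      } else {
        throw new IllegalArgumentException("Bad priority entry: " + part);
      }
    }
    return levels;
  }
}
```

For the sms example, parse("critical=4,high=3,low=1") yields three levels, which correspond to the priority names passed to the enqueue methods above.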

How do these modes affect your application?

Let's consider a simple example: we have three queues Q1, Q2 and Q3 with priorities 6, 4 and 2 respectively.


Weighted Priority

Weighted priority works in a round-robin fashion. In this example Q1 would be polled 6 times, Q2 4 times and Q3 2 times, and then the process repeats. If polling finds that a queue has no more tasks, its weight is reduced by Δ. For example, if Q1 does not have any items, its weight is reduced by Δ = currentWeight * (1 - 6/(6+4+2)). As soon as a queue's weight becomes <= 0, the queue is inactive. Weights are reinitialised when all queues become inactive.

This algorithm is implemented in WeightedPriorityPoller
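The arithmetic above can be sketched in a few lines. WeightedPrioritySketch is a hypothetical illustration of one polling round and of the Δ formula, not the code of WeightedPriorityPoller. Whether the real poller interleaves polls across queues or rounds weights to integers is not stated here, so the sketch simply lists each queue as many times as its weight and keeps Δ as a double.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the weighted mode described above.
final class WeightedPrioritySketch {

  // One round of polling: each queue appears as many times as its weight.
  static List<String> pollingRound(Map<String, Integer> weights) {
    List<String> order = new ArrayList<>();
    for (Map.Entry<String, Integer> entry : weights.entrySet()) {
      for (int i = 0; i < entry.getValue(); i++) {
        order.add(entry.getKey());
      }
    }
    return order;
  }

  // delta = currentWeight * (1 - queueWeight / totalWeight)
  static double delta(double currentWeight, int queueWeight, int totalWeight) {
    return currentWeight * (1.0 - (double) queueWeight / totalWeight);
  }
}
```

For Q1 with its initial weight of 6 and a total weight of 12, delta(6, 6, 12) is 6 * (1 - 0.5) = 3.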

Strict Priority

In strict priority mode, the consumer always consumes tasks from the highest priority queue, which is Q1 here. If it finds after consuming that there are no more tasks, that queue becomes inactive for the polling interval. To avoid starvation, a queue can be inactive for a maximum of 1 minute.

This algorithm is implemented in StrictPriorityPoller
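A minimal sketch of the selection rule follows. StrictPrioritySketch, next and markEmpty are hypothetical names, and time is passed in explicitly to keep the example deterministic. It only illustrates "pick the highest-priority active queue, and cap inactivity at one minute"; it is not the code of StrictPriorityPoller.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the strict mode described above.
final class StrictPrioritySketch {
  static final long MAX_INACTIVE_MILLIS = 60_000L; // 1 minute cap from the text

  private final Map<String, Integer> priorities;
  private final Map<String, Long> inactiveUntil = new HashMap<>();

  StrictPrioritySketch(Map<String, Integer> priorities) {
    this.priorities = new LinkedHashMap<>(priorities);
  }

  // Returns the active queue with the highest priority, or null if none is active.
  String next(long nowMillis) {
    String best = null;
    int bestPriority = Integer.MIN_VALUE;
    for (Map.Entry<String, Integer> entry : priorities.entrySet()) {
      long until = inactiveUntil.getOrDefault(entry.getKey(), 0L);
      if (until <= nowMillis && entry.getValue() > bestPriority) {
        best = entry.getKey();
        bestPriority = entry.getValue();
      }
    }
    return best;
  }

  // Called when a poll found the queue empty: deactivate it for the polling
  // interval, but never for longer than one minute.
  void markEmpty(String queue, long nowMillis, long pollingIntervalMillis) {
    long pause = Math.min(pollingIntervalMillis, MAX_INACTIVE_MILLIS);
    inactiveUntil.put(queue, nowMillis + pause);
  }
}
```

With Q1=6, Q2=4 and Q3=2, next returns Q1 until Q1 is marked empty, then Q2 while Q1 is paused, and Q1 again once the pause has elapsed.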

Web Dashboard

Rqueue now has a dashboard to see what's going on. The dashboard has multiple components for different features. It includes two types of graph:

  1. Latency graph: latency for all queues or for specific queues on a daily basis, for up to 90 days. It shows min, max and average latency.
  2. Queue statistics: how many messages were retried, executed, moved to the dead letter queue, or discarded because the retry limit was exceeded.

The dashboard allows you to delete any enqueued message, whether it is scheduled to run, running, or waiting to be consumed. It also lets you see a queue's internal messages, like the SQS dashboard.

The dashboard includes the following screens:

  1. Queue Configuration
  2. Pending Message Count
  3. Queue Messages (Job Queue)
  4. Job Queue Execution Statistics
  5. Utility Dashboard, to move messages from one place to another

Redis Cluster

You can now use a Redis cluster setup with Rqueue. Rqueue stores data in several Redis collections, which makes it susceptible to cross-slot errors. To work around this problem, we need to use the hash tag approach when naming collections. Refer: https://redis.io/topics/cluster-spec
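To show why hash tags help, the sketch below follows the rule from the Redis cluster specification: if a key contains "{" followed later by "}" with at least one character in between, only that substring is hashed, and the slot is CRC16 of the hashed part modulo 16384. HashTagSketch and the key names used in the usage note are hypothetical; they are not the names Rqueue actually uses.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of Redis cluster hash tags and slot calculation.
final class HashTagSketch {

  // Returns the part of the key that Redis cluster hashes.
  static String hashedPart(String key) {
    int open = key.indexOf('{');
    if (open >= 0) {
      int close = key.indexOf('}', open + 1);
      if (close > open + 1) { // non-empty content between the braces
        return key.substring(open + 1, close);
      }
    }
    return key; // no usable hash tag: the whole key is hashed
  }

  // CRC16 (XMODEM variant: polynomial 0x1021, initial value 0).
  static int crc16(byte[] bytes) {
    int crc = 0;
    for (byte b : bytes) {
      crc ^= (b & 0xFF) << 8;
      for (int i = 0; i < 8; i++) {
        crc = (crc & 0x8000) != 0 ? (crc << 1) ^ 0x1021 : crc << 1;
        crc &= 0xFFFF;
      }
    }
    return crc;
  }

  static int slot(String key) {
    return crc16(hashedPart(key).getBytes(StandardCharsets.UTF_8)) % 16384;
  }
}
```

Two made-up keys such as "{sms}pending" and "{sms}processing" share the hash tag "sms", so slot returns the same value for both and multi-key operations on them avoid the cross-slot error.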

Task Execution Backoff

No system is free of failure, and a task can fail for many reasons while executing. It is wise to delay the re-execution of a task that keeps failing, for example retry after 5 seconds, then after another 5 or 10 seconds, and so on. Rqueue has two task execution backoff implementations, linear and exponential. By default, linear backoff is used, which retries failing tasks at an interval of 5 seconds. In exponential backoff the delay is computed as

delay = initialDelay * Math.pow(multiplier, failureCount)

Here initialDelay is 1.5 seconds and multiplier is 1.5.
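The two strategies can be compared with a short sketch. BackoffSketch and its method names are hypothetical, the values come from the text above, and the assumption that failureCount starts at 0 for the first retry is mine. This illustrates the formula and is not Rqueue's backoff classes.

```java
// Hypothetical illustration of the two backoff strategies described above.
final class BackoffSketch {
  static final long LINEAR_DELAY_MILLIS = 5_000L;  // 5 second interval
  static final long INITIAL_DELAY_MILLIS = 1_500L; // 1.5 seconds
  static final double MULTIPLIER = 1.5;

  // Default strategy as described in the text: retry at a 5 second interval.
  static long linearDelayMillis(int failureCount) {
    return LINEAR_DELAY_MILLIS;
  }

  // delay = initialDelay * Math.pow(multiplier, failureCount)
  static long exponentialDelayMillis(int failureCount) {
    return Math.round(INITIAL_DELAY_MILLIS * Math.pow(MULTIPLIER, failureCount));
  }
}
```

Under these assumptions the exponential delays for failureCount 0, 1 and 2 are 1500 ms, 2250 ms and 3375 ms, while the linear delay stays at 5000 ms.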

Get the latest release from Maven Central: https://search.maven.org/artifact/com.github.sonus21/rqueue-spring-boot-starter/2.0.1-RELEASE/pom

Fork Rqueue and give it a star: https://github.com/sonus21/rqueue

The next planned feature is support for Redis Streams.
