What is a message queue?
A message queue, sometimes also called a mailbox, is a piece of software that facilitates communication between threads, processes, and applications. Message queues are a common way to implement service-to-service communication in serverless and microservices architectures.
How do message queues work?
Internally, message queues leverage the queue data structure to store messages. Clients that send, or produce, messages are called producers and clients that receive, or consume, messages are called consumers. Producers and consumers can be different applications on the same system communicating with each other, or they can be applications on different systems connecting over a network.
Message queues facilitate asynchronous communication. In asynchronous communication, the producer and the consumer don’t have to interact with the message queue at the same time. Instead, once the producer places a message on the queue, it continues executing without waiting for the consumer to reply.
The queue stores the message until the consumer dequeues it, at which point the message is deleted. Once the consumer acts on the message, the consumer can send a reply by placing another message on the queue. Effectively, the consumer becomes a producer when it replies this way. Alternatively, the consumer can write a message to a shared place, like a database.
In contrast, a synchronous request-response pattern requires the producer to send a request and then wait for the consumer’s response before it can continue. Message queues are considered a non-blocking communication method because the producer can keep executing without waiting for a response to its message.
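The pattern can be sketched in a few lines with Python’s standard-library queue.Queue, which plays the role of the message queue between two threads. This is only an in-process illustration; a real message queue applies the same interaction across processes or machines.

```python
# A minimal sketch of asynchronous producer/consumer communication using
# Python's standard-library queue.Queue. Here the queue connects two
# threads in one process; a real message queue applies the same pattern
# across processes or machines.
import queue
import threading

q = queue.Queue()
results = []

def consumer():
    while True:
        msg = q.get()                # blocks until a message arrives
        if msg is None:              # sentinel value: shut down
            break
        results.append(msg.upper())  # "process" the message

worker = threading.Thread(target=consumer)
worker.start()

# The producer enqueues messages and continues immediately,
# without waiting for the consumer to handle them.
q.put("hello")
q.put("world")

q.put(None)                          # tell the consumer to stop
worker.join()
print(results)                       # ['HELLO', 'WORLD']
```

Note that the producer returns from q.put() immediately; only the consumer blocks, and only while the queue is empty.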
Where are message queues used?
The asynchronicity of message queues is very appealing to developers. It allows developers to decouple heavyweight applications into smaller, more manageable components that communicate with each other. These applications can be scaled up or down as needed.
For example, imagine a web app takes in an HTTP request that triggers a computationally intensive task, like creating an image thumbnail, processing a video file, starting a database backup, or running a data-crunching algorithm.
A synchronous app starts the task immediately and doesn’t return any response to the client until the task ends. Since the task runs in the web server, it occupies the server’s resources and can block other clients from connecting to the server, resulting in a poor user experience for everyone involved.
An app using a message queue sends the HTTP request to the server, which immediately puts the task on the message queue. The server responds to the client right away, saying that the task has been started, and gives some information about the task (such as a progress bar). A dedicated program, called a worker, takes the task from the queue and runs it. Since the worker runs in a different context than the web server, the server’s performance isn’t affected.
If this kind of task is common, the consumer could instantiate multiple workers running in parallel, and dynamically scale the number of workers to suit the load. Similarly, we could have many producers place tasks in the queue and many consumers that dequeue and run them. In every case, each message is processed only once and only by a single consumer.
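This scaling idea can be sketched with threads, again using queue.Queue as a stand-in for the broker: several worker threads share one queue, and each task is delivered to exactly one of them.

```python
# Sketch: several worker threads consume from one shared queue; each
# task is delivered to exactly one worker. queue.Queue stands in for
# the message broker.
import queue
import threading

tasks = queue.Queue()
processed = []
lock = threading.Lock()

def worker():
    while True:
        task = tasks.get()
        if task is None:             # sentinel: shut this worker down
            break
        with lock:                   # guard the shared result list
            processed.append(task)
        tasks.task_done()

NUM_WORKERS = 4
threads = [threading.Thread(target=worker) for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()

for i in range(100):                 # producers enqueue 100 tasks
    tasks.put(i)
tasks.join()                         # wait until every task is processed

for _ in threads:                    # one sentinel per worker
    tasks.put(None)
for t in threads:
    t.join()

print(sorted(processed) == list(range(100)))  # True: each task ran exactly once
```

Scaling up or down here amounts to changing NUM_WORKERS; with a real broker, it means starting or stopping worker processes.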
Decoupling task creation from execution has another benefit: it allows developers to implement workers using different technology than the web app. For example, we could develop the web app in Python while the workers might be implemented in C++.
So message queues are very useful whenever your application needs to handle a request (i) whose response time cannot be determined in advance, or (ii) that starts a long-running or resource-intensive task.
An example application
Consider a web app that takes in an HTTP request and sends the task to a message queue. A worker dequeues the request, runs it, and stores the result in a database. The web server immediately sends the client a task identifier, which lets the client check on the task status and see the results when the task completes.
In the following example, we’ll build a simple application using the following technologies:
- Falcon framework to implement the web application
- Gunicorn application server
- Celery task-queueing system
- RabbitMQ message broker to implement the message queue
- Redis key-value database to store results.
The worker, the message queue, and the backend database
We’ll write a simple calculation that determines if a given number is a prime number or not.
In this application, we are using Celery, a distributed task queuing system. It provides a high-level API and acts as a task manager: handling task distribution, scheduling, and coordination.
Celery supports different message brokers and result backends. A message broker is a system that, among other things, provides a message queue implementation, while a result backend is a mechanism that stores the results of completed tasks. We are using RabbitMQ as the message broker and Redis as the result backend.
For this example, we implement the worker in tasks.py. Note that while we implemented the worker in Python, it might be more efficient to use something faster, like C++. We’d also want to use a better algorithm for primality testing; this article uses a less efficient method to keep the example simple.
import math
import os

import celery

app = celery.Celery('tasks',
                    broker=os.environ.get('BROKER'),
                    backend=os.environ.get('BACKEND'))


@app.task
def is_prime(n):
    if n < 2:
        return {'number': n, 'is_prime': False}
    if n == 2:
        return {'number': n, 'is_prime': True}
    if n % 2 == 0:
        # even numbers greater than 2 are never prime
        return {'number': n, 'is_prime': False}
    # trial division by odd numbers up to the square root of n
    for i in range(3, math.isqrt(n) + 1, 2):
        if n % i == 0:
            return {'number': n, 'is_prime': False}
    return {'number': n, 'is_prime': True}
At the beginning, a Celery instance is created, with the message broker and backend database addresses taken from environment variables.
The essence of the worker is the is_prime(n) function. It takes in an integer n and determines whether it is a prime number. The function uses a naïve trial-division implementation, which means it’s not the most efficient way to accomplish this task. It returns a dictionary with two keys: number, which contains the number whose primality is being tested, and an is_prime flag that is set to either True or False. This dictionary is an auxiliary data structure that allows us to store both the number and its primality in the database.
It is also worth noting that Celery handles most of the work for us: we give it the addresses of the message broker and the backend database, and Celery automatically runs the workers, manages their number, stores results in the database, and so on.
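As noted earlier, trial division is too slow for large inputs. One standard alternative, sketched here purely as an illustration (the function name and structure are our own, not part of the application), is the Miller-Rabin test, which is deterministic for 64-bit integers when run with a fixed set of witness bases:

```python
def miller_rabin(n):
    """Deterministic Miller-Rabin primality test for 0 <= n < 2**64."""
    witnesses = (2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37)
    if n < 2:
        return False
    for p in witnesses:              # handle small numbers and small factors
        if n % p == 0:
            return n == p
    d, s = n - 1, 0
    while d % 2 == 0:                # write n - 1 as d * 2**s with d odd
        d //= 2
        s += 1
    for a in witnesses:
        x = pow(a, d, n)
        if x in (1, n - 1):
            continue
        for _ in range(s - 1):       # square repeatedly, looking for n - 1
            x = x * x % n
            if x == n - 1:
                break
        else:
            return False             # a proves that n is composite
    return True

print(miller_rabin(2305843009213693951))  # True: 2**61 - 1 is prime
```

Unlike trial division, the cost here grows with the number of bits in n rather than with its square root, so even 64-bit inputs are checked almost instantly.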
The web application
Next, we’ll implement the web application as a simple RESTful API using the Falcon framework. The following code is saved to the file app.py.
import json

import falcon
from celery.result import AsyncResult

from tasks import is_prime


class SubmitNumber:
    def on_post(self, req, resp, **kwargs):
        task = is_prime.delay(int(req.media['number']))
        resp.status = falcon.HTTP_200
        resp.text = json.dumps({
            'status': 'success',
            'task_id': task.id
        })


class RetrieveResult:
    def on_get(self, req, resp, task_id):
        task_result = AsyncResult(task_id)
        resp.status = falcon.HTTP_200
        resp.text = json.dumps({
            'status': task_result.status,
            'result': task_result.result
        })


app = falcon.App()
app.add_route('/', SubmitNumber())
app.add_route('/{task_id}', RetrieveResult())
The on_post method of the SubmitNumber class implements a REST API endpoint that takes in a POST request with a JSON payload containing the number to test. It reads the number, sends the task to the message queue by invoking is_prime.delay(int(req.media['number'])), and finally returns a JSON message to the client containing the task’s id.
Here is an example request and a response using the HTTPie command line client.
$ http -v POST http://localhost:8000/ number=11
POST / HTTP/1.1
Accept: application/json, */*;q=0.5
Accept-Encoding: gzip, deflate
Connection: keep-alive
Content-Length: 16
Content-Type: application/json
Host: localhost:8000
User-Agent: HTTPie/2.6.0
{
"number": "11"
}
HTTP/1.1 200 OK
Connection: close
Date: Wed, 21 Jun 2023 08:50:26 GMT
Server: gunicorn
content-length: 72
content-type: application/json
{
"status": "success",
"task_id": "f45b09b4-554a-4871-987e-9ec91e698dae"
}
The response message contains the task_id (in this case, f45b09b4-554a-4871-987e-9ec91e698dae) that allows the client to check the result of the submitted task.
The on_get method of the RetrieveResult class implements this look-up. It reads the task id from the URL, queries the backend database with the help of the AsyncResult class, and returns the result to the client. Here is an example of an HTTP request and the corresponding response.
$ http -v http://localhost:8000/f45b09b4-554a-4871-987e-9ec91e698dae
GET /f45b09b4-554a-4871-987e-9ec91e698dae HTTP/1.1
Accept: */*
Accept-Encoding: gzip, deflate
Connection: keep-alive
Host: localhost:8000
User-Agent: HTTPie/2.6.0
HTTP/1.1 200 OK
Connection: close
Date: Wed, 21 Jun 2023 08:58:18 GMT
Server: gunicorn
content-length: 65
content-type: application/json
{
"result": {
"is_prime": true,
"number": 11
},
"status": "SUCCESS"
}
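In a script, the client could poll this endpoint until the task reaches a final state. Below is a hedged sketch; poll_result and its fetch parameter are our own names, not part of the article’s code, and fetch is injected (in a real client it might wrap urllib.request.urlopen) so the loop itself can be exercised without a running server:

```python
import json
import time

def poll_result(fetch, task_id, interval=0.5, timeout=30.0):
    """Poll the task-status endpoint until the task finishes.

    fetch: any callable mapping a URL to a JSON response body (string).
    Assumes the article's server is listening on localhost:8000.
    """
    url = f"http://localhost:8000/{task_id}"
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        body = json.loads(fetch(url))
        if body["status"] in ("SUCCESS", "FAILURE"):
            return body
        time.sleep(interval)
    raise TimeoutError(f"task {task_id} did not finish within {timeout}s")

# Exercise the loop with canned responses instead of a live server.
responses = iter([
    '{"status": "PENDING", "result": null}',
    '{"status": "SUCCESS", "result": {"number": 11, "is_prime": true}}',
])
outcome = poll_result(lambda url: next(responses), "some-task-id",
                      interval=0.01)
print(outcome["status"])             # SUCCESS
```

Polling is the simplest option here; a production system might instead push a notification to the client (for example, over a WebSocket) when the task completes.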
Service configuration with Docker
Since the entire application consists of multiple services (the web application, the message broker, the backend database, and the workers), we will start them as a set of Docker containers.
First, let’s prepare a Dockerfile that builds an image used both for running the web application and the worker instances.
FROM python:3.11.4-alpine3.18
WORKDIR /usr/src/app
RUN pip install \
redis==4.5.5 \
gunicorn==20.1.0 \
celery==5.3.1 \
falcon==3.1.1
COPY app.py tasks.py /usr/src/app/
The image is based on the Python Alpine image; it simply installs the required Python libraries and then copies in the file that implements the web application, app.py, and the file that implements the workers, tasks.py.
Since the entire application comprises multiple services, we will use a docker-compose.yml file to provide their configuration.
version: '3'
services:
falcon-web:
build: .
image: falcon-web
container_name: falcon-web
ports:
- "8000:8000"
command: gunicorn -b 0.0.0.0:8000 app:app
environment:
- BROKER=amqp://rabbitmq
- BACKEND=redis://redis:6379/0
depends_on:
- rabbitmq
celery:
image: falcon-web
command: celery -A tasks worker --loglevel=info
environment:
- BROKER=amqp://rabbitmq
- BACKEND=redis://redis:6379/0
depends_on:
- falcon-web
- rabbitmq
rabbitmq:
image: rabbitmq:3.12.0-management-alpine
ports:
- "5672:5672"
- "15672:15672"
redis:
image: redis:7.2-rc-alpine3.18
First, we specify the falcon-web web application service using these settings:
- The build: . directive specifies that the service image should be built from the Dockerfile shown above.
- The ports and command directives specify that the application will be accessible on all interfaces on port 8000 and that it will be run with the Gunicorn application server.
- The environment directive sets the BROKER and BACKEND environment variables, which contain the addresses of the RabbitMQ message broker and the Redis backend database.
The celery service runs the workers: they use the same Docker image as the web application, but with a different command. By default, Celery starts as many worker processes as there are cores on the machine running the service.
The rabbitmq and redis services use default parameters, so we won’t do any additional configuration.
Running the application
To build the web application and the worker image, run docker compose build. After the command completes, we can start all services with docker compose up.
To test the service, open another terminal and send a POST request with an HTTP client such as cURL:
$ curl -X POST http://localhost:8000 \
-H 'Content-Type: application/json' \
-d '{"number": 11}'
{"status": "success", "task_id": "eeaf55ea-8132-4814-bd1d-786a366af779"}
If you’re following along, you might notice the task_id is different on your end. Since the number is trivially small, the task should complete almost instantly.
Next, we send another HTTP request to check the result. For instance:
$ curl localhost:8000/eeaf55ea-8132-4814-bd1d-786a366af779
{"status": "SUCCESS", "result": {"number": 11, "is_prime": true}}
Great, it seems to be working!
If you want to check a less trivial example, try the number 2305843009213693951. While the task is executing, check that the web server remains responsive and that other tasks can still be scheduled.
Finally, to see more details about the message queue, point your web browser to http://localhost:15672 and enter guest for both the username and password. (Needless to say, in a production environment, secure the RabbitMQ console.)