What is a Message Queue?

What is a message queue?

A message queue, sometimes also called a mailbox, is a piece of software that facilitates communication between threads, processes, and applications. Message queues are a common way to implement service-to-service communication in serverless and microservices architectures.

How do message queues work?

Internally, message queues leverage the queue data structure to store messages. Clients that send, or produce, messages are called producers and clients that receive, or consume, messages are called consumers. Producers and consumers can be different applications on the same system communicating with each other, or they can be applications on different systems connecting over a network.

Message queues facilitate asynchronous communication. In asynchronous communication, the producer and the consumer don’t have to interact with the message queue at the same time. Instead, when the producer places a message on the queue, it continues executing without waiting for the consumer to reply.

The queue stores the message until the consumer dequeues it, at which point the message is deleted. Once the consumer acts on the message, the consumer can send a reply by placing another message on the queue. Effectively, the consumer becomes a producer when it replies this way. Alternatively, the consumer can write a message to a shared place, like a database.
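The flow described above can be sketched inside a single process with Python’s standard queue and threading modules. This is only an illustration: a real message queue runs as a separate service, and the names run_demo, requests, and replies are made up for this example.

```python
import queue
import threading


def run_demo():
    """Send three messages through an in-process queue and collect the replies."""
    requests = queue.Queue()  # producer -> consumer
    replies = queue.Queue()   # consumer -> producer (the consumer acts as a producer here)

    def consumer():
        while True:
            message = requests.get()  # blocks until a message is available
            if message is None:       # sentinel value: no more messages will arrive
                break
            replies.put(f"processed {message}")

    worker = threading.Thread(target=consumer)
    worker.start()

    # The producer enqueues its messages and moves on without waiting for replies.
    for text in ("a", "b", "c"):
        requests.put(text)
    requests.put(None)

    worker.join()
    return [replies.get() for _ in range(3)]


if __name__ == "__main__":
    print(run_demo())
```

Because the queue is first-in, first-out and there is a single consumer, the replies come back in the order the messages were sent.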

In contrast, a synchronous request-response pattern requires the producer to send a request and then wait until the consumer responds before it can continue. Message queues are considered a non-blocking communication method because the producer doesn’t need a response before it carries on with its work.

Where are message queues used?

The asynchronicity of message queues is very appealing to developers. It allows developers to decouple heavyweight applications into smaller, more manageable components that communicate with each other. These applications can be scaled up or down as needed.

For example, imagine a web app takes in an HTTP request that triggers a computationally intensive task, like creating an image thumbnail, processing a video file, starting a database backup, or running a data-crunching algorithm.

A synchronous app would start the task immediately and wouldn’t return any response to the client until the task ended. Since the task runs in the web server, it occupies the server’s resources and can block other clients from connecting, which results in a poor user experience for everyone involved.

With a message queue, the client sends the HTTP request to the server, which immediately puts the task on the queue. The server then responds to the client right away, saying that the task has been started, and gives some information about the task (such as data for a progress bar). A dedicated program, called a worker, takes the task from the queue and runs it. Since the worker runs in a different context than the web server, the server’s performance isn’t affected.

If this kind of task is common, we could run multiple workers in parallel and dynamically scale their number to suit the load. Similarly, we could have many producers place tasks in the queue and many consumers that dequeue and run them. In every case, each message is processed only once and only by a single consumer.
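As a rough sketch of several workers sharing one queue, the following uses threads in a single process. The function name process_all and the worker names are assumptions made for this illustration, and a real deployment would use separate worker processes talking to a broker.

```python
import queue
import threading


def process_all(tasks, num_workers=4):
    """Hand every task to exactly one of num_workers consumer threads."""
    task_queue = queue.Queue()
    handled = []             # (worker_name, task) pairs, in completion order
    lock = threading.Lock()  # protects the shared list

    def worker(name):
        while True:
            task = task_queue.get()  # each get() removes the item from the queue
            if task is None:         # sentinel value: shut this worker down
                break
            with lock:
                handled.append((name, task))

    threads = [
        threading.Thread(target=worker, args=(f"worker-{i}",))
        for i in range(num_workers)
    ]
    for thread in threads:
        thread.start()
    for task in tasks:
        task_queue.put(task)
    for _ in threads:
        task_queue.put(None)  # one sentinel per worker
    for thread in threads:
        thread.join()
    return handled


if __name__ == "__main__":
    results = process_all(list(range(10)))
    print(sorted(task for _, task in results))
```

Which worker handles which task varies between runs, but every task appears exactly once in the result because dequeuing removes it from the queue.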

Decoupling task creation from execution has another benefit: it allows developers to implement workers using different technology than the web app. For example, we could develop the web app in Python while the workers might be implemented in C++.

So message queues are very useful whenever your application needs to handle a request (i) whose response time cannot be determined in advance, or (ii) which starts a long-running or resource-intensive task.

An example application

Consider a web app that takes in an HTTP request and sends the task to a messaging queue. A worker dequeues the request, runs it, and stores the result in a database. The web server immediately sends the client a task identifier, which lets the client check on the task status and see the results when the task completes.
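Before bringing in real services, the submit-then-look-up pattern can be modelled in memory. The sketch below is hypothetical: the TaskStore class, its methods, and the "PENDING" status label are illustrative stand-ins for the message queue and the database, not part of the application built below.

```python
import uuid
from collections import deque


class TaskStore:
    """In-memory stand-in for a message queue plus a result database."""

    def __init__(self):
        self._pending = deque()  # plays the role of the message queue
        self._results = {}       # plays the role of the result database

    def submit(self, number):
        """Enqueue a task and immediately return its identifier."""
        task_id = str(uuid.uuid4())
        self._pending.append((task_id, number))
        self._results[task_id] = {"status": "PENDING", "result": None}
        return task_id

    def run_next(self, func):
        """Act as a worker: dequeue the oldest task, run it, store the result."""
        task_id, number = self._pending.popleft()
        self._results[task_id] = {"status": "SUCCESS", "result": func(number)}

    def lookup(self, task_id):
        """Return the current status and result for a task identifier."""
        return self._results[task_id]


if __name__ == "__main__":
    store = TaskStore()
    task_id = store.submit(11)
    print(store.lookup(task_id))  # still pending
    store.run_next(lambda n: n % 2 == 1)
    print(store.lookup(task_id))  # finished
```

The client only ever holds the identifier; the result lives in the store until the client asks for it.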

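In the following example, we’ll build a simple application using the following technologies: Celery as the task queuing system, RabbitMQ as the message broker, Redis as the result backend, Falcon for the web application, and Docker to run the services.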

The worker, the messaging queue, and the backend database

We’ll write a simple calculation that determines if a given number is a prime number or not.

In this application, we are using Celery, a distributed task queuing system. It provides a high-level API and acts as a task manager: handling task distribution, scheduling, and coordination.

Celery supports different message brokers and result backends. Message brokers are systems that, among other things, provide a message queue implementation, while result backends store the results of completed tasks. We are using RabbitMQ as the message broker and Redis, a key-value database, as the result backend.

For this example, we implement the worker in tasks.py. Note that while we implemented the worker in Python, it might be more efficient to use something faster, like C++. We’d also want to use a better algorithm for primality testing. This article uses a less efficient method to keep the example simple.

import math
import os

import celery

app = celery.Celery('tasks',
                    broker=os.environ.get('BROKER'),
                    backend=os.environ.get('BACKEND'))


@app.task
def is_prime(n):
    if n < 2:
        return {
            'number': n,
            'is_prime': False
        }

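    if n == 2:
        return {
            'number': n,
            'is_prime': True
        }

    # Even numbers greater than 2 are never prime.
    if n % 2 == 0:
        return {
            'number': n,
            'is_prime': False
        }

    # Test odd divisors up to and including the integer square root of n.
    for i in range(3, math.isqrt(n) + 1, 2):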
        if n % i == 0:
            return {
                'number': n,
                'is_prime': False
            }

    return {
        'number': n,
        'is_prime': True
    }

At the beginning, a Celery instance is created and the message broker and the backend database addresses are set based on the environment variables.

The essence of the worker is the is_prime(n: int) function. It takes in an integer n and reports whether n is a prime number.

This function uses a naïve implementation (trial division), which means it’s not the most efficient way to accomplish this task. The function returns a dictionary that contains two keys: number, which holds the number whose primality is being tested, and is_prime, a flag that is set to either True or False. This dictionary is an auxiliary data structure that allows us to store both the number and its primality in the database.
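For comparison, here is a sketch of one faster approach, the Miller-Rabin test with a fixed set of bases. It is not what this article’s worker uses, and the function name is_prime_fast is made up for this example. With the first twelve primes as bases, the test is known to give exact answers for all 64-bit integers.

```python
def is_prime_fast(n):
    """Miller-Rabin primality test with fixed bases (exact for 64-bit integers)."""
    if n < 2:
        return False
    bases = (2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37)
    # Handle small primes and their multiples directly.
    for p in bases:
        if n % p == 0:
            return n == p
    # Write n - 1 as d * 2**s with d odd.
    d, s = n - 1, 0
    while d % 2 == 0:
        d //= 2
        s += 1
    for a in bases:
        x = pow(a, d, n)  # modular exponentiation
        if x == 1 or x == n - 1:
            continue
        for _ in range(s - 1):
            x = pow(x, 2, n)
            if x == n - 1:
                break
        else:
            return False  # base a proves that n is composite
    return True


if __name__ == "__main__":
    print(is_prime_fast(11))
    print(is_prime_fast(2305843009213693951))
```

Each call needs only a handful of modular exponentiations, so even a 19-digit input is answered almost instantly, whereas trial division has to loop over hundreds of millions of candidate divisors.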

It is also worth noting that Celery handles most of the work for us: we give it the addresses of the message broker and the backend database, and Celery automatically runs the workers, manages their number, stores results in the database, and so on.

The web application

Next, we’ll implement the web application as a simple RESTful API using the Falcon framework. The following code is saved to the file app.py.

import json

import falcon
from celery.result import AsyncResult

from tasks import is_prime


class SubmitNumber:

    def on_post(self, req, resp, **kwargs):
        task = is_prime.delay(int(req.media['number']))

        resp.status = falcon.HTTP_200
        resp.text = json.dumps({
            'status': 'success',
            'task_id': task.id
        })


class RetrieveResult:

    def on_get(self, req, resp, task_id):
        task_result = AsyncResult(task_id)
        resp.status = falcon.HTTP_200
        resp.text = json.dumps({
            'status': task_result.status,
            'result': task_result.result
        })


app = falcon.App()

app.add_route('/', SubmitNumber())
app.add_route('/{task_id}', RetrieveResult())

The on_post method from class SubmitNumber implements a REST API endpoint that takes in a POST request with a JSON payload containing the number to test. It then reads the number, sends the task to the message queue by invoking is_prime.delay(int(req.media['number'])), and finally returns a JSON message to the client containing the task’s id.

Here is an example request and a response using the HTTPie command line client.

$ http -v POST http://localhost:8000/ number=11
POST / HTTP/1.1
Accept: application/json, */*;q=0.5
Accept-Encoding: gzip, deflate
Connection: keep-alive
Content-Length: 16
Content-Type: application/json
Host: localhost:8000
User-Agent: HTTPie/2.6.0

{
    "number": "11"
}


HTTP/1.1 200 OK
Connection: close
Date: Wed, 21 Jun 2023 08:50:26 GMT
Server: gunicorn
content-length: 72
content-type: application/json

{
    "status": "success",
    "task_id": "f45b09b4-554a-4871-987e-9ec91e698dae"
}

The response message contains the task_id (in this case, f45b09b4-554a-4871-987e-9ec91e698dae) that allows the client to check the result of the submitted task.

The method on_get in class RetrieveResult implements this look-up. It reads the task id from the URL, queries the backend database with the help of the AsyncResult class, and returns the result to the client. Here is an example of an HTTP request and a corresponding response.

$ http -v http://localhost:8000/f45b09b4-554a-4871-987e-9ec91e698dae
GET /f45b09b4-554a-4871-987e-9ec91e698dae HTTP/1.1
Accept: */*
Accept-Encoding: gzip, deflate
Connection: keep-alive
Host: localhost:8000
User-Agent: HTTPie/2.6.0



HTTP/1.1 200 OK
Connection: close
Date: Wed, 21 Jun 2023 08:58:18 GMT
Server: gunicorn
content-length: 65
content-type: application/json

{
    "result": {
        "is_prime": true,
        "number": 11
    },
    "status": "SUCCESS"
}
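A client could automate this look-up by polling until the task finishes. The sketch below uses only the standard library and assumes the service is reachable at http://localhost:8000. The helper names fetch_status and wait_for_result are made up for this example, and the loop stops on Celery’s SUCCESS and FAILURE state names.

```python
import json
import time
import urllib.request


def fetch_status(task_id, base_url="http://localhost:8000"):
    """GET /{task_id} and return the decoded JSON body (base_url is an assumption)."""
    with urllib.request.urlopen(f"{base_url}/{task_id}") as response:
        return json.load(response)


def wait_for_result(task_id, fetch=fetch_status, interval=0.5, timeout=30.0):
    """Poll until the task reaches a finished state or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        body = fetch(task_id)
        if body["status"] in ("SUCCESS", "FAILURE"):
            return body
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task {task_id} did not finish in {timeout} seconds")
        time.sleep(interval)  # wait before asking again


if __name__ == "__main__":
    # Replace the identifier with one returned by your own POST request.
    print(wait_for_result("f45b09b4-554a-4871-987e-9ec91e698dae"))
```

The fetch parameter exists so that the polling logic can be exercised without a running server, for example by passing in a function that returns canned responses.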

Service configuration with Docker

Since the entire application consists of multiple services (the web application, the message broker, the backend database, and the workers), we will start them with a set of Docker containers.

First, let’s prepare a Dockerfile that will build an image used both for running the web application and the worker instances.

FROM python:3.11.4-alpine3.18
WORKDIR /usr/src/app
RUN pip install \
    redis==4.5.5 \
    gunicorn==20.1.0 \
    celery==5.3.1 \
    falcon==3.1.1
COPY app.py tasks.py /usr/src/app/

The image is based on the Python alpine image and simply installs the required Python libraries and then copies in the file that implements the web application, app.py, and the file that implements the workers, tasks.py.

Since the application comprises multiple services, we will use a docker-compose.yml file to provide their configuration.

version: '3'

services:
  falcon-web:
    build: .
    image: falcon-web
    container_name: falcon-web
    ports:
      - "8000:8000"
    command: gunicorn -b 0.0.0.0:8000 app:app
    environment:
      - BROKER=amqp://rabbitmq
      - BACKEND=redis://redis:6379/0
    depends_on:
      - rabbitmq

  celery:
    image: falcon-web
    command: celery -A tasks worker --loglevel=info
    environment:
      - BROKER=amqp://rabbitmq
      - BACKEND=redis://redis:6379/0
    depends_on:
      - falcon-web
      - rabbitmq

  rabbitmq:
    image: rabbitmq:3.12.0-management-alpine
    ports:
      - "5672:5672"
      - "15672:15672"

  redis:
    image: redis:7.2-rc-alpine3.18

First, we specify the falcon-web web application service using these settings:

  • The build: . key specifies that the service should be built from the Dockerfile shown above.
  • The ports and command keys specify that the application will be accessible on all interfaces on port 8000 and that it will be run with the gunicorn application server.
  • The environment key sets the BROKER and BACKEND environment variables, which contain the addresses of the RabbitMQ message broker and the Redis backend database.

The celery service runs the workers: they use the same Docker image as the web application, but with a different command. By default, Celery starts as many worker processes as there are CPU cores on the machine running the service.

The rabbitmq and redis services use default parameters, so we won’t do any additional configuration.

Running the application

To build the web application and the worker image, run the following command: docker compose build. After the command completes, we can run all services with docker compose up.

To test the service, open another terminal and send a POST request, for example with cURL:

$ curl -X POST http://localhost:8000 \
            -H 'Content-Type: application/json' \
            -d '{"number":  11}'
{"status": "success", "task_id": "eeaf55ea-8132-4814-bd1d-786a366af779"}

If you’re following along, you might notice the task_id is different on your end. Since the number is trivially small, the task should complete instantly.

Next, we send another HTTP request to check the result. For instance:

$ curl localhost:8000/eeaf55ea-8132-4814-bd1d-786a366af779
{"status": "SUCCESS", "result": {"number": 11, "is_prime": true}}

Great, it seems to be working!

If you want to check a less trivial example, try the number 2305843009213693951. While the task is executing, check that the web server remains responsive and other tasks can also be scheduled.

Finally, to see more details about the messaging queue, point your web browser to http://localhost:15672 and type guest for both username and password. (Needless to say, in a production environment, secure the RabbitMQ console.)

Glossary

REST

Representational State Transfer. An architectural style for designing web services and APIs.

Message Queue

A piece of software that facilitates asynchronous communication.

API

Application Programming Interface. A set of rules that lets different programs talk to each other.
