Server-Side Scalability

Message Queues with Redis


Learning Objectives

  • You know how to use Redis as a message queue.
  • You understand the benefits and drawbacks of processing messages asynchronously.

Next, we concretely look into using a message queue to process messages. We start with an example with no message queue, then create an example with a message queue where the messages are processed within the same service. Finally, we create a separate service and pass the messages to it for processing.

For the experiment, we use just one replica for the server service in the walking skeleton. That is, set the value of replicas to 1.

  server:
    build: server
    restart: unless-stopped
    volumes:
      - ./server:/app
    env_file:
      - project.env
    depends_on:
      - database
    deploy:
      replicas: 1
    labels:
      - "traefik.enable=true"
      - "traefik.http.routers.server-router.entrypoints=web"
      - "traefik.http.routers.server-router.rule=PathPrefix(`/`)"
      - "traefik.http.services.server-service.loadbalancer.server.port=8000"

The concrete work relates to creating users. For the users, we use the same schema that we used in the chapter on Indexing and Query Optimization.

CREATE TABLE users (
  id SERIAL PRIMARY KEY,
  name TEXT NOT NULL
);

Baseline implementation

The baseline implementation, shown below, listens to POST requests on the /users endpoint, reads the user name from a JSON document in the request body, and creates a new user in the database using that name. The server responds with the status code 202, which tells the client that the request has been accepted but might not have been processed yet. (Here the insert has actually completed by the time the server responds; using the same status code in every version keeps the test results comparable.)

import { Hono } from "@hono/hono";
import postgres from "postgres";

const app = new Hono();
const sql = postgres();

app.post("/users", async (c) => {
  const { name } = await c.req.json();
  const user = await sql`INSERT INTO users (name) VALUES (${name})`;
  c.status(202);
  return c.body("Accepted");
});

export default app;

When we send a user to the server, the server responds with the Accepted message.

curl -X POST -d '{"name": "Queueing"}' localhost:8000/users
Accepted%

Furthermore, the user has been added to the database.

SELECT * FROM users WHERE name = 'Queueing';
   id   |   name
--------+----------
 100001 | Queueing

Baseline performance test

To test the performance of the baseline implementation, we can use k6. As there is just one endpoint to test, the test becomes quite straightforward — we create a user with a random name, post it to the endpoint, and test that the response has the status code 202.

We do not verify that the user has been added to the database.

The k6 test for the task looks as follows. Note that we use traefik as the address, as the traffic goes through the load balancer.

import http from "k6/http";
import { check } from "k6";

export const options = {
  vus: 5,
  duration: "10s",
};

export default async () => {
  const user = {
    name: `User ${Math.random()}`,
  };

  const url = "http://traefik:8000/users";
  const res = http.post(url, JSON.stringify(user), {
    headers: {
      "Content-Type": "application/json",
    },
  });

  check(res, {
    "status is 202": (r) => r.status === 202,
  });
};

To concretely run the test, we use the command docker compose run --rm --entrypoint=k6 k6-tests run /tests/hello-k6.js, assuming that the test script is called hello-k6.js and it is located in the tests directory of the walking skeleton. The command is run in the root folder of the walking skeleton.

The test results are as follows.

docker compose run --rm --entrypoint=k6 k6-tests run /tests/hello-k6.js
// ...
     ✓ status is 202

     checks.........................: 100.00% 12690 out of 12690
     data_received..................: 1.9 MB  193 kB/s
     data_sent......................: 2.2 MB  216 kB/s
     http_req_blocked...............: avg=6.12µs  min=685ns  med=5.49µs  max=801.66µs p(90)=7.41µs  p(95)=8.43µs
     http_req_connecting............: avg=120ns   min=0s     med=0s      max=371.41µs p(90)=0s      p(95)=0s
     http_req_duration..............: avg=3.74ms  min=2.33ms med=3.39ms  max=12.7ms   p(90)=5.04ms  p(95)=5.24ms
       { expected_response:true }...: avg=3.74ms  min=2.33ms med=3.39ms  max=12.7ms   p(90)=5.04ms  p(95)=5.24ms
     http_req_failed................: 0.00%   0 out of 12690
     http_req_receiving.............: avg=59.69µs min=5µs    med=57.22µs max=891.33µs p(90)=81.95µs p(95)=92.61µs
     http_req_sending...............: avg=24.23µs min=2.23µs med=22.56µs max=574.4µs  p(90)=33.25µs p(95)=39.12µs
     http_req_tls_handshaking.......: avg=0s      min=0s     med=0s      max=0s       p(90)=0s      p(95)=0s
     http_req_waiting...............: avg=3.65ms  min=2.25ms med=3.31ms  max=12.52ms  p(90)=4.96ms  p(95)=5.14ms
     http_reqs......................: 12690   1268.708847/s
     iteration_duration.............: avg=3.92ms  min=2.62ms med=3.56ms  max=13.08ms  p(90)=5.23ms  p(95)=5.45ms
     iterations.....................: 12690   1268.708847/s
     vus............................: 5       min=5              max=5
     vus_max........................: 5       min=5              max=5

That is, when waiting for each user to be inserted into the database before responding, the server handles around 1270 requests per second. None of the requests fail. A separate check confirms that the data has been added to the database.
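One way to do such a check is to compare the row count before and after the test run; the count should grow by the number of requests reported by k6 (12690 in the run above).

SELECT COUNT(*) FROM users;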

Within-service message queue

Let’s next add a message queue to the server service. The message queue is used to process the messages asynchronously. The server responds with the status code 202, but the user is not added to the database immediately. Instead, the user is added to the database asynchronously.

For the task, we use Redis Lists, which can be used as a message queue. When using ioredis as the client, the lpush command adds an item to the head of the list, and the brpop command removes and returns an item from the tail of the list, so together they form a first-in, first-out queue. The brpop is a blocking command, which means that it waits until an item is available in the list.
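To illustrate the ordering, here is a quick, hypothetical redis-cli session; the list name users matches the queue used below.

127.0.0.1:6379> LPUSH users "first"
(integer) 1
127.0.0.1:6379> LPUSH users "second"
(integer) 2
127.0.0.1:6379> BRPOP users 0
1) "users"
2) "first"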

Setting up Redis is discussed in the previous part in the chapter on Caching.
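If you do not have that configuration at hand, a minimal redis service in compose.yaml could look roughly as follows; this is a sketch, and the actual configuration in the walking skeleton may differ.

  redis:
    image: redis:7
    restart: unless-stopped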

The below implementation has two Redis connections, one for producing messages to the list, and one for consuming messages from the list. Two connections are needed because brpop blocks the connection it runs on while waiting. The consumer listens to the list and processes the messages by adding the users to the database. The producer (the server listening to the POST requests) adds users to the list.

import { Hono } from "@hono/hono";
import { Redis } from "ioredis";
import postgres from "postgres";

const app = new Hono();
const sql = postgres();

// brpop blocks the connection while it waits, so separate
// connections are used for consuming and producing.
const redisConsumer = new Redis(6379, "redis");
const redisProducer = new Redis(6379, "redis");

const QUEUE_NAME = "users";

const consume = async () => {
  while (true) {
    // Block until a message is available, then store the user.
    const result = await redisConsumer.brpop(QUEUE_NAME, 0);
    if (result) {
      const [, user] = result;
      const { name } = JSON.parse(user);
      await sql`INSERT INTO users (name) VALUES (${name})`;
    }
  }
};

// Start the consumer loop in the background for the lifetime of the process.
consume();

app.post("/users", async (c) => {
  const { name } = await c.req.json();
  await redisProducer.lpush(QUEUE_NAME, JSON.stringify({ name }));
  c.status(202);
  return c.body("Accepted");
});

export default app;

Now, when we send a user to the server, the server again responds with the status code 202.

curl -X POST -d '{"name": "Redis Queueing, Part 1"}' localhost:8000/users
Accepted%

And, when we visit the database, the user has been added.

SELECT * FROM users WHERE name = 'Redis Queueing, Part 1';
   id   |          name
--------+------------------------
 112692 | Redis Queueing, Part 1

Within-service message queue performance test

Let’s next compare the performance of the application with the message queue. We can use the same k6 test as before — now, the test results look as follows.

docker compose run --rm --entrypoint=k6 k6-tests run /tests/hello-k6.js
// ...
     ✓ status is 202

     checks.........................: 100.00% 48763 out of 48763
     data_received..................: 7.4 MB  741 kB/s
     data_sent......................: 8.3 MB  830 kB/s
     http_req_blocked...............: avg=3.92µs   min=529ns    med=2.9µs    max=1.36ms   p(90)=6.66µs  p(95)=8.32µs
     http_req_connecting............: avg=47ns     min=0s       med=0s       max=547.22µs p(90)=0s      p(95)=0s
     http_req_duration..............: avg=902.85µs min=170.91µs med=727.02µs max=9.9ms    p(90)=1.71ms  p(95)=2.16ms
       { expected_response:true }...: avg=902.85µs min=170.91µs med=727.02µs max=9.9ms    p(90)=1.71ms  p(95)=2.16ms
     http_req_failed................: 0.00%   0 out of 48763
     http_req_receiving.............: avg=36.62µs  min=4.48µs   med=27.49µs  max=2.06ms   p(90)=69.01µs p(95)=90.54µs
     http_req_sending...............: avg=14.49µs  min=1.84µs   med=10.76µs  max=1.94ms   p(90)=26.58µs p(95)=34.06µs
     http_req_tls_handshaking.......: avg=0s       min=0s       med=0s       max=0s       p(90)=0s      p(95)=0s
     http_req_waiting...............: avg=851.73µs min=151.03µs med=687.62µs max=9.78ms   p(90)=1.62ms  p(95)=2.06ms
     http_reqs......................: 48763   4876.047428/s
     iteration_duration.............: avg=1.01ms   min=213.39µs med=814.72µs max=10.07ms  p(90)=1.89ms  p(95)=2.38ms
     iterations.....................: 48763   4876.047428/s
     vus............................: 5       min=5              max=5
     vus_max........................: 5       min=5              max=5

That is, when using the message queue, the server handles around 4876 requests per second and none of the requests fail. When checking the database, however, the users are not all in the database immediately, as expected. The users are added to the Redis queue and then processed by the consumer, which adds them to the database.

database=# select count(*) from users;
 count
--------
 125125
(1 row)

database=# select count(*) from users;
 count
--------
 130112
(1 row)

database=# select count(*) from users;
 count
--------
 135099
(1 row)
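You can also observe the backlog directly by checking the length of the list in Redis. The following assumes the Redis service in the walking skeleton is called redis.

docker compose exec redis redis-cli LLEN users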

Separate consumer service

Let’s next create a separate service that consumes the messages from the message queue. For this, create a new folder called consumer-server in the walking skeleton, and add the files Dockerfile, deno.json, and app.js to the folder. The Dockerfile is as follows.

FROM denoland/deno:alpine-2.0.2

WORKDIR /app

COPY deno.json .

RUN DENO_FUTURE=1 deno install

COPY . .

CMD [ "run", "--allow-env", "--allow-net", "--watch", "app.js" ]

The deno.json file includes the ioredis driver and the postgres driver.

{
  "imports": {
    "ioredis": "npm:ioredis@5.4.2",
    "postgres": "https://deno.land/x/postgresjs@v3.4.4/mod.js"
  }
}

And the app.js file includes the consumer that listens to the message queue and adds the users to the database.

import { Redis } from "ioredis";
import postgres from "postgres";

const sql = postgres();

const redisConsumer = new Redis(6379, "redis");

const QUEUE_NAME = "users";

const consume = async () => {
  while (true) {
    // Block until a message is available, then store the user.
    const result = await redisConsumer.brpop(QUEUE_NAME, 0);
    if (result) {
      const [, user] = result;
      const { name } = JSON.parse(user);
      await sql`INSERT INTO users (name) VALUES (${name})`;
    }
  }
};

consume();

As you may notice, the service does not include a web server; it is just a process that consumes the messages from the message queue.

Then, add the service to the compose.yaml file.

  consumer-server:
    build: consumer-server
    restart: unless-stopped
    volumes:
      - ./consumer-server:/app
    env_file:
      - project.env
    depends_on:
      - redis
      - database

And, finally, modify the app.js of the server service, removing the consumption of messages.

import { Hono } from "@hono/hono";
import { Redis } from "ioredis";

const app = new Hono();

const redisProducer = new Redis(6379, "redis");

const QUEUE_NAME = "users";

app.post("/users", async (c) => {
  const { name } = await c.req.json();
  await redisProducer.lpush(QUEUE_NAME, JSON.stringify({ name }));
  c.status(202);
  return c.body("Accepted");
});

export default app;

Now, when you restart the services, the server still responds with the status code 202.

curl -X POST -d '{"name": "Redis Queueing, Part 2"}' localhost:8000/users
Accepted%

And, when you visit the database, the user has — at least after a while — been added to the database.

   id   |          name
--------+------------------------
 161456 | Redis Queueing, Part 2

Separate service message queue performance test

Let’s next test the performance of the application with the separate consumer service. We can again use the same k6 test as before — now, the test results look as follows.

     ✓ status is 202

     checks.........................: 100.00% 54236 out of 54236
     data_received..................: 8.2 MB  824 kB/s
     data_sent......................: 9.2 MB  923 kB/s
     http_req_blocked...............: avg=3.72µs   min=550ns    med=2.87µs   max=1.17ms   p(90)=6.14µs  p(95)=7.4µs
     http_req_connecting............: avg=36ns     min=0s       med=0s       max=514.2µs  p(90)=0s      p(95)=0s
     http_req_duration..............: avg=804.63µs min=170.92µs med=692.73µs max=10.34ms  p(90)=1.37ms  p(95)=1.71ms
       { expected_response:true }...: avg=804.63µs min=170.92µs med=692.73µs max=10.34ms  p(90)=1.37ms  p(95)=1.71ms
     http_req_failed................: 0.00%   0 out of 54236
     http_req_receiving.............: avg=34.59µs  min=4.58µs   med=26.89µs  max=1.08ms   p(90)=62.43µs p(95)=78.33µs
     http_req_sending...............: avg=13.81µs  min=1.99µs   med=10.51µs  max=808.88µs p(90)=24.57µs p(95)=30.4µs
     http_req_tls_handshaking.......: avg=0s       min=0s       med=0s       max=0s       p(90)=0s      p(95)=0s
     http_req_waiting...............: avg=756.21µs min=148.31µs med=654.74µs max=10.21ms  p(90)=1.29ms  p(95)=1.61ms
     http_reqs......................: 54236   5423.123737/s
     iteration_duration.............: avg=911.23µs min=229.66µs med=777.43µs max=11.85ms  p(90)=1.54ms  p(95)=1.91ms
     iterations.....................: 54236   5423.123737/s
     vus............................: 5       min=5              max=5
     vus_max........................: 5       min=5              max=5

Now, when using the separate consumer service, the server handles around 5423 requests per second and none of the requests fail. When checking the database, the users are again slowly being added to the database, as expected.

database=# select count(*) from users;
 count
--------
 174228
(1 row)

database=# select count(*) from users;
 count
--------
 178352
(1 row)

Summary

Overall, the server handled approximately 1270 requests per second when handling the requests synchronously, 4876 requests per second when handling the requests asynchronously within the same server, and 5423 requests per second when using a separate consumer service. All the tests were conducted against a single replica of the server service on a laptop that was running the same set of services during each test.

For reliability, the tests should be repeated multiple times and the results averaged, possibly dropping the best and worst results. Regardless, there is strong evidence that adding asynchronous processing to the server service improves the throughput. This comes at a cost, however: when storing the data asynchronously, the users are not immediately visible in the database, while immediate visibility might be a requirement in some applications.


Further improvements

There is plenty of room to improve. For example:

  • The database inserts could be done in batches to improve the performance. Instead of inserting one user at a time, multiple users could be inserted at once; see the first sketch after this list.
  • The producer and consumer could be scaled to multiple replicas, as shown in the second snippet after this list.
  • The database schema could be changed so that there is no serial primary key, but a UUID instead. This would allow the producer to generate the UUID for the user, and the consumer to use the same UUID when adding the user to the database. This way, the response could already return the UUID, giving the client a way to track the user; the last sketch after this list outlines this.
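As a sketch of the first improvement, the consumer loop could drain several messages before writing to the database. The version below is one possible approach, not the only one: BATCH_SIZE is an illustrative value, and the multi-row insert relies on the postgres.js helper that expands an array of objects into a single INSERT statement.

import { Redis } from "ioredis";
import postgres from "postgres";

const sql = postgres();
const redisConsumer = new Redis(6379, "redis");

const QUEUE_NAME = "users";
const BATCH_SIZE = 100; // illustrative value

const consume = async () => {
  while (true) {
    // Block until at least one message is available.
    const first = await redisConsumer.brpop(QUEUE_NAME, 0);
    const users = [JSON.parse(first[1])];
    // Drain further messages without blocking, up to the batch size.
    while (users.length < BATCH_SIZE) {
      const next = await redisConsumer.rpop(QUEUE_NAME);
      if (!next) break;
      users.push(JSON.parse(next));
    }
    // postgres.js expands the array into a single multi-row insert.
    await sql`INSERT INTO users ${sql(users, "name")}`;
  }
};

consume();

Batching trades a little extra latency for fewer round trips to the database; note that messages popped but not yet inserted are lost if the process crashes, so batching slightly widens that window.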
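For the second improvement, scaling the consumer is a matter of adding a deploy section to its service definition in compose.yaml, just as was done for the server service. Each message is popped by exactly one consumer, so the replicas divide the work between them.

  consumer-server:
    build: consumer-server
    restart: unless-stopped
    volumes:
      - ./consumer-server:/app
    env_file:
      - project.env
    depends_on:
      - redis
      - database
    deploy:
      replicas: 2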
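And as a sketch of the third improvement, assuming the users table is changed to use id UUID PRIMARY KEY, the producer could generate the identifier with crypto.randomUUID and return it to the client right away.

import { Hono } from "@hono/hono";
import { Redis } from "ioredis";

const app = new Hono();

const redisProducer = new Redis(6379, "redis");

const QUEUE_NAME = "users";

app.post("/users", async (c) => {
  const { name } = await c.req.json();
  // Assumes the schema uses id UUID PRIMARY KEY instead of SERIAL.
  const id = crypto.randomUUID();
  await redisProducer.lpush(QUEUE_NAME, JSON.stringify({ id, name }));
  c.status(202);
  // The client can use the returned id to track the user.
  return c.json({ id });
});

export default app;

The consumer would then insert both the id and the name from the message into the database.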