Message Queues with Redis
Learning Objectives
- You know how to use Redis as a message queue.
- You understand the benefits and drawbacks of processing messages asynchronously.
Next, we look concretely into using a message queue to process messages. We start with an example that has no message queue, then add a message queue where the messages are processed within the same service, and finally create a separate service and pass the messages there.
For the experiment, we use just one replica for the server service in the walking skeleton. That is, set the value of replicas to 1.
server:
build: server
restart: unless-stopped
volumes:
- ./server:/app
env_file:
- project.env
depends_on:
- database
deploy:
replicas: 1
labels:
- "traefik.enable=true"
- "traefik.http.routers.server-router.entrypoints=web"
- "traefik.http.routers.server-router.rule=PathPrefix(`/`)"
- "traefik.http.services.server-service.loadbalancer.server.port=8000"
The concrete work relates to creating users. For the users, we use the same schema that we used in the chapter on Indexing and Query Optimization.
CREATE TABLE users (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL
);
Baseline implementation
The baseline implementation listens to POST requests on the /users endpoint, reads the user's name from the JSON document in the request, and creates a new user in the database with that name. The implementation looks as follows. The server responds with the status code 202, which indicates that the request has been accepted but might not yet have been processed.
import { Hono } from "@hono/hono";
import postgres from "postgres";
const app = new Hono();
const sql = postgres();
app.post("/users", async (c) => {
const { name } = await c.req.json();
const user = await sql`INSERT INTO users (name) VALUES (${name})`;
c.status(202);
return c.body("Accepted");
});
export default app;
When we send a user to the server, the server responds with the Accepted message.
curl -X POST -d '{"name": "Queueing"}' localhost:8000/users
Accepted%
Furthermore, the user has been added to the database.
SELECT * FROM users WHERE name = 'Queueing';
id | name
--------+----------
100001 | Queueing
Baseline performance test
To test the performance of the baseline implementation, we can use k6. As there is just one endpoint to test, the test becomes quite straightforward — we create a user with a random name, post it to the endpoint, and test that the response has the status code 202.
We do not verify that the user has been added to the database.
The k6 test for the task looks as follows. Note that we use traefik as the address, as the traffic goes through the load balancer.
import http from "k6/http";
import { check } from "k6";
export const options = {
vus: 5,
duration: "10s",
};
export default async () => {
const user = {
name: `User ${Math.random()}`,
};
const url = "http://traefik:8000/users";
const res = http.post(url, JSON.stringify(user), {
headers: {
"Content-Type": "application/json",
},
});
check(res, {
"status is 202": (r) => r.status === 202,
});
};
To concretely run the test, we use the command docker compose run --rm --entrypoint=k6 k6-tests run /tests/hello-k6.js, assuming that the test script is called hello-k6.js and that it is located in the tests directory of the walking skeleton. The command is run in the root folder of the walking skeleton.
The test results are as follows.
docker compose run --rm --entrypoint=k6 k6-tests run /tests/hello-k6.js
// ...
✓ status is 202
checks.........................: 100.00% 12690 out of 12690
data_received..................: 1.9 MB 193 kB/s
data_sent......................: 2.2 MB 216 kB/s
http_req_blocked...............: avg=6.12µs min=685ns med=5.49µs max=801.66µs p(90)=7.41µs p(95)=8.43µs
http_req_connecting............: avg=120ns min=0s med=0s max=371.41µs p(90)=0s p(95)=0s
http_req_duration..............: avg=3.74ms min=2.33ms med=3.39ms max=12.7ms p(90)=5.04ms p(95)=5.24ms
{ expected_response:true }...: avg=3.74ms min=2.33ms med=3.39ms max=12.7ms p(90)=5.04ms p(95)=5.24ms
http_req_failed................: 0.00% 0 out of 12690
http_req_receiving.............: avg=59.69µs min=5µs med=57.22µs max=891.33µs p(90)=81.95µs p(95)=92.61µs
http_req_sending...............: avg=24.23µs min=2.23µs med=22.56µs max=574.4µs p(90)=33.25µs p(95)=39.12µs
http_req_tls_handshaking.......: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
http_req_waiting...............: avg=3.65ms min=2.25ms med=3.31ms max=12.52ms p(90)=4.96ms p(95)=5.14ms
http_reqs......................: 12690 1268.708847/s
iteration_duration.............: avg=3.92ms min=2.62ms med=3.56ms max=13.08ms p(90)=5.23ms p(95)=5.45ms
iterations.....................: 12690 1268.708847/s
vus............................: 5 min=5 max=5
vus_max........................: 5 min=5 max=5
That is, when the server waits for the database insert on each request, it handles around 1270 requests per second. None of the requests fail, and a separate check confirms that the data has been added to the database.
Within-service message queue
Let’s next add a message queue to the server service. The message queue is used to process the messages asynchronously: the server still responds with the status code 202, but the user is not added to the database immediately. Instead, the user is added to the database later, when the message is consumed from the queue.
For the task, we use Redis Lists, which can be used as a message queue. When using ioredis as the client, the lpush command adds an item to the head of the list, and the brpop command removes and returns an item from the tail of the list, that is, the oldest item in the queue. The brpop command is blocking, which means that it waits until an item is available in the list.
Setting up Redis is discussed in the previous part in the chapter on Caching.
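To illustrate the queue semantics in isolation, here is a minimal standalone sketch. It assumes a Redis instance reachable at redis:6379, as in the walking skeleton, and uses a hypothetical list called demo-queue; items pushed with lpush come back out of brpop in insertion order.
import { Redis } from "ioredis";
const redis = new Redis(6379, "redis");
// lpush adds items to the head of the list
await redis.lpush("demo-queue", "first", "second");
// brpop blocks until an item is available and returns [listName, item];
// popping from the tail yields the items in insertion order (FIFO)
console.log(await redis.brpop("demo-queue", 0)); // [ "demo-queue", "first" ]
console.log(await redis.brpop("demo-queue", 0)); // [ "demo-queue", "second" ]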
The below implementation has two Redis connections: one for producing messages to the list and one for consuming messages from it. The consumer listens to the list and processes the messages by adding the users to the database. The producer, that is, the server listening to the POST requests, adds users to the list.
import { Hono } from "@hono/hono";
import { Redis } from "ioredis";
import postgres from "postgres";
const app = new Hono();
const sql = postgres();
const redisConsumer = new Redis(6379, "redis");
const redisProducer = new Redis(6379, "redis");
const QUEUE_NAME = "users";
const consume = async () => {
while (true) {
const result = await redisConsumer.brpop(QUEUE_NAME, 0);
if (result) {
const [queue, user] = result;
const { name } = JSON.parse(user);
await sql`INSERT INTO users (name) VALUES (${name})`;
}
}
};
consume();
app.post("/users", async (c) => {
const { name } = await c.req.json();
await redisProducer.lpush(QUEUE_NAME, JSON.stringify({ name }));
c.status(202);
return c.body("Accepted");
});
export default app;
Now, when we send a user to the server, the server again responds with the status code 202.
curl -X POST -d '{"name": "Redis Queueing, Part 1"}' localhost:8000/users
Accepted%
And, when we visit the database, the user has been added.
SELECT * FROM users WHERE name = 'Redis Queueing, Part 1';
id | name
--------+------------------------
112692 | Redis Queueing, Part 1
Within-service message queue performance test
Let’s next compare the performance of the application with the message queue. We can use the same k6 test as before — now, the test results look as follows.
docker compose run --rm --entrypoint=k6 k6-tests run /tests/hello-k6.js
// ...
✓ status is 202
checks.........................: 100.00% 48763 out of 48763
data_received..................: 7.4 MB 741 kB/s
data_sent......................: 8.3 MB 830 kB/s
http_req_blocked...............: avg=3.92µs min=529ns med=2.9µs max=1.36ms p(90)=6.66µs p(95)=8.32µs
http_req_connecting............: avg=47ns min=0s med=0s max=547.22µs p(90)=0s p(95)=0s
http_req_duration..............: avg=902.85µs min=170.91µs med=727.02µs max=9.9ms p(90)=1.71ms p(95)=2.16ms
{ expected_response:true }...: avg=902.85µs min=170.91µs med=727.02µs max=9.9ms p(90)=1.71ms p(95)=2.16ms
http_req_failed................: 0.00% 0 out of 48763
http_req_receiving.............: avg=36.62µs min=4.48µs med=27.49µs max=2.06ms p(90)=69.01µs p(95)=90.54µs
http_req_sending...............: avg=14.49µs min=1.84µs med=10.76µs max=1.94ms p(90)=26.58µs p(95)=34.06µs
http_req_tls_handshaking.......: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
http_req_waiting...............: avg=851.73µs min=151.03µs med=687.62µs max=9.78ms p(90)=1.62ms p(95)=2.06ms
http_reqs......................: 48763 4876.047428/s
iteration_duration.............: avg=1.01ms min=213.39µs med=814.72µs max=10.07ms p(90)=1.89ms p(95)=2.38ms
iterations.....................: 48763 4876.047428/s
vus............................: 5 min=5 max=5
vus_max........................: 5 min=5 max=5
That is, when using the message queue, the server handles around 4876 requests per second and none of the requests fail. When checking the database, however, the users are not in the database immediately, as expected. The users are first added to the Redis queue and then processed by the consumer, which adds them to the database.
database=# select count(*) from users;
count
--------
125125
(1 row)
database=# select count(*) from users;
count
--------
130112
(1 row)
database=# select count(*) from users;
count
--------
135099
(1 row)
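The backlog is also visible on the Redis side. Assuming the Redis service is called redis in the compose file, as in the walking skeleton, the length of the list can be checked with redis-cli; the returned number tells how many messages are still waiting to be consumed.
docker compose exec redis redis-cli llen users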
Separate consumer service
Let’s next create a separate service that consumes the messages from the message queue. For this, create a new folder called consumer-server in the walking skeleton, and add the files Dockerfile, deno.json, and app.js to the folder. The Dockerfile is as follows.
FROM denoland/deno:alpine-2.0.2
WORKDIR /app
COPY deno.json .
RUN DENO_FUTURE=1 deno install
COPY . .
CMD [ "run", "--allow-env", "--allow-net", "--watch", "app.js" ]
The deno.json file includes the ioredis and postgres dependencies.
{
"imports": {
"ioredis": "npm:ioredis@5.4.2",
"postgres": "https://deno.land/x/postgresjs@v3.4.4/mod.js"
}
}
And the app.js file includes the consumer that listens to the message queue and adds the users to the database.
import { Redis } from "ioredis";
import postgres from "postgres";
const sql = postgres();
const redisConsumer = new Redis(6379, "redis");
const QUEUE_NAME = "users";
const consume = async () => {
while (true) {
const result = await redisConsumer.brpop(QUEUE_NAME, 0);
if (result) {
const [queue, user] = result;
const parsedUser = JSON.parse(user);
await sql`INSERT INTO users (name) VALUES (${parsedUser.name})`;
}
}
};
consume();
As you may notice, the service does not include a web server; it is just a process that consumes the messages from the message queue.
Then, add the service to the compose.yaml file.
consumer-server:
build: consumer-server
restart: unless-stopped
volumes:
- ./consumer-server:/app
env_file:
- project.env
depends_on:
- redis
- database
And, finally, modify the app.js of the server service, removing the consumption of messages.
import { Hono } from "@hono/hono";
import { Redis } from "ioredis";
const app = new Hono();
const redisProducer = new Redis(6379, "redis");
const QUEUE_NAME = "users";
app.post("/users", async (c) => {
const { name } = await c.req.json();
await redisProducer.lpush(QUEUE_NAME, JSON.stringify({ name }));
c.status(202);
return c.body("Accepted");
});
export default app;
Now, when you restart the services, the server still responds with the status code 202.
curl -X POST -d '{"name": "Redis Queueing, Part 2"}' localhost:8000/users
Accepted%
And, when you visit the database, the user has — at least after a while — been added to the database.
id | name
--------+------------------------
161456 | Redis Queueing, Part 2
Separate service message queue performance test
Let’s next test the performance of the application with the separate consumer service. We can again use the same k6 test as before — now, the test results look as follows.
✓ status is 202
checks.........................: 100.00% 54236 out of 54236
data_received..................: 8.2 MB 824 kB/s
data_sent......................: 9.2 MB 923 kB/s
http_req_blocked...............: avg=3.72µs min=550ns med=2.87µs max=1.17ms p(90)=6.14µs p(95)=7.4µs
http_req_connecting............: avg=36ns min=0s med=0s max=514.2µs p(90)=0s p(95)=0s
http_req_duration..............: avg=804.63µs min=170.92µs med=692.73µs max=10.34ms p(90)=1.37ms p(95)=1.71ms
{ expected_response:true }...: avg=804.63µs min=170.92µs med=692.73µs max=10.34ms p(90)=1.37ms p(95)=1.71ms
http_req_failed................: 0.00% 0 out of 54236
http_req_receiving.............: avg=34.59µs min=4.58µs med=26.89µs max=1.08ms p(90)=62.43µs p(95)=78.33µs
http_req_sending...............: avg=13.81µs min=1.99µs med=10.51µs max=808.88µs p(90)=24.57µs p(95)=30.4µs
http_req_tls_handshaking.......: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
http_req_waiting...............: avg=756.21µs min=148.31µs med=654.74µs max=10.21ms p(90)=1.29ms p(95)=1.61ms
http_reqs......................: 54236 5423.123737/s
iteration_duration.............: avg=911.23µs min=229.66µs med=777.43µs max=11.85ms p(90)=1.54ms p(95)=1.91ms
iterations.....................: 54236 5423.123737/s
vus............................: 5 min=5 max=5
vus_max........................: 5 min=5 max=5
Now, when using the separate consumer service, the server handles around 5423 requests per second and none of the requests fail. When checking the database, the users are again slowly being added to the database, as expected.
database=# select count(*) from users;
count
--------
174228
(1 row)
database=# select count(*) from users;
count
--------
178352
(1 row)
Summary
Overall, the server handled approximately 1270 requests per second when handling the requests synchronously, 4876 requests per second when handling the requests asynchronously within the same server, and 5423 requests per second when using a separate consumer service. All the tests were conducted with a single replica of the server service on a laptop that was running the same set of services in each test.
For reliability, the tests should be repeated multiple times and the results averaged, possibly dropping the best and worst result. Regardless, there is strong evidence that adding asynchronous processing to the server service improves the throughput. However, this comes at a cost: when the data is stored asynchronously, the users are not immediately added to the database, and immediate persistence might be a requirement in some applications.
Further improvements
There is plenty of room to improve. For example:
- The database inserts could be done in batches to improve the performance. Instead of inserting one user at a time, multiple users could be inserted at once (see the sketch after this list).
- The producer and consumer could be scaled to multiple replicas.
- The database schema could be changed so that there is no serial primary key, but a UUID instead. This would allow the producer to generate the UUID for the user, and the consumer to use the same UUID when adding the user to the database. This way, the response could already return the UUID, giving the client a way to track the user.
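As an illustration of the first improvement, below is a minimal sketch of a batching consumer. It assumes the same sql, redisConsumer, and QUEUE_NAME as in the consumer service above; BATCH_SIZE is a hypothetical tuning parameter, and the multi-row insert relies on postgres.js expanding an array of objects into the query.
const BATCH_SIZE = 100;
const consumeBatch = async () => {
  while (true) {
    // block until at least one message is available
    const [, first] = await redisConsumer.brpop(QUEUE_NAME, 0);
    const users = [JSON.parse(first)];
    // drain up to BATCH_SIZE - 1 additional messages without blocking
    while (users.length < BATCH_SIZE) {
      const next = await redisConsumer.rpop(QUEUE_NAME);
      if (!next) {
        break;
      }
      users.push(JSON.parse(next));
    }
    // postgres.js expands an array of objects into a multi-row insert
    await sql`INSERT INTO users ${sql(users, "name")}`;
  }
};
consumeBatch();
The blocking brpop call keeps the loop idle when the queue is empty, while the non-blocking rpop calls only drain messages that are already waiting in the list.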