Using RabbitMQ with Node.js and TypeScript
Introduction
Node.js and RabbitMQ are two technologies that work well together for building scalable and reliable applications. Node.js is known for its non-blocking I/O, and RabbitMQ is a message broker that lets applications communicate with each other asynchronously. Combining the two lets you build applications that stay responsive, tolerate failures, and handle large volumes of traffic.
Here is an example of how these technologies can be integrated:
- RabbitMQ can be used with Node.js to enable asynchronous communication between applications. This is useful for building distributed systems, where different services need to communicate in a decoupled manner. You can use RabbitMQ to implement a message queue that allows services to publish and consume messages. For example, one service publishes messages to a RabbitMQ exchange, and another service consumes them from a queue bound to that exchange and processes them. Node.js has several libraries that support RabbitMQ integration, such as amqplib and rabbit.js.
However, keep in mind that each technology has its own learning curve and best practices, so evaluate carefully how each tool fits into your development workflow.
Setting up the app
We will create a simple app in which the main app, the “Producer”, produces a notification message and sends it to a separate notification service, the “Consumer”, which processes the message and notifies users through its own notification integration flow.
First app “Producer”
Create the producer app, which creates the notification details.
mkdir producer && cd producer
npm init -y
npm i amqplib dotenv
npm i -D nodemon @types/node tsc-watch typescript @types/amqplib
Adding tsconfig.json and package.json files
tsconfig.json
{
  "compilerOptions": {
    "target": "es2016" /* Set the JavaScript language version for emitted JavaScript and include compatible library declarations. */,
    "lib": [
      "dom",
      "es6",
      "es2017",
      "esnext.asynciterable"
    ] /* Specify a set of bundled library declaration files that describe the target runtime environment. */,
    "experimentalDecorators": true /* Enable experimental support for TC39 stage 2 draft decorators. */,
    "module": "commonjs" /* Specify what module code is generated. */,
    "rootDir": "./src" /* Specify the root folder within your source files. */,
    "moduleResolution": "node" /* Specify how TypeScript looks up a file from a given module specifier. */,
    "baseUrl": "." /* Specify the base directory to resolve non-relative module names. */,
    "paths": {
      "@/*": ["src/*"],
      "@models/*": ["src/models/*"]
    } /* Specify a set of entries that re-map imports to additional lookup locations. */,
    "resolveJsonModule": true /* Enable importing .json files */,
    "allowJs": true /* Allow JavaScript files to be a part of your program. Use the `checkJS` option to get errors from these files. */,
    "outDir": "./dist" /* Specify an output folder for all emitted files. */,
    "esModuleInterop": true /* Emit additional JavaScript to ease support for importing CommonJS modules. This enables `allowSyntheticDefaultImports` for type compatibility. */,
    "forceConsistentCasingInFileNames": true /* Ensure that casing is correct in imports. */,
    /* Type Checking */
    "strict": true /* Enable all strict type-checking options. */,
    "skipLibCheck": true
  },
  "include": ["src/**/*", "*"]
}
Add a script to package.json that watches the TypeScript sources, recompiles them, and reruns the app:
{
  "scripts": {
    "watch": "tsc-watch --noClear -p ./tsconfig.json --onSuccess \"node ./dist/index.js\""
  }
}
Creating the RabbitMQ connection:
To use RabbitMQ in your Node.js application, you first need to establish a connection to the RabbitMQ server. The example below connects using the amqplib library through a singleton class that manages the connection itself and exposes a method for sending a new message.
Here’s the connection.ts file:
import client, { Connection, Channel } from "amqplib";
import { rmqUser, rmqPass, rmqhost } from "./config";

class RabbitMQConnection {
  connection!: Connection;
  channel!: Channel;
  private connected = false;

  async connect() {
    // Reuse the existing connection and channel if we already have them
    if (this.connected && this.channel) return;
    try {
      console.log(`⌛️ Connecting to Rabbit-MQ Server`);
      this.connection = await client.connect(
        `amqp://${rmqUser}:${rmqPass}@${rmqhost}:5672`
      );
      console.log(`✅ Rabbit MQ Connection is ready`);
      this.channel = await this.connection.createChannel();
      console.log(`🛸 Created RabbitMQ Channel successfully`);
      // Mark as connected only after both the connection and the channel exist
      this.connected = true;
    } catch (error) {
      console.error(error);
      console.error(`Not connected to MQ Server`);
    }
  }

  async sendToQueue(queue: string, message: any) {
    try {
      if (!this.channel) {
        await this.connect();
      }
      // Declare the queue with the same options the consumer uses, so the
      // message is not dropped if the producer starts before the consumer
      await this.channel.assertQueue(queue, { durable: true });
      // Messages are sent as bytes, so serialize the payload to JSON first
      this.channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)));
    } catch (error) {
      console.error(error);
      throw error;
    }
  }
}

const mqConnection = new RabbitMQConnection();
export default mqConnection;
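The connection class imports its settings from ./config, which this walkthrough does not show. The following is a minimal sketch of what such a module might look like, not the author's actual file. The environment variable names (RABBITMQ_USER, RABBITMQ_PASS, RABBITMQ_HOST, NOTIFICATION_QUEUE), the default queue name, and the loadConfig and buildAmqpUrl helpers are all assumptions made for illustration. The guest/guest defaults match a stock local RabbitMQ install.

```typescript
// config.ts (hypothetical sketch; variable names and defaults are assumptions)

export type RabbitConfig = {
  rmqUser: string;
  rmqPass: string;
  rmqhost: string;
  NOTIFICATION_QUEUE: string;
};

// A plain key/value view of the environment, so the function is easy to test.
type Env = Record<string, string | undefined>;

// Read each setting from the environment, falling back to local defaults.
export const loadConfig = (env: Env): RabbitConfig => ({
  rmqUser: env.RABBITMQ_USER ?? "guest",
  rmqPass: env.RABBITMQ_PASS ?? "guest",
  rmqhost: env.RABBITMQ_HOST ?? "localhost",
  NOTIFICATION_QUEUE: env.NOTIFICATION_QUEUE ?? "notifications",
});

// Build the AMQP URL, percent-encoding the credentials so that characters
// such as "@" or "/" in a password do not break the URL.
export const buildAmqpUrl = (config: RabbitConfig, port = 5672): string =>
  `amqp://${encodeURIComponent(config.rmqUser)}:${encodeURIComponent(
    config.rmqPass
  )}@${config.rmqhost}:${port}`;

// In the real module you would load .env first and export the values:
//   import "dotenv/config";
//   export const { rmqUser, rmqPass, rmqhost, NOTIFICATION_QUEUE } =
//     loadConfig(process.env);
```

Taking the environment as a parameter keeps the module free of side effects, and encoding the credentials guards against passwords that contain URL-reserved characters, which the template string in connect() would otherwise pass through unescaped.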
We also need a function that is responsible for sending the notification.
Here’s notification.ts:
import { NOTIFICATION_QUEUE } from "./config";
import mqConnection from "./connection";

export type INotification = {
  title: string;
  description: string;
};

export const sendNotification = async (notification: INotification) => {
  await mqConnection.sendToQueue(NOTIFICATION_QUEUE, notification);
  console.log(`Sent the notification to consumer`);
};
index.ts
import mqConnection from "./connection";
import { sendNotification } from "./notification";

const send = async () => {
  await mqConnection.connect();
  const newNotification = {
    title: "You have received new notification",
    description:
      "You have received new incoming notification from the producer service",
  };
  // Await the send so that a failure surfaces here instead of being lost
  await sendNotification(newNotification);
};

send();
Second app “Consumer”
Create the consumer app, which receives the notification details from the producer app through RabbitMQ and sends them to the users.
mkdir consumer && cd consumer
npm init -y
npm i amqplib dotenv
npm i -D nodemon @types/node tsc-watch typescript @types/amqplib
For the tsconfig.json and package.json files, we can reuse the ones from the producer app.
RabbitMQ connection (connection.ts)
import client, { Connection, Channel } from "amqplib";
import { rmqUser, rmqPass, rmqhost, NOTIFICATION_QUEUE } from "./config";

type HandlerCB = (msg: string) => any;

class RabbitMQConnection {
  connection!: Connection;
  channel!: Channel;
  private connected = false;

  async connect() {
    // Reuse the existing connection and channel if we already have them
    if (this.connected && this.channel) return;
    try {
      console.log(`⌛️ Connecting to Rabbit-MQ Server`);
      this.connection = await client.connect(
        `amqp://${rmqUser}:${rmqPass}@${rmqhost}:5672`
      );
      console.log(`✅ Rabbit MQ Connection is ready`);
      this.channel = await this.connection.createChannel();
      console.log(`🛸 Created RabbitMQ Channel successfully`);
      this.connected = true;
    } catch (error) {
      console.error(error);
      console.error(`Not connected to MQ Server`);
    }
  }

  async consume(handleIncomingNotification: HandlerCB) {
    // Declare the queue (creating it if needed) before consuming from it
    await this.channel.assertQueue(NOTIFICATION_QUEUE, {
      durable: true,
    });

    await this.channel.consume(
      NOTIFICATION_QUEUE,
      (msg) => {
        // msg is null when the server has cancelled this consumer
        if (!msg) {
          return console.error(`Invalid incoming message`);
        }
        handleIncomingNotification(msg.content.toString());
        // Acknowledge manually, because noAck is false
        this.channel.ack(msg);
      },
      {
        noAck: false,
      }
    );
  }
}

const mqConnection = new RabbitMQConnection();
export default mqConnection;
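The consume callback above acknowledges every message, even when the handler fails. One common alternative is to acknowledge only on success and reject otherwise. The sketch below isolates that decision as a pure function so it can be reasoned about without a broker. The names decideAck and AckDecision are hypothetical, the handler is assumed to be synchronous, and rejecting without requeue is an assumption (it avoids redelivering a message that can never be processed).

```typescript
// Hypothetical helper: decides what to tell the broker about one delivery.
export type AckDecision = "ack" | "reject" | "ignore";

type Handler = (content: string) => void;

export const decideAck = (
  content: string | null,
  handler: Handler
): AckDecision => {
  // amqplib invokes the consume callback with null when the consumer
  // is cancelled by the server, so there is nothing to acknowledge.
  if (content === null) return "ignore";
  try {
    handler(content);
    return "ack";
  } catch {
    // The handler threw: reject instead of acknowledging.
    return "reject";
  }
};

// Possible wiring inside channel.consume (assumes an amqplib Channel):
//   const decision = decideAck(msg ? msg.content.toString() : null, handler);
//   if (msg && decision === "ack") channel.ack(msg);
//   if (msg && decision === "reject") channel.nack(msg, false, false); // no requeue
```

For the reject path to trigger, the handler has to let its errors propagate; a handler that catches its own parse errors, as the consumer's index.ts does, would always be acknowledged.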
index.ts
import mqConnection from "./connection";

const handleIncomingNotification = (msg: string) => {
  try {
    const parsedMessage = JSON.parse(msg);
    console.log(`Received Notification`, parsedMessage);
    // Implement your own notification flow
  } catch (error) {
    console.error(`Error While Parsing the message`);
  }
};

const listen = async () => {
  await mqConnection.connect();
  await mqConnection.consume(handleIncomingNotification);
};

listen();
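JSON.parse returns an untyped value, so the handler above cannot be sure the payload really has the title and description fields that the producer's INotification type declares. A minimal sketch of a runtime check follows. The isNotification and parseNotification helpers are hypothetical and not part of the original example.

```typescript
// Mirrors the INotification type declared in the producer's notification.ts.
export type INotification = {
  title: string;
  description: string;
};

// Runtime type guard: true only for objects with both string fields.
export const isNotification = (value: unknown): value is INotification => {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate.title === "string" &&
    typeof candidate.description === "string"
  );
};

// Parse a raw message body; returns null for malformed JSON or a wrong shape.
export const parseNotification = (raw: string): INotification | null => {
  try {
    const parsed: unknown = JSON.parse(raw);
    return isNotification(parsed) ? parsed : null;
  } catch {
    return null;
  }
};
```

The handler could call parseNotification(msg) and skip or log the message when the result is null, instead of trusting whatever JSON.parse returns.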
Run the consumer app first so that it starts listening for new messages:
cd consumer && npm run watch
If everything is OK, you should see the connection logs and no console errors.
Then send the notification, which is hard-coded in this example, by running the producer:
cd producer && npm run watch
Once you understand the concept, you can implement any notification handler you want.
Here is the complete code of the previous example: https://github.com/HassanFouaad/rabbitmq-typescript-example