Publish-Subscribe, Pola Desain Komunikasi Back-end Penting Beserta Implementasinya
Publish/Subscribe adalah salah satu pola desain populer yang mungkin anda lihat di backend. Pola ini sering digunakan untuk komunikasi asinkron di antara beberapa komponen (biasanya terpisah) dan sangat cocok ketika anda memiliki sejumlah komponen yang perlu bekerja sama tetapi ingin tetap loosely-coupled.
Bagaimana Publish/Subscribe bekerja?
Berikut adalah ilustrasi diagram bagaimana publish/subscriber bekerja:
Pola Publish/Subscribe melibatkan penggunaan message queue (sering disebut message broker) yang berfungsi sebagai perantara antara penerbit (Publisher) dan pelanggan (Subscriber). Message broker ini mengelompokkan pesan ke dalam sesuatu yang disebut channel (atau topic).
Publisher adalah komponen yang membuat dan mengirim pesan, sedangkan Subscriber adalah komponen yang menerima dan menggunakan pesan.
Publisher hanya mengirimkan message (atau event) ke channel atau topic tertentu dalam message broker. Channel-channel ini bertindak sebagai titik distribusi pesan. Subscriber kemudian menunjukkan minatnya dengan berlangganan channel-channel tersebut dalam message broker dan setiap kali message atau event dipublikasikan ke channel itu, mereka menerima salinannya.
Software message queue seperti Apache Kafka, dan MQTT menggunakan pola komunikasi publish/subscribe.
Kelebihan dari Pola Desain Publish/Subscribe
- Komunikasi Asinkron
Berbeda dengan pola request-response, publish/subscribe pada dasarnya dirancang untuk bersifat asinkron sehinga ideal untuk membuat aplikasi real-time dengan hambatan latensi yang lebih rendah. - Komponen yang Loose Coupling
Komponen dalam model publikasi/berlangganan digabungkan secara longgar. Artinya, mereka tidak terikat satu sama lain dan dapat berinteraksi secara bebas dengan memicu dan merespons peristiwa. - Sangat Skalabel
Tidak ada batasan jumlah subscriber untuk publisher dapat memublikasikan acara. Selain itu, tidak ada batasan jumlah berlangganan publisher oleh subscriber. - Independen dalam Bahasa Pemrograman dan Protokol
Model publish/subscribe dapat dengan mudah diintegrasikan ke dalam stack teknologi apa pun karena tidak bergantung pada bahasa pemrograman. Selain itu, sering kali pola ini mendukung berbagai environment dan platform sehingga kompatibel lintas platform. - Penyeimbangan Beban (Load Balancing)
Dalam kasus di mana banyak subscriber berlangganan topic tertentu, model publish/subscribe dapat mendistribusikan topic secara merata di antara subscriber, sehingga memberikan kemampuan penyeimbangan beban yang siap digunakan.
Keterbatasan Pola Desain Publish/Subscribe
- Kompleksitas dalam Implementasi
Menyiapkan sistem publish/subscribe bisa lebih kompleks daripada model komunikasi sederhana seperti pola request-response. Anda perlu mengonfigurasi dan mengelola message broker, channel, dan subscription, yang dapat menambah overhead pada sistem Anda. - Duplikasi Pesan
Tergantung pada konfigurasi dan masalah jaringan, message dapat diduplikasi. Subscriber mungkin menerima message yang sama lebih dari sekali, yang dapat menyebabkan redundansi dan pemrosesan ekstra. - Tantangan Skalabilitas
Meskipun publish/subscribe sangat skalabel, mengelola message dan subscriber dalam jumlah yang sangat besar bisa menjadi rumit. Anda mungkin perlu mempertimbangkan cara mendistribusikan message secara efisien dan menangani subscriber dalam jumlah besar. - Penanganan Kesalahan yang Kompleks
Menangani kesalahan dalam sistem publish/subscribe dapat menjadi sebuah tantangan. Menangani situasi seperti kegagalan pengiriman message atau kesalahan subscriber memerlukan pertimbangan dan desain yang cermat.
Kapan Anda Harus Memakai Pola Publish/Subscribe?
- Pembuatan fitur yang memerlukan respons real-time dan latensi rendah bagi pengguna, misalnya live chat atau aplikasi game multiplayer .
- Dalam sistem notifikasi event
- Dalam membangun sistem terdistribusi yang mengandalkan logging dan caching
Contoh Implementasi Pola Desain Publish/Subscribe Dengan Node.js dan RabbitMQ pada Windows
Persyaratan:
Diasumsikan anda telah memahami dasar dari Node.js dan Javascript.
Instal RabbitMQ
Pastikan RabbitMQ telah terinstall pada mesin lokal anda. Karena saya menggunakan sistem operasi Windows, maka perlu menginstal program RabbitMQ berdasarkan link ini.
Setelah anda berhasil menginstal program RabbitMQ, anda dapat mengecek status port RabbitMQ yang akan digunakan dengan perintah berikut:
rabbitmq-diagnostics status
Perintah tersebut akan menghasilkan atribut-atribut status dari node RabbitMQ yang berjalan di mesin anda. Karena kita memerlukan port yang akan digunakan perhatikan atribut listener. Untuk percobaan ini, port RabbitMQ yang digunakan pada mesin saya adalah 5672.
Persiapkan proyek
Buatlah proyek Node.js baru dengan perintah:
npm init -y
Setelah itu, kita memerlukan beberapa dependensi yang digunakan untuk proyek.
Instal Dependensi Node.js yang Diperlukan
Kita akan menggunakan framework express untuk membuat server antara publisher, subscriber, dan message broker RabbitMQ. Library amqblib diperlukan untuk protokol message queue dalam RabbitMQ. Dependensi-dependensi lain digunakan sebagai pendukung dari server.
npm i express dotenv cors amqplib
npm i -D nodemon
Persiapkan file environment
Sebelum melanjutkan pengerjaan proyek, kita perlu menyiapkan file berisi variabel environment yang tergantung dengan mesin lokal. Disini, saya menggunakan port yang tersedia yaitu 4001
dan URL AMQP
yang menggunakan setelan RabbitMQ secara default:
PORT=4001
CLOUDAMQP_URL='amqp://guest:guest@localhost:5672'
Pembuatan server Node.js
Buatlah file index.js
yang merupakan file utama server beserta dengan file routes untuk mendefinisikan rute-rute URL:
import express from "express";
import queueRoutes from "./routes/messageQueue.js";
import dotenv from "dotenv";
import cors from "cors";
const app = express();
const PORT = process.env.PORT || 4001;
dotenv.config();
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
app.use(queueRoutes);
app.use(
cors({
origin: "*",
})
);
app.listen(PORT, () => console.log("Server running at port " + PORT));
Setelah membuat file server, kita perlu membuat file routes
yang digunakan untuk mendefinisikan rute-rute untuk publishe dan subscribe pesan yang dikirimkan. Dalam hal ini saya menyimpannya di file /routes/messageQueue.js
.
import express from "express";
import subscribeMessage from "../controllers/subscriberController.js";
import publishMessage from "../controllers/publisherController.js";
import getBasicMessage from "../controllers/getAllMessagesController.js";
const router = express.Router();
router.get("/", getBasicMessage);
router.post("/publish", publishMessage);
router.get("/subscribe", subscribeMessage);
export default router;
Membuat Kode Publisher
Setelah kita membuat file server dan routes, dilanjutkan dengan membuat file berisi kode untuk publisher mengirimkan pesan kepada server dan message broker. Dalam hal ini, saya menyimpan kodenya dalam file /controllers/publisherController.js
import { connect } from "amqplib";
const publishMessage = async (req, res) => {
try {
const companyName = req.body.companyName;
const queueName = req.body.queueName;
const queueMessage = req.body.queueMessage;
const connection = await connect(process.env.CLOUDAMQP_URL);
const channel = await connection.createChannel();
const uniqueQueueName = companyName + "-" + queueName;
// console.log(msg)
// connect to 'test-queue', create one if does not exist already
await channel.assertQueue(uniqueQueueName, { durable: false });
// send data to queue
const routingKey = uniqueQueueName;
await channel.sendToQueue(
routingKey,
Buffer.from(JSON.stringify(queueMessage))
);
// close the channel and connections
await channel.close();
await connection.close();
res.send("Message Sent");
} catch (error) {
console.log(error);
}
};
export default publishMessage;
Fungsi publishMessage
ini menerima request HTTP dan mengirim pesan ke antrian RabbitMQ. Pertama, fungsi ini mengambil companyName
, queueName
, dan queueMessage
dari body request. Kemudian, fungsi ini membuat koneksi ke RabbitMQ menggunakan URL yang disimpan dalam variabel environment CLOUDAMQP_URL
. Setelah koneksi berhasil dibuat, sebuah channel dibuat dari koneksi tersebut. Nama queue unik dibuat dengan menggabungkan companyName
dan queueName
. Fungsi ini kemudian memastikan bahwa queue dengan nama unik tersebut ada, dan jika tidak, queue tersebut akan dibuat. Pesan kemudian dikirim ke queue tersebut dengan menggunakan sendToQueue
. Pesan yang dikirimkan diubah menjadi Buffer dari string JSON queueMessage
. Setelah pesan berhasil dikirim, channel dan koneksi ditutup. Jika ada kesalahan selama proses ini, kesalahan tersebut akan dicetak ke konsol.
Membuat Kode Subscriber
File untuk kode publisher
sudah dibuat, tentu saja kita perlu membuat file untuk kode subscriber
agar dapat menerima pesan yang dikirimkan oleh publisher
. Dalam bagian ini, saya menyimpan kode subscriber
pada file /controllers/subscriberController.js
.
import { connect } from "amqplib";
// const queue = 'hello';
const subscribeMessage = async (req, res) => {
try {
const notificationQueue = req.body.notificationQueueID || "test";
const connection = await connect(process.env.CLOUDAMQP_URL);
const channel = await connection.createChannel();
process.once("SIGINT", async () => {
await channel.close();
await connection.close();
});
var notificationMessage;
var retrievedMessage;
// console.log("queue",queue)
await channel.consume(notificationQueue, (message) => {
if (message) {
console.log("message", message.content.toString());
retrievedMessage = JSON.parse(message.content.toString());
channel.ack(message);
res.send(retrievedMessage);
}
});
if (retrievedMessage === undefined) res.send("no new notification");
// console.log(" [*] Waiting for messages. To exit press CTRL+C");
} catch (err) {
console.warn(err);
}
};
export default subscribeMessage;
Fungsi subscribeMessage
menerima request HTTP dan mendengarkan pesan dari queue RabbitMQ. Pertama, fungsi ini mengambil notificationQueueID
dari body request atau menggunakan “test” sebagai default. Kemudian, fungsi ini membuat koneksi ke RabbitMQ menggunakan URL yang disimpan dalam variabel environment CLOUDAMQP_URL
. Setelah koneksi berhasil dibuat, sebuah channel dibuat dari koneksi tersebut. Fungsi ini kemudian mulai mendengarkan pesan dari antrian dengan menggunakan channel.consume
Jika ada pesan, pesan tersebut akan dicetak ke konsol dan dikirim sebagai response HTTP. Pesan tersebut juga di-acknowledge dengan channel.ack(message)
. Jika tidak ada pesan, respon “no new notification” akan dikirim. Selain itu, ada penanganan untuk event SIGINT
yang akan menutup channel dan koneksi ketika aplikasi dihentikan. Jika ada kesalahan selama proses ini, kesalahan tersebut akan dicetak ke konsol.
Menjalankan Server Publish/Subscribe
Setelah kita menyelesaikan berbagai persiapan dalam proyek, seperti menginstal program RabbitMQ dan menulis kode, kita dapat menjalankan server untuk mengetes apakah server berhasil menerima pesan dari publisher
dan mengirimkannya kepada subscriber
. Sebelum itu, kita dapat membuat skrip kustom dengan merubah kode pada file package.json
.
"scripts": {
"dev": "nodemon run index.js"
}
Setelah membuat skrip kita dapat menjalankan server dengan perintah npm run dev
Server berhasil berjalan!
Setelah server berhasil berjalan kita dapat mencoba untuk mengirim pesan sebagai publisher
menuju server dan message broker dengan menggunakan Postman.
Publisher
berhasil mengirimkan data menuju queue !
Setelah publisher
berhasil mengirimkan pesan ke queue RabbitMQ, subscriber
dapat mengambil pesan dari server dengan mengakses URL http://localhost:4001/subscribe
Subscriber
berhasil mendapatkan pesan dari queue !
Mungkin kita dapat memonitor pesan yang ada di queue beserta metrik-metriknya dengan mengakses RabbitMQ Management
pada browser.