Publish-Subscribe, Pola Desain Komunikasi Back-end Penting Beserta Implementasinya


Publish/Subscribe adalah salah satu pola desain populer yang mungkin anda lihat di backend. Pola ini sering digunakan untuk komunikasi asinkron di antara beberapa komponen (biasanya terpisah) dan sangat cocok ketika anda memiliki sejumlah komponen yang perlu bekerja sama tetapi ingin tetap loosely-coupled.

Bagaimana Publish/Subscribe bekerja?

Berikut adalah ilustrasi diagram bagaimana publish/subscriber bekerja:

Pola Publish/Subscribe melibatkan penggunaan message queue (sering disebut message broker) yang berfungsi sebagai perantara antara penerbit (Publisher) dan pelanggan (Subscriber). Message broker ini mengelompokkan pesan ke dalam sesuatu yang disebut channel (atau topic).

Publisher adalah komponen yang membuat dan mengirim pesan, sedangkan Subscriber adalah komponen yang menerima dan menggunakan pesan.

Publisher hanya mengirimkan message (atau event) ke channel atau topic tertentu dalam message broker. Channel-channel ini bertindak sebagai titik distribusi pesan. Subscriber kemudian menunjukkan minatnya dengan berlangganan channel-channel tersebut dalam message broker dan setiap kali message atau event dipublikasikan ke channel itu, mereka menerima salinannya.

Software message queue seperti Apache Kafka, dan MQTT menggunakan pola komunikasi publish/subscribe.

Kelebihan dari Pola Desain Publish/Subscribe

  • Komunikasi Asinkron
    Berbeda dengan pola request-response, publish/subscribe pada dasarnya dirancang untuk bersifat asinkron sehinga ideal untuk membuat aplikasi real-time dengan hambatan latensi yang lebih rendah.
  • Komponen yang Loose Coupling
    Komponen dalam model publikasi/berlangganan digabungkan secara longgar. Artinya, mereka tidak terikat satu sama lain dan dapat berinteraksi secara bebas dengan memicu dan merespons peristiwa.
  • Sangat Skalabel
    Tidak ada batasan jumlah subscriber untuk publisher dapat memublikasikan acara. Selain itu, tidak ada batasan jumlah berlangganan publisher oleh subscriber.
  • Independen dalam Bahasa Pemrograman dan Protokol
    Model publish/subscribe dapat dengan mudah diintegrasikan ke dalam stack teknologi apa pun karena tidak bergantung pada bahasa pemrograman. Selain itu, sering kali pola ini mendukung berbagai environment dan platform sehingga kompatibel lintas platform.
  • Penyeimbangan Beban (Load Balancing)
    Dalam kasus di mana banyak subscriber berlangganan topic tertentu, model publish/subscribe dapat mendistribusikan topic secara merata di antara subscriber, sehingga memberikan kemampuan penyeimbangan beban yang siap digunakan.

Keterbatasan Pola Desain Publish/Subscribe

  • Kompleksitas dalam Implementasi
    Menyiapkan sistem publish/subscribe bisa lebih kompleks daripada model komunikasi sederhana seperti pola request-response. Anda perlu mengonfigurasi dan mengelola message broker, channel, dan subscription, yang dapat menambah overhead pada sistem Anda.
  • Duplikasi Pesan
    Tergantung pada konfigurasi dan masalah jaringan, message dapat diduplikasi. Subscriber mungkin menerima message yang sama lebih dari sekali, yang dapat menyebabkan redundansi dan pemrosesan ekstra.
  • Tantangan Skalabilitas
    Meskipun publish/subscribe sangat skalabel, mengelola message dan subscriber dalam jumlah yang sangat besar bisa menjadi rumit. Anda mungkin perlu mempertimbangkan cara mendistribusikan message secara efisien dan menangani subscriber dalam jumlah besar.
  • Penanganan Kesalahan yang Kompleks
    Menangani kesalahan dalam sistem publish/subscribe dapat menjadi sebuah tantangan. Menangani situasi seperti kegagalan pengiriman message atau kesalahan subscriber memerlukan pertimbangan dan desain yang cermat.

Kapan Anda Harus Memakai Pola Publish/Subscribe?

  • Pembuatan fitur yang memerlukan respons real-time dan latensi rendah bagi pengguna, misalnya live chat atau aplikasi game multiplayer .
  • Dalam sistem notifikasi event
  • Dalam membangun sistem terdistribusi yang mengandalkan logging dan caching

Contoh Implementasi Pola Desain Publish/Subscribe Dengan Node.js dan RabbitMQ pada Windows

Persyaratan:
Diasumsikan anda telah memahami dasar dari Node.js dan Javascript.

Instal RabbitMQ

Pastikan RabbitMQ telah terinstall pada mesin lokal anda. Karena saya menggunakan sistem operasi Windows, maka perlu menginstal program RabbitMQ berdasarkan link ini.

Setelah anda berhasil menginstal program RabbitMQ, anda dapat mengecek status port RabbitMQ yang akan digunakan dengan perintah berikut:

rabbitmq-diagnostics  status

Perintah tersebut akan menghasilkan atribut-atribut status dari node RabbitMQ yang berjalan di mesin anda. Karena kita memerlukan port yang akan digunakan perhatikan atribut listener. Untuk percobaan ini, port RabbitMQ yang digunakan pada mesin saya adalah 5672.

Persiapkan proyek

Buatlah proyek Node.js baru dengan perintah:

npm init -y

Setelah itu, kita memerlukan beberapa dependensi yang digunakan untuk proyek.

Instal Dependensi Node.js yang Diperlukan

Kita akan menggunakan framework express untuk membuat server antara publisher, subscriber, dan message broker RabbitMQ. Library amqblib diperlukan untuk protokol message queue dalam RabbitMQ. Dependensi-dependensi lain digunakan sebagai pendukung dari server.

npm i express dotenv cors amqplib
npm i -D nodemon

Persiapkan file environment

Sebelum melanjutkan pengerjaan proyek, kita perlu menyiapkan file berisi variabel environment yang tergantung dengan mesin lokal. Disini, saya menggunakan port yang tersedia yaitu 4001 dan URL AMQP yang menggunakan setelan RabbitMQ secara default:

PORT=4001
CLOUDAMQP_URL='amqp://guest:guest@localhost:5672'

Pembuatan server Node.js

Buatlah file index.js yang merupakan file utama server beserta dengan file routes untuk mendefinisikan rute-rute URL:

import express from "express";
import queueRoutes from "./routes/messageQueue.js";
import dotenv from "dotenv";
import cors from "cors";
const app = express();
const PORT = process.env.PORT || 4001;

dotenv.config();
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
app.use(queueRoutes);

app.use(
  cors({
    origin: "*",
  })
);

app.listen(PORT, () => console.log("Server running at port " + PORT));

Setelah membuat file server, kita perlu membuat file routes yang digunakan untuk mendefinisikan rute-rute untuk publishe dan subscribe pesan yang dikirimkan. Dalam hal ini saya menyimpannya di file /routes/messageQueue.js.

import express from "express";
import subscribeMessage from "../controllers/subscriberController.js";
import publishMessage from "../controllers/publisherController.js";
import getBasicMessage from "../controllers/getAllMessagesController.js";

const router = express.Router();

router.get("/", getBasicMessage);
router.post("/publish", publishMessage);
router.get("/subscribe", subscribeMessage);

export default router;

Membuat Kode Publisher

Setelah kita membuat file server dan routes, dilanjutkan dengan membuat file berisi kode untuk publisher mengirimkan pesan kepada server dan message broker. Dalam hal ini, saya menyimpan kodenya dalam file /controllers/publisherController.js

import { connect } from "amqplib";

const publishMessage = async (req, res) => {
  try {
    const companyName = req.body.companyName;
    const queueName = req.body.queueName;
    const queueMessage = req.body.queueMessage;

    const connection = await connect(process.env.CLOUDAMQP_URL);
    const channel = await connection.createChannel();

    const uniqueQueueName = companyName + "-" + queueName;

    // console.log(msg)
    // connect to 'test-queue', create one if does not exist already
    await channel.assertQueue(uniqueQueueName, { durable: false });

    // send data to queue
    const routingKey = uniqueQueueName;
    await channel.sendToQueue(
      routingKey,
      Buffer.from(JSON.stringify(queueMessage))
    );

    // close the channel and connections
    await channel.close();
    await connection.close();

    res.send("Message Sent");
  } catch (error) {
    console.log(error);
  }
};

export default publishMessage;

Fungsi publishMessage ini menerima request HTTP dan mengirim pesan ke antrian RabbitMQ. Pertama, fungsi ini mengambil companyName, queueName, dan queueMessage dari body request. Kemudian, fungsi ini membuat koneksi ke RabbitMQ menggunakan URL yang disimpan dalam variabel environment CLOUDAMQP_URL. Setelah koneksi berhasil dibuat, sebuah channel dibuat dari koneksi tersebut. Nama queue unik dibuat dengan menggabungkan companyName dan queueName. Fungsi ini kemudian memastikan bahwa queue dengan nama unik tersebut ada, dan jika tidak, queue tersebut akan dibuat. Pesan kemudian dikirim ke queue tersebut dengan menggunakan sendToQueue. Pesan yang dikirimkan diubah menjadi Buffer dari string JSON queueMessage. Setelah pesan berhasil dikirim, channel dan koneksi ditutup. Jika ada kesalahan selama proses ini, kesalahan tersebut akan dicetak ke konsol.

Membuat Kode Subscriber

File untuk kode publisher sudah dibuat, tentu saja kita perlu membuat file untuk kode subscriber agar dapat menerima pesan yang dikirimkan oleh publisher. Dalam bagian ini, saya menyimpan kode subscriber pada file /controllers/subscriberController.js.

import { connect } from "amqplib";

// const queue = 'hello';

const subscribeMessage = async (req, res) => {
  try {
    const notificationQueue = req.body.notificationQueueID || "test";
    const connection = await connect(process.env.CLOUDAMQP_URL);
    const channel = await connection.createChannel();

    process.once("SIGINT", async () => {
      await channel.close();
      await connection.close();
    });

    var notificationMessage;
    var retrievedMessage;
    // console.log("queue",queue)
    await channel.consume(notificationQueue, (message) => {
      if (message) {
        console.log("message", message.content.toString());
        retrievedMessage = JSON.parse(message.content.toString());
        channel.ack(message);
        res.send(retrievedMessage);
      }
    });
    if (retrievedMessage === undefined) res.send("no new notification");

    // console.log(" [*] Waiting for messages. To exit press CTRL+C");
  } catch (err) {
    console.warn(err);
  }
};

export default subscribeMessage;

Fungsi subscribeMessage menerima request HTTP dan mendengarkan pesan dari queue RabbitMQ. Pertama, fungsi ini mengambil notificationQueueID dari body request atau menggunakan “test” sebagai default. Kemudian, fungsi ini membuat koneksi ke RabbitMQ menggunakan URL yang disimpan dalam variabel environment CLOUDAMQP_URL. Setelah koneksi berhasil dibuat, sebuah channel dibuat dari koneksi tersebut. Fungsi ini kemudian mulai mendengarkan pesan dari antrian dengan menggunakan channel.consume Jika ada pesan, pesan tersebut akan dicetak ke konsol dan dikirim sebagai response HTTP. Pesan tersebut juga di-acknowledge dengan channel.ack(message). Jika tidak ada pesan, respon “no new notification” akan dikirim. Selain itu, ada penanganan untuk event SIGINT yang akan menutup channel dan koneksi ketika aplikasi dihentikan. Jika ada kesalahan selama proses ini, kesalahan tersebut akan dicetak ke konsol.

Menjalankan Server Publish/Subscribe

Setelah kita menyelesaikan berbagai persiapan dalam proyek, seperti menginstal program RabbitMQ dan menulis kode, kita dapat menjalankan server untuk mengetes apakah server berhasil menerima pesan dari publisher dan mengirimkannya kepada subscriber. Sebelum itu, kita dapat membuat skrip kustom dengan merubah kode pada file package.json.

"scripts": {
    "dev": "nodemon run index.js"
  }

Setelah membuat skrip kita dapat menjalankan server dengan perintah npm run dev

Server berhasil berjalan!

Setelah server berhasil berjalan kita dapat mencoba untuk mengirim pesan sebagai publisher menuju server dan message broker dengan menggunakan Postman.

Publisher berhasil mengirimkan data menuju queue !

Setelah publisher berhasil mengirimkan pesan ke queue RabbitMQ, subscriber dapat mengambil pesan dari server dengan mengakses URL http://localhost:4001/subscribe

Subscriber berhasil mendapatkan pesan dari queue !

Mungkin kita dapat memonitor pesan yang ada di queue beserta metrik-metriknya dengan mengakses RabbitMQ Management pada browser.