Theme Preview

Hue:

You are using an outdated browser that does not support OKLCH colors. The color setting will not take effect.

Den's RabbitMQ學習筆記

3k words

前言(preface):

本篇是記錄在學習RabbitMQ的過程與經驗,會介紹RabbitMQ的概念、實作方式、以及一些常見Bug解法。如果有錯請大哥指點QQ

概念(Concepts):

RabbitMQ本質是一個 Queue,但專門用於訊息發送接收的一種訊息代理軟體
,就像是Nginx 代理Http/Https請求的概念,他專門代理發送者傳出的訊息,也因此能做到傳送訊息雙方的「解耦」,因為發送方只需將訊息交給RabbitMQ而不需去知道哪個應用程式要使用到

RabbitMQ的競爭者有Kafaka、RocketMQ、ActiveMQ
而他比較獨特的點就是支援AMQP這樣的訊息傳遞協議 在NodeJS中就有提供amqplib這樣的library

  • amqplib 是 RabbitMQ 的官方 Node.js 客戶端庫,它提供了兩種不同的API風格:Promise-based API 和 Callback-based API。
    1
    2
    const amqp = require('amqplib');
    const amqp = require('amqplib/callback_api');

在RabbitMQ的世界中主要分成3個角色

  1. 生產者 Producer:負責insert 訊息到Queue 中
  2. 佇列 Queue:負責暫存訊息
  3. 消費者 Consumer:負責消化掉特定Queue中的訊息

notification

對RabbitMQ其中幾個mode的學習筆記:

  1. Direct mode(point-to-point):
    最基本也最簡單的模式也是此篇實作的方式:D,一個Producer 負責發送訊息至Queue ,
    一個 Consumer 去那唯一的Queue 中吃掉訊息

notification

  1. Work mode:
    這個模式是由一個Producer 產生訊息到唯一的Queue中,但是由多個Consumer 共同消耗那一個Queue裡的訊息
    欸?那如果把Consumer想成是server,queue 想成是大量的請求,不覺得很像load balancer的概念嗎XD
    沒錯 利用這個模式可以做到酷似load balancer的功用讓複雜的服務應用進一步提升程式效能! 可是沒辦法像真正的load balancer有完整的負載平衡畢竟RabbitMQ本來就不是一個Load balancer的產品

ps

  • Load balance分兩種應用:
    1. 網絡層面:以同步的方式,因爲一個HTTP的請求本來就需要等待響應 比如TCP ACK資訊等等
    2. 應用層面:以非同步的方式,一個消費請求在消耗queue的訊息時另一個消費請求不需要等待它完成

notification

  1. Publish/Subscribe mode:
    這個模式不像前面兩個那麼直接好懂了,現在Producer不是直接將訊息丟進Queue中,而是將訊息交「Exchange」,可以先把Exchange想成一個管理員 而他有三種處理訊息的方式
    1. direct (將訊息附加上一把新鑰匙)
    2. fanout(將訊息附上會員限制)
    3. topic(將訊息附上一把多功能鑰匙)

而在 Publish/Subscribe 模式中,使用的是 Exchange 的 fanout type
這時他就像一個掌握很多不同機密資訊的偵探,你想要資訊那你必須要給他$$ XD,而這一段腦補中的你就是指Queue,$$則代表你必須訂閱Subscribe偵探才能夠獲取資料,在直白點,Exchange只會把訊息丟進那些訂閱他的Queue中~~

notification

實作方式:

實作的環境是NodeJS,而RabbitMQ是用Docker包住的。
使用Promise-based API

1
const amqp = require('amqplib');
  • 連線方面其實很簡單:
    ps: @後的是容器名稱(記得確保後端容器跟rabbitmq容器在同個docker網路下)

    1
    2
    const connection = await amqp.connect(`amqp://${process.env.RABBITMQ_DEFAULT_USER}:${process.env.RABBITMQ_DEFAULT_PASS}@rabbitmq:5672`);
    const channel = await connection.createChannel();
  • 連線好後便可以建立Queue啦~
    durable代表你想不想持久化你的message

    1
    await channel.assertQueue(queue, { durable: false });
  • 將message enqueue
    記得必須將message字串化

    1
    2
    const message = JSON.stringify(mailOptions);
    channel.sendToQueue(queue, Buffer.from(message));
  • 消化message:
    此處我的做法是用於消化掉message的當下將這個message包成email並透過NodeJS的nodemailer發出去

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    channel.consume(queue, async (msg) => {
    console.log(msg.content.toString());
    const mailOptions = JSON.parse(msg.content.toString());
    await module.exports.sendMail(mailOptions)
    .then(() => {
    console.log('Email sent:', mailOptions.to);
    channel.ack(msg); // 確認訊息已處理
    })
    .catch((error) => {
    console.error('Error sending email:', error);
    channel.reject(msg, false); // 拒絕訊息並不再重新排程
    });
    });

    有了以上關鍵步驟其實就可以做到最簡單的RabbitMQ訊息代理啦!

我遇到哪些Bug,如何解決?

Error: connect ECONNREFUSED X.X.X.X:5672at TCPConnectWrap.afterConnect [as oncomplete]
—> 確保連線時的帳密、端口一致且rabbitmq-server IP (container name)正確後還是錯的話可以去嘗試給rabbitmq server 更多時間準備連線(用loop搭配setTimeOut去不斷要求失敗重連直到成功 像極了苦澀的人生@@)

接著可能會遇到ETIMEDOUT的問題,老實說我不確定我怎麼解的但似乎手動重啟rabbit-server後就可以了(別放棄任何可能性@@ Just try it !)

如果沒有用的話可以嘗試從這些問題檢查起:

  1. Docker-compose.yml設置有問題 ex: rabbitmq跟應用程式是否在同個網路下、帳號密碼是否與連接時一致
  2. 安全群組有無開啟5672,15672的端口
  3. 連線準備時間足夠?(超大的坑)
  4. Nginx/防火牆的設置是否影響?

THANKS WATCHING