前言(preface):
本篇是記錄在學習RabbitMQ的過程與經驗,會介紹RabbitMQ的概念、實作方式、以及一些常見Bug解法。如果有錯請大哥指點QQ
概念(Concepts):
RabbitMQ本質是一個 Queue,但專門用於訊息發送接收的一種訊息代理軟體
,就像是Nginx 代理Http/Https請求的概念,他專門代理發送者傳出的訊息,也因此能做到傳送訊息雙方的「解耦」,因為發送方只需將訊息交給RabbitMQ而不需去知道哪個應用程式要使用到
RabbitMQ的競爭者有Kafaka、RocketMQ、ActiveMQ
而他比較獨特的點就是支援AMQP這樣的訊息傳遞協議 在NodeJS中就有提供amqplib這樣的library
- amqplib 是 RabbitMQ 的官方 Node.js 客戶端庫,它提供了兩種不同的API風格:Promise-based API 和 Callback-based API。
1
2const amqp = require('amqplib');
const amqp = require('amqplib/callback_api');
在RabbitMQ的世界中主要分成3個角色
- 生產者 Producer:負責insert 訊息到Queue 中
- 佇列 Queue:負責暫存訊息
- 消費者 Consumer:負責消化掉特定Queue中的訊息
對RabbitMQ其中幾個mode的學習筆記:
- Direct mode(point-to-point):
最基本也最簡單的模式也是此篇實作的方式:D,一個Producer 負責發送訊息至Queue ,
一個 Consumer 去那唯一的Queue 中吃掉訊息
- Work mode:
這個模式是由一個Producer 產生訊息到唯一的Queue中,但是由多個Consumer 共同消耗那一個Queue裡的訊息
欸?那如果把Consumer想成是server,queue 想成是大量的請求,不覺得很像load balancer的概念嗎XD
沒錯 利用這個模式可以做到酷似load balancer的功用讓複雜的服務應用進一步提升程式效能! 可是沒辦法像真正的load balancer有完整的負載平衡畢竟RabbitMQ本來就不是一個Load balancer的產品
ps
- Load balance分兩種應用:
- 網絡層面:以同步的方式,因爲一個HTTP的請求本來就需要等待響應 比如TCP ACK資訊等等
- 應用層面:以非同步的方式,一個消費請求在消耗queue的訊息時另一個消費請求不需要等待它完成
- Publish/Subscribe mode:
這個模式不像前面兩個那麼直接好懂了,現在Producer不是直接將訊息丟進Queue中,而是將訊息交「Exchange」,可以先把Exchange想成一個管理員 而他有三種處理訊息的方式- direct (將訊息附加上一把新鑰匙)
- fanout(將訊息附上會員限制)
- topic(將訊息附上一把多功能鑰匙)
而在 Publish/Subscribe 模式中,使用的是 Exchange 的 fanout type
這時他就像一個掌握很多不同機密資訊的偵探,你想要資訊那你必須要給他$$ XD,而這一段腦補中的你就是指Queue,$$則代表你必須訂閱Subscribe偵探才能夠獲取資料,在直白點,Exchange只會把訊息丟進那些訂閱他的Queue中~~
實作方式:
實作的環境是NodeJS,而RabbitMQ是用Docker包住的。
使用Promise-based API
1 | const amqp = require('amqplib'); |
連線方面其實很簡單:
ps: @後的是容器名稱(記得確保後端容器跟rabbitmq容器在同個docker網路下)1
2const connection = await amqp.connect(`amqp://${process.env.RABBITMQ_DEFAULT_USER}:${process.env.RABBITMQ_DEFAULT_PASS}@rabbitmq:5672`);
const channel = await connection.createChannel();連線好後便可以建立Queue啦~
durable代表你想不想持久化你的message1
await channel.assertQueue(queue, { durable: false });
將message enqueue
記得必須將message字串化1
2const message = JSON.stringify(mailOptions);
channel.sendToQueue(queue, Buffer.from(message));消化message:
此處我的做法是用於消化掉message的當下將這個message包成email並透過NodeJS的nodemailer發出去1
2
3
4
5
6
7
8
9
10
11
12
13channel.consume(queue, async (msg) => {
console.log(msg.content.toString());
const mailOptions = JSON.parse(msg.content.toString());
await module.exports.sendMail(mailOptions)
.then(() => {
console.log('Email sent:', mailOptions.to);
channel.ack(msg); // 確認訊息已處理
})
.catch((error) => {
console.error('Error sending email:', error);
channel.reject(msg, false); // 拒絕訊息並不再重新排程
});
});有了以上關鍵步驟其實就可以做到最簡單的RabbitMQ訊息代理啦!
我遇到哪些Bug,如何解決?
Error: connect ECONNREFUSED X.X.X.X:5672at TCPConnectWrap.afterConnect [as oncomplete]
—> 確保連線時的帳密、端口一致且rabbitmq-server IP (container name)正確後還是錯的話可以去嘗試給rabbitmq server 更多時間準備連線(用loop搭配setTimeOut去不斷要求失敗重連直到成功 像極了苦澀的人生@@)
接著可能會遇到ETIMEDOUT
的問題,老實說我不確定我怎麼解的但似乎手動重啟rabbit-server後就可以了(別放棄任何可能性@@ Just try it !)
如果沒有用的話可以嘗試從這些問題檢查起:
- Docker-compose.yml設置有問題 ex: rabbitmq跟應用程式是否在同個網路下、帳號密碼是否與連接時一致
- 安全群組有無開啟5672,15672的端口
- 連線準備時間足夠?(超大的坑)
- Nginx/防火牆的設置是否影響?
THANKS WATCHING