Publish and Subscribe
為了說明發布與訂閱
我們將會建立一個簡單的 log system
這包含了兩隻程式
一隻會發布 log
另一隻則會接收並且 print 在 console上
若我們有多個接收的程式
他們就都會接收到同樣的訊息
如此的話
我們就可以一個程式在接收到 Log 後寫入檔案
另一個接收到 Log 則將訊息顯示在螢幕上
也就是說也就是說發布的訊息將會被所有接收者接收
Exchanges
我們之前教學的內容
- 生產者負責發送訊息
- Queue 是任務的暫存區
- 客戶是負責接收訊息
RabbitMQ 的核心是生產者不直接發送任何訊息進入Queue
甚至也不知道 Message 發送後會進入哪一個Queue
生產者只需要將 Message 發送給 Exchange 就好了
Exchange 必須十分清楚接收到了訊息之後要如何處理
加入特定的 Queue?
加到多個 Queue?
或是應該捨棄
規則則由 Exchange type 定義
有幾種 Exchange type 可以使用
- Direct
- Topic
- headers
- fanout
這個範例是以 fanout 為主
先建立一個 fanout 類型的 type 命名為 log
1 | ch.assertExchange('logs', 'fanout', {durable: false}); |
fanout 主要就是廣播給所有的 channel 知道
很適合這次的 Log 範例
1 | //Listing exchanges |
1 | ch.sendToQueue('hello', new Buffer('Hello World!')); |
我們發送一個 訊息
1 | ch.publish('logs', '', new Buffer('Hello World!')); |
第二個值給空字串代表我們沒有要發送給其他 chaneel, 只有要發送給 log
Temporary queues
可以將 Queue 定義一個 name
而 producers 要與 consumers 要共享時
就可以依據 name 做為指定 Queue 的依據
而對於每一個 Queue 重視的是當前的訊息
對於已經取得過的訊息並不重視
所以我們在取得一個新的 Queue 時有兩個事情是很重要的
- 初始化必須是空的一個 Queue
- 所有連結者斷線後,必須刪掉Queue
Binding
剛剛有建立了一個 fanout 的 channel 名為 log
現在我們希望告訴這個 log 有訊息的時候可以通知我
這個行為叫做 binding
1 | ch.bindQueue(queue_name, 'logs', ''); |
1 | # 可以列出目前有binding 的 queue list |
Example
emeit_log.js
1 | const amqp = require('amqplib/callback_api'); |
recive_log.js
1 | var amqp = require('amqplib/callback_api'); |