RabbitMQ-Routing

Routing

之前完成了 fanout 的 logging system

但是我們不希望每一則訊息都會通知到每一個人

也希望會有一些特定的訊息推送

這時就可以利用 direct 來做訊息的發送

可是 direct 並沒有辦法做到多個條件分類 Route

因為不希望 logging system 只能依據嚴重性來發送訊息

例如 unix 中的 syslog 中就可以依據嚴重性或是設備其他條件來發訊息傳遞

會更加彈性化

此時為了要達到這個需要

必須使用較為複雜的 topic exchange

Topic Exchange

當訊息發送到 topic exchage 的時候 route_key 是由多個字使用 . 來做分隔組成

這些字也不是隨意選定

通常都代表著 features

Example:

  • “stock.usd.nyse”
  • “nyse.vmw”
  • “quick.orange.rabbit”.

上述都是可以當成 Routing 的範例

最多可以接受 255 bytes 的大小

### Binding Key

可以有兩種特別的綁定方式

* ‘*’ (star) 可以取代一個字

  • ‘#’ (hash) 可以取代零或多個字

Example

範例中我們發送關於描述動物的訊息

訊息將會以三個字(兩個.)的方式來發送

第一個字描述速度

第二個字描述描述顏色

第三個描述種類

建立建立三種不同的 Binding key

  1. .orange.“ //所有橘色的動物
  2. ..rabbit” // 所有兔子類的動物
  3. “lazy.#” // 所有 lazy 的動物

若發送一個 “quick.orange.rabbit” 會發送給兩個 Queue

“lazy.orange.elephone” 也會發送給兩個 Queue

“quick.orange.fox” 只會發送給一個 Queue

“quick.brown.fox” 則不會發送給任何 Queue 而被棄用

若我們發送單一字節 如”Orange”

這些都不會符合 binding routing

發送四個字節 “quick.orange.male.rabbit”

因為最後一個字節有符合

將會被傳到第二個 Queue

```
Topic Exchange

Topic Exchange 是相當強大的 Exchange

而且可以模仿其他不同的 Exchange

如果有你使用 ”#” 則可以取得所有 Exchange

效果就如同 fanout

若是沒有使用 “*”, “#” 來做 Routing

效果則是如同 direct一樣

Putting it all together

emit_log_topic.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const amqp = require('amqplib/callback_api');

amqp.connect('amqp:localhost', function (err, conn) {
conn.createChannel(function (err, ch) {
const ex = 'topic_logs';
const args = process.argv.slice(2);
const key = (args.length > 0) ? args[0] : 'anonymous.info';
const msg = args.slice(1).join(' ') || 'Hello World';

ch.assertExchange(ex, 'topic', { durable: false });
ch.publish(ex, key, new Buffer(msg));
console.log(" [x] Sent %s:'%s'", key, msg);
});

setTimeout(function () { conn.close(); process.exit(0) }, 500);
});

receive_logs_topic.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
const amqp = require('amqplib/callback_api');

const args = process.argv.slice(2);

if (args.length === 0) {
console.log("Usage: receive_logs_topic.js <facility>.<severity>");
process.exit(1);
}

amqp.connect('amqp://localhost', function (err, conn) {
console.log(err);
conn.createChannel(function (err, ch) {
const ex = 'topic_logs';

ch.assertExchange(ex, 'topic', { durable: false });

ch.assertQueue('', { exclusive: true }, function (err, q) {
console.log(' [*] Waiting for logs. To exit press CTRL+C');

args.forEach(function (key) {
ch.bindQueue(q.queue, ex, key);
});

ch.consume(q.queue, function (msg) {
console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
}, { noAck: true });
});
});
});

RabbitMQ-PublishAndSubscribe

Publish and Subscribe

為了說明發布與訂閱

我們將會建立一個簡單的 log system

這包含了兩隻程式

一隻會發布 log

另一隻則會接收並且 print 在 console上

若我們有多個接收的程式

他們就都會接收到同樣的訊息

如此的話

我們就可以一個程式在接收到 Log 後寫入檔案

另一個接收到 Log 則將訊息顯示在螢幕上

也就是說也就是說發布的訊息將會被所有接收者接收

Exchanges

我們之前教學的內容

  • 生產者負責發送訊息
  • Queue 是任務的暫存區
  • 客戶是負責接收訊息

RabbitMQ 的核心是生產者不直接發送任何訊息進入Queue

甚至也不知道 Message 發送後會進入哪一個Queue

生產者只需要將 Message 發送給 Exchange 就好了

Exchange 必須十分清楚接收到了訊息之後要如何處理

加入特定的 Queue?

加到多個 Queue?

或是應該捨棄

規則則由 Exchange type 定義

有幾種 Exchange type 可以使用

  • Direct
  • Topic
  • headers
  • fanout

這個範例是以 fanout 為主

先建立一個 fanout 類型的 type 命名為 log

1
ch.assertExchange('logs', 'fanout', {durable: false});

fanout 主要就是廣播給所有的 channel 知道

很適合這次的 Log 範例

1
2
3
4
5
6
7
8
9
//Listing exchanges

列出可以使用的 Exchange type 可以使用命令列查詢

$ sudo rabbitmqctl list_exchanges

列表會顯示 amq.*

發送預設的 Exchange
1
ch.sendToQueue('hello', new Buffer('Hello World!'));

我們發送一個 訊息

1
ch.publish('logs', '', new Buffer('Hello World!'));

第二個值給空字串代表我們沒有要發送給其他 chaneel, 只有要發送給 log

Temporary queues

可以將 Queue 定義一個 name

而 producers 要與 consumers 要共享時

就可以依據 name 做為指定 Queue 的依據

而對於每一個 Queue 重視的是當前的訊息

對於已經取得過的訊息並不重視

所以我們在取得一個新的 Queue 時有兩個事情是很重要的

  1. 初始化必須是空的一個 Queue
  2. 所有連結者斷線後,必須刪掉Queue

Binding

剛剛有建立了一個 fanout 的 channel 名為 log

現在我們希望告訴這個 log 有訊息的時候可以通知我

這個行為叫做 binding

1
ch.bindQueue(queue_name, 'logs', '');
1
2
3
# 可以列出目前有binding 的 queue list

rabbitmqctl list_bindings

Example

emeit_log.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
const amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function (err, conn) {
conn.createChannel(function (err, ch) {
var ex = 'logs';
var msg = process.argv.slice(2).join(' ') || 'Hello World!';

ch.assertExchange(ex, 'fanout', { durable: false });
ch.publish(ex, '', new Buffer(msg));
console.log(" [x] Sent %s", msg);
});

setTimeout(function () { conn.close(); process.exit(0) }, 500);
});

recive_log.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function (err, conn) {
conn.createChannel(function (err, ch) {
var ex = 'logs';

ch.assertExchange(ex, 'fanout', { durable: false });

ch.assertQueue('', { exclusive: true }, function (err, q) {
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue);
ch.bindQueue(q.queue, ex, '');

ch.consume(q.queue, function (msg) {
console.log(" [x] %s", msg.content.toString());
}, { noAck: true });
});
});
});

RabbitMQ-WorkQueue

Work Queues

避免一些佔用大量資源或是時間的工作,

我們幫每份工作定義一個 channel

透過 MessageQueue 發送文字訊息

通知增加一個 Task

而 Queue 會自動在未來某個時間點處理這件事情

Round-robin dispatching

使用任務隊列的優點之一是能夠輕鬆地併行工作

如果我們正在建立許多的的工作

我們可以增加更多的worker

這樣可以輕易地擴充架構

範例

下列的範例可以開三個 Terminal console

兩個執行 work.js

而一個執行 new_task.js

1
2
# shell 1
$ node worker.js
1
$ node worker.js

在第三個我們將發布新的任務

一旦您開始使用消費者

您可以發布一些消息

1
2
3
4
5
6
# shell 3
./new_task.js First message.
./new_task.js Second message..
./new_task.js Third message...
./new_task.js Fourth message....
./new_task.js Fifth message.....

執行結果

1
2
3
4
5
6
# shell 1
./worker.js
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
1
2
3
4
5
# shell 2
./worker.js
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

Message acknowledgment

如果有一個長時間的任務

在執行過程中 crash

我們將會失去這個執行的任務

但是我們不希望失去任務

所以我們可以把任務交給其他的 worker

為了確保任務不會消失

所以提供了 Message acknowledgment (消息確認)

若是 worker Crash 連接關閉或 TCP 連接結束

並不發送確認訊息

RabbitMQ 將會重新排隊

若有其他 worker 則會將任務轉給其他 worker

所以即使有長時間執行的任務

也會確保該任務執行完成不會丟失

在上一個例子中

消息確認功能被關閉

** {noAck: false} **

1
2
3
4
5
6
7
8
9
ch.consume(q, function(msg) {
var secs = msg.content.toString().split('.').length - 1;

console.log(" [x] Received %s", msg.content.toString());
setTimeout(function() {
console.log(" [x] Done");
ch.ack(msg);
}, secs * 1000);
}, {noAck: false});

上述範例可以確認任務會執行

若 worker Crash 也會把任務重新執行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
忘記確認

錯過這個錯誤是一個常見的錯誤

這是一個容易的錯誤

但後果是嚴重的

當您的客戶端退出(可能看起來像隨機重新傳遞)時

消息將被重新傳遞

但是RabbitMQ將會消耗越來越多的內存

因為它將無法釋放任何未包含的消息

Message durability

我們已經學會瞭如何確保即使 worker Crash

任務也不會丟失

但是如果RabbitMQ服務器停止

我們的任務仍然會丟失

當RabbitMQ退出或崩潰時

它會忘記隊列和消息

需要兩件事來確保消息不會丟失:我們需要將 Queue 和消息 durable 設定為 true

1
ch.assertQueue('hello', {durable: true});

雖然這個命令本身是正確的

但是在我們目前的設置中是不行的

這是因為我們已經定義了一個不耐用的名為 hello 的隊列。

RabbitMQ 不允許您重新定義具有不同參數的現有隊列

並會向嘗試執行此操作的任何程序返回錯誤

但是有一個快速的解決方法 - 讓我們用不同的名稱聲明一個隊列

例如task_queue

1
ch.assertQueue('task_queue', {durable: true});

這種持久的選項更改需要適用於 new_taskworker代碼。

在這一點上 我們確信

即使RabbitMQ重新啟動

task_queue Queue也不會丟失

現在我們需要使用持久化選項 Channel.sendToQueue 來將消息標記為持久性

1
ch.sendToQueue(q, new Buffer(msg), {persistent: true});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

注意消息持久性

將消息標記為持久性不能完全保證消息不會丟失

雖然它告訴RabbitMQ將消息保存到硬碟

但是當RabbitMQ接受消息並且還沒有保存時

仍然有一個很短的時間窗口

RabbitMQ不會對每個消息執行fsync(2) - 它可能只是保存到緩存中

而不是真正寫入磁盤

持久性保證不強

但對我們的簡單任務隊列來說已經足夠了

如果您需要更強大的保證

那麼您可以使用發布商確認

Fair dispatch

您可能已經注意到

dispatching 仍然無法正常工作

例如在兩個 worker 的情況下

當所有奇怪的信息都很消耗資源與時間

甚至信息很小的時候

一個worker將不斷忙碌

另一個worker幾乎不會做任何工作

那麼 RabbitMQ 還會平均分配消息

這是因為當消息進入隊列時

RabbitMQ只會分派消息

它不看 sender 的未確認消息的數量

它只是盲目地向第n個 sender 發送每個第n個消息。

1
ch.prefetch(1);
1
2
3
4
5
6
7
8
9
10
注意 Queue大小

如果所有的 **worker** 都忙

你的Queue可以填滿

你會想要注意的是

也許增加更多的 worker 或者有其他的策略

Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
const amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function (err, conn) {
conn.createChannel(function (err, ch) {
var q = 'task';

ch.assertQueue(q, { durable: true });

console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
ch.consume(q, function (msg) {
var secs = msg.content.toString().split('.').length - 1;

console.log(" [x] Received %s", msg.content.toString());
setTimeout(function () {
console.log(" [x] Done");
ch.ack(msg);
}, secs * 1000);
}, { noAck: false });
});
});

Client

new_task.js

1
2
3
4
5
6
7
8
9
10
11
12
13
const amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function (err, conn) {
conn.createChannel(function (err, ch) {
const q = 'task';
const msg = process.argv.slice(2).join(' ') || 'Hello world!';
ch.assertQueue(q, { durable: true });
ch.sendToQueue(q, new Buffer(msg), { persistent: true });

console.log(" [x] Sent '%s'", msg);
});
setTimeout(function () { conn.close(); process.exit(0) }, 500);
});

RabbitMQ-Install

Rabbit Message Queue

Installing on Homebrew

Step 1

1
$ brew update

Step 2

1
$ brew install rabbitmq

Step 3

1
$ rabbitmqctl status

Example

Client

send.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
const amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function (err, conn) {
conn.createChannel((err, ch) => {
const q = 'hello';
const msg = 'Hello World!';

ch.assertQueue(q, { durable: false });
ch.sendToQueue(q, new Buffer('Hello world'));
console.log(" [x] Sent 'Hello World!'");

});
setTimeout(function () { conn.close(); process.exit(0) }, 500);
});

Server

receive.js

1
2
3
4
5
6
7
8
9
10
11
12
13
const amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function (err, conn) {
conn.createChannel((err, ch) => {
const q = 'hello';
const msg = 'Hello World!';

ch.assertQueue(q, { durable: false });
ch.sendToQueue(q, new Buffer('Hello world'));

});
setTimeout(function () { conn.close(); process.exit(0) }, 500);
});

Result

Demo Result

參考文章

install

Leetcode-Q6

12 Integer to Roman

Description

Given an integer, convert it to a roman numeral.

Input is guaranteed to be within the range from 1 to 3999.

這是進制的轉換

要將十進位進位轉換成羅馬數字

數字的範圍在1~3999

關於羅馬數字的規則

  • 羅馬數字總共會有七個 Ⅰ(1)、Ⅴ(5)、Ⅹ(10)、Ⅼ(50)、Ⅽ(100)、Ⅾ(500)和Ⅿ(1000)

  • 重複數次:一個羅馬數字重複幾次,就表示這個數的幾倍

Example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
const intToRoman = function (num) {
if (num < 1 || num > 3999) {
return '';
}
const lookup = { M: 1000, CM: 900, D: 500, CD: 400, C: 100, XC: 90, L: 50, XL: 40, X: 10, IX: 9, V: 5, IV: 4, I: 1 };
let romanStr = '';
for (const key in lookup) {
while (num >= lookup[key]) {
romanStr += key;
num -= lookup[key];
}
}
return romanStr;
};

因為有規範在1~3999之間

所以先檢查 num 是否在這個區間

若不在的話則回傳空字串

先用一個物件將羅馬數字設為Key與十進位數字設為Value

準備等等做比對計算

然後使用一個迴圈依序對此物件中的各值去做轉換

在迴圈中加上一個 while 迴圈

當值比較大的時候

代表它可以在轉換一次

所以在romanStr中加上一次符號

num 在扣除相對的value

直到num為零為止

此時romanStr就是相對應的羅馬字串

回傳後就可以解答

Leetcode-Q5

561 Array Partition I

Description

Given an array of 2n integers, your task is to group these integers into n pairs of integer,

say (a1, b1), (a2, b2), …, (an, bn) which makes sum of min(ai, bi) for all i from 1 to n as large as possible.

一個 2n 個整數的陣列

將這些整入分成N對整數
(a1, a2), (b1, b2)…..

並使(ai, bi)的最大總和數

Hint

將陣列做排序

然後切個 n 個陣列 每個陣列兩個元素

再把各自陣列的第二個元素相加

就可以得到答案

Example

1
2
3
4
5
6
nums = nums.sort((a, b) => (a - b));
let total = 0;
for (let index = 0; index < nums.length; index += 2) {
total += nums[index];
}
return total;

Leetcode-Q4

9 Palindrome Number

迴文數

wiki

Example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
const INT_MAX = 2147483647;
let y = 0;

if (x > 0 && x < 10) {
return true;
} else {
let str = x.toString(),
length = str.length,
total = 0;

for (let index = 0; index < length; index++) {
let num = parseInt(str[index]) * Math.pow(10, index);
total += num;
}
if (total > INT_MAX
|| total < -(1 + INT_MAX)
|| x > INT_MAX
|| x < -(1 + INT_MAX)) {
return false;
} else if (total === x) {
return true;
} else {
return false;
}
}

這一題的原理跟上一題 Reverse Integer 原理類似

在翻轉變數之後

比對是否相等

若是相等就回傳 true

不相等就回傳 false

Leetcode-Q3

7 Reverse Integer

Example

Example1: x = 123, return 321

Example2: x = -123, return -321

解題

Version 1

第一個想到的方式就是先用字串的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
var reverse = function (x) {

const INT_MAX = 2147483647;

const isNegativeNumber = (x < 0) ? true : false,
y = Math.abs(x).toString().split('')
length = y.length;

let result = [];
for (let i = 0; i < length; i++) {
result.push(y.pop());
}

let total = parseInt(result.join(''));

total = isNegativeNumber ? (0 - total) : total;
if (total > INT_MAX || total < -(1 + INT_MAX)) {
return 0;
} else {
return total;
}
};

我先確定他是否為負數

然後把數字轉絕對值 切一維陣列

在使用for 迴圈來迴轉

最後檢查是否有超出32 bit與回傳正負總值

但是這樣的效能實在欠佳

Version 2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
var reverse = function (x) {
if (x >= 0 && x < 10) {
return x;
}
const INT_MAX = 2147483647;

const isNegativeNumber = (x < 0) ? true : false,
y = Math.abs(x).toString().split(''),
length = y.length;

let total = 0;
y.map((v, index) => {
const value = parseInt(v) * Math.pow(10, index);
total += value;
})
total = isNegativeNumber ? (0 - total) : total;
if (total > INT_MAX || total < -(1 + INT_MAX)) {
return 0;
} else {
return total;
}
};

第二種方式其實跟第一種大同小異

只是我是利用數字十進位數的方式

使用迴圈加回去一個值

最後再檢查是否有超出32 bit

因為使用數字計算

所以效能提昇了不少

最高衝到了51 %

不過還是略有欠缺

若有其他解法再來更新

Leetcode-Q2

1 Two sum

First solution

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var twoSum = function(nums, target) {
var index,
index2 = -1,
i = 0;
while(index === undefined){
var num = nums[i],
need = target - num;
index2 = nums.indexOf(need)
if(index2 === i){
index = undefined;
index2 = -1
}else if(index2 !== -1){
index = i;
}
i++;
}
return [index, index2];
};

因為是兩層的巢狀迴圈

加上有兩個 if 判斷式

所以效能會很差

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var twoSum = function(nums, target) {

var map = {};
for(var i = 0 ; i < nums.length ; i++){
var v = nums[i];

for(var j = i+1 ; j < nums.length ; j++ ){
if( nums[i] + nums[j] == target ){
return [i,j];
}
}

}
};

因為是兩層的巢狀迴圈

加上有一個 if 判斷式

所以效能好一點

但還是不理想

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var map = {};
for(var i = 0 ; i < nums.length ; i++){
var v = nums[i];

if(map[target-v] >= 0){
// 如果 target - v可以在map中找到值x,代表之前已經出現過值x, target = x + v
// 因此回傳 x的位置與目前v的位置
return [map[target-v],i]
} else {
// 使用map儲存目前的數字與其位置

map[v] = i;
}
}

僅僅使用一層迴圈

若沒有找到的話就會記錄在 map 中

所以效能提升不少

Leetcode-Q1

538 Convert BST to Greater Tree

Description

Given a Binary Search Tree (BST), convert it to a Greater Tree such that every key of the original BST is changed to the original key plus sum of all keys greater than the original key in BST.

1
輸入一個Binary Search Tree(BST), 將每一個節點得值更改為原始的值加上加上大於BST中 Node 的值的總和

Example

1
2
3
4
5
6
7
8
9
Input: The root of a Binary Search Tree like this:
5
/ \
2 13

Output: The root of a Greater Tree like this:
18
/ \
20 13
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
  var convertBST = function(root) {
var vals = [];
var count = 0;
function visit1(root){
if(root){
visit1(root.left);
vals.push(root.val);
visit1(root.right);
}
}
visit1(root);

function visit2(root){
if(root){
visit2(root.right);
count += vals.pop();
root.val = count;
visit2(root.left);
}
}
visit2(root);
return root;
}

Note

visit1: 利用遞迴將 Node 往下延伸到最左邊子元素的時候依序push 到陣列中

visit2: 利用遞迴將 Node 往下延伸到最右邊的子元素依序將 value 修改為加總得值

利用二元樹的特性來做輪巡並修改值

|