允許多個(gè)并發(fā)用戶(hù)處理在同一個(gè)通訊通道接收的消息。這種模式使系統(tǒng)能夠同時(shí)處理多個(gè)郵件,以?xún)?yōu)化吞吐量,提高可擴(kuò)展性和可用性,以及平衡工作負(fù)載。
在云中運(yùn)行的應(yīng)用程序,可以預(yù)計(jì),以處理大量的請(qǐng)求。而不是過(guò)程的每個(gè)請(qǐng)求同步地,一個(gè)常用的方法是通過(guò)一個(gè)消息傳送系統(tǒng)到該異步地處理它們的另一服務(wù)(消費(fèi)者服務(wù)),以通過(guò)他們的應(yīng)用程序。這種策略有助于確保在應(yīng)用程序的業(yè)務(wù)邏輯沒(méi)有被阻塞,而正在處理的請(qǐng)求。
請(qǐng)求的數(shù)量可以隨著時(shí)間的原因有很多顯著變化。突然一陣在用戶(hù)活動(dòng)或聚集的請(qǐng)求,來(lái)自多個(gè)租戶(hù)未來(lái)可能會(huì)導(dǎo)致不可預(yù)測(cè)的工作負(fù)載。在高峰時(shí)間的系統(tǒng)可能需要處理許多每秒數(shù)百個(gè)請(qǐng)求,而在其他時(shí)間的數(shù)量可能是非常小的。此外,該工作的性質(zhì)進(jìn)行的處理這些請(qǐng)求可能是高度可變的。使用消費(fèi)者服務(wù)的單個(gè)實(shí)例,可能會(huì)導(dǎo)致該實(shí)例成為充斥請(qǐng)求或消息傳送系統(tǒng)可通過(guò)消息從應(yīng)用程序來(lái)的流入被重載。為了處理這種波動(dòng)的負(fù)載,該系統(tǒng)可以運(yùn)行消費(fèi)者服務(wù)的多個(gè)實(shí)例。然而這些消費(fèi)者必須協(xié)調(diào),以確保每個(gè)消息只傳送給一個(gè)單個(gè)消費(fèi)者。工作量也需要跨消費(fèi)者被負(fù)載平衡,以防止一個(gè)實(shí)例成為瓶頸。
使用消息隊(duì)列來(lái)實(shí)現(xiàn)應(yīng)用和消費(fèi)者服務(wù)的實(shí)例之間的通信信道。在消息隊(duì)列中的形式應(yīng)用帖請(qǐng)求,以及消費(fèi)者的服務(wù)實(shí)例從隊(duì)列中接收消息并對(duì)其進(jìn)行處理。這種方法使消費(fèi)者的服務(wù)實(shí)例的同一池中從應(yīng)用程序的任何實(shí)例處理消息。圖 1 示出了該架構(gòu)。
圖1 - 使用消息隊(duì)列分發(fā)工作提高到一個(gè)服務(wù)的實(shí)例
該解決方案具有以下優(yōu)點(diǎn):
在決定如何實(shí)現(xiàn)這個(gè)模式時(shí),請(qǐng)考慮以下幾點(diǎn):
注意
微軟 Azure 服務(wù)總線(xiàn)隊(duì)列可以通過(guò)使用消息會(huì)先入先出消息的順序工具保證。欲了解更多信息,請(qǐng)參閱消息傳遞模式 MSDN 上使用會(huì)話(huà)。
注意
如果您正在使用 Azure 的工作進(jìn)程可能能夠通過(guò)使用專(zhuān)用的郵件回復(fù)隊(duì)列回傳結(jié)果的應(yīng)用程序邏輯。應(yīng)用邏輯必須能夠?qū)⑦@些結(jié)果與原來(lái)的消息關(guān)聯(lián)起來(lái)。這種情況下進(jìn)行了更詳細(xì)的異步消息的引物進(jìn)行說(shuō)明。
使用這種模式時(shí):
這種模式可能不適合時(shí):
有些郵件系統(tǒng)支持會(huì)話(huà),使生產(chǎn)者對(duì)消息進(jìn)行分組在一起,并確保它們都被同一個(gè)接收者處理。這個(gè)機(jī)制可以與優(yōu)先消息使用(如果它們支持)來(lái)實(shí)現(xiàn)消息排序的一種形式,在順序從生產(chǎn)者傳送消息到單個(gè)消費(fèi)者。
Azure 提供存儲(chǔ)隊(duì)列和服務(wù)總線(xiàn)隊(duì)列,可作為一個(gè)合適的機(jī)制來(lái)實(shí)現(xiàn)這種模式。應(yīng)用邏輯可以發(fā)布消息到一個(gè)隊(duì)列,而消費(fèi)者實(shí)現(xiàn)為在一個(gè)或多個(gè)角色的任務(wù)可以檢索從這個(gè)隊(duì)列中的消息并進(jìn)行處理。對(duì)于彈性,一個(gè)服務(wù)總線(xiàn)隊(duì)列使得消費(fèi)者使用 PeekLock 模式,當(dāng)它從隊(duì)列檢索消息。這種模式實(shí)際上不是刪除消息,而只是從其他消費(fèi)者隱藏它。當(dāng)處理完它原來(lái)的用戶(hù)可以刪除該郵件。如果消費(fèi)者要失敗,偷看鎖將超時(shí),消息將再次變得可見(jiàn),讓消費(fèi)者又找回它。
有關(guān)使用 Azure 的服務(wù)總線(xiàn)隊(duì)列的詳細(xì)信息,請(qǐng)參閱服務(wù)總線(xiàn)隊(duì)列,主題和 MSDN 上的訂閱。有關(guān)使用 Azure 存儲(chǔ)隊(duì)列的信息,請(qǐng)參閱如何 MSDN 上使用隊(duì)列存儲(chǔ)服務(wù)。
從可供下載的例子 CompetingConsumers 解決方案的 QueueManager 類(lèi)下面的代碼顯示了本指南說(shuō)明了如何通過(guò)在網(wǎng)絡(luò)或輔助角色開(kāi)始的事件處理程序使用 QueueClient 實(shí)例中創(chuàng)建一個(gè)隊(duì)列。
private string queueName = ...;
private string connectionString = ...;
...
?
public async Task Start()
{
// Check if the queue already exists.
var manager = NamespaceManager.CreateFromConnectionString(this.connectionString);
if (!manager.QueueExists(this.queueName))
{
var queueDescription = new QueueDescription(this.queueName);
?
// Set the maximum delivery count for messages in the queue. A message
// is automatically dead-lettered after this number of deliveries. The
// default value for dead letter count is 10.
queueDescription.MaxDeliveryCount = 3;
?
await manager.CreateQueueAsync(queueDescription);
}
...
?
// Create the queue client. By default the PeekLock method is used.
this.client = QueueClient.CreateFromConnectionString(
this.connectionString, this.queueName);
}
下面的代碼片段顯示了一個(gè)應(yīng)用程序如何創(chuàng)建和發(fā)送一批消息隊(duì)列。
public async Task SendMessagesAsync()
{
// Simulate sending a batch of messages to the queue.
var messages = new List<BrokeredMessage>();
?
for (int i = 0; i < 10; i++)
{
var message = new BrokeredMessage() { MessageId = Guid.NewGuid().ToString() };
messages.Add(message);
}
await this.client.SendBatchAsync(messages);
}
下面的代碼顯示了如何消費(fèi)服務(wù)實(shí)例可以從隊(duì)列中下一個(gè)事件驅(qū)動(dòng)的方式接收消息。該 processMessageTask 參數(shù)的 ReceiveMessages 法為代表,它引用在收到消息時(shí)運(yùn)行的代碼。此代碼是異步運(yùn)行。
private ManualResetEvent pauseProcessingEvent;
...
?
public void ReceiveMessages(Func<BrokeredMessage, Task> processMessageTask)
{
// Set up the options for the message pump.
var options = new OnMessageOptions();
?
// When AutoComplete is disabled it is necessary to manually
// complete or abandon the messages and handle any errors.
options.AutoComplete = false;
options.MaxConcurrentCalls = 10;
options.ExceptionReceived += this.OptionsOnExceptionReceived;
?
// Use of the Service Bus OnMessage message pump.
// The OnMessage method must be called once, otherwise an exception will occur.
this.client.OnMessageAsync(
async (msg) =>
{
// Will block the current thread if Stop is called.
this.pauseProcessingEvent.WaitOne();
?
// Execute processing task here.
await processMessageTask(msg);
},
options);
}
...
?
private void OptionsOnExceptionReceived(object sender,
ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
...
}
需要注意的是自動(dòng)縮放的功能,例如可在天青,可用于啟動(dòng)和停止的角色實(shí)例的隊(duì)列長(zhǎng)度的波動(dòng)。欲了解更多信息,請(qǐng)參閱自動(dòng)縮放指導(dǎo)。另外,沒(méi)有必要維持角色實(shí)例和工人之間的一對(duì)一的對(duì)應(yīng)過(guò)程,單個(gè)角色實(shí)例可以實(shí)現(xiàn)多個(gè)工作進(jìn)程。欲了解更多信息,請(qǐng)參閱計(jì)算資源整合模式。
更多建議: