Swoole Task實例

2022-07-12 11:26 更新

Swoole Task異步任務介紹

swoole 的異步任務task系統(tǒng)可以很方便的為我們在開發(fā)的過程中調(diào)用異步任務的執(zhí)行,而無需等待。

常見使用場景:

task模塊用來做一些異步的慢速任務,比如webim中發(fā)廣播,發(fā)送郵件,異步訂單處理、異步支付處理等。

  • task進程必須是同步阻塞的
  • task進程支持定時器

node.js 假如有10萬個連接,要發(fā)廣播時,那會循環(huán)10萬次,這時候程序不能做任何事情,不能接受新的連接,也不能收包發(fā)包。

而swoole不同,丟給task進程之后,worker進程可以繼續(xù)處理新的數(shù)據(jù)請求。任務完成后會異步地通知worker進程告訴它此任務已經(jīng)完成。

當然task模塊的作用還不僅如此,實現(xiàn)PHP的數(shù)據(jù)庫連接池,異步隊列等,還需要進一步挖掘。

簡單實例:

$serv = new Swoole\Server("127.0.0.1", 9502);
$serv->set(array('task_worker_num' => 4));
$serv->on('Receive', function($serv, $fd, $from_id, $data) {
    $task_id = $serv->task("Async");
    echo "Dispath AsyncTask: id=$task_id\n";
});
$serv->on('Task', function ($serv, $task_id, $from_id, $data) {
    echo "New AsyncTask[id=$task_id]".PHP_EOL;
    $serv->finish("$data -> OK");
});
$serv->on('Finish', function ($serv, $task_id, $data) {
    echo "AsyncTask[$task_id] Finish: $data".PHP_EOL;
});
$serv->start();

swoole_server task異步任務介紹

投遞一個異步任務到task_worker池中。此函數(shù)是非阻塞的,執(zhí)行完畢會立即返回。worker進程可以繼續(xù)處理新的請求。

int swoole_server::task(mixed $data, int $dst_worker_id = -1) 
$task_id = $serv->task("some data");
//swoole-1.8.6或更高版本
$serv->task("taskcallback", -1, function (swoole_server $serv, $task_id, $data) {
    echo "Task Callback: ";
    var_dump($task_id, $data);
});
  • $data要投遞的任務數(shù)據(jù),可以為除資源類型之外的任意PHP變量
  • $dst_worker_id可以制定要給投遞給哪個task進程,傳入ID即可,范圍是0 - (serv->task_worker_num -1)
  • 調(diào)用成功,返回值為整數(shù)$task_id,表示此任務的ID。如果有finish回應,onFinish回調(diào)中會攜帶$task_id參數(shù)
  • 調(diào)用失敗,返回值為false
  • 未指定目標Task進程,調(diào)用task方法會判斷Task進程的忙閑狀態(tài),底層只會向處于空閑狀態(tài)的Task進程投遞任務
  • 1.8.6版本增加了第三個參數(shù),可以直接設置onFinish函數(shù),如果任務設置了回調(diào)函數(shù),Task返回結果時會直接執(zhí)行制定的回調(diào)函數(shù),不再執(zhí)行Server的onFinish回調(diào)

$dst_worker_id在1.6.11+后可用,默認為隨機投遞
$task_id是從0-42億的整數(shù),在當前進程內(nèi)是唯一的
task方法不能在task進程/用戶自定義進程中調(diào)用

此功能用于將慢速的任務異步地去執(zhí)行,比如一個聊天室服務器,可以用它來進行發(fā)送廣播。當任務完成時,在task進程中調(diào)用$serv->finish("finish")告訴worker進程此任務已完成。當然swoole_server->finish是可選的。

task底層使用Unix Socket管道通信,是全內(nèi)存的,沒有IO消耗。單進程讀寫性能可達100萬/s,不同的進程使用不同的管道通信,可以最大化利用多核。

AsyncTask功能在1.6.4版本增加,默認不啟動task功能,需要在手工設置task_worker_num來啟動此功能
task_worker的數(shù)量在swoole_server::set參數(shù)中調(diào)整,如task_worker_num => 64,表示啟動64個進程來接收異步任務

配置參數(shù)

swoole_server->task/taskwait/finish 3個方法當傳入的$data數(shù)據(jù)超過8K時會啟用臨時文件來保存。當臨時文件內(nèi)容超過server->package_max_length 時底層會拋出一個警告。

WARN: task package is too big.

server->package_max_length 默認為2M

注意事項

  • 使用swoole_server_task必須為Server設置onTask和onFinish回調(diào),否則swoole_server->start會失敗
  • task操作的次數(shù)必須小于onTask處理速度,如果投遞容量超過處理能力,task會塞滿緩存區(qū),導致worker進程發(fā)生阻塞。worker進程將無法接收新的請求
  • 使用addProcess添加的用戶進程中無法使用task投遞任務,請使用sendMessage接口與工作進程通信

Swoole taskwait

函數(shù)原型:

string $result = swoole_server->taskwait(mixed $task_data, float $timeout = 0.5, int $dst_worker_id = -1);

taskwait與task方法作用相同,用于投遞一個異步的任務到task進程池去執(zhí)行。與task不同的是taskwait是阻塞等待的,直到任務完成或者超時返回。

$result為任務執(zhí)行的結果,由$serv->finish函數(shù)發(fā)出。如果此任務超時,這里會返回false。

taskwait是阻塞接口,如果你的Server是全異步的請使用swoole_server::task和swoole_server::finish,不要使用taskwait
第3個參數(shù)可以制定要給投遞給哪個task進程,傳入ID即可,范圍是0 - serv->task_worker_num
$dst_worker_id在1.6.11+后可用,默認為隨機投遞
taskwait方法不能在task進程中調(diào)用

onTask

在task_worker進程內(nèi)被調(diào)用。worker進程可以使用swoole_server_task函數(shù)向task_worker進程投遞新的任務。當前的Task進程在調(diào)用onTask回調(diào)函數(shù)時會將進程狀態(tài)切換為忙碌,這時將不再接收新的Task,當onTask函數(shù)返回時會將進程狀態(tài)切換為空閑然后繼續(xù)接收新的Task。

function onTask((swoole_server $serv, int $task_id, int $src_worker_id, string $data));
  • $task_id是任務ID,由swoole擴展內(nèi)自動生成,用于區(qū)分不同的任務。$task_id和$src_worker_id組合起來才是全局唯一的,不同的worker進程投遞的任務ID可能會有相同
  • $src_worker_id來自于哪個worker進程
  • $data 是任務的內(nèi)容

返回執(zhí)行結果到worker進程

1.7.2以上的版本,在onTask函數(shù)中 return字符串,表示將此內(nèi)容返回給worker進程。worker進程中會觸發(fā)onFinish函數(shù),表示投遞的task已完成。

  • return的變量可以是任意非null的PHP變量

1.7.2以前的版本,需要調(diào)用swoole_server->finish()函數(shù)將結果返回給worker進程


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號