Timer定時(shí)器、心跳檢測(cè)及Task進(jìn)階實(shí)例:mysql連接池

2018-02-24 16:13 更新

環(huán)境說明: 系統(tǒng):Ubuntu14.04 (安裝教程包括CentOS6.5)
PHP版本:PHP-5.5.10
swoole版本:1.7.7-stable

1.Timer定時(shí)器

在實(shí)際應(yīng)用中,往往會(huì)遇到需要每隔一段時(shí)間重復(fù)做一件事,比如心跳檢測(cè)、訂閱消息、數(shù)據(jù)庫(kù)備份等工作。通常,我們會(huì)借助PHP的time()以及相關(guān)函數(shù)自己實(shí)現(xiàn)一個(gè)定時(shí)器,或者使用crontab工具來實(shí)現(xiàn)。但是,自定義的定時(shí)器容易出錯(cuò),而使用crontab則需要編寫額外的腳本文件,無論是遷移還是調(diào)試都比較麻煩。
因此,Swoole提供了一個(gè)內(nèi)置的Timer定時(shí)器功能,通過函數(shù)addtimer即可在Swoole中添加一個(gè)定時(shí)器,該定時(shí)器會(huì)在建立之后,按照預(yù)先設(shè)定好的時(shí)間間隔,每到對(duì)應(yīng)的時(shí)間就會(huì)調(diào)用一次回調(diào)函數(shù)onTimer通知Server。
簡(jiǎn)單示例如下:

    $this->serv->on('Timer', array($this, 'onTimer'));

    public function onWorkerStart( $serv , $worker_id) {
        // 在Worker進(jìn)程開啟時(shí)綁定定時(shí)器
        // 只有當(dāng)worker_id為0時(shí)才添加定時(shí)器,避免重復(fù)添加
        if( $worker_id == 0 ) {
            $serv->addtimer(500);
            $serv->addtimer(1000);
            $serv->addtimer(1500);
        }
    }

    public function onTimer($serv, $interval) {
        switch( $interval ) {
            case 500: { // 
                echo "Do Thing A at interval 500\n";
                break;
            }
            case 1000:{
                echo "Do Thing B at interval 1000\n";
                break;
            }
            case 1500:{
                echo "Do Thing C at interval 1500\n";
                break;
            }
        }
    }

可以看到,在onWorkerStart回調(diào)函數(shù)中,通過addtimer添加了三個(gè)定時(shí)器,時(shí)間間隔分別為500、1000、1500。而在onTimer回調(diào)中,正好通過間隔的不同來區(qū)分不同的定時(shí)器回調(diào),從而執(zhí)行不同的操作。
需要注意的是,在上述示例中,當(dāng)1000ms的定時(shí)器被觸發(fā)時(shí),500ms的定時(shí)器同樣會(huì)被觸發(fā),但是不能保證會(huì)在1000ms定時(shí)器前觸發(fā)還是后觸發(fā),因此需要注意,定時(shí)器中的操作不能依賴其他定時(shí)器的執(zhí)行結(jié)果。

點(diǎn)此查看完整示例

(PS:在Swoole-1.7.7版本,新提供了一個(gè)after函數(shù), 這個(gè)功能的用法會(huì)在以后的教程中給出。)

2.心跳檢測(cè)

上文提到過,使用Timer定時(shí)器功能可以實(shí)現(xiàn)發(fā)送心跳包的功能。事實(shí)上,Swoole已經(jīng)內(nèi)置了心跳檢測(cè)功能,能自動(dòng)close掉長(zhǎng)時(shí)間沒有數(shù)據(jù)來往的連接。而開啟心跳檢測(cè)功能,只需要設(shè)置heartbeat_check_intervalheartbeat_idle_time即可。如下:

$this->serv->set(
    array(
        'heartbeat_check_interval' => 60,
        'heartbeat_idle_time' => 600,
    )
);

其中heartbeat_idle_time的默認(rèn)值是heartbeat_check_interval的兩倍。 在設(shè)置這兩個(gè)選項(xiàng)后,swoole會(huì)在內(nèi)部啟動(dòng)一個(gè)線程,每隔heartbeat_check_interval秒后遍歷一次全部連接,檢查最近一次發(fā)送數(shù)據(jù)的時(shí)間和當(dāng)前時(shí)間的差,如果這個(gè)差值大于heartbeat_idle_time,則會(huì)強(qiáng)制關(guān)閉這個(gè)連接,并通過回調(diào)onClose通知Server進(jìn)程。?點(diǎn)此查看完整示例?小技巧: 結(jié)合之前的Timer功能,如果我們想維持連接,就設(shè)置一個(gè)略小于如果這個(gè)差值大于heartbeat_idle_time的定時(shí)器,在定時(shí)器內(nèi)向所有連接發(fā)送一個(gè)心跳包。如果收到心跳回應(yīng),則判斷連接正常,如果沒有收到,則關(guān)閉這個(gè)連接或者再次嘗試發(fā)送。

3.Task進(jìn)階:MySQL連接池

上一章中我簡(jiǎn)單講解了如何開啟和使用Task功能。這一節(jié),我將提供一個(gè)Task的高級(jí)用法。

在PHP中,訪問MySQL數(shù)據(jù)庫(kù)往往是性能提升的瓶頸。而MySQL連接池我想大家都不陌生,這是一個(gè)很好的提升數(shù)據(jù)庫(kù)訪問性能的方式。傳統(tǒng)的MySQL連接池,是預(yù)先申請(qǐng)一定數(shù)量的連接,每一個(gè)新的請(qǐng)求都會(huì)占用其中一個(gè)連接,請(qǐng)求結(jié)束后再將連接放回池中,如果所有連接都被占用,新來的連接則會(huì)進(jìn)入等待狀態(tài)。
知道了MySQL連接池的實(shí)現(xiàn)原理,那我們來看如何使用Swoole實(shí)現(xiàn)一個(gè)連接池。
首先,Swoole允許開啟一定量的Task Worker進(jìn)程,我們可以讓每個(gè)進(jìn)程都擁有一個(gè)MySQL連接,并保持這個(gè)連接,這樣,我們就創(chuàng)建了一個(gè)連接池。
其次,設(shè)置swoole的dispatch_mode為搶占模式(主進(jìn)程會(huì)根據(jù)Worker的忙閑狀態(tài)選擇投遞,只會(huì)投遞給處于閑置狀態(tài)的Worker)。這樣,每個(gè)task都會(huì)被投遞給閑置的Task Worker。這樣,我們保證了每個(gè)新的task都會(huì)被閑置的Task Worker處理,如果全部Task Worker都被占用,則會(huì)進(jìn)入等待隊(duì)列。

下面直接上關(guān)鍵代碼:

public function onWorkerStart( $serv , $worker_id) {
    echo "onWorkerStart\n";
    // 判定是否為Task Worker進(jìn)程
    if( $worker_id >= $serv->setting['worker_num'] ) {
        $this->pdo = new PDO(
            "mysql:host=localhost;port=3306;dbname=Test", 
            "root", 
            "123456", 
            array(
                PDO::MYSQL_ATTR_INIT_COMMAND => "SET NAMES 'UTF8';",
                PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
                PDO::ATTR_PERSISTENT => true
            )
        );
    }
}

首先,在每個(gè)Task Worker進(jìn)程中,創(chuàng)建一個(gè)MySQL連接。這里我選用了PDO擴(kuò)展。

public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
    $sql = array(
        'sql'=>'select * from Test where pid > ?',
        'param' => array(
            0
        ),
        'fd' => $fd
    );
    $serv->task( json_encode($sql) );
}

其次,在需要的時(shí)候,通過task函數(shù)投遞一個(gè)任務(wù)(也就是發(fā)起一次SQL請(qǐng)求)

public function onTask($serv,$task_id,$from_id, $data) {
    $sql = json_decode( $data , true );

    $statement = $this->pdo->prepare($sql['sql']);
    $statement->execute($sql['param']);     

    $result = $statement->fetchAll(PDO::FETCH_ASSOC);
    $serv->send( $sql['fd'],json_encode($result));
    return true;
}

最后,在onTask回調(diào)中,根據(jù)請(qǐng)求過來的SQL語(yǔ)句以及相應(yīng)的參數(shù),發(fā)起一次MySQL請(qǐng)求,并將獲取到的結(jié)果通過send發(fā)送給客戶端(或者通過return返回給Worker進(jìn)程)。而且,這樣的一次MySQL請(qǐng)求還不會(huì)阻塞Worker進(jìn)程,Worker進(jìn)程可以繼續(xù)處理其他的邏輯。

可以看到,簡(jiǎn)單十幾行代碼,就實(shí)現(xiàn)了一個(gè)高效的異步MySQL連接池。
通過測(cè)試,單個(gè)客戶端一共發(fā)起1W次select請(qǐng)求,共耗時(shí)9s;
1W次insert請(qǐng)求,共耗時(shí)21s。
(客戶端會(huì)在每次收到前一個(gè)請(qǐng)求的結(jié)果后才會(huì)發(fā)起下一次請(qǐng)求,而不是并發(fā))。

點(diǎn)此查看完整服務(wù)端代碼
點(diǎn)此查看完整客戶端代碼

4.Task實(shí)戰(zhàn):yii中應(yīng)用task

在YII框架中結(jié)合了swoole 的task 做了異步處理。 本例中 主要用到 1、protected/commands/ServerCommand.php 用來做server。 2、protected/event/下的文件 這里是在異步中的具體實(shí)現(xiàn)。

客戶端調(diào)用參照 TestController

<?php
class TestController extends Controller{
    public function actionTT(){
        $message['uid'] = 2;
        $message['email'] = '83212019@qq.com';
        $message['title'] = '接口報(bào)警郵件';
        $message['contents'] = "'EmailEvent'接口請(qǐng)求過程出錯(cuò)! 錯(cuò)誤信息如下:err_no:'00000' err_msg:'測(cè)試隊(duì)列' 請(qǐng)求參數(shù)為:'[]'";
        $message['type'] = 2;

        $data['param'] = $message;
        $data['class'] = 'Email';
        $client = new EventClient();
        $data = $client->send($data);
    }
}
?>

有個(gè)task表是用來記錄異步任務(wù)的。如果失敗重試3次。sql在protected/data/sql.sql里。
點(diǎn)此查看完整客戶端代碼

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)