Rust 將單線程 server 變?yōu)槎嗑€程 server

2023-03-22 15:16 更新
ch20-02-multithreaded.md
commit 95b5e7c86d33e98eec6f73b268d106621f3d24a1

目前 server 會依次處理每一個請求,意味著它在完成第一個連接的處理之前不會處理第二個連接。如果 server 正接收越來越多的請求,這類串行操作會使性能越來越差。如果一個請求花費很長時間來處理,隨后而來的請求則不得不等待這個長請求結(jié)束,即便這些新請求可以很快就處理完。我們需要修復(fù)這種情況,不過首先讓我們實際嘗試一下這個問題。

在當前 server 實現(xiàn)中模擬慢請求

讓我們看看一個慢請求如何影響當前 server 實現(xiàn)中的其他請求。示例 20-10 通過模擬慢響應(yīng)實現(xiàn)了 /sleep 請求處理,它會使 server 在響應(yīng)之前休眠五秒。

文件名: src/main.rs

use std::thread;
use std::time::Duration;
// --snip--

fn handle_connection(mut stream: TcpStream) {
    // --snip--

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK", "hello.html")
    } else if buffer.starts_with(sleep) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND", "404.html")
    };

    // --snip--
}

示例 20-10: 通過識別 /sleep 并休眠五秒來模擬慢請求

這段代碼有些凌亂,不過對于模擬的目的來說已經(jīng)足夠。這里創(chuàng)建了第二個請求 sleep,我們會識別其數(shù)據(jù)。在 if 塊之后增加了一個 else if 來檢查 /sleep 請求,當接收到這個請求時,在渲染成功 HTML 頁面之前會先休眠五秒。

現(xiàn)在就可以真切的看出我們的 server 有多么的原始:真實的庫將會以更簡潔的方式處理多請求識別問題!使用 cargo run 啟動 server,并接著打開兩個瀏覽器窗口:一個請求 http://127.0.0.1:7878/ 而另一個請求 http://127.0.0.1:7878/sleep 。如果像之前一樣多次請求 /,會發(fā)現(xiàn)響應(yīng)的比較快速。不過如果請求 /sleep 之后在請求 /,就會看到 / 會等待直到 sleep 休眠完五秒之后才出現(xiàn)。

這里有多種辦法來改變我們的 web server 使其避免所有請求都排在慢請求之后;我們將要實現(xiàn)的一個便是線程池。

使用線程池改善吞吐量

線程池thread pool)是一組預(yù)先分配的等待或準備處理任務(wù)的線程。當程序收到一個新任務(wù),線程池中的一個線程會被分配任務(wù),這個線程會離開并處理任務(wù)。其余的線程則可用于處理在第一個線程處理任務(wù)的同時處理其他接收到的任務(wù)。當?shù)谝粋€線程處理完任務(wù)時,它會返回空閑線程池中等待處理新任務(wù)。線程池允許我們并發(fā)處理連接,增加 server 的吞吐量。

我們會將池中線程限制為較少的數(shù)量,以防拒絕服務(wù)(Denial of Service, DoS)攻擊;如果程序為每一個接收的請求都新建一個線程,某人向 server 發(fā)起千萬級的請求時會耗盡服務(wù)器的資源并導致所有請求的處理都被終止。

不同于分配無限的線程,線程池中將有固定數(shù)量的等待線程。當新進請求時,將請求發(fā)送到線程池中做處理。線程池會維護一個接收請求的隊列。每一個線程會從隊列中取出一個請求,處理請求,接著向?qū)﹃犃兴魅×硪粋€請求。通過這種設(shè)計,則可以并發(fā)處理 N 個請求,其中 N 為線程數(shù)。如果每一個線程都在響應(yīng)慢請求,之后的請求仍然會阻塞隊列,不過相比之前增加了能處理的慢請求的數(shù)量。

這個設(shè)計僅僅是多種改善 web server 吞吐量的方法之一。其他可供探索的方法有 fork/join 模型和單線程異步 I/O 模型。如果你對這個主題感興趣,則可以閱讀更多關(guān)于其他解決方案的內(nèi)容并嘗試用 Rust 實現(xiàn)他們;對于一個像 Rust 這樣的底層語言,所有這些方法都是可能的。

在開始之前,讓我們討論一下線程池應(yīng)用看起來怎樣。當嘗試設(shè)計代碼時,首先編寫客戶端接口確實有助于指導代碼設(shè)計。以期望的調(diào)用方式來構(gòu)建 API 代碼的結(jié)構(gòu),接著在這個結(jié)構(gòu)之內(nèi)實現(xiàn)功能,而不是先實現(xiàn)功能再設(shè)計公有 API。

類似于第十二章項目中使用的測試驅(qū)動開發(fā)。這里將要使用編譯器驅(qū)動開發(fā)(compiler-driven development)。我們將編寫調(diào)用所期望的函數(shù)的代碼,接著觀察編譯器錯誤告訴我們接下來需要修改什么使得代碼可以工作。

為每一個請求分配線程的代碼結(jié)構(gòu)

首先,讓我們探索一下為每一個連接都創(chuàng)建一個線程的代碼看起來如何。這并不是最終方案,因為正如之前講到的它會潛在的分配無限的線程,不過這是一個開始。示例 20-11 展示了 main 的改變,它在 for 循環(huán)中為每一個流分配了一個新線程進行處理:

文件名: src/main.rs

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

示例 20-11: 為每一個流新建一個線程

正如第十六章講到的,thread::spawn 會創(chuàng)建一個新線程并在其中運行閉包中的代碼。如果運行這段代碼并在在瀏覽器中加載 /sleep,接著在另兩個瀏覽器標簽頁中加載 /,確實會發(fā)現(xiàn) / 請求不必等待 /sleep 結(jié)束。不過正如之前提到的,這最終會使系統(tǒng)崩潰因為我們無限制的創(chuàng)建新線程。

為有限數(shù)量的線程創(chuàng)建一個類似的接口

我們期望線程池以類似且熟悉的方式工作,以便從線程切換到線程池并不會對使用該 API 的代碼做出較大的修改。示例 20-12 展示我們希望用來替換 thread::spawn 的 ThreadPool 結(jié)構(gòu)體的假想接口:

文件名: src/main.rs

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

示例 20-12: 假想的 ThreadPool 接口

這里使用 ThreadPool::new 來創(chuàng)建一個新的線程池,它有一個可配置的線程數(shù)的參數(shù),在這里是四。這樣在 for 循環(huán)中,pool.execute 有著類似 thread::spawn 的接口,它獲取一個線程池運行于每一個流的閉包。pool.execute 需要實現(xiàn)為獲取閉包并傳遞給池中的線程運行。這段代碼還不能編譯,不過通過嘗試編譯器會指導我們?nèi)绾涡迯?fù)它。

采用編譯器驅(qū)動構(gòu)建 ThreadPool 結(jié)構(gòu)體

繼續(xù)并對示例 20-12 中的 src/main.rs 做出修改,并利用來自 cargo check 的編譯器錯誤來驅(qū)動開發(fā)。下面是我們得到的第一個錯誤:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
  --> src/main.rs:10:16
   |
10 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ use of undeclared type `ThreadPool`

For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` due to previous error

好的,這告訴我們需要一個 ThreadPool 類型或模塊,所以我們將構(gòu)建一個。ThreadPool 的實現(xiàn)會與 web server 的特定工作相獨立,所以讓我們從 hello crate 切換到存放 ThreadPool 實現(xiàn)的新庫 crate。這也意味著可以在任何工作中使用這個單獨的線程池庫,而不僅僅是處理網(wǎng)絡(luò)請求。

創(chuàng)建 src/lib.rs 文件,它包含了目前可用的最簡單的 ThreadPool 定義:

文件名: src/lib.rs

pub struct ThreadPool;

接著創(chuàng)建一個新目錄,src/bin,并將二進制 crate 根文件從 src/main.rs 移動到 src/bin/main.rs。這使得庫 crate 成為 hello 目錄的主要 crate;不過仍然可以使用 cargo run 運行 src/bin/main.rs 二進制文件。移動了 main.rs 文件之后,修改 src/bin/main.rs 文件開頭加入如下代碼來引入庫 crate 并將 ThreadPool 引入作用域:

文件名: src/bin/main.rs

use hello::ThreadPool;

這仍然不能工作,再次嘗試運行來得到下一個需要解決的錯誤:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
  --> src/bin/main.rs:11:28
   |
11 |     let pool = ThreadPool::new(4);
   |                            ^^^ function or associated item not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` due to previous error

這告訴我們下一步是為 ThreadPool 創(chuàng)建一個叫做 new 的關(guān)聯(lián)函數(shù)。我們還知道 new 需要有一個參數(shù)可以接受 4,而且 new 應(yīng)該返回 ThreadPool 實例。讓我們實現(xiàn)擁有此特征的最小化 new 函數(shù):

文件夾: src/lib.rs

pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}

這里選擇 usize 作為 size 參數(shù)的類型,因為我們知道為負的線程數(shù)沒有意義。我們還知道將使用 4 作為線程集合的元素數(shù)量,這也就是使用 usize 類型的原因,如第三章 “整型” 部分所講。

再次編譯檢查這段代碼:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
  --> src/bin/main.rs:16:14
   |
16 |         pool.execute(|| {
   |              ^^^^^^^ method not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` due to previous error

現(xiàn)在有了一個警告和一個錯誤。暫時先忽略警告,發(fā)生錯誤是因為并沒有 ThreadPool 上的 execute 方法?;貞?nbsp;“為有限數(shù)量的線程創(chuàng)建一個類似的接口” 部分我們決定線程池應(yīng)該有與 thread::spawn 類似的接口,同時我們將實現(xiàn) execute 函數(shù)來獲取傳遞的閉包并將其傳遞給池中的空閑線程執(zhí)行。

我們會在 ThreadPool 上定義 execute 函數(shù)來獲取一個閉包參數(shù)?;貞浀谑碌?nbsp;“使用帶有泛型和 Fn trait 的閉包” 部分,閉包作為參數(shù)時可以使用三個不同的 trait:FnFnMut 和 FnOnce。我們需要決定這里應(yīng)該使用哪種閉包。最終需要實現(xiàn)的類似于標準庫的 thread::spawn,所以我們可以觀察 thread::spawn 的簽名在其參數(shù)中使用了何種 bound。查看文檔會發(fā)現(xiàn):

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

F 是這里我們關(guān)心的參數(shù);T 與返回值有關(guān)所以我們并不關(guān)心??紤]到 spawn 使用 FnOnce 作為 F 的 trait bound,這可能也是我們需要的,因為最終會將傳遞給 execute 的參數(shù)傳給 spawn。因為處理請求的線程只會執(zhí)行閉包一次,這也進一步確認了 FnOnce 是我們需要的 trait,這里符合 FnOnce 中 Once 的意思。

F 還有 trait bound Send 和生命周期綁定 'static,這對我們的情況也是有意義的:需要 Send 來將閉包從一個線程轉(zhuǎn)移到另一個線程,而 'static 是因為并不知道線程會執(zhí)行多久。讓我們編寫一個使用帶有這些 bound 的泛型參數(shù) F 的 ThreadPool 的 execute 方法:

文件名: src/lib.rs

impl ThreadPool {
    // --snip--
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

FnOnce trait 仍然需要之后的 (),因為這里的 FnOnce 代表一個沒有參數(shù)也沒有返回值的閉包。正如函數(shù)的定義,返回值類型可以從簽名中省略,不過即便沒有參數(shù)也需要括號。

這里再一次增加了 execute 方法的最小化實現(xiàn):它沒有做任何工作,只是嘗試讓代碼能夠編譯。再次進行檢查:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
    Finished dev [unoptimized + debuginfo] target(s) in 0.24s

現(xiàn)在就只有警告了!這意味著能夠編譯了!注意如果嘗試 cargo run 運行程序并在瀏覽器中發(fā)起請求,仍會在瀏覽器中出現(xiàn)在本章開始時那樣的錯誤。這個庫實際上還沒有調(diào)用傳遞給 execute 的閉包!

一個你可能聽說過的關(guān)于像 Haskell 和 Rust 這樣有嚴格編譯器的語言的說法是 “如果代碼能夠編譯,它就能工作”。這是一個提醒大家的好時機,實際上這并不是普適的。我們的項目可以編譯,不過它完全沒有做任何工作!如果構(gòu)建一個真實且功能完整的項目,則需花費大量的時間來開始編寫單元測試來檢查代碼能否編譯 并且 擁有期望的行為。

在 new 中驗證池中線程數(shù)量

這里仍然存在警告是因為其并沒有對 new 和 execute 的參數(shù)做任何操作。讓我們用期望的行為來實現(xiàn)這些函數(shù)。以考慮 new 作為開始。之前選擇使用無符號類型作為 size 參數(shù)的類型,因為線程數(shù)為負的線程池沒有意義。然而,線程數(shù)為零的線程池同樣沒有意義,不過零是一個完全有效的 u32 值。讓我們增加在返回 ThreadPool 實例之前檢查 size 是否大于零的代碼,并使用 assert! 宏在得到零時 panic,如示例 20-13 所示:

文件名: src/lib.rs

impl ThreadPool {
    /// 創(chuàng)建線程池。
    ///
    /// 線程池中線程的數(shù)量。
    ///
    /// # Panics
    ///
    /// `new` 函數(shù)在 size 為 0 時會 panic。
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }

    // --snip--
}

示例 20-13: 實現(xiàn) ThreadPool::new 在 size 為零時 panic

這里用文檔注釋為 ThreadPool 增加了一些文檔。注意這里遵循了良好的文檔實踐并增加了一個部分來提示函數(shù)會 panic 的情況,正如第十四章所討論的。嘗試運行 cargo doc --open 并點擊 ThreadPool 結(jié)構(gòu)體來查看生成的 new 的文檔看起來如何!

相比像這里使用 assert! 宏,也可以讓 new 像之前 I/O 項目中示例 12-9 中 Config::new 那樣返回一個 Result,不過在這里我們選擇創(chuàng)建一個沒有任何線程的線程池應(yīng)該是不可恢復(fù)的錯誤。如果你想做的更好,嘗試編寫一個采用如下簽名的 new 版本來感受一下兩者的區(qū)別:

pub fn new(size: usize) -> Result<ThreadPool, PoolCreationError> {

分配空間以儲存線程

現(xiàn)在有了一個有效的線程池線程數(shù),就可以實際創(chuàng)建這些線程并在返回之前將他們儲存在 ThreadPool 結(jié)構(gòu)體中。不過如何 “儲存” 一個線程?讓我們再看看 thread::spawn 的簽名:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

spawn 返回 JoinHandle<T>,其中 T 是閉包返回的類型。嘗試使用 JoinHandle 來看看會發(fā)生什么。在我們的情況中,傳遞給線程池的閉包會處理連接并不返回任何值,所以 T 將會是單元類型 ()。

示例 20-14 中的代碼可以編譯,不過實際上還并沒有創(chuàng)建任何線程。我們改變了 ThreadPool 的定義來存放一個 thread::JoinHandle<()> 的 vector 實例,使用 size 容量來初始化,并設(shè)置一個 for 循環(huán)了來運行創(chuàng)建線程的代碼,并返回包含這些線程的 ThreadPool 實例:

文件名: src/lib.rs

use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // create some threads and store them in the vector
        }

        ThreadPool { threads }
    }

    // --snip--
}

示例 20-14: 為 ThreadPool 創(chuàng)建一個 vector 來存放線程

這里將 std::thread 引入庫 crate 的作用域,因為使用了 thread::JoinHandle 作為 ThreadPool 中 vector 元素的類型。

在得到了有效的數(shù)量之后,ThreadPool 新建一個存放 size 個元素的 vector。本書還未使用過 with_capacity,它與 Vec::new 做了同樣的工作,不過有一個重要的區(qū)別:它為 vector 預(yù)先分配空間。因為已經(jīng)知道了 vector 中需要 size 個元素,預(yù)先進行分配比僅僅 Vec::new 要稍微有效率一些,因為 Vec::new 隨著插入元素而重新改變大小。

如果再次運行 cargo check,會看到一些警告,不過應(yīng)該可以編譯成功。

Worker 結(jié)構(gòu)體負責從 ThreadPool 中將代碼傳遞給線程

示例 20-14 的 for 循環(huán)中留下了一個關(guān)于創(chuàng)建線程的注釋。如何實際創(chuàng)建線程呢?這是一個難題。標準庫提供的創(chuàng)建線程的方法,thread::spawn,它期望獲取一些一旦創(chuàng)建線程就應(yīng)該執(zhí)行的代碼。然而,我們希望開始線程并使其等待稍后傳遞的代碼。標準庫的線程實現(xiàn)并沒有包含這么做的方法;我們必須自己實現(xiàn)。

我們將要實現(xiàn)的行為是創(chuàng)建線程并稍后發(fā)送代碼,這會在 ThreadPool 和線程間引入一個新數(shù)據(jù)類型來管理這種新行為。這個數(shù)據(jù)結(jié)構(gòu)稱為 Worker:這是一個池實現(xiàn)中的常見概念。想象一下在餐館廚房工作的員工:員工等待來自客戶的訂單,他們負責接受這些訂單并完成它們。

不同于在線程池中儲存一個 JoinHandle<()> 實例的 vector,我們會儲存 Worker 結(jié)構(gòu)體的實例。每一個 Worker 會儲存一個單獨的 JoinHandle<()> 實例。接著會在 Worker 上實現(xiàn)一個方法,它會獲取需要允許代碼的閉包并將其發(fā)送給已經(jīng)運行的線程執(zhí)行。我們還會賦予每一個 worker id,這樣就可以在日志和調(diào)試中區(qū)別線程池中的不同 worker。

首先,讓我們做出如此創(chuàng)建 ThreadPool 時所需的修改。在通過如下方式設(shè)置完 Worker 之后,我們會實現(xiàn)向線程發(fā)送閉包的代碼:

  1. 定義 ?Worker? 結(jié)構(gòu)體存放 ?id? 和 ?JoinHandle<()>?
  2. 修改 ?ThreadPool? 存放一個 ?Worker? 實例的 vector
  3. 定義 ?Worker::new? 函數(shù),它獲取一個 ?id? 數(shù)字并返回一個帶有 ?id? 和用空閉包分配的線程的 ?Worker? 實例
  4. 在 ?ThreadPool::new? 中,使用 ?for? 循環(huán)計數(shù)生成 ?id?,使用這個 ?id? 新建 ?Worker?,并儲存進 vector 中

如果你渴望挑戰(zhàn),在查示例 20-15 中的代碼之前嘗試自己實現(xiàn)這些修改。

準備好了嗎?示例 20-15 就是一個做出了這些修改的例子:

文件名: src/lib.rs

use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers }
    }
    // --snip--
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}

示例 20-15: 修改 ThreadPool 存放 Worker 實例而不是直接存放線程

這里將 ThreadPool 中字段名從 threads 改為 workers,因為它現(xiàn)在儲存 Worker 而不是 JoinHandle<()>。使用 for 循環(huán)中的計數(shù)作為 Worker::new 的參數(shù),并將每一個新建的 Worker 儲存在叫做 workers 的 vector 中。

Worker 結(jié)構(gòu)體和其 new 函數(shù)是私有的,因為外部代碼(比如 src/bin/main.rs 中的 server)并不需要知道關(guān)于 ThreadPool 中使用 Worker 結(jié)構(gòu)體的實現(xiàn)細節(jié)。Worker::new 函數(shù)使用 id 參數(shù)并儲存了使用一個空閉包創(chuàng)建的 JoinHandle<()>。

這段代碼能夠編譯并用指定給 ThreadPool::new 的參數(shù)創(chuàng)建儲存了一系列的 Worker 實例,不過 仍然 沒有處理 execute 中得到的閉包。讓我們聊聊接下來怎么做。

使用信道向線程發(fā)送請求

下一個需要解決的問題是傳遞給 thread::spawn 的閉包完全沒有做任何工作。目前,我們在 execute 方法中獲得期望執(zhí)行的閉包,不過在創(chuàng)建 ThreadPool 的過程中創(chuàng)建每一個 Worker 時需要向 thread::spawn 傳遞一個閉包。

我們希望剛創(chuàng)建的 Worker 結(jié)構(gòu)體能夠從 ThreadPool 的隊列中獲取需要執(zhí)行的代碼,并發(fā)送到線程中執(zhí)行他們。

在第十六章,我們學習了 信道 —— 一個溝通兩個線程的簡單手段 —— 對于這個例子來說則是絕佳的。這里信道將充當任務(wù)隊列的作用,execute 將通過 ThreadPool 向其中線程正在尋找工作的 Worker 實例發(fā)送任務(wù)。如下是這個計劃:

  1. ?ThreadPool? 會創(chuàng)建一個信道并充當發(fā)送端。
  2. 每個 ?Worker? 將會充當信道的接收端。
  3. 新建一個 ?Job? 結(jié)構(gòu)體來存放用于向信道中發(fā)送的閉包。
  4. ?execute? 方法會在信道發(fā)送端發(fā)出期望執(zhí)行的任務(wù)。
  5. 在線程中,?Worker? 會遍歷信道的接收端并執(zhí)行任何接收到的任務(wù)。

讓我們以在 ThreadPool::new 中創(chuàng)建信道并讓 ThreadPool 實例充當發(fā)送端開始,如示例 20-16 所示。Job 是將在信道中發(fā)出的類型,目前它是一個沒有任何內(nèi)容的結(jié)構(gòu)體:

文件名: src/lib.rs

// --snip--
use std::sync::mpsc;

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers, sender }
    }
    // --snip--
}

示例 20-16: 修改 ThreadPool 來儲存一個發(fā)送 Job 實例的信道發(fā)送端

在 ThreadPool::new 中,新建了一個信道,并接著讓線程池在接收端等待。這段代碼能夠編譯,不過仍有警告。

讓我們嘗試在線程池創(chuàng)建每個 worker 時將信道的接收端傳遞給他們。須知我們希望在 worker 所分配的線程中使用信道的接收端,所以將在閉包中引用 receiver 參數(shù)。示例 20-17 中展示的代碼還不能編譯:

文件名: src/lib.rs

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, receiver));
        }

        ThreadPool { workers, sender }
    }
    // --snip--
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}

示例 20-17: 將信道的接收端傳遞給 worker

這是一些小而直觀的修改:將信道的接收端傳遞進了 Worker::new,并接著在閉包中使用它。

如果嘗試 check 代碼,會得到這個錯誤:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:27:42
   |
22 |         let (sender, receiver) = mpsc::channel();
   |                      -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
27 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here, in previous iteration of loop

For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` due to previous error

這段代碼嘗試將 receiver 傳遞給多個 Worker 實例。這是不行的,回憶第十六章:Rust 所提供的信道實現(xiàn)是多 生產(chǎn)者,單 消費者 的。這意味著不能簡單的克隆信道的消費端來解決問題。即便可以,那也不是我們希望使用的技術(shù);我們希望通過在所有的 worker 中共享單一 receiver,在線程間分發(fā)任務(wù)。

另外,從信道隊列中取出任務(wù)涉及到修改 receiver,所以這些線程需要一個能安全的共享和修改 receiver 的方式,否則可能導致競爭狀態(tài)(參考第十六章)。

回憶一下第十六章討論的線程安全智能指針,為了在多個線程間共享所有權(quán)并允許線程修改其值,需要使用 Arc<Mutex<T>>Arc 使得多個 worker 擁有接收端,而 Mutex 則確保一次只有一個 worker 能從接收端得到任務(wù)。示例 20-18 展示了所需的修改:

文件名: src/lib.rs

use std::sync::Arc;
use std::sync::Mutex;
// --snip--

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    // --snip--
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--
    }
}

示例 20-18: 使用 Arc 和 Mutex 在 worker 間共享信道的接收端

在 ThreadPool::new 中,將信道的接收端放入一個 Arc 和一個 Mutex 中。對于每一個新 worker,克隆 Arc 來增加引用計數(shù),如此這些 worker 就可以共享接收端的所有權(quán)了。

通過這些修改,代碼可以編譯了!我們做到了!

實現(xiàn) execute 方法

最后讓我們實現(xiàn) ThreadPool 上的 execute 方法。同時也要修改 Job 結(jié)構(gòu)體:它將不再是結(jié)構(gòu)體,Job 將是一個有著 execute 接收到的閉包類型的 trait 對象的類型別名。第十九章 “類型別名用來創(chuàng)建類型同義詞” 部分提到過,類型別名允許將長的類型變短。觀察示例 20-19:

文件名: src/lib.rs

// --snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

// --snip--

示例 20-19: 為存放每一個閉包的 Box 創(chuàng)建一個 Job 類型別名,接著在信道中發(fā)出任務(wù)

在使用 execute 得到的閉包新建 Job 實例之后,將這些任務(wù)從信道的發(fā)送端發(fā)出。這里調(diào)用 send 上的 unwrap,因為發(fā)送可能會失敗,這可能發(fā)生于例如停止了所有線程執(zhí)行的情況,這意味著接收端停止接收新消息了。不過目前我們無法停止線程執(zhí)行;只要線程池存在他們就會一直執(zhí)行。使用 unwrap 是因為我們知道失敗不可能發(fā)生,即便編譯器不這么認為。

不過到此事情還沒有結(jié)束!在 worker 中,傳遞給 thread::spawn 的閉包仍然還只是 引用 了信道的接收端。相反我們需要閉包一直循環(huán),向信道的接收端請求任務(wù),并在得到任務(wù)時執(zhí)行他們。如示例 20-20 對 Worker::new 做出修改:

文件名: src/lib.rs

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();

            println!("Worker {} got a job; executing.", id);

            job();
        });

        Worker { id, thread }
    }
}

示例 20-20: 在 worker 線程中接收并執(zhí)行任務(wù)

這里,首先在 receiver 上調(diào)用了 lock 來獲取互斥器,接著 unwrap 在出現(xiàn)任何錯誤時 panic。如果互斥器處于一種叫做 被污染poisoned)的狀態(tài)時獲取鎖可能會失敗,這可能發(fā)生于其他線程在持有鎖時 panic 了且沒有釋放鎖。在這種情況下,調(diào)用 unwrap 使其 panic 是正確的行為。請隨意將 unwrap 改為包含有意義錯誤信息的 expect。

如果鎖定了互斥器,接著調(diào)用 recv 從信道中接收 Job。最后的 unwrap 也繞過了一些錯誤,這可能發(fā)生于持有信道發(fā)送端的線程停止的情況,類似于如果接收端關(guān)閉時 send 方法如何返回 Err 一樣。

調(diào)用 recv 會阻塞當前線程,所以如果還沒有任務(wù),其會等待直到有可用的任務(wù)。Mutex<T> 確保一次只有一個 Worker 線程嘗試請求任務(wù)。

現(xiàn)在線程池處于可以運行的狀態(tài)了!執(zhí)行 cargo run 并發(fā)起一些請求:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never read: `workers`
 --> src/lib.rs:7:5
  |
7 |     workers: Vec<Worker>,
  |     ^^^^^^^^^^^^^^^^^^^^
  |
  = note: `#[warn(dead_code)]` on by default

warning: field is never read: `id`
  --> src/lib.rs:48:5
   |
48 |     id: usize,
   |     ^^^^^^^^^

warning: field is never read: `thread`
  --> src/lib.rs:49:5
   |
49 |     thread: thread::JoinHandle<()>,
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

warning: 3 warnings emitted

    Finished dev [unoptimized + debuginfo] target(s) in 1.40s
     Running `target/debug/main`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.

成功了!現(xiàn)在我們有了一個可以異步執(zhí)行連接的線程池!它絕不會創(chuàng)建超過四個線程,所以當 server 收到大量請求時系統(tǒng)也不會負擔過重。如果請求 /sleep,server 也能夠通過另外一個線程處理其他請求。

注意如果同時在多個瀏覽器窗口打開 /sleep,它們可能會彼此間隔地加載 5 秒,因為一些瀏覽器處于緩存的原因會順序執(zhí)行相同請求的多個實例。這些限制并不是由于我們的 web server 造成的。

在學習了第十八章的 while let 循環(huán)之后,你可能會好奇為何不能如此編寫 worker 線程,如示例 20-21 所示:

文件名: src/lib.rs

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {} got a job; executing.", id);

                job();
            }
        });

        Worker { id, thread }
    }
}

示例 20-21: 一個使用 while let 的 Worker::new 替代實現(xiàn)

這段代碼可以編譯和運行,但是并不會產(chǎn)生所期望的線程行為:一個慢請求仍然會導致其他請求等待執(zhí)行。其原因有些微妙:Mutex 結(jié)構(gòu)體沒有公有 unlock 方法,因為鎖的所有權(quán)依賴 lock 方法返回的 LockResult<MutexGuard<T>> 中 MutexGuard<T> 的生命周期。這允許借用檢查器在編譯時確保絕不會在沒有持有鎖的情況下訪問由 Mutex 守護的資源,不過如果沒有認真的思考 MutexGuard<T> 的生命周期的話,也可能會導致比預(yù)期更久的持有鎖。

示例 20-20 中的代碼使用的 let job = receiver.lock().unwrap().recv().unwrap(); 之所以可以工作是因為對于 let 來說,當 let 語句結(jié)束時任何表達式中等號右側(cè)使用的臨時值都會立即被丟棄。然而 while letif let 和 match)直到相關(guān)的代碼塊結(jié)束都不會丟棄臨時值。在示例 20-21 中,job() 調(diào)用期間鎖一直持續(xù),這也意味著其他的 worker 無法接受任務(wù)。


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號