Rust 使用消息傳遞在線程間傳送數(shù)據(jù)

2023-03-22 15:15 更新
ch16-02-message-passing.md
commit 24e275d624fe85af7b5b6316e78f8bfbbcac23e7

一個日益流行的確保安全并發(fā)的方式是 消息傳遞message passing),這里線程或 actor 通過發(fā)送包含數(shù)據(jù)的消息來相互溝通。這個思想來源于 Go 編程語言文檔中 的口號:“不要通過共享內(nèi)存來通訊;而是通過通訊來共享內(nèi)存?!保ā癉o not communicate by sharing memory; instead, share memory by communicating.”)

Rust 中一個實現(xiàn)消息傳遞并發(fā)的主要工具是 信道channel),Rust 標準庫提供了其實現(xiàn)的編程概念。你可以將其想象為一個水流的渠道,比如河流或小溪。如果你將諸如橡皮鴨或小船之類的東西放入其中,它們會順流而下到達下游。

編程中的信息渠道(信道)有兩部分組成,一個發(fā)送者(transmitter)和一個接收者(receiver)。發(fā)送者位于上游位置,在這里可以將橡皮鴨放入河中,接收者則位于下游,橡皮鴨最終會漂流至此。代碼中的一部分調(diào)用發(fā)送者的方法以及希望發(fā)送的數(shù)據(jù),另一部分則檢查接收端收到的消息。當發(fā)送者或接收者任一被丟棄時可以認為信道被 關(guān)閉closed)了。

這里,我們將開發(fā)一個程序,它會在一個線程生成值向信道發(fā)送,而在另一個線程會接收值并打印出來。這里會通過信道在線程間發(fā)送簡單值來演示這個功能。一旦你熟悉了這項技術(shù),就能使用信道來實現(xiàn)聊天系統(tǒng),或利用很多線程進行分布式計算并將部分計算結(jié)果發(fā)送給一個線程進行聚合。

首先,在示例 16-6 中,創(chuàng)建了一個信道但沒有做任何事。注意這還不能編譯,因為 Rust 不知道我們想要在信道中發(fā)送什么類型:

文件名: src/main.rs

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

示例 16-6: 創(chuàng)建一個信道,并將其兩端賦值給 tx 和 rx

這里使用 mpsc::channel 函數(shù)創(chuàng)建一個新的信道;mpsc 是 多個生產(chǎn)者,單個消費者multiple producer, single consumer)的縮寫。簡而言之,Rust 標準庫實現(xiàn)信道的方式意味著一個信道可以有多個產(chǎn)生值的 發(fā)送sending)端,但只能有一個消費這些值的 接收receiving)端。想象一下多條小河小溪最終匯聚成大河:所有通過這些小河發(fā)出的東西最后都會來到下游的大河。目前我們以單個生產(chǎn)者開始,但是當示例可以工作后會增加多個生產(chǎn)者。

mpsc::channel 函數(shù)返回一個元組:第一個元素是發(fā)送端,而第二個元素是接收端。由于歷史原因,tx 和 rx 通常作為 發(fā)送者transmitter)和 接收者receiver)的縮寫,所以這就是我們將用來綁定這兩端變量的名字。這里使用了一個 let 語句和模式來解構(gòu)了此元組;第十八章會討論 let 語句中的模式和解構(gòu)。如此使用 let 語句是一個方便提取 mpsc::channel 返回的元組中一部分的手段。

讓我們將發(fā)送端移動到一個新建線程中并發(fā)送一個字符串,這樣新建線程就可以和主線程通訊了,如示例 16-7 所示。這類似于在河的上游扔下一只橡皮鴨或從一個線程向另一個線程發(fā)送聊天信息:

文件名: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}

示例 16-7: 將 tx 移動到一個新建的線程中并發(fā)送 “hi”

這里再次使用 thread::spawn 來創(chuàng)建一個新線程并使用 move 將 tx 移動到閉包中這樣新建線程就擁有 tx 了。新建線程需要擁有信道的發(fā)送端以便能向信道發(fā)送消息。

信道的發(fā)送端有一個 send 方法用來獲取需要放入信道的值。send 方法返回一個 Result<T, E> 類型,所以如果接收端已經(jīng)被丟棄了,將沒有發(fā)送值的目標,所以發(fā)送操作會返回錯誤。在這個例子中,出錯的時候調(diào)用 unwrap 產(chǎn)生 panic。不過對于一個真實程序,需要合理地處理它:回到第九章復(fù)習(xí)正確處理錯誤的策略。

在示例 16-8 中,我們在主線程中從信道的接收端獲取值。這類似于在河的下游撈起橡皮鴨或接收聊天信息:

文件名: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

示例 16-8: 在主線程中接收并打印內(nèi)容 “hi”

信道的接收端有兩個有用的方法:recv 和 try_recv。這里,我們使用了 recv,它是 receive 的縮寫。這個方法會阻塞主線程執(zhí)行直到從信道中接收一個值。一旦發(fā)送了一個值,recv 會在一個 Result<T, E> 中返回它。當信道發(fā)送端關(guān)閉,recv 會返回一個錯誤表明不會再有新的值到來了。

try_recv 不會阻塞,相反它立刻返回一個 Result<T, E>Ok 值包含可用的信息,而 Err 值代表此時沒有任何消息。如果線程在等待消息過程中還有其他工作時使用 try_recv 很有用:可以編寫一個循環(huán)來頻繁調(diào)用 try_recv,在有可用消息時進行處理,其余時候則處理一會其他工作直到再次檢查。

出于簡單的考慮,這個例子使用了 recv;主線程中除了等待消息之外沒有任何其他工作,所以阻塞主線程是合適的。

如果運行示例 16-8 中的代碼,我們將會看到主線程打印出這個值:

Got: hi

完美!

信道與所有權(quán)轉(zhuǎn)移

所有權(quán)規(guī)則在消息傳遞中扮演了重要角色,其有助于我們編寫安全的并發(fā)代碼。防止并發(fā)編程中的錯誤是在 Rust 程序中考慮所有權(quán)的一大優(yōu)勢?,F(xiàn)在讓我們做一個試驗來看看信道與所有權(quán)如何一同協(xié)作以避免產(chǎn)生問題:我們將嘗試在新建線程中的信道中發(fā)送完 val 值 之后 再使用它。嘗試編譯示例 16-9 中的代碼并看看為何這是不允許的:

文件名: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {}", val);
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

示例 16-9: 在我們已經(jīng)發(fā)送到信道中后,嘗試使用 val 引用

這里嘗試在通過 tx.send 發(fā)送 val 到信道中之后將其打印出來。允許這么做是一個壞主意:一旦將值發(fā)送到另一個線程后,那個線程可能會在我們再次使用它之前就將其修改或者丟棄。其他線程對值可能的修改會由于不一致或不存在的數(shù)據(jù)而導(dǎo)致錯誤或意外的結(jié)果。然而,嘗試編譯示例 16-9 的代碼時,Rust 會給出一個錯誤:

$ cargo run
   Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:31
   |
8  |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {}", val);
   |                               ^^^ value borrowed here after move

For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` due to previous error

我們的并發(fā)錯誤會造成一個編譯時錯誤。send 函數(shù)獲取其參數(shù)的所有權(quán)并移動這個值歸接收者所有。這可以防止在發(fā)送后再次意外地使用這個值;所有權(quán)系統(tǒng)檢查一切是否合乎規(guī)則。

發(fā)送多個值并觀察接收者的等待

示例 16-8 中的代碼可以編譯和運行,不過它并沒有明確的告訴我們兩個獨立的線程通過信道相互通訊。示例 16-10 則有一些改進會證明示例 16-8 中的代碼是并發(fā)執(zhí)行的:新建線程現(xiàn)在會發(fā)送多個消息并在每個消息之間暫停一秒鐘。

文件名: src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
}

示例 16-10: 發(fā)送多個消息,并在每次發(fā)送后暫停一段時間

這一次,在新建線程中有一個字符串 vector 希望發(fā)送到主線程。我們遍歷他們,單獨的發(fā)送每一個字符串并通過一個 Duration 值調(diào)用 thread::sleep 函數(shù)來暫停一秒。

在主線程中,不再顯式調(diào)用 recv 函數(shù):而是將 rx 當作一個迭代器。對于每一個接收到的值,我們將其打印出來。當信道被關(guān)閉時,迭代器也將結(jié)束。

當運行示例 16-10 中的代碼時,將看到如下輸出,每一行都會暫停一秒:

Got: hi
Got: from
Got: the
Got: thread

因為主線程中的 for 循環(huán)里并沒有任何暫停或等待的代碼,所以可以說主線程是在等待從新建線程中接收值。

通過克隆發(fā)送者來創(chuàng)建多個生產(chǎn)者

之前我們提到了mpsc是 multiple producer, single consumer 的縮寫??梢赃\用 mpsc 來擴展示例 16-10 中的代碼來創(chuàng)建向同一接收者發(fā)送值的多個線程。這可以通過克隆信道的發(fā)送端來做到,如示例 16-11 所示:

文件名: src/main.rs

    // --snip--

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }

    // --snip--

示例 16-11: 從多個生產(chǎn)者發(fā)送多個消息

這一次,在創(chuàng)建新線程之前,我們對信道的發(fā)送端調(diào)用了 clone 方法。這會給我們一個可以傳遞給第一個新建線程的發(fā)送端句柄。我們會將原始的信道發(fā)送端傳遞給第二個新建線程。這樣就會有兩個線程,每個線程將向信道的接收端發(fā)送不同的消息。

如果運行這些代碼,你 可能 會看到這樣的輸出:

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

雖然你可能會看到這些值以不同的順序出現(xiàn);這依賴于你的系統(tǒng)。這也就是并發(fā)既有趣又困難的原因。如果通過 thread::sleep 做實驗,在不同的線程中提供不同的值,就會發(fā)現(xiàn)他們的運行更加不確定,且每次都會產(chǎn)生不同的輸出。

現(xiàn)在我們見識過了信道如何工作,再看看另一種不同的并發(fā)方式吧。


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號