Rust 共享狀態(tài)并發(fā)

2023-03-22 15:15 更新
ch16-03-shared-state.md
commit 75b9d4a8dccc245e0343eb1480aa86f169043ea5

雖然消息傳遞是一個很好的處理并發(fā)的方式,但并不是唯一一個。再一次思考一下 Go 編程語言文檔中口號的這一部分:“不要通過共享內(nèi)存來通訊”(“do not communicate by sharing memory.”):

What would communicating by sharing memory look like? In addition, why would message passing enthusiasts not use it and do the opposite instead?

通過共享內(nèi)存通訊看起來如何?除此之外,為何消息傳遞的擁護(hù)者并不使用它并反其道而行之呢?

在某種程度上,任何編程語言中的信道都類似于單所有權(quán),因為一旦將一個值傳送到信道中,將無法再使用這個值。共享內(nèi)存類似于多所有權(quán):多個線程可以同時訪問相同的內(nèi)存位置。第十五章介紹了智能指針如何使得多所有權(quán)成為可能,然而這會增加額外的復(fù)雜性,因為需要以某種方式管理這些不同的所有者。Rust 的類型系統(tǒng)和所有權(quán)規(guī)則極大的協(xié)助了正確地管理這些所有權(quán)。作為一個例子,讓我們看看互斥器,一個更為常見的共享內(nèi)存并發(fā)原語。

互斥器一次只允許一個線程訪問數(shù)據(jù)

互斥器mutex)是 mutual exclusion 的縮寫,也就是說,任意時刻,其只允許一個線程訪問某些數(shù)據(jù)。為了訪問互斥器中的數(shù)據(jù),線程首先需要通過獲取互斥器的 lock)來表明其希望訪問數(shù)據(jù)。鎖是一個作為互斥器一部分的數(shù)據(jù)結(jié)構(gòu),它記錄誰有數(shù)據(jù)的排他訪問權(quán)。因此,我們描述互斥器為通過鎖系統(tǒng) 保護(hù)guarding)其數(shù)據(jù)。

互斥器以難以使用著稱,因為你不得不記住:

  1. 在使用數(shù)據(jù)之前嘗試獲取鎖。
  2. 處理完被互斥器所保護(hù)的數(shù)據(jù)之后,必須解鎖數(shù)據(jù),這樣其他線程才能夠獲取鎖。

作為一個現(xiàn)實(shí)中互斥器的例子,想象一下在某個會議的一次小組座談會中,只有一個麥克風(fēng)。如果一位成員要發(fā)言,他必須請求或表示希望使用麥克風(fēng)。一旦得到了麥克風(fēng),他可以暢所欲言,然后將麥克風(fēng)交給下一位希望講話的成員。如果一位成員結(jié)束發(fā)言后忘記將麥克風(fēng)交還,其他人將無法發(fā)言。如果對共享麥克風(fēng)的管理出現(xiàn)了問題,座談會將無法如期進(jìn)行!

正確的管理互斥器異常復(fù)雜,這也是許多人之所以熱衷于信道的原因。然而,在 Rust 中,得益于類型系統(tǒng)和所有權(quán),我們不會在鎖和解鎖上出錯。

Mutex<T>的 API

作為展示如何使用互斥器的例子,讓我們從在單線程上下文使用互斥器開始,如示例 16-12 所示:

文件名: src/main.rs

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    {
        let mut num = m.lock().unwrap();
        *num = 6;
    }

    println!("m = {:?}", m);
}

示例 16-12: 出于簡單的考慮,在一個單線程上下文中探索 Mutex<T> 的 API

像很多類型一樣,我們使用關(guān)聯(lián)函數(shù) new 來創(chuàng)建一個 Mutex<T>。使用 lock 方法獲取鎖,以訪問互斥器中的數(shù)據(jù)。這個調(diào)用會阻塞當(dāng)前線程,直到我們擁有鎖為止。

如果另一個線程擁有鎖,并且那個線程 panic 了,則 lock 調(diào)用會失敗。在這種情況下,沒人能夠再獲取鎖,所以這里選擇 unwrap 并在遇到這種情況時使線程 panic。

一旦獲取了鎖,就可以將返回值(在這里是num)視為一個其內(nèi)部數(shù)據(jù)的可變引用了。類型系統(tǒng)確保了我們在使用 m 中的值之前獲取鎖:Mutex<i32> 并不是一個 i32,所以 必須 獲取鎖才能使用這個 i32 值。我們是不會忘記這么做的,因為反之類型系統(tǒng)不允許訪問內(nèi)部的 i32 值。

正如你所懷疑的,Mutex<T> 是一個智能指針。更準(zhǔn)確的說,lock 調(diào)用 返回 一個叫做 MutexGuard 的智能指針。這個智能指針實(shí)現(xiàn)了 Deref 來指向其內(nèi)部數(shù)據(jù);其也提供了一個 Drop 實(shí)現(xiàn)當(dāng) MutexGuard 離開作用域時自動釋放鎖,這正發(fā)生于示例 16-12 內(nèi)部作用域的結(jié)尾。為此,我們不會忘記釋放鎖并阻塞互斥器為其它線程所用的風(fēng)險,因為鎖的釋放是自動發(fā)生的。

丟棄了鎖之后,可以打印出互斥器的值,并發(fā)現(xiàn)能夠?qū)⑵鋬?nèi)部的 i32 改為 6。

在線程間共享 Mutex<T>

現(xiàn)在讓我們嘗試使用 Mutex<T> 在多個線程間共享值。我們將啟動十個線程,并在各個線程中對同一個計數(shù)器值加一,這樣計數(shù)器將從 0 變?yōu)?10。示例 16-13 中的例子會出現(xiàn)編譯錯誤,而我們將通過這些錯誤來學(xué)習(xí)如何使用 Mutex<T>,以及 Rust 又是如何幫助我們正確使用的。

文件名: src/main.rs

use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Mutex::new(0);
    let mut handles = vec![];

    for _ in 0..10 {
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

示例 16-13: 程序啟動了 10 個線程,每個線程都通過 Mutex<T> 來增加計數(shù)器的值

這里創(chuàng)建了一個 counter 變量來存放內(nèi)含 i32 的 Mutex<T>,類似示例 16-12 那樣。接下來遍歷 range 創(chuàng)建了 10 個線程。使用了 thread::spawn 并對所有線程使用了相同的閉包:他們每一個都將調(diào)用 lock 方法來獲取 Mutex<T> 上的鎖,接著將互斥器中的值加一。當(dāng)一個線程結(jié)束執(zhí)行,num 會離開閉包作用域并釋放鎖,這樣另一個線程就可以獲取它了。

在主線程中,我們像示例 16-2 那樣收集了所有的 join 句柄,調(diào)用它們的 join 方法來確保所有線程都會結(jié)束。這時,主線程會獲取鎖并打印出程序的結(jié)果。

之前提示過這個例子不能編譯,讓我們看看為什么!

$ cargo run
   Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0382]: use of moved value: `counter`
  --> src/main.rs:9:36
   |
5  |     let counter = Mutex::new(0);
   |         ------- move occurs because `counter` has type `Mutex<i32>`, which does not implement the `Copy` trait
...
9  |         let handle = thread::spawn(move || {
   |                                    ^^^^^^^ value moved into closure here, in previous iteration of loop
10 |             let mut num = counter.lock().unwrap();
   |                           ------- use occurs due to use in closure

For more information about this error, try `rustc --explain E0382`.
error: could not compile `shared-state` due to previous error

錯誤信息表明 counter 值在上一次循環(huán)中被移動了。所以 Rust 告訴我們不能將 counter 鎖的所有權(quán)移動到多個線程中。讓我們通過一個第十五章討論過的多所有權(quán)手段來修復(fù)這個編譯錯誤。

多線程和多所有權(quán)

在第十五章中,通過使用智能指針 Rc<T> 來創(chuàng)建引用計數(shù)的值,以便擁有多所有者。讓我們在這也這么做看看會發(fā)生什么。將示例 16-14 中的 Mutex<T> 封裝進(jìn) Rc<T> 中并在將所有權(quán)移入線程之前克隆了 Rc<T>。

文件名: src/main.rs

use std::rc::Rc;
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Rc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Rc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

示例 16-14: 嘗試使用 Rc<T> 來允許多個線程擁有 Mutex<T>

再一次編譯并...出現(xiàn)了不同的錯誤!編譯器真是教會了我們很多!

$ cargo run
   Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0277]: `Rc<Mutex<i32>>` cannot be sent between threads safely
   --> src/main.rs:11:22
    |
11  |           let handle = thread::spawn(move || {
    |  ______________________^^^^^^^^^^^^^_-
    | |                      |
    | |                      `Rc<Mutex<i32>>` cannot be sent between threads safely
12  | |             let mut num = counter.lock().unwrap();
13  | |
14  | |             *num += 1;
15  | |         });
    | |_________- within this `[closure@src/main.rs:11:36: 15:10]`
    |
    = help: within `[closure@src/main.rs:11:36: 15:10]`, the trait `Send` is not implemented for `Rc<Mutex<i32>>`
    = note: required because it appears within the type `[closure@src/main.rs:11:36: 15:10]`
note: required by a bound in `spawn`

For more information about this error, try `rustc --explain E0277`.
error: could not compile `shared-state` due to previous error

哇哦,錯誤信息太長不看!這里是一些需要注意的重要部分:第一行錯誤表明 `Rc<Mutex<i32>>` cannot be sent between threads safely。編譯器也告訴了我們原因 the trait `Send` is not implemented for `Rc<Mutex<i32>>`。下一部分會講到 Send:這是確保所使用的類型可以用于并發(fā)環(huán)境的 trait 之一。

不幸的是,Rc<T> 并不能安全的在線程間共享。當(dāng) Rc<T> 管理引用計數(shù)時,它必須在每一個 clone 調(diào)用時增加計數(shù),并在每一個克隆被丟棄時減少計數(shù)。Rc<T> 并沒有使用任何并發(fā)原語,來確保改變計數(shù)的操作不會被其他線程打斷。在計數(shù)出錯時可能會導(dǎo)致詭異的 bug,比如可能會造成內(nèi)存泄漏,或在使用結(jié)束之前就丟棄一個值。我們所需要的是一個完全類似 Rc<T>,又以一種線程安全的方式改變引用計數(shù)的類型。

原子引用計數(shù) Arc<T>

所幸 Arc<T> 正是 這么一個類似 Rc<T> 并可以安全的用于并發(fā)環(huán)境的類型。字母 “a” 代表 原子性atomic),所以這是一個 原子引用計數(shù)atomically reference counted)類型。原子性是另一類這里還未涉及到的并發(fā)原語:請查看標(biāo)準(zhǔn)庫中 std::sync::atomic 的文檔來獲取更多細(xì)節(jié)。目前我們只需要知道原子類就像基本類型一樣可以安全的在線程間共享。

你可能會好奇為什么不是所有的原始類型都是原子性的?為什么不是所有標(biāo)準(zhǔn)庫中的類型都默認(rèn)使用 Arc<T> 實(shí)現(xiàn)?原因在于線程安全帶有性能懲罰,我們希望只在必要時才為此買單。如果只是在單線程中對值進(jìn)行操作,原子性提供的保證并無必要,代碼可以因此運(yùn)行的更快。

回到之前的例子:Arc<T> 和 Rc<T> 有著相同的 API,所以修改程序中的 use 行和 new 調(diào)用。示例 16-15 中的代碼最終可以編譯和運(yùn)行:

文件名: src/main.rs

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

示例 16-15: 使用 Arc<T> 包裝一個 Mutex<T> 能夠?qū)崿F(xiàn)在多線程之間共享所有權(quán)

這會打印出:

Result: 10

成功了!我們從 0 數(shù)到了 10,這可能并不是很顯眼,不過一路上我們確實(shí)學(xué)習(xí)了很多關(guān)于 Mutex<T> 和線程安全的內(nèi)容!這個例子中構(gòu)建的結(jié)構(gòu)可以用于比增加計數(shù)更為復(fù)雜的操作。使用這個策略,可將計算分成獨(dú)立的部分,分散到多個線程中,接著使用 Mutex<T> 使用各自的結(jié)算結(jié)果更新最終的結(jié)果。

RefCell<T>/Rc<T> 與 Mutex<T>/Arc<T> 的相似性

你可能注意到了,因為 counter 是不可變的,不過可以獲取其內(nèi)部值的可變引用;這意味著 Mutex<T> 提供了內(nèi)部可變性,就像 Cell 系列類型那樣。正如第十五章中使用 RefCell<T> 可以改變 Rc<T> 中的內(nèi)容那樣,同樣的可以使用 Mutex<T> 來改變 Arc<T> 中的內(nèi)容。

另一個值得注意的細(xì)節(jié)是 Rust 不能避免使用 Mutex<T> 的全部邏輯錯誤。回憶一下第十五章使用 Rc<T> 就有造成引用循環(huán)的風(fēng)險,這時兩個 Rc<T> 值相互引用,造成內(nèi)存泄漏。同理,Mutex<T> 也有造成 死鎖deadlock) 的風(fēng)險。這發(fā)生于當(dāng)一個操作需要鎖住兩個資源而兩個線程各持一個鎖,這會造成它們永遠(yuǎn)相互等待。如果你對這個主題感興趣,嘗試編寫一個帶有死鎖的 Rust 程序,接著研究任何其他語言中使用互斥器的死鎖規(guī)避策略并嘗試在 Rust 中實(shí)現(xiàn)他們。標(biāo)準(zhǔn)庫中 Mutex<T> 和 MutexGuard 的 API 文檔會提供有用的信息。

接下來,為了豐富本章的內(nèi)容,讓我們討論一下 Send和 Sync trait 以及如何對自定義類型使用他們。


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號