并發(fā)性和并行性在計(jì)算機(jī)科學(xué)中是非常重要的主題,即使在當(dāng)今工業(yè)中也是個(gè)熱門(mén)的話題。電腦得到了越來(lái)越多的核心,然而,很多程序并沒(méi)有能力來(lái)利用它們。
Rust 內(nèi)存安全特性同樣采用了并發(fā)的方式。甚至 Rust 程序內(nèi)存必須是安全,沒(méi)有數(shù)據(jù)之間的競(jìng)爭(zhēng)。Rust 的類(lèi)型系統(tǒng)的任務(wù)就是給你強(qiáng)大的方式讓程序能夠在編譯時(shí)并發(fā)執(zhí)行。
之前我們談?wù)撨^(guò) Rust 的并發(fā)特性,需要理解的重要的是: Rust 足夠低級(jí)別的,所有這些都是由標(biāo)準(zhǔn)庫(kù)提供的,而不是語(yǔ)言本身。這意味著,如果你不喜歡 Rust 某些方面處理并發(fā)性的方式,你可以自己實(shí)現(xiàn)另一種做事的方式。mio 是一個(gè)在現(xiàn)實(shí)中踐行這一原則的例子。
并發(fā)性是很難說(shuō)清楚的。在 Rust 中,我們有一個(gè)強(qiáng)大的、靜態(tài)類(lèi)型系統(tǒng)來(lái)幫助使我們的代碼可理解。因此,Rust 提供給了我們兩個(gè)特性來(lái)幫助我們理解可以并發(fā)執(zhí)行的代碼。
我們將談?wù)摰谝粋€(gè)特性是 Send。當(dāng)類(lèi)型 T 實(shí)現(xiàn) Send 時(shí),它告訴編譯器,這種類(lèi)型的線程擁有在線程之間安全轉(zhuǎn)移的所有權(quán)。
強(qiáng)行添加一些限制是很重要的。例如,如果我們有一個(gè)通道連接兩個(gè)線程,我們將希望能夠向通道中發(fā)送一些數(shù)據(jù),接著將這些數(shù)據(jù)傳送給另外一個(gè)線程。因此,我們要保證要被發(fā)送的類(lèi)型實(shí)現(xiàn)了 Send。
相反地,如果我們利用 FFI 封裝一個(gè)庫(kù),然而它不是線程安全的,此時(shí)我們不想實(shí)現(xiàn) Send,所以編譯器會(huì)幫助我們強(qiáng)制它不能離開(kāi)當(dāng)前線程。
第二個(gè)特性被稱(chēng)為 Sync。當(dāng)一個(gè)類(lèi)型 T 實(shí)現(xiàn) Sync 時(shí),它告訴編譯器,但這種類(lèi)型被使用在多個(gè)線程并發(fā)時(shí)不可能引起內(nèi)存不安全的狀態(tài)。
例如,與一個(gè)原子索引計(jì)數(shù)器共享不可變的數(shù)據(jù)是線程安全的。Rust 提供了一種類(lèi)似 Arc< T > 類(lèi)型,并且它實(shí)現(xiàn)了 Sync,所以它在線程之間共享是安全的。
這兩個(gè)特性將會(huì)讓你在并發(fā)的情況下,你所使用類(lèi)型系統(tǒng)的對(duì)代碼的屬性做出強(qiáng)有力的保證。在說(shuō)明為什么能夠這樣保障之前,我們需要首先學(xué)習(xí)如何創(chuàng)建一個(gè)并發(fā)的 Rust 程序。
Rust 的標(biāo)準(zhǔn)庫(kù)為線程提供了一個(gè)庫(kù),它允許你以并行的方式運(yùn)行 Rust 代碼。這里是使用 std::Thread 的一個(gè)簡(jiǎn)單例子:
use std::thread;
fn main() {
thread::spawn(|| {
println!("Hello from a thread!");
});
}
thread:spawn() 方法接受一個(gè)封閉參數(shù),這個(gè)參數(shù)會(huì)在一個(gè)新線程中執(zhí)行。它返回該線程的句柄,它可以用來(lái)在等待子線程完成之后提取其結(jié)果:
use std::thread;
fn main() {
let handle = thread::spawn(|| {
"Hello from a thread!"
});
println!("{}", handle.join().unwrap());
}
許多語(yǔ)言有能力執(zhí)行線程,但普遍的是非常不安全的??梢詫?xiě)整本書(shū)講關(guān)于如何防止在共享可變狀態(tài)的情況下發(fā)生錯(cuò)誤。Rust 通過(guò)他的類(lèi)型系統(tǒng)在編譯期阻止數(shù)據(jù)的競(jìng)爭(zhēng)來(lái)解決這個(gè)問(wèn)題。接下來(lái)讓我們看看如何在線程之間共享的數(shù)據(jù)。
由 Rust 的類(lèi)型系統(tǒng),我們可以有一個(gè)概念,盡管聽(tīng)起來(lái)不切實(shí)際:“安全共享可變狀態(tài)。“許多程序員都同意共享可變狀態(tài)是非常,非常糟糕的。
有人曾經(jīng)說(shuō)過(guò):共享可變狀態(tài)是一切罪惡的根源。大多數(shù)語(yǔ)言嘗試解決這個(gè)問(wèn)題通過(guò)“可變”部分,但 Rust 處理是通過(guò)解決“共享”的部分來(lái)解決這個(gè)問(wèn)題。
同一所有權(quán)系統(tǒng)有助于防止錯(cuò)誤的使用指針,同時(shí)也有助于排除數(shù)據(jù)競(jìng)爭(zhēng),這個(gè)是并發(fā)執(zhí)行時(shí)最糟糕的一種 bug。
如下是一個(gè) Rust 程序,它會(huì)在許多語(yǔ)言中存在數(shù)據(jù)競(jìng)賽。它將不會(huì)編譯通過(guò):
use std::thread;
fn main() {
let mut data = vec![1u32, 2, 3];
for i in 0..3 {
thread::spawn(move || {
data[i] += 1;
});
}
thread::sleep_ms(50);
}
會(huì)輸出如下的錯(cuò)誤:
8:17 error: capture of moved value: `data`
data[i] += 1;
^~~~
在這種情況下,我們知道我們的代碼應(yīng)該是安全的,但是 Rust 不確定。其實(shí)實(shí)際上它并不安全:如果我們?cè)诿總€(gè)線程中都引用 data,那么每個(gè)線程都會(huì)有一個(gè)自己索引數(shù)據(jù)的權(quán)限,那么同一個(gè)數(shù)據(jù)有三個(gè)所有者!那是不好的。我們可以通過(guò)使用 Arc<T>
類(lèi)型來(lái)解決這個(gè)問(wèn)題,它是一個(gè)原子引用計(jì)數(shù)器數(shù)指針?!霸印币馕吨缇€程共享是安全的。
Arc< T >
假定一個(gè)或更多關(guān)于其內(nèi)容的屬性,以確保它在跨線程共享是安全的:它假定其內(nèi)容擁有 Sync 屬性。但在我們的例子中,我們希望能夠修改變量的值。我們需要一種類(lèi)型能夠確保一次只能有一個(gè)用戶能夠修改變量值。為此,我們可以使用 Mutex< T >
類(lèi)型。這是第二個(gè)版本的代碼。它仍然不能正常工作,但是由于其他的原因:
use std::thread;
use std::sync::Mutex;
fn main() {
let mut data = Mutex::new(vec![1u32, 2, 3]);
for i in 0..3 {
let data = data.lock().unwrap();
thread::spawn(move || {
data[i] += 1;
});
}
thread::sleep_ms(50);
}
這里存在如下的錯(cuò)誤:
<anon>:9:9: 9:22 error: the trait `core::marker::Send` is not implemented for the type `std::sync::mutex::MutexGuard<'_, collections::vec::Vec<u32>>` [E0277]
<anon>:11 thread::spawn(move || {
^~~~~~~~~~~~~
<anon>:9:9: 9:22 note: `std::sync::mutex::MutexGuard<'_, collections::vec::Vec<u32>>` cannot be sent between threads safely
<anon>:11 thread::spawn(move || {
^~~~~~~~~~~~~
你可以發(fā)現(xiàn),Mutex 有一個(gè) lock 方法,它的函數(shù)聲明如下:
fn lock(&self) -> LockResult<MutexGuard<T>>
因?yàn)?Send 沒(méi)有實(shí)現(xiàn) MutexGuard < T >
,我們不能越過(guò)線程范圍進(jìn)轉(zhuǎn)換,這就是錯(cuò)誤的原因。
我們可以用 Arc< T >
來(lái)解決這個(gè)問(wèn)題。如下是可以工作的版本:
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let data = Arc::new(Mutex::new(vec![1u32, 2, 3]));
for i in 0..3 {
let data = data.clone();
thread::spawn(move || {
let mut data = data.lock().unwrap();
data[i] += 1;
});
}
thread::sleep_ms(50);
}
現(xiàn)在我們可以在 Arc 中調(diào)用 clone() 方法,它會(huì)增加內(nèi)部計(jì)數(shù)。這個(gè)句柄接著就會(huì)跳轉(zhuǎn)到一個(gè)新的線程中進(jìn)行執(zhí)行。讓我們更仔細(xì)地查看線程的主體代碼:
thread::spawn(move || {
let mut data = data.lock().unwrap();
data[i] += 1;
});
首先,我們調(diào)用 lock() 函數(shù)獲得互斥鎖的鎖。它將返回 Result< T, E >
,而且這只是一個(gè)例子,由于這個(gè)函數(shù)可能會(huì)失敗,因此我們使用 unwrap() 函數(shù)來(lái)得到引用的數(shù)據(jù)。真正的代碼在這里會(huì)有更健壯的錯(cuò)誤處理代碼。當(dāng)我們得到鎖之后,我們就可以隨意的修改變量了。
最后,在線程運(yùn)行時(shí),我們等待了一會(huì)。但這不是理想:我們可以選擇一個(gè)合理的時(shí)間等待,但更有可能我們會(huì)是等待的時(shí)間比必要的時(shí)間長(zhǎng)或者還要短,這取決于線程運(yùn)行時(shí)執(zhí)行計(jì)算花費(fèi)的實(shí)際時(shí)間。
更精確的計(jì)時(shí)器的替代品是使用 Rust 標(biāo)準(zhǔn)庫(kù)中提供的線程同步方法中的一個(gè)機(jī)制。接下來(lái)讓我們談?wù)勂渲幸粋€(gè)的機(jī)制:通道。
如下是使用 channel 進(jìn)行線程同步的一個(gè)版本,而不是等待一個(gè)特定的時(shí)間:
use std::sync::{Arc, Mutex};
use std::thread;
use std::sync::mpsc;
fn main() {
let data = Arc::new(Mutex::new(0u32));
let (tx, rx) = mpsc::channel();
for _ in 0..10 {
let (data, tx) = (data.clone(), tx.clone());
thread::spawn(move || {
let mut data = data.lock().unwrap();
*data += 1;
tx.send(());
});
}
for _ in 0..10 {
rx.recv();
}
}
我們使用 mpsc:channel() 方法來(lái)構(gòu)造一個(gè)新的通道。我們只是發(fā)送一個(gè)簡(jiǎn)單的 () 到通道中,然后等待十次執(zhí)行后返回。
雖然這通道只是發(fā)送一個(gè)通用的信號(hào),但我們可以向通道中發(fā)送任何 Send 類(lèi)型的數(shù)據(jù)!
use std::thread;
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
for _ in 0..10 {
let tx = tx.clone();
thread::spawn(move || {
let answer = 42u32;
tx.send(answer);
});
}
rx.recv().ok().expect("Could not receive answer");
}
u32 是 Send 類(lèi)型,因此我們可以復(fù)制。所以我們創(chuàng)建一個(gè)線程,讓它來(lái)計(jì)算答案,然后使用 send() 通過(guò)通道向我們發(fā)送答案。
panic! 將當(dāng)前執(zhí)行的線程中斷。您可以用一個(gè)簡(jiǎn)單的隔離機(jī)制執(zhí)行 Rust 的線程:
use std::thread;
let result = thread::spawn(move || {
panic!("oops!");
}).join();
assert!(result.is_err());
線程返回給我們一個(gè)結(jié)果,這讓我們能夠檢查線程是否發(fā)生了異常。
更多建議: