Rust并发系列1-Thread

Rust线程

Rust 的线程模型是 1:1 模型,一个Rust Thread即对应一个内核线程,可以最小化运行时。

如何创建线程

spawn

类似C的fork,java的Runnable,Go的go func,传入function在另一个线程中执行。
一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
use std::thread;

fn main() {
println!("start {:?}", thread::current().id());
let j = std::thread::spawn(|| {
println!("Hello from a thread! {:?}",thread::current().id()) ;
});
j.join().unwrap();
}
---------
start ThreadId(1)
Hello from a thread! ThreadId(2)

std::thread::spawn 函数签名为:

1
2
3
4
5
6
7
8
9
#[stable(feature = "rust1", since = "1.0.0")]
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
{
Builder::new().spawn(f).expect("failed to spawn thread")
}

接受一个F实现了FnOnce,这语义上Rust给我们的保证:意味着我们传入的闭包只会被调用一次。同时此F返回T。我们在spawn中传入闭包后,fork线程出来执行此闭包。

其次spawn返回JoinHandle ,对此执行join 将调用libc::pthread_join 返回 crate::result::Result<T, Box<dyn Any + Send + 'static>> 这里T是我们传入闭包的返回值, 而错误的trait object如果存在,则表示thread发生了panic。

scope

scope的则保证其闭包中的spawn 的出的闭包在其结束时自动join。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
fn main() {
let n = Instant::now();
thread::scope(|s| {
s.spawn(|| {
sleep(Duration::from_secs(2));
});
s.spawn(|| {
sleep(Duration::from_secs(4));
});
});
println!("{:?}", Instant::now() - n);
}

------------
output:

4.00290675s

threadlocal

将元素保存在线程本地的存储里,只有线程自己可以访问。与Java的ThreadLocal 基本相同。threadlocal 宏 中声明好static的变量后,就可以:

1
2
3
thread_local! {
static FOO: std::cell::RefCell<u32> = RefCell::new(1)
}

这里FOO会被封装为LocalKey<std::cell::RefCell<u32>>。Localkey为T,Cell<T>,RefCell<t> 各实现了一套API。
其中
为所有类型T都实现了withtry_withinitialize_with 。他们都为闭包提供了不可变引用
对于普通类型T,threadlocal不直接提供其对象的可变引用。这里需要借助内部可变性CellRefCell

LocalKey 为类型Cell<T> 实现了set,get,take,replace T支持整体替换。其中get需实现Copy。从语义上来看和Cell也基本一致:直接替换threadlocal 中。

LocalKey 为类型RefCell<T> 实现了with_borrowwith_borrow_mut。提供的API和RefCell一致提供了局部可变性。

一套基本的使用方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
use std::{
borrow::Borrow,
cell::RefCell,
thread::{self, sleep},
time::{Duration, Instant},
};

thread_local! {
static FOO: std::cell::RefCell<u32> = RefCell::new(1)
}

fn main() {
thread::scope(|s| {
s.spawn(|| {
sleep(Duration::from_secs(2));
FOO.with(|f| {
*f.borrow_mut() = 4;
println!("{:?} [1] {}", thread::current().id(), f.borrow());
});
});
s.spawn(|| {
sleep(Duration::from_secs(4));
FOO.with(|f| {
println!("{:?} [2] {}", thread::current().id(), f.borrow());
})
});
FOO.with(|f| {
println!("{:?} [4] {}", thread::current().id(), f.borrow());
});
});
FOO.with(|f| {
println!("{:?} [3] {}", thread::current().id(), f.borrow());
});
}
----------------
output:
ThreadId(1) [4] 1
ThreadId(2) [1] 4
ThreadId(3) [2] 1
ThreadId(1) [3] 1

可以看出:

  1. 3跟4是跑在同一个线程,scope并不会创建新的线程。
  2. 点位4对FOO的修改,点位2并不能观察到说明这里确实是threadlocal生效了。

Send/Sync

两个 标记trait

1
2
pub unsafe auto trait Send { }
pub unsafe auto trait Sync { }
  • 实现Send的类型可以在线程间安全的传递其所有权
  • 实现Sync的类型可以在线程间安全的共享(通过引用)

由于这两个为auto trait。所以在 Rust 中,几乎所有类型都默认实现了Send和Sync。而且由于这两个特征都是可自动派生的trait。意味着一个复合类型(例如结构体), 只要它内部的所有成员都实现了Send或者Sync,那么它就自动实现了Send或Sync。但是也会有些类型因为某些原因实现了!Send 显示表明这个类型不能安全地在线程间传递。比如裸指针* mut,Rc等。

下面是一个裸指针的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
fn main() {
let b = Box::new(1);
let r = Box::into_raw(b);
thread::spawn(move || {
let b = unsafe { Box::from_raw(r) };
println!("b = {}", b);
});
}
-------------
error[E0277]: `*mut i32` cannot be sent between threads safely
--> src/main.rs:17:19
|
17 | thread::spawn(move || {
| ------------- ^------
| | |
| _____|_____________within this `[closure@src/main.rs:17:19: 17:26]`
| | |
| | required by a bound introduced by this call
18 | | let b = unsafe { Box::from_raw(r) };
19 | | println!("b = {}", b);
20 | | });
| |_____^ `*mut i32` cannot be sent between threads safely
|

编译失败。

我们回头看thread这里会发现:

1
2
3
4
5
6
7
8
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
{
Builder::new().spawn(f).expect("failed to spawn thread")
}

函数为FnOnce要求:我们的闭包配合move使用,设计意图是避免我们借用了捕获的变量,而导致另一个线程数据被回收后,其引用仍在此线程被使用的问题。

函数要求闭包F支持Send,由于闭包在编译时会将捕获的变量转为结构体。如果要求此结构体支持Send,则其中的每个字段都必须支持Send。Rust只允许存在一个活跃的可变引用,多个不可变引用。所以Arc/Rc没有实现DerefMut,只实现了Deref,所有被引用计数指针包装的东西都只能有不可变引用

这时我们想修改引用计数指针内的东西只能寄希望于内部可变性 :整体替换(Cell),或者基于运行时检查的借用(Refcell)。然而Cell和RefCell都明确实现了!Sync 表示不支持多线程。

1
2
3
4
#[stable(feature = "rust1", since = "1.0.0")]
impl<T: ?Sized> !marker::Send for Rc<T> {}
#[stable(feature = "rust1", since = "1.0.0")]
impl<T: ?Sized> !marker::Sync for Rc<T> {}

而Arc呢?

1
2
3
4
#[stable(feature = "rust1", since = "1.0.0")]
unsafe impl<T: ?Sized + Sync + Send> Send for Arc<T> {}
#[stable(feature = "rust1", since = "1.0.0")]
unsafe impl<T: ?Sized + Sync + Send> Sync for Arc<T> {}

我们再回头梳理下Rust这里的设计思路:

  1. 由于thread 的闭包要求我们使用FnOnce,所以我们必须提供所有权给闭包,而不是引用。
  2. 提供所有权给闭包这里其实分两种情况
    1. 一种就是单纯的把数据从一个线程move到另一个线程,这种情况其实只要数据实现了Send就可以了。大部分基本的数据结构都默认支持,因为这里不存在任何跨线程的数据共享,内存的回收,生命周期全部都随着move 一并交付给新的线程,和原线程没有关系(对于普通的数据类型而言)。
    2. 另一种则是存在跨线程共享所有权,对于Rust则意味着必须有引用计数,Rc或者Arc。而Rc明确不支持Sync,Send。从实际实现的角度来看,Rc中也没有任何同步原语。那么Rust将我们的逼到了最后一条路,Arc。
  3. 对于Arc来说,只要其内部元素T支持Sync + Send,Arc就支持Sync + Send。因此对于大部分自动Send + Sync的类型Arc 就可以实现线程共享。
  4. Arc和Rc都只能获取不可变引用,换句话说,Arc 只能读。一般配合引用计数的是内部可变性,然而Cell,RefCell都标记了!Sync 所以这条路也走不通。那如何提供跨线程的内部可变性呢?

下篇文章我们将讨论标准库的sync包。