Rust线程
Rust 的线程模型是 1:1 模型,一个Rust Thread即对应一个内核线程,可以最小化运行时。
如何创建线程
spawn
类似C的fork,java的Runnable,Go的go func,传入function在另一个线程中执行。
一个例子:
1 2 3 4 5 6 7 8 9 10 11 12 13
| use std::thread;
fn main() { println!("start {:?}", thread::current().id()); let j = std::thread::spawn(|| { println!("Hello from a thread! {:?}",thread::current().id()) ; }); j.join().unwrap(); } --------- start ThreadId(1) Hello from a thread! ThreadId(2)
|
std::thread::spawn
函数签名为:
1 2 3 4 5 6 7 8 9
| #[stable(feature = "rust1", since = "1.0.0")] pub fn spawn<F, T>(f: F) -> JoinHandle<T> where F: FnOnce() -> T, F: Send + 'static, T: Send + 'static, { Builder::new().spawn(f).expect("failed to spawn thread") }
|
接受一个F实现了FnOnce
,这语义上Rust给我们的保证:意味着我们传入的闭包只会被调用一次。同时此F返回T。我们在spawn中传入闭包后,fork线程出来执行此闭包。
其次spawn返回JoinHandle
,对此执行join
将调用libc::pthread_join
返回 crate::result::Result<T, Box<dyn Any + Send + 'static>>
这里T是我们传入闭包的返回值, 而错误的trait object如果存在,则表示thread发生了panic。
scope
scope的则保证其闭包中的spawn
的出的闭包在其结束时自动join。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| fn main() { let n = Instant::now(); thread::scope(|s| { s.spawn(|| { sleep(Duration::from_secs(2)); }); s.spawn(|| { sleep(Duration::from_secs(4)); }); }); println!("{:?}", Instant::now() - n); }
------------ output:
4.00290675s
|
threadlocal
将元素保存在线程本地的存储里,只有线程自己可以访问。与Java的ThreadLocal
基本相同。threadlocal
宏 中声明好static的变量后,就可以:
1 2 3
| thread_local! { static FOO: std::cell::RefCell<u32> = RefCell::new(1) }
|
这里FOO会被封装为LocalKey<std::cell::RefCell<u32>>
。Localkey为T
,Cell<T>
,RefCell<t>
各实现了一套API。
其中
为所有类型T都实现了with
,try_with
,initialize_with
。他们都为闭包提供了不可变引用
。
对于普通类型T,threadlocal不直接提供其对象的可变引用。这里需要借助内部可变性
:Cell
和RefCell
。
LocalKey
为类型Cell<T>
实现了set,get,take,replace T支持整体替换。其中get需实现Copy。从语义上来看和Cell也基本一致:直接替换threadlocal
中。
LocalKey
为类型RefCell<T>
实现了with_borrow
和with_borrow_mut
。提供的API和RefCell一致提供了局部可变性。
一套基本的使用方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| use std::{ borrow::Borrow, cell::RefCell, thread::{self, sleep}, time::{Duration, Instant}, };
thread_local! { static FOO: std::cell::RefCell<u32> = RefCell::new(1) }
fn main() { thread::scope(|s| { s.spawn(|| { sleep(Duration::from_secs(2)); FOO.with(|f| { *f.borrow_mut() = 4; println!("{:?} [1] {}", thread::current().id(), f.borrow()); }); }); s.spawn(|| { sleep(Duration::from_secs(4)); FOO.with(|f| { println!("{:?} [2] {}", thread::current().id(), f.borrow()); }) }); FOO.with(|f| { println!("{:?} [4] {}", thread::current().id(), f.borrow()); }); }); FOO.with(|f| { println!("{:?} [3] {}", thread::current().id(), f.borrow()); }); } ---------------- output: ThreadId(1) [4] 1 ThreadId(2) [1] 4 ThreadId(3) [2] 1 ThreadId(1) [3] 1
|
可以看出:
- 3跟4是跑在同一个线程,scope并不会创建新的线程。
- 点位4对FOO的修改,点位2并不能观察到说明这里确实是threadlocal生效了。
Send/Sync
两个 标记trait
1 2
| pub unsafe auto trait Send { } pub unsafe auto trait Sync { }
|
- 实现Send的类型可以在线程间安全的传递其所有权
- 实现Sync的类型可以在线程间安全的共享(通过引用)
由于这两个为auto trait。所以在 Rust 中,几乎所有类型都默认实现了Send和Sync。而且由于这两个特征都是可自动派生的trait。意味着一个复合类型(例如结构体), 只要它内部的所有成员都实现了Send或者Sync,那么它就自动实现了Send或Sync。但是也会有些类型因为某些原因实现了!Send
显示表明这个类型不能安全地在线程间传递。比如裸指针* mut,Rc等。
下面是一个裸指针的例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| fn main() { let b = Box::new(1); let r = Box::into_raw(b); thread::spawn(move || { let b = unsafe { Box::from_raw(r) }; println!("b = {}", b); }); } ------------- error[E0277]: `*mut i32` cannot be sent between threads safely --> src/main.rs:17:19 | 17 | thread::spawn(move || { | ------------- ^------ | | | | _____|_____________within this `[closure@src/main.rs:17:19: 17:26]` | | | | | required by a bound introduced by this call 18 | | let b = unsafe { Box::from_raw(r) }; 19 | | println!("b = {}", b); 20 | | }); | |_____^ `*mut i32` cannot be sent between threads safely |
|
编译失败。
我们回头看thread这里会发现:
1 2 3 4 5 6 7 8
| pub fn spawn<F, T>(f: F) -> JoinHandle<T> where F: FnOnce() -> T, F: Send + 'static, T: Send + 'static, { Builder::new().spawn(f).expect("failed to spawn thread") }
|
函数为FnOnce要求:我们的闭包配合move使用,设计意图是避免我们借用了捕获的变量,而导致另一个线程数据被回收后,其引用仍在此线程被使用的问题。
函数要求闭包F支持Send,由于闭包在编译时会将捕获的变量转为结构体。如果要求此结构体支持Send,则其中的每个字段都必须支持Send。Rust只允许存在一个活跃的可变引用,多个不可变引用。所以Arc/Rc没有实现DerefMut,只实现了Deref,所有被引用计数指针包装的东西都只能有不可变引用。
这时我们想修改引用计数指针内的东西只能寄希望于内部可变性 :整体替换(Cell),或者基于运行时检查的借用(Refcell)。然而Cell和RefCell都明确实现了!Sync 表示不支持多线程。
1 2 3 4
| #[stable(feature = "rust1", since = "1.0.0")] impl<T: ?Sized> !marker::Send for Rc<T> {} #[stable(feature = "rust1", since = "1.0.0")] impl<T: ?Sized> !marker::Sync for Rc<T> {}
|
而Arc呢?
1 2 3 4
| #[stable(feature = "rust1", since = "1.0.0")] unsafe impl<T: ?Sized + Sync + Send> Send for Arc<T> {} #[stable(feature = "rust1", since = "1.0.0")] unsafe impl<T: ?Sized + Sync + Send> Sync for Arc<T> {}
|
我们再回头梳理下Rust这里的设计思路:
- 由于thread 的闭包要求我们使用FnOnce,所以我们必须提供所有权给闭包,而不是引用。
- 提供所有权给闭包这里其实分两种情况
- 一种就是单纯的把数据从一个线程move到另一个线程,这种情况其实只要数据实现了Send就可以了。大部分基本的数据结构都默认支持,因为这里不存在任何跨线程的数据共享,内存的回收,生命周期全部都随着move 一并交付给新的线程,和原线程没有关系(对于普通的数据类型而言)。
- 另一种则是存在跨线程共享所有权,对于Rust则意味着必须有引用计数,Rc或者Arc。而Rc明确不支持Sync,Send。从实际实现的角度来看,Rc中也没有任何同步原语。那么Rust将我们的逼到了最后一条路,Arc。
- 对于Arc来说,只要其内部元素T支持Sync + Send,Arc就支持Sync + Send。因此对于大部分自动Send + Sync的类型Arc 就可以实现线程共享。
- Arc和Rc都只能获取不可变引用,换句话说,Arc 只能读。一般配合引用计数的是内部可变性,然而Cell,RefCell都标记了!Sync 所以这条路也走不通。那如何提供跨线程的内部可变性呢?
下篇文章我们将讨论标准库的sync包。