Rust并发系列2-Lock

mutex

上篇我们讨论了跨线程修改数据需要Arc T支持Send + Sync。而Arc只能给出不可变引用的支持,如果要可变引用还需要别的容器。

首先来认识下mutex:

1
2
3
4
#[stable(feature = "rust1", since = "1.0.0")]
unsafe impl<T: ?Sized + Send> Send for Mutex<T> {}
#[stable(feature = "rust1", since = "1.0.0")]
unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}

没毛病,两个都实现了。

mutex的构造

1
2
3
4
5
6
7
8
9
10
11
12
pub struct Mutex<T: ?Sized> {
inner: sys::MovableMutex,
poison: poison::Flag,
data: UnsafeCell<T>,
}


pub struct Mutex {
inner: UnsafeCell<libc::pthread_mutex_t>,
}

pub(crate) type MovableMutex = LazyBox<Mutex>;

可以简单的看到在笔者的mac平台,这里mutex的实际实现为libc::pthread_mutex_t

API

lock

调用pthread_mutex的lockapi阻塞线程直到抢到锁,同时返回Result<Guard, PoisonError<Guard>>;

  1. 如果加锁成功:返回Guard。
  2. 如果另一个已经持有锁的线程突然panic没有正确释放锁,返回锁中毒错误(PoisonError)。

对于加锁成功其返回的MutexGuard。我们可以看到其实现了deref,那么我们可以直接对这个guard调用支持不可变引用的函数。

1
2
3
4
5
6
7
8
9

#[stable(feature = "rust1", since = "1.0.0")]
impl<T: ?Sized> Deref for MutexGuard<'_, T> {
type Target = T;

fn deref(&self) -> &T {
unsafe { &*self.lock.data.get() }
}
}

MutexGuard

如果要进一步解引用,可以使用* 运算符。由于MutexGuard实现了Sync

Send+Sync标记为:

1
2
3
4
#[stable(feature = "rust1", since = "1.0.0")]
impl<T: ?Sized> !Send for MutexGuard<'_, T> {}
#[stable(feature = "mutexguard", since = "1.19.0")]
unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {}

所以解引用后可以支持多线程访问修改数据。

使用案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
fn main() {
let o = Arc::new(Mutex::new(100));
thread::scope(|s| {
for i in 0..10 {
let o = o.clone();
s.spawn(move || {
let mut o = o.lock().unwrap();
*o += 1;
println!("thread:{}: {}", i, *o);
});
}
});
println!("{}",o.clone().lock().unwrap());
}
------------------
thread:0: 101
thread:3: 102
thread:7: 103
thread:6: 104
thread:1: 105
thread:2: 106
thread:5: 107
thread:8: 108
thread:4: 109
thread:9: 110
110

对于MutexGuard存在栈上自动释放就代表Mutex的释放

1
2
3
4
5
6
7
8
9
10
#[stable(feature = "rust1", since = "1.0.0")]
impl<T: ?Sized> Drop for MutexGuard<'_, T> {
#[inline]
fn drop(&mut self) {
unsafe {
self.lock.poison.done(&self.poison);
self.lock.inner.raw_unlock();
}
}
}

对于竞争较为激烈,且占用锁的线程持有时间较长的场景,使用mutex是比较合适的,否则应该使用parking_lot这个crate。

读写锁RWLock

和mutex类似

1
2
3
4
5
pub struct RwLock {
inner: UnsafeCell<libc::pthread_rwlock_t>,
write_locked: UnsafeCell<bool>, // guarded by the `inner` RwLock
num_readers: AtomicUsize,
}

只不过read,write返回两类guard

语义上和posix读写锁也是一致的,读读共存,读写互斥,写写互斥。

Arc

从定义来讲是支持多线程的引用计数。作为Rc不能支持多线程的补充。

1
2
3
4
pub struct Arc<T: ?Sized> {
ptr: NonNull<ArcInner<T>>,
phantom: PhantomData<ArcInner<T>>,
}

和Rc一样支持强引用,弱引用

1
2
3
4
5
struct ArcInner<T: ?Sized> {
strong: atomic::AtomicUsize,
weak: atomic::AtomicUsize,
data: T,
}

和Rc一样,在Clone时增加引用计数,只不过这里是支持Sync的Atomic计数

1
2
3
4
5
6
7
8
9
10
#[stable(feature = "rust1", since = "1.0.0")]
impl<T: ?Sized> Clone for Rc<T> {
fn clone(&self) -> Arc<T> {
let old_size = self.inner().strong.fetch_add(1, Relaxed);
if old_size > MAX_REFCOUNT {
abort();
}
unsafe { Self::from_inner(self.ptr) }
}
}

和Rc一样drop减少引用计数:

1
2
3
4
5
6
7
8
9
10
11
fn drop(&mut self) {
if self.inner().strong.fetch_sub(1, Release) != 1 {
return;
}
acquire!(self.inner().strong);

unsafe {
self.drop_slow();
}
}

原理

前一章thread中提及,FnOnce只接受所有权形式的闭包,这就意味着我们只能将我们需要跨线程共享的数据封装在Arc中才能共享所有权。这也是我们前面的案例中必须使用Arc的原因。

Once

保证只执行一次的锁,常用于做懒汉式初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
fn main() {
let o = Arc::new(std::sync::Once::new());
thread::scope(|s| {
for i in 0..10 {
let i = o.clone();
s.spawn(move || {
println!("thread run ...");
i.call_once(|| {println!("called")});
});
}
});
}
-----------
thread run ...
called
thread run ...
thread run ...
thread run ...
thread run ...
thread run ...
thread run ...
thread run ...
thread run ...
thread run ...

call_once不需要可变引用,直接就可以在Arc内使用。

Condvar

也是继承自libc的pthread_condvar_t ,常用于控制线程间访问顺序。

比如下面这段代码控制10个线程顺序打印0..9

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
use std::{
sync::{
mpsc::{self, channel, sync_channel},
Arc, Condvar,
},
thread::{self, sleep},
time::Duration,
};

fn main() {
let m = Arc::new(std::sync::Mutex::new(0usize));
let cond = Arc::new(Condvar::new());
thread::scope(|s| {
for i in 0..10 {
let m = m.clone();
let cond = cond.clone();
s.spawn(move || {
let mut g = *m.lock().unwrap();
while g != i {
cond.notify_one();
g = *cond.wait(m.lock().unwrap()).unwrap();
}
println!("no.{} thread print {}", i, g);
*m.lock().unwrap() += 1;
cond.notify_one();
});
}
});
}