mutex 上篇我们讨论了跨线程修改数据需要Arc T支持Send + Sync。而Arc只能给出不可变引用的支持,如果要可变引用还需要别的容器。
首先来认识下mutex:
1 2 3 4 #[stable(feature = "rust1" , since = "1.0.0" )] unsafe impl <T: ?Sized + Send > Send for Mutex <T> {}#[stable(feature = "rust1" , since = "1.0.0" )] unsafe impl <T: ?Sized + Send > Sync for Mutex <T> {}
没毛病,两个都实现了。
mutex的构造 1 2 3 4 5 6 7 8 9 10 11 12 pub struct Mutex <T: ?Sized > { inner: sys::MovableMutex, poison: poison::Flag, data: UnsafeCell<T>, } pub struct Mutex { inner: UnsafeCell<libc::pthread_mutex_t>, } pub (crate ) type MovableMutex = LazyBox<Mutex>;
可以简单的看到在笔者的mac平台,这里mutex的实际实现为libc::pthread_mutex_t
。
API lock 调用pthread_mutex的lockapi阻塞线程直到抢到锁,同时返回Result<Guard, PoisonError<Guard>>;
如果加锁成功:返回Guard。
如果另一个已经持有锁的线程突然panic没有正确释放锁,返回锁中毒错误(PoisonError)。
对于加锁成功其返回的MutexGuard
。我们可以看到其实现了deref,那么我们可以直接对这个guard调用支持不可变引用
的函数。
1 2 3 4 5 6 7 8 9 #[stable(feature = "rust1" , since = "1.0.0" )] impl <T: ?Sized > Deref for MutexGuard <'_ , T> { type Target = T; fn deref (&self ) -> &T { unsafe { &*self .lock.data.get () } } }
MutexGuard 如果要进一步解引用,可以使用*
运算符。由于MutexGuard实现了Sync
Send+Sync标记为:
1 2 3 4 #[stable(feature = "rust1" , since = "1.0.0" )] impl <T: ?Sized > !Send for MutexGuard <'_ , T> {}#[stable(feature = "mutexguard" , since = "1.19.0" )] unsafe impl <T: ?Sized + Sync > Sync for MutexGuard <'_ , T> {}
所以解引用后可以支持多线程访问修改数据。
使用案例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 fn main () { let o = Arc::new (Mutex::new (100 )); thread::scope (|s| { for i in 0 ..10 { let o = o.clone (); s.spawn (move || { let mut o = o.lock ().unwrap (); *o += 1 ; println! ("thread:{}: {}" , i, *o); }); } }); println! ("{}" ,o.clone ().lock ().unwrap ()); } ------------------ thread:0 : 101 thread:3 : 102 thread:7 : 103 thread:6 : 104 thread:1 : 105 thread:2 : 106 thread:5 : 107 thread:8 : 108 thread:4 : 109 thread:9 : 110 110
对于MutexGuard存在栈上自动释放就代表Mutex的释放
1 2 3 4 5 6 7 8 9 10 #[stable(feature = "rust1" , since = "1.0.0" )] impl <T: ?Sized > Drop for MutexGuard <'_ , T> { #[inline] fn drop (&mut self ) { unsafe { self .lock.poison.done (&self .poison); self .lock.inner.raw_unlock (); } } }
对于竞争较为激烈,且占用锁的线程持有时间较长的场景,使用mutex是比较合适的,否则应该使用parking_lot
这个crate。
读写锁RWLock 和mutex类似
1 2 3 4 5 pub struct RwLock { inner: UnsafeCell<libc::pthread_rwlock_t>, write_locked: UnsafeCell<bool >, num_readers: AtomicUsize, }
只不过read,write返回两类guard
。
语义上和posix读写锁也是一致的,读读共存,读写互斥,写写互斥。
Arc 从定义来讲是支持多线程的引用计数。作为Rc不能支持多线程的补充。
1 2 3 4 pub struct Arc <T: ?Sized > { ptr: NonNull<ArcInner<T>>, phantom: PhantomData<ArcInner<T>>, }
和Rc一样支持强引用,弱引用
1 2 3 4 5 struct ArcInner <T: ?Sized > { strong: atomic::AtomicUsize, weak: atomic::AtomicUsize, data: T, }
和Rc一样,在Clone时增加引用计数,只不过这里是支持Sync的Atomic计数
1 2 3 4 5 6 7 8 9 10 #[stable(feature = "rust1" , since = "1.0.0" )] impl <T: ?Sized > Clone for Rc <T> { fn clone (&self ) -> Arc<T> { let old_size = self .inner ().strong.fetch_add (1 , Relaxed); if old_size > MAX_REFCOUNT { abort (); } unsafe { Self ::from_inner (self .ptr) } } }
和Rc一样drop减少引用计数:
1 2 3 4 5 6 7 8 9 10 11 fn drop (&mut self ) { if self .inner ().strong.fetch_sub (1 , Release) != 1 { return ; } acquire!(self .inner ().strong); unsafe { self .drop_slow (); } }
原理 前一章thread中提及,FnOnce只接受所有权形式的闭包,这就意味着我们只能将我们需要跨线程共享的数据封装在Arc中才能共享所有权。这也是我们前面的案例中必须使用Arc的原因。
Once 保证只执行一次的锁,常用于做懒汉式初始化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 fn main () { let o = Arc::new (std::sync::Once::new ()); thread::scope (|s| { for i in 0 ..10 { let i = o.clone (); s.spawn (move || { println! ("thread run ..." ); i.call_once (|| {println! ("called" )}); }); } }); } ----------- thread run ... called thread run ... thread run ... thread run ... thread run ... thread run ... thread run ... thread run ... thread run ... thread run ...
call_once不需要可变引用,直接就可以在Arc内使用。
Condvar 也是继承自libc的pthread_condvar_t
,常用于控制线程间访问顺序。
比如下面这段代码控制10个线程顺序打印0..9
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 use std::{ sync::{ mpsc::{self , channel, sync_channel}, Arc, Condvar, }, thread::{self , sleep}, time::Duration, }; fn main () { let m = Arc::new (std::sync::Mutex::new (0usize )); let cond = Arc::new (Condvar::new ()); thread::scope (|s| { for i in 0 ..10 { let m = m.clone (); let cond = cond.clone (); s.spawn (move || { let mut g = *m.lock ().unwrap (); while g != i { cond.notify_one (); g = *cond.wait (m.lock ().unwrap ()).unwrap (); } println! ("no.{} thread print {}" , i, g); *m.lock ().unwrap () += 1 ; cond.notify_one (); }); } }); }