mpsc
mpsc(mutli-producer,single-consumer)多生产者,单消费者的channel。
用例
1 2 3 4 5 6 7 8 9 10 11
| use std::sync::mpsc; use std::thread;
fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { tx.send(1).unwrap(); });
println!("receive {}", rx.recv().unwrap()); }
|
channel函数返回元组,分别包含了sender和receiver,其中sender实现了Clone,所以可以被Clone后move到线程闭包中,receiver没有实现clone,所以不能Clone出多个实例,正符合mpsc的语义。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| use std::{ sync::mpsc::{self, channel}, thread, };
fn main() { let (sender, r) = channel::<i32>(); thread::scope(|s| { for i in 0..10 { let sender = sender.clone(); s.spawn(move || { sender.send(i).unwrap(); }); } }); while let Ok(s) = r.recv() { println!("recv:{}",s); } } ------------------------------ recv:0 recv:1 recv:2 recv:4 recv:3 recv:5 recv:6 recv:7 recv:8 recv:9
|
执行上面这段代码发现并不会退出,显然主线程在执行recv时卡住。按理说我们的10个sender全部发送数据完成了,receiver不应停止接受吗?这是因为主线程的sender还没被回收。我们在主线程加上drop看看:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| fn main() { let (sender, r) = channel::<i32>(); thread::scope(|s| { for i in 0..10 { let sender = sender.clone(); s.spawn(move || { sender.send(i).unwrap(); }); } }); drop(sender); while let Ok(s) = r.recv() { println!("recv:{}",s); } }
|
此时这段代码可以正确结束。证明我们的理论:当所有sender都被drop后调用recv会返回None。我们可以认为不会有新的消息接受到了,receiver可以释放了。
Receiver迭代器
Receiver实现了IntoIter,因此可以支持for循环的消费型遍历:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| pub struct IntoIter<T> { rx: Receiver<T>, } impl<T> Iterator for IntoIter<T> { type Item = T; fn next(&mut self) -> Option<T> { self.rx.recv().ok() } } impl<T> IntoIterator for Receiver<T> { type Item = T; type IntoIter = IntoIter<T>;
fn into_iter(self) -> IntoIter<T> { IntoIter { rx: self } } }
|
同理,提前drop主线程的sender,可以在其他sender全部释放后,结束iterator的遍历:
下面是例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| use std::{ sync::mpsc::{self, channel}, thread::{self, sleep}, time::Duration, };
fn main() { let (sender, r) = channel::<i32>(); thread::scope(|s| { for i in 0..10 { let sender = sender.clone(); s.spawn(move || { sender.send(i).unwrap(); }); } }); drop(sender); for ele in r { println!("recv:{}",ele); } }
|
同步队列
上面提到的channel()
提供的是无界队列,即channel的容量无限大(仅受内存限制)。此外还有有界队列sync_channel
此时我们的用例就不用scope来阻塞join结果了,我们从主线程消费receiver同时sleep1秒,可以证明发送端也由于消费端的阻塞而产生了阻塞:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| fn main() { let (sender, r) = sync_channel::<i32>(2); for i in 0..10 { let sender = sender.clone(); thread::spawn(move || { sender.send(i).unwrap(); println!("send:{}", i); }); }; drop(sender); for ele in r { println!("recv:{}", ele); sleep(Duration::from_secs(1)); } }
|
如何关闭
Sender/Receiver的正确模式是先关闭sender,然后receiver在消费完队列中未处理完成的消息后,会自动返回None。此时就可以正确地释放receiver。这个模式和while let和消费型遍历配合的非常好。
如果:sender发送时receiver已关闭,由于Sender 调用send后返回的是Result:如果sender发送时,receiver已关闭,将导致receiver返回SendError
而不是Ok
横向对比
由于标准库队列只支持mpsc,一些开源crate支持mpmc;此外某些场景下,一些开源crate比标准库拥有更好的性能,这个后面会单独把crossbeam拎出来分析。