Rust并发系列3-channel

mpsc

mpsc(mutli-producer,single-consumer)多生产者,单消费者的channel。

用例

1
2
3
4
5
6
7
8
9
10
11
use std::sync::mpsc;
use std::thread;

fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
tx.send(1).unwrap();
});

println!("receive {}", rx.recv().unwrap());
}

channel函数返回元组,分别包含了sender和receiver,其中sender实现了Clone,所以可以被Clone后move到线程闭包中,receiver没有实现clone,所以不能Clone出多个实例,正符合mpsc的语义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
use std::{
sync::mpsc::{self, channel},
thread,
};

fn main() {
let (sender, r) = channel::<i32>();
thread::scope(|s| {
for i in 0..10 {
let sender = sender.clone();
s.spawn(move || {
sender.send(i).unwrap();
});
}
});
while let Ok(s) = r.recv() {
println!("recv:{}",s);
}
}
------------------------------
recv:0
recv:1
recv:2
recv:4
recv:3
recv:5
recv:6
recv:7
recv:8
recv:9

执行上面这段代码发现并不会退出,显然主线程在执行recv时卡住。按理说我们的10个sender全部发送数据完成了,receiver不应停止接受吗?这是因为主线程的sender还没被回收。我们在主线程加上drop看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
fn main() {
let (sender, r) = channel::<i32>();
thread::scope(|s| {
for i in 0..10 {
let sender = sender.clone();
s.spawn(move || {
sender.send(i).unwrap();
});
}
});
drop(sender);
while let Ok(s) = r.recv() {
println!("recv:{}",s);
}
}

此时这段代码可以正确结束。证明我们的理论:当所有sender都被drop后调用recv会返回None。我们可以认为不会有新的消息接受到了,receiver可以释放了。

Receiver迭代器

Receiver实现了IntoIter,因此可以支持for循环的消费型遍历:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
pub struct IntoIter<T> {
rx: Receiver<T>,
}
impl<T> Iterator for IntoIter<T> {
type Item = T;
fn next(&mut self) -> Option<T> {
self.rx.recv().ok()
}
}
impl<T> IntoIterator for Receiver<T> {
type Item = T;
type IntoIter = IntoIter<T>;

fn into_iter(self) -> IntoIter<T> {
IntoIter { rx: self }
}
}

同理,提前drop主线程的sender,可以在其他sender全部释放后,结束iterator的遍历:
下面是例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
use std::{
sync::mpsc::{self, channel},
thread::{self, sleep}, time::Duration,
};

fn main() {
let (sender, r) = channel::<i32>();
thread::scope(|s| {
for i in 0..10 {
let sender = sender.clone();
s.spawn(move || {
sender.send(i).unwrap();
});
}
});
drop(sender);
for ele in r {
println!("recv:{}",ele);
}
}

同步队列

上面提到的channel() 提供的是无界队列,即channel的容量无限大(仅受内存限制)。此外还有有界队列sync_channel

此时我们的用例就不用scope来阻塞join结果了,我们从主线程消费receiver同时sleep1秒,可以证明发送端也由于消费端的阻塞而产生了阻塞:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

fn main() {
let (sender, r) = sync_channel::<i32>(2);
for i in 0..10 {
let sender = sender.clone();
thread::spawn(move || {
sender.send(i).unwrap();
println!("send:{}", i);
});
};
drop(sender);
for ele in r {
println!("recv:{}", ele);
sleep(Duration::from_secs(1));
}
}

如何关闭

Sender/Receiver的正确模式是先关闭sender,然后receiver在消费完队列中未处理完成的消息后,会自动返回None。此时就可以正确地释放receiver。这个模式和while let和消费型遍历配合的非常好。

如果:sender发送时receiver已关闭,由于Sender 调用send后返回的是Result:如果sender发送时,receiver已关闭,将导致receiver返回SendError 而不是Ok

横向对比

由于标准库队列只支持mpsc,一些开源crate支持mpmc;此外某些场景下,一些开源crate比标准库拥有更好的性能,这个后面会单独把crossbeam拎出来分析。