Rust异步系列3 Futures工具包(下)

lock

mutex

直接在异步编程中使用std::sync::Mutex使用不当,可能导致Runtime被阻塞,甚至死锁。
这里是一个死锁的案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#[tokio::main]
async fn main() {
let mtx = Mutex::new(0);

tokio::join!(work(&mtx), work(&mtx));

println!("{}", *mtx.lock().unwrap());
}

async fn work(mtx: &Mutex<i32>) {
println!("lock");
{
let mut v = mtx.lock().unwrap();
println!("locked");
// slow redis network request
sleep(Duration::from_millis(100)).await;
*v += 1;
}
println!("unlock")
}

这里使用std::sync::Mutex将死锁 ,使用futures::lock::Mutex 则不会死锁。
futues::lock::Mutex 的特点是实现了Future,支持await,跨await的lock不会导致死锁。
然而并不是所有情况下都应使用实现了Future的锁,例如在需要高频访问时,使用parking_lot是比较好的选择。

bilock

需打开相关feature。专门用于做两个owner的锁,常用于像TcpStream这样允许将读写拆成两端的场景。
一般使用不是非常广泛,这里就不介绍了。

channel

跟lock类似,在异步环境中直接使用std channel 可能阻塞Runtime。
标准库的channel只提供mpsc模式(多生产者,单消费者)。
futures提供

  • mpsc
  • oneshot模式 单生产者单消费,一次只能发送一条消息

tokio提供的更丰富:

  • mpsc
  • oneshot
  • broadcast 多生产者,多消费者,其中每一条发送的消息都可以被所有接收者收到,因此是广播
  • watch 单生产者,多消费者,只保存一条最新的消息,因此接收者只能看到最近的一条消息,例如,这种模式适用于配置文件变化的监听

对比标准库

futures的API并非async,这类channel需要在自行poll Ready后在send,比较适合作为某些中间件的组件,不适合直接作为用户的API。tokio则提供了send recv的async接口用起来会比较方便。

oneshot

一次性channel。

对于sender来说send将直接消费掉channel

1
2
3
4
5
impl<T> Sender<T> {
pub fn send(self, t: T) -> Result<(), T> {
self.inner.send(t)
}
}

recv则压根没暴露出来,Receiver则直接实现了FusedFuture,await后也将消费掉Receiver

文档中给出的oneshot的用法案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
use futures::channel::oneshot;
use std::{thread, time::Duration};

let (sender, receiver) = oneshot::channel::<i32>();


thread::spawn(|| {
println!("THREAD: sleeping zzz...");
thread::sleep(Duration::from_millis(1000));
println!("THREAD: i'm awake! sending.");
sender.send(3).unwrap();
});

println!("MAIN: doing some useful stuff");

futures::executor::block_on(async {
println!("MAIN: waiting for msg...");
println!("MAIN: got: {:?}", receiver.await);
});

mpsc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#[tokio::main]
async fn main() {
use std::{thread, time::Duration};

let (mut sender, mut receiver) = futures::channel::mpsc::channel::<i32>(1);

thread::spawn(move || {
thread::sleep(Duration::from_millis(1000));
sender.start_send(3);
});

println!("MAIN: doing some useful stuff");

futures::executor::block_on(async {
println!("MAIN: waiting for msg...");
tokio::time::sleep(Duration::from_secs(1)).await;
println!("MAIN: got: {:?}", receiver.try_next());
});
}

IO

std::io 的封装,避免阻塞Runtime。定义了4 trait ,对应的std的ReadWriteSeekBufRead

  • Read | AsyncRead | AsyncReadExt
  • Write | AsyncWrite | AsyncWriteExt
  • Seek | AsyncSeek | AsyncSeekExt
  • BufRead | AsyncBufRead | AsyncBufReadExt

WIP