lock
mutex
直接在异步编程中使用std::sync::Mutex
使用不当,可能导致Runtime被阻塞,甚至死锁。
这里是一个死锁的案例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| #[tokio::main] async fn main() { let mtx = Mutex::new(0);
tokio::join!(work(&mtx), work(&mtx));
println!("{}", *mtx.lock().unwrap()); }
async fn work(mtx: &Mutex<i32>) { println!("lock"); { let mut v = mtx.lock().unwrap(); println!("locked"); sleep(Duration::from_millis(100)).await; *v += 1; } println!("unlock") }
|
这里使用std::sync::Mutex
将死锁 ,使用futures::lock::Mutex
则不会死锁。
futues::lock::Mutex
的特点是实现了Future,支持await,跨await的lock不会导致死锁。
然而并不是所有情况下都应使用实现了Future的锁,例如在需要高频访问时,使用parking_lot是比较好的选择。
bilock
需打开相关feature。专门用于做两个owner的锁,常用于像TcpStream这样允许将读写拆成两端的场景。
一般使用不是非常广泛,这里就不介绍了。
channel
跟lock类似,在异步环境中直接使用std channel 可能阻塞Runtime。
标准库的channel只提供mpsc模式(多生产者,单消费者)。
futures提供
- mpsc
- oneshot模式 单生产者单消费,一次只能发送一条消息
tokio提供的更丰富:
- mpsc
- oneshot
- broadcast 多生产者,多消费者,其中每一条发送的消息都可以被所有接收者收到,因此是广播
- watch 单生产者,多消费者,只保存一条最新的消息,因此接收者只能看到最近的一条消息,例如,这种模式适用于配置文件变化的监听
对比标准库
futures的API并非async,这类channel需要在自行poll Ready后在send,比较适合作为某些中间件的组件,不适合直接作为用户的API。tokio则提供了send recv的async接口用起来会比较方便。
oneshot
一次性channel。
对于sender来说send将直接消费掉channel
1 2 3 4 5
| impl<T> Sender<T> { pub fn send(self, t: T) -> Result<(), T> { self.inner.send(t) } }
|
recv则压根没暴露出来,Receiver
则直接实现了FusedFuture,await后也将消费掉Receiver
文档中给出的oneshot的用法案例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| use futures::channel::oneshot; use std::{thread, time::Duration};
let (sender, receiver) = oneshot::channel::<i32>();
thread::spawn(|| { println!("THREAD: sleeping zzz..."); thread::sleep(Duration::from_millis(1000)); println!("THREAD: i'm awake! sending."); sender.send(3).unwrap(); });
println!("MAIN: doing some useful stuff");
futures::executor::block_on(async { println!("MAIN: waiting for msg..."); println!("MAIN: got: {:?}", receiver.await); });
|
mpsc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| #[tokio::main] async fn main() { use std::{thread, time::Duration};
let (mut sender, mut receiver) = futures::channel::mpsc::channel::<i32>(1);
thread::spawn(move || { thread::sleep(Duration::from_millis(1000)); sender.start_send(3); });
println!("MAIN: doing some useful stuff");
futures::executor::block_on(async { println!("MAIN: waiting for msg..."); tokio::time::sleep(Duration::from_secs(1)).await; println!("MAIN: got: {:?}", receiver.try_next()); }); }
|
IO
对std::io
的封装,避免阻塞Runtime。定义了4 trait ,对应的std的Read
,Write
,Seek
,BufRead
- Read | AsyncRead | AsyncReadExt
- Write | AsyncWrite | AsyncWriteExt
- Seek | AsyncSeek | AsyncSeekExt
- BufRead | AsyncBufRead | AsyncBufReadExt
WIP