从一个例子引入
执行cargo new demo
在Cargo.toml
中加入:
1 | [dependencies] |
main.rs中:
1 | type E = Box<dyn std::error::Error>; |
这段代码https访问了此博客。展示了Rust异步的最基本使用。reqwest::get("https://www.fenix0.com")
是async fn。而Rust async fn是惰性的。所以async fn首先要知道的是,调用async fn不会真正执行。
在reqwest
中 get函数的定义如下:
1 | pub async fn get<T: IntoUrl>(url: T) -> crate::Result<Response> { |
由于async的语法糖的存在,上面的这段代码等价于:
1 | pub fn get<T: IntoUrl>(url: T) -> impl Future<Output = crate::Result<Response>> { |
所以这段async函数并不能直接返回Response。那应该咋办呢?
目前有两种办法能执行async函数。
- await/直接调用poll
- 将future提交给Runtime
await只能在另一个async函数中使用,所以最外层的async 函数必须用方法2.
1 | fn main() -> Result<(), E> { |
实际上 这两种写法脱糖后也是相同的。
1 |
过程宏展开后也是上面一样部署tokio Runtime。最终,在block_on
中传入我们的reqwest::get
也能执行Future。
那么到底什么是Future?
介绍
定位
Future位于核心库非标准库。支持no_std环境。
- 对于用户来说,用户面向标准库,crates编程。标准库提供Future API。用户需要做的就是配置Runtime,调用crates的async 函数并await他们。由于await只能在async中调用,那么必然存在最外层的async函数无法await,必须将其交付给Runtime执行。
- 从crates的角度来看,它并不关心用户底层使用什么运行时,它只通过标准Future API暴露接口。至于要如何处置此Future,等待其执行完毕还是跟其他Future并行执行还是其他操作,都是用户决定。
- 从运行时的角度来看,它是真正负责执行的执行器。对用户屏蔽线程模型和调度细节,负责去轮训用户提交的Future,并给出返回值。
概念
来看下Future trait长啥样
1 | pub trait Future { |
poll
Future本质是Pollable,可轮询。说人话:实现Future的struct需要能被poll,每次poll要么返回Ready,表示轮询完毕,出结果了;要么返回Pending,表示仍需轮询。
从语义上来讲:
Future返回Pending后将再次被Poll,Future返回Ready后代表此Future已成功返回。
Future的文档中提及:
1 | Once a future has finished, clients should not `poll` it again. |
无论是异步crates的开发者还是Runtime的开发者都应遵守的规则,poll到Ready后就不应再次poll,否则将违反其语义。
futures(注意多了个s)工具包中的Fuse
在处理这类问题时可能会用到。
wake
Future的文档中如此描述poll:
1 | When a future is not ready yet, `poll` returns `Poll::Pending` and stores a clone of the [`Waker`] copied from the current [`Context`]. |
这里提到了一个重要的机制wake。如果我们的Future是socket读写,我们肯定是不希望在socket读写的过程中poll被密集且无效的调用导致cpu空转。
1 | The `poll` function is not called repeatedly in a tight loop -- instead,it should only be called when the future indicates that it is ready to make progress (by calling `wake()`). |
Runtime的设计应符合这里的描述,不应密集的调用poll,只有在可以取得进展(make progress)时才能调用poll。换句话说,对于正确实现的Runtime,调用一次poll后就不会再调用了,直到可以取得进展。
这里可以证明:
1 | use std::{future::Future}; |
注释掉 cx.waker().wake_by_ref();
发现程序并没有快速poll 10次结束。打开注释则能正常打印。
1 | [src/main.rs:9] i = 10 |
上面的例子可以看到,对于我们的poll函数,Runtime只会执行一次,除非调用了wake,否则永远不会再执行。再处理完Future中的工作可以更新状态后,需手动调用wake。比如对于tokio::time::Sleep
,tokio Runtime中维护了自己的时间轮体系,所以调用tokio的sleep并不会走syscall futex之类的阻塞当前线程。因此此类Future将waker丢给内部,不需要我们手动唤醒。
1 | this.inner().state.poll(cx.waker()) |
同理,假设我们Future实现linux平台epoll时,epoll wait返回后也需将对应有可读事件的socket唤醒。当然这些细节tokio都为我们封装好了,上层业务只需用tokio的tcpstream代替标准库的tcpstream即可。
总结
本章我们认识什么是Future,初步了解Future的基本原理,下一篇我们将探讨Future和Rust异步的使用细节。