Rust异步系列1 认识Future

从一个例子引入

执行cargo new demo
Cargo.toml中加入:

1
2
3
[dependencies]
reqwest="0.11.4"
tokio={ version="1.9.0",features=["full"] }

main.rs中:

1
2
3
4
5
6
7
8
9
10
11
12
13
type E = Box<dyn std::error::Error>;

#[tokio::main]
async fn main() -> Result<(), E> {
let g = reqwest::get("https://www.fenix0.com").await?;
dbg!(g.content_length());
Ok(())
}
-------
output:
[src/main.rs:6] g.content_length() = Some(
28217,
)

这段代码https访问了此博客。展示了Rust异步的最基本使用。reqwest::get("https://www.fenix0.com") 是async fn。而Rust async fn是惰性的。所以async fn首先要知道的是,调用async fn不会真正执行
reqwest中 get函数的定义如下:

1
2
3
4
pub async fn get<T: IntoUrl>(url: T) -> crate::Result<Response> {
...
}

由于async的语法糖的存在,上面的这段代码等价于:

1
2
3
pub fn get<T: IntoUrl>(url: T) -> impl Future<Output = crate::Result<Response>> {
...
}

所以这段async函数并不能直接返回Response。那应该咋办呢?

目前有两种办法能执行async函数。

  1. await/直接调用poll
  2. 将future提交给Runtime

await只能在另一个async函数中使用,所以最外层的async 函数必须用方法2.

1
2
3
4
5
6
7
8
9
fn main() -> Result<(), E> {
let r = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("Failed building the Runtime")
.block_on(reqwest::get("https://www.fenix0.com"));
dbg!(r?.content_length());
Ok(())
}

实际上 这两种写法脱糖后也是相同的。

1
#[tokio::main]

过程宏展开后也是上面一样部署tokio Runtime。最终,在block_on 中传入我们的reqwest::get 也能执行Future。

那么到底什么是Future?

介绍

定位

Future位于核心库非标准库。支持no_std环境。

  • 对于用户来说,用户面向标准库,crates编程。标准库提供Future API。用户需要做的就是配置Runtime,调用crates的async 函数并await他们。由于await只能在async中调用,那么必然存在最外层的async函数无法await,必须将其交付给Runtime执行。
  • 从crates的角度来看,它并不关心用户底层使用什么运行时,它只通过标准Future API暴露接口。至于要如何处置此Future,等待其执行完毕还是跟其他Future并行执行还是其他操作,都是用户决定。
  • 从运行时的角度来看,它是真正负责执行的执行器。对用户屏蔽线程模型和调度细节,负责去轮训用户提交的Future,并给出返回值。

概念

来看下Future trait长啥样

1
2
3
4
5
6
7
8
9
10
pub trait Future {
/// The type of value produced on completion.
#[stable(feature = "futures_api", since = "1.36.0")]
type Output;


#[lang = "poll"]
#[stable(feature = "futures_api", since = "1.36.0")]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

poll

Future本质是Pollable,可轮询。说人话:实现Future的struct需要能被poll,每次poll要么返回Ready,表示轮询完毕,出结果了;要么返回Pending,表示仍需轮询。

从语义上来讲:
Future返回Pending后将再次被Poll,Future返回Ready后代表此Future已成功返回。

Future的文档中提及:

1
Once a future has finished, clients should not `poll` it again.

无论是异步crates的开发者还是Runtime的开发者都应遵守的规则,poll到Ready后就不应再次poll,否则将违反其语义。

futures(注意多了个s)工具包中的Fuse 在处理这类问题时可能会用到。

wake

Future的文档中如此描述poll:

1
2
3
4
When a future is not ready yet, `poll` returns `Poll::Pending` and stores a clone of the [`Waker`] copied from the current [`Context`].
This [`Waker`] is then woken once the future can make progress.
For example, a future waiting for a socket to become
readable would call `.clone()` on the [`Waker`] and store it.

这里提到了一个重要的机制wake。如果我们的Future是socket读写,我们肯定是不希望在socket读写的过程中poll被密集且无效的调用导致cpu空转。

1
The `poll` function is not called repeatedly in a tight loop -- instead,it should only be called when the future indicates that it is ready to make progress (by calling `wake()`).

Runtime的设计应符合这里的描述,不应密集的调用poll,只有在可以取得进展(make progress)时才能调用poll。换句话说,对于正确实现的Runtime,调用一次poll后就不会再调用了,直到可以取得进展

这里可以证明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
use std::{future::Future};

type E = Box<dyn std::error::Error>;

#[tokio::main]
async fn main() -> Result<(), E> {
let i = Dummy { idx: 0 }.await;
dbg!(i);
Ok(())
}
#[pin_project::pin_project]
struct Dummy {
idx: i32,
}

impl Future for Dummy {
type Output = i32;

fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let p = self.project();
if p.idx < &mut 10 {
*p.idx += 1;
// cx.waker().wake_by_ref();
std::task::Poll::Pending
} else {
std::task::Poll::Ready(p.idx.clone())
}
}
}

注释掉 cx.waker().wake_by_ref(); 发现程序并没有快速poll 10次结束。打开注释则能正常打印。

1
[src/main.rs:9] i = 10

上面的例子可以看到,对于我们的poll函数,Runtime只会执行一次,除非调用了wake,否则永远不会再执行。再处理完Future中的工作可以更新状态后,需手动调用wake。比如对于tokio::time::Sleep ,tokio Runtime中维护了自己的时间轮体系,所以调用tokio的sleep并不会走syscall futex之类的阻塞当前线程。因此此类Future将waker丢给内部,不需要我们手动唤醒。

1
this.inner().state.poll(cx.waker())

同理,假设我们Future实现linux平台epoll时,epoll wait返回后也需将对应有可读事件的socket唤醒。当然这些细节tokio都为我们封装好了,上层业务只需用tokio的tcpstream代替标准库的tcpstream即可。

总结

本章我们认识什么是Future,初步了解Future的基本原理,下一篇我们将探讨Future和Rust异步的使用细节。

  • 本文作者: fenix
  • 本文链接: https://fenix0.com/rust-future/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC 许可协议。转载请注明出处!