Rust异步系列2 Futures工具包(上)

上一篇我们聊到了Future的基本原理,本篇我们将探讨一些Future相关的设计模式。

我们先看下官方提供的futures工具包。https://crates.io/crates/futures,这是Rust官方出品的,Rust异步生态基础设施工具。包含了诸如多future流控,channel,lock,Stream等非常方便的功能,甚至还包含一个简单实现的Runtime,许多异步crates都会基于此crate开发。

这里我们就不再引入tokio了。

1
2
[dependencies]
futures="0.3.15"

我们分module来看

1. executor

这里包含一个内置的Runtime,线程池,由于后续的学习可能都要用到它,所以我们先来看下它。

block_on

在当前线程内阻塞执行future
一个最简单的例子

1
2
3
4
5
6
7
type E = Box<dyn std::error::Error>;
use futures::future;
fn main() -> Result<(), E> {
let a = future::ready(1);
assert_eq!(futures::executor::block_on(a), 1);
Ok(())
}

block_on的实现思路

首先执行enter给当前线程打个标记,避免别的线程调度。后面就是开始循环poll,poll一次就park住,等待被唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
let _enter = enter().expect(
"cannot execute `LocalPool` executor from within \
another executor",
);

CURRENT_THREAD_NOTIFY.with(|thread_notify| {
let waker = waker_ref(thread_notify);
let mut cx = Context::from_waker(&waker);
loop {
if let Poll::Ready(t) = f(&mut cx) {
return t;
}
while !thread_notify.unparked.swap(false, Ordering::Acquire) {
thread::park();
}
}
})
}

使用场景就是在一些简单测试场景可以用来阻塞poll最外层的future。

LocalPool

单线程池,只应用来做纯IO操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
use futures::{executor::LocalPool, task::LocalSpawnExt};

type E = Box<dyn std::error::Error>;
fn main() -> Result<(), E> {
let mut l = LocalPool::new();
let s = l.spawner();
s.spawn_local(async { println!("1"); () })?;
s.spawn_local(async { println!("2"); () })?;
s.spawn_local(async { println!("3"); () })?;
s.spawn_local(async { println!("4"); () })?;
l.run();
Ok(())
}

用法大概如此。run函数会运行任务池中所有的task。

ThreadPool

线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

use std::time::Duration;

use futures::executor::*;
type E = Box<dyn std::error::Error>;
fn main() -> Result<(), E> {
let t = ThreadPoolBuilder::new()
.name_prefix("mythread")
.stack_size(1024)
.after_start(|i| println!("{} thread started", i))
.before_stop(|i| println!("{} thread stop", i))
.create()?;

t.spawn_ok(async {println!("task1")});
t.spawn_ok(async {println!("task2")});
t.spawn_ok(async {println!("task3")});

std::thread::sleep(Duration::from_secs(1));
Ok(())
}

future

Future模块专注于Future的组合,流控等

ready

这个函数作用是将传入的T封装为一个Future,此Future被poll时会立刻返回包含T的Ready,并支持Future的熔断。

1
2
3
pub fn ready<T>(t: T) -> Ready<T> {
assert_future::<T, _>(Ready(Some(t)))
}

简单用法

1
2
3
4
5
6
7
use futures::{executor::*, future::ready};
type E = Box<dyn std::error::Error>;
fn main() -> Result<(), E> {
let a = block_on(ready(1));
assert_eq!(a,1);
Ok(())
}

ready!

提取Poll的结果,如果Ready则提取;如果Pending则依然Pending。

1
2
3
4
5
6
7
8
9
#[macro_export]
macro_rules! ready {
($e:expr $(,)?) => {
match $e {
$crate::task::Poll::Ready(t) => t,
$crate::task::Poll::Pending => return $crate::task::Poll::Pending,
}
};
}

join

将多个Future合并一起poll,如果所有future都Ready就返回Ready,并将所有future的output组装为:

1
2
3
4
5
6
7
8
9
10
11
use futures::{
executor::*,
future::{join},
};
type E = Box<dyn std::error::Error>;
fn main() -> Result<(), E> {
let j = join(futures::future::ready(1),futures::future::ready(2));
let f = block_on(j);
assert_eq!(f, (1, 2));
Ok(())
}

join! 宏与join函数的区别在于 join!宏会帮我们把await完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
use futures::{
future::{ready},
join, executor::block_on,
};
type E = Box<dyn std::error::Error>;
fn main() -> Result<(), E> {
block_on(all());
Ok(())
}

async fn all() {
let j = join!(ready(1), ready(2));
assert_eq!(j, (1, 2));
}

join一般使用场景是我们需要多个IO型任务并发执行,并阻塞等待其执行完成。

select

并行执行多个任务,并当某个任务先执行完毕后取消其他任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
use futures::{
future::{ready, select}, executor::block_on,
};
type E = Box<dyn std::error::Error>;
fn main() -> Result<(), E> {
let p = select(ready(1), ready(2));
dbg!(block_on(p));
Ok(())
}

----------
output:
[src/main.rs:8] block_on(p) = Left(
(
1,
Ready(
Some(
2,
),
),
),
)

这里select 对应的Ouput是 Either 类型:

1
2
3
4
5
6
pub enum Either<A, B> {
/// First branch of the type
Left(/* #[pin] */ A),
/// Second branch of the type
Right(/* #[pin] */ B),
}

非左即右,select一定只能有一个返回。

那么select! 宏呢?

select!同样只能用于async fn 中,比如下面的例子其中async block a传入我们的future,结果赋给a,然后可以用a来执行after block

1
2
3
4
5
select! {
a = { async block a } => { after block a },
b = { async block b } => { after block b },
c = { async block c } => { after block c },
}

需要注意的是async block 需要实现FuseFuture

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
use futures::{
executor::block_on,
future::{ready},
};
type E = Box<dyn std::error::Error>;

fn main() -> Result<(), E> {
block_on(u());
Ok(())
}

async fn u(){
futures::select! {
a = ready(1) => println!("a={}",a),
b = ready(2) => println!("b={}",b),
};
}

Fuse

前篇简单说过的,Future Ready后继续调用poll可能造成未知的问题,从语义上来看Future处于Ready确实代表Future已经完成,不应继续poll了。Fuse就是为了应对这个问题出现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
pin_project! {
/// Future for the [`fuse`](super::FutureExt::fuse) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Fuse<Fut> {
#[pin]
inner: Option<Fut>,
}
}

impl<Fut: Future> Future for Fuse<Fut> {
type Output = Fut::Output;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Fut::Output> {
Poll::Ready(match self.as_mut().project().inner.as_pin_mut() {
Some(fut) => {
let output = ready!(fut.poll(cx));
self.project().inner.set(None);
output
}
None => return Poll::Pending,
})
}
}

逻辑就是:首先用Option封装我们的目标。poll到ready后再次poll是不会poll到inner的Future的。

需要补充说明的是,只需要use FutureExt就可以使用fuse了,这是因为:

1
2
3
4
5
impl<T: ?Sized> FutureExt for T where T: Future {}

pub trait FutureExt: Future {
...
}

首先FutureExt继承 Future,FutureExt为每个fn提供了默认实现。然后FutureExt最后为实现过Future的T实现了,这样只要use了FutureExt,就可以在Future上直接调用FutureExt 的函数。因此我们只需要引入FutureExt就可以直接使用fuse生成Fuse