上一篇我们聊到了Future的基本原理,本篇我们将探讨一些Future相关的设计模式。
我们先看下官方提供的futures工具包。https://crates.io/crates/futures,这是Rust官方出品的,Rust异步生态基础设施工具。包含了诸如多future流控,channel,lock,Stream等非常方便的功能,甚至还包含一个简单实现的Runtime,许多异步crates都会基于此crate开发。
这里我们就不再引入tokio了。
1 2
| [dependencies] futures="0.3.15"
|
我们分module来看
1. executor
这里包含一个内置的Runtime,线程池,由于后续的学习可能都要用到它,所以我们先来看下它。
block_on
在当前线程内阻塞执行future
一个最简单的例子
1 2 3 4 5 6 7
| type E = Box<dyn std::error::Error>; use futures::future; fn main() -> Result<(), E> { let a = future::ready(1); assert_eq!(futures::executor::block_on(a), 1); Ok(()) }
|
block_on的实现思路
首先执行enter给当前线程打个标记,避免别的线程调度。后面就是开始循环poll,poll一次就park住,等待被唤醒。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T { let _enter = enter().expect( "cannot execute `LocalPool` executor from within \ another executor", );
CURRENT_THREAD_NOTIFY.with(|thread_notify| { let waker = waker_ref(thread_notify); let mut cx = Context::from_waker(&waker); loop { if let Poll::Ready(t) = f(&mut cx) { return t; } while !thread_notify.unparked.swap(false, Ordering::Acquire) { thread::park(); } } }) }
|
使用场景就是在一些简单测试场景可以用来阻塞poll最外层的future。
LocalPool
单线程池,只应用来做纯IO操作。
1 2 3 4 5 6 7 8 9 10 11 12 13
| use futures::{executor::LocalPool, task::LocalSpawnExt};
type E = Box<dyn std::error::Error>; fn main() -> Result<(), E> { let mut l = LocalPool::new(); let s = l.spawner(); s.spawn_local(async { println!("1"); () })?; s.spawn_local(async { println!("2"); () })?; s.spawn_local(async { println!("3"); () })?; s.spawn_local(async { println!("4"); () })?; l.run(); Ok(()) }
|
用法大概如此。run函数会运行任务池中所有的task。
ThreadPool
线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| use std::time::Duration;
use futures::executor::*; type E = Box<dyn std::error::Error>; fn main() -> Result<(), E> { let t = ThreadPoolBuilder::new() .name_prefix("mythread") .stack_size(1024) .after_start(|i| println!("{} thread started", i)) .before_stop(|i| println!("{} thread stop", i)) .create()?;
t.spawn_ok(async {println!("task1")}); t.spawn_ok(async {println!("task2")}); t.spawn_ok(async {println!("task3")});
std::thread::sleep(Duration::from_secs(1)); Ok(()) }
|
future
Future模块专注于Future的组合,流控等
ready
这个函数作用是将传入的T封装为一个Future,此Future被poll时会立刻返回包含T的Ready,并支持Future的熔断。
1 2 3
| pub fn ready<T>(t: T) -> Ready<T> { assert_future::<T, _>(Ready(Some(t))) }
|
简单用法
1 2 3 4 5 6 7
| use futures::{executor::*, future::ready}; type E = Box<dyn std::error::Error>; fn main() -> Result<(), E> { let a = block_on(ready(1)); assert_eq!(a,1); Ok(()) }
|
ready!
宏
提取Poll的结果,如果Ready则提取;如果Pending则依然Pending。
1 2 3 4 5 6 7 8 9
| #[macro_export] macro_rules! ready { ($e:expr $(,)?) => { match $e { $crate::task::Poll::Ready(t) => t, $crate::task::Poll::Pending => return $crate::task::Poll::Pending, } }; }
|
join
将多个Future合并一起poll,如果所有future都Ready就返回Ready,并将所有future的output组装为:
1 2 3 4 5 6 7 8 9 10 11
| use futures::{ executor::*, future::{join}, }; type E = Box<dyn std::error::Error>; fn main() -> Result<(), E> { let j = join(futures::future::ready(1),futures::future::ready(2)); let f = block_on(j); assert_eq!(f, (1, 2)); Ok(()) }
|
join!
宏与join函数的区别在于 join!宏会帮我们把await完成
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| use futures::{ future::{ready}, join, executor::block_on, }; type E = Box<dyn std::error::Error>; fn main() -> Result<(), E> { block_on(all()); Ok(()) }
async fn all() { let j = join!(ready(1), ready(2)); assert_eq!(j, (1, 2)); }
|
join一般使用场景是我们需要多个IO型任务并发执行,并阻塞等待其执行完成。
select
并行执行多个任务,并当某个任务先执行完毕后取消其他任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| use futures::{ future::{ready, select}, executor::block_on, }; type E = Box<dyn std::error::Error>; fn main() -> Result<(), E> { let p = select(ready(1), ready(2)); dbg!(block_on(p)); Ok(()) }
---------- output: [src/main.rs:8] block_on(p) = Left( ( 1, Ready( Some( 2, ), ), ), )
|
这里select 对应的Ouput是 Either
类型:
1 2 3 4 5 6
| pub enum Either<A, B> { Left( A), Right( B), }
|
非左即右,select一定只能有一个返回。
那么select!
宏呢?
select!同样只能用于async fn
中,比如下面的例子其中async block a
传入我们的future,结果赋给a,然后可以用a来执行after block
1 2 3 4 5
| select! { a = { async block a } => { after block a }, b = { async block b } => { after block b }, c = { async block c } => { after block c }, }
|
需要注意的是async block 需要实现FuseFuture
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| use futures::{ executor::block_on, future::{ready}, }; type E = Box<dyn std::error::Error>;
fn main() -> Result<(), E> { block_on(u()); Ok(()) }
async fn u(){ futures::select! { a = ready(1) => println!("a={}",a), b = ready(2) => println!("b={}",b), }; }
|
Fuse
前篇简单说过的,Future Ready后继续调用poll可能造成未知的问题,从语义上来看Future处于Ready确实代表Future已经完成,不应继续poll了。Fuse就是为了应对这个问题出现的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| pin_project! { #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Fuse<Fut> { #[pin] inner: Option<Fut>, } }
impl<Fut: Future> Future for Fuse<Fut> { type Output = Fut::Output;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Fut::Output> { Poll::Ready(match self.as_mut().project().inner.as_pin_mut() { Some(fut) => { let output = ready!(fut.poll(cx)); self.project().inner.set(None); output } None => return Poll::Pending, }) } }
|
逻辑就是:首先用Option封装我们的目标。poll到ready后再次poll是不会poll到inner的Future的。
需要补充说明的是,只需要use FutureExt
就可以使用fuse了,这是因为:
1 2 3 4 5
| impl<T: ?Sized> FutureExt for T where T: Future {}
pub trait FutureExt: Future { ... }
|
首先FutureExt继承
Future,FutureExt为每个fn提供了默认实现。然后FutureExt
最后为实现过Future
的T实现了,这样只要use了FutureExt,就可以在Future上直接调用FutureExt
的函数。因此我们只需要引入FutureExt就可以直接使用fuse生成Fuse
。