Rust Async Series 3: IO Traits

Let's continue with the traits introduced earlier.

  • Read | AsyncRead | AsyncReadExt
  • Write | AsyncWrite | AsyncWriteExt
  • Seek | AsyncSeek | AsyncSeekExt
  • BufRead | AsyncBufRead | AsyncBufReadExt

The Read, Write and Seek traits partly borrow POSIX semantics. To start, here is an excerpt from the Linux man page describing read:

SYNOPSIS
#include <unistd.h>

ssize_t read(int fd, void *buf, size_t count);

DESCRIPTION
read() attempts to read up to count bytes from file descriptor fd into
the buffer starting at buf.

On files that support seeking, the read operation commences at the file
offset, and the file offset is incremented by the number of bytes read.
If the file offset is at or past the end of file, no bytes are read,
and read() returns zero.

If count is zero, read() may detect the errors described below. In the
absence of any errors, or if read() does not check for errors, a read()
with a count of 0 returns zero and has no other effects.

According to POSIX.1, if count is greater than SSIZE_MAX, the result is
implementation-defined; see NOTES for the upper limit on Linux.

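read attempts to read up to count bytes from fd into the buffer that starts at buf. If the file supports seeking, each read starts at the current file offset and then advances the offset by the number of bytes read. The return value falls into three cases:

  1. On success, read returns the number of bytes actually read. There are two sub-cases: if the end of the file is reached before count bytes have been read, the returned value is smaller than count but still greater than 0; if the end of the file is not reached first, a regular file normally returns exactly count. Pipes, sockets and terminals may also return fewer bytes than requested simply because less data is available at the moment.
  2. If the offset is already at the end of the file when read is called, it returns 0.
  3. On error, it returns -1.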

Read Trait

The core method of the Read trait is read:

fn read(&mut self, buf: &mut [u8]) -> Result<usize>;

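All the other methods have default implementations that are wrappers around read, so we will not discuss them here.
The documentation of read in the Read trait says: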

Pull some bytes from this source into the specified buffer, returning
how many bytes were read.

This function does not provide any guarantees about whether it blocks
waiting for data, but if an object needs to block for a read and cannot,
it will typically signal this via an [`Err`] return value.

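That is, read pulls some bytes from the source into the given buffer and returns how many bytes were read. It makes no guarantee about whether it blocks while waiting for data, but if an object needs to block and cannot, it typically reports this through an Err value. For a TCP connection in blocking mode, calling read copies data from the kernel buffer into the buffer we supply; once all available data has been consumed, the next read blocks until the kernel buffer receives new data.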

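Unlike the POSIX call, Read::read never returns -1. The success value is a usize, and the trait definition requires implementers to report failures through Result, so callers do not have to check for -1 to tell whether the read succeeded. This is one difference from POSIX semantics.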
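To make the three outcomes concrete, here is a minimal sketch of a read loop. ChunkedReader and drain are hypothetical names invented for this illustration; the reader deliberately returns short reads, the way a socket or pipe might.

```rust
use std::io::{self, ErrorKind, Read};

/// Hypothetical reader that hands out at most `chunk` bytes per call,
/// imitating the short reads a socket or pipe may produce.
struct ChunkedReader<'a> {
    data: &'a [u8],
    chunk: usize,
}

impl<'a> Read for ChunkedReader<'a> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let data = self.data;
        // Respect all three limits; n == 0 means the data is exhausted.
        let n = self.chunk.min(buf.len()).min(data.len());
        buf[..n].copy_from_slice(&data[..n]);
        self.data = &data[n..];
        Ok(n)
    }
}

/// Reads until EOF and returns the bytes plus the number of `read` calls.
fn drain<R: Read>(mut reader: R) -> io::Result<(Vec<u8>, usize)> {
    let mut out = Vec::new();
    let mut buf = [0u8; 8];
    let mut calls = 0;
    loop {
        calls += 1;
        match reader.read(&mut buf) {
            Ok(0) => return Ok((out, calls)),          // end of stream
            Ok(n) => out.extend_from_slice(&buf[..n]), // possibly a short read
            Err(ref e) if e.kind() == ErrorKind::Interrupted => continue, // retry
            Err(e) => return Err(e),                   // takes the place of -1
        }
    }
}

fn main() -> io::Result<()> {
    let reader = ChunkedReader { data: b"hello world", chunk: 4 };
    let (bytes, calls) = drain(reader)?;
    println!("{} bytes in {} calls", bytes.len(), calls);
    Ok(())
}
```

The caller never compares against -1: end of stream is Ok(0), and every failure arrives as an Err value that can be matched on.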

Write trait

fn write(&mut self, buf: &[u8]) -> Result<usize>;

Here is the documentation of write:

Write a buffer into this writer, returning how many bytes were written.

This function will attempt to write the entire contents of `buf`, but
the entire write might not succeed, or the write may also generate an
error. A call to `write` represents *at most one* attempt to write to
any wrapped object.

Calls to `write` are not guaranteed to block waiting for data to be
written, and a write which would otherwise block can be indicated through
an [`Err`] variant.

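As with read, a write that would otherwise block but cannot may report this through Err. Otherwise, for a TCP connection in blocking mode, write blocks once the kernel send buffer is full and resumes when there is enough room to copy in new data.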
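Because a single write call is at most one attempt and may accept only part of the buffer, callers usually loop. The sketch below shows such a loop; LimitedWriter and write_fully are hypothetical names made up for the example, and the standard library's write_all performs a similar loop for you.

```rust
use std::io::{self, ErrorKind, Write};

/// Hypothetical sink that accepts at most `limit` bytes per `write` call,
/// standing in for a socket whose send buffer is almost full.
struct LimitedWriter {
    sink: Vec<u8>,
    limit: usize,
}

impl Write for LimitedWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = self.limit.min(buf.len());
        self.sink.extend_from_slice(&buf[..n]);
        Ok(n) // may be smaller than buf.len()
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Calls `write` until every byte is accepted; returns the number of calls.
fn write_fully<W: Write>(writer: &mut W, mut buf: &[u8]) -> io::Result<usize> {
    let mut calls = 0;
    while !buf.is_empty() {
        calls += 1;
        match writer.write(buf) {
            // Zero bytes accepted for a non-empty buffer: give up.
            Ok(0) => return Err(ErrorKind::WriteZero.into()),
            Ok(n) => buf = &buf[n..], // drop the part that was written
            Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(calls)
}

fn main() -> io::Result<()> {
    let mut writer = LimitedWriter { sink: Vec::new(), limit: 3 };
    let calls = write_fully(&mut writer, b"0123456789")?;
    println!("wrote {} bytes in {} calls", writer.sink.len(), calls);
    Ok(())
}
```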

BufRead

This trait comes from the standard library.
It extends Read:

pub trait BufRead: Read { /* required methods fill_buf and consume elided */ }

In most cases, though, we do not work with this trait directly and use BufReader instead. The BufRead documentation says:

A `BufRead` is a type of `Read`er which has an internal buffer, allowing it
to perform extra ways of reading.

For example, reading line-by-line is inefficient without using a buffer, so
if you want to read by line, you'll need `BufRead`, which includes a
[`read_line`] method as well as a [`lines`] iterator.



If you have something that implements [`Read`], you can use the [`BufReader`
type][`BufReader`] to turn it into a `BufRead`.

For example, [`File`] implements [`Read`], but not `BufRead`.
[`BufReader`] to the rescue!

[`File`]: crate::fs::File
[`read_line`]: BufRead::read_line
[`lines`]: BufRead::lines

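An implementation of BufRead usually holds an internal buffer, which makes line-oriented reading very convenient. To use the API, we only need to pass an object that implements Read to BufReader: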

use std::io::{self, BufReader};
use std::io::prelude::*;
use std::fs::File;

fn main() -> io::Result<()> {
    let f = File::open("foo.txt")?;
    let f = BufReader::new(f);

    for line in f.lines() {
        println!("{}", line.unwrap());
    }

    Ok(())
}

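In short, BufRead is a layer on top of Read that simplifies reading by line; the actual read operations are still carried out by the underlying Read implementation.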
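The buffer is exposed through the two required methods of BufRead, fill_buf and consume, and read_line is built on top of them. The following sketch uses an in-memory Cursor instead of a file so that it runs anywhere; collect_lines and peek_then_skip are hypothetical helper names.

```rust
use std::io::{self, BufRead, BufReader, Cursor};

/// Collects lines with `read_line`, stripping the trailing '\n'.
fn collect_lines<R: BufRead>(mut reader: R) -> io::Result<Vec<String>> {
    let mut lines = Vec::new();
    loop {
        let mut line = String::new();
        if reader.read_line(&mut line)? == 0 {
            return Ok(lines); // 0 bytes read means EOF
        }
        lines.push(line.trim_end_matches('\n').to_string());
    }
}

/// Looks at the buffered bytes with `fill_buf`, then marks `skip` of them as
/// consumed so later reads start after them.
fn peek_then_skip<R: BufRead>(reader: &mut R, skip: usize) -> io::Result<Vec<u8>> {
    let buffered = reader.fill_buf()?.to_vec();
    reader.consume(skip.min(buffered.len()));
    Ok(buffered)
}

fn main() -> io::Result<()> {
    // Cursor<&[u8]> implements Read; BufReader adds the internal buffer.
    let mut reader = BufReader::new(Cursor::new(&b"alpha\nbeta\ngamma"[..]));
    let seen = peek_then_skip(&mut reader, 6)?; // skips "alpha\n"
    println!("buffer held {} bytes", seen.len());
    for line in collect_lines(reader)? {
        println!("{}", line);
    }
    Ok(())
}
```

Note that fill_buf is the call that triggers a read on the wrapped object; read_line only scans the bytes that are already buffered and refills when they run out.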

AsyncRead

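The traits above are all provided by the standard library. The asynchronous read and write traits, however, are not defined there; the standard library only defines the Future API. Wrapping I/O in futures is left to higher-level libraries, which turn an ordinary read into an asynchronous one. AsyncRead is what you get when that wrapping is abstracted into a trait.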

The futures crate defines it as:

pub trait AsyncRead {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize>>;
}

tokio defines it as:

pub trait AsyncRead {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>>;
}
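To show what a poll-based read looks like from the implementer's side, here is a minimal sketch that uses only the standard library. The AsyncRead trait below is a local copy of the futures-style signature, not the real crate, and SlowReader, CountingWaker and poll_by_hand are hypothetical names. The reader reports Pending on the first poll and Ready on the second.

```rust
use std::io;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local copy of the futures-style signature, so the sketch needs no crates.
trait AsyncRead {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>>;
}

/// Hypothetical source that is not ready on the first poll.
struct SlowReader<'a> {
    data: &'a [u8],
    ready: bool,
}

impl<'a> AsyncRead for SlowReader<'a> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let this = self.get_mut(); // allowed because SlowReader is Unpin
        if !this.ready {
            this.ready = true;
            // Arrange a wake-up before returning Pending, otherwise the
            // executor would have no reason to poll again.
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        let data = this.data;
        let n = buf.len().min(data.len());
        buf[..n].copy_from_slice(&data[..n]);
        this.data = &data[n..];
        Poll::Ready(Ok(n))
    }
}

/// Waker that only counts how often it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls by hand; returns (first poll was pending, wake count, bytes read).
fn poll_by_hand() -> (bool, usize, Vec<u8>) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);

    let mut reader = SlowReader { data: b"ping", ready: false };
    let mut buf = [0u8; 8];

    let was_pending = Pin::new(&mut reader).poll_read(&mut cx, &mut buf).is_pending();

    let mut out = Vec::new();
    if let Poll::Ready(Ok(n)) = Pin::new(&mut reader).poll_read(&mut cx, &mut buf) {
        out.extend_from_slice(&buf[..n]);
    }
    (was_pending, counter.0.load(Ordering::SeqCst), out)
}

fn main() {
    let (was_pending, wakes, bytes) = poll_by_hand();
    println!("pending first: {}, wakes: {}, bytes: {:?}", was_pending, wakes, bytes);
}
```

In a real runtime the wake-up would come from the I/O driver once the descriptor becomes readable, rather than from the reader itself.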

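The two definitions differ in how the buffer is passed: futures takes a plain &mut [u8] and returns the number of bytes read, while tokio takes a ReadBuf and returns ().
So how does tokio use AsyncRead?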

pub(crate) fn read<'a, R>(reader: &'a mut R, buf: &'a mut [u8]) -> Read<'a, R>
where
    R: AsyncRead + Unpin + ?Sized,
{
    Read {
        reader,
        buf,
        _pin: PhantomPinned,
    }
}

pin_project! {
    /// A future which can be used to easily read available number of bytes to fill
    /// a buffer.
    ///
    /// Created by the [`read`] function.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct Read<'a, R: ?Sized> {
        reader: &'a mut R,
        buf: &'a mut [u8],
        // Make this future `!Unpin` for compatibility with async trait methods.
        #[pin]
        _pin: PhantomPinned,
    }
}

impl<R> Future for Read<'_, R>
where
    R: AsyncRead + Unpin + ?Sized,
{
    type Output = io::Result<usize>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
        let me = self.project();
        let mut buf = ReadBuf::new(me.buf);
        ready!(Pin::new(me.reader).poll_read(cx, &mut buf))?;
        Poll::Ready(Ok(buf.filled().len()))
    }
}


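In short, tokio::fs::File implements AsyncRead. tokio::fs::File::open() returns a File object, and calling read on that object (and awaiting the returned future) ends up in File's AsyncRead implementation, which runs through a fairly long state machine: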

impl AsyncRead for File {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        dst: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        let me = self.get_mut();
        let inner = me.inner.get_mut();

        loop {
            match inner.state {
                Idle(ref mut buf_cell) => {
                    let mut buf = buf_cell.take().unwrap();

                    if !buf.is_empty() {
                        buf.copy_to(dst);
                        *buf_cell = Some(buf);
                        return Ready(Ok(()));
                    }

                    buf.ensure_capacity_for(dst);
                    let std = me.std.clone();

                    inner.state = Busy(spawn_blocking(move || {
                        let res = buf.read_from(&mut &*std);
                        (Operation::Read(res), buf)
                    }));
                }
                Busy(ref mut rx) => {
                    let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?;

                    match op {
                        Operation::Read(Ok(_)) => {
                            buf.copy_to(dst);
                            inner.state = Idle(Some(buf));
                            return Ready(Ok(()));
                        }
                        Operation::Read(Err(e)) => {
                            assert!(buf.is_empty());

                            inner.state = Idle(Some(buf));
                            return Ready(Err(e));
                        }
                        Operation::Write(Ok(_)) => {
                            assert!(buf.is_empty());
                            inner.state = Idle(Some(buf));
                            continue;
                        }
                        Operation::Write(Err(e)) => {
                            assert!(inner.last_write_err.is_none());
                            inner.last_write_err = Some(e.kind());
                            inner.state = Idle(Some(buf));
                        }
                        Operation::Seek(result) => {
                            assert!(buf.is_empty());
                            inner.state = Idle(Some(buf));
                            if let Ok(pos) = result {
                                inner.pos = pos;
                            }
                            continue;
                        }
                    }
                }
            }
        }
    }
}

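To summarize: the Read struct implements the Future API, so when the async runtime polls it, the call is forwarded to poll_read in AsyncRead, and the AsyncRead implementation is driven along with it.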
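The forwarding from Future::poll to poll_read can be reproduced in a few lines. This is a simplified sketch and not tokio's code: it uses a local futures-style AsyncRead trait over &mut [u8], leaves out ReadBuf and PhantomPinned, and the names ReadFuture, ThreadWaker, block_on and read_greeting are all assumptions made for the example.

```rust
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Local stand-in for a futures-style AsyncRead (an assumption of this sketch).
trait AsyncRead {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>>;
}

/// In-memory data is always available, so this never returns Pending.
impl<'a> AsyncRead for &'a [u8] {
    fn poll_read(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let this = self.get_mut();
        let data = *this;
        let n = buf.len().min(data.len());
        buf[..n].copy_from_slice(&data[..n]);
        *this = &data[n..];
        Poll::Ready(Ok(n))
    }
}

/// Future returned by `read`; its `poll` only forwards to `poll_read`.
struct ReadFuture<'a, R: ?Sized> {
    reader: &'a mut R,
    buf: &'a mut [u8],
}

fn read<'a, R>(reader: &'a mut R, buf: &'a mut [u8]) -> ReadFuture<'a, R>
where
    R: AsyncRead + Unpin + ?Sized,
{
    ReadFuture { reader, buf }
}

impl<'a, R> Future for ReadFuture<'a, R>
where
    R: AsyncRead + Unpin + ?Sized,
{
    type Output = io::Result<usize>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let me = self.get_mut();
        Pin::new(&mut *me.reader).poll_read(cx, &mut *me.buf)
    }
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Tiny single-future executor: poll, and park the thread while Pending.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

fn read_greeting() -> io::Result<Vec<u8>> {
    let mut source: &[u8] = b"hello";
    let mut buf = [0u8; 16];
    // Inside an async fn this would be `read(&mut source, &mut buf).await?`.
    let n = block_on(read(&mut source, &mut buf))?;
    Ok(buf[..n].to_vec())
}

fn main() -> io::Result<()> {
    let bytes = read_greeting()?;
    println!("{}", String::from_utf8_lossy(&bytes));
    Ok(())
}
```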

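Another important difference between Read and AsyncRead is that the Read trait contains many default methods, all of them wrappers around read, whereas for AsyncRead these helpers are moved into AsyncReadExt as a utility set. You must `use` AsyncReadExt before you can call APIs such as read_to_end. The same applies to AsyncWrite and AsyncWriteExt, and to AsyncBufRead and AsyncBufReadExt.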
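The extension-trait pattern itself is easy to reproduce. The sketch below applies it to the synchronous std::io::Read to keep the example dependency-free; ByteReadExt, read_u8 and first_two are hypothetical names, and the real AsyncReadExt is considerably larger.

```rust
use std::io::{self, ErrorKind, Read};

/// Hypothetical extension trait: helpers built only on `Read::read`.
trait ByteReadExt: Read {
    /// Reads one byte, or fails with UnexpectedEof at end of stream.
    fn read_u8(&mut self) -> io::Result<u8> {
        let mut byte = [0u8; 1];
        match self.read(&mut byte)? {
            0 => Err(ErrorKind::UnexpectedEof.into()),
            _ => Ok(byte[0]),
        }
    }
}

// Blanket impl: every Read type gets the helpers automatically. When the
// trait lives in another module or crate, callers still have to bring it
// into scope with `use` before `read_u8` resolves.
impl<R: Read + ?Sized> ByteReadExt for R {}

/// Reads the first two bytes of a slice through the extension method.
fn first_two(mut src: &[u8]) -> io::Result<(u8, u8)> {
    let a = src.read_u8()?;
    let b = src.read_u8()?;
    Ok((a, b))
}

fn main() -> io::Result<()> {
    let (a, b) = first_two(b"ok")?;
    println!("{} {}", a as char, b as char);
    Ok(())
}
```

Keeping the core trait down to its required methods and putting the conveniences in a blanket-implemented extension trait means implementers only write poll_read, while callers opt in to the helpers with a single import.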

AsyncBufRead

This trait is the asynchronous version of BufRead and supports reading by line. As before, you need to import the extension trait to use that functionality.

pub trait AsyncBufReadExt: AsyncBufRead {
}