藏川线前段

--- 摄于 2017 年 9 月 藏川线前段

在异步环境下使用被同步的 Read/Write Trait 限定的库

在找 websocket 的 Rust 实现的时候,翻源码过程中发现了这样的库,协议实现中不包含任何倾向,只限定 Read/Write trait,然后将协议实现的库作为核心来包装同步或者异步的使用库,这样,必然要遇到一个问题,如何将 AsyncRead/AsyncWrite 转换成 Read/Write 从而符合协议实现的需求,这里有两个库可以关注一下,写法大同小异,但本质没有区别:tokio-tungstenitetokio-native-tls

这两个库内部都有一个结构叫 AllowSt<S>, 用来实现 async 到 sync 的转换,并且结构大致相同,内部是一个 async stream 加上 wakers handle,这其实就是 Rust 异步生态的核心部分了,理解了 waker 的作用理论上可以自己写出来这样的东西。第一次看见的时候着实有点好奇,当翻过源码实现之后,实际上就很清晰了。代码大致如下:

struct AllowStd<S> {
    async_stream: S,
    read_waker: task::AtomicWaker,
    write_waker: task::AtomicWaker,
}

impl<S> AllowStd<S>
where
    S: Unpin,
{
    fn with_context<F, R>(&mut self, kind: ContextWaker, f: F) -> Poll<std::io::Result<R>>
    where
        F: FnOnce(&mut Context<'_>, Pin<&mut S>) -> Poll<std::io::Result<R>>,
    {
        trace!("{}:{} AllowStd.with_context", file!(), line!());
        let waker = match kind {
            ContextWaker::Read => task::waker_ref(&self.read_waker),
            ContextWaker::Write => task::waker_ref(&self.write_waker),
        };
        let mut context = task::Context::from_waker(&waker);
        f(&mut context, Pin::new(&mut self.async_stream))
    }

    pub(crate) fn get_mut(&mut self) -> &mut S {
        &mut self.inner
    }

    pub(crate) fn get_ref(&self) -> &S {
        &self.inner
    }
}

impl<S> Read for AllowStd<S>
where
    S: AsyncRead + Unpin,
{
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        trace!("{}:{} Read.read", file!(), line!());
        let mut buf = ReadBuf::new(buf);
        match self.with_context(ContextWaker::Read, |ctx, stream| {
            trace!(
                "{}:{} Read.with_context read -> poll_read",
                file!(),
                line!()
            );
            stream.poll_read(ctx, &mut buf)
        }) {
            Poll::Ready(Ok(_)) => Ok(buf.filled().len()),
            Poll::Ready(Err(err)) => Err(err),
            Poll::Pending => Err(std::io::Error::from(std::io::ErrorKind::WouldBlock)),
        }
    }
}

这里会有两个 waker, 一个用于 可写入唤醒,一个用于 可读出唤醒,对于可读写的 async stream 是必要的,因为存在两个操作并不同时属于一个 task 的可能性,如果该结构对外不暴露,并且内部只属于一个 task,那也可以像 tokio-native-tls 一样,只存一个 context 的原始指针,因为是一致的 task,所以无论哪种唤醒都将执行同一个 task。

犹记得,在几年前,重点使用 Python 项目的时候,对于第三方库的使用,从来是只看说明文档,不看源码,上来就动手糊自己需要的东西,非常信任已经存在多年的 Python 生态,但还是会碰到不少坑爹的行为,尤其是库对 utf-8 编码的支持,天坑之一。而进入 Rust 社区之后,因为本身进入时间非常早,生态还不完善,有相当大一批人是玩票性质写个库玩玩就扔社区上去了,我个人非常讨厌这种行为,但没有任何约束能够管理这样的行为,在选择第三方库的时候,很大程度上会靠看源码这样的操作去进行,然后这几年,我真的是看了不少别人库的代码,有些实现感觉相当具有想象力,有些完全就是垃圾,甚至垃圾都不如。慢慢看多了之后,自己写代码的能力也在不断提高,感觉效果很好,在选择库的时候也会相对更慎重一点,生产环境不容有失啊。

这篇拖了超级无敌久,大概一个月了吧,距离 tentacle 正式支持 websocket 协议,就不知道 wasm 的兼容要不要也拖这么久,自己真是无能啊

评论区

加载更多

登录后评论