藏川线前段

--- 摄于 2017 年 9 月 藏川线前段

Tokio 源码阅读笔记 — 源码来了

最近 Tokio PR 好快,threadpool 已经更新了,但是 carllerche 说当前的唤醒机制不是很理想,等后面修复。并且,其实 thread pool 的 RFC 早几个月就已经在 issue 列表里面了,目前 Rust 生态中,很多底层库实际上是没有标准的,但是却有事实实现标准,异步生态,官方以 Future 为核心,runtime 其实可以有任意的实现,但是直到今年年初,社区内只有实现的事实标准 tokio,后面因为 Future 升级,强行分叉了一个 romio 出来,作为验证项目来做,实际上真正可用的还是 tokio。我们是否需要有一个不一样的实现,是否需要一个不一样的调度作为另一种或者说更多的选择呢?统一的优势是生态内的库是通用的,多样化与之相反,带来的是另一种体验。

在我近一年的观察中,我看到了一些奇怪的现象,我发现有些库本来是 tokio 生态中的一员,后来不知道因为什么原因,退出了这个生态,用 mio 重写了该项目,比如 https://github.com/ustulation/p2phttps://github.com/maidsafe/crust。这只是现象,更多的内在问题我并不清楚,我在使用 tokio 的过程中确实碰到过问题,emmm,先不说这个了,我们接着上一节进行读源码操作。

正经内容

上一小节我们说到 Future 本质上是一个就绪通知机制,接下来又有几个问题:

  1. current() 或者说 context.wake() 是如何注册进去的
  2. tokio 是如何调度执行一个 Future 的
  3. 为什么 tokio 强调自定义 Future 不应该有阻塞

Task(wake) 如何注册

demo 中获取 Task 是非常简单的,只需要调用 current() 就可以了,但是,这个函数里面是什么呢,其实它是从一个 thread_local 变量中拿出了之前注册的 handle,这个 handle 关联了 task 任务,所以,只需要 notify 一下,执行器就能知道,有一个任务已经就绪,进入任务队列。

那么,这东西怎么注册呢?在哪注册呢?

注册代码:

0.1

let mut g = Guard(fut, true);

let ret =
g.0.as_mut()
	.unwrap()
	.poll_future_notify(unpark, self as *const _ as usize);

0.3

let mut g = Guard(fut, true);

let mut waker = arc_waker::waker(Arc::new(Waker {
    task: me.clone(),
    pool: pool.clone(),
}));

let mut cx = Context::from_waker(&mut waker);

let ret = g.0.as_mut().unwrap().as_mut().poll(&mut cx);

那么,它们的区别在哪呢?

0.1 版本下,task 存储在了 thread local 上,而 0.3 是通过 context 上下文进行传递的。thread local 的注册,可以跟踪 poll_future_notify 这个函数进行,最后到了 futures 库 的 set 函数上,thread_local 变量名是 CURRENT_TASK。这些地方,大量使用了裸指针,可以说是一个 unsafe 的经典教程,有空的话,大家可以跟代码学习学习。

tokio threadpool 如何调度 Future

我发现,这个问题描述起来比较麻烦,我尝试先用文字,再贴代码,最后用一个示例去说明这个问题。

文字描述

tokio 的 thread pool 是一个 worker steal 的工作线程池,它内部的 worker 是通过 index 动态确定的,并不是一个固定的分配,固定的是 index 的上限,也就是 worker 的数量。

work steal 是怎么实现的呢,实际上,它的实现依赖于 crossbeam 中的 InjectorWorkerStealer 等无锁结构,同时 tokio 本身还实现了无锁的 stack,用来标记 worker 的状态,还有大量的状态标记,用来确认任务状态、交易池状态、线程状态等等。

Pool 中,有两种任务队列:

那么,一个 Future 的传入到底经过哪几个流程呢,大致上来说,是下面几个流程:

  1. Future 进入全局任务队列,包装成 Task
  2. 查看当前 worker 是否空闲、是否需要启动/唤醒 worker
  3. 某个 worker 接到任务,开始执行 poll -> Not Ready
  4. worker 空闲,开始偷取任务执行
  5. Task 底层资源 Ready,Notify Pool
  6. Task 查看自身分配的 worker,确认进入 全局队列或者 某个对应的 worker 队列
  7. 某个 worker 接到 Task,执行 poll -> Ready
  8. Task 完成,未完成的话,重复 3 操作

那么如何理解 worker 是动态指定的呢,简单描述:

首先,pool 中的线程目前是 lazy 启动的,但是任务队列是创建时初始化完成的,也就是说,可能会有任务队列大于正在运行的 worker 线程的情况;

其次,worker 线程并不是只有 work 一个状态,它还有一个状态叫 blocking,这个状态相当于当前线程失去了 worker 的获取任务的能力,并且要将多余的任务退回全局任务队列,去执行一个用户指定的可能 blocking 当前线程的任务,在执行完成后,当前线程退出。

代码

收到新任务

pub fn submit_external(&self, task: Arc<Task>, pool: &Arc<Pool>) {
    debug_assert_eq!(*self, **pool);

    trace!("    -> submit external");

    self.queue.push(task);
    self.signal_work(pool);
}

开启新 worker

资源就序,notify

偷取队友任务

fn try_run_task(&self, notify: &Arc<Notifier>) -> bool {
    if self.try_run_owned_task(notify) {
        return true;
    }

    self.try_steal_task(notify)
}

偷取全局任务

match self.pool.queue.steal_batch(&self.entry().worker) {
    Steal::Success(()) => {
        self.pool.signal_work(&self.pool);
        break;
    }
    Steal::Empty => break,
    Steal::Retry => {}
}

示例

我在 tokio 源码中加了一些 debug 信息,再次运行了上一节的 demo 代码,输出的 debug 信息如下,让我们来用日志感受一下 Future 任务在 tokio Runtime 中的行踪:


[2019-06-29T09:52:00Z tokio_threadpool::sender] get a future task
[2019-06-29T09:52:00Z tokio_threadpool::pool] spawn a worker thread
[2019-06-29T09:52:00Z tokio_threadpool::worker] loop
[2019-06-29T09:52:00Z tokio_threadpool::worker] worker WorkerId(7) queue empty: true
[2019-06-29T09:52:00Z tokio_threadpool::worker] global queue empty: false
[2019-06-29T09:52:00Z tokio_threadpool::worker] worker WorkerId(7) queue empty: true
[2019-06-29T09:52:00Z tokio_threadpool::worker] global queue empty: false
[2019-06-29T09:52:00Z tokio_threadpool::worker] loop
[2019-06-29T09:52:00Z tokio_threadpool::worker] worker WorkerId(7) queue empty: true
[2019-06-29T09:52:00Z tokio_threadpool::worker] global queue empty: false
[2019-06-29T09:52:00Z tokio_threadpool::worker] loop
[2019-06-29T09:52:00Z tokio_threadpool::worker] worker WorkerId(7) queue empty: true
[2019-06-29T09:52:00Z tokio_threadpool::worker] global queue empty: false
[2019-06-29T09:52:00Z tokio_threadpool::worker] loop
[2019-06-29T09:52:00Z tokio_threadpool::worker] worker WorkerId(7) queue empty: false
[2019-06-29T09:52:00Z tokio_threadpool::worker] global queue empty: true
[2019-06-29T09:52:00Z tokio_threadpool::worker] run task
poll start
poll notready
[2019-06-29T09:52:00Z tokio_threadpool::task]     -> not ready
[tokio/tokio-threadpool/src/worker/mod.rs:474] run = Idle
[2019-06-29T09:52:00Z tokio_threadpool::worker] loop
[2019-06-29T09:52:00Z tokio_threadpool::worker] worker WorkerId(7) queue empty: true
[2019-06-29T09:52:00Z tokio_threadpool::worker] global queue empty: true
[2019-06-29T09:52:00Z tokio_threadpool::worker] loop
[2019-06-29T09:52:00Z tokio_threadpool::worker] worker WorkerId(7) queue empty: true
[2019-06-29T09:52:00Z tokio_threadpool::worker] global queue empty: true
[2019-06-29T09:52:00Z tokio_threadpool::worker] loop
[2019-06-29T09:52:00Z tokio_threadpool::worker] worker WorkerId(7) queue empty: true
[2019-06-29T09:52:00Z tokio_threadpool::worker] global queue empty: true
[2019-06-29T09:52:00Z tokio_threadpool::worker] loop
[2019-06-29T09:52:00Z tokio_threadpool::worker] worker WorkerId(7) queue empty: true
[2019-06-29T09:52:00Z tokio_threadpool::worker] global queue empty: true
notify task
[2019-06-29T09:52:05Z tokio_threadpool::notifier] notify: 139748102566240
[2019-06-29T09:52:05Z tokio_threadpool::worker] loop
[2019-06-29T09:52:05Z tokio_threadpool::worker] worker WorkerId(7) queue empty: true
[2019-06-29T09:52:05Z tokio_threadpool::worker] global queue empty: false
[2019-06-29T09:52:05Z tokio_threadpool::worker] loop
[2019-06-29T09:52:05Z tokio_threadpool::worker] worker WorkerId(7) queue empty: true
[2019-06-29T09:52:05Z tokio_threadpool::worker] global queue empty: false
[2019-06-29T09:52:05Z tokio_threadpool::worker] loop
[2019-06-29T09:52:05Z tokio_threadpool::worker] worker WorkerId(7) queue empty: true
[2019-06-29T09:52:05Z tokio_threadpool::worker] global queue empty: false
[2019-06-29T09:52:05Z tokio_threadpool::worker] loop
[2019-06-29T09:52:05Z tokio_threadpool::worker] worker WorkerId(7) queue empty: true
[2019-06-29T09:52:05Z tokio_threadpool::worker] global queue empty: false
[2019-06-29T09:52:05Z tokio_threadpool::worker] loop
[2019-06-29T09:52:05Z tokio_threadpool::worker] worker WorkerId(7) queue empty: false
[2019-06-29T09:52:05Z tokio_threadpool::worker] global queue empty: true
[2019-06-29T09:52:05Z tokio_threadpool::worker] run task
poll start
poll ready
[2019-06-29T09:52:05Z tokio_threadpool::task]     -> task complete
[tokio/tokio-threadpool/src/worker/mod.rs:474] run = Complete
[2019-06-29T09:52:05Z tokio_threadpool::worker] loop
[2019-06-29T09:52:05Z tokio_threadpool::pool] worker thread exit
[2019-06-29T09:52:05Z tokio_threadpool::shutdown] shutdown future
finished

我们重点关注那个 notify task 之后,tokio 内部发生了什么,很明显日志中马上输出了一个奇怪的信息 notify: 139748102566240 这就是我们传入的 Test 任务的 task id,也是一个指针地址,通过该指针,tokio 就能够还原 Task 的状态,并推入任务池,线程池中的 worker 线程就会接收到任务并执行了。

为什么 tokio 强调自定义 Future 不应该有阻塞

tokio 线程池中只有 worker 线程有能力响应任务分发,也就是说,工作线程是有上限的,响应 reactor 信号的能力是有限的。

正常的 Future 任务,在资源不足时,会放弃当前线程的执行权,返回 Not Ready,这样的任务再多,也不会有响应不及时的问题,但是,如果自定义 Future 有阻塞,那会导致工作线程堵塞,进而导致响应能力下降,任务无法及时处理,也就失去了众多异步任务同时处理的能力。

那么,如果有无法避免的阻塞任务需要执行该怎么办,这时候,上文也说过,worker 线程可以退化成 blocking 线程,把偷取的任务退回给全局任务队列,并执行 blocking 任务,执行完成后退出。这时候,退化的 blocking 线程已经失去了偷取任务的能力,并且 pool 会及时将失去的 worker 线程补满,blocking 不会堵塞其他任务的执行,是一个不错的解决方案。

但是,目前的实现中,也有需要优化的地方:

小节

今天的量有点大,有兴趣的人可以慢慢消化,跟着读源码,在适配过程中 notify 的实现被删除了,因为最新的 context 已经不需要这个实现了。

评论区

加载更多

登录后评论