--- 摄于 2017 年 9 月 藏川线前段
最近 Tokio PR 好快,threadpool 已经更新了,但是 carllerche 说当前的唤醒机制不是很理想,等后面修复。并且,其实 thread pool 的 RFC 早几个月就已经在 issue 列表里面了,目前 Rust 生态中,很多底层库实际上是没有标准的,但是却有事实实现标准,异步生态,官方以 Future 为核心,runtime 其实可以有任意的实现,但是直到今年年初,社区内只有实现的事实标准 tokio,后面因为 Future 升级,强行分叉了一个 romio 出来,作为验证项目来做,实际上真正可用的还是 tokio。我们是否需要有一个不一样的实现,是否需要一个不一样的调度作为另一种或者说更多的选择呢?统一的优势是生态内的库是通用的,多样化与之相反,带来的是另一种体验。
在我近一年的观察中,我看到了一些奇怪的现象,我发现有些库本来是 tokio 生态中的一员,后来不知道因为什么原因,退出了这个生态,用 mio 重写了该项目,比如 https://github.com/ustulation/p2p、https://github.com/maidsafe/crust。这只是现象,更多的内在问题我并不清楚,我在使用 tokio 的过程中确实碰到过问题,emmm,先不说这个了,我们接着上一节进行读源码操作。
上一小节我们说到 Future 本质上是一个就绪通知机制,接下来又有几个问题:
current()
或者说 context.wake()
是如何注册进去的demo 中获取 Task 是非常简单的,只需要调用 current()
就可以了,但是,这个函数里面是什么呢,其实它是从一个 thread_local 变量中拿出了之前注册的 handle,这个 handle 关联了 task 任务,所以,只需要 notify 一下,执行器就能知道,有一个任务已经就绪,进入任务队列。
那么,这东西怎么注册呢?在哪注册呢?
注册代码:
0.1:
let mut g = Guard(fut, true);
let ret =
g.0.as_mut()
.unwrap()
.poll_future_notify(unpark, self as *const _ as usize);
0.3:
let mut g = Guard(fut, true);
let mut waker = arc_waker::waker(Arc::new(Waker {
task: me.clone(),
pool: pool.clone(),
}));
let mut cx = Context::from_waker(&mut waker);
let ret = g.0.as_mut().unwrap().as_mut().poll(&mut cx);
那么,它们的区别在哪呢?
0.1 版本下,task 存储在了 thread local 上,而 0.3 是通过 context 上下文进行传递的。thread local 的注册,可以跟踪 poll_future_notify
这个函数进行,最后到了 futures 库 的 set
函数上,thread_local 变量名是 CURRENT_TASK
。这些地方,大量使用了裸指针,可以说是一个 unsafe 的经典教程,有空的话,大家可以跟代码学习学习。
我发现,这个问题描述起来比较麻烦,我尝试先用文字,再贴代码,最后用一个示例去说明这个问题。
tokio 的 thread pool 是一个 worker steal 的工作线程池,它内部的 worker 是通过 index 动态确定的,并不是一个固定的分配,固定的是 index 的上限,也就是 worker 的数量。
work steal 是怎么实现的呢,实际上,它的实现依赖于 crossbeam 中的 Injector
、Worker
、Stealer
等无锁结构,同时 tokio 本身还实现了无锁的 stack,用来标记 worker 的状态,还有大量的状态标记,用来确认任务状态、交易池状态、线程状态等等。
Pool
中,有两种任务队列:
那么,一个 Future 的传入到底经过哪几个流程呢,大致上来说,是下面几个流程:
那么如何理解 worker 是动态指定的呢,简单描述:
首先,pool 中的线程目前是 lazy 启动的,但是任务队列是创建时初始化完成的,也就是说,可能会有任务队列大于正在运行的 worker 线程的情况;
其次,worker 线程并不是只有 work 一个状态,它还有一个状态叫 blocking,这个状态相当于当前线程失去了 worker 的获取任务的能力,并且要将多余的任务退回全局任务队列,去执行一个用户指定的可能 blocking 当前线程的任务,在执行完成后,当前线程退出。
pub fn submit_external(&self, task: Arc<Task>, pool: &Arc<Pool>) {
debug_assert_eq!(*self, **pool);
trace!(" -> submit external");
self.queue.push(task);
self.signal_work(pool);
}
fn try_run_task(&self, notify: &Arc<Notifier>) -> bool {
if self.try_run_owned_task(notify) {
return true;
}
self.try_steal_task(notify)
}
match self.pool.queue.steal_batch(&self.entry().worker) {
Steal::Success(()) => {
self.pool.signal_work(&self.pool);
break;
}
Steal::Empty => break,
Steal::Retry => {}
}
我在 tokio 源码中加了一些 debug 信息,再次运行了上一节的 demo 代码,输出的 debug 信息如下,让我们来用日志感受一下 Future 任务在 tokio Runtime 中的行踪:
[2019-06-29T09:52:00Z tokio_threadpool::sender] get a future task
[2019-06-29T09:52:00Z tokio_threadpool::pool] spawn a worker thread
[2019-06-29T09:52:00Z tokio_threadpool::worker] loop
[2019-06-29T09:52:00Z tokio_threadpool::worker] worker WorkerId(7) queue empty: true
[2019-06-29T09:52:00Z tokio_threadpool::worker] global queue empty: false
[2019-06-29T09:52:00Z tokio_threadpool::worker] worker WorkerId(7) queue empty: true
[2019-06-29T09:52:00Z tokio_threadpool::worker] global queue empty: false
[2019-06-29T09:52:00Z tokio_threadpool::worker] loop
[2019-06-29T09:52:00Z tokio_threadpool::worker] worker WorkerId(7) queue empty: true
[2019-06-29T09:52:00Z tokio_threadpool::worker] global queue empty: false
[2019-06-29T09:52:00Z tokio_threadpool::worker] loop
[2019-06-29T09:52:00Z tokio_threadpool::worker] worker WorkerId(7) queue empty: true
[2019-06-29T09:52:00Z tokio_threadpool::worker] global queue empty: false
[2019-06-29T09:52:00Z tokio_threadpool::worker] loop
[2019-06-29T09:52:00Z tokio_threadpool::worker] worker WorkerId(7) queue empty: false
[2019-06-29T09:52:00Z tokio_threadpool::worker] global queue empty: true
[2019-06-29T09:52:00Z tokio_threadpool::worker] run task
poll start
poll notready
[2019-06-29T09:52:00Z tokio_threadpool::task] -> not ready
[tokio/tokio-threadpool/src/worker/mod.rs:474] run = Idle
[2019-06-29T09:52:00Z tokio_threadpool::worker] loop
[2019-06-29T09:52:00Z tokio_threadpool::worker] worker WorkerId(7) queue empty: true
[2019-06-29T09:52:00Z tokio_threadpool::worker] global queue empty: true
[2019-06-29T09:52:00Z tokio_threadpool::worker] loop
[2019-06-29T09:52:00Z tokio_threadpool::worker] worker WorkerId(7) queue empty: true
[2019-06-29T09:52:00Z tokio_threadpool::worker] global queue empty: true
[2019-06-29T09:52:00Z tokio_threadpool::worker] loop
[2019-06-29T09:52:00Z tokio_threadpool::worker] worker WorkerId(7) queue empty: true
[2019-06-29T09:52:00Z tokio_threadpool::worker] global queue empty: true
[2019-06-29T09:52:00Z tokio_threadpool::worker] loop
[2019-06-29T09:52:00Z tokio_threadpool::worker] worker WorkerId(7) queue empty: true
[2019-06-29T09:52:00Z tokio_threadpool::worker] global queue empty: true
notify task
[2019-06-29T09:52:05Z tokio_threadpool::notifier] notify: 139748102566240
[2019-06-29T09:52:05Z tokio_threadpool::worker] loop
[2019-06-29T09:52:05Z tokio_threadpool::worker] worker WorkerId(7) queue empty: true
[2019-06-29T09:52:05Z tokio_threadpool::worker] global queue empty: false
[2019-06-29T09:52:05Z tokio_threadpool::worker] loop
[2019-06-29T09:52:05Z tokio_threadpool::worker] worker WorkerId(7) queue empty: true
[2019-06-29T09:52:05Z tokio_threadpool::worker] global queue empty: false
[2019-06-29T09:52:05Z tokio_threadpool::worker] loop
[2019-06-29T09:52:05Z tokio_threadpool::worker] worker WorkerId(7) queue empty: true
[2019-06-29T09:52:05Z tokio_threadpool::worker] global queue empty: false
[2019-06-29T09:52:05Z tokio_threadpool::worker] loop
[2019-06-29T09:52:05Z tokio_threadpool::worker] worker WorkerId(7) queue empty: true
[2019-06-29T09:52:05Z tokio_threadpool::worker] global queue empty: false
[2019-06-29T09:52:05Z tokio_threadpool::worker] loop
[2019-06-29T09:52:05Z tokio_threadpool::worker] worker WorkerId(7) queue empty: false
[2019-06-29T09:52:05Z tokio_threadpool::worker] global queue empty: true
[2019-06-29T09:52:05Z tokio_threadpool::worker] run task
poll start
poll ready
[2019-06-29T09:52:05Z tokio_threadpool::task] -> task complete
[tokio/tokio-threadpool/src/worker/mod.rs:474] run = Complete
[2019-06-29T09:52:05Z tokio_threadpool::worker] loop
[2019-06-29T09:52:05Z tokio_threadpool::pool] worker thread exit
[2019-06-29T09:52:05Z tokio_threadpool::shutdown] shutdown future
finished
我们重点关注那个 notify task
之后,tokio 内部发生了什么,很明显日志中马上输出了一个奇怪的信息 notify: 139748102566240
这就是我们传入的 Test 任务的 task id,也是一个指针地址,通过该指针,tokio 就能够还原 Task 的状态,并推入任务池,线程池中的 worker 线程就会接收到任务并执行了。
tokio 线程池中只有 worker 线程有能力响应任务分发,也就是说,工作线程是有上限的,响应 reactor 信号的能力是有限的。
正常的 Future 任务,在资源不足时,会放弃当前线程的执行权,返回 Not Ready,这样的任务再多,也不会有响应不及时的问题,但是,如果自定义 Future 有阻塞,那会导致工作线程堵塞,进而导致响应能力下降,任务无法及时处理,也就失去了众多异步任务同时处理的能力。
那么,如果有无法避免的阻塞任务需要执行该怎么办,这时候,上文也说过,worker 线程可以退化成 blocking 线程,把偷取的任务退回给全局任务队列,并执行 blocking 任务,执行完成后退出。这时候,退化的 blocking 线程已经失去了偷取任务的能力,并且 pool 会及时将失去的 worker 线程补满,blocking 不会堵塞其他任务的执行,是一个不错的解决方案。
但是,目前的实现中,也有需要优化的地方:
今天的量有点大,有兴趣的人可以慢慢消化,跟着读源码,在适配过程中 notify 的实现被删除了,因为最新的 context 已经不需要这个实现了。
请登录后评论
评论区
加载更多