tokio 🔗
核心是为了提高系统的吞吐量,而非延迟。
基于os的epoll/kqueue, io-uring的多路复用io,实现了线程池处理大量io task,提高吞吐率。
Task是基于Future的绿色协程,适合io密集型的场景。
event-driven non-blocking I/O。
Event poll: 没有专门的event poll线程,都是worker线程。 worker线程优先执行task。
当没有任务可执行时, 才会在park中poll events。那event poll线程可能会有饿死的问题。poll
event 也是有任务时每个61次会在执行一次,任务会为空也执行。
epoll_wait: 等待监听的fd事件, 也可以传入timeout超时后返回。
fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
if core.tick % GLOBAL_POLL_INTERVAL == 0 {
core = self.park_timeout(core, Some(Duration::from_millis(0)));
// 继续执行task
core.maintenance(&self.worker);
}
core
}
worker线程与全局结构的关系:
忽略 time, process, signal
tokio runtime 🔗
相对独立。thread pool 怎么组织的?
task如何提交的到队列(线程局部队列,全局队列)的?
worker如何工作的? 执行本地的run queue, 61次后执行一次全局队列,空闲时执行poll任务
block task的处理 🔗
/tokio/src/runtime/blocking scheduler.rs无逻辑, block task 不会调度。
// tokio/src/runtime/blocking/pool.rs
// 入参数不是Impl Future, 是closure
pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let rt = Handle::current();
rt.spawn_blocking(func)
}
使用的例子 🔗
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let listener = TcpListener::bind("127.0.0.1:8888").await?;
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = vec![0;1024];
loop {
let n = socket.read(&mut but).await?;
if n == 0 {return;}
socket.write_all(&buf[0..n]).await?;
}
});
}
}
// 主线程 block on
fn main() {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async{
...
});
}
pub(crate) fn block_on<F: Future>(&mut self, f: F) -> Result<F::Output, AccessError> {
use std::task::Context;
use std::task::Poll::Ready;
// `get_unpark()` should not return a Result
let waker = self.waker()?;
let mut cx = Context::from_waker(&waker);
pin!(f);
loop {
if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) {
return Ok(v);
}
// async main 不ready就park
self.park();
}
}
runtime的初始化过程 🔗
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
// io driver
let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
// 创建线程池
let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
}
Core 一个worker的数据, run queue, park
Remote steal其他core中run queue的handler
Shared 所有worker的共享数据, 保存Remote, global run queue 作为scheduler的一部分
Worker 包含Core, Shared放到launch里
Inject: global task queue
steal时 run queue的steal head和real-head(本线程和steal的线程), tail(仅本线程修改)
异步程序中的mutex 🔗
std::sync::Mutex: 不能跨await, std::sync::MutexGuard不能在线程之间共享。
非阻塞io 🔗
tokio/src/io 线程在做io时若发生阻塞时,不让当前的线程阻塞,而是把task挂起.如果IOready了,waker再把相应的task放入run queue里。
async_fd.rs创建Asyncfd是先将阻塞的fd注册到mio封装的os epoll对象上, 然后返回。以此Asyncfd供后续的异步io操作。
pub struct AsyncFd<T: AsRawFd> {
registration: Registration,
inner: Option<T>,
}
mio 🔗
//通过宏的条件编译,来控制使用epoll or kqueue
#[cfg(target_os = "linux")]
pub use linux::{Event, Registrator, Selector, TcpStream};
io-uring 🔗
io-uring与epoll不同, 不需要拿到event后再去手动syscall读写数据。
由两个ring组成, submit queue, complete queue